blob: 4783440961d92c113faf8a5a7b3af04683156953 [file] [log] [blame]
// Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "posix_translation/tcp_socket.h"
#include <arpa/inet.h>
#include <netinet/tcp.h>
#include <string.h>
#include <algorithm>
#include "common/arc_strace.h"
#include "common/alog.h"
#include "posix_translation/socket_util.h"
#include "posix_translation/time_util.h"
#include "posix_translation/virtual_file_system.h"
#include "ppapi/c/pp_errors.h"
#include "ppapi/cpp/module.h"
#include "ppapi/cpp/net_address.h"
#include "ppapi/cpp/tcp_socket.h"
namespace posix_translation {
// Thin wrapper of pp::TCPSocket to manage the lifetime of pp::TCPSocket.
// Background: the problem is some blocking call (such as ::read()), and
// ::close() for this class may have race condition.
// Assuming ::read() is called on a thread and it is blocked, and ::close() is
// called on another thread.
// On current FileStream implementation, the final ::close() destructs the
// stream instance. So, when ::read() is unblocked after the ::close(), it is
// necessary to know if the socket is closed or not without touching the
// TCPSocket instance (otherwise it may cause use-after-free problem).
// This thin wrapper provides such a functionality.
//
// How to use:
//
// :
// // Keep the reference to SocketWrapper locally.
// scoped_refptr<SocketWrapper> wrapper(socket_);
// VirtualFileSystem* sys = VirtualFileSystem::GetVirtualFileSystem();
// const base::TimeTicks time_limit = internal::TimeOutToTimeLimit(timeout);
// bool is_timedout = false;
// while (!is_timedout && ... condition ...) {
// is_timedout = sys->WaitUntil(time_limit);
// // Check close state before accessing any member variables since this
// // instance might be destroyed while this thread was waiting.
// if (wrapper->is_closed()) {
// errno = EBADF;
// return -1;
// }
// }
// :
//
// Note: this is very close to what WeakPtr does. However, we cannot use
// WeakPtrFactory because the factory is bound to the thread where the first
// WeakPtr is created, and then it is prohibited to destruct it on another
// thread.
//
// This class will be touched from multi threads. To access is_closed() and
// Close(), the caller has responsibility to lock the filesystem-wise giant
// mutex in advance.
class TCPSocket::SocketWrapper
: public base::RefCountedThreadSafe<SocketWrapper> {
public:
// Takes the ownership of socket.
explicit SocketWrapper(const pp::TCPSocket& socket)
: socket_(socket), closed_(false) {
}
bool is_closed() const {
VirtualFileSystem::GetVirtualFileSystem()->mutex().AssertAcquired();
return closed_;
}
void Close() {
VirtualFileSystem::GetVirtualFileSystem()->mutex().AssertAcquired();
if (is_closed())
return;
closed_ = true;
socket_.Close();
}
pp::TCPSocket* socket() {
return &socket_;
}
private:
// Do not allow to destruct this class manually from the client code
// to avoid to delete the object accidentally while there are still
// references to it.
friend class base::RefCountedThreadSafe<SocketWrapper>;
~SocketWrapper() {
}
pp::TCPSocket socket_;
bool closed_;
DISALLOW_COPY_AND_ASSIGN(SocketWrapper);
};
TCPSocket::TCPSocket(int fd, int socket_family, int oflag)
: SocketStream(socket_family, oflag), fd_(fd), factory_(this),
socket_(new SocketWrapper(pp::TCPSocket(
VirtualFileSystem::GetVirtualFileSystem()->instance()))),
read_buf_(kBufSize), connect_state_(TCP_SOCKET_NEW), eof_(false),
read_sent_(false), write_sent_(false), connect_error_(0),
no_delay_(0) {
ALOG_ASSERT(socket_family == AF_INET || socket_family == AF_INET6);
}
TCPSocket::TCPSocket(const pp::TCPSocket& socket)
: SocketStream(kUnknownSocketFamily, O_RDWR), fd_(-1), factory_(this),
socket_(new SocketWrapper(socket)),
read_buf_(kBufSize), connect_state_(TCP_SOCKET_NEW), eof_(false),
read_sent_(false), write_sent_(false), connect_error_(0),
no_delay_(0) {
}
TCPSocket::~TCPSocket() {
if (!socket_->is_closed()) {
// Unlike UDPSocket, this happens when TCPSocket instance is created but
// discarded before it is registered to file system. For example, this
// happens on error case of accept().
CloseLocked();
}
}
void TCPSocket::MarkAsErrorLocked(int error) {
VirtualFileSystem* sys = VirtualFileSystem::GetVirtualFileSystem();
sys->mutex().AssertAcquired();
if (!IsTerminated()) {
if (connect_state_ == TCP_SOCKET_CONNECTING)
connect_error_ = error;
if (!is_block()) {
// getsockopt() does not seem to expose SO_ERROR for blocking sockets.
// This is likely because the main reason for SO_ERROR is to allow apps
// to query errors after a successful select() call, during which
// a non-blocking connect may have failed.
error_ = error;
}
connect_state_ = TCP_SOCKET_ERROR;
NotifyListeners();
}
}
int TCPSocket::bind(const sockaddr* addr, socklen_t addrlen) {
int error =
internal::VerifyInputSocketAddress(addr, addrlen, socket_family_);
if (error) {
errno = error;
return -1;
}
VirtualFileSystem* sys = VirtualFileSystem::GetVirtualFileSystem();
pp::NetAddress address =
internal::SockAddrToNetAddress(sys->instance(), addr);
ALOGI("TCPSocket::Bind: %s",
address.DescribeAsString(true).AsString().c_str());
scoped_refptr<SocketWrapper> wrapper(socket_);
int32_t result = PP_OK_COMPLETIONPENDING;
{
base::AutoUnlock unlock(sys->mutex());
result = wrapper->socket()->Bind(address, pp::BlockUntilComplete());
}
ARC_STRACE_REPORT_PP_ERROR(result);
// Check close state before accessing any member variables since this
// instance might be destroyed while this thread was waiting.
if (wrapper->is_closed()) {
errno = EBADF;
return -1;
}
if (result != PP_OK) {
if (result == PP_ERROR_ADDRESS_IN_USE) {
errno = EADDRINUSE;
} else {
errno = EINVAL;
}
return -1;
}
return 0;
}
int TCPSocket::listen(int backlog) {
if (connect_state_ != TCP_SOCKET_NEW) {
// This could happen, for example, when a user writes as follows:
// s = socket(AF_INET, SOCK_STREAM, 0);
// connect(s, ... something peer ...);
// listen(s, 5);
// There is no explicit documentation in the man page, but empirically
// under Linux, EINVAL is raised.
errno = EINVAL;
return -1;
}
connect_state_ = TCP_SOCKET_LISTENING;
scoped_refptr<SocketWrapper> wrapper(socket_);
int32_t result = PP_OK_COMPLETIONPENDING;
{
base::AutoUnlock unlock(VirtualFileSystem::GetVirtualFileSystem()->mutex());
result = wrapper->socket()->Listen(backlog, pp::BlockUntilComplete());
}
ARC_STRACE_REPORT_PP_ERROR(result);
// Check close state before accessing any member variables since this
// instance might be destroyed while this thread was waiting.
if (wrapper->is_closed()) {
errno = EBADF;
return -1;
}
if (result != PP_OK) {
if (result == PP_ERROR_NOSPACE)
errno = EOPNOTSUPP;
else
errno = EADDRINUSE;
MarkAsErrorLocked(errno);
return -1;
}
// The listen() has actually been started. So, start "accept" as a background
// task to support non-blocking ::accept().
pp::Module::Get()->core()->CallOnMainThread(
0, factory_.NewCallback(&TCPSocket::Accept));
return 0;
}
int TCPSocket::accept(sockaddr* addr, socklen_t* addrlen) {
// accept(2) allows NULL/NULL to be passed for sockaddr.
if (addr) {
int error = internal::VerifyOutputSocketAddress(addr, addrlen);
if (error) {
errno = error;
return -1;
}
}
if (connect_state_ != TCP_SOCKET_LISTENING) {
errno = EINVAL;
return -1;
}
VirtualFileSystem* sys = VirtualFileSystem::GetVirtualFileSystem();
if (is_block()) {
// Wait until some peer connects to the listening socket, or timed out.
const base::TimeTicks time_limit =
internal::TimeOutToTimeLimit(recv_timeout_);
bool is_timedout = false;
scoped_refptr<SocketWrapper> wrapper(socket_);
while (!is_timedout && accepted_socket_.is_null()) {
is_timedout = sys->WaitUntil(time_limit);
// Check close state before accessing any member variables since this
// instance might be destroyed while this thread was waiting.
if (wrapper->is_closed()) {
errno = EBADF;
return -1;
}
}
}
if (accepted_socket_.is_null()) {
errno = EAGAIN;
return -1;
}
pp::TCPSocket accepted_socket = accepted_socket_;
accepted_socket_ = pp::TCPSocket();
pp::Module::Get()->core()->CallOnMainThread(
0, factory_.NewCallback(&TCPSocket::Accept));
// Before creating TCPSocket instance, extract the address to check an error.
sockaddr_storage storage = {};
if (addr) {
if (!internal::NetAddressToSockAddrStorage(
accepted_socket.GetRemoteAddress(), AF_UNSPEC, false, &storage)) {
// According to man, there seems no appropriate error is defined for
// this case. So, use ENOBUF to let the client know that this is some
// internal error.
errno = ENOBUFS;
return -1;
}
}
scoped_refptr<TCPSocket> socket = new TCPSocket(accepted_socket);
int fd = sys->AddFileStreamLocked(socket);
if (fd < 0) {
errno = EMFILE;
return -1;
}
socket->fd_ = fd;
socket->connect_state_ = TCP_SOCKET_CONNECTED;
// Start reading on background.
socket->PostReadTaskLocked();
// Finally, copy the address data if necessary.
if (addr)
internal::CopySocketAddress(storage, addr, addrlen);
return fd;
}
int TCPSocket::connect(const sockaddr* serv_addr, socklen_t addrlen) {
int error =
internal::VerifyInputSocketAddress(serv_addr, addrlen, socket_family_);
if (error) {
errno = error;
return -1;
}
if (IsTerminated()) {
// TODO(crbug.com/358855): Allow new connect() calls after an unsuccessful
// connection attempt.
errno = EBADF;
return -1;
}
if (connect_state_ == TCP_SOCKET_CONNECTED ||
connect_state_ == TCP_SOCKET_LISTENING) {
errno = EISCONN;
return -1;
}
VirtualFileSystem* sys = VirtualFileSystem::GetVirtualFileSystem();
if (connect_state_ == TCP_SOCKET_NEW) {
pp::NetAddress address =
internal::SockAddrToNetAddress(sys->instance(), serv_addr);
ALOGI("TCPSocket::connect: %s",
address.DescribeAsString(true).AsString().c_str());
connect_state_ = TCP_SOCKET_CONNECTING;
pp::Module::Get()->core()->CallOnMainThread(
0, factory_.NewCallback(&TCPSocket::Connect, address));
if (!is_block()) {
errno = EINPROGRESS;
return -1;
}
} else {
ALOG_ASSERT(connect_state_ == TCP_SOCKET_CONNECTING);
if (!is_block()) {
errno = EALREADY;
return -1;
}
// Blocking connect should block, waiting for results of a pending connect.
}
scoped_refptr<SocketWrapper> wrapper(socket_);
while (connect_state_ == TCP_SOCKET_CONNECTING) {
sys->Wait();
// Check close state before accessing any member variables since this
// instance might be destroyed while this thread was waiting.
if (wrapper->is_closed()) {
errno = EBADF;
return -1;
}
}
if (connect_state_ == TCP_SOCKET_ERROR) {
errno = connect_error_;
return -1;
}
ALOG_ASSERT(connect_state_ == TCP_SOCKET_CONNECTED);
return 0;
}
off64_t TCPSocket::lseek(off64_t offset, int whence) {
errno = ESPIPE;
return -1;
}
ssize_t TCPSocket::read(void* buf, size_t count) {
return recv(buf, count, 0);
}
ssize_t TCPSocket::recv(void *buf, size_t len, int flags) {
// TODO(crbug.com/242604): Handle flags such as MSG_DONTWAIT
if (connect_state_ == TCP_SOCKET_NEW ||
connect_state_ == TCP_SOCKET_LISTENING) {
errno = ENOTCONN;
return -1;
}
if (is_block()) {
scoped_refptr<SocketWrapper> wrapper(socket_);
VirtualFileSystem* sys = VirtualFileSystem::GetVirtualFileSystem();
const base::TimeTicks time_limit =
internal::TimeOutToTimeLimit(recv_timeout_);
bool is_timedout = false;
while (!is_timedout && !IsSelectReadReady() && !IsTerminated()) {
is_timedout = sys->WaitUntil(time_limit);
// Check close state before accessing any member variables since this
// instance might be destroyed while this thread was waiting.
if (wrapper->is_closed()) {
errno = EBADF;
return -1;
}
}
} else if (connect_state_ == TCP_SOCKET_CONNECTING) {
// Non-blocking and still connecting.
errno = EAGAIN;
return -1;
}
size_t nread = std::min(len, in_buf_.size());
if (nread) {
std::copy(in_buf_.begin(), in_buf_.begin() + nread,
reinterpret_cast<char*>(buf));
if (!(flags & MSG_PEEK))
in_buf_.erase(in_buf_.begin(), in_buf_.begin() + nread);
PostReadTaskLocked();
return nread;
}
if (!is_connected() || eof_)
return 0;
errno = EAGAIN;
return -1;
}
ssize_t TCPSocket::recvfrom(void* buf, size_t len, int flags, sockaddr* addr,
socklen_t* addrlen) {
if (!addr && !addrlen)
return recv(buf, len, flags);
errno = EINVAL;
return -1;
}
int TCPSocket::recvmsg(struct msghdr* msg, int flags) {
if (!msg || !msg->msg_iov) {
errno = EINVAL;
return -1;
}
if (msg->msg_iovlen != 1) {
ALOGE("TCPSocket only supports trivial recvmsg with msg_iovlen of 1");
errno = EINVAL;
return -1;
}
if (msg->msg_controllen != 0) {
ALOGE("TCPSocket only supports trivial recvmsg with no control data");
errno = EINVAL;
return -1;
}
msg->msg_flags = 0;
return recv(msg->msg_iov[0].iov_base, msg->msg_iov[0].iov_len, flags);
}
ssize_t TCPSocket::write(const void* buf, size_t count) {
return send(buf, count, 0);
}
ssize_t TCPSocket::send(const void* buf, size_t len, int flags) {
// TODO(crbug.com/242604): Handle flags such as MSG_DONTWAIT
if (!is_connected()) {
errno = EPIPE;
return -1;
}
bool is_blocking = is_block();
if (is_blocking && out_buf_.size() >= kBufSize) {
scoped_refptr<SocketWrapper> wrapper(socket_);
VirtualFileSystem* sys = VirtualFileSystem::GetVirtualFileSystem();
const base::TimeTicks time_limit =
internal::TimeOutToTimeLimit(send_timeout_);
bool is_timedout = false;
while (!is_timedout && out_buf_.size() >= kBufSize && is_connected()) {
is_timedout = sys->WaitUntil(time_limit);
// Check close state before accessing any member variables since this
// instance might be destroyed while this thread was waiting.
if (wrapper->is_closed()) {
errno = EBADF;
return -1;
}
}
if (!is_connected()) {
errno = EIO;
return -1;
}
}
if (out_buf_.size() < kBufSize) {
out_buf_.insert(out_buf_.end(),
reinterpret_cast<const char*>(buf),
reinterpret_cast<const char*>(buf) + len);
if (!write_sent_) {
pp::Module::Get()->core()->CallOnMainThread(
0, factory_.NewCallback(&TCPSocket::Write));
}
return len;
}
ALOG_ASSERT(!is_blocking);
errno = EAGAIN;
return -1;
}
ssize_t TCPSocket::sendto(const void* buf, size_t len, int flags,
const sockaddr* dest_addr, socklen_t addrlen) {
if (!dest_addr && !addrlen)
return send(buf, len, flags);
errno = EINVAL;
return -1;
}
int TCPSocket::sendmsg(const struct msghdr* msg, int flags) {
if (!msg || !msg->msg_iov) {
errno = EINVAL;
return -1;
}
if (msg->msg_iovlen != 1) {
ALOGE("TCPSocket only supports trivial sendmsg with msg_iovlen of 1");
errno = EINVAL;
return -1;
}
if (msg->msg_controllen != 0) {
ALOGE("TCPSocket only supports trivial sendmsg with no control data");
errno = EINVAL;
return -1;
}
return send(msg->msg_iov[0].iov_base, msg->msg_iov[0].iov_len, flags);
}
int TCPSocket::ioctl(int request, va_list ap) {
if (request == FIONREAD) {
int* out = va_arg(ap, int*);
*out = in_buf_.size();
return 0;
}
return SocketStream::ioctl(request, ap);
}
bool TCPSocket::GetOptNameData(
int level, int optname, socklen_t* len, void** storage,
const void* user_data, socklen_t user_data_len) {
if (level == IPPROTO_TCP) {
switch (optname) {
case TCP_NODELAY:
*storage = &no_delay_;
*len = SIZEOF_AS_SOCKLEN(int); // NOLINT
ALOG_ASSERT(*len == sizeof(no_delay_));
return true;
}
}
return SocketStream::GetOptNameData(level, optname, len, storage, user_data,
user_data_len);
}
int TCPSocket::setsockopt(int level, int optname, const void* optval,
socklen_t optlen) {
if (level == SOL_IPV6 && optname == IPV6_V6ONLY) {
// Currently, IPV6_V6ONLY is not supported by pepper.
// This is just a work around until it is supported. The default value of
// IPV6_V6ONLY is 0 (false). Some applications try to set the 0
// explicitly and fail if it is not supported. So, here, we return
// 0 (success) only if *optval is 0.
// TODO(crbug.com/371334): Use pepper's IPV6_V6ONLY option when supported.
if (optlen < SIZEOF_AS_SOCKLEN(int) || // NOLINT(readability/casting)
*static_cast<const int*>(optval) != 0) {
errno = EINVAL;
return -1;
}
return 0;
}
int no_delay = no_delay_;
int result = SocketStream::setsockopt(level, optname, optval, optlen);
if (result != 0)
return result;
if (no_delay == no_delay_)
return 0;
scoped_refptr<SocketWrapper> wrapper(socket_);
int32_t pp_error = PP_OK_COMPLETIONPENDING;
{
base::AutoUnlock unlock(
VirtualFileSystem::GetVirtualFileSystem()->mutex());
pp_error = wrapper->socket()->SetOption(
PP_TCPSOCKET_OPTION_NO_DELAY, pp::Var(no_delay_ ? true : false),
pp::BlockUntilComplete());
}
ARC_STRACE_REPORT_PP_ERROR(pp_error);
// Check close state before accessing any member variables since this
// instance might be destroyed while this thread was waiting.
if (wrapper->is_closed()) {
errno = EBADF;
return -1;
}
if (pp_error != PP_OK) {
errno = ENOPROTOOPT; // TODO(crbug.com/358932): Pick correct errno.
return -1;
}
return 0;
}
int TCPSocket::getpeername(sockaddr* name, socklen_t* namelen) {
int error = internal::VerifyOutputSocketAddress(name, namelen);
if (error) {
errno = error;
return -1;
}
sockaddr_storage storage;
if (!internal::NetAddressToSockAddrStorage(
socket_->socket()->GetRemoteAddress(), AF_UNSPEC, false, &storage)) {
memset(&storage, 0, sizeof(storage));
storage.ss_family = socket_family_;
}
internal::CopySocketAddress(storage, name, namelen);
return 0;
}
int TCPSocket::getsockname(sockaddr* name, socklen_t* namelen) {
int error = internal::VerifyOutputSocketAddress(name, namelen);
if (error) {
errno = error;
return -1;
}
sockaddr_storage storage;
if (!internal::NetAddressToSockAddrStorage(
socket_->socket()->GetLocalAddress(), AF_UNSPEC, false, &storage)) {
memset(&storage, 0, sizeof(storage));
storage.ss_family = socket_family_;
}
internal::CopySocketAddress(storage, name, namelen);
return 0;
}
bool TCPSocket::IsSelectReadReady() const {
// Closed socket should return an error without blocking.
if (socket_->is_closed())
return true;
switch (connect_state_) {
case TCP_SOCKET_NEW:
// If the socket is neither connected nor listening, the socket is
// considered read_ready, as the read() should return error without
// blocking.
return true;
case TCP_SOCKET_CONNECTING:
// If the socket is connecting, no readable data is available.
return false;
case TCP_SOCKET_CONNECTED:
// A connected socket is considered read_ready if there is data
// available for reading, or if EOF has been detected.
return !in_buf_.empty() || eof_;
case TCP_SOCKET_LISTENING:
// A listening socket is considered read_ready when there is a
// connection waiting to be accepted.
return !accepted_socket_.is_null();
case TCP_SOCKET_ERROR:
// On error, the read() should return error without blocking.
return true;
default:
// Should not reach here.
ALOG_ASSERT(false);
}
return false;
}
bool TCPSocket::IsSelectWriteReady() const {
// Closed socket should return an error without blocking.
if (socket_->is_closed())
return true;
switch (connect_state_) {
case TCP_SOCKET_NEW:
// If the socket is neither connected nor listening, the socket is
// considered write_ready, as the write() should return error without
// blocking.
return true;
case TCP_SOCKET_CONNECTING:
// If the socket is connecting, the socket is not yet writable.
return false;
case TCP_SOCKET_CONNECTED:
// A connected socket is considered write_ready if there is some space
// available in the internal buffer.
return out_buf_.size() < kBufSize;
case TCP_SOCKET_LISTENING:
// The listening socket is unwritable.
return false;
case TCP_SOCKET_ERROR:
// On error, the write() should return error without blocking.
return true;
default:
// Should not reach here.
ALOG_ASSERT(false);
}
return false;
}
bool TCPSocket::IsSelectExceptionReady() const {
return connect_state_ == TCP_SOCKET_ERROR;
}
int16_t TCPSocket::GetPollEvents() const {
// Currently we use IsSelect*Ready() family temporarily (and wrongly).
// TODO(crbug.com/359400): Fix the implementation.
return ((IsSelectReadReady() ? POLLIN : 0) |
(IsSelectWriteReady() ? POLLOUT : 0) |
(IsSelectExceptionReady() ? POLLERR : 0));
}
void TCPSocket::OnLastFileRef() {
ALOG_ASSERT(!socket_->is_closed());
CloseLocked();
}
bool TCPSocket::IsTerminated() const {
return socket_->is_closed() || connect_state_ == TCP_SOCKET_ERROR;
}
void TCPSocket::PostReadTaskLocked() {
VirtualFileSystem* sys = VirtualFileSystem::GetVirtualFileSystem();
sys->mutex().AssertAcquired();
if (!is_connected() || read_sent_) {
return; // No more async reads.
}
if (in_buf_.size() >= kBufSize / 2) {
return; // Enough to read locally.
}
if (eof_) {
return; // We already hit the EOF.
}
read_sent_ = true;
if (!pp::Module::Get()->core()->IsMainThread()) {
pp::Module::Get()->core()->CallOnMainThread(
0, factory_.NewCallback(&TCPSocket::Read));
} else {
// If on main Pepper thread call it directly.
ReadLocked();
}
}
void TCPSocket::Accept(int32_t result) {
ALOG_ASSERT(result == PP_OK);
VirtualFileSystem* sys = VirtualFileSystem::GetVirtualFileSystem();
base::AutoLock lock(sys->mutex());
int32_t pp_error = socket_->socket()->Accept(
factory_.NewCallbackWithOutput(&TCPSocket::OnAccept));
ALOG_ASSERT(pp_error == PP_OK_COMPLETIONPENDING);
}
void TCPSocket::OnAccept(int32_t result, const pp::TCPSocket& accepted_socket) {
// TODO(crbug.com/364744): Handle error cases.
VirtualFileSystem* sys = VirtualFileSystem::GetVirtualFileSystem();
base::AutoLock lock(sys->mutex());
ALOG_ASSERT(accepted_socket_.is_null());
accepted_socket_ = accepted_socket;
sys->Broadcast();
NotifyListeners();
}
void TCPSocket::Connect(int32_t result, const pp::NetAddress& address) {
ALOG_ASSERT(result == PP_OK);
VirtualFileSystem* sys = VirtualFileSystem::GetVirtualFileSystem();
base::AutoLock lock(sys->mutex());
// A closed socket means we are in destructor. On the other hand,
// error should not happen in connect.
ALOG_ASSERT(connect_state_ == TCP_SOCKET_CONNECTING);
int32_t pp_error = socket_->socket()->Connect(
address, factory_.NewCallback(&TCPSocket::OnConnect));
ALOG_ASSERT(pp_error == PP_OK_COMPLETIONPENDING);
}
void TCPSocket::OnConnect(int32_t result) {
VirtualFileSystem* sys = VirtualFileSystem::GetVirtualFileSystem();
base::AutoLock lock(sys->mutex());
// A closed socket means we are in destructor. On the other hand,
// error should not happen in connect.
ALOG_ASSERT(connect_state_ == TCP_SOCKET_CONNECTING);
if (result == PP_OK) {
connect_state_ = TCP_SOCKET_CONNECTED;
PostReadTaskLocked();
NotifyListeners();
} else {
MarkAsErrorLocked(ECONNREFUSED);
}
sys->Broadcast();
}
void TCPSocket::Read(int32_t result) {
ALOG_ASSERT(result == PP_OK);
VirtualFileSystem* sys = VirtualFileSystem::GetVirtualFileSystem();
base::AutoLock lock(sys->mutex());
ReadLocked();
}
void TCPSocket::ReadLocked() {
VirtualFileSystem* sys = VirtualFileSystem::GetVirtualFileSystem();
sys->mutex().AssertAcquired();
if (IsTerminated()) {
read_sent_ = false;
sys->Broadcast();
return;
}
pp::CompletionCallback callback = factory_.NewCallback(&TCPSocket::OnRead);
int32_t pp_error = socket_->socket()->Read(
&read_buf_[0], read_buf_.size(),
callback);
if (pp_error >= 0) {
// This usually only happens on tests. We need to cancel the original
// callback to avoid leaks, and to use OnReadLocked instead of OnRead in
// order to avoid re-acquiring sys->mutex() and crashing.
callback.Run(PP_ERROR_USERCANCEL);
OnReadLocked(pp_error);
} else {
ALOG_ASSERT(pp_error == PP_OK_COMPLETIONPENDING);
}
}
void TCPSocket::OnRead(int32_t result) {
if (result == PP_ERROR_USERCANCEL) {
// The callback was cancelled since it was possible to run it synchronously
// on the same thread that requested the read.
return;
}
VirtualFileSystem* sys = VirtualFileSystem::GetVirtualFileSystem();
base::AutoLock lock(sys->mutex());
OnReadLocked(result);
}
void TCPSocket::OnReadLocked(int32_t result) {
VirtualFileSystem* sys = VirtualFileSystem::GetVirtualFileSystem();
sys->mutex().AssertAcquired();
read_sent_ = false;
if (IsTerminated()) {
sys->Broadcast();
return;
}
if (result > 0) {
in_buf_.insert(in_buf_.end(),
read_buf_.begin(), read_buf_.begin() + result);
PostReadTaskLocked();
NotifyListeners();
} else if (result == 0) {
eof_ = true;
NotifyListeners();
} else {
MarkAsErrorLocked(EIO); // TODO(crbug.com/358932): Pick correct error.
}
sys->Broadcast();
}
void TCPSocket::Write(int32_t result) {
VirtualFileSystem* sys = VirtualFileSystem::GetVirtualFileSystem();
base::AutoLock lock(sys->mutex());
if (!write_sent_) {
WriteLocked();
}
}
void TCPSocket::WriteLocked() {
VirtualFileSystem* sys = VirtualFileSystem::GetVirtualFileSystem();
sys->mutex().AssertAcquired();
ALOG_ASSERT(!write_sent_);
if (IsTerminated()) {
sys->Broadcast();
return;
}
if (write_buf_.size() == 0) {
write_buf_.swap(out_buf_);
} else if (write_buf_.size() < kBufSize / 2) {
// Avoid to shift the content in out_buf_ too often by only allowing
// to move chunks either larger than kBufSize / 2 or coinciding with
// the whole out_buf_ buffer.
int size = std::min(kBufSize - write_buf_.size(), out_buf_.size());
write_buf_.insert(write_buf_.end(), out_buf_.begin(),
out_buf_.begin() + size);
out_buf_.erase(out_buf_.begin(), out_buf_.begin() + size);
}
write_sent_ = true;
int32_t result = socket_->socket()->Write(
&write_buf_[0], write_buf_.size(),
factory_.NewCallback(&TCPSocket::OnWrite));
ALOG_ASSERT(result == PP_OK_COMPLETIONPENDING);
}
void TCPSocket::OnWrite(int32_t result) {
VirtualFileSystem* sys = VirtualFileSystem::GetVirtualFileSystem();
base::AutoLock lock(sys->mutex());
write_sent_ = false;
if (IsTerminated()) {
sys->Broadcast();
return;
}
if (result < 0 || (size_t)result > write_buf_.size()) {
// Write error.
ALOGI("TCPSocket::OnWrite: write error on %d, result: %d", fd_, result);
MarkAsErrorLocked(EIO); // TODO(crbug.com/358932): Pick correct error.
sys->Broadcast();
return;
} else {
write_buf_.erase(write_buf_.begin(), write_buf_.begin() + (size_t)result);
}
if (!write_buf_.empty() || !out_buf_.empty()) {
WriteLocked();
}
sys->Broadcast();
NotifyListeners();
}
void TCPSocket::CloseLocked() {
VirtualFileSystem* sys = VirtualFileSystem::GetVirtualFileSystem();
// Wait for write operations to complete
// TODO(crbug.com/351755): Refactor code so that close can't hang for ever.
while (write_sent_ && is_connected()) {
sys->Wait();
}
// Post task to the main thread, so that any pending tasks on main thread
// will be canceled.
int32_t result = PP_OK_COMPLETIONPENDING;
pp::Module::Get()->core()->CallOnMainThread(
0, factory_.NewCallback(&TCPSocket::Close, &result));
while (result == PP_OK_COMPLETIONPENDING)
sys->Wait();
ARC_STRACE_REPORT_PP_ERROR(result);
}
void TCPSocket::Close(int32_t result, int32_t* pres) {
ALOG_ASSERT(result == PP_OK);
VirtualFileSystem* sys = VirtualFileSystem::GetVirtualFileSystem();
base::AutoLock lock(sys->mutex());
factory_.CancelAll();
socket_->Close();
*pres = PP_OK;
// Don't access any member variable after sys->Browadcast() is called.
// It may make destructor have completed.
NotifyListeners();
sys->Broadcast();
}
const char* TCPSocket::GetStreamType() const {
return "tcp";
}
} // namespace posix_translation