blob: 990bb98ff84ab575659cf06356517d5bc2890b62 [file] [log] [blame]
// Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "tcp_socket.h"
#include <algorithm>
#include <assert.h>
#include <string.h>
#include "ppapi/c/pp_errors.h"
#include "ppapi/cpp/module.h"
#include "file_system.h"
// Creates a socket wrapper for descriptor |fd| opened with flags |oflag|.
// The object starts with a reference count of one, no underlying Pepper
// socket, a receive staging buffer constructed with kBufSize, and no read or
// write task marked as outstanding.
TCPSocket::TCPSocket(int fd, int oflag)
    : ref_(1),
      fd_(fd),
      oflag_(oflag),
      factory_(this),
      socket_(NULL),
      read_buf_(kBufSize),
      read_sent_(false),
      write_sent_(false) {
}
// By the time the object is destroyed the Pepper socket must already be gone
// and the reference count must have dropped to zero; both are asserted.
TCPSocket::~TCPSocket() {
  assert(!socket_);
  assert(!ref_);
}
// Adds one reference to this socket. Paired with release().
void TCPSocket::addref() {
  ++ref_;
}
// Drops one reference. The object deletes itself when the count reaches
// zero, so the caller must not touch it after the final release().
void TCPSocket::release() {
  if (--ref_)
    return;
  delete this;
}
// Always returns NULL, whatever |fd| is: this implementation never produces
// a duplicate stream.
FileStream* TCPSocket::dup(int fd) {
  return NULL;
}
bool TCPSocket::connect(const char* host, uint16_t port) {
int32_t result = PP_OK_COMPLETIONPENDING;
pp::Module::Get()->core()->CallOnMainThread(0,
factory_.NewCallback(&TCPSocket::Connect, host, port, &result));
FileSystem* sys = FileSystem::GetFileSystem();
while (result == PP_OK_COMPLETIONPENDING)
sys->cond().wait(sys->mutex());
return result == PP_OK;
}
// Adopts the already-accepted Pepper socket |resource| and blocks until the
// main-thread task (see Accept()) has reported a status.
//
// Returns true only if that status is PP_OK.
bool TCPSocket::accept(PP_Resource resource) {
  int32_t status = PP_OK_COMPLETIONPENDING;
  pp::Module::Get()->core()->CallOnMainThread(
      0, factory_.NewCallback(&TCPSocket::Accept, resource, &status));

  FileSystem* const fs = FileSystem::GetFileSystem();
  for (; status == PP_OK_COMPLETIONPENDING; )
    fs->cond().wait(fs->mutex());

  return status == PP_OK;
}
void TCPSocket::close() {
if (socket_) {
int32_t result = PP_OK_COMPLETIONPENDING;
pp::Module::Get()->core()->CallOnMainThread(0,
factory_.NewCallback(&TCPSocket::Close, &result));
FileSystem* sys = FileSystem::GetFileSystem();
while (result == PP_OK_COMPLETIONPENDING)
sys->cond().wait(sys->mutex());
}
}
// Moves up to |count| already-received bytes from in_buf_ into |buf|.
//
// |nread| receives the number of bytes copied. Return value:
//   0       - success; |*nread| == 0 means either a zero-length request or
//             that the socket is no longer open and no data is left.
//   EAGAIN  - the socket is open but no data is buffered; |*nread| is set to
//             (size_t)-1.
//
// In blocking mode (is_block()) the call first waits on the file-system
// condition variable until data arrives or the socket stops being open.
// After a successful copy, PostReadTask() is called so that more data can be
// fetched once the buffer has drained far enough.
int TCPSocket::read(char* buf, size_t count, size_t* nread) {
  // A zero-length read has nothing to transfer. Report success right away
  // instead of blocking for data and then failing with EAGAIN.
  if (count == 0) {
    *nread = 0;
    return 0;
  }

  if (is_block()) {
    FileSystem* sys = FileSystem::GetFileSystem();
    while (in_buf_.empty() && is_open())
      sys->cond().wait(sys->mutex());
  }

  *nread = std::min(count, in_buf_.size());
  if (*nread == 0) {
    if (!is_open())
      return 0;
    *nread = static_cast<size_t>(-1);
    return EAGAIN;
  }

  std::copy(in_buf_.begin(), in_buf_.begin() + *nread, buf);
  in_buf_.erase(in_buf_.begin(), in_buf_.begin() + *nread);
  PostReadTask();
  return 0;
}
// Queues |count| bytes from |buf| for transmission.
//
// Returns EIO without touching |*nwrote| when the socket is not open. A
// zero-length write succeeds immediately with |*nwrote| == 0. Otherwise the
// bytes are appended to out_buf_ and a write task is requested:
//   - blocking mode: waits on the file-system condition variable until the
//     write task reports a result. Succeeds (returns 0, |*nwrote| == |count|)
//     only if that result equals |count|; otherwise stores (size_t)-1 in
//     |*nwrote| and returns EIO.
//   - non-blocking mode: reports all |count| bytes as written as soon as they
//     have been queued and returns 0.
int TCPSocket::write(const char* buf, size_t count, size_t* nwrote) {
  if (!is_open())
    return EIO;

  // Nothing to send. PostWriteTask() posts no task while out_buf_ is empty,
  // so without this check a blocking caller would wait forever for a result
  // that is never delivered.
  if (count == 0) {
    *nwrote = 0;
    return 0;
  }

  out_buf_.insert(out_buf_.end(), buf, buf + count);
  if (is_block()) {
    int32_t result = PP_OK_COMPLETIONPENDING;
    // NOTE(review): PostWriteTask() also does nothing when a write task is
    // already outstanding (write_sent_); in that case |result| is never
    // updated through this pointer - confirm this cannot happen for blocking
    // sockets.
    PostWriteTask(&result, true);
    FileSystem* sys = FileSystem::GetFileSystem();
    while (result == PP_OK_COMPLETIONPENDING)
      sys->cond().wait(sys->mutex());
    if (static_cast<size_t>(result) != count) {
      *nwrote = static_cast<size_t>(-1);
      return EIO;
    }
    *nwrote = count;
    return 0;
  }

  PostWriteTask(NULL, true);
  *nwrote = count;
  return 0;
}
// Minimal fcntl() support for this socket.
//   F_GETFL - returns the stored open flags.
//   F_SETFL - replaces the stored flags with the next variadic argument
//             (fetched as a long) and returns 0.
// Any other command is rejected with -1.
int TCPSocket::fcntl(int cmd, va_list ap) {
  switch (cmd) {
    case F_GETFL:
      return oflag_;
    case F_SETFL:
      oflag_ = va_arg(ap, long);
      return 0;
    default:
      return -1;
  }
}
// A read would not have to wait when the socket is no longer open or when
// received data is already buffered in in_buf_.
bool TCPSocket::is_read_ready() {
  if (!is_open())
    return true;
  return !in_buf_.empty();
}
// Reports write readiness: true when the socket is no longer open, or when
// fewer than kBufSize bytes are waiting in out_buf_.
bool TCPSocket::is_write_ready() {
  if (!is_open())
    return true;
  return out_buf_.size() < kBufSize;
}
bool TCPSocket::is_exception() {
return !is_open();
}
// Starts a Read() if one is useful: the socket must be open, no read may be
// outstanding already, and in_buf_ must hold less than half of kBufSize.
// read_sent_ is raised before the task is started so that only one read is
// in flight at a time.
void TCPSocket::PostReadTask() {
  if (!is_open() || read_sent_ || in_buf_.size() >= kBufSize / 2)
    return;

  read_sent_ = true;
  if (pp::Module::Get()->core()->IsMainThread()) {
    // Already on the main Pepper thread and no delay is needed, so run the
    // task directly.
    Read(PP_OK);
  } else {
    pp::Module::Get()->core()->CallOnMainThread(
        0, factory_.NewCallback(&TCPSocket::Read));
  }
}
// Starts a Write() if one is useful: the socket must be open, no write may
// be outstanding already, and out_buf_ must contain data.
//
// |pres| is handed through to Write() and may be NULL. When |always_post| is
// true the task is always queued on the main thread, even if the caller is
// already running there; otherwise a caller on the main thread runs Write()
// directly.
void TCPSocket::PostWriteTask(int32_t* pres, bool always_post) {
  if (!is_open() || write_sent_ || out_buf_.empty())
    return;

  write_sent_ = true;
  if (!always_post && pp::Module::Get()->core()->IsMainThread()) {
    // Already on the main Pepper thread and no delay is needed, so run the
    // task directly.
    Write(PP_OK, pres);
  } else {
    pp::Module::Get()->core()->CallOnMainThread(
        0, factory_.NewCallback(&TCPSocket::Write, pres));
  }
}
void TCPSocket::Connect(int32_t result, const char* host, uint16_t port,
int32_t* pres) {
FileSystem* sys = FileSystem::GetFileSystem();
Mutex::Lock lock(sys->mutex());
assert(!socket_);
socket_ = new pp::TCPSocketPrivate(sys->instance());
*pres = socket_->Connect(host, port,
factory_.NewCallback(&TCPSocket::OnConnect, pres));
if (*pres != PP_OK_COMPLETIONPENDING)
sys->cond().broadcast();
}
void TCPSocket::OnConnect(int32_t result, int32_t* pres) {
FileSystem* sys = FileSystem::GetFileSystem();
Mutex::Lock lock(sys->mutex());
if (result == PP_OK) {
PostReadTask();
} else {
delete socket_;
socket_ = NULL;
}
*pres = result;
sys->cond().broadcast();
}
void TCPSocket::Read(int32_t result) {
FileSystem* sys = FileSystem::GetFileSystem();
Mutex::Lock lock(sys->mutex());
if (!is_open()) {
read_sent_ = false;
sys->cond().broadcast();
return;
}
result = socket_->Read(&read_buf_[0], read_buf_.size(),
factory_.NewCallback(&TCPSocket::OnRead));
if (result != PP_OK_COMPLETIONPENDING) {
delete socket_;
socket_ = NULL;
read_sent_ = false;
sys->cond().broadcast();
}
}
void TCPSocket::OnRead(int32_t result) {
FileSystem* sys = FileSystem::GetFileSystem();
Mutex::Lock lock(sys->mutex());
read_sent_ = false;
if (!is_open()) {
sys->cond().broadcast();
return;
}
if (result > 0) {
in_buf_.insert(in_buf_.end(),
read_buf_.begin(), read_buf_.begin() + result);
PostReadTask();
} else {
delete socket_;
socket_ = NULL;
}
sys->cond().broadcast();
}
// Write task started by PostWriteTask(): moves everything queued in out_buf_
// into write_buf_ and hands it to the Pepper socket, with OnWrite() as the
// completion callback.
//
// The incoming |result| value is ignored; the parameter is reused to hold
// the status of the write request. |pres| may be NULL; when set, it receives
// PP_ERROR_FAILED if the socket is not open, or the request status if the
// request did not go pending (in which case the socket is destroyed). In
// both of those cases the outstanding-write flag is cleared and waiters are
// woken.
void TCPSocket::Write(int32_t result, int32_t* pres) {
  FileSystem* sys = FileSystem::GetFileSystem();
  Mutex::Lock lock(sys->mutex());
  if (!is_open()) {
    if (pres)
      *pres = PP_ERROR_FAILED;
    write_sent_ = false;
    sys->cond().broadcast();
    return;
  }
  if (write_buf_.size()) {
    // Previous write operation is in progress.
    PostWriteTask(pres, true);
    return;
  }
  assert(out_buf_.size());
  write_buf_.swap(out_buf_);
  result = socket_->Write(&write_buf_[0], write_buf_.size(),
      factory_.NewCallback(&TCPSocket::OnWrite, pres));
  if (result != PP_OK_COMPLETIONPENDING) {
    // The size is converted explicitly so the argument matches the %d
    // conversion (LOG is assumed to be printf-style - TODO confirm).
    LOG("TCPSocket::Write: failed %d %d %d\n", fd_, result,
        static_cast<int>(write_buf_.size()));
    delete socket_;
    socket_ = NULL;
    if (pres)
      *pres = result;
    write_sent_ = false;
    sys->cond().broadcast();
  }
}
// Completion callback for the request issued in Write().
//
// |result| is the number of bytes the Pepper socket accepted, or a negative
// error. |pres| may be NULL; when set, it receives PP_ERROR_FAILED if the
// socket is no longer open, otherwise |result|.
//
// A negative or impossibly large |result| destroys the Pepper socket. After
// a partial write the unsent tail of write_buf_ is put back at the front of
// out_buf_. write_buf_ is then emptied and waiters are woken.
void TCPSocket::OnWrite(int32_t result, int32_t* pres) {
  FileSystem* sys = FileSystem::GetFileSystem();
  Mutex::Lock lock(sys->mutex());
  write_sent_ = false;
  if (!is_open()) {
    if (pres)
      *pres = PP_ERROR_FAILED;
    sys->cond().broadcast();
    return;
  }
  if (result < 0 || static_cast<size_t>(result) > write_buf_.size()) {
    // Write error.
    LOG("TCPSocket::OnWrite: close socket %d\n", fd_);
    delete socket_;
    socket_ = NULL;
  } else if (static_cast<size_t>(result) < write_buf_.size()) {
    // Partial write. Insert remaining bytes at the beginning of out_buf_.
    // An iterator range is used so that end() is never dereferenced.
    out_buf_.insert(out_buf_.begin(),
                    write_buf_.begin() + result, write_buf_.end());
  }
  if (pres)
    *pres = result;
  write_buf_.clear();
  sys->cond().broadcast();
  if (!is_block()) {
    // For async sockets more data may have been queued while Pepper was
    // sending the previous portion, so check whether there is anything left
    // to write. In the sync case the writer always waits for completion.
    PostWriteTask(NULL, false);
  }
}
void TCPSocket::Close(int32_t result, int32_t* pres) {
FileSystem* sys = FileSystem::GetFileSystem();
Mutex::Lock lock(sys->mutex());
delete socket_;
socket_ = NULL;
if (pres)
*pres = PP_OK;
sys->cond().broadcast();
}
bool TCPSocket::Accept(int32_t result, PP_Resource resource, int32_t* pres) {
FileSystem* sys = FileSystem::GetFileSystem();
Mutex::Lock lock(sys->mutex());
assert(!socket_);
socket_ = new pp::TCPSocketPrivate(pp::PassRef(), resource);
PostReadTask();
*pres = PP_OK;
sys->cond().broadcast();
return true;
}