// blob: 221859061d204e67679514108d70ec7ca3799470 [file] [log] [blame]
// Copyright 2012 The Goma Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "socket_descriptor.h"
#ifndef _WIN32
#include <sys/ioctl.h>
#include <sys/socket.h>
#else
#include <Winsock2.h>
#endif
#include <memory>
#include "callback.h"
#include "compiler_specific.h"
#include "glog/logging.h"
#include "worker_thread.h"
namespace devtools_goma {
// Builds a descriptor around an already-open socket.
//
// |fd| is moved into fd_, so this object holds the ScopedSocket afterwards;
// the CHECK in the body aborts if that socket is not valid.  |priority| and
// |worker| are stored as given.  |worker| is dereferenced right here
// (worker->Now()), so it must not be null.  Both I/O closures and the
// timeout closure start out unset and every state flag starts out false.
SocketDescriptor::SocketDescriptor(ScopedSocket&& fd,
WorkerThreadManager::Priority priority,
WorkerThreadManager::WorkerThread* worker)
: fd_(std::move(fd)),
priority_(priority),
worker_(worker),
readable_closure_(nullptr),
writable_closure_(nullptr),
// 0 disables the timeout: the checks in GetTimeoutClosure() and
// TimeoutClosure() only fire when timeout_ > 0.
timeout_(0),
last_time_(worker->Now()),
// Permanent callback bound to TimeoutClosure(); GetTimeoutClosure() passes it
// to GetClosure() once the timeout has expired.
// NOTE(review): ALLOW_THIS_IN_INITIALIZER_LIST presumably silences the
// "this used in initializer list" warning -- confirm in compiler_specific.h.
ALLOW_THIS_IN_INITIALIZER_LIST(
timeout_run_closure_(NewPermanentCallback(
this, &SocketDescriptor::TimeoutClosure))),
timeout_closure_(nullptr),
read_in_queue_(false),
write_in_queue_(false),
timeout_in_queue_(false),
active_read_(false),
active_write_(false),
write_poll_registered_(false),
is_closed_(false),
need_retry_(false) {
// Remember the constructing thread; most methods DCHECK against it with
// THREAD_ID_IS_SELF(thread_).
thread_ = GetCurrentThreadId();
CHECK(fd_.valid());
}
// Destroys the descriptor.  Aborts (CHECK) if a read, write or timeout
// closure produced by GetClosure() is still marked as queued, because such a
// closure holds a pointer to this object and to its *_in_queue_ flags.
SocketDescriptor::~SocketDescriptor() {
CHECK(!read_in_queue_);
CHECK(!write_in_queue_);
CHECK(!timeout_in_queue_);
// Note that WorkerThreadManager::DeleteSocketDescriptor will take care of
// unregistering closures from polling loop.
// We do not need either to call UnregisterPollEvent or
// CHECK(!write_poll_registered_).
}
// Takes ownership of |closure| as the readable handler, marks reading as
// active, refreshes the activity timestamp and registers this descriptor for
// read events with the worker.  GetReadableClosure() later wraps the stored
// handler in a one-shot closure.
void SocketDescriptor::NotifyWhenReadable(
    std::unique_ptr<PermanentClosure> closure) {
  DCHECK(THREAD_ID_IS_SELF(thread_));

  // Finish updating local state before handing the descriptor to the poller.
  readable_closure_ = std::move(closure);
  active_read_ = true;
  last_time_ = worker_->Now();

  worker_->RegisterPollEvent(this, DescriptorPoller::kReadEvent);

  VLOG(1) << "Notify when " << fd_.get() << " readable"
          << readable_closure_.get();
}
// Takes ownership of |closure| as the writable handler, marks writing as
// active, refreshes the activity timestamp and registers this descriptor for
// write events with the worker.  The registration is remembered in
// write_poll_registered_ so that ClearWritable()/UnregisterWritable() know
// whether there is anything to undo.
void SocketDescriptor::NotifyWhenWritable(
    std::unique_ptr<PermanentClosure> closure) {
  DCHECK(THREAD_ID_IS_SELF(thread_));

  // Finish updating local state before handing the descriptor to the poller.
  writable_closure_ = std::move(closure);
  active_write_ = true;
  last_time_ = worker_->Now();

  worker_->RegisterPollEvent(this, DescriptorPoller::kWriteEvent);
  write_poll_registered_ = true;

  VLOG(1) << "Notify when " << fd_.get() << " writable"
          << writable_closure_.get();
}
// Drops the readable handler, marks reading as inactive and unregisters this
// descriptor from read events on the worker.
void SocketDescriptor::ClearReadable() {
  DCHECK(THREAD_ID_IS_SELF(thread_));

  // Log first so the message still shows the handler that is being dropped.
  VLOG(1) << "Clear " << fd_.get() << " readable " << readable_closure_.get();

  readable_closure_ = nullptr;
  active_read_ = false;
  worker_->UnregisterPollEvent(this, DescriptorPoller::kReadEvent);
}
// Drops the writable handler and marks writing as inactive.  The write event
// is unregistered from the worker only if write_poll_registered_ says it is
// currently registered.
void SocketDescriptor::ClearWritable() {
  DCHECK(THREAD_ID_IS_SELF(thread_));

  // Log first so the message still shows the handler that is being dropped.
  VLOG(1) << "Clear " << fd_.get() << " writable " << writable_closure_.get();

  writable_closure_ = nullptr;
  active_write_ = false;

  if (!write_poll_registered_) {
    return;  // Nothing registered with the poller; nothing to undo.
  }
  worker_->UnregisterPollEvent(this, DescriptorPoller::kWriteEvent);
  write_poll_registered_ = false;
}
// Arms the inactivity timeout.
//
// |timeout| is stored in timeout_ and compared against
// (worker_->Now() - last_time_) by the timeout checks in this file.
// |closure| is adopted by timeout_closure_ and is run by TimeoutClosure()
// when the timeout fires.  Debug builds require that no timeout closure is
// currently set.
void SocketDescriptor::NotifyWhenTimedout(double timeout,
                                          OneshotClosure* closure) {
  DCHECK(THREAD_ID_IS_SELF(thread_));
  DCHECK(timeout_closure_ == nullptr);

  timeout_closure_.reset(closure);
  timeout_ = timeout;
  // Start measuring inactivity from now.
  last_time_ = worker_->Now();

  worker_->RegisterTimeoutEvent(this);
}
// Replaces the timeout value of an already-armed timeout and restarts the
// inactivity measurement.  Debug builds require that a timeout closure is
// currently set (see NotifyWhenTimedout()).
void SocketDescriptor::ChangeTimeout(double timeout) {
  DCHECK(THREAD_ID_IS_SELF(thread_));
  DCHECK(timeout_closure_ != nullptr);

  last_time_ = worker_->Now();
  timeout_ = timeout;
}
// Disarms the timeout: zeroes timeout_, destroys any pending timeout closure
// and unregisters this descriptor's timeout event from the worker.
void SocketDescriptor::ClearTimeout() {
  DCHECK(THREAD_ID_IS_SELF(thread_));

  timeout_ = 0;
  // reset() on an empty unique_ptr is a no-op, so no null test is needed.
  timeout_closure_.reset();

  worker_->UnregisterTimeoutEvent(this);
}
// Reads up to |len| bytes from the socket into |ptr| and returns whatever
// fd_.Read() returned.
//
// Side effects: clears need_retry_, refreshes last_time_, sets is_closed_
// when the read returns 0, and calls UpdateLastErrorStatus() when the read
// returns a negative value.
ssize_t SocketDescriptor::Read(void* ptr, size_t len) {
  need_retry_ = false;
  last_time_ = worker_->Now();

  const ssize_t nread = fd_.Read(ptr, len);
  if (nread == 0) {
    is_closed_ = true;
  } else if (nread < 0) {
    UpdateLastErrorStatus();
  }
  return nread;
}
// Writes up to |len| bytes from |ptr| to the socket and returns whatever
// fd_.Write() returned.
//
// Side effects: clears need_retry_, refreshes last_time_, and calls
// UpdateLastErrorStatus() when the write returns a negative value.
ssize_t SocketDescriptor::Write(const void* ptr, size_t len) {
  need_retry_ = false;
  last_time_ = worker_->Now();

  const ssize_t nwritten = fd_.Write(ptr, len);
  const bool failed = nwritten < 0;
  if (failed) {
    UpdateLastErrorStatus();
  }
  return nwritten;
}
// Returns need_retry_.  Read(), Write() and ShutdownForSend() reset the flag
// to false before doing their work; UpdateLastErrorStatus() sets it to true
// on non-Windows builds when errno is EINTR or EAGAIN.
bool SocketDescriptor::NeedRetry() const {
return need_retry_;
}
// Shuts down the sending half of the socket and returns the result of
// shutdown().
//
// Side effects: clears need_retry_, refreshes last_time_, and calls
// UpdateLastErrorStatus() when shutdown() returns a negative value.
int SocketDescriptor::ShutdownForSend() {
  need_retry_ = false;
  last_time_ = worker_->Now();

  // Only the name of the "send side" constant differs between platforms.
#ifndef _WIN32
  const int how = SHUT_WR;
#else
  const int how = SD_SEND;
#endif

  const int rc = shutdown(fd_.get(), how);
  if (rc < 0) {
    UpdateLastErrorStatus();
  }
  return rc;
}
// Reports whether the socket currently has unread bytes pending, according
// to an FIONREAD query.
//
// Returns false when the query itself fails or when it reports zero bytes;
// returns true otherwise.
bool SocketDescriptor::IsReadable() const {
  // Zero-initialized so that a failed query can never expose an
  // indeterminate value.
  int n = 0;
#ifndef _WIN32
  const bool ioctl_failed = ioctl(fd_.get(), FIONREAD, &n) == -1;
#else
  DWORD byte_returned = 0;
  // WSAIoctl() returns SOCKET_ERROR on failure, so failure must be detected
  // with `==`.  The previous `!=` made this function return false whenever
  // the query succeeded.
  const bool ioctl_failed =
      WSAIoctl(fd_.get(), FIONREAD, nullptr, 0, &n, sizeof(n),
               &byte_returned, nullptr, nullptr) == SOCKET_ERROR;
#endif
  return !ioctl_failed && n != 0;
}
// Marks reading as inactive.  While active_read_ is false, GetClosure()
// returns nullptr for the readable closure and RunCallback() skips running
// it.  The handler itself is kept, and this function does not call
// UnregisterPollEvent().
void SocketDescriptor::StopRead() {
DCHECK(THREAD_ID_IS_SELF(thread_));
active_read_ = false;
}
// Marks writing as inactive.  While active_write_ is false, GetClosure()
// returns nullptr for the writable closure and RunCallback() skips running
// it.  The handler itself is kept, and this function does not call
// UnregisterPollEvent(); UnregisterWritable() does that when writing is
// inactive.
void SocketDescriptor::StopWrite() {
DCHECK(THREAD_ID_IS_SELF(thread_));
active_write_ = false;
}
// Marks reading as active again after StopRead().  Only the flag is changed;
// no poll registration is performed here.
void SocketDescriptor::RestartRead() {
DCHECK(THREAD_ID_IS_SELF(thread_));
active_read_ = true;
}
// Marks writing as active again.  If the write event is not currently
// registered with the worker (write_poll_registered_ is false), it is
// registered again.
void SocketDescriptor::RestartWrite() {
  DCHECK(THREAD_ID_IS_SELF(thread_));
  active_write_ = true;

  if (write_poll_registered_) {
    return;  // Still registered with the poller; nothing more to do.
  }

  VLOG(2) << "Register write again: fd=" << fd();
  worker_->RegisterPollEvent(this, DescriptorPoller::kWriteEvent);
  write_poll_registered_ = true;
}
// True when reading is active, a readable handler is installed and no
// readable closure is currently queued.
bool SocketDescriptor::wait_readable() const {
  DCHECK(THREAD_ID_IS_SELF(thread_));
  if (!active_read_ || read_in_queue_) {
    return false;
  }
  return readable_closure_ != nullptr;
}
// True when writing is active, a writable handler is installed and no
// writable closure is currently queued.
bool SocketDescriptor::wait_writable() const {
  DCHECK(THREAD_ID_IS_SELF(thread_));
  if (!active_write_ || write_in_queue_) {
    return false;
  }
  return writable_closure_ != nullptr;
}
// Returns a one-shot closure (built by GetClosure()) that will run the
// readable handler, or nullptr if GetClosure() declines to build one.
// last_time_ is refreshed only when a closure is actually handed out.
OneshotClosure* SocketDescriptor::GetReadableClosure() {
  DCHECK(THREAD_ID_IS_SELF(thread_));

  OneshotClosure* const wrapped =
      GetClosure(&read_in_queue_, &active_read_, readable_closure_.get());
  if (wrapped == nullptr) {
    return nullptr;
  }
  last_time_ = worker_->Now();
  return wrapped;
}
// Returns a one-shot closure (built by GetClosure()) that will run the
// writable handler, or nullptr if GetClosure() declines to build one.
// last_time_ is refreshed only when a closure is actually handed out.
OneshotClosure* SocketDescriptor::GetWritableClosure() {
  DCHECK(THREAD_ID_IS_SELF(thread_));

  OneshotClosure* const wrapped =
      GetClosure(&write_in_queue_, &active_write_, writable_closure_.get());
  if (wrapped == nullptr) {
    return nullptr;
  }
  last_time_ = worker_->Now();
  return wrapped;
}
// Returns a one-shot closure that will run timeout_run_closure_ when all of
// the following hold, and nullptr otherwise:
//   - a timeout is armed (timeout_ > 0),
//   - more than timeout_ has elapsed since last_time_,
//   - no read, write or timeout closure is currently queued.
OneshotClosure* SocketDescriptor::GetTimeoutClosure() {
  DCHECK(THREAD_ID_IS_SELF(thread_));

  // Written as negated '>' comparisons so the outcome is identical to the
  // positive form for every floating-point value.
  if (!(timeout_ > 0)) {
    return nullptr;
  }
  if (!((worker_->Now() - last_time_) > timeout_)) {
    return nullptr;
  }
  if (read_in_queue_ || write_in_queue_ || timeout_in_queue_) {
    return nullptr;
  }
  // A null |active| makes GetClosure() look at both active_read_ and
  // active_write_ instead of a single flag.
  return GetClosure(&timeout_in_queue_, nullptr, timeout_run_closure_.get());
}
// Wraps |closure| in a one-shot callback to RunCallback() and marks it as
// queued through |*in_queue|.
//
// |active| selects the enabling condition: when non-null, |*active| must be
// true; when null, at least one of active_read_ / active_write_ must be
// true.  Returns nullptr when that condition fails, when |closure| is null,
// or when |*in_queue| is already true.
OneshotClosure* SocketDescriptor::GetClosure(
    bool* in_queue, bool* active, PermanentClosure* closure) {
  DCHECK(THREAD_ID_IS_SELF(thread_));

  const bool enabled =
      (active != nullptr) ? *active : (active_read_ || active_write_);
  if (!enabled) {
    return nullptr;
  }
  if (closure == nullptr) {
    return nullptr;
  }

  DCHECK(in_queue != nullptr);
  if (*in_queue) {
    return nullptr;  // One is already pending; do not queue a second.
  }
  *in_queue = true;
  return NewCallback(this, &SocketDescriptor::RunCallback,
                     closure, in_queue, active);
}
// Body of the one-shot closure created by GetClosure().
//
// Clears |*in_queue| and then runs |closure|, unless the enabling condition
// has become false since the closure was queued: |*active| when |active| is
// non-null, otherwise (active_read_ || active_write_).
void SocketDescriptor::RunCallback(
    PermanentClosure* closure, bool* in_queue, bool* active) {
  DCHECK(THREAD_ID_IS_SELF(thread_));
  DCHECK(closure != nullptr);
  DCHECK(in_queue != nullptr);
  DCHECK(*in_queue);
  *in_queue = false;

  const bool enabled =
      (active != nullptr) ? *active : (active_read_ || active_write_);
  if (enabled) {
    closure->Run();
  }
  // Otherwise there is nothing to clean up: |closure| is a permanent closure
  // and is not deleted here.
}
void SocketDescriptor::TimeoutClosure() {
DCHECK(THREAD_ID_IS_SELF(thread_));
if (read_in_queue_ || write_in_queue_)
return;
if (!active_read_ && !active_write_)
return;
if (timeout_ > 0 && (worker_->Now() - last_time_) > timeout_) {
// no need to delete closure. it deletes itself.
OneshotClosure* closure = timeout_closure_.release();
if (closure) {
LOG(INFO) << "socket timeout fd=" << fd_.get()
<< " timeout=" << timeout_;
closure->Run();
}
}
}
// Records the outcome of a failed socket call.
//
// Non-Windows: if errno is EINTR or EAGAIN, sets need_retry_ and leaves
// last_error_message_ untouched.  Otherwise stores the text for errno in
// last_error_message_.
// Windows: stores the system message for WSAGetLastError() in
// last_error_message_.
void SocketDescriptor::UpdateLastErrorStatus() {
  char error_message[1024] = {0};
#ifndef _WIN32
  // Snapshot errno so the retry test and the message lookup are guaranteed
  // to see the same value.
  const int saved_errno = errno;
  if (saved_errno == EINTR || saved_errno == EAGAIN) {
    need_retry_ = true;
    return;
  }

  // strerror_r() has two incompatible signatures.  The XSI version returns
  // an int and writes the text into the caller's buffer.  The GNU version
  // returns a char* that may point at a static string, leaving the buffer
  // untouched.  Ignoring the return value therefore loses the message with
  // the GNU version.  Overload resolution on the return type picks the
  // correct interpretation at compile time.
  struct StrerrorResult {
    // XSI: the message is in |buf| (the status is ignored, as before).
    static const char* Message(int /*status*/, const char* buf) {
      return buf;
    }
    // GNU: the message is whatever was returned; fall back to |buf| if the
    // returned pointer is null.
    static const char* Message(const char* msg, const char* buf) {
      return msg != nullptr ? msg : buf;
    }
  };
  last_error_message_ = StrerrorResult::Message(
      strerror_r(saved_errno, error_message, sizeof error_message),
      error_message);
#else
  // FORMAT_MESSAGE_IGNORE_INSERTS: no argument array is supplied, so insert
  // sequences in a system message must not be expanded.
  FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS,
                 0, WSAGetLastError(), 0,
                 error_message, sizeof error_message, 0);
  last_error_message_ = error_message;
#endif
}
// Unregisters the write event from the worker, but only when writing is
// inactive and the event is currently registered.  In every other case this
// is a no-op.
void SocketDescriptor::UnregisterWritable() {
  DCHECK(THREAD_ID_IS_SELF(thread_));

  if (active_write_ || !write_poll_registered_) {
    return;
  }
  worker_->UnregisterPollEvent(this, DescriptorPoller::kWriteEvent);
  write_poll_registered_ = false;
}
} // namespace devtools_goma