blob: 805448bad7b0077d6d40715435d270a9fd0ab15c [file] [log] [blame]
// Copyright 2012 The Goma Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "tls_descriptor.h"
#include <sstream>
#include <utility>
#include "absl/strings/escaping.h"
#include "absl/strings/str_cat.h"
#include "callback.h"
#include "compiler_proxy_info.h"
#include "compiler_specific.h"
#include "glog/logging.h"
#include "http_util.h"
#include "socket_descriptor.h"
namespace devtools_goma {
TLSDescriptor::TLSDescriptor(SocketDescriptor* desc,
TLSEngine* e,
Options options,
WorkerThreadManager* wm)
: socket_descriptor_(desc),
engine_(e),
wm_(wm),
readable_closure_(nullptr),
writable_closure_(nullptr),
network_write_offset_(0),
ssl_pending_(false),
active_read_(false),
active_write_(false),
io_failed_(false),
options_(std::move(options)),
connect_status_(READY),
is_closed_(false),
cancel_readable_closure_(nullptr) {
thread_ = GetCurrentThreadId();
}
TLSDescriptor::~TLSDescriptor() {
DCHECK(THREAD_ID_IS_SELF(thread_));
if (cancel_readable_closure_) {
cancel_readable_closure_->Cancel();
cancel_readable_closure_ = nullptr;
}
}
void TLSDescriptor::Init() {
if (options_.use_proxy && !engine_->IsRecycled())
connect_status_ = NEED_WRITE;
socket_descriptor_->NotifyWhenReadable(
NewPermanentCallback(this, &TLSDescriptor::TransportLayerReadable));
socket_descriptor_->NotifyWhenWritable(
NewPermanentCallback(this, &TLSDescriptor::TransportLayerWritable));
}
void TLSDescriptor::NotifyWhenReadable(
std::unique_ptr<PermanentClosure> closure) {
DCHECK(THREAD_ID_IS_SELF(thread_));
readable_closure_ = std::move(closure);
active_read_ = true;
RestartTransportLayer();
VLOG(1) << "Notify when " << socket_descriptor_->fd()
<< " readable " << readable_closure_.get();
}
void TLSDescriptor::NotifyWhenWritable(
std::unique_ptr<PermanentClosure> closure) {
DCHECK(THREAD_ID_IS_SELF(thread_));
writable_closure_ = std::move(closure);
active_write_ = true;
RestartTransportLayer();
VLOG(1) << "Notify when " << socket_descriptor_->fd()
<< " writable " << writable_closure_.get();
}
void TLSDescriptor::ClearWritable() {
DCHECK(THREAD_ID_IS_SELF(thread_));
VLOG(1) << "Clear " << socket_descriptor_->fd() << " writable "
<< writable_closure_.get();
active_write_ = false;
writable_closure_.reset();
}
void TLSDescriptor::NotifyWhenTimedout(absl::Duration timeout,
OneshotClosure* closure) {
DCHECK(THREAD_ID_IS_SELF(thread_));
socket_descriptor_->NotifyWhenTimedout(timeout, closure);
}
void TLSDescriptor::ChangeTimeout(absl::Duration timeout) {
DCHECK(THREAD_ID_IS_SELF(thread_));
// once is_closed_, timeout closure is cleared (in StopTransportLayer)
if (is_closed_)
return;
socket_descriptor_->ChangeTimeout(timeout);
}
ssize_t TLSDescriptor::Read(void* ptr, size_t len) {
CHECK_GT(len, 0) << "fd=" << socket_descriptor_->fd();
cancel_readable_closure_ = nullptr;
if (io_failed_)
return -1;
if (is_closed_) {
VLOG(1) << "reading from tls engine buffer after connection closed"
<< " fd=" << socket_descriptor_->fd();
} else {
// It seems to get stuck if we do not restart transport layer
// communications.
// It might be because TLS may send something like ACK, we guess.
socket_descriptor_->RestartWrite();
}
const int ret = engine_->Read(ptr, len);
if (ret == TLSEngine::TLS_WANT_READ || ret == TLSEngine::TLS_WANT_WRITE) {
if (is_closed_) {
LOG(INFO) << "socket has already been closed by peer: fd="
<< socket_descriptor_->fd();
return 0;
}
ssl_pending_ = true;
} else if (ret < 0) { // TLSEngine error except want read/write.
LOG(ERROR) << "Error occured during application read.";
} else {
ssl_pending_ = false;
}
if (is_closed_ && ret > 0) {
// Make readable_closure_ read all available data.
DCHECK(readable_closure_.get());
cancel_readable_closure_ = wm_->RunDelayedClosureInThread(
FROM_HERE, thread_, absl::ZeroDuration(),
NewCallback(static_cast<Closure*>(readable_closure_.get()),
&Closure::Run));
}
return ret;
}
ssize_t TLSDescriptor::Write(const void* ptr, size_t len) {
CHECK_GT(len, 0) << "fd=" << socket_descriptor_->fd();
if (io_failed_ || is_closed_)
return -1;
ResumeTransportWritable();
const int ret = engine_->Write(ptr, len);
if (ret == TLSEngine::TLS_WANT_READ || ret == TLSEngine::TLS_WANT_WRITE) {
ssl_pending_ = true;
} else if (ret < 0) { // TLSEngine error except want read/write.
LOG(ERROR) << "Error occured during application write.";
} else {
ssl_pending_ = false;
}
return ret;
}
bool TLSDescriptor::NeedRetry() const {
// TLS engine will not get interrupted but view from application side
// should be similar.
return ssl_pending_ && !io_failed_ && !is_closed_;
}
string TLSDescriptor::GetLastErrorMessage() const {
return absl::StrCat(
"fd:", socket_descriptor_->fd(), " ",
socket_descriptor_->PeerName(), " ",
"socket:", socket_descriptor_->GetLastErrorMessage(),
(io_failed_ ? " io_failed" : ""),
(is_closed_ ? " is_closed" : ""),
" ",
"tls_engine:", engine_->GetLastErrorMessage());
}
void TLSDescriptor::StopRead() {
DCHECK(THREAD_ID_IS_SELF(thread_));
active_read_ = false;
if (!active_write_ && !ssl_pending_) {
StopTransportLayer();
}
if (cancel_readable_closure_) {
cancel_readable_closure_->Cancel();
cancel_readable_closure_ = nullptr;
}
}
void TLSDescriptor::StopWrite() {
DCHECK(THREAD_ID_IS_SELF(thread_));
active_write_ = false;
if (!active_read_ && !ssl_pending_) {
StopTransportLayer();
}
}
void TLSDescriptor::TransportLayerReadable() {
size_t read_size = std::min(engine_->GetBufSizeFromTransport(),
sizeof(network_read_buffer_));
if (read_size == 0) {
LOG(INFO) << "Transport layer is readable, "
<< "but engine is not ready to read from transport";
PutClosuresInRunQueue();
return;
}
const ssize_t read_bytes = socket_descriptor_->Read(network_read_buffer_,
read_size);
if (read_bytes < 0 && socket_descriptor_->NeedRetry())
return;
if (read_bytes == 0) { // EOF.
LOG(INFO) << "Remote closed. "
<< " fd=" << socket_descriptor_->fd()
<< " read_size=" << read_size
<< " read_bytes=" << read_bytes
<< " err=" << socket_descriptor_->GetLastErrorMessage();
is_closed_ = true;
StopTransportLayer();
PutClosuresInRunQueue();
return;
}
if (read_bytes < 0) { // error.
LOG(WARNING) << "Transport layer read " << socket_descriptor_->fd()
<< " read_size=" << read_size
<< " read_bytes=" << read_bytes
<< " err=" << socket_descriptor_->GetLastErrorMessage();
StopTransportLayer();
io_failed_ = true;
PutClosuresInRunQueue();
return;
}
switch (connect_status_) {
case READY:
{
int ret = engine_->SetDataFromTransport(
absl::string_view(network_read_buffer_, read_bytes));
if (ret < 0) { // Error in TLS engine.
StopTransportLayer();
io_failed_ = true;
PutClosuresInRunQueue();
return;
}
CHECK_EQ(ret, static_cast<int>(read_bytes));
ResumeTransportWritable();
if (engine_->IsReady()) {
PutClosuresInRunQueue();
}
return;
}
case NEED_READ:
{
int status_code = 0;
size_t offset;
size_t content_length;
proxy_response_.append(network_read_buffer_, read_bytes);
if (ParseHttpResponse(proxy_response_, &status_code, &offset,
&content_length, nullptr)) {
if (status_code / 100 == 2) {
connect_status_ = READY;
ResumeTransportWritable();
} else {
LOG(ERROR) << "Proxy's status code != 2xx."
<< " Details:" << proxy_response_;
StopTransportLayer();
io_failed_ = true;
PutClosuresInRunQueue();
}
}
return;
}
case NEED_WRITE:
LOG(ERROR) << "Unexpected read occured when waiting writable."
<< "buf:"
<< absl::CEscape(
absl::string_view(network_read_buffer_, read_bytes));
}
}
void TLSDescriptor::TransportLayerWritable() {
if (network_write_buffer_.empty()) {
if (connect_status_ == READY)
CHECK_GE(engine_->GetDataToSendTransport(&network_write_buffer_), 0);
else if (connect_status_ == NEED_WRITE)
network_write_buffer_ = CreateProxyRequestMessage();
network_write_offset_ = 0;
if (network_write_buffer_.size() == 0) {
SuspendTransportWritable();
}
if (!engine_->IsIOPending()) {
PutClosuresInRunQueue();
return;
}
}
ssize_t write_size = network_write_buffer_.size() - network_write_offset_;
if (write_size == 0) {
return;
}
DCHECK_GT(write_size, 0);
const ssize_t write_bytes =
socket_descriptor_->Write(
network_write_buffer_.c_str() + network_write_offset_,
write_size);
if (write_bytes < 0 && socket_descriptor_->NeedRetry())
return;
if (write_bytes <= 0) {
LOG(WARNING) << "Transport layer write " << socket_descriptor_->fd()
<< " failed."
<< " write_size=" << write_size
<< " write_bytes=" << write_bytes
<< " err=" << socket_descriptor_->GetLastErrorMessage();
StopTransportLayer();
io_failed_ = true;
PutClosuresInRunQueue();
return;
}
network_write_offset_ += write_bytes;
DCHECK_LE(network_write_offset_, network_write_buffer_.size());
if (network_write_buffer_.size() == network_write_offset_) {
network_write_buffer_.clear();
network_write_offset_ = 0;
if (connect_status_ == NEED_WRITE)
connect_status_ = NEED_READ;
}
}
void TLSDescriptor::PutClosuresInRunQueue() const {
// TODO: check readable/writeble of data if possible.
// Since SSL_pending seems not works well with BIO pair, we cannot check
// readable. I could not find a good function to check it writable.
bool set_callback = false;
if (active_write_ && writable_closure_.get() != nullptr) {
wm_->RunClosureInThread(
FROM_HERE,
thread_, writable_closure_.get(),
WorkerThread::PRIORITY_IMMEDIATE);
set_callback = true;
}
if (active_read_ && readable_closure_.get() != nullptr) {
wm_->RunClosureInThread(
FROM_HERE,
thread_, readable_closure_.get(),
WorkerThread::PRIORITY_IMMEDIATE);
set_callback = true;
}
LOG_IF(ERROR, !set_callback)
<< "PutClosuresInRunQueue actually did nothing. "
<< "We expect control goes back to the user of this libary."
<< " active_write=" << active_write_
<< " writable_closure=" << (writable_closure_ != nullptr)
<< " active_read=" << active_read_
<< " readable_closure" << (readable_closure_ != nullptr)
<< " is_closed=" << is_closed_
<< " io_failed=" << io_failed_;
}
void TLSDescriptor::SuspendTransportWritable() {
socket_descriptor_->StopWrite();
socket_descriptor_->UnregisterWritable();
}
void TLSDescriptor::ResumeTransportWritable() {
if (is_closed_) {
LOG(INFO) << "socket has already been closed: fd="
<< socket_descriptor_->fd();
return;
}
socket_descriptor_->RestartWrite();
}
void TLSDescriptor::StopTransportLayer() {
socket_descriptor_->StopRead();
socket_descriptor_->StopWrite();
if (is_closed_) {
socket_descriptor_->ClearTimeout();
}
}
void TLSDescriptor::RestartTransportLayer() {
if (is_closed_) {
LOG(INFO) << "socket has already been closed: fd="
<< socket_descriptor_->fd();
return;
}
socket_descriptor_->RestartRead();
socket_descriptor_->RestartWrite();
}
string TLSDescriptor::CreateProxyRequestMessage() {
std::ostringstream http_send_message;
std::ostringstream dest_host_port;
dest_host_port << options_.dest_host_name << ":" << options_.dest_port;
http_send_message << "CONNECT " << dest_host_port.str() << " HTTP/1.1\r\n";
http_send_message << "Host: " << dest_host_port.str() << "\r\n";
http_send_message << "UserAgent: " << kUserAgentString << "\r\n";
http_send_message << "\r\n";
return http_send_message.str();
}
bool TLSDescriptor::CanReuse() const {
return !is_closed_ && !io_failed_ && socket_descriptor_->CanReuse();
}
} // namespace devtools_goma