// blob: 3f8fc7db4cc8b4c355de8135366fbeacee978959 [file] [log] [blame]
// Copyright 2010 The Goma Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//
// A threadpool HTTP server implementation.
#include "threadpool_http_server.h"
#ifndef _WIN32
#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#include <netdb.h>
#include <netinet/in.h>
#include <pthread.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/un.h>
#else
#include "socket_helper_win.h"
#endif
#include <algorithm>
#include <cstdio>
#include <cstring>
#include <iostream>
#include <iterator>
#include <memory>
#include <sstream>
#include <utility>
#include <vector>
#include "absl/strings/str_split.h"
#include "autolock_timer.h"
#include "callback.h"
#include "compiler_proxy_info.h"
#include "compiler_specific.h"
#include "socket_descriptor.h"
#include "file.h"
#include "fileflag.h"
#include "glog/logging.h"
#include "goma_ipc_addr.h"
#include "goma_ipc_peer.h"
#include "ioutil.h"
#ifdef _WIN32
#include "named_pipe_server_win.h"
#endif
#include "platform_thread.h"
#include "scoped_fd.h"
#include "simple_timer.h"
#include "trustedipsmanager.h"
#include "util.h"
#include "worker_thread_manager.h"
// Backlog argument passed to listen() for both the TCP socket and the
// unix domain socket opened in this file.
#define BACKLOG 128
namespace devtools_goma {
// Timeout value handed to SocketDescriptor::NotifyWhenTimedout() when a
// request read from a socket is started (see RequestFromSocket::Start()).
// TODO: make it a flag?
const int kDefaultTimeoutSec = 900;
// Stores the listening configuration and initializes the per-socket-type
// bookkeeping.  No socket is opened here; that happens in Loop() and
// StartIPC().  When |num_threads| is positive, a worker pool named
// "threadpool_http_server" is started on |wm| and used for request handling;
// otherwise requests run in WorkerThreadManager::kFreePool.
ThreadpoolHttpServer::ThreadpoolHttpServer(string listen_addr,
                                           int port,
                                           int num_find_ports,
                                           WorkerThreadManager* wm,
                                           int num_threads,
                                           HttpHandler* http_handler,
                                           int max_num_sockets)
    : listen_addr_(std::move(listen_addr)),
      port_(port),
      port_ready_(false),
      num_find_ports_(num_find_ports),
      wm_(wm),
      pool_(WorkerThreadManager::kFreePool),
      num_http_threads_(num_threads),
      http_handler_(http_handler),
      monitor_(nullptr),
      trustedipsmanager_(nullptr),
      max_num_sockets_(max_num_sockets),
      idle_counting_(true),
      last_closure_id_(kInvalidClosureId) {
  // Each socket type starts with the global limit and zeroed counters.
  for (int type = 0; type < NUM_SOCKET_TYPES; ++type) {
    num_sockets_[type] = 0;
    idle_counter_[type] = 0;
    max_sockets_[type] = max_num_sockets_;
  }
  if (num_threads <= 0) {
    return;
  }
  pool_ = wm->StartPool(num_threads, "threadpool_http_server");
  DCHECK_NE(WorkerThreadManager::kFreePool, pool_);
}
// Performs no explicit cleanup.
// NOTE(review): IdleClosure objects allocated by RegisterIdleClosure() are
// only freed by UnregisterIdleClosure(); confirm callers always unregister
// before the server is destroyed.
ThreadpoolHttpServer::~ThreadpoolHttpServer() = default;
// Installs |monitor|; it is passed to every request created afterwards and
// receives FinishHandle() when that request completes.
void ThreadpoolHttpServer::SetMonitor(Monitor* monitor) {
  this->monitor_ = monitor;
}
// Installs the manager that RequestFromSocket::IsTrusted() consults to decide
// whether a TCP peer address is trusted.
void ThreadpoolHttpServer::SetTrustedIpsManager(
    TrustedIpsManager* trustedipsmanager) {
  this->trustedipsmanager_ = trustedipsmanager;
}
#ifdef _WIN32
// NamedPipeServer handler that forwards every incoming named pipe request to
// the owning ThreadpoolHttpServer.
class ThreadpoolHttpServer::PipeHandler : public NamedPipeServer::Handler {
 public:
  explicit PipeHandler(ThreadpoolHttpServer* server) : server_(server) {}
  PipeHandler(const PipeHandler&) = delete;
  PipeHandler& operator=(const PipeHandler&) = delete;
  ~PipeHandler() override = default;

  // Hands |req| to the server, which schedules it on a worker thread.
  void HandleIncoming(NamedPipeServer::Request* req) override {
    server_->SendNamedPipeJobToWorkerThread(req);
  }

 private:
  ThreadpoolHttpServer* server_;
};
#endif
// Starts the IPC endpoint at |addr| (a named pipe server on Windows, a unix
// domain socket elsewhere) and sets the accept limit for SOCKET_IPC.
// Aborts via CHECK if the unix domain socket cannot be opened after the
// retries, or if the computed limit is not positive.
void ThreadpoolHttpServer::StartIPC(
    const string& addr, int num_threads,
    int max_overcommit_incoming_sockets) {
#ifdef _WIN32
  pipe_handler_.reset(new PipeHandler(this));
  pipe_server_.reset(new NamedPipeServer(wm_, pipe_handler_.get()));
  pipe_server_->Start(addr);
  // Each thread has a select() for at most FD_SETSIZE of sockets.
  // 1 for event pipe fd.
  // Note that NamedPipeServer doesn't use select().  It only waits for
  // connection for a named pipe, creates new named pipe once
  // the connection is established, and read/write pipes with asynchronous
  // overlapped I/O.
  const int per_thread_limit =
      FD_SETSIZE + max_overcommit_incoming_sockets - 1;
  int max_incoming =
      std::min(max_num_sockets_, num_threads * per_thread_limit);
  max_incoming =
      std::min(max_incoming, num_http_threads_ * per_thread_limit);
#else
  // compiler_proxy would consume almost 3 fds per request, so it would be
  // safe to limit active accepting sockets by max_num_sockets / 3.
  // Each worker thread has pipe (2 fds) and we use 2 sockets to accept
  // requests, so we count them too.
  int max_incoming = max_num_sockets_ / 3 - num_threads * 2 - 2;
  const int kNumRetry = 10;
  bool socket_ok = false;
  int attempt = 0;
  while (!socket_ok && attempt < kNumRetry) {
    socket_ok = OpenUnixDomainSocket(addr);
    if (!socket_ok) {
      // Pause before the next attempt.
      devtools_goma::PlatformThread::Sleep(1);
    }
    ++attempt;
  }
  CHECK(socket_ok) << "Failed to open " << addr;
  LOG(INFO) << "unix domain:" << addr;
#endif
  LOG(INFO) << "max incoming: " << max_incoming
            << " FD_SETSIZE=" << FD_SETSIZE
            << " max_num_sockets=" << max_num_sockets_
#ifdef USE_EPOLL
            << " USE_EPOLL=1"
#elif USE_KQUEUE
            << " USE_KQUEUE=1"
#endif
            << " threads=" << num_threads
            << "+" << num_http_threads_;
  CHECK_GT(max_incoming, 0);
  SetAcceptLimit(max_incoming, ThreadpoolHttpServer::SOCKET_IPC);
}
// Shuts down the IPC endpoint started by StartIPC().
void ThreadpoolHttpServer::StopIPC() {
#ifndef _WIN32
  // Closes the unix domain socket and removes its file.
  CloseUnixDomainSocket();
#else
  pipe_server_->Stop();
#endif
}
#ifndef _WIN32
bool ThreadpoolHttpServer::OpenUnixDomainSocket(const string& path) {
GomaIPCAddr addr;
socklen_t addr_len = InitializeGomaIPCAddress(path, &addr);
remove(path.c_str());
// TODO: use named pipe.
un_socket_.reset(socket(AF_GOMA_IPC, SOCK_STREAM, 0));
if (!un_socket_.valid())
return false;
CHECK_EQ(0, SetFileDescriptorFlag(un_socket_.get(), FD_CLOEXEC));
if (!un_socket_.SetNonBlocking()) {
PLOG(ERROR) << "set non blocking";
un_socket_.reset(-1);
return false;
}
if (!un_socket_.SetReuseAddr()) {
PLOG(ERROR) << "setsockopt SO_REUSEADDR";
un_socket_.reset(-1);
return false;
}
if (bind(un_socket_.get(), (struct sockaddr*)&addr, addr_len) < 0) {
PLOG(ERROR) << "bind";
un_socket_.reset(-1);
return false;
}
// drop permission to others.
if (chmod(path.c_str(), S_IRUSR|S_IWUSR) != 0) {
PLOG(ERROR) << "chmod";
un_socket_.reset(-1);
return false;
}
un_socket_name_ = path;
listen(un_socket_.get(), BACKLOG);
return true;
}
// Closes the unix domain socket, if open, and removes its socket file when a
// path was recorded by OpenUnixDomainSocket().
void ThreadpoolHttpServer::CloseUnixDomainSocket() {
  if (!un_socket_.valid()) {
    return;
  }
  un_socket_.Close();
  if (un_socket_name_.empty()) {
    return;
  }
  remove(un_socket_name_.c_str());
}
#endif
// Sets the maximum number of concurrently accepted sockets of |socket_type|
// to |n|.  |socket_type| must be a valid SocketType and |n| must lie in
// [0, max_num_sockets_]; violations abort via CHECK.
void ThreadpoolHttpServer::SetAcceptLimit(int n, SocketType socket_type) {
  // Validate the socket type first, then the requested limit.
  CHECK_GE(socket_type, 0);
  CHECK_LT(socket_type, NUM_SOCKET_TYPES);
  CHECK_GE(n, 0);
  CHECK_LE(n, max_num_sockets_);

  AUTOLOCK(lock, &mu_);
  max_sockets_[socket_type] = n;
}
/* static */
// Parses the first line of |request|, which is expected to look like
// 'GET /path?query HTTP/1.1\r\n'.
// On success stores the method in |*method|, the part of the request URI
// before the first '?' in |*req_path| and the part after it in |*query|
// (cleared when there is no '?'), and returns true.
// Returns false if |request| has no "\r\n" or the first line does not
// consist of exactly three space-separated tokens.
bool ThreadpoolHttpServer::ParseRequestLine(
    absl::string_view request, string* method,
    string* req_path, string* query) {
  const absl::string_view::size_type eol = request.find("\r\n");
  if (eol == absl::string_view::npos) {
    return false;
  }
  const string first_line = string(request.substr(0, eol));
  const std::vector<string> tokens = ToVector(
      absl::StrSplit(first_line, ' ', absl::SkipEmpty()));
  if (tokens.size() != 3) {
    return false;
  }
  *method = tokens[0];

  const string& uri = tokens[1];
  const size_t query_start = uri.find("?");
  if (query_start == string::npos) {
    // No query string at all.
    *req_path = uri;
    query->clear();
    return true;
  }
  *req_path = uri.substr(0, query_start);
  // Everything after the '?' up to the end of the URI.
  *query = uri.substr(query_start + 1);
  return true;
}
// Initializes the state shared by all request kinds: no request bytes have
// been received, nothing has been parsed yet and the peer pid is unknown (0).
// |stat| is copied into the request.
ThreadpoolHttpServer::HttpServerRequest::HttpServerRequest(
    WorkerThreadManager* wm,
    ThreadpoolHttpServer* server,
    const Stat& stat,
    Monitor* monitor)
    : wm_(wm),
      thread_id_(0),
      server_(server),
      monitor_(monitor),
      request_offset_(0),
      request_content_length_(0),
      request_len_(0),
      parsed_valid_http_request_(false),
      peer_pid_(0),
      stat_(stat) {}
#ifdef _WIN32
// A request that arrived through a Windows named pipe.  The object deletes
// itself in SendReply(), hence the private destructor.
class ThreadpoolHttpServer::RequestFromNamedPipe : public HttpServerRequest {
 public:
  RequestFromNamedPipe(WorkerThreadManager* wm,
                       ThreadpoolHttpServer* server,
                       const Stat& stat,
                       Monitor* monitor,
                       NamedPipeServer::Request* req)
      : HttpServerRequest(wm, server, stat, monitor), req_(req) {}
  RequestFromNamedPipe(const RequestFromNamedPipe&) = delete;
  RequestFromNamedPipe& operator=(const RequestFromNamedPipe&) = delete;

  bool CheckCredential() override;
  // A named pipe peer is trusted exactly when its credential check passes.
  bool IsTrusted() override { return CheckCredential(); }

  // Parses the message carried by the pipe request and dispatches it.
  void Start();
  void SendReply(const string& response) override;
  void NotifyWhenClosed(OneshotClosure* callback) override;

 private:
  ~RequestFromNamedPipe() override = default;

  NamedPipeServer::Request* req_;
};
// Always accepts the peer; no credential is actually inspected yet.
bool ThreadpoolHttpServer::RequestFromNamedPipe::CheckCredential() {
  // TODO: get peer_pid_ ?
  const bool accepted = true;
  return accepted;
}
// Copies the message out of the pipe request, validates it as an HTTP
// request and hands it to the server.  Whatever the outcome, the request is
// passed to ThreadpoolHttpServer::HandleIncoming() exactly once;
// parsed_valid_http_request_ tells the server whether validation succeeded.
// Nothing may touch |this| after that call, because the reply path deletes
// the object.
void ThreadpoolHttpServer::RequestFromNamedPipe::Start() {
  stat_.waiting_time_msec = stat_.timer.GetInMs();
  stat_.timer.Start();
  thread_id_ = wm_->GetCurrentThreadId();
  request_ = string(req_->request_message());
  request_len_ = request_.size();

  bool request_is_chunked = false;
  const bool header_found = FindContentLengthAndBodyOffset(
      request_,
      &request_content_length_,
      &request_offset_,
      &request_is_chunked);
  if (!header_found) {
    LOG(ERROR) << "failed to find content length and body offset:"
               << request_;
  } else if (request_is_chunked) {
    // We do not support request encoded with chunked transfer coding.
    LOG(ERROR) << "request is encoded with chunked transfer coding:"
               << request_;
  } else if (request_len_ < request_offset_ + request_content_length_) {
    LOG(ERROR) << "request not fully received? "
               << " len=" << request_len_
               << " offset=" << request_offset_
               << " content_length=" << request_content_length_;
  } else {
    stat_.read_req_time_msec = stat_.timer.GetInMs();
    if (ParseRequestLine(request_, &method_, &req_path_, &query_)) {
      stat_.req_size = request_len_;
      parsed_valid_http_request_ = true;
    } else {
      LOG(ERROR) << "parse request line failed";
    }
  }
  server_->HandleIncoming(this);
}
// Records handler statistics, sends |response| through the named pipe,
// reports the finished request to the monitor (if any) and destroys this
// request object.
void ThreadpoolHttpServer::RequestFromNamedPipe::SendReply(
    const string& response) {
  stat_.handler_time_msec = stat_.timer.GetInMs();
  stat_.resp_size = response.size();
  stat_.timer.Start();

  req_->SendReply(response);

  if (monitor_ != nullptr) {
    monitor_->FinishHandle(stat_);
  }
  // The request owns itself; this is the end of its life.
  delete this;
}
// Delegates close notification to the underlying named pipe request.
void ThreadpoolHttpServer::RequestFromNamedPipe::NotifyWhenClosed(
    OneshotClosure* callback) {
  NamedPipeServer::Request* const pipe_request = req_;
  pipe_request->NotifyWhenClosed(callback);
}
#endif
// A request that arrived through an accepted socket (TCP or IPC).
// The object is heap-allocated by SendJobToWorkerThread() and deletes itself
// in Finish(), hence the private destructor.
class ThreadpoolHttpServer::RequestFromSocket : public HttpServerRequest {
 public:
  RequestFromSocket() = delete;
  RequestFromSocket(
      WorkerThreadManager* wm,
      ScopedSocket&& sock, SocketType sock_type, const Stat& stat,
      Monitor* monitor,
      TrustedIpsManager* trustedipsmanager,
      ThreadpoolHttpServer* server);
  RequestFromSocket(const RequestFromSocket&) = delete;
  RequestFromSocket& operator=(const RequestFromSocket&) = delete;

  // HttpServerRequest overrides.
  bool CheckCredential() override;
  bool IsTrusted() override;
  void SendReply(const string& response) override;
  void NotifyWhenClosed(OneshotClosure* callback) override;

  // Registers the socket with the worker thread manager and starts reading.
  void Start();

 private:
  ~RequestFromSocket() override;

  // Socket event callbacks.
  void DoRead();
  void DoWrite();
  void DoTimeout();
  void DoReadEOF();
  void DoCheckClosed();

  // Continuations scheduled through the worker thread manager.
  void ReadFinished();
  void WriteFinished();
  void DoClosed();
  void Finish();
  void NotifyWhenClosedInternal(
      WorkerThreadManager::ThreadId thread_id,
      OneshotClosure* callback);

  // Accepted socket; moved into the socket descriptor by Start().
  ScopedSocket sock_;
  SocketType socket_type_;
  // Descriptor returned by RegisterSocketDescriptor(); nullptr until Start().
  SocketDescriptor* d_;
  bool request_is_chunked_;
  // Number of bytes of response_ written so far.
  size_t response_written_;
  TrustedIpsManager* trustedipsmanager_;
  // true if it finished read request, and waiting for ReadFinished()
  // called back.  In other words, callback to ReadFinished on the fly in
  // worker thread manager.
  bool read_finished_;
  // true if it got timed out, and waiting for Finish() called back.
  // In other words, callback to Finish on the fly in worker thread manager.
  bool timed_out_;
  // Thread that called NotifyWhenClosed(); closed_callback_ is run there.
  WorkerThreadManager::ThreadId closed_thread_id_;
  // Pending close notification; deleted in the destructor if never run.
  OneshotClosure* closed_callback_;
};
// Takes ownership of |sock| and records the context needed to serve one
// request.  The socket is not registered with the worker thread manager
// until Start() runs, so d_ stays nullptr until then.
ThreadpoolHttpServer::RequestFromSocket::RequestFromSocket(
    WorkerThreadManager* wm,
    ScopedSocket&& sock, SocketType socket_type,
    const ThreadpoolHttpServer::Stat& stat,
    ThreadpoolHttpServer::Monitor* monitor,
    TrustedIpsManager* trustedipsmanager,
    ThreadpoolHttpServer* server)
    : HttpServerRequest(wm, server, stat, monitor),
      sock_(std::move(sock)),
      socket_type_(socket_type),
      d_(nullptr),
      // DoRead() reads this flag after header detection, so it must start
      // from a defined value rather than an indeterminate one.
      request_is_chunked_(false),
      response_written_(0),
      trustedipsmanager_(trustedipsmanager),
      read_finished_(false),
      timed_out_(false),
      closed_thread_id_(0),
      closed_callback_(nullptr) {
}
// Drops any close callback that never ran, unregisters the socket descriptor
// and tells the server that one accepted socket of this type is gone.
ThreadpoolHttpServer::RequestFromSocket::~RequestFromSocket() {
  delete closed_callback_;
  // The socket handed back by the manager is closed when |released| goes out
  // of scope at the end of this destructor, i.e. after RemoveAccept().
  ScopedSocket released(wm_->DeleteSocketDescriptor(d_));
  d_ = nullptr;
  server_->RemoveAccept(socket_type_);
}
// Verifies the peer of an IPC socket and stores its pid in peer_pid_.
// Returns false for non-IPC sockets and for requests whose socket has not
// been registered yet.
bool ThreadpoolHttpServer::RequestFromSocket::CheckCredential() {
  const bool checkable = (socket_type_ == SOCKET_IPC) && (d_ != nullptr);
  if (!checkable) {
    return false;
  }
  return CheckGomaIPCPeer(d_->wrapper(), &peer_pid_);
}
// Decides whether the peer of this socket may be served.
// - Without a TrustedIpsManager every peer is trusted.
// - AF_UNIX peers are trusted when CheckCredential() succeeds.
// - AF_INET peers are trusted when the manager accepts their address.
// - Anything else, or a getpeername() failure, is untrusted.
bool ThreadpoolHttpServer::RequestFromSocket::IsTrusted() {
  if (trustedipsmanager_ == nullptr)
    return true;
  if (d_ == nullptr)
    return false;

  // sockaddr_storage is large enough for any address family; the sockaddr_in
  // view is only used once the family is known to be AF_INET.
  union {
    struct sockaddr_storage storage;
    struct sockaddr_in in;
  } addr;
  socklen_t addrlen = sizeof(addr);
  if (getpeername(d_->fd(), reinterpret_cast<sockaddr*>(&addr),
                  &addrlen) != 0) {
    PLOG(WARNING) << "getpeername";
    return false;
  }
  if (addr.storage.ss_family == AF_UNIX) {
    VLOG(1) << "Access from unix domain socket";
    return CheckCredential();
  }
  if (addr.storage.ss_family != AF_INET) {
    LOG(WARNING) << "Access from no-INET:" << addr.storage.ss_family;
    return false;
  }

  const bool trusted = trustedipsmanager_->IsTrustedClient(addr.in.sin_addr);
  char buf[128];
  if (!trusted) {
    LOG(WARNING) << "Access from "
                 << inet_ntop(AF_INET, &addr.in.sin_addr, buf, sizeof buf)
                 << " untrusted";
    return false;
  }
  VLOG(1) << "Access from "
          << inet_ntop(AF_INET, &addr.in.sin_addr, buf, sizeof buf)
          << " trusted";
  return true;
}
// Entry point on the worker thread: records how long the request waited,
// registers the socket with the worker thread manager and arms the read and
// timeout callbacks.
void ThreadpoolHttpServer::RequestFromSocket::Start() {
  stat_.waiting_time_msec = stat_.timer.GetInMs();
  stat_.timer.Start();
  // Later continuations are scheduled back onto this thread.
  thread_id_ = wm_->GetCurrentThreadId();

  d_ = wm_->RegisterSocketDescriptor(std::move(sock_),
                                     WorkerThreadManager::PRIORITY_HIGH);
  d_->NotifyWhenReadable(
      NewPermanentCallback(
          this, &ThreadpoolHttpServer::RequestFromSocket::DoRead));
  d_->NotifyWhenTimedout(
      kDefaultTimeoutSec,
      NewCallback(
          this, &ThreadpoolHttpServer::RequestFromSocket::DoTimeout));
}
// Requests that |callback| be run on the calling thread once the socket is
// detected as closed.  May be called only once, with a non-null callback and
// after the request has been fully read.  The registration itself is
// performed on the request's own thread.
void ThreadpoolHttpServer::RequestFromSocket::NotifyWhenClosed(
    OneshotClosure* callback) {
  CHECK(closed_callback_ == nullptr);
  CHECK(callback != nullptr);
  CHECK(read_finished_);

  const WorkerThreadManager::ThreadId caller_thread =
      wm_->GetCurrentThreadId();
  wm_->RunClosureInThread(
      FROM_HERE,
      thread_id_,
      NewCallback(
          this,
          &ThreadpoolHttpServer::RequestFromSocket::NotifyWhenClosedInternal,
          caller_thread,
          callback),
      WorkerThreadManager::PRIORITY_HIGH);
}
// Second half of NotifyWhenClosed(), scheduled onto thread_id_ by it.
// Remembers the callback together with the thread it must run on, then
// watches the socket for readability to detect that the peer closed it.
void ThreadpoolHttpServer::RequestFromSocket::NotifyWhenClosedInternal(
    WorkerThreadManager::ThreadId thread_id,
    OneshotClosure* callback) {
  CHECK(closed_callback_ == nullptr);
  CHECK(callback != nullptr);
  CHECK(read_finished_);

  closed_callback_ = callback;
  closed_thread_id_ = thread_id;
  d_->NotifyWhenReadable(
      NewPermanentCallback(
          this, &ThreadpoolHttpServer::RequestFromSocket::DoCheckClosed));
}
void ThreadpoolHttpServer::RequestFromSocket::DoRead() {
CHECK(d_);
// If it already got timed out, do nothing. Eventually, Finish() will be
// called.
if (timed_out_)
return;
bool found_header = request_offset_ > 0 && request_content_length_ > 0;
int buf_size = request_.size() - request_len_;
if (found_header) {
if (request_.size() < request_offset_ + request_content_length_) {
request_.resize(request_offset_ + request_content_length_);
}
} else if (buf_size < kNetworkBufSize / 2) {
request_.resize(request_.size() + kNetworkBufSize);
}
char* buf = &request_[request_len_];
buf_size = request_.size() - request_len_;
CHECK_GT(buf_size, 0)
<< " request_len=" << request_len_
<< " request_.size=" << request_.size()
<< " offset=" << request_offset_
<< " content_length=" << request_content_length_;
int r = d_->Read(buf, buf_size);
if (r <= 0) { // EOF or error
if (d_->NeedRetry())
return;
d_->StopRead();
read_finished_ = true;
wm_->RunClosureInThread(
FROM_HERE,
thread_id_,
NewCallback(
this, &ThreadpoolHttpServer::RequestFromSocket::ReadFinished),
WorkerThreadManager::PRIORITY_IMMEDIATE);
return;
}
request_len_ += r;
absl::string_view req(request_.data(), request_len_);
if (found_header ||
FindContentLengthAndBodyOffset(
req, &request_content_length_, &request_offset_,
&request_is_chunked_)) {
// We do not support request encoded with chunked transfer coding.
if (request_is_chunked_) { // treat this as error.
LOG(ERROR) << "request is encoded with chunked transfer coding:"
<< req;
d_->StopRead();
read_finished_ = true;
wm_->RunClosureInThread(
FROM_HERE,
thread_id_,
NewCallback(
this, &ThreadpoolHttpServer::RequestFromSocket::ReadFinished),
WorkerThreadManager::PRIORITY_IMMEDIATE);
return;
}
if (request_len_ < request_offset_ + request_content_length_) {
// not fully received yet.
return;
}
stat_.read_req_time_msec = stat_.timer.GetInMs();
if (ParseRequestLine(req, &method_, &req_path_, &query_)) {
d_->StopRead();
stat_.req_size = request_len_;
read_finished_ = true;
parsed_valid_http_request_ = true;
wm_->RunClosureInThread(
FROM_HERE,
thread_id_,
NewCallback(
this, &ThreadpoolHttpServer::RequestFromSocket::ReadFinished),
WorkerThreadManager::PRIORITY_IMMEDIATE);
}
}
}
// Writable callback: sends the not-yet-written tail of response_.
// Schedules WriteFinished() once everything has been written, or Finish()
// when a write fails and cannot be retried.
void ThreadpoolHttpServer::RequestFromSocket::DoWrite() {
  DCHECK(d_);
  int n = d_->Write(
      response_.data() + response_written_,
      response_.size() - response_written_);
  if (n > 0) {
    response_written_ += n;
    if (response_written_ != response_.size()) {
      // More to send; wait for the next writable notification.
      return;
    }
    d_->StopWrite();
    stat_.write_resp_time_msec = stat_.timer.GetInMs();
    wm_->RunClosureInThread(
        FROM_HERE,
        thread_id_,
        NewCallback(
            this, &ThreadpoolHttpServer::RequestFromSocket::WriteFinished),
        WorkerThreadManager::PRIORITY_IMMEDIATE);
    return;
  }

  // Nothing was written.
  if (d_->NeedRetry()) {
    return;
  }
  d_->StopWrite();
  wm_->RunClosureInThread(
      FROM_HERE,
      thread_id_,
      NewCallback(
          this, &ThreadpoolHttpServer::RequestFromSocket::Finish),
      WorkerThreadManager::PRIORITY_HIGH);
}
// Timeout callback: abandons a request that has not been fully read yet by
// stopping all socket notifications and scheduling Finish().
void ThreadpoolHttpServer::RequestFromSocket::DoTimeout() {
  if (!read_finished_) {
    d_->StopRead();
    d_->StopWrite();
    timed_out_ = true;
    wm_->RunClosureInThread(
        FROM_HERE,
        thread_id_,
        NewCallback(
            this, &ThreadpoolHttpServer::RequestFromSocket::Finish),
        WorkerThreadManager::PRIORITY_HIGH);
  }
  // Otherwise reading already finished and there is nothing to do here:
  // eventually, ReadFinished() will be called.
}
// Readable callback installed by NotifyWhenClosedInternal().  Stops socket
// notifications, logs whether the socket looks closed and schedules
// DoClosed().
void ThreadpoolHttpServer::RequestFromSocket::DoCheckClosed() {
  d_->StopRead();
  d_->StopWrite();
  if (d_->IsReadable() || closed_callback_ == nullptr) {
    PLOG(WARNING) << "readable after request? fd=" << d_->fd();
  } else {
    VLOG(1) << "closed=" << d_->fd();
  }
  wm_->RunClosureInThread(
      FROM_HERE,
      thread_id_,
      NewCallback(
          this, &ThreadpoolHttpServer::RequestFromSocket::DoClosed),
      WorkerThreadManager::PRIORITY_IMMEDIATE);
}
void ThreadpoolHttpServer::RequestFromSocket::DoClosed() {
d_->ClearReadable();
OneshotClosure* callback = closed_callback_;
closed_callback_ = nullptr;
if (callback != nullptr) {
wm_->RunClosureInThread(
FROM_HERE,
closed_thread_id_,
NewCallback(static_cast<Closure*>(callback), &Closure::Run),
WorkerThreadManager::PRIORITY_HIGH);
}
}
// Continuation of DoRead(): restarts the timer, removes the read and timeout
// notifications and passes the request to the server for handling.
void ThreadpoolHttpServer::RequestFromSocket::ReadFinished() {
  CHECK(read_finished_);
  stat_.timer.Start();

  // Reading is over, so neither callback is needed any more.
  d_->ClearReadable();
  d_->ClearTimeout();

  server_->HandleIncoming(this);
}
// Continuation of DoWrite() after the whole response has been written:
// shuts down the sending side and waits for the peer to close.
void ThreadpoolHttpServer::RequestFromSocket::WriteFinished() {
  CHECK(d_);
  d_->ClearWritable();
  d_->ShutdownForSend();

  // Wait for readable, and expecting Read()==0 (EOF).
  d_->NotifyWhenReadable(
      NewPermanentCallback(
          this, &ThreadpoolHttpServer::RequestFromSocket::DoReadEOF));
}
// Readable callback used after the response was sent.  Reads one byte to
// observe how the peer ended the connection, logs the outcome and schedules
// Finish() in every case.
void ThreadpoolHttpServer::RequestFromSocket::DoReadEOF() {
  CHECK(d_);
  char buf[1];
  const int r = d_->Read(buf, sizeof buf);
  if (r > 0) {
    // unexpected receiving data?
    LOG(WARNING) << "unexpected data after shutdown fd=" << d_->fd();
  } else if (r == 0) {
    // EOF
    VLOG(1) << d_->fd() << " EOF";
  } else {
    const string err = d_->GetLastErrorMessage();
    // client may have closed once it had received all response message,
    // before server ack EOF.
    VLOG(1) << "shutdown error? fd=" << d_->fd() << ":" << err;
  }
  d_->StopRead();
  wm_->RunClosureInThread(
      FROM_HERE,
      thread_id_,
      NewCallback(
          this, &ThreadpoolHttpServer::RequestFromSocket::Finish),
      WorkerThreadManager::PRIORITY_HIGH);
}
// Final step of a request: reports its statistics to the monitor (if any)
// and destroys the request object.
void ThreadpoolHttpServer::RequestFromSocket::Finish() {
  if (monitor_ != nullptr) {
    monitor_->FinishHandle(stat_);
  }
  // The request owns itself; nothing may touch it after this line.
  delete this;
}
// Stores a copy of |response|, records handler statistics and starts writing
// the response whenever the socket becomes writable (see DoWrite()).
void ThreadpoolHttpServer::RequestFromSocket::SendReply(
    const string& response) {
  response_ = response;

  stat_.handler_time_msec = stat_.timer.GetInMs();
  stat_.resp_size = response.size();
  // From here on the timer measures the time spent writing the response.
  stat_.timer.Start();

  d_->NotifyWhenWritable(
      NewPermanentCallback(
          this, &ThreadpoolHttpServer::RequestFromSocket::DoWrite));
}
// Dispatches a request whose reading has finished.  Requests that were
// parsed as valid HTTP go to the HTTP handler; anything else is answered
// directly with an error response.
void ThreadpoolHttpServer::HandleIncoming(HttpServerRequest* request) {
  if (request->ParsedValidHttpRequest()) {
    http_handler_->HandleHttpRequest(request);
    return;
  }
  // An HTTP status line must start with the protocol version; without it
  // the peer cannot parse the reply as an HTTP response.
  request->SendReply("HTTP/1.1 500 Unexpected Server Error\r\n\r\n");
}
// Binds |fd| to the address in |sa|, starting at |*port| and moving to the
// next port number after each failure.  Returns true as soon as a bind
// succeeds, or false once |num_find_ports| retries have failed.
// |sa->sin_port| and |*port| are updated on every attempt, so on success
// |*port| holds the port that was bound.
static bool BindPortWithRetries(int fd, int num_find_ports,
                                struct sockaddr_in* sa, int* port) {
  const socklen_t sa_size = sizeof(*sa);
  const int orig_port = *port;
  for (int num_retries = 0; ; ++num_retries, ++*port) {
    sa->sin_port = htons(static_cast<u_short>(*port));
    if (bind(fd, (struct sockaddr*)sa, sa_size) >= 0) {
      return true;
    }
    if (num_retries >= num_find_ports) {
      PLOG(ERROR) << "bind failed with " << num_retries << " retries. "
                  << "We checked ports from " << orig_port
                  << " to " << *port << " inclusive.";
      return false;
    }
    PLOG(WARNING) << "bind failed for port " << *port
                  << ". We will check the next port...";
  }
}
// Immutable record of one registered idle closure: the socket type it
// watches, the idle-count period at which it fires, its registration id and
// the closure itself (owned by this record).
class ThreadpoolHttpServer::IdleClosure {
 public:
  // closure must be a permanent callback.
  IdleClosure(SocketType socket_type,
              int count,
              ThreadpoolHttpServer::RegisteredClosureID id,
              std::unique_ptr<PermanentClosure> closure)
      : socket_type_(socket_type),
        count_(count),
        id_(id),
        closure_(std::move(closure)) {}
  ~IdleClosure() = default;

  ThreadpoolHttpServer::RegisteredClosureID id() const { return id_; }
  SocketType socket_type() const { return socket_type_; }
  int count() const { return count_; }
  // Returns a borrowed pointer; ownership stays with this object.
  PermanentClosure* closure() const { return closure_.get(); }

 private:
  const SocketType socket_type_;
  const int count_;
  const ThreadpoolHttpServer::RegisteredClosureID id_;
  std::unique_ptr<PermanentClosure> closure_;

  DISALLOW_COPY_AND_ASSIGN(IdleClosure);
};
// Registers |closure| to be run every |count| idle ticks of |socket_type|
// (see UpdateSocketIdleUnlocked()).  Returns an id that can be passed to
// UnregisterIdleClosure().  |count| must be positive.
ThreadpoolHttpServer::RegisteredClosureID
ThreadpoolHttpServer::RegisterIdleClosure(
    SocketType socket_type, int count,
    std::unique_ptr<PermanentClosure> closure) {
  DCHECK_GT(count, 0);
  AUTOLOCK(lock, &mu_);
  // Ids are handed out in increasing order; the check catches wrap-around.
  ++last_closure_id_;
  CHECK_GT(last_closure_id_, kInvalidClosureId);

  IdleClosure* const entry = new IdleClosure(
      socket_type, count, last_closure_id_, std::move(closure));
  idle_closures_.push_back(entry);
  return last_closure_id_;
}
// Removes and destroys the idle closure registered under |id|.  Logs an
// error if no such registration exists.
void ThreadpoolHttpServer::UnregisterIdleClosure(
    RegisteredClosureID id) {
  AUTOLOCK(lock, &mu_);
  const auto found = std::find_if(
      idle_closures_.begin(), idle_closures_.end(),
      [id](const IdleClosure* candidate) { return candidate->id() == id; });
  if (found == idle_closures_.end()) {
    LOG(ERROR) << "try to unregister invalid closure"
               << " id=" << id;
    return;
  }
  delete *found;
  idle_closures_.erase(found);
}
// Records one idle tick for |socket_type| and runs every idle closure of
// that type whose period divides the new idle count.  A tick only counts
// when no socket of the type is currently accepted and idle counting is not
// suspended.  Every caller in this file holds mu_ while calling this.
void ThreadpoolHttpServer::UpdateSocketIdleUnlocked(SocketType socket_type) {
  if (!idle_counting_) {
    LOG(INFO) << "update socket type:" << socket_type
              << " while suspending idle counting";
    return;
  }
  if (num_sockets_[socket_type] != 0) {
    // Sockets of this type are in use, so it is not idle.
    return;
  }
  ++idle_counter_[socket_type];
  for (IdleClosure* idle_closure : idle_closures_) {
    if (idle_closure->socket_type() != socket_type) {
      continue;
    }
    if ((idle_counter_[socket_type] % idle_closure->count()) != 0) {
      continue;
    }
    LOG(INFO) << "idle closure socket_type:" << socket_type
              << " idle_counter=" << idle_counter_[socket_type];
    wm_->RunClosure(FROM_HERE,
                    idle_closure->closure(),
                    WorkerThreadManager::PRIORITY_MIN);
  }
}
int ThreadpoolHttpServer::Loop() {
struct sockaddr_in sa;
ScopedSocket incoming_socket; // the main waiting socket
socklen_t sa_size = sizeof(sa);
// TODO: listen IPv6 if any. Need to fix BindPortWithRetries().
incoming_socket.reset(socket(AF_INET, SOCK_STREAM, 0));
if (!incoming_socket.valid()) {
PLOG(ERROR) << "socket";
return 1;
}
CHECK(incoming_socket.SetCloseOnExec());
CHECK(incoming_socket.SetNonBlocking());
if (!incoming_socket.SetReuseAddr()) {
PLOG(ERROR) << "setsockopt SO_REUSEADDR";
return 1;
}
memset(&sa, 0, sizeof(sa));
if (listen_addr_ == "localhost") {
sa.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
} else if (listen_addr_ == "") {
sa.sin_addr.s_addr = htonl(INADDR_ANY);
} else {
LOG(FATAL) << "Unsupported listen_addr:" << listen_addr_;
}
sa.sin_family = AF_INET;
if (!BindPortWithRetries(incoming_socket.get(), num_find_ports_,
&sa, &port_)) {
return 1;
}
listen(incoming_socket.get(), BACKLOG);
if (getsockname(incoming_socket.get(),
(struct sockaddr*)&sa, &sa_size) == 0) {
std::cout << "GOMA version " << kBuiltRevisionString << " is ready."
<< std::endl;
std::cout << "HTTP server now listening to port " << ntohs(sa.sin_port)
<< ", access with http://localhost:" << ntohs(sa.sin_port)
<< std::endl;
}
{
AUTOLOCK(lock, &mu_);
port_ready_ = true;
cond_.Broadcast();
}
LOG(INFO) << "listen on port " << ntohs(sa.sin_port);
for (;;) {
if (http_handler_->shutting_down()) {
LOG(INFO) << "Shutting down...";
un_socket_.reset(-1);
incoming_socket.reset(-1);
return 0;
}
fd_set read_fd;
auto max_fd = incoming_socket.get();
FD_ZERO(&read_fd);
MSVC_PUSH_DISABLE_WARNING_FOR_FD_SET();
FD_SET(incoming_socket.get(), &read_fd);
MSVC_POP_WARNING();
if (un_socket_.valid()) {
MSVC_PUSH_DISABLE_WARNING_FOR_FD_SET();
FD_SET(un_socket_.get(), &read_fd);
MSVC_POP_WARNING();
max_fd = std::max(max_fd, un_socket_.get());
}
struct timeval tv;
tv.tv_sec = 1;
tv.tv_usec = 0;
int r = select(max_fd + 1, &read_fd, nullptr, nullptr, &tv);
if (r == 0) {
// timeout?
AUTOLOCK(lock, &mu_);
// 1 sec idle on both socket.
for (int i = 0; i < NUM_SOCKET_TYPES; ++i) {
UpdateSocketIdleUnlocked(static_cast<SocketType>(i));
}
continue;
}
if (r == -1) {
PLOG(WARNING) << "select";
continue;
}
if (FD_ISSET(incoming_socket.get(), &read_fd)) {
struct sockaddr_in tmpisa;
socklen_t addrlen = sizeof(tmpisa);
ScopedSocket accepted_socket(accept(incoming_socket.get(),
(struct sockaddr*)&tmpisa,
&addrlen));
if (!accepted_socket.valid()) {
if (errno == EINTR)
continue;
PLOG(ERROR) << "accept incoming_socket";
return 1;
}
AddAccept(SOCKET_TCP);
if (!accepted_socket.SetCloseOnExec()) {
LOG(ERROR) << "failed to set FD_CLOEXEC";
RemoveAccept(SOCKET_TCP);
accepted_socket.Close();
return 1;
}
// send the new incoming socket to a worker thread.
SendJobToWorkerThread(std::move(accepted_socket), SOCKET_TCP);
} else {
AUTOLOCK(lock, &mu_);
// tcp was idle, but unix would have some event in 1 sec.
UpdateSocketIdleUnlocked(SOCKET_TCP);
}
if (un_socket_.valid() && FD_ISSET(un_socket_.get(), &read_fd)) {
GomaIPCAddr tmpaddr;
socklen_t addrlen = sizeof(tmpaddr);
ScopedSocket accepted_socket(accept(un_socket_.get(),
(struct sockaddr*)&tmpaddr,
&addrlen));
if (!accepted_socket.valid()) {
if (errno == EINTR)
continue;
PLOG(ERROR) << "accept unix domain socket";
if (errno == EMFILE) {
PlatformThread::Sleep(100000);
continue;
}
return 1;
}
AddAccept(SOCKET_IPC);
if (!accepted_socket.SetCloseOnExec()) {
LOG(ERROR) << "failed to set FD_CLOEXEC";
RemoveAccept(SOCKET_IPC);
accepted_socket.Close();
return 1;
}
VLOG(1) << "un_socket=" << un_socket_.get()
<< "=>" << accepted_socket;
SendJobToWorkerThread(std::move(accepted_socket), SOCKET_IPC);
} else if (un_socket_.valid()) {
AUTOLOCK(lock, &mu_);
// unix was idle, but tcp would have some event in 1 sec.
UpdateSocketIdleUnlocked(SOCKET_IPC);
}
}
// Unreachable
}
// Blocks until no accepted socket of any type remains.  Each time the
// condition variable wakes this thread, the counters are re-examined and the
// first non-zero one is logged.
void ThreadpoolHttpServer::Wait() {
  AUTOLOCK(lock, &mu_);
  LOG(INFO) << "Wait for http requests...";
  for (;;) {
    // Index of the first socket type still in use, or -1 if none.
    int busy_type = -1;
    for (int type = 0; type < NUM_SOCKET_TYPES && busy_type < 0; ++type) {
      if (num_sockets_[type] > 0) {
        busy_type = type;
      }
    }
    if (busy_type < 0) {
      LOG(INFO) << "All http requests done.";
      return;
    }
    LOG(INFO) << "socket[" << busy_type << "]=" << num_sockets_[busy_type];
    cond_.Wait(&mu_);
  }
}
// Returns the current idle tick count for |socket_type|, read under mu_.
int ThreadpoolHttpServer::idle_counter(SocketType socket_type) const {
  AUTOLOCK(lock, &mu_);
  const int current = idle_counter_[socket_type];
  return current;
}
void ThreadpoolHttpServer::SuspendIdleCounter() {
AUTOLOCK(lock, &mu_);
LOG(INFO) << "suspend idle counter";
idle_counting_ = false;
}
void ThreadpoolHttpServer::ResumeIdleCounter() {
AUTOLOCK(lock, &mu_);
LOG(INFO) << "resume idle counter";
idle_counting_ = true;
}
// Accounts for a newly accepted socket of |socket_type| and resets that
// type's idle counter (unless idle counting is suspended).  Blocks while the
// per-type limit is exceeded or the total number of accepted sockets has
// reached max_num_sockets_, until RemoveAccept() makes room.
void ThreadpoolHttpServer::AddAccept(SocketType socket_type) {
  AUTOLOCK(lock, &mu_);
  // WorkerThreadManager is using select(2) to handle sockets I/O
  // (for compatibility reason), so it couldn't handle fd >= max_num_sockets_.
  ++num_sockets_[socket_type];
  if (!idle_counting_) {
    LOG(INFO) << "accept socket type:" << socket_type
              << " while suspending idle counting";
  } else {
    idle_counter_[socket_type] = 0;
  }
  for (;;) {
    const bool type_over_limit =
        num_sockets_[socket_type] > max_sockets_[socket_type];
    const bool total_at_limit =
        num_sockets_[SOCKET_TCP] + num_sockets_[SOCKET_IPC] >=
        max_num_sockets_;
    if (!type_over_limit && !total_at_limit) {
      break;
    }
    LOG(WARNING) << "Too many accepting socket: "
                 << " tcp:" << num_sockets_[SOCKET_TCP]
                 << " ipc:" << num_sockets_[SOCKET_IPC];
    // Wait some request finishes and release socket by RemoveAccept().
    cond_.Wait(&mu_);
  }
}
// Accounts for a finished socket of |socket_type| and wakes up threads
// waiting on the socket counters.
void ThreadpoolHttpServer::RemoveAccept(SocketType socket_type) {
  AUTOLOCK(lock, &mu_);
  --num_sockets_[socket_type];
  // cond_ is shared by AddAccept(), Wait() and WaitPortReady(), each waiting
  // for a different condition.  Waking a single waiter could pick one whose
  // condition is still false and leave the one that can proceed asleep, so
  // wake them all and let each re-check its own condition.
  cond_.Broadcast();
}
// Blocks until Loop() has finished setting up the listening socket and set
// port_ready_.
void ThreadpoolHttpServer::WaitPortReady() {
  AUTOLOCK(lock, &mu_);
  for (;;) {
    if (port_ready_) {
      return;
    }
    LOG(INFO) << "http server is not yet ready";
    cond_.Wait(&mu_);
  }
}
#ifdef _WIN32
// Wraps the named pipe request |req| in a RequestFromNamedPipe and schedules
// its Start() on the server's worker pool.  Waits for the HTTP port to be
// ready first.  The created request object deletes itself once the reply has
// been sent.
void ThreadpoolHttpServer::SendNamedPipeJobToWorkerThread(
    NamedPipeServer::Request* req) {
  WaitPortReady();
  RequestFromNamedPipe* const pipe_request =
      new RequestFromNamedPipe(wm_, this, Stat(), monitor_, req);
  wm_->RunClosureInPool(
      FROM_HERE,
      pool_,
      NewCallback(
          pipe_request,
          &ThreadpoolHttpServer::RequestFromNamedPipe::Start),
      WorkerThreadManager::PRIORITY_HIGH);
}
#endif
// Wraps the accepted |socket| in a RequestFromSocket and schedules its
// Start() on the server's worker pool.  Waits for the HTTP port to be ready
// first.  The created request object deletes itself when it finishes.
void ThreadpoolHttpServer::SendJobToWorkerThread(
    ScopedSocket&& socket, SocketType socket_type) {
  WaitPortReady();
  RequestFromSocket* const socket_request =
      new RequestFromSocket(wm_, std::move(socket), socket_type, Stat(),
                            monitor_, trustedipsmanager_, this);
  wm_->RunClosureInPool(
      FROM_HERE,
      pool_,
      NewCallback(
          socket_request,
          &ThreadpoolHttpServer::RequestFromSocket::Start),
      WorkerThreadManager::PRIORITY_HIGH);
}
} // namespace devtools_goma