// Copyright 2016 The Goma Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "named_pipe_server_win.h"
#include <deque>
#include <string>
#include "absl/strings/string_view.h"
#include "autolock_timer.h"
#include "callback.h"
#include "config_win.h"
#include "counterz.h"
#include "glog/logging.h"
#include "worker_thread_manager.h"
namespace devtools_goma {
static const int kInputBufSize = 64 * 1024; // bytes
static const int kOutputBufSize = 128 * 1024; // bytes
static const int kTimeoutMillisec = 50;
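// Conn represents a single client connection accepted on the named pipe.
// All I/O is overlapped and uses completion routines (ReadFileEx /
// WriteFileEx), so the completion callbacks run on the thread that issued
// the I/O while it is in an alertable wait.  overlapped_ is the first
// member, so a completion routine can recover the Conn* from the
// LPOVERLAPPED it receives.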
class NamedPipeServer::Conn {
public:
Conn(NamedPipeServer* server, ScopedNamedPipe&& pipe)
: server_(server),
pipe_(std::move(pipe)),
thread_id_(server->wm_->GetCurrentThreadId()),
err_(0),
written_(0),
closed_thread_id_(0) {
memset(&overlapped_, 0, sizeof overlapped_);
buf_.resize(kInputBufSize);
req_.reset(new Req(this));
close_watcher_.reset(new CloseWatcher(this));
}
~Conn() {
// Cancel all pending I/O before this instance is deleted.
// There is no point in proceeding with pending I/O after the delete,
// and executing a completion routine then would be a use-after-free.
if (pipe_.get()) {
if (CancelIo(pipe_.get()) == 0) {
LOG_SYSRESULT(GetLastError());
LOG(ERROR) << "cancel io failed: " << this;
}
}
}
Conn(const Conn&) = delete;
Conn& operator=(const Conn&) = delete;
NamedPipeServer::Request* req() const {
return req_.get();
}
bool BelongsToCurrentThread() const {
return THREAD_ID_IS_SELF(thread_id_);
}
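// Starts an overlapped read of the client's request into buf_.
// ReadCompleted calls ReadDone on this thread when the read finishes.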
bool Start() {
VLOG(1) << "conn start " << this;
DCHECK(BelongsToCurrentThread());
memset(&overlapped_, 0, sizeof overlapped_);
CHECK_EQ(reinterpret_cast<LPOVERLAPPED>(this), &overlapped_);
return ReadFileEx(pipe_.get(), &buf_[0], kInputBufSize,
&overlapped_,
&NamedPipeServer::Conn::ReadCompleted) != 0;
}
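// Starts an overlapped write of the reply stored in buf_ by SendReply.
// Cancels the EOF-detecting read first: the close watcher is no longer
// needed once a reply is in flight.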
bool Reply() {
VLOG(1) << "conn reply " << this;
DCHECK(BelongsToCurrentThread());
// Stop the Read that detects EOF.
// There is no need to detect EOF once we start replying.
if (CancelIo(pipe_.get()) == 0) {
LOG_SYSRESULT(GetLastError());
LOG(ERROR) << "cancel EOF detector " << this;
}
{
AUTOLOCK(lock, &mu_);
closed_callback_.reset();
}
VLOG_IF(1, buf_.size() > kOutputBufSize)
<< "conn reply too large: size=" << buf_.size();
CHECK_EQ(written_, 0) << "conn reply";
memset(&overlapped_, 0, sizeof overlapped_);
CHECK_EQ(reinterpret_cast<LPOVERLAPPED>(this), &overlapped_);
return WriteFileEx(pipe_.get(), &buf_[written_], buf_.size() - written_,
&overlapped_,
&NamedPipeServer::Conn::WriteCompleted) != 0;
}
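// Starts the close watcher, unless a reply has already been scheduled
// (Reply() clears closed_callback_, which makes this a no-op).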
void WatchClosed() {
DCHECK(BelongsToCurrentThread());
{
AUTOLOCK(lock, &mu_);
if (closed_callback_ == nullptr) {
// WatchClosed might be called after Reply,
// in which case there is no need to start close_watcher_.
return;
}
}
close_watcher_->Run();
}
void Flush() {
if (FlushFileBuffers(pipe_.get()) == 0) {
LOG_SYSRESULT(GetLastError());
LOG(ERROR) << "conn failed to flush " << this;
}
}
DWORD error() const { return err_; }
private:
class Req : public NamedPipeServer::Request {
public:
explicit Req(Conn* conn) : conn_(conn) {}
~Req() override {}
Req(const Req&) = delete;
Req& operator=(const Req&) = delete;
absl::string_view request_message() const override {
return conn_->request_message_;
}
void SendReply(absl::string_view reply) override {
conn_->SendReply(reply);
}
void NotifyWhenClosed(OneshotClosure* callback) override {
conn_->NotifyWhenClosed(callback);
}
private:
Conn* conn_;
};
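// CloseWatcher issues a one-byte overlapped read that is expected to fail
// (or be canceled) when the client closes its end of the pipe; the failure
// is reported to the Conn as a close notification.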
class CloseWatcher {
public:
explicit CloseWatcher(Conn* conn) : conn_(conn) {}
~CloseWatcher() {}
CloseWatcher(const CloseWatcher&) = delete;
CloseWatcher& operator=(const CloseWatcher&) = delete;
void Run() {
memset(&overlapped_, 0, sizeof overlapped_);
// Start a Read; if it fails, fire the close notifier.
CHECK_EQ(reinterpret_cast<LPOVERLAPPED>(this), &overlapped_);
if (ReadFileEx(conn_->pipe_.get(), eofBuf_, sizeof eofBuf_,
&overlapped_,
&NamedPipeServer::Conn::CloseWatcher::EOFDetected) == 0) {
DWORD err = GetLastError();
if (err == ERROR_HANDLE_EOF) {
NotifyClosed(err, 0);
return;
}
LOG_SYSRESULT(err);
LOG(ERROR) << "conn failed to setup eof detector " << this;
}
}
private:
static void EOFDetected(
DWORD err, DWORD num_bytes, LPOVERLAPPED overlapped) {
VLOG(1) << "EOFDetected err=" << err
<< " num_bytes=" << num_bytes;
CloseWatcher* cw = reinterpret_cast<CloseWatcher*>(overlapped);
cw->NotifyClosed(err, num_bytes);
}
void NotifyClosed(DWORD err, DWORD num_bytes) {
if (err == 0) {
if (GetOverlappedResult(conn_->pipe_.get(), &overlapped_,
&num_bytes, FALSE) == 0) {
LOG_SYSRESULT(GetLastError());
LOG(ERROR) << "conn close watcher error";
}
err = GetLastError();
}
conn_->NotifyClosed(err, num_bytes);
}
OVERLAPPED overlapped_;
Conn* conn_;
char eofBuf_[1];
};
void SendReply(absl::string_view reply) {
buf_.assign(reply.begin(), reply.end());
server_->ReadyToReply(this);
}
void NotifyWhenClosed(OneshotClosure* callback) {
CHECK(callback != nullptr);
{
AUTOLOCK(lock, &mu_);
CHECK(closed_callback_ == nullptr);
closed_callback_.reset(callback);
closed_thread_id_ = server_->wm_->GetCurrentThreadId();
}
server_->NotifyWhenClosed(this);
}
static void ReadCompleted(
DWORD err, DWORD num_bytes, LPOVERLAPPED overlapped) {
VLOG(1) << "ReadCompleted err=" << err
<< " num_bytes=" << num_bytes;
Conn* conn = reinterpret_cast<Conn*>(overlapped);
conn->ReadDone(err, num_bytes);
}
static void WriteCompleted(
DWORD err, DWORD num_bytes, LPOVERLAPPED overlapped) {
VLOG(1) << "WriteCompleted err=" << err
<< " num_bytes=" << num_bytes;
Conn* conn = reinterpret_cast<Conn*>(overlapped);
conn->WriteDone(err, num_bytes);
}
void ReadDone(DWORD err, DWORD num_bytes) {
DCHECK(BelongsToCurrentThread());
err_ = err;
// num_bytes == 0 means an error happened.
if (num_bytes > 0) {
request_message_ = absl::string_view(buf_.data(), num_bytes);
} else {
request_message_ = absl::string_view();
}
server_->ReadDone(this);
}
void NotifyClosed(DWORD err, DWORD num_bytes) {
DCHECK(BelongsToCurrentThread());
if (err == ERROR_OPERATION_ABORTED) {
// The I/O operation was canceled. No need to notify.
return;
}
LOG(INFO) << "named pipe closed. err=" << err;
err_ = err;
server_->Closed(this);
OneshotClosure* callback = nullptr;
WorkerThreadManager::ThreadId thread_id;
{
AUTOLOCK(lock, &mu_);
callback = closed_callback_.release();
thread_id = closed_thread_id_;
}
if (callback != nullptr) {
CHECK_NE(thread_id, 0U);
server_->wm_->RunClosureInThread(
FROM_HERE,
thread_id,
NewCallback(static_cast<Closure*>(callback), &Closure::Run),
WorkerThreadManager::PRIORITY_HIGH);
}
}
void WriteDone(DWORD err, DWORD num_bytes) {
DCHECK(BelongsToCurrentThread());
err_ = err;
if (err == 0) {
BOOL r = false;
if (GetOverlappedResult(pipe_.get(), &overlapped_,
&num_bytes, FALSE)) {
if (num_bytes > 0) {
written_ += num_bytes;
if (written_ == buf_.size()) {
server_->WriteDone(this);
return;
}
CHECK_LT(written_, buf_.size()) << "conn write overrun?";
memset(&overlapped_, 0, sizeof overlapped_);
CHECK_EQ(reinterpret_cast<LPOVERLAPPED>(this), &overlapped_);
r = WriteFileEx(pipe_.get(),
&buf_[written_], buf_.size() - written_,
&overlapped_,
&NamedPipeServer::Conn::WriteCompleted);
if (r != 0) {
return;
}
}
LOG(ERROR) << "conn write num_bytes=" << num_bytes
<< " written=" << written_
<< " WriteFileEx=" << r;
}
err = GetLastError();
if (err == ERROR_IO_PENDING) {
// never happens?
return;
}
err_ = err;
}
LOG_SYSRESULT(err);
LOG(ERROR) << "conn write done error err=" << err
<< " num_bytes=" << num_bytes
<< " buf_size=" << buf_.size()
<< " written=" << written_;
server_->WriteDone(this);
}
OVERLAPPED overlapped_; // must be the first member so it stays at offset 0.
NamedPipeServer* server_;
ScopedNamedPipe pipe_;
WorkerThreadManager::ThreadId thread_id_;
DWORD err_;
std::vector<char> buf_;
absl::string_view request_message_;
size_t written_;
mutable Lock mu_;
WorkerThreadManager::ThreadId closed_thread_id_ GUARDED_BY(mu_);
std::unique_ptr<OneshotClosure> closed_callback_ GUARDED_BY(mu_);
std::unique_ptr<Req> req_;
std::unique_ptr<CloseWatcher> close_watcher_;
};
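// Stop() must have run to completion before destruction: all events are
// closed and no connections remain.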
NamedPipeServer::~NamedPipeServer() {
CHECK(!ready_.valid());
CHECK(!watch_closed_.valid());
CHECK(!reply_.valid());
CHECK(!shutdown_.valid());
CHECK(!done_.valid());
CHECK(!flush_.valid());
CHECK(!flusher_done_.valid());
CHECK(actives_.empty());
CHECK(replies_.empty());
CHECK(finished_.empty());
CHECK(flushes_.empty());
}
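// Creates the coordination events, spawns the "pipe_flusher" and
// "pipe_server" threads, and blocks until the server thread signals ready_.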
void NamedPipeServer::Start(const std::string& name) {
LOG(INFO) << "Start for " << name;
ready_.reset(CreateEvent(nullptr, TRUE, FALSE, nullptr));
if (!ready_.valid()) {
LOG_SYSRESULT(GetLastError());
LOG(FATAL) << "Failed to create event for ready";
}
watch_closed_.reset(CreateEvent(nullptr, FALSE, FALSE, nullptr));
if (!watch_closed_.valid()) {
LOG_SYSRESULT(GetLastError());
LOG(FATAL) << "Failed to create event for watch_closed";
}
reply_.reset(CreateEvent(nullptr, FALSE, FALSE, nullptr));
if (!reply_.valid()) {
LOG_SYSRESULT(GetLastError());
LOG(FATAL) << "Failed to create event for reply";
}
shutdown_.reset(CreateEvent(nullptr, TRUE, FALSE, nullptr));
if (!shutdown_.valid()) {
LOG_SYSRESULT(GetLastError());
LOG(FATAL) << "Failed to create event for shutdown";
}
done_.reset(CreateEvent(nullptr, TRUE, FALSE, nullptr));
if (!done_.valid()) {
LOG_SYSRESULT(GetLastError());
LOG(FATAL) << "Failed to create event for done";
}
flush_.reset(CreateEvent(nullptr, FALSE, FALSE, nullptr));
if (!flush_.valid()) {
LOG_SYSRESULT(GetLastError());
LOG(FATAL) << "Failed to create event for flush";
}
flusher_done_.reset(CreateEvent(nullptr, FALSE, FALSE, nullptr));
if (!flusher_done_.valid()) {
LOG_SYSRESULT(GetLastError());
LOG(FATAL) << "Failed to create event for flusher";
}
wm_->NewThread(NewCallback(this, &NamedPipeServer::Flusher),
"pipe_flusher");
wm_->NewThread(NewCallback(this, &NamedPipeServer::Run, name),
"pipe_server");
DWORD w = WaitForSingleObject(ready_.handle(), 10*1000); // 10 secs timeout
if (w != WAIT_OBJECT_0) {
LOG_SYSRESULT(GetLastError());
LOG(FATAL) << "Failed to wait for ready: w=" << w;
}
}
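// Signals shutdown_ and waits (alertably, so pending completion routines
// can still run) for both the server and flusher threads to finish, then
// closes the events and deletes any remaining connections.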
void NamedPipeServer::Stop() {
LOG(INFO) << "Stop";
if (!shutdown_.valid() || !done_.valid()) {
LOG(INFO) << "not running?";
return;
}
{
AUTOLOCK(lock, &mu_);
shutting_down_ = true;
}
if (!SetEvent(shutdown_.handle())) {
LOG_SYSRESULT(GetLastError());
LOG(FATAL) << "Failed to signal shutdown";
}
bool finished = false;
HANDLE events[2];
events[0] = done_.handle();
events[1] = flusher_done_.handle();
while (!finished) {
DWORD w = WaitForMultipleObjectsEx(
2, events,
TRUE, // wait all
INFINITE,
TRUE);
switch (w) {
case WAIT_OBJECT_0: FALLTHROUGH_INTENDED;
case WAIT_OBJECT_0 + 1:
finished = true;
break;
case WAIT_IO_COMPLETION:
continue;
default:
LOG_SYSRESULT(GetLastError());
LOG(FATAL) << "Failed to wait for done: w=" << w;
}
}
LOG(INFO) << "done";
ready_.Close();
watch_closed_.Close();
reply_.Close();
shutdown_.Close();
done_.Close();
flush_.Close();
flusher_done_.Close();
std::unordered_set<Conn*> conns;
{
AUTOLOCK(lock, &mu_);
conns.insert(actives_.begin(), actives_.end());
actives_.clear();
conns.insert(replies_.begin(), replies_.end());
replies_.clear();
conns.insert(finished_.begin(), finished_.end());
finished_.clear();
conns.insert(flushes_.begin(), flushes_.end());
flushes_.clear();
}
for (const auto* conn : conns) {
delete conn;
}
}
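// Queues |conn|'s reply for the pipe server thread and wakes it via reply_.
// During shutdown the connection is deleted instead.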
void NamedPipeServer::ReadyToReply(Conn* conn) {
{
AUTOLOCK(lock, &mu_);
actives_.erase(conn);
watches_.erase(conn);
if (shutting_down_) {
LOG(WARNING) << "will not update replies_ because shutting down.";
delete conn;
return;
} else {
replies_.push_back(conn);
}
}
if (!SetEvent(reply_.handle())) {
LOG_SYSRESULT(GetLastError());
LOG(ERROR) << "Failed to signal reply";
}
}
void NamedPipeServer::NotifyWhenClosed(Conn* conn) {
{
AUTOLOCK(lock, &mu_);
watches_.insert(conn);
}
if (!SetEvent(watch_closed_.handle())) {
LOG_SYSRESULT(GetLastError());
LOG(ERROR) << "Failed to signal watch_closed";
}
}
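// Main loop of the "pipe_server" thread.  Creates a pipe instance, then
// waits alertably for a client connection, a close-watch request, a pending
// reply, or shutdown, and handles whichever is signaled.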
void NamedPipeServer::Run(std::string name) {
thread_id_ = wm_->GetCurrentThreadId();
std::string pipename = "\\\\.\\pipe\\" + name;
LOG(INFO) << "Run pipe=" << pipename;
ScopedFd connected(CreateEvent(
nullptr, // default security attribute
TRUE, // manual reset event
TRUE, // initial state = signaled
nullptr));
if (!connected.valid()) {
LOG_SYSRESULT(GetLastError());
LOG(FATAL) << "Failed to create event for connect";
}
OVERLAPPED o_connect = {}; // zero-initialize before use with ConnectNamedPipe.
o_connect.hEvent = connected.handle();
bool is_pending = NewPipe(pipename, &o_connect);
if (!SetEvent(ready_.handle())) {
LOG_SYSRESULT(GetLastError());
LOG(FATAL) << "Failed to signal ready";
}
LOG(INFO) << "pipe=" << pipename << " ready";
HANDLE events[4];
events[0] = connected.handle();
events[1] = watch_closed_.handle();
events[2] = reply_.handle();
events[3] = shutdown_.handle();
for (;;) {
DWORD w = WaitForMultipleObjectsEx(
4, events,
FALSE, // wait all
INFINITE,
TRUE);
GOMA_COUNTERZ("After WaitForMultipleObjectsEx");
switch (w) {
case WAIT_OBJECT_0: // connected
if (is_pending) {
DWORD num_bytes = 0;
BOOL ok = GetOverlappedResult(
pipe_.get(),
&o_connect,
&num_bytes,
FALSE);
if (!ok) {
LOG_SYSRESULT(GetLastError());
LOG(ERROR) << "Failed to GetOverlappedResult for connect";
return;
}
}
if (pipe_.valid()) {
GOMA_COUNTERZ("new Conn and etc.");
VLOG(1) << "connected";
Conn* conn = new Conn(this, std::move(pipe_));
{
AUTOLOCK(lock, &mu_);
actives_.insert(conn);
}
if (!conn->Start()) {
LOG(ERROR) << "conn start failed";
{
AUTOLOCK(lock, &mu_);
actives_.erase(conn);
}
delete conn;
}
}
is_pending = NewPipe(pipename, &o_connect);
VLOG(1) << "new pipe is_pending=" << is_pending;
break;
case WAIT_OBJECT_0 + 1: // watch closed
VLOG(1) << "watch closed";
ProcessWatchClosed();
break;
case WAIT_OBJECT_0 + 2: // ready to reply
VLOG(1) << "ready to reply";
ProcessReplies();
break;
case WAIT_OBJECT_0 + 3:
LOG(INFO) << "shutting down";
if (CancelIo(pipe_.get()) == 0) {
LOG_SYSRESULT(GetLastError());
LOG(ERROR) << "cancel connect named pipe";
}
if (!SetEvent(done_.handle())) {
LOG_SYSRESULT(GetLastError());
LOG(FATAL) << "Failed to signal done";
}
return;
case WAIT_IO_COMPLETION:
VLOG(2) << "io completion";
// The wait is satisfied by a completed read or write operation.
// This allows the system to execute the completion routine.
break;
default:
LOG_SYSRESULT(GetLastError());
LOG(FATAL) << "WaitForMultipleObjectsEx";
return;
}
}
}
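// Creates a new named pipe instance and starts an overlapped
// ConnectNamedPipe on it.  Returns true if the connect is pending, false
// if it completed or failed immediately; in the already-connected case the
// event in |overlapped| is signaled so Run() handles it.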
bool NamedPipeServer::NewPipe(
const std::string& pipename, OVERLAPPED* overlapped) {
GOMA_COUNTERZ("");
DCHECK(THREAD_ID_IS_SELF(thread_id_));
pipe_ = ScopedNamedPipe(
CreateNamedPipeA(pipename.c_str(),
PIPE_ACCESS_DUPLEX |
FILE_FLAG_OVERLAPPED,
PIPE_TYPE_BYTE |
PIPE_READMODE_BYTE |
PIPE_WAIT |
PIPE_REJECT_REMOTE_CLIENTS,
PIPE_UNLIMITED_INSTANCES,
kOutputBufSize,
kInputBufSize,
kTimeoutMillisec,
nullptr)); // TODO: set security attributes.
if (!pipe_.valid()) {
LOG_SYSRESULT(GetLastError());
LOG(ERROR) << "Failed to open pipe " << pipename;
return false;
}
// TODO: Make the ConnectNamedPipe loop iterate as quickly as possible, or
// make the loop multi-threaded.
if (ConnectNamedPipe(pipe_.get(), overlapped)) {
LOG_SYSRESULT(GetLastError());
LOG(ERROR) << "Failed to ConnectNamedPipe";
return false;
}
switch (GetLastError()) {
case ERROR_IO_PENDING:
// overlapped connection in progress.
return true;
case ERROR_PIPE_CONNECTED:
// client is already connected, signal.
if (SetEvent(overlapped->hEvent)) {
break;
}
FALLTHROUGH_INTENDED;
default:
LOG_SYSRESULT(GetLastError());
LOG(ERROR) << "Failed to ConnectNamedPipe";
}
return false;
}
void NamedPipeServer::ReadDone(Conn* conn) {
VLOG(1) << "ReadDone err=" << conn->error();
DCHECK(THREAD_ID_IS_SELF(thread_id_));
if (conn->error() != 0) {
LOG(ERROR) << "Read error:" << conn->error();
{
AUTOLOCK(lock, &mu_);
actives_.erase(conn);
}
delete conn;
return;
}
wm_->RunClosure(FROM_HERE,
NewCallback(handler_,
&NamedPipeServer::Handler::HandleIncoming,
conn->req()),
WorkerThreadManager::PRIORITY_HIGH);
}
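// Starts the close watcher for every connection queued in watches_.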
void NamedPipeServer::ProcessWatchClosed() {
VLOG(1) << "ProcessWatchClosed";
DCHECK(THREAD_ID_IS_SELF(thread_id_));
std::unordered_set<Conn*> watches;
{
AUTOLOCK(lock, &mu_);
watches.swap(watches_);
}
for (auto& conn : watches) {
VLOG(1) << "process watch conn=" << conn;
conn->WatchClosed();
}
}
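// Starts an overlapped write for each connection queued in replies_;
// connections whose write cannot be started are deleted.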
void NamedPipeServer::ProcessReplies() {
VLOG(1) << "ProcessReplies";
DCHECK(THREAD_ID_IS_SELF(thread_id_));
std::deque<Conn*> replies;
{
AUTOLOCK(lock, &mu_);
replies.swap(replies_);
}
for (auto& conn : replies) {
VLOG(1) << "process reply conn=" << conn;
if (!conn->Reply()) {
LOG_SYSRESULT(GetLastError());
LOG(WARNING) << "Reply error";
{
AUTOLOCK(lock, &mu_);
CHECK_EQ(watches_.count(conn), 0U);
}
delete conn;
} else {
AUTOLOCK(lock, &mu_);
finished_.insert(conn);
}
}
}
void NamedPipeServer::Closed(Conn* conn) {
DCHECK(THREAD_ID_IS_SELF(thread_id_));
VLOG(1) << "Closed";
AUTOLOCK(lock, &mu_);
actives_.erase(conn);
}
void NamedPipeServer::WriteDone(Conn* conn) {
DCHECK(THREAD_ID_IS_SELF(thread_id_));
VLOG(1) << "WriteDone";
{
AUTOLOCK(lock, &mu_);
CHECK_EQ(watches_.count(conn), 0U);
finished_.erase(conn);
flushes_.insert(conn);
}
if (!SetEvent(flush_.handle())) {
LOG_SYSRESULT(GetLastError());
LOG(ERROR) << "Failed to signal flush";
}
}
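// Loop of the "pipe_flusher" thread: flushes written connections so the
// reply reaches the client before the pipe is closed, then deletes them.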
void NamedPipeServer::Flusher() {
LOG(INFO) << "Run flusher";
HANDLE events[2];
events[0] = flush_.handle();
events[1] = shutdown_.handle();
for (;;) {
DWORD w = WaitForMultipleObjectsEx(
2, events,
FALSE, // wait all
INFINITE,
TRUE);
GOMA_COUNTERZ("After WaitForMultipleObjectsEx");
switch (w) {
case WAIT_OBJECT_0: // flush
ProcessFlushes();
break;
case WAIT_OBJECT_0 + 1: // shutdown
LOG(INFO) << "shutting down";
if (!SetEvent(flusher_done_.handle())) {
LOG_SYSRESULT(GetLastError());
LOG(FATAL) << "Failed to signal done";
}
return;
case WAIT_IO_COMPLETION:
break;
default:
LOG_SYSRESULT(GetLastError());
LOG(FATAL) << "WaitForMultipleObjectsEx";
return;
}
}
}
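// Flushes and deletes every connection queued in flushes_.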
void NamedPipeServer::ProcessFlushes() {
GOMA_COUNTERZ("");
VLOG(1) << "ProcessFlushes";
std::unordered_set<Conn*> flushes;
{
AUTOLOCK(lock, &mu_);
flushes.swap(flushes_);
}
for (auto& conn : flushes) {
VLOG(1) << "process flush conn=" << conn;
conn->Flush();
{
AUTOLOCK(lock, &mu_);
CHECK_EQ(watches_.count(conn), 0U);
CHECK_EQ(finished_.count(conn), 0U);
}
delete conn;
}
}
} // namespace devtools_goma