blob: 7b91928d51bd03a6610663279ba675625085ed4f [file] [log] [blame]
// Copyright 2011 The Goma Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "subprocess_controller_client.h"
#ifndef _WIN32
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#else
#include "config_win.h"
#include "socket_helper_win.h"
#endif
#include <memory>
#include <sstream>
#include <utility>
#include <vector>
#include "absl/base/call_once.h"
#include "autolock_timer.h"
#include "callback.h"
#include "compiler_specific.h"
#include "socket_descriptor.h"
#include "glog/logging.h"
#include "glog/stl_logging.h"
MSVC_PUSH_DISABLE_WARNING_FOR_PROTO()
#include "prototmp/subprocess.pb.h"
MSVC_POP_WARNING()
#include "subprocess_task.h"
#include "worker_thread.h"
using std::string;
namespace devtools_goma {
namespace {

// Process-wide state backing the single SubProcessControllerClient.
//
// g_mu is allocated by InitializeOnce(), which Create() runs through
// absl::call_once.  Until Create() has run, g_mu is still null, so
// IsRunning() and Get() (which both lock g_mu) must not be used earlier.
// The original note states that Create() is reached from
// SubProcessController::Initialize, i.e. before any other entry point.
//
// TODO: We cannot call IsRunning() or Get() unless Create()
// is called with this implementation.
Lock* g_mu = nullptr;
SubProcessControllerClient* g_client_instance GUARDED_BY(*g_mu) = nullptr;
absl::once_flag g_init_once;

// Allocates the global lock.  Invoked through g_init_once from Create().
void InitializeOnce() {
  g_mu = new Lock();
}

}  // anonymous namespace
/* static */
// Builds the process-wide client instance from the controller socket |fd|,
// the server process id |pid| and the initial |options|, publishes it in
// g_client_instance and returns it.  CHECK-fails if an instance already
// exists.
SubProcessControllerClient* SubProcessControllerClient::Create(
    int fd, pid_t pid, const Options& options) {
  // Must be called before starting threads.
  absl::call_once(g_init_once, InitializeOnce);

  AUTOLOCK(lock, g_mu);
  CHECK(g_client_instance == nullptr);
  SubProcessControllerClient* const created =
      new SubProcessControllerClient(fd, pid, options);
  CHECK(created != nullptr);
  g_client_instance = created;
  return created;
}
/* static */
// Reports whether a client instance is currently published.  Locks g_mu, so
// Create() must have been called at least once beforehand.
bool SubProcessControllerClient::IsRunning() {
  AUTOLOCK(lock, g_mu);
  const bool has_instance = (g_client_instance != nullptr);
  return has_instance;
}
/* static */
// Returns the published client instance; CHECK-fails when there is none.
SubProcessControllerClient* SubProcessControllerClient::Get() {
  AUTOLOCK(lock, g_mu);
  SubProcessControllerClient* const instance = g_client_instance;
  CHECK(instance != nullptr);
  return instance;
}
/* static */
// Starts a thread named "subprocess_controller_client" on |wm| whose body is
// Setup(wm, tmp_dir) invoked on the instance returned by Get().
void SubProcessControllerClient::Initialize(
    WorkerThreadManager* wm, const string& tmp_dir) {
  const char* const kThreadName = "subprocess_controller_client";
  wm->NewThread(
      NewCallback(Get(), &SubProcessControllerClient::Setup, wm, tmp_dir),
      kThreadName);
}
// Constructs a client that is not yet attached to any worker thread.
//
// |fd| is the descriptor used to talk to the controller server (it is handed
// to the WorkerThreadManager in Setup()), |pid| is the server's process id
// (waited on in Delete() on non-Windows builds), and |options| becomes the
// initial value of current_options_.
//
// wm_, thread_id_ and socket_descriptor_ start out empty and are filled in by
// Setup(); the periodic closure id starts out invalid and all state flags
// start out false.
SubProcessControllerClient::SubProcessControllerClient(int fd,
pid_t pid,
Options options)
: wm_(nullptr),
thread_id_(0),
socket_descriptor_(nullptr),
fd_(fd),
server_pid_(pid),
next_id_(0),
current_options_(std::move(options)),
periodic_closure_id_(kInvalidPeriodicClosureId),
quit_(false),
closed_(false),
initialized_(false) {}
// Destroys the client.  In this file the destructor is reached through
// `delete this` in Delete().
//
// Preconditions (enforced with CHECK): Quit() has run (quit_ is set and the
// periodic closure has been unregistered) and no task is still registered.
SubProcessControllerClient::~SubProcessControllerClient() {
CHECK(quit_);
CHECK(subproc_tasks_.empty());
CHECK_EQ(periodic_closure_id_, kInvalidPeriodicClosureId);
// Take the socket back from the WorkerThreadManager and close it.
ScopedSocket fd(wm_->DeleteSocketDescriptor(socket_descriptor_));
fd.Close();
// Reset the members that Setup() populated.
socket_descriptor_ = nullptr;
thread_id_ = 0;
wm_ = nullptr;
}
// Attaches the client to the calling thread.  Initialize() schedules this as
// the body of the "subprocess_controller_client" thread.
//
// |wm| is remembered as wm_ and the calling thread's id as thread_id_; every
// later RunClosureInThread() call in this class targets that thread.
// |tmp_dir| is forwarded to SetTmpDir().
void SubProcessControllerClient::Setup(
WorkerThreadManager* wm, string tmp_dir) {
wm_ = wm;
thread_id_ = wm_->GetCurrentThreadId();
// Hand the controller socket over to the WorkerThreadManager.
socket_descriptor_ =
wm_->RegisterSocketDescriptor(std::move(fd_), WorkerThread::PRIORITY_MED);
SetInitialized();
// DoRead() is invoked whenever the controller socket becomes readable.
socket_descriptor_->NotifyWhenReadable(
NewPermanentCallback(this, &SubProcessControllerClient::DoRead));
SetTmpDir(tmp_dir);
{
AUTOLOCK(lock, &mu_);
CHECK_EQ(periodic_closure_id_, kInvalidPeriodicClosureId);
// Run RunCheckSignaled() every 10 seconds until Quit() unregisters it.
periodic_closure_id_ = wm_->RegisterPeriodicClosure(
FROM_HERE, absl::Seconds(10), NewPermanentCallback(
this, &SubProcessControllerClient::RunCheckSignaled));
}
LOG(INFO) << "SubProcessControllerClient Initialized"
<< " fd=" << socket_descriptor_->fd();
}
// Marks the client as initialized.  The flag is guarded by initialized_mu_
// and is read back through Initialized().
void SubProcessControllerClient::SetInitialized() {
  AUTOLOCK(lock, &initialized_mu_);
  initialized_ = true;
}
// Returns true once SetInitialized() has been called (Setup() does so).
bool SubProcessControllerClient::Initialized() const {
  AUTOLOCK(lock, &initialized_mu_);
  const bool is_initialized = initialized_;
  return is_initialized;
}
void SubProcessControllerClient::Quit() {
LOG(INFO) << "SubProcessControllerClient Quit";
std::vector<std::unique_ptr<SubProcessKill>> kills;
{
AUTOLOCK(lock, &mu_);
quit_ = true;
for (std::map<int, SubProcessTask*>::iterator iter = subproc_tasks_.begin();
iter != subproc_tasks_.end();
++iter) {
std::unique_ptr<SubProcessKill> kill(new SubProcessKill);
kill->set_id(iter->first);
kills.emplace_back(std::move(kill));
}
}
for (size_t i = 0; i < kills.size(); ++i) {
Kill(std::move(kills[i]));
}
wm_->RunClosureInThread(
FROM_HERE,
thread_id_,
devtools_goma::NewCallback(
this, &SubProcessControllerClient::SendRequest,
SubProcessController::SHUTDOWN,
std::unique_ptr<google::protobuf::Message>(
absl::make_unique<SubProcessShutdown>())),
WorkerThread::PRIORITY_MED);
{
AUTOLOCK(lock, &mu_);
if (periodic_closure_id_ != kInvalidPeriodicClosureId) {
wm_->UnregisterPeriodicClosure(periodic_closure_id_);
periodic_closure_id_ = kInvalidPeriodicClosureId;
}
}
}
// Blocks until every registered task has terminated and the peer has closed
// the connection, then schedules Delete() on the client thread.
// Quit() must have been called first (enforced with CHECK).
void SubProcessControllerClient::Shutdown() {
  LOG(INFO) << "SubProcessControllerClient shutdown";
  {
    AUTOLOCK(lock, &mu_);
    CHECK(quit_);
    CHECK_EQ(periodic_closure_id_, kInvalidPeriodicClosureId);
    for (;;) {
      const bool drained = subproc_tasks_.empty() && closed_;
      if (drained) {
        break;
      }
      LOG(INFO) << "wait for subproc_tasks_ become empty and peer closed";
      cond_.Wait(&mu_);
    }
  }
  // Delete is queued with PRIORITY_MED, the same priority used for the
  // SendRequest closures, so that it does not overtake a SendRequest that
  // still has to send a Kill.
  wm_->RunClosureInThread(
      FROM_HERE, thread_id_,
      NewCallback(this, &SubProcessControllerClient::Delete),
      WorkerThread::PRIORITY_MED);
}
// Assigns a fresh id to |task| and sends its request to the server.
//
// |task| must be PENDING and must not have an id yet (req().id() == -1).
// When the client is already quitting, no id is assigned; instead
// task->Terminated() is scheduled on the client thread with status
// kNotStarted.
void SubProcessControllerClient::RegisterTask(SubProcessTask* task) {
  CHECK_EQ(-1, task->req().id()) << task->req().DebugString();
  CHECK_EQ(SubProcessState::PENDING, task->state())
      << task->req().DebugString();

  int assigned_id = 0;
  bool quitting = false;
  {
    AUTOLOCK(lock, &mu_);
    quitting = quit_;
    if (!quitting) {
      assigned_id = ++next_id_;
      // detach task would not notify back, so no need to set it
      // in subproc_tasks_.
      if (!task->req().detach()) {
        subproc_tasks_.insert(std::make_pair(assigned_id, task));
      }
    }
    // While quitting, the task is deliberately not put in subproc_tasks_.
  }

  if (!quitting) {
    VLOG(1) << task->req().trace_id() << ": RegisterTask id=" << assigned_id;
    task->mutable_req()->set_id(assigned_id);
    std::unique_ptr<SubProcessReq> request(new SubProcessReq);
    *request = task->req();
    Register(std::move(request));
    return;
  }

  LOG(INFO) << task->req().trace_id() << ": RegisterTask in quit";
  std::unique_ptr<SubProcessTerminated> not_started(new SubProcessTerminated);
  not_started->set_id(assigned_id);
  not_started->set_status(SubProcessTerminated::kNotStarted);
  wm_->RunClosureInThread(
      FROM_HERE, thread_id_,
      devtools_goma::NewCallback(
          task, &SubProcessTask::Terminated, std::move(not_started)),
      WorkerThread::PRIORITY_MED);
}
// Queues a REGISTER request carrying |req| on the client thread.
// Does nothing once the client is quitting.
void SubProcessControllerClient::Register(std::unique_ptr<SubProcessReq> req) {
  bool quitting = false;
  {
    AUTOLOCK(lock, &mu_);
    quitting = quit_;
  }
  if (quitting) {
    return;
  }

  VLOG(1) << "Register id=" << req->id() << " " << req->trace_id();
  wm_->RunClosureInThread(
      FROM_HERE, thread_id_,
      devtools_goma::NewCallback(
          this, &SubProcessControllerClient::SendRequest,
          SubProcessController::REGISTER,
          std::unique_ptr<google::protobuf::Message>(std::move(req))),
      WorkerThread::PRIORITY_MED);
}
// Queues a REQUEST_RUN request carrying |run| on the client thread.
// Does nothing once the client is quitting.
void SubProcessControllerClient::RequestRun(
    std::unique_ptr<SubProcessRun> run) {
  VLOG(1) << "Run id=" << run->id();

  bool quitting = false;
  {
    AUTOLOCK(lock, &mu_);
    quitting = quit_;
  }
  if (quitting) {
    return;
  }

  wm_->RunClosureInThread(
      FROM_HERE, thread_id_,
      devtools_goma::NewCallback(
          this, &SubProcessControllerClient::SendRequest,
          SubProcessController::REQUEST_RUN,
          std::unique_ptr<google::protobuf::Message>(std::move(run))),
      WorkerThread::PRIORITY_MED);
}
// Queues a KILL request carrying |kill| on the client thread.
//
// The request is dropped when the periodic closure is no longer registered.
// Note that this tests periodic_closure_id_ rather than quit_, so the kills
// issued by Quit() (after quit_ is set but before the closure is
// unregistered) still go through.
void SubProcessControllerClient::Kill(std::unique_ptr<SubProcessKill> kill) {
  bool active = false;
  {
    AUTOLOCK(lock, &mu_);
    active = (periodic_closure_id_ != kInvalidPeriodicClosureId);
  }
  if (!active) {
    return;
  }

  LOG(INFO) << "Kill id=" << kill->id();
  wm_->RunClosureInThread(
      FROM_HERE, thread_id_,
      devtools_goma::NewCallback(
          this, &SubProcessControllerClient::SendRequest,
          SubProcessController::KILL,
          std::unique_ptr<google::protobuf::Message>(std::move(kill))),
      WorkerThread::PRIORITY_MED);
}
// Records the limits from |option| in current_options_ and queues a
// SET_OPTION request for the server.  Ignored when the periodic closure is
// no longer registered (i.e. after Quit() has unregistered it).
void SubProcessControllerClient::SetOption(
    std::unique_ptr<SubProcessSetOption> option) {
  const auto max_subprocs = option->max_subprocs();
  const auto max_low_priority = option->max_subprocs_low_priority();
  const auto max_heavy_weight = option->max_subprocs_heavy_weight();
  {
    AUTOLOCK(lock, &mu_);
    if (periodic_closure_id_ == kInvalidPeriodicClosureId) {
      return;
    }
    current_options_.max_subprocs = max_subprocs;
    current_options_.max_subprocs_low_priority = max_low_priority;
    current_options_.max_subprocs_heavy_weight = max_heavy_weight;
  }

  LOG(INFO) << "SetOption"
            << " max_subprocs=" << max_subprocs
            << " max_subprocs_heavy_weight=" << max_heavy_weight
            << " max_subprocs_low_priority=" << max_low_priority;
  wm_->RunClosureInThread(
      FROM_HERE, thread_id_,
      devtools_goma::NewCallback(
          this, &SubProcessControllerClient::SendRequest,
          SubProcessController::SET_OPTION,
          std::unique_ptr<google::protobuf::Message>(std::move(option))),
      WorkerThread::PRIORITY_MED);
}
void SubProcessControllerClient::Started(
std::unique_ptr<SubProcessStarted> started) {
VLOG(1) << "Started " << started->id() << " pid=" << started->pid();
DCHECK(BelongsToCurrentThread());
int id = started->id();
SubProcessTask* task = nullptr;
{
AUTOLOCK(lock, &mu_);
std::map<int, SubProcessTask*>::iterator found =
subproc_tasks_.find(id);
if (found != subproc_tasks_.end()) {
task = found->second;
}
}
if (task == nullptr) {
LOG(WARNING) << "No task for id=" << id;
std::unique_ptr<SubProcessKill> kill(new SubProcessKill);
kill->set_id(id);
Kill(std::move(kill));
return;
}
task->Started(std::move(started));
}
// Handles a TERMINATED notification on the client thread.
//
// The task registered under the id is removed from subproc_tasks_ and told
// about the termination; for tasks with an async callback, Done() is then
// scheduled on the task's own thread.  An unknown id is only logged.
// Finally, if the client is quitting and no task is left, the condition
// variable is signalled (Shutdown() waits on it).
void SubProcessControllerClient::Terminated(
    std::unique_ptr<SubProcessTerminated> terminated) {
  DCHECK(BelongsToCurrentThread());
  VLOG(1) << "Terminated " << terminated->id()
          << " status=" << terminated->status();

  const int id = terminated->id();
  SubProcessTask* task = nullptr;
  {
    AUTOLOCK(lock, &mu_);
    const auto it = subproc_tasks_.find(id);
    if (it != subproc_tasks_.end()) {
      task = it->second;
      subproc_tasks_.erase(it);
    }
  }

  if (task == nullptr) {
    std::ostringstream description;
    description << "no task found for id=" << id
                << " status=" << terminated->status()
                << " error="
                << SubProcessTerminated_ErrorTerminate_Name(
                       terminated->error());
    const std::string message = description.str();
    if (terminated->error() == SubProcessTerminated::kFailedToLookup) {
      LOG(INFO) << message;
    } else {
      LOG(WARNING) << message;
    }
  } else {
    // Read the flag first: if task is synchronous (!async), task may already
    // be deleted once Terminated() returns.
    const bool is_async = task->async_callback();
    task->Terminated(std::move(terminated));
    if (is_async) {
      wm_->RunClosureInThread(
          FROM_HERE, task->thread_id(),
          NewCallback(task, &SubProcessTask::Done),
          WorkerThread::PRIORITY_MED);
    }
  }

  {
    AUTOLOCK(lock, &mu_);
    if (quit_ && subproc_tasks_.empty()) {
      LOG(INFO) << "all subproc_tasks done";
      CHECK(subproc_tasks_.empty());
      cond_.Signal();
    }
  }
}
int SubProcessControllerClient::NumPending() const {
AUTOLOCK(lock, &mu_);
int num_pending = 0;
for (std::map<int, SubProcessTask*>::const_iterator iter =
subproc_tasks_.begin();
iter != subproc_tasks_.end();
++iter) {
SubProcessTask* task = iter->second;
switch (task->state()) {
case SubProcessState::SETUP: case SubProcessState::PENDING:
++num_pending;
break;
default:
{ }
}
}
return num_pending;
}
// True when the caller is running on the thread recorded by Setup().
bool SubProcessControllerClient::BelongsToCurrentThread() const {
  const bool on_client_thread = THREAD_ID_IS_SELF(thread_id_);
  return on_client_thread;
}
// Destroys this client on the client thread and, on non-Windows builds,
// reaps the controller server process and logs how it ended.
//
// Scheduled by Shutdown().  No member may be touched after `delete this`,
// which is why server_pid_ is copied into a local beforehand.
void SubProcessControllerClient::Delete() {
  DCHECK(BelongsToCurrentThread());
  socket_descriptor_->ClearReadable();
#ifndef _WIN32
  const pid_t server_pid = server_pid_;
#endif
  // Maybe not good to accessing g_client_instance which is being
  // deleted. So, guard `delete this`, too.
  {
    AUTOLOCK(lock, g_mu);
    delete this;
    g_client_instance = nullptr;
  }
#ifndef _WIN32
  int status = 0;
  if (waitpid(server_pid, &status, 0) == -1) {
    PLOG(ERROR) << "SubProcessControllerServer wait failed pid="
                << server_pid;
    return;
  }
  // exit_status stays -1 unless the server exited normally; signaled stays 0
  // unless it was terminated by a signal.
  const int exit_status = WIFEXITED(status) ? WEXITSTATUS(status) : -1;
  const int signaled = WIFSIGNALED(status) ? WTERMSIG(status) : 0;
  LOG(INFO) << "SubProcessControllerServer exited"
            << " status=" << exit_status
            << " signal=" << signaled;
  // Anything other than a clean `exit(0)` is unexpected.  The two conditions
  // must be OR-ed: a normal exit with a non-zero status leaves signaled == 0,
  // so AND-ing them would never report that case.
  if (exit_status != 0 || signaled != 0) {
    LOG(ERROR) << "unexpected SubProcessController exit";
  }
#endif
}
// Appends |message| tagged with |op| to the outgoing data via AddMessage().
// When AddMessage() returns true, DoWrite() is registered to run once the
// socket becomes writable.  Must run on the client thread.
void SubProcessControllerClient::SendRequest(
    SubProcessController::Op op,
    std::unique_ptr<google::protobuf::Message> message) {
  DCHECK(BelongsToCurrentThread());
  if (!AddMessage(op, *message)) {
    return;
  }
  VLOG(3) << "SendRequest has pending write";
  socket_descriptor_->NotifyWhenWritable(
      NewPermanentCallback(this, &SubProcessControllerClient::DoWrite));
}
// Writable-socket callback: writes pending data to the server.  A failed
// write is fatal.  Once nothing is left to write, WriteDone() is scheduled on
// the client thread with PRIORITY_IMMEDIATE.
void SubProcessControllerClient::DoWrite() {
  VLOG(2) << "DoWrite";
  DCHECK(BelongsToCurrentThread());
  if (!WriteMessage(socket_descriptor_->wrapper())) {
    LOG(FATAL) << "Unexpected peer shutdown in WriteMessage";
  }
  if (has_pending_write()) {
    return;
  }
  VLOG(3) << "DoWrite no pending";
  wm_->RunClosureInThread(
      FROM_HERE, thread_id_,
      NewCallback(this, &SubProcessControllerClient::WriteDone),
      WorkerThread::PRIORITY_IMMEDIATE);
}
// Stops watching the socket for writability, unless new data has been queued
// since DoWrite() scheduled this call.
void SubProcessControllerClient::WriteDone() {
  VLOG(2) << "WriteDone";
  DCHECK(BelongsToCurrentThread());
  if (!has_pending_write()) {
    socket_descriptor_->ClearWritable();
  }
}
// Readable-socket callback (registered in Setup()): reads one message from
// the controller server and dispatches on its op code.
//
//  - CLOSED:     while quitting, schedules OnClosed(); otherwise fatal.
//  - STARTED:    parses a SubProcessStarted and schedules Started().
//  - TERMINATED: parses a SubProcessTerminated and schedules Terminated().
//  - any other op is fatal.
//
// A payload that fails to parse is logged and dropped.  ReadDone() is called
// after every dispatched message.
void SubProcessControllerClient::DoRead() {
VLOG(2) << "DoRead";
DCHECK(BelongsToCurrentThread());
int op = 0;
int len = 0;
// ReadMessage() returning false is treated as "message not complete yet":
// nothing is dispatched and ReadDone() is not called.
if (!ReadMessage(socket_descriptor_->wrapper(), &op, &len)) {
VLOG(2) << "pending read op=" << op << " len=" << len;
return;
}
VLOG(2) << "DoRead op=" << op << " len=" << len;
switch (op) {
case SubProcessController::CLOSED:
{
// mu_ is held only inside this brace block.
AUTOLOCK(lock, &mu_);
if (quit_) {
VLOG(1) << "peer shutdown in quit";
CHECK(subproc_tasks_.empty())
<< "SubProcessControllerServer closed but subproc_tasks exist:"
<< subproc_tasks_.size();
wm_->RunClosureInThread(
FROM_HERE,
thread_id_,
devtools_goma::NewCallback(
this, &SubProcessControllerClient::OnClosed),
WorkerThread::PRIORITY_MED);
// Leaves the switch (not just the brace block); ReadDone() runs next.
break;
}
}
// Reached only when the peer closed while we were not quitting.
LOG(FATAL) << "Unexpected peer shutdown in ReadMessage";
// Note: STARTED and TERMINATED should run closure with the same priority
// Otherwise, they may not be executed in order.
case SubProcessController::STARTED: {
std::unique_ptr<SubProcessStarted> started(new SubProcessStarted);
if (started->ParseFromArray(payload_data(), len)) {
wm_->RunClosureInThread(
FROM_HERE,
thread_id_,
devtools_goma::NewCallback(
this, &SubProcessControllerClient::Started,
std::move(started)),
WorkerThread::PRIORITY_MED);
} else {
LOG(ERROR) << "broken SubProcessStarted";
}
}
break;
case SubProcessController::TERMINATED: {
std::unique_ptr<SubProcessTerminated> terminated(
new SubProcessTerminated);
if (terminated->ParseFromArray(payload_data(), len)) {
wm_->RunClosureInThread(
FROM_HERE,
thread_id_,
devtools_goma::NewCallback(
this, &SubProcessControllerClient::Terminated,
std::move(terminated)),
WorkerThread::PRIORITY_MED);
} else {
LOG(ERROR) << "broken SubProcessTerminated";
}
}
break;
default:
LOG(FATAL) << "Unknown SubProcessController::Op " << op;
}
ReadDone();
return;
}
// Records that the peer closed the connection: sets closed_, stops reading
// and writing on the socket and signals the condition variable that
// Shutdown() waits on.  Calls after the first one are no-ops.
void SubProcessControllerClient::OnClosed() {
  AUTOLOCK(lock, &mu_);
  if (!closed_) {
    LOG(INFO) << "peer closed";
    CHECK(subproc_tasks_.empty());
    closed_ = true;
    socket_descriptor_->StopRead();
    socket_descriptor_->StopWrite();
    cond_.Signal();
  }
}
void SubProcessControllerClient::RunCheckSignaled() {
if (!IsRunning()) {
// RunCheckSignaled is periodic closure managed by g_client_instance
// it should never be called when not running.
LOG(FATAL) << "SubProcessControllerClient is not running";
return;
}
// Switch from alarm worker to client thread.
wm_->RunClosureInThread(
FROM_HERE,
thread_id_,
NewCallback(
this, &SubProcessControllerClient::CheckSignaled),
WorkerThread::PRIORITY_MED);
}
void SubProcessControllerClient::CheckSignaled() {
if (!IsRunning()) {
// g_client_instnace (and this pointer) may be nullptr because Delete is
// higher priority (put in WorkerThreadManager in Shutdown).
// Should not access any member fields here.
return;
}
DCHECK(BelongsToCurrentThread());
std::vector<std::unique_ptr<SubProcessKill>> kills;
{
AUTOLOCK(lock, &mu_);
for (std::map<int, SubProcessTask*>::const_iterator iter =
subproc_tasks_.begin();
iter != subproc_tasks_.end();
++iter) {
int id = iter->first;
SubProcessTask* task = iter->second;
if (task->state() == SubProcessState::SIGNALED) {
std::unique_ptr<SubProcessKill> kill(new SubProcessKill);
kill->set_id(id);
kills.emplace_back(std::move(kill));
}
}
}
if (!kills.empty()) {
for (size_t i = 0; i < kills.size(); ++i) {
Kill(std::move(kills[i]));
}
}
}
// Returns a human-readable dump: one line with the current options, followed
// by one line per registered task (id, trace id, state, priority, weight and
// pid).
string SubProcessControllerClient::DebugString() const {
  AUTOLOCK(lock, &mu_);
  std::ostringstream out;
  out << "options: " << current_options_.DebugString() << '\n';
  for (const auto& entry : subproc_tasks_) {
    SubProcessTask* const task = entry.second;
    out << entry.first << " "
        << task->req().trace_id() << " "
        << SubProcessState::State_Name(task->state()) << " "
        << SubProcessReq::Priority_Name(task->req().priority()) << " "
        << SubProcessReq::Weight_Name(task->req().weight()) << " "
        << "pid=" << task->started().pid() << "\n";
  }
  return out.str();
}
} // namespace devtools_goma