blob: c1cbdf9296aa9427b25d6dfa42c13b0731cb2a5c [file] [log] [blame]
// Copyright 2011 The Goma Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "subprocess_controller_server.h"
#include <string.h>
#include <memory>
#include <set>
#include <utility>
#ifndef _WIN32
#include <errno.h>
#include <fcntl.h>
#include <signal.h>
#include <sys/select.h>
#include <unistd.h>
#endif
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include "compiler_specific.h"
#include "fileflag.h"
#include "glog/logging.h"
#include "glog/stl_logging.h"
#include "ioutil.h"
#include "path.h"
#include "platform_thread.h"
#include "subprocess_impl.h"
MSVC_PUSH_DISABLE_WARNING_FOR_PROTO()
#include "prototmp/subprocess.pb.h"
MSVC_POP_WARNING()
#ifdef _WIN32
#include "spawner_win.h"
#endif
namespace devtools_goma {
// select() timeout, in milliseconds, used while no subprocess is known to be
// in the middle of terminating.
#ifdef THREAD_SANITIZER
// With tsan enabled the signal is swallowed by the sanitizer, so select keeps
// waiting for its whole timeout. A short interval bounds that delay.
// TODO: remove this when below issue is fixed
// https://github.com/google/sanitizers/issues/838
static constexpr int kIdleIntervalMilliSec = 50;
#else
static constexpr int kIdleIntervalMilliSec = 500;
#endif

#ifndef _WIN32
// select() timeout, in milliseconds, used once a SIGCHLD has been matched to
// one of our subprocesses, so that its termination is collected quickly.
static constexpr int kWaitIntervalMilliSec = 5;
#endif
#ifndef _WIN32
// Write end of the pipe that carries siginfo from the signal handler to the
// SubProcessControllerServer loop. Assigned in SetupSigchldHandler().
static int g_signal_fd;

// SIGCHLD handler (installed via sa_sigaction). Forwards |siginfo| verbatim
// through g_signal_fd so that it can be processed outside signal context.
//
// Only async-signal-safe calls (write, abort) are made here. errno is saved
// and restored because the interrupted code may be about to inspect it (the
// server loop checks errno right after select()). A write interrupted by
// another signal is retried; any other failure or a short write is fatal,
// since a SIGCHLD notification would otherwise be lost.
void SigChldAction(int signo ALLOW_UNUSED,
                   siginfo_t* siginfo,
                   void* context ALLOW_UNUSED) {
  const int saved_errno = errno;
  ssize_t written;
  do {
    written = write(g_signal_fd, siginfo, sizeof(siginfo_t));
  } while (written < 0 && errno == EINTR);
  if (written != static_cast<ssize_t>(sizeof(siginfo_t)))
    abort();
  errno = saved_errno;
}
#endif
// Constructs a server that uses |sock_fd| as its communication socket and
// takes ownership of |options| (moved into options_).
//
// The select() timeout starts at kIdleIntervalMilliSec. On non-Windows
// platforms signal_fd_ starts as -1; it is assigned later by
// SetupSigchldHandler(), which Loop() calls. On Windows the constructor
// additionally calls SpawnerWin::Setup().
SubProcessControllerServer::SubProcessControllerServer(
int sock_fd,
SubProcessController::Options options)
: sock_fd_(sock_fd),
#ifndef _WIN32
signal_fd_(-1),
#endif
timeout_millisec_(kIdleIntervalMilliSec),
options_(std::move(options)) {
LOG(INFO) << "SubProcessControllerServer started fd=" << sock_fd
<< " " << options_.DebugString();
#ifdef _WIN32
// Counterpart of the SpawnerWin::TearDown() call in the destructor.
SpawnerWin::Setup();
#endif
}
// Destroys the server. On Windows this first calls SpawnerWin::TearDown()
// (the constructor calls SpawnerWin::Setup()); the deletion is then logged.
SubProcessControllerServer::~SubProcessControllerServer() {
#ifdef _WIN32
SpawnerWin::TearDown();
#endif
LOG(INFO) << "SubProcessControllerServer deleted.";
}
// Runs the server's event loop until the socket is closed or a shutdown has
// been requested and no subprocess remains.
//
// Each iteration waits in select() for:
//   - the socket becoming readable (a request from the client),
//   - the socket becoming writable (only while a write is pending),
//   - the SIGCHLD pipe becoming readable (non-Windows only),
//   - or timeout_millisec_ elapsing, in which case DoTimeout() runs.
//
// After leaving the loop, every remaining subprocess whose request is not
// marked detach is killed and waited for, then subprocs_ is cleared.
//
// Returns shutdowned_: true if a SHUTDOWN request was received, false if the
// loop ended only because the socket was closed.
bool SubProcessControllerServer::Loop() {
  VLOG(1) << "Loop";
#ifndef _WIN32
  SetupSigchldHandler();
#endif
  DCHECK(sock_fd_.valid());
#ifndef _WIN32
  DCHECK(signal_fd_.valid());
#endif
  for (;;) {
    if (!sock_fd_.valid()) {
      VLOG(1) << "sock_fd closed";
      break;
    }
    fd_set read_fd;
    fd_set write_fd;
#ifdef MEMORY_SANITIZER
    // workaround to make MSan understand fd_set initialized.
    // FD_ZERO use inline asm.  MSan cannot understand that.
    // See: https://github.com/google/sanitizers/issues/192
    memset(&read_fd, 0, sizeof(read_fd));
    memset(&write_fd, 0, sizeof(write_fd));
#endif  // MEMORY_SANITIZER
    FD_ZERO(&read_fd);
    FD_ZERO(&write_fd);
    MSVC_PUSH_DISABLE_WARNING_FOR_FD_SET();
    FD_SET(sock_fd_.get(), &read_fd);
    MSVC_POP_WARNING();
    // Only ask for writability when there is something to send; otherwise
    // select() would return immediately on every iteration.
    if (has_pending_write()) {
      MSVC_PUSH_DISABLE_WARNING_FOR_FD_SET();
      FD_SET(sock_fd_.get(), &write_fd);
      MSVC_POP_WARNING();
    }
    int max_fd = std::max<int>(-1, sock_fd_.get());
#ifndef _WIN32
    FD_SET(signal_fd_.fd(), &read_fd);
    max_fd = std::max(max_fd, signal_fd_.fd());
#endif
    struct timeval tv;
    tv.tv_sec = timeout_millisec_ / 1000;
    tv.tv_usec = (timeout_millisec_ - (tv.tv_sec * 1000)) * 1000;
    int r = select(max_fd + 1, &read_fd, &write_fd, nullptr, &tv);
    if (r < 0) {
      // Interrupted (e.g. by SIGCHLD): simply wait again.
      if (errno == EINTR || errno == EAGAIN)
        continue;
      PLOG(FATAL) << "select";
    }
    VLOG(2) << "r=" << r
            << " sock_fd=" << FD_ISSET(sock_fd_.get(), &read_fd)
#ifndef _WIN32
            << " signal_fd=" << FD_ISSET(signal_fd_.fd(), &read_fd)
#endif
            << " t=" << tv.tv_sec << "," << tv.tv_usec;
    if (r == 0) {
      DoTimeout();
      continue;
    }
    if (FD_ISSET(sock_fd_.get(), &write_fd)) {
      DoWrite();
      if (!has_pending_write()) {
        FlushLogFiles();
      }
    }
    // DoWrite() resets sock_fd_ on a write error. FD_ISSET on an invalid
    // descriptor is undefined, so check validity before testing the read set.
    // The closed socket is then noticed at the top of the next iteration.
    if (sock_fd_.valid() && FD_ISSET(sock_fd_.get(), &read_fd)) {
      DoRead();
    }
#ifndef _WIN32
    if (FD_ISSET(signal_fd_.fd(), &read_fd)) {
      DoSignal();
    }
#endif
    if (shutdowned_ && subprocs_.empty()) {
      LOG(INFO) << "shutdown: no subprocs";
      break;
    }
  }
  LOG(INFO) << "Terminating...";
  FlushLogFiles();
  for (const auto& iter : subprocs_) {
    SubProcessImpl* s = iter.second.get();
    // Detached subprocesses are left alone.
    if (s->req().detach()) {
      continue;
    }
    std::unique_ptr<SubProcessTerminated> terminated;
    s->Kill();
    // Wait for the running subprocess termination.
    // Because Wait() would emit log message and it would take some time to
    // terminate the subprocess, it will sleep for a while.
    // b/5370450
    while ((terminated = s->Wait(true)) == nullptr) {
      absl::SleepFor(absl::Seconds(10));
    }
  }
  FlushLogFiles();
  subprocs_.clear();
  return shutdowned_;
}
void SubProcessControllerServer::Register(std::unique_ptr<SubProcessReq> req) {
LOG(INFO) << "id=" << req->id() << " Register " << req->trace_id();
if (shutdowned_) {
LOG(INFO) << "shutdowning: refuse Regsiter id=" << req->id();
SubProcessTerminated terminated;
terminated.set_id(req->id());
SendNotify(SubProcessController::TERMINATED, terminated);
return;
}
bool dont_kill = options_.dont_kill_subprocess;
VLOG(1) << "id=" << req->id() << " Kill? " << req->trace_id()
<< " prog=" << req->prog()
<< " dont_kill=" << dont_kill;
SubProcessImpl* s = new SubProcessImpl(*req, dont_kill);
CHECK(subprocs_.insert(std::make_pair(req->id(), s)).second);
TrySpawnSubProcess();
}
void SubProcessControllerServer::RequestRun(
std::unique_ptr<SubProcessRun> run) {
VLOG(1) << "id=" << run->id() << " Run";
if (shutdowned_) {
LOG(WARNING) << "shutdowning: ignore RequestRun id=" << run->id();
return;
}
SubProcessImpl* s = LookupSubProcess(run->id());
if (s == nullptr) {
LOG(WARNING) << "id=" << run->id() << " request run unknown id "
<< "(maybe already killed?)";
return;
}
s->RaisePriority();
TrySpawnSubProcess();
}
void SubProcessControllerServer::Kill(std::unique_ptr<SubProcessKill> kill) {
VLOG(1) << "id=" << kill->id() << " Kill";
SubProcessImpl* s = LookupSubProcess(kill->id());
if (s == nullptr) {
LOG(WARNING) << "id=" << kill->id() << " kill unknown id "
<< "(maybe already killed?)";
return;
}
if (shutdowned_) {
LOG(WARNING) << "shutdowning: Kill id=" << kill->id();
}
if (!s->Kill()) {
std::unique_ptr<SubProcessTerminated> terminated(s->Wait(false));
if (terminated != nullptr) {
Terminated(std::move(terminated));
return;
}
ErrorTerminate(kill->id(), SubProcessTerminated::kFailedToKill);
}
}
// Handles a SET_OPTION request: updates max_subprocs,
// max_subprocs_low_priority and max_subprocs_heavy_weight in options_.
//
// Each limit is considered only if it is present in |opt| and differs from
// the current value. A non-positive value is rejected with a warning and the
// current value is kept. The whole request is ignored while shutting down.
void SubProcessControllerServer::SetOption(
    std::unique_ptr<SubProcessSetOption> opt) {
  if (shutdowned_) {
    LOG(INFO) << "shutdowning: ignore SetOption";
    return;
  }

  if (opt->has_max_subprocs()) {
    const auto requested = opt->max_subprocs();
    if (options_.max_subprocs != requested) {
      if (requested > 0) {
        options_.max_subprocs = requested;
        LOG(INFO) << "option changed: max_subprocs="
                  << requested;
      } else {
        LOG(WARNING) << "option max_subprocs is not changed: "
                     << "max_subprocs should be positive. value="
                     << requested;
      }
    }
  }

  if (opt->has_max_subprocs_low_priority()) {
    const auto requested = opt->max_subprocs_low_priority();
    if (options_.max_subprocs_low_priority != requested) {
      if (requested > 0) {
        options_.max_subprocs_low_priority = requested;
        LOG(INFO) << "option changed: max_subprocs_low_priority="
                  << requested;
      } else {
        LOG(WARNING) << "option max_subprocs_low_priority is not changed: "
                     << "max_subprocs_low_priority should be positive. value="
                     << requested;
      }
    }
  }

  if (opt->has_max_subprocs_heavy_weight()) {
    const auto requested = opt->max_subprocs_heavy_weight();
    if (options_.max_subprocs_heavy_weight != requested) {
      if (requested > 0) {
        options_.max_subprocs_heavy_weight = requested;
        LOG(INFO) << "option changed: max_subprocs_heavy_weight="
                  << requested;
      } else {
        LOG(WARNING) << "option max_subprocs_heavy_weight is not changed: "
                     << "max_subprocs_heavy_weight should be positive. value="
                     << requested;
      }
    }
  }
}
void SubProcessControllerServer::Started(
std::unique_ptr<SubProcessStarted> started) {
LOG(INFO) << "id=" << started->id() << " Started pid=" << started->pid();
SendNotify(SubProcessController::STARTED, *started);
}
// Reports that the subprocess identified by |terminated| is gone: removes its
// entry from subprocs_, sends a TERMINATED notification to the client, and
// then tries to spawn a subprocess.
//
// Note that this erases from subprocs_, so callers iterating over subprocs_
// must not keep using their iterators afterwards.
void SubProcessControllerServer::Terminated(
    std::unique_ptr<SubProcessTerminated> terminated) {
  const auto id = terminated->id();
  // Internal errors are not logged here.
  LOG_IF(INFO, terminated->status() != SubProcessTerminated::kInternalError)
      << "id=" << id << " Terminated"
      << " status=" << terminated->status();

  subprocs_.erase(id);
  SendNotify(SubProcessController::TERMINATED, *terminated);
  TrySpawnSubProcess();
}
// Returns the subprocess registered under |id|, or nullptr if there is none.
// In the latter case an error termination with kFailedToLookup is sent for
// |id| before returning.
SubProcessImpl* SubProcessControllerServer::LookupSubProcess(int id) {
  const auto entry = subprocs_.find(id);
  if (entry != subprocs_.end()) {
    return entry->second.get();
  }

  // There is information gap between server and client.
  // The server can execute a subprocess and send SubProcessTerminated
  // any time.  Once it has sent SubProcessTerminated, the subprocess's id is
  // removed from subprocs_.
  // While SubProcessTerminated is in-flight, the client does not know the id
  // was removed from server's subprocs_, and it may send a request for it.
  // If the client is not broken, REGISTER should come before anything else.
  // We MUST NOT treat an unknown id as an error.
  LOG(INFO) << "id=" << id << " failed to LookupSubProcess "
            << "(maybe already killed?)";
  // In case subprocess_controller_client leaks id,
  // we will send ErrorTerminate.
  ErrorTerminate(id, SubProcessTerminated::kFailedToLookup);
  return nullptr;
}
// Picks at most one pending subprocess from subprocs_ and spawns it, subject
// to the limits in options_.
//
// Selection rules:
//   - A pending subprocess with HIGHEST_PRIORITY is taken immediately.
//   - Otherwise the first pending subprocess in iteration order is taken,
//     except that a HIGH_PRIORITY one replaces a LOW_PRIORITY candidate.
//   - Weight is not considered while selecting.
// Nothing is spawned if the number of running subprocesses reaches
// max_subprocs, if the candidate is heavy weight and
// max_subprocs_heavy_weight is reached, or if the candidate is low priority
// and max_subprocs_low_priority is reached.
//
// A successful spawn is reported via Started(). A failed spawn is reported as
// kFailedToSpawn unless the request is marked detach.
void SubProcessControllerServer::TrySpawnSubProcess() {
  VLOG(1) << "TrySpawnSubProcess";

  int num_running = 0;
  int num_heavy = 0;
  SubProcessImpl* candidate = nullptr;

  // Find next candidate from subprocs_.
  // Within the same priority the oldest one (smallest id number) wins, i.e.
  // a later subproc never runs before an earlier one of equal priority.
  for (const auto& iter : subprocs_) {
    SubProcessImpl* s = iter.second.get();
    VLOG(2) << s->req().id() << " " << s->req().trace_id()
            << " " << SubProcessState::State_Name(s->state());

    const auto state = s->state();
    if (state == SubProcessState::RUN) {
      ++num_running;
      if (num_running >= options_.max_subprocs) {
        VLOG(1) << "Too many subprocesses already running";
        return;
      }
      if (s->req().weight() == SubProcessReq::HEAVY_WEIGHT) {
        ++num_heavy;
      }
      continue;
    }
    if (state != SubProcessState::PENDING) {
      continue;
    }

    const auto priority = s->req().priority();
    if (priority == SubProcessReq::HIGHEST_PRIORITY) {
      // highest priority is used in SubProcessTask::ReadCommandOutput.
      DCHECK_EQ(SubProcessReq::LIGHT_WEIGHT, s->req().weight());
      candidate = s;
      break;
    }

    const bool takes_over =
        candidate == nullptr ||
        (candidate->req().priority() == SubProcessReq::LOW_PRIORITY &&
         priority == SubProcessReq::HIGH_PRIORITY);
    if (takes_over) {
      candidate = s;
    }
  }

  if (candidate == nullptr) {
    VLOG(2) << "no candidate";
    return;
  }
  VLOG(2) << "candiate:" << candidate->req().id()
          << " " << candidate->req().trace_id();

  // Once a candidate is selected, check max_subprocs_heavy_weight
  // and max_subprocs_low_priority.
  const bool is_heavy =
      candidate->req().weight() == SubProcessReq::HEAVY_WEIGHT;
  if (is_heavy && num_heavy >= options_.max_subprocs_heavy_weight) {
    VLOG(1) << "Heavy weight subprocess already running "
            << num_heavy;
    return;
  }
  const bool is_low_priority =
      candidate->req().priority() == SubProcessReq::LOW_PRIORITY;
  if (is_low_priority && num_running >= options_.max_subprocs_low_priority) {
    VLOG(1) << "candidate priority is low";
    return;
  }

  std::unique_ptr<SubProcessStarted> started(candidate->Spawn());
  if (started != nullptr) {
    Started(std::move(started));
    return;
  }
  if (!candidate->req().detach()) {
    ErrorTerminate(candidate->req().id(),
                   SubProcessTerminated::kFailedToSpawn);
  }
}
void SubProcessControllerServer::ErrorTerminate(
int id, SubProcessTerminated_ErrorTerminate reason) {
VLOG(1) << "id=" << id << " ErrorTerminate";
std::unique_ptr<SubProcessTerminated> terminated(new SubProcessTerminated);
terminated->set_id(id);
terminated->set_status(SubProcessTerminated::kInternalError);
terminated->set_error(reason);
Terminated(std::move(terminated));
}
// Sends notification |op| with payload |message| to the client.
// The message is logged at VLOG(2) and handed to AddMessage().
// NOTE(review): AddMessage() is defined elsewhere; presumably it only queues
// the message, which DoWrite() later writes to the socket -- confirm.
void SubProcessControllerServer::SendNotify(
int op, const google::protobuf::Message& message) {
VLOG(2) << "SendNotify op=" << op << " message=" << message.DebugString();
AddMessage(op, message);
}
// Writes pending output to the socket via WriteMessage(). If that fails, the
// error is logged and sock_fd_ is reset to -1, which makes Loop() exit.
void SubProcessControllerServer::DoWrite() {
  VLOG(2) << "DoWrite";
  if (WriteMessage(&sock_fd_)) {
    return;
  }
  LOG(ERROR) << "write error";
  sock_fd_.reset(-1);
}
void SubProcessControllerServer::DoRead() {
VLOG(2) << "DoRead";
int op = 0;
int len = 0;
if (!ReadMessage(&sock_fd_, &op, &len)) {
return;
}
VLOG(2) << "op=" << op << " len=" << len;
switch (op) {
case SubProcessController::CLOSED:
LOG(ERROR) << "read: closed";
sock_fd_.reset(-1);
break;
case SubProcessController::REGISTER: {
std::unique_ptr<SubProcessReq> req(new SubProcessReq);
if (req->ParseFromArray(payload_data(), len)) {
Register(std::move(req));
} else {
LOG(ERROR) << "broken SubProcessReq";
}
}
break;
case SubProcessController::REQUEST_RUN: {
std::unique_ptr<SubProcessRun> run(new SubProcessRun);
if (run->ParseFromArray(payload_data(), len)) {
RequestRun(std::move(run));
} else {
LOG(ERROR) << "broken SubProcessRun";
}
}
break;
case SubProcessController::KILL: {
std::unique_ptr<SubProcessKill> kill(new SubProcessKill);
if (kill->ParseFromArray(payload_data(), len)) {
Kill(std::move(kill));
} else {
LOG(ERROR) << "broken SubProcessKill";
}
}
break;
case SubProcessController::SET_OPTION: {
std::unique_ptr<SubProcessSetOption> option(new SubProcessSetOption);
if (option->ParseFromArray(payload_data(), len)) {
SetOption(std::move(option));
} else {
LOG(ERROR) << "broken SubProcessSetOption";
}
}
break;
case SubProcessController::SHUTDOWN:
LOG(INFO) << "shutdown requested";
shutdowned_ = true;
break;
default:
LOG(FATAL) << "Unknown SubProcessController::Op " << op;
}
ReadDone();
return;
}
#ifndef _WIN32
// Creates the pipe used to forward SIGCHLD information out of signal context
// and installs SigChldAction as the SIGCHLD handler.
//
// The read end is owned by signal_fd_ and polled by Loop(); the write end is
// stored in g_signal_fd for the handler. Failure to create the pipe or to
// install the handler is fatal.
void SubProcessControllerServer::SetupSigchldHandler() {
  int fds[2];
  PCHECK(pipe(fds) == 0);
  signal_fd_.reset(fds[0]);
  g_signal_fd = fds[1];
  // The pipe is private to this process: mark both ends close-on-exec so
  // that neither is inherited by the subprocesses we exec.
  SetFileDescriptorFlag(signal_fd_.fd(), FD_CLOEXEC);
  SetFileDescriptorFlag(g_signal_fd, FD_CLOEXEC);

  struct sigaction sa;
  memset(&sa, 0, sizeof(struct sigaction));
  sa.sa_sigaction = SigChldAction;
  // SA_NOCLDSTOP: no notification for stopped children.
  // SA_SIGINFO:   use the three-argument handler to receive siginfo_t.
  // SA_RESTART:   restart interruptible system calls where possible.
  sa.sa_flags = SA_NOCLDSTOP | SA_SIGINFO | SA_RESTART;
  PCHECK(sigaction(SIGCHLD, &sa, nullptr) == 0);
}
void SubProcessControllerServer::DoSignal() {
VLOG(1) << "DoSignal";
siginfo_t si;
int r = read(signal_fd_.fd(), &si, sizeof(si));
if (r <= 0) {
PLOG(FATAL) << "signal_fd " << r;
}
LOG(INFO) << "signal pid=" << si.si_pid << " status=" << si.si_status;
for (const auto& iter : subprocs_) {
SubProcessImpl* s = iter.second.get();
if (s->started().pid() == si.si_pid) {
s->Signaled(si.si_status);
timeout_millisec_ = kWaitIntervalMilliSec;
return;
}
}
LOG(WARNING) << "no subprocess found for pid:" << si.si_pid;
timeout_millisec_ = kIdleIntervalMilliSec;
}
#endif
void SubProcessControllerServer::DoTimeout() {
VLOG(1) << "DoTimeout";
bool check_terminated = true;
bool in_signaled = false;
while (check_terminated) {
check_terminated = false;
in_signaled = false;
for (const auto& iter : subprocs_) {
SubProcessImpl* s = iter.second.get();
if (s->started().pid() == SubProcessState::kInvalidPid)
continue;
bool need_kill = s->state() == SubProcessState::SIGNALED;
if (need_kill)
in_signaled = true;
std::unique_ptr<SubProcessTerminated> terminated(s->Wait(need_kill));
if (terminated != nullptr) {
Terminated(std::move(terminated));
// subprocs_ was modified, so iter was invalidated.
check_terminated = true;
break;
}
}
}
// If no subprocess is in SIGNALED, we don't need to wait for terminated
// task in kWaitIntervalMilliSec.
if (!in_signaled)
timeout_millisec_ = kIdleIntervalMilliSec;
}
} // namespace devtools_goma