blob: 22f10401efbf191f4e9e14002aaf0a674b92695c [file] [log] [blame]
// Copyright 2010 The Goma Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "worker_thread_manager.h"
#ifndef _WIN32
#include <limits.h>
#endif // _WIN32
#include <queue>
#include <sstream>
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include "autolock_timer.h"
#include "callback.h"
#include "compiler_specific.h"
#include "descriptor_poller.h"
#include "glog/logging.h"
#include "simple_timer.h"
#include "socket_descriptor.h"
#include "worker_thread.h"
#ifdef _WIN32
# include "socket_helper_win.h"
#endif
namespace devtools_goma {
/* static */
const int WorkerThreadManager::kDeadPool = -2;
const int WorkerThreadManager::kAlarmPool = -1;
const int WorkerThreadManager::kFreePool = 0;
// Once we register atfork handler, we can't unregister it.
// However, we'd like to fork at SetUp in each unit test of
// subprocess_task_unittest.
// g_initialize_atfork is used to call pthread_atfork once.
// g_enable_fork will be true when WorkerThreadManager is not alive.
#ifndef _WIN32
static bool g_initialize_atfork = false;
#endif
static bool g_enable_fork = false;
WorkerThreadManager::CancelableClosure::CancelableClosure(
const char* const location, Closure* closure)
: closure_(closure),
location_(location) {
}
WorkerThreadManager::CancelableClosure::~CancelableClosure() {
CHECK(closure_ == nullptr);
}
void WorkerThreadManager::CancelableClosure::Cancel() {
delete closure_;
closure_ = nullptr;
}
const char* WorkerThreadManager::CancelableClosure::location() const {
return location_;
}
#ifndef _WIN32
static void DontCallForkInWorkerThreadManager() {
if (!g_enable_fork)
DLOG(FATAL) << "fork called";
}
#endif
WorkerThreadManager::WorkerThreadManager()
: next_worker_index_(0),
next_pool_(kFreePool + 1),
alarm_worker_(nullptr),
next_periodic_closure_id_(1) {
WorkerThread::Initialize();
#ifndef _WIN32
g_enable_fork = false;
if (!g_initialize_atfork) {
pthread_atfork(&DontCallForkInWorkerThreadManager, nullptr, nullptr);
g_initialize_atfork = true;
}
#endif
}
WorkerThreadManager::~WorkerThreadManager() {
CHECK(alarm_worker_ == nullptr);
for (const auto* worker : workers_) {
CHECK(worker == nullptr);
}
g_enable_fork = true;
}
void WorkerThreadManager::Start(int num_threads) {
AUTO_EXCLUSIVE_LOCK(lock, &mu_);
CHECK(workers_.empty());
CHECK(GetCurrentWorker() == nullptr);
alarm_worker_ = new WorkerThread(this, kAlarmPool, "alarm_worker");
alarm_worker_->Start();
next_worker_index_ = 0;
for (int i = 0; i < num_threads; ++i) {
WorkerThread* worker = new WorkerThread(this, kFreePool, "worker");
worker->Start();
workers_.push_back(worker);
}
}
int WorkerThreadManager::StartPool(int num_threads, const std::string& name) {
AUTO_EXCLUSIVE_LOCK(lock, &mu_);
CHECK(GetCurrentWorker() == nullptr);
int pool = next_pool_++;
for (int i = 0; i < num_threads; ++i) {
WorkerThread* worker = new WorkerThread(this, pool, name);
worker->Start();
workers_.push_back(worker);
}
return pool;
}
void WorkerThreadManager::NewThread(OneshotClosure* callback,
const std::string& name) {
AUTO_EXCLUSIVE_LOCK(lock, &mu_);
int pool = next_pool_++;
WorkerThread* worker = new WorkerThread(this, pool, name);
worker->Start();
workers_.push_back(worker);
worker->RunClosure(FROM_HERE, callback, PRIORITY_IMMEDIATE);
}
size_t WorkerThreadManager::num_threads() const {
AUTO_SHARED_LOCK(lock, &mu_);
return workers_.size();
}
void WorkerThreadManager::Shutdown() {
LOG(INFO) << "Shutdown";
AUTO_SHARED_LOCK(lock, &mu_);
CHECK(GetCurrentWorker() == nullptr);
if (alarm_worker_ != nullptr)
alarm_worker_->Shutdown();
for (auto* worker : workers_) {
if (worker)
worker->Shutdown();
}
}
void WorkerThreadManager::Finish() {
LOG(INFO) << "Finish";
AUTO_EXCLUSIVE_LOCK(lock, &mu_);
CHECK(GetCurrentWorker() == nullptr);
if (alarm_worker_ != nullptr)
alarm_worker_->Quit();
for (auto* worker : workers_) {
if (worker)
worker->Quit();
}
// join threads
if (alarm_worker_) {
alarm_worker_->Join();
delete alarm_worker_;
alarm_worker_ = nullptr;
}
for (std::vector<WorkerThread*>::iterator iter = workers_.begin();
iter != workers_.end();
++iter) {
WorkerThread* worker = *iter;
if (worker) {
worker->Join();
delete worker;
*iter = nullptr;
}
}
}
WorkerThreadManager::ThreadId WorkerThreadManager::GetCurrentThreadId() {
return devtools_goma::GetCurrentThreadId();
}
bool WorkerThreadManager::Dispatch() {
WorkerThread* worker = GetCurrentWorker();
DCHECK(worker) << "thread " << GetCurrentThreadId();
return worker->Dispatch();
}
SocketDescriptor* WorkerThreadManager::RegisterSocketDescriptor(
ScopedSocket&& fd, WorkerThreadManager::Priority priority) {
WorkerThread* worker = GetCurrentWorker();
DCHECK(worker) << "thread " << GetCurrentThreadId();
return worker->RegisterSocketDescriptor(std::move(fd), priority);
}
ScopedSocket WorkerThreadManager::DeleteSocketDescriptor(
SocketDescriptor* d) {
WorkerThread* worker = GetCurrentWorker();
DCHECK(worker) << "thead " << GetCurrentThreadId();
return worker->DeleteSocketDescriptor(d);
}
PeriodicClosureId WorkerThreadManager::NextPeriodicClosureId() {
AUTOLOCK(lock, &periodic_closure_id_mu_);
return next_periodic_closure_id_++;
}
PeriodicClosureId WorkerThreadManager::RegisterPeriodicClosure(
const char* const location,
int ms, std::unique_ptr<PermanentClosure> closure) {
DCHECK(alarm_worker_);
PeriodicClosureId id = NextPeriodicClosureId();
alarm_worker_->RunClosure(
FROM_HERE,
NewCallback(
&WorkerThreadManager::RegisterPeriodicClosureOnAlarmer,
alarm_worker_, id, location, ms, std::move(closure)),
PRIORITY_IMMEDIATE);
return id;
}
/* static */
void WorkerThreadManager::RegisterPeriodicClosureOnAlarmer(
WorkerThread* alarmer, PeriodicClosureId id, const char* location,
int ms, std::unique_ptr<PermanentClosure> closure) {
alarmer->RegisterPeriodicClosure(id, location, ms, std::move(closure));
}
void WorkerThreadManager::UnregisterPeriodicClosure(PeriodicClosureId id) {
CHECK(GetCurrentWorker() != alarm_worker_);
DCHECK(alarm_worker_);
UnregisteredClosureData unregistered_data;
alarm_worker_->RunClosure(
FROM_HERE,
NewCallback(
alarm_worker_,
&WorkerThreadManager::WorkerThread::UnregisterPeriodicClosure,
id, &unregistered_data),
PRIORITY_IMMEDIATE);
SimpleTimer timer;
timer.Start();
// Make sure periodic closure was destructed before returning from
// this method.
while (!unregistered_data.Done()) {
const char* location = unregistered_data.Location();
LOG_EVERY_N(INFO, 100)
<< "UnregisterPeriodicClosure id=" << id
<< " location="
<< (location ? location : "")
<< " timer=" << timer.GetInMilliseconds() << " [ms]";
CHECK_LT(timer.GetInMilliseconds(), 60 * 1000)
<< "UnregisterPeriodicClosure didn't finish in 60 seconds";
absl::SleepFor(absl::Milliseconds(10));
}
}
void WorkerThreadManager::RunClosure(
const char* const location,
Closure* closure, Priority priority) {
RunClosureInPool(location, kFreePool, closure, priority);
}
void WorkerThreadManager::RunClosureInPool(
const char* const location,
int pool, Closure* closure, Priority priority) {
// Note: having global pendings queue make slower than this implementation?
WorkerThread* candidate_worker = nullptr;
{
AUTO_EXCLUSIVE_LOCK(lock, &mu_); // updates |next_worker_index_|.
size_t min_load = INT_MAX;
size_t i;
for (i = next_worker_index_;
i < next_worker_index_ + workers_.size();
++i) {
WorkerThread* worker = workers_[i % workers_.size()];
if (!worker) continue;
if (worker->pool() != pool) continue;
if (worker == GetCurrentWorker() && worker->pendings() == 0) {
candidate_worker = worker;
break;
}
size_t load = worker->load();
if (load == 0) {
candidate_worker = worker;
break;
}
if (load < min_load) {
min_load = load;
candidate_worker = worker;
}
}
CHECK(candidate_worker);
next_worker_index_ = (i + 1) % workers_.size();
}
return candidate_worker->RunClosure(location, closure, priority);
}
void WorkerThreadManager::RunClosureInThread(
const char* const location,
ThreadId id,
Closure* closure, Priority priority) {
WorkerThread* worker = GetWorker(id);
DCHECK(worker);
worker->RunClosure(location, closure, priority);
}
WorkerThreadManager::CancelableClosure*
WorkerThreadManager::RunDelayedClosureInThread(
const char* const location,
ThreadId id, int msec,
Closure* closure) {
WorkerThread* worker = GetWorker(id);
DCHECK(worker);
return worker->RunDelayedClosure(location, msec, closure);
}
string WorkerThreadManager::DebugString() const {
AUTO_SHARED_LOCK(lock, &mu_);
std::ostringstream s;
s << workers_.size() << " workers\n";
for (const auto& worker : workers_) {
if (!worker) continue;
s << worker->DebugString();
s << "\n";
}
s << "\n";
return s.str();
}
void WorkerThreadManager::DebugLog() const {
AUTO_SHARED_LOCK(lock, &mu_);
int num_idles = 0;
for (const auto& worker : workers_) {
if (!worker) continue;
if (worker->IsIdle()) {
num_idles++;
continue;
}
LOG(INFO) << worker->DebugString();
}
LOG(INFO) << "idle workers:" << num_idles;
}
/* static */
string WorkerThreadManager::Priority_Name(int priority) {
switch (priority) {
case PRIORITY_LOW: return "PriLow";
case PRIORITY_MED: return "PriMed";
case PRIORITY_HIGH: return "PriHigh";
case PRIORITY_IMMEDIATE: return "PriImmediate";
default:
break;
}
std::ostringstream ss;
ss << "PriUnknown[" << priority << "]";
return ss.str();
}
WorkerThreadManager::WorkerThread* WorkerThreadManager::GetWorker(ThreadId id) {
WorkerThread* worker = nullptr;
{
AUTO_SHARED_LOCK(lock, &mu_);
worker = GetWorkerUnlocked(id);
}
if (worker != nullptr)
return worker;
LOG(FATAL) << "No worker for id=" << id
<< " current=" << GetCurrentThreadId() << " " << DebugString();
return nullptr;
}
WorkerThreadManager::WorkerThread* WorkerThreadManager::GetWorkerUnlocked(
ThreadId id) {
for (auto* worker : workers_) {
if (worker && id == worker->id()) {
return worker;
}
}
return nullptr;
}
WorkerThreadManager::WorkerThread* WorkerThreadManager::GetCurrentWorker() {
return WorkerThread::GetCurrentWorker();
}
WorkerThreadRunner::WorkerThreadRunner(
WorkerThreadManager* wm,
const char* const location, OneshotClosure* closure)
: done_(false) {
LOG(INFO) << "run closure=" << closure
<< " from " << location;
wm->RunClosure(location,
NewCallback(
this,
&WorkerThreadRunner::Run,
closure),
WorkerThreadManager::PRIORITY_MED);
}
WorkerThreadRunner::~WorkerThreadRunner() {
Wait();
}
void WorkerThreadRunner::Wait() {
AUTOLOCK(lock, &mu_);
while (!done_) {
cond_.Wait(&mu_);
}
}
bool WorkerThreadRunner::Done() const {
AUTOLOCK(lock, &mu_);
return done_;
}
void WorkerThreadRunner::Run(OneshotClosure* closure) {
closure->Run();
LOG(INFO) << "done closure=" << closure;
AUTOLOCK(lock, &mu_);
done_ = true;
cond_.Signal();
}
} // namespace devtools_goma