| // Copyright 2012 The Goma Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| |
| #include "worker_thread.h" |
| |
| #include <algorithm> |
| #include <sstream> |
| #include <utility> |
| |
| #include "absl/memory/memory.h" |
| #include "absl/time/clock.h" |
| #include "autolock_timer.h" |
| #include "callback.h" |
| #include "compiler_specific.h" |
| #include "descriptor_poller.h" |
| #include "glog/logging.h" |
| #include "ioutil.h" |
| #include "socket_descriptor.h" |
| #include "worker_thread_manager.h" |
| |
| #ifdef _WIN32 |
| # include "socket_helper_win.h" |
| #endif |
| |
| #define LOG_EVERY_SEC_CONCAT(base, line) base##line |
| #define LOG_EVERY_SEC_VARNAME(base, line) LOG_EVERY_SEC_CONCAT(base, line) |
| |
| #define LOG_EVERY_SEC_NEXT_LOG_TIME \ |
| LOG_EVERY_SEC_VARNAME(last_log_time_, __LINE__) |
| #define LOG_EVERY_SEC_NEXT_LOG_TIME_MU \ |
| LOG_EVERY_SEC_VARNAME(last_log_time_mu_, __LINE__) |
| |
| // Used for LOG at most once per second. |
| // This will be useful to log inside high frequency loop. |
| #define LOG_EVERY_SEC(severity) \ |
| static absl::Time LOG_EVERY_SEC_NEXT_LOG_TIME; \ |
| static devtools_goma::Lock LOG_EVERY_SEC_NEXT_LOG_TIME_MU; \ |
| if (update_log_time(&LOG_EVERY_SEC_NEXT_LOG_TIME, \ |
| &LOG_EVERY_SEC_NEXT_LOG_TIME_MU)) \ |
| LOG(severity) |
| |
| namespace devtools_goma { |
| |
| namespace { |
| |
| // This function returns true at most one per second, |
| // used to decide whether LOG_EVERY_SEC logs or not. |
| bool update_log_time(absl::Time* t, devtools_goma::Lock* mu) { |
| AUTOLOCK(lock, mu); |
| const auto now = absl::Now(); |
| if (*t > now) |
| return false; |
| *t = now + absl::Seconds(1); |
| return true; |
| } |
| |
| } // namespace |
| |
| WorkerThread::ClosureData::ClosureData( |
| const char* const location, |
| Closure* closure, |
| int queuelen, |
| int tick, |
| Timestamp timestamp) |
| : location_(location), |
| closure_(closure), |
| queuelen_(queuelen), |
| tick_(tick), |
| timestamp_(timestamp) { |
| } |
| |
| void WorkerThread::DelayedClosureImpl::Run() { |
| Closure* closure = GetClosure(); |
| if (closure != nullptr) { |
| VLOG(3) << "delayed=" << closure; |
| closure->Run(); |
| } else { |
| VLOG(1) << "closure " << location() << " has been cancelled"; |
| } |
| // Delete delayed_closure after closure runs. |
| delete this; |
| } |
| |
| class WorkerThread::PeriodicClosure { |
| public: |
| PeriodicClosure(PeriodicClosureId id, const char* const location, |
| Timestamp time_now, absl::Duration period, |
| std::unique_ptr<PermanentClosure> closure) |
| : id_(id), |
| location_(location), |
| last_time_(time_now), |
| period_(period), |
| closure_(std::move(closure)) { |
| } |
| |
| PeriodicClosureId id() const { return id_; } |
| const char* location() const { return location_; } |
| |
| PermanentClosure* GetClosure(Timestamp time_now) { |
| CHECK_GE(time_now, last_time_); |
| if (time_now >= last_time_ + period_) { |
| last_time_ = time_now; |
| return closure_.get(); |
| } |
| return nullptr; |
| } |
| |
| PermanentClosure* closure() const { return closure_.get(); } |
| std::unique_ptr<PermanentClosure> ReleaseClosure() { |
| return std::move(closure_); |
| } |
| |
| private: |
| const PeriodicClosureId id_; |
| const char* const location_; |
| Timestamp last_time_; |
| const absl::Duration period_; |
| std::unique_ptr<PermanentClosure> closure_; |
| DISALLOW_COPY_AND_ASSIGN(PeriodicClosure); |
| }; |
| |
| WorkerThread::WorkerThread(int pool, std::string name) |
| : pool_(pool), |
| handle_(kNullThreadHandle), |
| tick_(0), |
| shutting_down_(false), |
| quit_(false), |
| name_(std::move(name)), |
| auto_lock_stat_next_closure_(nullptr), |
| auto_lock_stat_poll_events_(nullptr) { |
| VLOG(2) << "WorkerThread " << name_; |
| int pipe_fd[2]; |
| #ifndef _WIN32 |
| PCHECK(pipe(pipe_fd) == 0); |
| #else |
| CHECK_EQ(async_socketpair(pipe_fd), 0); |
| #endif |
| ScopedSocket pr(pipe_fd[0]); |
| PCHECK(pr.SetCloseOnExec()); |
| PCHECK(pr.SetNonBlocking()); |
| ScopedSocket pw(pipe_fd[1]); |
| PCHECK(pw.SetCloseOnExec()); |
| PCHECK(pw.SetNonBlocking()); |
| id_ = 0; |
| // poller takes ownership of both pipe fds. |
| poller_ = DescriptorPoller::NewDescriptorPoller( |
| absl::make_unique<SocketDescriptor>(std::move(pr), PRIORITY_HIGH, this), |
| std::move(pw)); |
| timer_.Start(); |
| if (g_auto_lock_stats) { |
| // TODO: Split stats per pool. |
| auto_lock_stat_next_closure_ = g_auto_lock_stats->NewStat( |
| "worker_thread::NextClosure"); |
| |
| auto_lock_stat_poll_events_ = g_auto_lock_stats->NewStat( |
| "descriptor_poller::PollEvents"); |
| } |
| for (int priority = PRIORITY_MIN; priority < NUM_PRIORITIES; ++priority) { |
| max_queuelen_[priority] = 0; |
| max_wait_time_[priority] = absl::ZeroDuration(); |
| } |
| } |
| |
| WorkerThread::~WorkerThread() { |
| VLOG(2) << "~WorkerThread " << name_; |
| CHECK_EQ(kNullThreadHandle, handle_); |
| CHECK(!id_); |
| } |
| |
| /* static */ |
| void WorkerThread::Initialize() { |
| absl::call_once(key_worker_once_, |
| &WorkerThread::InitializeWorkerKey); |
| } |
| |
| /* static */ |
| WorkerThread* WorkerThread::GetCurrentWorker() { |
| #ifndef _WIN32 |
| return static_cast<WorkerThread*>(pthread_getspecific(key_worker_)); |
| #else |
| return static_cast<WorkerThread*>(TlsGetValue(key_worker_)); |
| #endif |
| } |
| |
| WorkerThread::Timestamp WorkerThread::NowCached() { |
| if (!now_cached_) |
| now_cached_ = timer_.GetDuration(); |
| return *now_cached_; |
| } |
| |
| void WorkerThread::Shutdown() { |
| VLOG(2) << "Shutdown " << name_; |
| AUTOLOCK(lock, &mu_); |
| shutting_down_ = true; |
| } |
| |
| void WorkerThread::Quit() { |
| VLOG(2) << "Quit " << name_; |
| AUTOLOCK(lock, &mu_); |
| shutting_down_ = true; |
| quit_ = true; |
| poller_->Signal(); |
| } |
| |
| void WorkerThread::ThreadMain() { |
| #ifndef _WIN32 |
| pthread_setspecific(key_worker_, this); |
| #else |
| TlsSetValue(key_worker_, this); |
| #endif |
| { |
| AUTOLOCK(lock, &mu_); |
| id_ = GetCurrentThreadId(); |
| VLOG(1) << "Start thread:" << id_ << " " << name_; |
| cond_id_.Signal(); |
| } |
| while (Dispatch()) { } |
| LOG(INFO) << id_ << " Dispatch loop finished " << name_; |
| { |
| AUTOLOCK(lock, &mu_); |
| for (int priority = PRIORITY_MIN; priority < NUM_PRIORITIES; ++priority) { |
| CHECK(pendings_[priority].empty()); |
| } |
| CHECK(descriptors_.empty()); |
| CHECK(periodic_closures_.empty()); |
| CHECK(quit_); |
| } |
| } |
| |
| bool WorkerThread::Dispatch() { |
| VLOG(2) << "Dispatch " << name_; |
| now_cached_.reset(); |
| if (!NextClosure()) { |
| VLOG(2) << "Dispatch end " << name_; |
| return false; |
| } |
| if (!current_closure_data_) |
| return true; |
| VLOG(2) << "Loop closure=" << current_closure_data_->closure_ << " " << name_; |
| const Timestamp start = timer_.GetDuration(); |
| current_closure_data_->closure_->Run(); |
| LOG_EVERY_SEC(INFO) << "dispatched closure location:" |
| << current_closure_data_->location_; |
| |
| absl::Duration duration = timer_.GetDuration() - start; |
| if (duration > absl::Minutes(1)) { |
| LOG(WARNING) << id_ << " closure run too long: " << duration |
| << " " << current_closure_data_->location_ |
| << " " << current_closure_data_->closure_; |
| } |
| return true; |
| } |
| |
| absl::once_flag WorkerThread::key_worker_once_; |
| |
| #ifndef _WIN32 |
| pthread_key_t WorkerThread::key_worker_; |
| #else |
| DWORD WorkerThread::key_worker_ = TLS_OUT_OF_INDEXES; |
| #endif |
| |
| SocketDescriptor* WorkerThread::RegisterSocketDescriptor(ScopedSocket&& fd, |
| Priority priority) { |
| VLOG(2) << "RegisterSocketDescriptor " << name_; |
| AUTOLOCK(lock, &mu_); |
| DCHECK_LT(priority, PRIORITY_IMMEDIATE); |
| auto d = absl::make_unique<SocketDescriptor>(std::move(fd), priority, this); |
| auto* d_ptr = d.get(); |
| CHECK(descriptors_.emplace(d_ptr->fd(), std::move(d)).second); |
| return d_ptr; |
| } |
| |
| ScopedSocket WorkerThread::DeleteSocketDescriptor( |
| SocketDescriptor* d) { |
| VLOG(2) << "DeleteSocketDescriptor " << name_; |
| AUTOLOCK(lock, &mu_); |
| poller_->UnregisterDescriptor(d); |
| ScopedSocket fd(d->ReleaseFd()); |
| if (fd.valid()) { |
| descriptors_.erase(fd.get()); |
| } |
| return fd; |
| } |
| |
| void WorkerThread::RegisterPeriodicClosure( |
| PeriodicClosureId id, const char* const location, |
| absl::Duration period, std::unique_ptr<PermanentClosure> closure) { |
| VLOG(2) << "RegisterPeriodicClosure " << name_; |
| AUTOLOCK(lock, &mu_); |
| periodic_closures_.emplace_back( |
| new PeriodicClosure(id, location, NowCached(), period, |
| std::move(closure))); |
| } |
| |
| void WorkerThread::UnregisterPeriodicClosure( |
| PeriodicClosureId id, UnregisteredClosureData* data) { |
| VLOG(2) << "UnregisterPeriodicClosure " << name_; |
| DCHECK(data); |
| AUTOLOCK(lock, &mu_); |
| CHECK_NE(id, kInvalidPeriodicClosureId); |
| |
| { |
| std::unique_ptr<PermanentClosure> closure; |
| |
| auto it = std::find_if(periodic_closures_.begin(), periodic_closures_.end(), |
| [id](const std::unique_ptr<PeriodicClosure>& it) { |
| return it->id() == id; |
| }); |
| if (it != periodic_closures_.end()) { |
| closure = (*it)->ReleaseClosure(); |
| // Since location is used when this function |
| // takes long time, this should be set when it's available. |
| data->SetLocation((*it)->location()); |
| periodic_closures_.erase(it); |
| } |
| |
| DCHECK(closure) << "Removing unregistered closure id=" << id; |
| |
| std::deque<ClosureData> pendings; |
| while (!pendings_[PRIORITY_IMMEDIATE].empty()) { |
| ClosureData pending_closure = |
| pendings_[PRIORITY_IMMEDIATE].front(); |
| pendings_[PRIORITY_IMMEDIATE].pop_front(); |
| if (pending_closure.closure_ == closure.get()) |
| continue; |
| pendings.push_back(pending_closure); |
| } |
| pendings_[PRIORITY_IMMEDIATE].swap(pendings); |
| } |
| |
| // Notify that |closure| is removed from the queues. |
| // SetDone(true) after |closure| has been deleted. |
| data->SetDone(true); |
| } |
| |
| void WorkerThread::RunClosure(const char* const location, Closure* closure, |
| Priority priority) { |
| VLOG(2) << "RunClosure " << name_; |
| DCHECK_GE(priority, PRIORITY_MIN); |
| DCHECK_LT(priority, NUM_PRIORITIES); |
| { |
| AUTOLOCK(lock, &mu_); |
| AddClosure(location, priority, closure); |
| // If this is the same thread, or this worker is running some closure |
| // (or in other words, this worker is not in select wait), |
| // next Dispatch could pick a closure from pendings_, so we don't need |
| // to signal via pipe. |
| if (THREAD_ID_IS_SELF(id_) || current_closure_data_) |
| return; |
| } |
| // send select loop something to read about, so new pendings will be |
| // processed soon. |
| poller_->Signal(); |
| } |
| |
| WorkerThread::CancelableClosure* WorkerThread::RunDelayedClosure( |
| const char* const location, |
| absl::Duration delay, Closure* closure) { |
| VLOG(2) << "RunDelayedClosure " << name_; |
| AUTOLOCK(lock, &mu_); |
| DelayedClosureImpl* delayed_closure = |
| new DelayedClosureImpl(location, NowCached() + delay, closure); |
| delayed_pendings_.push(delayed_closure); |
| return delayed_closure; |
| } |
| |
| size_t WorkerThread::load() const { |
| AUTOLOCK(lock, &mu_); |
| size_t n = 0; |
| if (current_closure_data_) { |
| n += 1; |
| } |
| n += descriptors_.size(); |
| for (int priority = PRIORITY_MIN; priority < NUM_PRIORITIES; ++priority) { |
| int w = 1 << priority; |
| n += pendings_[priority].size() * w; |
| } |
| return n; |
| } |
| |
| size_t WorkerThread::pendings() const { |
| AUTOLOCK(lock, &mu_); |
| size_t n = 0; |
| for (int priority = PRIORITY_MIN; priority < NUM_PRIORITIES; ++priority) { |
| n += pendings_[priority].size(); |
| } |
| return n; |
| } |
| |
| bool WorkerThread::IsIdle() const { |
| AUTOLOCK(lock, &mu_); |
| return !current_closure_data_ && descriptors_.size() == 0; |
| } |
| |
| std::string WorkerThread::DebugString() const { |
| AUTOLOCK(lock, &mu_); |
| std::ostringstream s; |
| s << "thread[" << id_ << "/" << name_ << "] "; |
| s << " tick=" << tick_; |
| if (current_closure_data_) { |
| s << " " << current_closure_data_->location_; |
| s << " " << current_closure_data_->closure_; |
| } |
| s << ": " << descriptors_.size() << " descriptors"; |
| s << ": poll_interval=" << poll_interval_; |
| s << ": "; |
| for (int priority = PRIORITY_MIN; priority < NUM_PRIORITIES; ++priority) { |
| s << Priority_Name(static_cast<Priority>(priority)) |
| << "[" << pendings_[priority].size() << " pendings " |
| << " q=" << max_queuelen_[priority] |
| << " w=" << max_wait_time_[priority] |
| << "] "; |
| } |
| s << ": delayed=" << delayed_pendings_.size(); |
| s << ": periodic=" << periodic_closures_.size(); |
| if (pool_ != 0) |
| s << ": pool=" << pool_; |
| return s.str(); |
| } |
| |
| /* static */ |
| std::string WorkerThread::Priority_Name(Priority priority) { |
| switch (priority) { |
| case PRIORITY_LOW: return "PriLow"; |
| case PRIORITY_MED: return "PriMed"; |
| case PRIORITY_HIGH: return "PriHigh"; |
| case PRIORITY_IMMEDIATE: return "PriImmediate"; |
| default: |
| break; |
| } |
| std::ostringstream ss; |
| ss << "PriUnknown[" << priority << "]"; |
| return ss.str(); |
| } |
| |
| bool WorkerThread::NextClosure() { |
| AUTOLOCK_WITH_STAT(lock, &mu_, auto_lock_stat_next_closure_); |
| VLOG(5) << "NextClosure " << name_; |
| DCHECK(!now_cached_); // NowCached() will get new time |
| ++tick_; |
| current_closure_data_.reset(); |
| |
| // Default descriptor polling timeout. |
| // If there are pending closures, it will check descriptors without timeout. |
| // If there are deplayed closures, it will reduce intervals to the nearest |
| // delayed closure. |
| constexpr absl::Duration kPollInterval = absl::Milliseconds(500); |
| |
| poll_interval_ = kPollInterval; |
| |
| int priority = PRIORITY_IMMEDIATE; |
| for (priority = PRIORITY_IMMEDIATE; priority >= PRIORITY_MIN; --priority) { |
| if (!pendings_[priority].empty()) { |
| // PRIORITY_IMMEDIATE has higher priority than descriptors. |
| if (priority == PRIORITY_IMMEDIATE) { |
| current_closure_data_ = GetClosure(static_cast<Priority>(priority)); |
| return true; |
| } |
| // For lower priorities, descriptor availability is checked before |
| // running the closures. |
| poll_interval_ = absl::ZeroDuration(); |
| break; |
| } |
| } |
| |
| if (poll_interval_ > absl::ZeroDuration() && !delayed_pendings_.empty()) { |
| // Adjust poll_interval for delayed closure. |
| absl::Duration next_delay = delayed_pendings_.top()->time() - NowCached(); |
| if (next_delay < absl::ZeroDuration()) |
| next_delay = absl::ZeroDuration(); |
| poll_interval_ = std::min(poll_interval_, next_delay); |
| } |
| DescriptorPoller::CallbackQueue io_pendings; |
| VLOG(2) << "poll_interval=" << poll_interval_; |
| CHECK_GE(poll_interval_, absl::ZeroDuration()); |
| |
| const Timestamp poll_start_time = timer_.GetDuration(); |
| poller_->PollEvents(descriptors_, poll_interval_, priority, &io_pendings, |
| &mu_, &auto_lock_stat_poll_events_); |
| // Updated cached time value. |
| now_cached_ = timer_.GetDuration(); |
| // on Windows, poll time would be 0.51481 or so when no event happened. |
| // multiply 1.1 (i.e. 0.55) would be good. |
| if (NowCached() - poll_start_time > 1.1 * kPollInterval) { |
| LOG(WARNING) << id_ << " poll too slow:" |
| << (NowCached() - poll_start_time) << " nsec" |
| << " interval=" << poll_interval_ << " msec" |
| << " #descriptors=" << descriptors_.size() |
| << " priority=" << priority; |
| if (NowCached() - poll_start_time > absl::Seconds(1)) { |
| for (const auto& desc : descriptors_) { |
| LOG(WARNING) << id_ << " list of sockets on slow poll:" |
| << " fd=" << desc.first << " sd=" << desc.second.get() |
| << " sd.fd=" << desc.second->fd() |
| << " readable=" << desc.second->IsReadable() |
| << " closed=" << desc.second->IsClosed() |
| << " canreuse=" << desc.second->CanReuse() |
| << " err=" << desc.second->GetLastErrorMessage(); |
| } |
| } |
| } |
| |
| // Check delayed closures. |
| while (!delayed_pendings_.empty() && |
| (delayed_pendings_.top()->time() < NowCached() || shutting_down_)) { |
| DelayedClosureImpl* delayed_closure = delayed_pendings_.top(); |
| LOG_EVERY_SEC(INFO) << "delayed_closure location:" |
| << delayed_closure->location() |
| << " time:" << delayed_closure->time(); |
| delayed_pendings_.pop(); |
| AddClosure(delayed_closure->location(), PRIORITY_IMMEDIATE, |
| NewCallback(delayed_closure, &DelayedClosureImpl::Run)); |
| } |
| |
| // Check periodic closures. |
| for (const auto& periodic_closure : periodic_closures_) { |
| PermanentClosure* closure = periodic_closure->GetClosure(NowCached()); |
| if (closure != nullptr) { |
| VLOG(3) << "periodic=" << closure; |
| LOG_EVERY_SEC(INFO) << "periodic_closure location:" |
| << periodic_closure->location(); |
| AddClosure(periodic_closure->location(), |
| PRIORITY_IMMEDIATE, closure); |
| } |
| } |
| |
| // Check descriptors I/O. |
| for (auto& iter : io_pendings) { |
| Priority io_priority = iter.first; |
| std::deque<OneshotClosure*>& pendings = iter.second; |
| while (!pendings.empty()) { |
| LOG_EVERY_SEC(INFO) << "io closure: " << pendings.front(); |
| // TODO: use original location |
| AddClosure(FROM_HERE, io_priority, pendings.front()); |
| pendings.pop_front(); |
| } |
| } |
| |
| // Check pendings again. |
| for (priority = PRIORITY_IMMEDIATE; priority >= PRIORITY_MIN; --priority) { |
| if (!pendings_[priority].empty()) { |
| auto priority_typed = static_cast<Priority>(priority); |
| VLOG(2) << "pendings " << Priority_Name(priority_typed); |
| current_closure_data_ = GetClosure(priority_typed); |
| |
| if (quit_) { |
| // If worker thread is quiting, wake up thread soon. |
| poller_->Signal(); |
| } |
| return true; |
| } |
| } |
| |
| // No pendings. |
| DCHECK_LT(priority, PRIORITY_MIN); |
| if (quit_) { |
| VLOG(3) << "NextClosure: terminating"; |
| if (delayed_pendings_.empty() && |
| periodic_closures_.empty() && |
| descriptors_.empty()) { |
| pool_ = WorkerThreadManager::kDeadPool; |
| return false; |
| } |
| LOG(INFO) << "NextClosure: terminating but still active " |
| << " delayed_pendings=" << delayed_pendings_.size() |
| << " periodic_closures=" << periodic_closures_.size() |
| << " descriptors=" << descriptors_.empty(); |
| } |
| VLOG(4) << "NextClosure: no closure to run"; |
| return true; |
| } |
| |
| void WorkerThread::AddClosure(const char* const location, Priority priority, |
| Closure* closure) { |
| VLOG(2) << "AddClosure " << name_; |
| // mu_ held. |
| ClosureData closure_data(location, closure, pendings_[priority].size(), tick_, |
| timer_.GetDuration()); |
| if (closure_data.queuelen_ > max_queuelen_[priority]) { |
| max_queuelen_[priority] = closure_data.queuelen_; |
| } |
| pendings_[priority].push_back(closure_data); |
| } |
| |
| WorkerThread::ClosureData WorkerThread::GetClosure(Priority priority) { |
| // mu_ held. |
| CHECK(!pendings_[priority].empty()); |
| ClosureData closure_data = pendings_[priority].front(); |
| pendings_[priority].pop_front(); |
| absl::Duration wait_time = timer_.GetDuration() - closure_data.timestamp_; |
| if (wait_time > max_wait_time_[priority]) { |
| max_wait_time_[priority] = wait_time; |
| } |
| if (wait_time > absl::Minutes(1)) { |
| LOG(WARNING) << id_ << " too long in pending queue " |
| << Priority_Name(priority) |
| << " " << wait_time |
| << " queuelen=" << closure_data.queuelen_ |
| << " tick=" << (tick_ - closure_data.tick_); |
| } |
| return closure_data; |
| } |
| |
| void WorkerThread::InitializeWorkerKey() { |
| #ifndef _WIN32 |
| pthread_key_create(&key_worker_, nullptr); |
| #else |
| key_worker_ = TlsAlloc(); |
| #endif |
| } |
| |
| void WorkerThread::RegisterPollEvent(SocketDescriptor* d, |
| DescriptorEventType type) { |
| VLOG(2) << "RegisterPollEvent " << name_; |
| AUTOLOCK(lock, &mu_); |
| poller_->RegisterPollEvent(d, type); |
| } |
| |
| void WorkerThread::UnregisterPollEvent(SocketDescriptor* d, |
| DescriptorEventType type) { |
| VLOG(2) << "UnregisterPollEvent " << name_; |
| AUTOLOCK(lock, &mu_); |
| poller_->UnregisterPollEvent(d, type); |
| } |
| |
| void WorkerThread::RegisterTimeoutEvent(SocketDescriptor* d) { |
| VLOG(2) << "RegisterTimeoutEvent " << name_; |
| AUTOLOCK(lock, &mu_); |
| poller_->RegisterTimeoutEvent(d); |
| } |
| |
| void WorkerThread::UnregisterTimeoutEvent(SocketDescriptor* d) { |
| VLOG(2) << "UnregisterTimeoutEvent " << name_; |
| AUTOLOCK(lock, &mu_); |
| poller_->UnregisterTimeoutEvent(d); |
| } |
| |
| void WorkerThread::Start() { |
| VLOG(2) << "Start " << name_; |
| CHECK(PlatformThread::Create(this, &handle_)); |
| AUTOLOCK(lock, &mu_); |
| CHECK_NE(handle_, kNullThreadHandle); |
| while (id_ == 0) |
| cond_id_.Wait(&mu_); |
| } |
| |
| void WorkerThread::Join() { |
| VLOG(2) << "Join " << name_; |
| if (handle_ != kNullThreadHandle) { |
| LOG(INFO) << "Join thread:" << DebugString(); |
| { |
| AUTOLOCK(lock, &mu_); |
| CHECK(quit_); |
| } |
| FlushLogFiles(); |
| PlatformThread::Join(handle_); |
| } |
| handle_ = kNullThreadHandle; |
| id_ = 0; |
| } |
| |
| } // namespace devtools_goma |