blob: d54deba155fc6c1bbe0f9d68570b976799bf8285 [file]
// Copyright 2024 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "base/task/thread_pool/thread_group_semaphore.h"
#include <algorithm>
#include <string_view>
#include "base/metrics/histogram_macros.h"
#include "base/sequence_token.h"
#include "base/strings/stringprintf.h"
#include "base/task/common/checked_lock.h"
#include "base/task/thread_pool/thread_group.h"
#include "base/task/thread_pool/worker_thread_semaphore.h"
#include "base/threading/scoped_blocking_call.h"
#include "base/threading/scoped_blocking_call_internal.h"
#include "base/threading/thread_checker.h"
#include "base/time/time_override.h"
#include "base/trace_event/base_tracing.h"
namespace base {
namespace internal {
namespace {
constexpr size_t kMaxNumberOfWorkers = 256;
} // namespace
// Upon destruction, executes actions that control the number of active workers.
// Useful to satisfy locking requirements of these actions.
class ThreadGroupSemaphore::SemaphoreScopedCommandsExecutor
: public ThreadGroup::BaseScopedCommandsExecutor {
public:
explicit SemaphoreScopedCommandsExecutor(ThreadGroupSemaphore* outer)
: BaseScopedCommandsExecutor(outer) {}
SemaphoreScopedCommandsExecutor(const SemaphoreScopedCommandsExecutor&) =
delete;
SemaphoreScopedCommandsExecutor& operator=(
const SemaphoreScopedCommandsExecutor&) = delete;
~SemaphoreScopedCommandsExecutor() override {
CheckedLock::AssertNoLockHeldOnCurrentThread();
for (int i = 0; i < semaphore_signal_count_; ++i) {
TRACE_EVENT_INSTANT("wakeup.flow", "WorkerThreadSemaphore::Signal",
perfetto::Flow::FromPointer(&outer()->semaphore_));
outer()->semaphore_.Signal();
}
}
void ScheduleSignal() EXCLUSIVE_LOCKS_REQUIRED(outer()->lock_) {
++semaphore_signal_count_;
++outer()->num_active_signals_;
}
private:
friend class ThreadGroupSemaphore;
ThreadGroupSemaphore* outer() {
return static_cast<ThreadGroupSemaphore*>(outer_);
}
int semaphore_signal_count_ = 0;
};
class ThreadGroupSemaphore::SemaphoreWorkerDelegate
: public ThreadGroup::ThreadGroupWorkerDelegate,
public WorkerThreadSemaphore::Delegate {
public:
// `outer` owns the worker for which this delegate is
// constructed. `join_called_for_testing` is shared amongst workers, and
// owned by `outer`.
SemaphoreWorkerDelegate(TrackedRef<ThreadGroup> outer,
bool is_excess,
AtomicFlag* join_called_for_testing);
SemaphoreWorkerDelegate(const SemaphoreWorkerDelegate&) = delete;
SemaphoreWorkerDelegate& operator=(const SemaphoreWorkerDelegate&) = delete;
// OnMainExit() handles the thread-affine cleanup;
// SemaphoreWorkerDelegate can thereafter safely be deleted from any thread.
~SemaphoreWorkerDelegate() override = default;
// WorkerThread::Delegate:
void OnMainEntry(WorkerThread* worker) override;
void OnMainExit(WorkerThread* worker) override;
RegisteredTaskSource GetWork(WorkerThread* worker) override;
RegisteredTaskSource SwapProcessedTask(RegisteredTaskSource task_source,
WorkerThread* worker) override;
void RecordUnnecessaryWakeup() override;
TimeDelta GetSleepTimeout() override;
private:
const ThreadGroupSemaphore* outer() const {
return static_cast<ThreadGroupSemaphore*>(outer_.get());
}
ThreadGroupSemaphore* outer() {
return static_cast<ThreadGroupSemaphore*>(outer_.get());
}
// ThreadGroup::ThreadGroupWorkerDelegate:
bool CanGetWorkLockRequired(BaseScopedCommandsExecutor* executor,
WorkerThread* worker)
EXCLUSIVE_LOCKS_REQUIRED(outer()->lock_) override;
void CleanupLockRequired(BaseScopedCommandsExecutor* executor,
WorkerThread* worker)
EXCLUSIVE_LOCKS_REQUIRED(outer()->lock_) override;
void OnWorkerBecomesIdleLockRequired(BaseScopedCommandsExecutor* executor,
WorkerThread* worker)
EXCLUSIVE_LOCKS_REQUIRED(outer()->lock_) override;
// Returns true if `worker` is allowed to cleanup and remove itself from the
// thread group. Called from GetWork() when no work is available.
bool CanCleanupLockRequired(const WorkerThread* worker)
EXCLUSIVE_LOCKS_REQUIRED(outer()->lock_) override;
};
std::unique_ptr<ThreadGroup::BaseScopedCommandsExecutor>
ThreadGroupSemaphore::GetExecutor() {
return std::make_unique<SemaphoreScopedCommandsExecutor>(this);
}
ThreadGroupSemaphore::ThreadGroupSemaphore(std::string_view histogram_label,
std::string_view thread_group_label,
ThreadType thread_type_hint,
TrackedRef<TaskTracker> task_tracker,
TrackedRef<Delegate> delegate)
: ThreadGroup(histogram_label,
thread_group_label,
thread_type_hint,
std::move(task_tracker),
std::move(delegate)),
tracked_ref_factory_(this) {
DCHECK(!thread_group_label_.empty());
}
void ThreadGroupSemaphore::Start(
size_t max_tasks,
size_t max_best_effort_tasks,
TimeDelta suggested_reclaim_time,
scoped_refptr<SingleThreadTaskRunner> service_thread_task_runner,
WorkerThreadObserver* worker_thread_observer,
WorkerEnvironment worker_environment,
bool synchronous_thread_start_for_testing,
std::optional<TimeDelta> may_block_threshold) {
ThreadGroup::StartImpl(
max_tasks, max_best_effort_tasks, suggested_reclaim_time,
service_thread_task_runner, worker_thread_observer, worker_environment,
synchronous_thread_start_for_testing, may_block_threshold);
SemaphoreScopedCommandsExecutor executor(this);
CheckedAutoLock auto_lock(lock_);
DCHECK(workers_.empty());
EnsureEnoughWorkersLockRequired(&executor);
}
ThreadGroupSemaphore::~ThreadGroupSemaphore() {
// ThreadGroup should only ever be deleted:
// 1) In tests, after JoinForTesting().
// 2) In production, iff initialization failed.
// In both cases `workers_` should be empty.
DCHECK(workers_.empty());
}
void ThreadGroupSemaphore::UpdateSortKey(TaskSource::Transaction transaction) {
SemaphoreScopedCommandsExecutor executor(this);
UpdateSortKeyImpl(&executor, std::move(transaction));
}
void ThreadGroupSemaphore::PushTaskSourceAndWakeUpWorkers(
RegisteredTaskSourceAndTransaction transaction_with_task_source) {
SemaphoreScopedCommandsExecutor executor(this);
PushTaskSourceAndWakeUpWorkersImpl(&executor,
std::move(transaction_with_task_source));
}
size_t ThreadGroupSemaphore::NumberOfIdleWorkersLockRequiredForTesting() const {
return ClampSub(workers_.size(), num_active_signals_);
}
ThreadGroupSemaphore::SemaphoreWorkerDelegate::SemaphoreWorkerDelegate(
TrackedRef<ThreadGroup> outer,
bool is_excess,
AtomicFlag* join_called_for_testing)
: ThreadGroupWorkerDelegate(std::move(outer), is_excess),
WorkerThreadSemaphore::Delegate(
&static_cast<ThreadGroupSemaphore*>(outer.get())->semaphore_,
join_called_for_testing) {}
void ThreadGroupSemaphore::SemaphoreWorkerDelegate::OnMainEntry(
WorkerThread* worker) {
OnMainEntryImpl(worker);
}
void ThreadGroupSemaphore::SemaphoreWorkerDelegate::OnMainExit(
WorkerThread* worker_base) {
DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
#if DCHECK_IS_ON()
WorkerThreadSemaphore* worker =
static_cast<WorkerThreadSemaphore*>(worker_base);
{
bool shutdown_complete = outer()->task_tracker_->IsShutdownComplete();
CheckedAutoLock auto_lock(outer()->lock_);
// `worker` should already have been removed from `workers_` by the time the
// thread is about to exit. (except in the cases where the thread group is
// no longer going to be used - in which case, it's fine for there to be
// invalid workers in the thread group).
if (!shutdown_complete && !outer()->join_called_for_testing_.IsSet()) {
DCHECK(!ContainsWorker(outer()->workers_, worker));
}
}
#endif
#if BUILDFLAG(IS_WIN)
worker_only().win_thread_environment.reset();
#endif // BUILDFLAG(IS_WIN)
// Count cleaned up workers for tests. It's important to do this here
// instead of at the end of CleanupLockRequired() because some side-effects
// of cleaning up happen outside the lock (e.g. recording histograms) and
// resuming from tests must happen-after that point or checks on the main
// thread will be flaky (crbug.com/1047733).
CheckedAutoLock auto_lock(outer()->lock_);
++outer()->num_workers_cleaned_up_for_testing_;
#if DCHECK_IS_ON()
outer()->some_workers_cleaned_up_for_testing_ = true;
#endif
if (outer()->num_workers_cleaned_up_for_testing_cv_) {
outer()->num_workers_cleaned_up_for_testing_cv_->Signal();
}
}
bool ThreadGroupSemaphore::SemaphoreWorkerDelegate::CanGetWorkLockRequired(
BaseScopedCommandsExecutor* executor,
WorkerThread* worker_base) {
DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
WorkerThreadSemaphore* worker =
static_cast<WorkerThreadSemaphore*>(worker_base);
AnnotateAcquiredLockAlias annotate(outer()->lock_, lock());
// `timed_out_` is set by TimedWait().
if (timed_out_) {
if (CanCleanupLockRequired(worker)) {
CleanupLockRequired(executor, worker);
}
return false;
}
// If too many workers are currently awake (contrasted with ThreadGroupImpl
// where this decision is made by the number of workers which were signaled),
// this worker should not get work, until tasks are no longer in excess
// (i.e. max tasks increases).
if (outer()->num_active_signals_ > outer()->max_tasks_) {
OnWorkerBecomesIdleLockRequired(executor, worker);
return false;
}
return true;
}
RegisteredTaskSource ThreadGroupSemaphore::SemaphoreWorkerDelegate::GetWork(
WorkerThread* worker) {
DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
DCHECK(!read_worker().current_task_priority);
DCHECK(!read_worker().current_shutdown_behavior);
SemaphoreScopedCommandsExecutor executor(outer());
CheckedAutoLock auto_lock(outer()->lock_);
AnnotateAcquiredLockAlias alias(
outer()->lock_, static_cast<ThreadGroupSemaphore*>(outer_.get())->lock_);
return GetWorkLockRequired(&executor, worker);
}
RegisteredTaskSource
ThreadGroupSemaphore::SemaphoreWorkerDelegate::SwapProcessedTask(
RegisteredTaskSource task_source,
WorkerThread* worker) {
DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
DCHECK(read_worker().current_task_priority);
DCHECK(read_worker().current_shutdown_behavior);
// A transaction to the TaskSource to reenqueue, if any. Instantiated here as
// `TaskSource::lock_` is a UniversalPredecessor and must always be acquired
// prior to acquiring a second lock
std::optional<RegisteredTaskSourceAndTransaction>
transaction_with_task_source;
if (task_source) {
transaction_with_task_source.emplace(
RegisteredTaskSourceAndTransaction::FromTaskSource(
std::move(task_source)));
}
SemaphoreScopedCommandsExecutor workers_executor(outer());
ScopedReenqueueExecutor reenqueue_executor;
CheckedAutoLock auto_lock(outer()->lock_);
AnnotateAcquiredLockAlias annotate(outer()->lock_, lock());
// During shutdown, max_tasks may have been incremented in
// OnShutdownStartedLockRequired().
if (incremented_max_tasks_for_shutdown_) {
DCHECK(outer()->shutdown_started_);
outer()->DecrementMaxTasksLockRequired();
if (*read_worker().current_task_priority == TaskPriority::BEST_EFFORT) {
outer()->DecrementMaxBestEffortTasksLockRequired();
}
incremented_max_tasks_since_blocked_ = false;
incremented_max_best_effort_tasks_since_blocked_ = false;
incremented_max_tasks_for_shutdown_ = false;
}
DCHECK(read_worker().blocking_start_time.is_null());
DCHECK(!incremented_max_tasks_since_blocked_);
DCHECK(!incremented_max_best_effort_tasks_since_blocked_);
// Running task bookkeeping.
outer()->DecrementTasksRunningLockRequired(
*read_worker().current_task_priority);
write_worker().current_shutdown_behavior = std::nullopt;
write_worker().current_task_priority = std::nullopt;
if (transaction_with_task_source) {
// If there is a task to enqueue, we can swap it for another task without
// changing DesiredNumAwakeWorkers(), and thus without worrying about
// signaling/waiting.
outer()->ReEnqueueTaskSourceLockRequired(
&workers_executor, &reenqueue_executor,
std::move(transaction_with_task_source.value()));
return GetWorkLockRequired(&workers_executor,
static_cast<WorkerThreadSemaphore*>(worker));
} else if (outer()->GetDesiredNumAwakeWorkersLockRequired() >=
outer()->num_active_signals_) {
// When the thread pool wants more work to be run but hasn't signaled
// workers for it yet we can take advantage and grab more work without
// signal/wait contention.
return GetWorkLockRequired(&workers_executor,
static_cast<WorkerThreadSemaphore*>(worker));
}
// In the case where the worker does not have a task source to exchange and
// the thread group doesn't want more work than the number of workers awake,
// it must WaitForWork(), to keep `num_active_signals` synchronized with the
// number of desired awake workers.
OnWorkerBecomesIdleLockRequired(&workers_executor, worker);
return nullptr;
}
TimeDelta ThreadGroupSemaphore::SemaphoreWorkerDelegate::GetSleepTimeout() {
return ThreadPoolSleepTimeout();
}
bool ThreadGroupSemaphore::SemaphoreWorkerDelegate::CanCleanupLockRequired(
const WorkerThread* worker) {
DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
return is_excess_ && LIKELY(!outer()->worker_cleanup_disallowed_for_testing_);
}
void ThreadGroupSemaphore::SemaphoreWorkerDelegate::CleanupLockRequired(
BaseScopedCommandsExecutor* executor,
WorkerThread* worker_base) {
WorkerThreadSemaphore* worker =
static_cast<WorkerThreadSemaphore*>(worker_base);
DCHECK(!outer()->join_called_for_testing_.IsSet());
DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
worker->Cleanup();
// Remove the worker from `workers_`.
DCHECK(!outer()->after_start().no_worker_reclaim ||
outer()->workers_.size() > outer()->after_start().initial_max_tasks);
auto num_erased = std::erase(outer()->workers_, worker);
CHECK_EQ(num_erased, 1u);
}
void ThreadGroupSemaphore::SemaphoreWorkerDelegate::
OnWorkerBecomesIdleLockRequired(BaseScopedCommandsExecutor* executor,
WorkerThread* worker_base) {
DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
CHECK_GT(outer()->num_active_signals_, 0u);
--outer()->num_active_signals_;
outer()->idle_workers_set_cv_for_testing_.Signal();
}
void ThreadGroupSemaphore::SemaphoreWorkerDelegate::RecordUnnecessaryWakeup() {
DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
RecordUnnecessaryWakeupImpl();
}
void ThreadGroupSemaphore::JoinForTesting() {
decltype(workers_) workers_copy;
{
SemaphoreScopedCommandsExecutor executor(this);
CheckedAutoLock auto_lock(lock_);
AnnotateAcquiredLockAlias alias(lock_, executor.outer()->lock_);
priority_queue_.EnableFlushTaskSourcesOnDestroyForTesting();
DCHECK_GT(workers_.size(), size_t(0))
<< "Joined an unstarted thread group.";
join_called_for_testing_.Set();
// Ensure WorkerThreads in `workers_` do not attempt to cleanup while
// being joined.
worker_cleanup_disallowed_for_testing_ = true;
// Make a copy of the WorkerThreads so that we can call
// WorkerThread::JoinForTesting() without holding `lock_` since
// WorkerThreads may need to access `workers_`.
workers_copy = workers_;
for (size_t i = 0; i < workers_copy.size(); ++i) {
executor.ScheduleSignal();
}
join_called_for_testing_.Set();
}
for (const auto& worker : workers_copy) {
static_cast<WorkerThreadSemaphore*>(worker.get())->JoinForTesting();
}
CheckedAutoLock auto_lock(lock_);
DCHECK(workers_ == workers_copy);
// Release `workers_` to clear their TrackedRef against `this`.
workers_.clear();
}
void ThreadGroupSemaphore::CreateAndRegisterWorkerLockRequired(
SemaphoreScopedCommandsExecutor* executor) {
if (workers_.size() == kMaxNumberOfWorkers) {
return;
}
DCHECK_LT(workers_.size(), kMaxNumberOfWorkers);
if (workers_.size() >= max_tasks_) {
return;
}
DCHECK(!join_called_for_testing_.IsSet());
// WorkerThread needs `lock_` as a predecessor for its thread lock because in
// GetWork(), `lock_` is first acquired and then the thread lock is acquired
// when GetLastUsedTime() is called on the worker by CanGetWorkLockRequired().
scoped_refptr<WorkerThreadSemaphore> worker =
MakeRefCounted<WorkerThreadSemaphore>(
thread_type_hint_,
std::make_unique<SemaphoreWorkerDelegate>(
tracked_ref_factory_.GetTrackedRef(),
/*is_excess=*/after_start().no_worker_reclaim
? workers_.size() >= after_start().initial_max_tasks
: true,
&join_called_for_testing_),
task_tracker_, worker_sequence_num_++, &lock_, &semaphore_);
DCHECK(worker);
workers_.push_back(worker);
DCHECK_LE(workers_.size(), max_tasks_);
executor->ScheduleStart(worker);
}
void ThreadGroupSemaphore::DidUpdateCanRunPolicy() {
SemaphoreScopedCommandsExecutor executor(this);
CheckedAutoLock auto_lock(lock_);
EnsureEnoughWorkersLockRequired(&executor);
}
ThreadGroup::ThreadGroupWorkerDelegate* ThreadGroupSemaphore::GetWorkerDelegate(
WorkerThread* worker) {
return static_cast<ThreadGroup::ThreadGroupWorkerDelegate*>(
static_cast<SemaphoreWorkerDelegate*>(worker->delegate()));
}
void ThreadGroupSemaphore::OnShutdownStarted() {
SemaphoreScopedCommandsExecutor executor(this);
OnShutDownStartedImpl(&executor);
}
void ThreadGroupSemaphore::EnsureEnoughWorkersLockRequired(
BaseScopedCommandsExecutor* base_executor) {
// Don't do anything if the thread group isn't started.
if (max_tasks_ == 0 || UNLIKELY(join_called_for_testing_.IsSet())) {
return;
}
SemaphoreScopedCommandsExecutor* executor =
static_cast<SemaphoreScopedCommandsExecutor*>(base_executor);
const size_t desired_awake_workers = GetDesiredNumAwakeWorkersLockRequired();
// The +1 here is due to the fact that we always want there to be one idle
// worker.
const size_t num_workers_to_create =
std::min({static_cast<size_t>(after_start().max_num_workers_created),
static_cast<size_t>(
ClampSub(desired_awake_workers + 1, workers_.size()))});
for (size_t i = 0; i < num_workers_to_create; ++i) {
CreateAndRegisterWorkerLockRequired(executor);
}
const size_t new_signals = std::min(
// Don't signal more than `workers_.size()` workers.
{ClampSub(workers_.size(), num_active_signals_),
ClampSub(desired_awake_workers, num_active_signals_)});
AnnotateAcquiredLockAlias alias(lock_, executor->outer()->lock_);
for (size_t i = 0; i < new_signals; ++i) {
executor->ScheduleSignal();
}
// This function is called every time a task source is (re-)enqueued,
// hence the minimum priority needs to be updated.
UpdateMinAllowedPriorityLockRequired();
// Ensure that the number of workers is periodically adjusted if needed.
MaybeScheduleAdjustMaxTasksLockRequired(executor);
}
} // namespace internal
} // namespace base