// Copyright 2016 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "base/task/thread_pool/thread_group_impl.h"

#include <algorithm>
#include <optional>
#include <string_view>

#include "base/metrics/histogram_macros.h"
#include "base/profiler/thread_group_profiler.h"
#include "base/sequence_token.h"
#include "base/strings/stringprintf.h"
#include "base/task/common/checked_lock.h"
#include "base/task/thread_pool/worker_thread.h"
#include "base/threading/platform_thread_metrics.h"
#include "base/threading/scoped_blocking_call.h"
#include "base/threading/scoped_blocking_call_internal.h"
#include "base/threading/thread_checker.h"
#include "base/time/time_override.h"
#include "base/trace_event/trace_event.h"
#include "third_party/abseil-cpp/absl/container/inlined_vector.h"

namespace base::internal {

// Upon destruction, executes actions that control the number of active
// workers. Useful to satisfy locking requirements of these actions.
class ThreadGroupImpl::ScopedCommandsExecutor
    : public ThreadGroup::BaseScopedCommandsExecutor {
 public:
  explicit ScopedCommandsExecutor(ThreadGroupImpl* outer)
      : BaseScopedCommandsExecutor(outer) {}

  ScopedCommandsExecutor(const ScopedCommandsExecutor&) = delete;
  ScopedCommandsExecutor& operator=(const ScopedCommandsExecutor&) = delete;
  ~ScopedCommandsExecutor() override {
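    // The deferred actions below must run with no CheckedLock held; this is
    // the reason they were queued on this executor rather than executed
    // directly.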
    CheckedLock::AssertNoLockHeldOnCurrentThread();

    // Wake up workers.
    for (auto& worker : workers_to_wake_up_) {
      worker->WakeUp();
    }
  }

  void ScheduleWakeUp(scoped_refptr<WorkerThread> worker) {
    workers_to_wake_up_.emplace_back(std::move(worker));
  }

 private:
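  // Sized to 2 because EnsureEnoughWorkersLockRequired() schedules at most two
  // wake-ups per executor.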
  absl::InlinedVector<scoped_refptr<WorkerThread>, 2> workers_to_wake_up_;
};

class ThreadGroupImpl::WorkerDelegate : public WorkerThread::Delegate,
                                        public BlockingObserver {
 public:
  // |outer| owns the worker for which this delegate is constructed. If
  // |is_excess| is true, this worker will be eligible for reclaim.
  explicit WorkerDelegate(TrackedRef<ThreadGroupImpl> outer, bool is_excess);
  WorkerDelegate(const WorkerDelegate&) = delete;
  WorkerDelegate& operator=(const WorkerDelegate&) = delete;

  // WorkerThread::Delegate:
  void OnMainEntry(WorkerThread* worker) override;
  void OnMainExit(WorkerThread* worker) override;
  RegisteredTaskSource GetWork(WorkerThread* worker) override;
  RegisteredTaskSource SwapProcessedTask(RegisteredTaskSource task_source,
                                         WorkerThread* worker) override;
  void RecordUnnecessaryWakeup() override;
  TimeDelta GetSleepTimeout() override;

  // BlockingObserver:
  void BlockingStarted(BlockingType blocking_type) override;
  void BlockingTypeUpgraded() override;
  void BlockingEnded() override;

  // Notifies the worker of shutdown, possibly marking the running task as
  // MAY_BLOCK.
  void OnShutdownStartedLockRequired(BaseScopedCommandsExecutor* executor)
      EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);

  // Increments max [best effort] tasks iff this worker has been within a
  // ScopedBlockingCall for more than |may_block_threshold|.
  void MaybeIncrementMaxTasksLockRequired()
      EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);

  // Increments max [best effort] tasks.
  void IncrementMaxTasksLockRequired() EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);

  TaskPriority current_task_priority_lock_required() const
      EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_) {
    return *read_any().current_task_priority;
  }

  // Exposed for AnnotateAcquiredLockAlias.
  const CheckedLock& lock() const LOCK_RETURNED(outer_->lock_) {
    return outer_->lock_;
  }

 private:
  // Returns true iff the worker can get work. Cleans up the worker or puts it
  // on the idle set if it can't get work.
  bool CanGetWorkLockRequired(BaseScopedCommandsExecutor* executor,
                              WorkerThread* worker)
      EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);

  // Calls cleanup on |worker| and removes it from the thread group. Called
  // from GetWork() when no work is available and CanCleanupLockRequired()
  // returns true.
  void CleanupLockRequired(BaseScopedCommandsExecutor* executor,
                           WorkerThread* worker)
      EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);

  // Called in GetWork() when a worker becomes idle.
  void OnWorkerBecomesIdleLockRequired(BaseScopedCommandsExecutor* executor,
                                       WorkerThread* worker)
      EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);

  RegisteredTaskSource GetWorkLockRequired(BaseScopedCommandsExecutor* executor,
                                           WorkerThread* worker)
      EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);

  // Returns true if |worker| is allowed to cleanup and remove itself from the
  // thread group. Called from GetWork() when no work is available.
  bool CanCleanupLockRequired(const WorkerThread* worker)
      EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);

  // Only used in DCHECKs.
  template <typename Worker>
  bool ContainsWorker(const std::vector<scoped_refptr<Worker>>& workers,
                      const WorkerThread* worker) {
    auto it = std::ranges::find_if(
        workers,
        [worker](const scoped_refptr<Worker>& i) { return i.get() == worker; });
    return it != workers.end();
  }

  // Accessed only from the worker thread.
  struct WorkerOnly {
    WorkerOnly();
    ~WorkerOnly();
    // Associated WorkerThread, if any, initialized in OnMainEntry().
    raw_ptr<WorkerThread> worker_thread_;

#if BUILDFLAG(IS_WIN)
    std::unique_ptr<win::ScopedWindowsThreadEnvironment> win_thread_environment;
#endif  // BUILDFLAG(IS_WIN)
  } worker_only_;
  // Written from the worker thread, protected by |outer_->lock_|. Read from
  // any thread, protected by |outer_->lock_| when not on the worker thread.
  struct WriteWorkerReadAny {
    // The priority of the task the worker is currently running, if any.
    std::optional<TaskPriority> current_task_priority;
    // The shutdown behavior of the task the worker is currently running, if
    // any.
    std::optional<TaskShutdownBehavior> current_shutdown_behavior;

    // Time when BlockingStarted() was last called. Reset when BlockingEnded()
    // is called.
    TimeTicks blocking_start_time;

    // Whether the worker is currently running a task (i.e. GetWork() returned
    // a non-null task source and SwapProcessedTask() hasn't been called to
    // finish it yet).
    bool is_running_task() const { return !!current_shutdown_behavior; }
  } write_worker_read_any_;

  WorkerOnly& worker_only() {
    DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
    return worker_only_;
  }

  WriteWorkerReadAny& write_worker() EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_) {
    DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
    return write_worker_read_any_;
  }

  const WriteWorkerReadAny& read_any() const
      EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_) {
    return write_worker_read_any_;
  }

  const WriteWorkerReadAny& read_worker() const {
    DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
    return write_worker_read_any_;
  }

  const TrackedRef<ThreadGroupImpl> outer_;

  // Whether the worker is in excess. This must be decided at worker creation
  // time to prevent unnecessarily discarding TLS state, as well as any
  // behavior the OS has learned about a given thread.
  const bool is_excess_;

  // Whether |outer_->max_tasks_|/|outer_->max_best_effort_tasks_| were
  // incremented due to a ScopedBlockingCall on the thread.
  bool incremented_max_tasks_since_blocked_ GUARDED_BY(outer_->lock_) = false;
  bool incremented_max_best_effort_tasks_since_blocked_
      GUARDED_BY(outer_->lock_) = false;
  // Whether |outer_->max_tasks_| and |outer_->max_best_effort_tasks_| were
  // incremented due to running a CONTINUE_ON_SHUTDOWN task on the thread
  // during shutdown.
  bool incremented_max_tasks_for_shutdown_ GUARDED_BY(outer_->lock_) = false;

  // Verifies that specific calls are always made from the worker thread.
  THREAD_CHECKER(worker_thread_checker_);
};

ThreadGroupImpl::ThreadGroupImpl(std::string_view histogram_label,
                                 std::string_view thread_group_label,
                                 ThreadType thread_type_hint,
                                 int64_t thread_group_type,
                                 TrackedRef<TaskTracker> task_tracker,
                                 TrackedRef<Delegate> delegate,
                                 bool monitor_worker_thread_priorities)
    : ThreadGroup(histogram_label,
                  thread_group_label,
                  thread_type_hint,
                  std::move(task_tracker),
                  std::move(delegate)),
      thread_group_type_(thread_group_type),
      tracked_ref_factory_(this),
      monitor_worker_thread_priorities_(monitor_worker_thread_priorities) {
  DCHECK(!thread_group_label_.empty());
}

void ThreadGroupImpl::Start(
    size_t max_tasks,
    size_t max_best_effort_tasks,
    TimeDelta suggested_reclaim_time,
    scoped_refptr<SingleThreadTaskRunner> service_thread_task_runner,
    WorkerThreadObserver* worker_thread_observer,
    WorkerEnvironment worker_environment,
    bool synchronous_thread_start_for_testing,
    std::optional<TimeDelta> may_block_threshold) {
#if DCHECK_IS_ON()
  DCHECK(!in_start().start_called);
  in_start().start_called = true;
#endif
  {
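    // Workers are created while |lock_| is held; the executor defers starting
    // and waking them until after the lock is released.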
    ScopedCommandsExecutor executor(this);
    CheckedAutoLock auto_lock(lock_);

    ThreadGroup::StartImplLockRequired(
        max_tasks, max_best_effort_tasks, suggested_reclaim_time,
        service_thread_task_runner, worker_thread_observer, worker_environment,
        synchronous_thread_start_for_testing, may_block_threshold);

    DCHECK(workers_.empty());
    EnsureEnoughWorkersLockRequired(&executor);
  }

  if (ThreadGroupProfiler::IsProfilingEnabled()) {
    // This call posts a task, so do it outside of the lock.
    thread_group_profiler_.emplace(service_thread_task_runner,
                                   thread_group_type_);
  }
}

ThreadGroupImpl::~ThreadGroupImpl() {
  // ThreadGroup should only ever be deleted:
  //  1) In tests, after JoinForTesting().
  //  2) In production, iff initialization failed.
  // In both cases |workers_| should be empty.
  DCHECK(workers_.empty());
}

void ThreadGroupImpl::UpdateSortKey(TaskSource::Transaction transaction) {
  ScopedCommandsExecutor executor(this);
  UpdateSortKeyImpl(&executor, std::move(transaction));
}

void ThreadGroupImpl::PushTaskSourceAndWakeUpWorkers(
    RegisteredTaskSourceAndTransaction transaction_with_task_source) {
  ScopedCommandsExecutor executor(this);
  PushTaskSourceAndWakeUpWorkersImpl(&executor,
                                     std::move(transaction_with_task_source));
}

ThreadGroupImpl::WorkerDelegate::WorkerDelegate(
    TrackedRef<ThreadGroupImpl> outer,
    bool is_excess)
    : outer_(outer), is_excess_(is_excess) {
  // Bound in OnMainEntry().
  DETACH_FROM_THREAD(worker_thread_checker_);
}

ThreadGroupImpl::WorkerDelegate::WorkerOnly::WorkerOnly() = default;
ThreadGroupImpl::WorkerDelegate::WorkerOnly::~WorkerOnly() = default;
TimeDelta ThreadGroupImpl::WorkerDelegate::GetSleepTimeout() {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  if (!is_excess_) {
    return TimeDelta::Max();
  }
  // Sleep for an extra 10% to avoid the following pathological case:
  //   0) A task is running on a timer which matches
  //      |after_start().suggested_reclaim_time|.
  //   1) The timer fires and this worker is created by
  //      MaintainAtLeastOneIdleWorkerLockRequired() because the last idle
  //      worker was assigned the task.
  //   2) This worker begins sleeping |after_start().suggested_reclaim_time|
  //      (at the front of the idle set).
  //   3) The task assigned to the other worker completes and the worker goes
  //      back in the idle set (this worker may now be second on the idle set;
  //      its GetLastUsedTime() is set to Now()).
  //   4) The sleep in (2) expires. Since (3) was fast this worker is likely to
  //      have been second on the idle set long enough for
  //      CanCleanupLockRequired() to be satisfied in which case this worker is
  //      cleaned up.
  //   5) The timer fires at roughly the same time and we're back to (1) if (4)
  //      resulted in a clean up; causing thread churn.
  //
  //   Sleeping 10% longer in (2) makes it much less likely that (4) occurs
  //   before (5). In that case (5) will cause (3) and refresh this worker's
  //   GetLastUsedTime(), making CanCleanupLockRequired() return false in (4)
  //   and avoiding churn.
  //
  //   Of course the same problem arises if in (0) the timer matches
  //   |after_start().suggested_reclaim_time * 1.1| but it's expected that any
  //   timer slower than |after_start().suggested_reclaim_time| will cause such
  //   churn during long idle periods. If this is a problem in practice, the
  //   standby thread configuration and algorithm should be revisited.
  return outer_->after_start().suggested_reclaim_time * 1.1;
}

void ThreadGroupImpl::WorkerDelegate::OnMainEntry(WorkerThread* worker) {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);

  {
#if DCHECK_IS_ON()
    CheckedAutoLock auto_lock(outer_->lock_);
    DCHECK(ContainsWorker(outer_->workers_, worker));
#endif
  }

#if BUILDFLAG(IS_WIN)
  worker_only().win_thread_environment = GetScopedWindowsThreadEnvironment(
      outer_->after_start().worker_environment);
#endif  // BUILDFLAG(IS_WIN)

  std::string thread_name =
      StringPrintf("ThreadPool%sWorker", outer_->thread_group_label_.c_str());
  PlatformThread::SetName(thread_name);
#if BUILDFLAG(IS_ANDROID)
  if (outer_->monitor_worker_thread_priorities_) {
    PlatformThreadPriorityMonitor::Get().RegisterCurrentThread(thread_name);
  }
#endif  // BUILDFLAG(IS_ANDROID)

  outer_->BindToCurrentThread();
  worker_only().worker_thread_ = worker;
  SetBlockingObserverForCurrentThread(this);

  if (outer_->thread_group_profiler_) {
    outer_->thread_group_profiler_->OnWorkerThreadStarted(worker);
  }

  if (outer_->worker_started_for_testing_) {
    // When |worker_started_for_testing_| is set, the thread that starts
    // workers should wait for a worker to have started before starting the
    // next one, and there should only be one thread that wakes up workers at
    // a time.
    DCHECK(!outer_->worker_started_for_testing_->IsSignaled());
    outer_->worker_started_for_testing_->Signal();
  }
}

void ThreadGroupImpl::WorkerDelegate::OnMainExit(WorkerThread* worker) {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);

#if DCHECK_IS_ON()
  {
    bool shutdown_complete = outer_->task_tracker_->IsShutdownComplete();
    CheckedAutoLock auto_lock(outer_->lock_);

    // |worker| should already have been removed from the idle workers set and
    // |workers_| by the time the thread is about to exit. (except in the
    // cases where the thread group is no longer going to be used - in which
    // case, it's fine for there to be invalid workers in the thread group).
    if (!shutdown_complete && !outer_->join_for_testing_started_) {
      DCHECK(!outer_->idle_workers_set_.Contains(worker));
      DCHECK(!ContainsWorker(outer_->workers_, worker));
    }
  }
#endif

#if BUILDFLAG(IS_WIN)
  worker_only().win_thread_environment.reset();
#endif  // BUILDFLAG(IS_WIN)

  if (outer_->thread_group_profiler_) {
    outer_->thread_group_profiler_->OnWorkerThreadExiting(worker);
  }

  // Count cleaned up workers for tests. It's important to do this here
  // instead of at the end of CleanupLockRequired() because some side-effects
  // of cleaning up happen outside the lock (e.g. recording histograms) and
  // resuming from tests must happen-after that point or checks on the main
  // thread will be flaky (crbug.com/1047733).
  CheckedAutoLock auto_lock(outer_->lock_);
  ++outer_->num_workers_cleaned_up_for_testing_;
#if DCHECK_IS_ON()
  outer_->some_workers_cleaned_up_for_testing_ = true;
#endif
  if (outer_->num_workers_cleaned_up_for_testing_cv_) {
    outer_->num_workers_cleaned_up_for_testing_cv_->Signal();
  }
}

bool ThreadGroupImpl::WorkerDelegate::CanGetWorkLockRequired(
    BaseScopedCommandsExecutor* executor,
    WorkerThread* worker) {
  const bool is_on_idle_workers_set = outer_->IsOnIdleSetLockRequired(worker);
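  // IsOnIdleSetLockRequired() infers set membership from GetLastUsedTime();
  // cross-check the inference against the actual set in debug builds.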
  DCHECK_EQ(is_on_idle_workers_set, outer_->idle_workers_set_.Contains(worker));

  // This occurs when WorkerThread::Delegate::WaitForWork() times out (i.e.
  // when the worker wakes up after GetSleepTimeout()).
  if (is_on_idle_workers_set) {
    if (CanCleanupLockRequired(worker)) {
      CleanupLockRequired(executor, worker);
    }
    return false;
  }

  // If too many workers are running, this worker should not get work until
  // tasks are no longer in excess (i.e. until max tasks increases). This
  // ensures that if this worker is in excess, it gets a chance to be cleaned
  // up.
  if (outer_->GetNumAwakeWorkersLockRequired() > outer_->max_tasks_) {
    OnWorkerBecomesIdleLockRequired(executor, worker);
    return false;
  }

  return true;
}

RegisteredTaskSource ThreadGroupImpl::WorkerDelegate::GetWork(
    WorkerThread* worker) {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  DCHECK(!read_worker().current_task_priority);
  DCHECK(!read_worker().current_shutdown_behavior);

  ScopedCommandsExecutor executor(outer_.get());
  RegisteredTaskSource task_source;
  {
    CheckedAutoLock auto_lock(outer_->lock_);
    task_source = GetWorkLockRequired(&executor, worker);
  }
  // Notify the profiler of the worker's status when profiling is enabled.
  // This must be done without holding |lock_|, as |lock_| is not a universal
  // predecessor and thus doesn't satisfy the CheckedLock requirements of
  // OnWorkerThreadActive().
  if (outer_->thread_group_profiler_) {
    // GetWork() is only called when waking up, i.e. from an idle state. No
    // need to mark the worker idle again if no task source is available.
    if (task_source) {
      outer_->thread_group_profiler_->OnWorkerThreadActive(worker);
    }
  }
  return task_source;
}

RegisteredTaskSource ThreadGroupImpl::WorkerDelegate::GetWorkLockRequired(
    BaseScopedCommandsExecutor* executor,
    WorkerThread* worker) {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  DCHECK(ContainsWorker(outer_->workers_, worker));

  if (!CanGetWorkLockRequired(executor, worker)) {
    return nullptr;
  }

  RegisteredTaskSource task_source;
  TaskPriority priority;
  while (!task_source && !outer_->priority_queue_.IsEmpty()) {
    // Enforce the CanRunPolicy and that no more than |max_best_effort_tasks_|
    // BEST_EFFORT tasks run concurrently.
    priority = outer_->priority_queue_.PeekSortKey().priority();
    if (!outer_->task_tracker_->CanRunPriority(priority) ||
        (priority == TaskPriority::BEST_EFFORT &&
         outer_->num_running_best_effort_tasks_ >=
             outer_->max_best_effort_tasks_)) {
      break;
    }

    task_source = outer_->TakeRegisteredTaskSource(executor);
  }
  if (!task_source) {
    OnWorkerBecomesIdleLockRequired(executor, worker);
    return nullptr;
  }

  // Running task bookkeeping.
  outer_->IncrementTasksRunningLockRequired(priority);

  write_worker().current_task_priority = priority;
  write_worker().current_shutdown_behavior = task_source->shutdown_behavior();

  // Subtle: This must be after the call to WillRunTask() inside
  // TakeRegisteredTaskSource(), so that any state used by WillRunTask() to
  // determine that the task source must remain in the TaskQueue is also used
  // to determine the desired number of workers. Concretely, this wouldn't
  // work:
  //
  // Thread 1: GetWork() calls EnsureEnoughWorkers(). No worker woken up
  //           because the queue contains a job with max concurrency = 1 and
  //           the current worker is awake.
  // Thread 2: Increases the job's max concurrency.
  //           ShouldQueueUponCapacityIncrease() returns false because the
  //           job is already queued.
  // Thread 1: Calls WillRunTask() on the job. It returns
  //           kAllowedNotSaturated because max concurrency is not reached.
  //           But no extra worker is woken up to run the job!
  outer_->EnsureEnoughWorkersLockRequired(executor);

  return task_source;
}

RegisteredTaskSource ThreadGroupImpl::WorkerDelegate::SwapProcessedTask(
    RegisteredTaskSource task_source,
    WorkerThread* worker) {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  DCHECK(read_worker().current_task_priority);
  DCHECK(read_worker().current_shutdown_behavior);

  // A transaction to the TaskSource to reenqueue, if any. Instantiated here
  // because |TaskSource::lock_| is a UniversalPredecessor and must always be
  // acquired prior to acquiring a second lock.
  std::optional<RegisteredTaskSourceAndTransaction>
      transaction_with_task_source;
  if (task_source) {
    transaction_with_task_source.emplace(
        RegisteredTaskSourceAndTransaction::FromTaskSource(
            std::move(task_source)));
  }

  // Calling WakeUp() guarantees that this WorkerThread will run Tasks from
  // TaskSources returned by the GetWork() method of |delegate_| until it
  // returns nullptr. Resetting |wake_up_event_| here doesn't break this
  // invariant and avoids a useless loop iteration before going to sleep if
  // WakeUp() is called while this WorkerThread is awake.
  wake_up_event_.Reset();

  ScopedCommandsExecutor workers_executor(outer_.get());
  ScopedReenqueueExecutor reenqueue_executor;
  RegisteredTaskSource next_task_source;
  {
    CheckedAutoLock auto_lock(outer_->lock_);

    // During shutdown, max_tasks may have been incremented in
    // OnShutdownStartedLockRequired().
    if (incremented_max_tasks_for_shutdown_) {
      DCHECK(outer_->shutdown_started_);
      outer_->DecrementMaxTasksLockRequired();
      if (*read_worker().current_task_priority == TaskPriority::BEST_EFFORT) {
        outer_->DecrementMaxBestEffortTasksLockRequired();
      }
      incremented_max_tasks_since_blocked_ = false;
      incremented_max_best_effort_tasks_since_blocked_ = false;
      incremented_max_tasks_for_shutdown_ = false;
    }

    DCHECK(read_worker().blocking_start_time.is_null());
    DCHECK(!incremented_max_tasks_since_blocked_);
    DCHECK(!incremented_max_best_effort_tasks_since_blocked_);

    // Running task bookkeeping.
    outer_->DecrementTasksRunningLockRequired(
        *read_worker().current_task_priority);
    write_worker().current_shutdown_behavior = std::nullopt;
    write_worker().current_task_priority = std::nullopt;

    if (transaction_with_task_source) {
      outer_->ReEnqueueTaskSourceLockRequired(
          &workers_executor, &reenqueue_executor,
          std::move(transaction_with_task_source.value()));
    }

    next_task_source = GetWorkLockRequired(&workers_executor, worker);
  }
  // Must be called without holding a lock. Only mark the worker idle for the
  // profiler if it didn't get a new task source.
  if (outer_->thread_group_profiler_ && !next_task_source) {
    outer_->thread_group_profiler_->OnWorkerThreadIdle(worker);
  }
  return next_task_source;
}

bool ThreadGroupImpl::WorkerDelegate::CanCleanupLockRequired(
    const WorkerThread* worker) {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  if (!is_excess_) {
    return false;
  }

  const TimeTicks last_used_time = worker->GetLastUsedTime();
  if (last_used_time.is_null() ||
      subtle::TimeTicksNowIgnoringOverride() - last_used_time <
          outer_->after_start().suggested_reclaim_time) {
    return false;
  }
  if (!outer_->worker_cleanup_disallowed_for_testing_) [[likely]] {
    return true;
  }
  return false;
}

void ThreadGroupImpl::WorkerDelegate::CleanupLockRequired(
    BaseScopedCommandsExecutor* executor,
    WorkerThread* worker) {
  DCHECK(!outer_->join_for_testing_started_);
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);

  worker->Cleanup();

  if (outer_->IsOnIdleSetLockRequired(worker)) {
    outer_->idle_workers_set_.Remove(worker);
  }

  // Remove the worker from |workers_|.
  auto worker_iter = std::ranges::find(outer_->workers_, worker);
  CHECK(worker_iter != outer_->workers_.end());
  outer_->workers_.erase(worker_iter);
}

void ThreadGroupImpl::WorkerDelegate::OnWorkerBecomesIdleLockRequired(
    BaseScopedCommandsExecutor* executor,
    WorkerThread* worker) {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  DCHECK(!outer_->idle_workers_set_.Contains(worker));

  // Add the worker to the idle set.
  outer_->idle_workers_set_.Insert(worker);
  DCHECK_LE(outer_->idle_workers_set_.Size(), outer_->workers_.size());
  outer_->idle_workers_set_cv_for_testing_.Broadcast();
}

void ThreadGroupImpl::WorkerDelegate::RecordUnnecessaryWakeup() {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);

  base::BooleanHistogram::FactoryGet(
      outer_->unnecessary_wakeup_histogram_label_,
      base::Histogram::kUmaTargetedHistogramFlag)
      ->Add(true);

  TRACE_EVENT_INSTANT("wakeup.flow", "ThreadPool.UnnecessaryWakeup");
}

void ThreadGroupImpl::WorkerDelegate::BlockingStarted(
    BlockingType blocking_type) {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  DCHECK(worker_only().worker_thread_);
  // Skip if this blocking scope happened outside of a RunTask.
  if (!read_worker().current_task_priority) {
    return;
  }

  worker_only().worker_thread_->MaybeUpdateThreadType();

  // WILL_BLOCK is always used when time overrides are active.
  // crbug.com/1038867
  if (base::subtle::ScopedTimeClockOverrides::overrides_active()) {
    blocking_type = BlockingType::WILL_BLOCK;
  }

  ScopedCommandsExecutor executor(outer_.get());
  CheckedAutoLock auto_lock(outer_->lock_);

  DCHECK(!incremented_max_tasks_since_blocked_);
  DCHECK(!incremented_max_best_effort_tasks_since_blocked_);
  DCHECK(read_worker().blocking_start_time.is_null());
  write_worker().blocking_start_time = subtle::TimeTicksNowIgnoringOverride();

  if (incremented_max_tasks_for_shutdown_) {
    return;
  }

  if (*read_any().current_task_priority == TaskPriority::BEST_EFFORT) {
    ++outer_->num_unresolved_best_effort_may_block_;
  }

  if (blocking_type == BlockingType::WILL_BLOCK) {
    incremented_max_tasks_since_blocked_ = true;
    outer_->IncrementMaxTasksLockRequired();
    outer_->EnsureEnoughWorkersLockRequired(&executor);
  } else {
    ++outer_->num_unresolved_may_block_;
  }

  outer_->MaybeScheduleAdjustMaxTasksLockRequired(&executor);
}

void ThreadGroupImpl::WorkerDelegate::BlockingTypeUpgraded() {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  // Skip if this blocking scope happened outside of a RunTask.
  if (!read_worker().current_task_priority) {
    return;
  }

  // The blocking type is always WILL_BLOCK when time overrides are active
  // (see BlockingStarted()), so it should never be considered "upgraded".
  if (base::subtle::ScopedTimeClockOverrides::overrides_active()) {
    return;
  }

  ScopedCommandsExecutor executor(outer_.get());
  CheckedAutoLock auto_lock(outer_->lock_);

  // Don't do anything if a MAY_BLOCK ScopedBlockingCall instantiated in the
  // same scope already caused the max tasks to be incremented.
  if (incremented_max_tasks_since_blocked_) {
    return;
  }

  // Cancel the effect of a MAY_BLOCK ScopedBlockingCall instantiated in the
  // same scope.
  --outer_->num_unresolved_may_block_;

  incremented_max_tasks_since_blocked_ = true;
  outer_->IncrementMaxTasksLockRequired();
  outer_->EnsureEnoughWorkersLockRequired(&executor);
}

void ThreadGroupImpl::WorkerDelegate::BlockingEnded() {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  // Skip if this blocking scope happened outside of a RunTask.
  if (!read_worker().current_task_priority) {
    return;
  }

  CheckedAutoLock auto_lock(outer_->lock_);
  DCHECK(!read_worker().blocking_start_time.is_null());
  write_worker().blocking_start_time = TimeTicks();
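  // Revert the max tasks adjustments made for this blocking scope, unless they
  // were made for shutdown; those are reverted in SwapProcessedTask().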
  if (!incremented_max_tasks_for_shutdown_) {
    if (incremented_max_tasks_since_blocked_) {
      outer_->DecrementMaxTasksLockRequired();
    } else {
      --outer_->num_unresolved_may_block_;
    }

    if (*read_worker().current_task_priority == TaskPriority::BEST_EFFORT) {
      if (incremented_max_best_effort_tasks_since_blocked_) {
        outer_->DecrementMaxBestEffortTasksLockRequired();
      } else {
        --outer_->num_unresolved_best_effort_may_block_;
      }
    }
  }

  incremented_max_tasks_since_blocked_ = false;
  incremented_max_best_effort_tasks_since_blocked_ = false;
}

// Notifies the worker of shutdown, possibly marking the running task as
// MAY_BLOCK.
void ThreadGroupImpl::WorkerDelegate::OnShutdownStartedLockRequired(
    BaseScopedCommandsExecutor* executor) {
  if (!read_any().is_running_task()) {
    return;
  }
  // Workers running a CONTINUE_ON_SHUTDOWN task are replaced by incrementing
  // max_tasks/max_best_effort_tasks. The effect is reverted in
  // SwapProcessedTask().
  if (*read_any().current_shutdown_behavior ==
      TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN) {
    incremented_max_tasks_for_shutdown_ = true;
    IncrementMaxTasksLockRequired();
  }
}

// Increments max [best effort] tasks iff this worker has been within a
// ScopedBlockingCall for more than |may_block_threshold|.
void ThreadGroupImpl::WorkerDelegate::MaybeIncrementMaxTasksLockRequired()
    EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_) {
  if (read_any().blocking_start_time.is_null() ||
      subtle::TimeTicksNowIgnoringOverride() - read_any().blocking_start_time <
          outer_->after_start().may_block_threshold) {
    return;
  }
  IncrementMaxTasksLockRequired();
}

// Increments max [best effort] tasks.
void ThreadGroupImpl::WorkerDelegate::IncrementMaxTasksLockRequired()
    EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_) {
  if (!incremented_max_tasks_since_blocked_) {
    outer_->IncrementMaxTasksLockRequired();
    // Update state for an unresolved ScopedBlockingCall.
    if (!read_any().blocking_start_time.is_null()) {
      incremented_max_tasks_since_blocked_ = true;
      --outer_->num_unresolved_may_block_;
    }
  }
  if (*read_any().current_task_priority == TaskPriority::BEST_EFFORT &&
      !incremented_max_best_effort_tasks_since_blocked_) {
    outer_->IncrementMaxBestEffortTasksLockRequired();
    // Update state for an unresolved ScopedBlockingCall.
    if (!read_any().blocking_start_time.is_null()) {
      incremented_max_best_effort_tasks_since_blocked_ = true;
      --outer_->num_unresolved_best_effort_may_block_;
    }
  }
}

void ThreadGroupImpl::JoinForTesting() {
  // The profiler needs to shut down first so that it doesn't block worker
  // thread joins.
  if (thread_group_profiler_) {
    thread_group_profiler_->Shutdown();
  }
  decltype(workers_) workers_copy;
  {
    CheckedAutoLock auto_lock(lock_);
    priority_queue_.EnableFlushTaskSourcesOnDestroyForTesting();

    DCHECK_GT(workers_.size(), size_t(0))
        << "Joined an unstarted thread group.";

    join_for_testing_started_ = true;

    // Ensure WorkerThreads in |workers_| do not attempt to cleanup while
    // being joined.
    worker_cleanup_disallowed_for_testing_ = true;

    // Make a copy of the WorkerThreads so that we can call
    // WorkerThread::JoinForTesting() without holding |lock_| since
    // WorkerThreads may need to access |workers_|.
    workers_copy = workers_;
  }
  for (const auto& worker : workers_copy) {
    worker->JoinForTesting();
  }

  CheckedAutoLock auto_lock(lock_);
  DCHECK(workers_ == workers_copy);
  // Release |workers_| to clear their TrackedRef against |this|.
  workers_.clear();
}

size_t ThreadGroupImpl::NumberOfIdleWorkersLockRequiredForTesting() const {
  return idle_workers_set_.Size();
}

void ThreadGroupImpl::MaintainAtLeastOneIdleWorkerLockRequired(
    ScopedCommandsExecutor* executor) {
  if (workers_.size() == kMaxNumberOfWorkers) {
    return;
  }
  DCHECK_LT(workers_.size(), kMaxNumberOfWorkers);

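  // Nothing to do if an idle worker already exists.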
  if (!idle_workers_set_.IsEmpty()) {
    return;
  }

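  // Don't create more workers than the current |max_tasks_| allows.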
  if (workers_.size() >= max_tasks_) {
    return;
  }

  scoped_refptr<WorkerThread> new_worker =
      CreateAndRegisterWorkerLockRequired(executor);
  DCHECK(new_worker);
  idle_workers_set_.Insert(new_worker.get());
}

scoped_refptr<WorkerThread>
ThreadGroupImpl::CreateAndRegisterWorkerLockRequired(
    ScopedCommandsExecutor* executor) {
  DCHECK(!join_for_testing_started_);
  DCHECK_LT(workers_.size(), max_tasks_);
  DCHECK_LT(workers_.size(), kMaxNumberOfWorkers);
  DCHECK(idle_workers_set_.IsEmpty());

  // WorkerThread needs |lock_| as a predecessor for its thread lock because in
  // GetWork(), |lock_| is first acquired and then the thread lock is acquired
  // when GetLastUsedTime() is called on the worker by CanGetWorkLockRequired().
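  // Workers created beyond |initial_max_tasks| are marked excess and become
  // eligible for reclaim once idle for |suggested_reclaim_time| (see
  // GetSleepTimeout() and CanCleanupLockRequired()).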
  scoped_refptr<WorkerThread> worker = MakeRefCounted<WorkerThread>(
      thread_type_hint_,
      std::make_unique<WorkerDelegate>(
          tracked_ref_factory_.GetTrackedRef(),
          /*is_excess=*/workers_.size() >= after_start().initial_max_tasks),
      task_tracker_, worker_sequence_num_++, &lock_);

  workers_.push_back(worker);
  executor->ScheduleStart(worker);
  DCHECK_LE(workers_.size(), max_tasks_);

  return worker;
}

size_t ThreadGroupImpl::GetNumAwakeWorkersLockRequired() const {
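  // A worker is awake iff it isn't on the idle set.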
  DCHECK_GE(workers_.size(), idle_workers_set_.Size());
  size_t num_awake_workers = workers_.size() - idle_workers_set_.Size();
  DCHECK_GE(num_awake_workers, num_running_tasks_);
  return num_awake_workers;
}

void ThreadGroupImpl::DidUpdateCanRunPolicy() {
  ScopedCommandsExecutor executor(this);
  CheckedAutoLock auto_lock(lock_);
  EnsureEnoughWorkersLockRequired(&executor);
}

void ThreadGroupImpl::OnShutdownStarted() {
  ScopedCommandsExecutor executor(this);
  CheckedAutoLock auto_lock(lock_);

  // Don't do anything if the thread group isn't started.
  if (max_tasks_ == 0) {
    return;
  }
  if (join_for_testing_started_) [[unlikely]] {
    return;
  }

  if (thread_group_profiler_) {
    thread_group_profiler_->Shutdown();
  }

  // Start a MAY_BLOCK scope on each worker that is already running a task.
  for (scoped_refptr<WorkerThread>& worker : workers_) {
    // The delegates of workers inside a ThreadGroupImpl should be
    // `WorkerDelegate`s.
    WorkerDelegate* delegate = static_cast<WorkerDelegate*>(worker->delegate());
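    // |delegate->lock()| returns |lock_| (see its LOCK_RETURNED annotation);
    // record the alias so the thread-safety analysis accepts the
    // *LockRequired() call below while |lock_| is held.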
    AnnotateAcquiredLockAlias annotate(lock_, delegate->lock());
    delegate->OnShutdownStartedLockRequired(&executor);
  }
  EnsureEnoughWorkersLockRequired(&executor);

  shutdown_started_ = true;
}

void ThreadGroupImpl::EnsureEnoughWorkersLockRequired(
    BaseScopedCommandsExecutor* base_executor) {
  // Don't do anything if the thread group isn't started.
  if (max_tasks_ == 0) {
    return;
  }
#if DCHECK_IS_ON()
  // CHECK() that Start() is complete, if workers are to be created.
  after_start();
#endif
  if (join_for_testing_started_) [[unlikely]] {
    return;
  }

  ScopedCommandsExecutor* executor =
      static_cast<ScopedCommandsExecutor*>(base_executor);

  const size_t desired_num_awake_workers =
      GetDesiredNumAwakeWorkersLockRequired();
  const size_t num_awake_workers = GetNumAwakeWorkersLockRequired();

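  // Wake up at most two workers per call. If more workers are needed, each
  // worker that dequeues work calls EnsureEnoughWorkersLockRequired() again
  // (via GetWorkLockRequired()), waking up additional workers.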
  size_t num_workers_to_wake_up =
      ClampSub(desired_num_awake_workers, num_awake_workers);
  num_workers_to_wake_up = std::min(num_workers_to_wake_up, size_t(2U));

  // Wake up the appropriate number of workers.
  for (size_t i = 0; i < num_workers_to_wake_up; ++i) {
    MaintainAtLeastOneIdleWorkerLockRequired(executor);
    WorkerThread* worker_to_wakeup = idle_workers_set_.Take();
    DCHECK(worker_to_wakeup);
    executor->ScheduleWakeUp(worker_to_wakeup);
  }

  // In the case where the loop above didn't wake up any worker and we don't
  // have excess workers, the idle worker should be maintained. This happens
  // when called from the last worker awake, or a recent increase in
  // |max_tasks| now makes it possible to keep an idle worker.
  if (desired_num_awake_workers == num_awake_workers) {
    MaintainAtLeastOneIdleWorkerLockRequired(executor);
  }

  // This function is called every time a task source is (re-)enqueued,
  // hence the minimum priority needs to be updated.
  UpdateMinAllowedPriorityLockRequired();

  // Ensure that the number of workers is periodically adjusted if needed.
  MaybeScheduleAdjustMaxTasksLockRequired(executor);
}

bool ThreadGroupImpl::IsOnIdleSetLockRequired(WorkerThread* worker) const {
  // To avoid searching through the idle set, use a non-null GetLastUsedTime()
  // (or being directly on top of the idle set) as a proxy for being on the
  // idle set.
  return idle_workers_set_.Peek() == worker ||
         !worker->GetLastUsedTime().is_null();
}

void ThreadGroupImpl::ScheduleAdjustMaxTasks() {
  // |adjust_max_tasks_posted_| can't change before the task posted below runs.
  DCHECK(TS_UNCHECKED_READ(adjust_max_tasks_posted_));

  after_start().service_thread_task_runner->PostDelayedTask(
      FROM_HERE, BindOnce(&ThreadGroupImpl::AdjustMaxTasks, Unretained(this)),
      after_start().blocked_workers_poll_period);
}

void ThreadGroupImpl::AdjustMaxTasks() {
  DCHECK(
      after_start().service_thread_task_runner->RunsTasksInCurrentSequence());

  ScopedCommandsExecutor executor(this);
  CheckedAutoLock auto_lock(lock_);
  DCHECK(adjust_max_tasks_posted_);
  adjust_max_tasks_posted_ = false;

  // Increment max tasks for each worker that has been within a MAY_BLOCK
  // ScopedBlockingCall for more than may_block_threshold.
  for (const scoped_refptr<WorkerThread>& worker : workers_) {
    // The delegates of workers inside a ThreadGroupImpl should be
    // `WorkerDelegate`s.
    WorkerDelegate* delegate = static_cast<WorkerDelegate*>(worker->delegate());
    AnnotateAcquiredLockAlias annotate(lock_, delegate->lock());
    delegate->MaybeIncrementMaxTasksLockRequired();
  }

  // Wake up workers according to the updated |max_tasks_|. This will also
  // reschedule AdjustMaxTasks() if necessary.
  EnsureEnoughWorkersLockRequired(&executor);
}

}  // namespace base::internal