| // Copyright 2017 The Chromium Authors |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "base/task/thread_pool/thread_group.h" |
| |
| #include <string_view> |
| #include <utility> |
| |
| #include "base/auto_reset.h" |
| #include "base/check.h" |
| #include "base/functional/bind.h" |
| #include "base/functional/callback_helpers.h" |
| #include "base/strings/stringprintf.h" |
| #include "base/task/task_features.h" |
| #include "base/task/thread_pool/task_tracker.h" |
| #include "build/build_config.h" |
| |
| #if BUILDFLAG(IS_WIN) |
| #include "base/win/com_init_check_hook.h" |
| #include "base/win/scoped_winrt_initializer.h" |
| #endif |
| |
| namespace base::internal { |
| |
| namespace { |
| |
| // ThreadGroup that owns the current thread, if any. |
| constinit thread_local const ThreadGroup* current_thread_group = nullptr; |
| |
| } // namespace |
| |
| constexpr ThreadGroup::YieldSortKey ThreadGroup::kMaxYieldSortKey; |
| |
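// Defers the release of |task_source| until this executor is destroyed, which
// must happen with no CheckedLock held (see ~BaseScopedCommandsExecutor()).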
| void ThreadGroup::BaseScopedCommandsExecutor::ScheduleReleaseTaskSource( |
| RegisteredTaskSource task_source) { |
| task_sources_to_release_.push_back(std::move(task_source)); |
| } |
| |
| void ThreadGroup::BaseScopedCommandsExecutor::ScheduleAdjustMaxTasks() { |
| DCHECK(!must_schedule_adjust_max_tasks_); |
| must_schedule_adjust_max_tasks_ = true; |
| } |
| |
| void ThreadGroup::BaseScopedCommandsExecutor::ScheduleStart( |
| scoped_refptr<WorkerThread> worker) { |
| workers_to_start_.emplace_back(std::move(worker)); |
| } |
| |
| ThreadGroup::BaseScopedCommandsExecutor::BaseScopedCommandsExecutor( |
| ThreadGroup* outer) |
| : outer_(outer) {} |
| |
| ThreadGroup::BaseScopedCommandsExecutor::~BaseScopedCommandsExecutor() { |
| CheckedLock::AssertNoLockHeldOnCurrentThread(); |
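  // Flushing may start worker threads (and, in tests, wait for them to
  // start), so it must run with no locks held, as asserted above.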
| Flush(); |
| } |
| |
| void ThreadGroup::BaseScopedCommandsExecutor::Flush() { |
  // Start workers. This happens after wake-ups (which subclasses perform in
  // their own destructors, i.e. before this base destructor runs) to avoid
  // the case where a worker enters its main function, is descheduled because
  // no wake-up is pending yet, and must be woken up again immediately after.
| for (auto& worker : workers_to_start_) { |
| worker->Start(outer_->after_start().service_thread_task_runner, |
| outer_->after_start().worker_thread_observer); |
| if (outer_->worker_started_for_testing_) { |
| outer_->worker_started_for_testing_->Wait(); |
| } |
| } |
| workers_to_start_.clear(); |
| |
| if (must_schedule_adjust_max_tasks_) { |
| outer_->ScheduleAdjustMaxTasks(); |
| } |
| } |
| |
| ThreadGroup::ScopedReenqueueExecutor::ScopedReenqueueExecutor() = default; |
| |
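// On destruction, pushes the task source scheduled through
// SchedulePushTaskSourceAndWakeUpWorkers() (if any) to its destination thread
// group, after the originating thread group's lock has been released.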
| ThreadGroup::ScopedReenqueueExecutor::~ScopedReenqueueExecutor() { |
| if (destination_thread_group_) { |
| destination_thread_group_->PushTaskSourceAndWakeUpWorkers( |
| std::move(transaction_with_task_source_.value())); |
| } |
| } |
| |
| void ThreadGroup::ScopedReenqueueExecutor:: |
| SchedulePushTaskSourceAndWakeUpWorkers( |
| RegisteredTaskSourceAndTransaction transaction_with_task_source, |
| ThreadGroup* destination_thread_group) { |
| DCHECK(destination_thread_group); |
| DCHECK(!destination_thread_group_); |
| DCHECK(!transaction_with_task_source_); |
| transaction_with_task_source_.emplace( |
| std::move(transaction_with_task_source)); |
| destination_thread_group_ = destination_thread_group; |
| } |
| |
| ThreadGroup::ThreadGroup(std::string_view histogram_label, |
| std::string_view thread_group_label, |
| ThreadType thread_type_hint, |
| TrackedRef<TaskTracker> task_tracker, |
| TrackedRef<Delegate> delegate) |
| : task_tracker_(std::move(task_tracker)), |
| delegate_(std::move(delegate)), |
      unnecessary_wakeup_histogram_label_(
          StrCat({"ThreadPool.UnnecessaryWakeup.", histogram_label})),
| thread_group_label_(thread_group_label), |
| thread_type_hint_(thread_type_hint), |
| idle_workers_set_cv_for_testing_(lock_.CreateConditionVariable()) { |
| DCHECK(!thread_group_label_.empty()); |
| } |
| |
| void ThreadGroup::StartImplLockRequired( |
| size_t max_tasks, |
| size_t max_best_effort_tasks, |
| TimeDelta suggested_reclaim_time, |
| scoped_refptr<SingleThreadTaskRunner> service_thread_task_runner, |
| WorkerThreadObserver* worker_thread_observer, |
| WorkerEnvironment worker_environment, |
| TimeDelta may_block_threshold, |
| TimeDelta blocked_workers_poll_period, |
| bool synchronous_thread_start_for_testing) { |
| if (synchronous_thread_start_for_testing) { |
| worker_started_for_testing_.emplace(WaitableEvent::ResetPolicy::AUTOMATIC); |
    // Don't emit a ScopedBlockingCallWithBaseSyncPrimitives from this
    // WaitableEvent, as that would defeat the purpose of having threads start
    // without externally visible side effects.
| worker_started_for_testing_->declare_only_used_while_idle(); |
| } |
| |
| in_start().may_block_threshold = may_block_threshold; |
| in_start().blocked_workers_poll_period = blocked_workers_poll_period; |
| |
| max_tasks_ = max_tasks; |
| baseline_max_tasks_ = max_tasks; |
| DCHECK_GE(max_tasks_, 1U); |
| in_start().initial_max_tasks = std::min(max_tasks_, kMaxNumberOfWorkers); |
| max_best_effort_tasks_ = max_best_effort_tasks; |
| in_start().suggested_reclaim_time = suggested_reclaim_time; |
| in_start().worker_environment = worker_environment; |
| in_start().service_thread_task_runner = std::move(service_thread_task_runner); |
| in_start().worker_thread_observer = worker_thread_observer; |
| |
| #if DCHECK_IS_ON() |
| in_start().initialized = true; |
| #endif |
| } |
| |
| ThreadGroup::~ThreadGroup() = default; |
| |
| void ThreadGroup::BindToCurrentThread() { |
| DCHECK(!CurrentThreadHasGroup()); |
| current_thread_group = this; |
| } |
| |
| void ThreadGroup::UnbindFromCurrentThread() { |
| DCHECK(IsBoundToCurrentThread()); |
| current_thread_group = nullptr; |
| } |
| |
| bool ThreadGroup::IsBoundToCurrentThread() const { |
| return current_thread_group == this; |
| } |
| |
| void ThreadGroup::SetMaxTasks(size_t max_tasks) { |
| CheckedAutoLock auto_lock(lock_); |
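  // Preserve capacity granted above the baseline (e.g. via
  // IncrementMaxTasksLockRequired() while workers are blocked) across the
  // baseline update.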
| size_t extra_tasks = max_tasks_ - baseline_max_tasks_; |
| baseline_max_tasks_ = std::min(max_tasks, after_start().initial_max_tasks); |
| max_tasks_ = baseline_max_tasks_ + extra_tasks; |
| } |
| |
| void ThreadGroup::ResetMaxTasks() { |
| SetMaxTasks(after_start().initial_max_tasks); |
| } |
| |
| size_t |
| ThreadGroup::GetNumAdditionalWorkersForBestEffortTaskSourcesLockRequired() |
| const { |
| // For simplicity, only 1 worker is assigned to each task source regardless of |
| // its max concurrency, with the exception of the top task source. |
| const size_t num_queued = priority_queue_.GetNumBackgroundTaskSources(); |
| if (num_queued == 0 || |
| !task_tracker_->CanRunThreadType(ThreadType::kBackground)) { |
| return 0U; |
| } |
| if (priority_queue_.PeekSortKey().thread_type() == ThreadType::kBackground) { |
| // Assign the correct number of workers for the top TaskSource (-1 for the |
| // worker that is already accounted for in |num_queued|). |
| return std::max<size_t>( |
| 1, num_queued + |
| priority_queue_.PeekTaskSource()->GetRemainingConcurrency() - 1); |
| } |
| return num_queued; |
| } |
| |
| size_t |
| ThreadGroup::GetNumAdditionalWorkersForForegroundTaskSourcesLockRequired() |
| const { |
| // For simplicity, only 1 worker is assigned to each task source regardless of |
| // its max concurrency, with the exception of the top task source. |
| const size_t num_queued = priority_queue_.GetNumForegroundTaskSources(); |
| if (num_queued == 0 || |
| !task_tracker_->CanRunThreadType(ThreadType::kDefault)) { |
| return 0U; |
| } |
| auto thread_type = priority_queue_.PeekSortKey().thread_type(); |
| if (thread_type != ThreadType::kBackground) { |
| // Assign the correct number of workers for the top TaskSource (-1 for the |
| // worker that is already accounted for in |num_queued|). |
| return std::max<size_t>( |
| 1, num_queued + |
| priority_queue_.PeekTaskSource()->GetRemainingConcurrency() - 1); |
| } |
| return num_queued; |
| } |
| |
| RegisteredTaskSource ThreadGroup::RemoveTaskSource( |
| const TaskSource& task_source) { |
| CheckedAutoLock auto_lock(lock_); |
| return priority_queue_.RemoveTaskSource(task_source); |
| } |
| |
| void ThreadGroup::ReEnqueueTaskSourceLockRequired( |
| BaseScopedCommandsExecutor* workers_executor, |
| ScopedReenqueueExecutor* reenqueue_executor, |
| RegisteredTaskSourceAndTransaction transaction_with_task_source) { |
| // Decide in which thread group the TaskSource should be reenqueued. |
| auto& [task_source, transaction] = transaction_with_task_source; |
| ThreadGroup* destination_thread_group = delegate_->GetThreadGroup( |
| transaction.thread_type(), task_source->thread_policy()); |
| |
| bool push_to_immediate_queue = |
| task_source.WillReEnqueue(TimeTicks::Now(), &transaction); |
| |
| if (destination_thread_group == this) { |
    // Another worker that was running a task from this task source may have
    // reenqueued it already, in which case its heap_handle is valid. It
    // shouldn't be queued twice, so the task source registration is released
    // instead.
| if (task_source->immediate_heap_handle().IsValid()) { |
| workers_executor->ScheduleReleaseTaskSource(std::move(task_source)); |
| } else { |
| // If the TaskSource should be reenqueued in the current thread group, |
| // reenqueue it inside the scope of the lock. |
| if (push_to_immediate_queue) { |
| auto sort_key = task_source->GetSortKey(); |
| // When moving |task_source| into |priority_queue_|, it may be destroyed |
| // on another thread as soon as |lock_| is released, since we're no |
| // longer holding a reference to it. To prevent UAF, release |
| // |transaction| before moving |task_source|. Ref. crbug.com/1412008 |
| transaction.Release(); |
| priority_queue_.Push(std::move(task_source), sort_key); |
| } |
| } |
| // This is called unconditionally to ensure there are always workers to run |
| // task sources in the queue. Some ThreadGroup implementations only invoke |
| // TakeRegisteredTaskSource() once per wake up and hence this is required to |
| // avoid races that could leave a task source stranded in the queue with no |
| // active workers. |
| EnsureEnoughWorkersLockRequired(workers_executor); |
| } else { |
| // Otherwise, schedule a reenqueue after releasing the lock. |
| reenqueue_executor->SchedulePushTaskSourceAndWakeUpWorkers( |
| std::move(transaction_with_task_source), destination_thread_group); |
| } |
| } |
| |
| RegisteredTaskSource ThreadGroup::TakeRegisteredTaskSource( |
| BaseScopedCommandsExecutor* executor) { |
| DCHECK(!priority_queue_.IsEmpty()); |
| |
| auto run_status = priority_queue_.PeekTaskSource().WillRunTask(); |
| |
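  // kDisallowed: the task source must not run now; drop it from the queue and
  // schedule its registration to be released outside the lock.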
| if (run_status == TaskSource::RunStatus::kDisallowed) { |
| executor->ScheduleReleaseTaskSource(priority_queue_.PopTaskSource()); |
| return nullptr; |
| } |
| |
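  // kAllowedSaturated: the task source reached its maximum concurrency; pop
  // it so that no other worker selects it until it is reenqueued.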
| if (run_status == TaskSource::RunStatus::kAllowedSaturated) { |
| return priority_queue_.PopTaskSource(); |
| } |
| |
| // If the TaskSource isn't saturated, check whether TaskTracker allows it to |
| // remain in the PriorityQueue. |
| // The canonical way of doing this is to pop the task source to return, call |
| // RegisterTaskSource() to get an additional RegisteredTaskSource, and |
| // reenqueue that task source if valid. Instead, it is cheaper and equivalent |
| // to peek the task source, call RegisterTaskSource() to get an additional |
| // RegisteredTaskSource to replace if valid, and only pop |priority_queue_| |
| // otherwise. |
| RegisteredTaskSource task_source = |
| task_tracker_->RegisterTaskSource(priority_queue_.PeekTaskSource().get()); |
| if (!task_source) { |
| return priority_queue_.PopTaskSource(); |
| } |
| // Replace the top task_source and then update the queue. |
| std::swap(priority_queue_.PeekTaskSource(), task_source); |
| priority_queue_.UpdateSortKey(*task_source.get(), task_source->GetSortKey()); |
| return task_source; |
| } |
| |
| void ThreadGroup::UpdateSortKeyImpl(BaseScopedCommandsExecutor* executor, |
| TaskSource::Transaction transaction) { |
| CheckedAutoLock auto_lock(lock_); |
| priority_queue_.UpdateSortKey(*transaction.task_source(), |
| transaction.task_source()->GetSortKey()); |
| EnsureEnoughWorkersLockRequired(executor); |
| } |
| |
| void ThreadGroup::PushTaskSourceAndWakeUpWorkersImpl( |
| BaseScopedCommandsExecutor* executor, |
| RegisteredTaskSourceAndTransaction transaction_with_task_source) { |
| auto& [task_source, transaction] = transaction_with_task_source; |
| DCHECK_EQ(delegate_->GetThreadGroup(transaction.thread_type(), |
| task_source->thread_policy()), |
| this); |
| CheckedAutoLock lock(lock_); |
| if (task_source->immediate_heap_handle().IsValid()) { |
| // If the task source changed group, it is possible that multiple concurrent |
| // workers try to enqueue it. Only the first enqueue should succeed. |
| executor->ScheduleReleaseTaskSource(std::move(task_source)); |
| return; |
| } |
| auto sort_key = task_source->GetSortKey(); |
| // When moving |task_source| into |priority_queue_|, it may be destroyed |
| // on another thread as soon as |lock_| is released, since we're no longer |
| // holding a reference to it. To prevent UAF, release |transaction| before |
| // moving |task_source|. Ref. crbug.com/1412008 |
| transaction.Release(); |
| priority_queue_.Push(std::move(task_source), sort_key); |
| EnsureEnoughWorkersLockRequired(executor); |
| } |
| |
| void ThreadGroup::EnqueueAllTaskSources(PriorityQueue* new_priority_queue) { |
| CheckedAutoLock lock(lock_); |
| while (!new_priority_queue->IsEmpty()) { |
| TaskSourceSortKey top_sort_key = new_priority_queue->PeekSortKey(); |
| RegisteredTaskSource task_source = new_priority_queue->PopTaskSource(); |
| priority_queue_.Push(std::move(task_source), top_sort_key); |
| } |
| } |
| |
| void ThreadGroup::HandoffTaskSourcesToOtherThreadGroupAtLeastThreadType( |
| ThreadType min_thread_type, |
| ThreadGroup* destination_thread_group) { |
| PriorityQueue new_priority_queue; |
| TaskSourceSortKey top_sort_key; |
| { |
    // Since task sources are ordered by decreasing thread type (i.e.
    // priority), this is implemented by moving every task source whose thread
    // type is greater than or equal to `min_thread_type` to
    // `new_priority_queue`.
| CheckedAutoLock current_thread_group_lock(lock_); |
| while (!priority_queue_.IsEmpty() && |
| (top_sort_key = priority_queue_.PeekSortKey()).thread_type() >= |
| min_thread_type) { |
| new_priority_queue.Push(priority_queue_.PopTaskSource(), top_sort_key); |
| } |
| } |
| destination_thread_group->EnqueueAllTaskSources(&new_priority_queue); |
| } |
| |
| void ThreadGroup::HandoffTaskSourcesToOtherThreadGroupAtMostThreadType( |
| ThreadType max_thread_type, |
| ThreadGroup* destination_thread_group) { |
| PriorityQueue new_priority_queue; |
| TaskSourceSortKey top_sort_key; |
| { |
    // Since task sources are ordered by decreasing thread type (i.e.
    // priority), this is implemented by moving every task source whose thread
    // type is greater than `max_thread_type` to a new queue, then swapping it
    // with `priority_queue_`. All task sources with a thread type lower than
    // or equal to `max_thread_type` thus end up in `new_priority_queue`.
| CheckedAutoLock current_thread_group_lock(lock_); |
| while (!priority_queue_.IsEmpty() && |
| (top_sort_key = priority_queue_.PeekSortKey()).thread_type() > |
| max_thread_type) { |
| new_priority_queue.Push(priority_queue_.PopTaskSource(), top_sort_key); |
| } |
| new_priority_queue.swap(priority_queue_); |
| } |
| destination_thread_group->EnqueueAllTaskSources(&new_priority_queue); |
| } |
| |
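// Returns true iff a task running with |sort_key| should yield so that queued
// work of higher priority (or equal priority with fewer workers) can run.
// |max_allowed_sort_key_| is published by
// UpdateMinAllowedPriorityLockRequired() and read here without taking |lock_|.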
| bool ThreadGroup::ShouldYield(TaskSourceSortKey sort_key) { |
| DCHECK(TS_UNCHECKED_READ(max_allowed_sort_key_).is_lock_free()); |
| |
| if (!task_tracker_->CanRunThreadType(sort_key.thread_type())) { |
| return true; |
| } |
| // It is safe to read |max_allowed_sort_key_| without a lock since this |
| // variable is atomic, keeping in mind that threads may not immediately see |
| // the new value when it is updated. |
| auto max_allowed_sort_key = |
| TS_UNCHECKED_READ(max_allowed_sort_key_).load(std::memory_order_relaxed); |
| |
  // To reduce unnecessary yielding, a task never yields to BEST_EFFORT
  // (ThreadType::kBackground) work, regardless of its worker_count.
| if (sort_key.thread_type() > max_allowed_sort_key.thread_type || |
| max_allowed_sort_key.thread_type == ThreadType::kBackground) { |
| return false; |
| } |
  // Otherwise, a task only yields to work of equal priority if its own
  // worker_count would still be greater after yielding, e.g. a job with 1
  // worker doesn't yield to a job with 0 workers.
| if (sort_key.thread_type() == max_allowed_sort_key.thread_type && |
| sort_key.worker_count() <= max_allowed_sort_key.worker_count + 1) { |
| return false; |
| } |
| |
  // Reset |max_allowed_sort_key_| so that only one worker yields at a time
  // for a given queued task.
| max_allowed_sort_key = |
| TS_UNCHECKED_READ(max_allowed_sort_key_) |
| .exchange(kMaxYieldSortKey, std::memory_order_relaxed); |
| // Another thread might have decided to yield and racily reset |
| // |max_allowed_sort_key_|, in which case this thread doesn't yield. |
| return max_allowed_sort_key.thread_type != ThreadType::kBackground; |
| } |
| |
| #if BUILDFLAG(IS_WIN) |
| // static |
| std::unique_ptr<win::ScopedWindowsThreadEnvironment> |
| ThreadGroup::GetScopedWindowsThreadEnvironment(WorkerEnvironment environment) { |
| std::unique_ptr<win::ScopedWindowsThreadEnvironment> scoped_environment; |
| if (environment == WorkerEnvironment::COM_MTA) { |
| scoped_environment = std::make_unique<win::ScopedWinrtInitializer>(); |
| } |
| // Continuing execution with an uninitialized apartment may lead to broken |
| // program invariants later on. |
| CHECK(!scoped_environment || scoped_environment->Succeeded()); |
| return scoped_environment; |
| } |
| #endif |
| |
| // static |
| bool ThreadGroup::CurrentThreadHasGroup() { |
| return current_thread_group != nullptr; |
| } |
| |
| size_t ThreadGroup::GetMaxTasksForTesting() const { |
| CheckedAutoLock auto_lock(lock_); |
| return max_tasks_; |
| } |
| |
| size_t ThreadGroup::GetMaxBestEffortTasksForTesting() const { |
| CheckedAutoLock auto_lock(lock_); |
| return max_best_effort_tasks_; |
| } |
| |
| void ThreadGroup::WaitForWorkersIdleLockRequiredForTesting(size_t n) { |
  // Make sure workers do not clean up while we are watching the idle count.
| AutoReset<bool> ban_cleanups(&worker_cleanup_disallowed_for_testing_, true); |
| |
| while (NumberOfIdleWorkersLockRequiredForTesting() < n) { |
| idle_workers_set_cv_for_testing_.Wait(); |
| } |
| } |
| |
| void ThreadGroup::WaitForWorkersIdleForTesting(size_t n) { |
| CheckedAutoLock auto_lock(lock_); |
| |
| #if DCHECK_IS_ON() |
| DCHECK(!some_workers_cleaned_up_for_testing_) |
| << "Workers detached prior to waiting for a specific number of idle " |
| "workers. Doing the wait under such conditions is flaky. Consider " |
| "setting the suggested reclaim time to TimeDelta::Max() in Start()."; |
| #endif |
| |
| WaitForWorkersIdleLockRequiredForTesting(n); |
| } |
| |
| void ThreadGroup::WaitForAllWorkersIdleForTesting() { |
| CheckedAutoLock auto_lock(lock_); |
| WaitForWorkersIdleLockRequiredForTesting(workers_.size()); |
| } |
| |
| void ThreadGroup::WaitForWorkersCleanedUpForTesting(size_t n) { |
| CheckedAutoLock auto_lock(lock_); |
| |
| if (!num_workers_cleaned_up_for_testing_cv_) { |
| lock_.CreateConditionVariableAndEmplace( |
| num_workers_cleaned_up_for_testing_cv_); |
| } |
| |
| while (num_workers_cleaned_up_for_testing_ < n) { |
| num_workers_cleaned_up_for_testing_cv_->Wait(); |
| } |
| |
| num_workers_cleaned_up_for_testing_ = 0; |
| } |
| |
| size_t ThreadGroup::GetMaxConcurrentNonBlockedTasksDeprecated() const { |
| #if DCHECK_IS_ON() |
| CheckedAutoLock auto_lock(lock_); |
  DCHECK_NE(after_start().initial_max_tasks, 0U)
      << "GetMaxConcurrentNonBlockedTasksDeprecated() should only be called "
      << "after the thread group has started.";
| #endif |
| return after_start().initial_max_tasks; |
| } |
| |
| size_t ThreadGroup::NumberOfWorkersForTesting() const { |
| CheckedAutoLock auto_lock(lock_); |
| return workers_.size(); |
| } |
| |
| size_t ThreadGroup::NumberOfIdleWorkersForTesting() const { |
| CheckedAutoLock auto_lock(lock_); |
| return NumberOfIdleWorkersLockRequiredForTesting(); |
| } |
| |
| size_t ThreadGroup::GetDesiredNumAwakeWorkersLockRequired() const { |
| // Number of BEST_EFFORT task sources that are running or queued and allowed |
| // to run by the CanRunPolicy. |
| const size_t num_running_or_queued_can_run_best_effort_task_sources = |
| num_running_best_effort_tasks_ + |
| GetNumAdditionalWorkersForBestEffortTaskSourcesLockRequired(); |
| |
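  // Cap the number of workers assigned to BEST_EFFORT task sources at
  // |max_best_effort_tasks_|, but never report fewer workers than the
  // BEST_EFFORT tasks that are already running.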
| const size_t workers_for_best_effort_task_sources = |
| std::max(std::min(num_running_or_queued_can_run_best_effort_task_sources, |
| max_best_effort_tasks_), |
| num_running_best_effort_tasks_); |
| |
| // Number of USER_{VISIBLE|BLOCKING} task sources that are running or queued. |
| const size_t num_running_or_queued_foreground_task_sources = |
| (num_running_tasks_ - num_running_best_effort_tasks_) + |
| GetNumAdditionalWorkersForForegroundTaskSourcesLockRequired(); |
| |
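  // Foreground task sources have no dedicated cap; the overall |max_tasks_|
  // and |kMaxNumberOfWorkers| limits below apply.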
| const size_t workers_for_foreground_task_sources = |
| num_running_or_queued_foreground_task_sources; |
| |
| return std::min({workers_for_best_effort_task_sources + |
| workers_for_foreground_task_sources, |
| max_tasks_, kMaxNumberOfWorkers}); |
| } |
| |
| void ThreadGroup::MaybeScheduleAdjustMaxTasksLockRequired( |
| BaseScopedCommandsExecutor* executor) { |
| if (!adjust_max_tasks_posted_ && |
| ShouldPeriodicallyAdjustMaxTasksLockRequired()) { |
| executor->ScheduleAdjustMaxTasks(); |
| adjust_max_tasks_posted_ = true; |
| } |
| } |
| |
| bool ThreadGroup::ShouldPeriodicallyAdjustMaxTasksLockRequired() { |
| // AdjustMaxTasks() should be scheduled to periodically adjust |max_tasks_| |
| // and |max_best_effort_tasks_| when (1) the concurrency limits are not large |
| // enough to accommodate all queued and running task sources and an idle |
| // worker and (2) there are unresolved MAY_BLOCK ScopedBlockingCalls. |
| // - When (1) is false: No worker would be created or woken up if the |
| // concurrency limits were increased, so there is no hurry to increase them. |
| // - When (2) is false: The concurrency limits could not be increased by |
| // AdjustMaxTasks(). |
| |
| const size_t num_running_or_queued_best_effort_task_sources = |
| num_running_best_effort_tasks_ + |
| GetNumAdditionalWorkersForBestEffortTaskSourcesLockRequired(); |
| if (num_running_or_queued_best_effort_task_sources > max_best_effort_tasks_ && |
| num_unresolved_best_effort_may_block_ > 0) { |
| return true; |
| } |
| |
| const size_t num_running_or_queued_task_sources = |
| num_running_tasks_ + |
| GetNumAdditionalWorkersForBestEffortTaskSourcesLockRequired() + |
| GetNumAdditionalWorkersForForegroundTaskSourcesLockRequired(); |
| constexpr size_t kIdleWorker = 1; |
| return num_running_or_queued_task_sources + kIdleWorker > max_tasks_ && |
| num_unresolved_may_block_ > 0; |
| } |
| |
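// Publishes the sort key that ShouldYield() compares against.
// kMaxYieldSortKey is stored when no yielding is necessary, i.e. when no work
// is queued or there is spare capacity to run it.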
| void ThreadGroup::UpdateMinAllowedPriorityLockRequired() { |
| if (priority_queue_.IsEmpty() || num_running_tasks_ < max_tasks_) { |
| max_allowed_sort_key_.store(kMaxYieldSortKey, std::memory_order_relaxed); |
| } else { |
| max_allowed_sort_key_.store({priority_queue_.PeekSortKey().thread_type(), |
| priority_queue_.PeekSortKey().worker_count()}, |
| std::memory_order_relaxed); |
| } |
| } |
| |
| void ThreadGroup::DecrementTasksRunningLockRequired(ThreadType thread_type) { |
| DCHECK_GT(num_running_tasks_, 0U); |
| --num_running_tasks_; |
| if (thread_type == ThreadType::kBackground) { |
| DCHECK_GT(num_running_best_effort_tasks_, 0U); |
| --num_running_best_effort_tasks_; |
| } |
| UpdateMinAllowedPriorityLockRequired(); |
| } |
| |
| void ThreadGroup::IncrementTasksRunningLockRequired(ThreadType thread_type) { |
| ++num_running_tasks_; |
| DCHECK_LE(num_running_tasks_, max_tasks_); |
| DCHECK_LE(num_running_tasks_, kMaxNumberOfWorkers); |
| if (thread_type == ThreadType::kBackground) { |
| ++num_running_best_effort_tasks_; |
| DCHECK_LE(num_running_best_effort_tasks_, num_running_tasks_); |
| DCHECK_LE(num_running_best_effort_tasks_, max_best_effort_tasks_); |
| } |
| UpdateMinAllowedPriorityLockRequired(); |
| } |
| |
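// The four methods below adjust |max_tasks_| and |max_best_effort_tasks_|
// while a task is running (hence the DCHECKs on |num_running_tasks_|), e.g.
// around MAY_BLOCK ScopedBlockingCalls.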
| void ThreadGroup::DecrementMaxTasksLockRequired() { |
| DCHECK_GT(num_running_tasks_, 0U); |
| DCHECK_GT(max_tasks_, 0U); |
| --max_tasks_; |
| UpdateMinAllowedPriorityLockRequired(); |
| } |
| |
| void ThreadGroup::IncrementMaxTasksLockRequired() { |
| DCHECK_GT(num_running_tasks_, 0U); |
| ++max_tasks_; |
| UpdateMinAllowedPriorityLockRequired(); |
| } |
| |
| void ThreadGroup::DecrementMaxBestEffortTasksLockRequired() { |
| DCHECK_GT(num_running_tasks_, 0U); |
| DCHECK_GT(max_best_effort_tasks_, 0U); |
| --max_best_effort_tasks_; |
| UpdateMinAllowedPriorityLockRequired(); |
| } |
| |
| void ThreadGroup::IncrementMaxBestEffortTasksLockRequired() { |
| DCHECK_GT(num_running_tasks_, 0U); |
| ++max_best_effort_tasks_; |
| UpdateMinAllowedPriorityLockRequired(); |
| } |
| |
| ThreadGroup::InitializedInStart::InitializedInStart() = default; |
| ThreadGroup::InitializedInStart::~InitializedInStart() = default; |
| |
| } // namespace base::internal |