| // Copyright 2016 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "base/task_scheduler/scheduler_worker_pool_impl.h" |
| |
| #include <stddef.h> |
| |
| #include <algorithm> |
| #include <utility> |
| |
| #include "base/atomicops.h" |
| #include "base/bind.h" |
| #include "base/bind_helpers.h" |
| #include "base/lazy_instance.h" |
| #include "base/memory/ptr_util.h" |
| #include "base/sequenced_task_runner.h" |
| #include "base/single_thread_task_runner.h" |
| #include "base/strings/stringprintf.h" |
| #include "base/task_scheduler/delayed_task_manager.h" |
| #include "base/task_scheduler/task_tracker.h" |
| #include "base/threading/platform_thread.h" |
| #include "base/threading/thread_local.h" |
| #include "base/threading/thread_restrictions.h" |
| #include "base/time/time.h" |
| |
| namespace base { |
| namespace internal { |
| |
| namespace { |
| |
| // SchedulerWorker that owns the current thread, if any. |
| LazyInstance<ThreadLocalPointer<const SchedulerWorker>>::Leaky |
| tls_current_worker = LAZY_INSTANCE_INITIALIZER; |
| |
| // SchedulerWorkerPool that owns the current thread, if any. |
| LazyInstance<ThreadLocalPointer<const SchedulerWorkerPool>>::Leaky |
| tls_current_worker_pool = LAZY_INSTANCE_INITIALIZER; |
| |
| // A task runner that runs tasks with the PARALLEL ExecutionMode. |
| class SchedulerParallelTaskRunner : public TaskRunner { |
| public: |
| // Constructs a SchedulerParallelTaskRunner which can be used to post tasks so |
| // long as |worker_pool| is alive. |
| // TODO(robliao): Find a concrete way to manage |worker_pool|'s memory. |
| SchedulerParallelTaskRunner(const TaskTraits& traits, |
| SchedulerWorkerPool* worker_pool) |
| : traits_(traits), worker_pool_(worker_pool) { |
| DCHECK(worker_pool_); |
| } |
| |
| // TaskRunner: |
| bool PostDelayedTask(const tracked_objects::Location& from_here, |
| const Closure& closure, |
| TimeDelta delay) override { |
| // Post the task as part of a one-off single-task Sequence. |
| return worker_pool_->PostTaskWithSequence( |
| WrapUnique(new Task(from_here, closure, traits_, delay)), |
| make_scoped_refptr(new Sequence), nullptr); |
| } |
| |
| bool RunsTasksOnCurrentThread() const override { |
| return tls_current_worker_pool.Get().Get() == worker_pool_; |
| } |
| |
| private: |
| ~SchedulerParallelTaskRunner() override = default; |
| |
| const TaskTraits traits_; |
| SchedulerWorkerPool* const worker_pool_; |
| |
| DISALLOW_COPY_AND_ASSIGN(SchedulerParallelTaskRunner); |
| }; |
| |
| // A task runner that runs tasks with the SEQUENCED ExecutionMode. |
| class SchedulerSequencedTaskRunner : public SequencedTaskRunner { |
| public: |
| // Constructs a SchedulerSequencedTaskRunner which can be used to post tasks |
| // so long as |worker_pool| is alive. |
| // TODO(robliao): Find a concrete way to manage |worker_pool|'s memory. |
| SchedulerSequencedTaskRunner(const TaskTraits& traits, |
| SchedulerWorkerPool* worker_pool) |
| : traits_(traits), worker_pool_(worker_pool) { |
| DCHECK(worker_pool_); |
| } |
| |
| // SequencedTaskRunner: |
| bool PostDelayedTask(const tracked_objects::Location& from_here, |
| const Closure& closure, |
| TimeDelta delay) override { |
| std::unique_ptr<Task> task(new Task(from_here, closure, traits_, delay)); |
| task->sequenced_task_runner_ref = this; |
| |
| // Post the task as part of |sequence_|. |
| return worker_pool_->PostTaskWithSequence(std::move(task), sequence_, |
| nullptr); |
| } |
| |
| bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, |
| const Closure& closure, |
| base::TimeDelta delay) override { |
| // Tasks are never nested within the task scheduler. |
| return PostDelayedTask(from_here, closure, delay); |
| } |
| |
| bool RunsTasksOnCurrentThread() const override { |
| return tls_current_worker_pool.Get().Get() == worker_pool_; |
| } |
| |
| private: |
| ~SchedulerSequencedTaskRunner() override = default; |
| |
| // Sequence for all Tasks posted through this TaskRunner. |
| const scoped_refptr<Sequence> sequence_ = new Sequence; |
| |
| const TaskTraits traits_; |
| SchedulerWorkerPool* const worker_pool_; |
| |
| DISALLOW_COPY_AND_ASSIGN(SchedulerSequencedTaskRunner); |
| }; |
| |
| // Only used in DCHECKs. |
| bool ContainsWorker( |
| const std::vector<std::unique_ptr<SchedulerWorker>>& workers, |
| const SchedulerWorker* worker) { |
| auto it = std::find_if(workers.begin(), workers.end(), |
| [worker](const std::unique_ptr<SchedulerWorker>& i) { |
| return i.get() == worker; |
| }); |
| return it != workers.end(); |
| } |
| |
| } // namespace |
| |
| // A task runner that runs tasks with the SINGLE_THREADED ExecutionMode. |
| class SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner : |
| public SingleThreadTaskRunner { |
| public: |
| // Constructs a SchedulerSingleThreadTaskRunner which can be used to post |
| // tasks so long as |worker_pool| and |worker| are alive. |
| // TODO(robliao): Find a concrete way to manage the memory of |worker_pool| |
| // and |worker|. |
| SchedulerSingleThreadTaskRunner(const TaskTraits& traits, |
| SchedulerWorkerPool* worker_pool, |
| SchedulerWorker* worker); |
| |
| // SingleThreadTaskRunner: |
| bool PostDelayedTask(const tracked_objects::Location& from_here, |
| const Closure& closure, |
| TimeDelta delay) override { |
| std::unique_ptr<Task> task(new Task(from_here, closure, traits_, delay)); |
| task->single_thread_task_runner_ref = this; |
| |
| // Post the task to be executed by |worker_| as part of |sequence_|. |
| return worker_pool_->PostTaskWithSequence(std::move(task), sequence_, |
| worker_); |
| } |
| |
| bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, |
| const Closure& closure, |
| base::TimeDelta delay) override { |
| // Tasks are never nested within the task scheduler. |
| return PostDelayedTask(from_here, closure, delay); |
| } |
| |
| bool RunsTasksOnCurrentThread() const override { |
| return tls_current_worker.Get().Get() == worker_; |
| } |
| |
| private: |
| ~SchedulerSingleThreadTaskRunner() override; |
| |
| // Sequence for all Tasks posted through this TaskRunner. |
| const scoped_refptr<Sequence> sequence_ = new Sequence; |
| |
| const TaskTraits traits_; |
| SchedulerWorkerPool* const worker_pool_; |
| SchedulerWorker* const worker_; |
| |
| DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner); |
| }; |
| |
| class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl |
| : public SchedulerWorker::Delegate { |
| public: |
| // |outer| owns the worker for which this delegate is constructed. |
| // |re_enqueue_sequence_callback| is invoked when ReEnqueueSequence() is |
| // called with a non-single-threaded Sequence. |shared_priority_queue| is a |
| // PriorityQueue whose transactions may overlap with the worker's |
| // single-threaded PriorityQueue's transactions. |index| will be appended to |
| // the pool name to label the underlying worker threads. |
| SchedulerWorkerDelegateImpl( |
| SchedulerWorkerPoolImpl* outer, |
| const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, |
| const PriorityQueue* shared_priority_queue, |
| int index); |
| ~SchedulerWorkerDelegateImpl() override; |
| |
| PriorityQueue* single_threaded_priority_queue() { |
| return &single_threaded_priority_queue_; |
| } |
| |
| // SchedulerWorker::Delegate: |
| void OnMainEntry(SchedulerWorker* worker) override; |
| scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override; |
| void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override; |
| TimeDelta GetSleepTimeout() override; |
| bool CanDetach(SchedulerWorker* worker) override; |
| |
| void RegisterSingleThreadTaskRunner() { |
| subtle::Barrier_AtomicIncrement(&num_single_threaded_runners_, 1); |
| } |
| |
| void UnregisterSingleThreadTaskRunner() { |
| subtle::Barrier_AtomicIncrement(&num_single_threaded_runners_, -1); |
| } |
| |
| private: |
| SchedulerWorkerPoolImpl* outer_; |
| const ReEnqueueSequenceCallback re_enqueue_sequence_callback_; |
| |
| // Single-threaded PriorityQueue for the worker. |
| PriorityQueue single_threaded_priority_queue_; |
| |
| // True if the last Sequence returned by GetWork() was extracted from |
| // |single_threaded_priority_queue_|. |
| bool last_sequence_is_single_threaded_ = false; |
| |
| // Time when GetWork() first returned nullptr. |
| TimeTicks idle_start_time_; |
| |
| subtle::Atomic32 num_single_threaded_runners_ = 0; |
| |
| const int index_; |
| |
| DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegateImpl); |
| }; |
| |
| SchedulerWorkerPoolImpl::~SchedulerWorkerPoolImpl() { |
| // SchedulerWorkerPool should never be deleted in production unless its |
| // initialization failed. |
| DCHECK(join_for_testing_returned_.IsSignaled() || workers_.empty()); |
| } |
| |
| // static |
| std::unique_ptr<SchedulerWorkerPoolImpl> SchedulerWorkerPoolImpl::Create( |
| const SchedulerWorkerPoolParams& params, |
| const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, |
| TaskTracker* task_tracker, |
| DelayedTaskManager* delayed_task_manager) { |
| std::unique_ptr<SchedulerWorkerPoolImpl> worker_pool( |
| new SchedulerWorkerPoolImpl(params.name(), |
| params.io_restriction(), |
| params.suggested_reclaim_time(), |
| task_tracker, delayed_task_manager)); |
| if (worker_pool->Initialize(params.thread_priority(), |
| params.max_threads(), |
| re_enqueue_sequence_callback)) { |
| return worker_pool; |
| } |
| return nullptr; |
| } |
| |
| void SchedulerWorkerPoolImpl::WaitForAllWorkersIdleForTesting() { |
| AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
| while (idle_workers_stack_.Size() < workers_.size()) |
| idle_workers_stack_cv_for_testing_->Wait(); |
| } |
| |
| void SchedulerWorkerPoolImpl::JoinForTesting() { |
| DCHECK(!CanWorkerDetachForTesting() || suggested_reclaim_time_.is_max()) << |
| "Workers can detach during join."; |
| for (const auto& worker : workers_) |
| worker->JoinForTesting(); |
| |
| DCHECK(!join_for_testing_returned_.IsSignaled()); |
| join_for_testing_returned_.Signal(); |
| } |
| |
| void SchedulerWorkerPoolImpl::DisallowWorkerDetachmentForTesting() { |
| AutoSchedulerLock auto_lock(worker_detachment_allowed_lock_); |
| worker_detachment_allowed_ = false; |
| } |
| |
| scoped_refptr<TaskRunner> SchedulerWorkerPoolImpl::CreateTaskRunnerWithTraits( |
| const TaskTraits& traits, |
| ExecutionMode execution_mode) { |
| switch (execution_mode) { |
| case ExecutionMode::PARALLEL: |
| return make_scoped_refptr(new SchedulerParallelTaskRunner(traits, this)); |
| |
| case ExecutionMode::SEQUENCED: |
| return make_scoped_refptr(new SchedulerSequencedTaskRunner(traits, this)); |
| |
| case ExecutionMode::SINGLE_THREADED: { |
| // TODO(fdoray): Find a way to take load into account when assigning a |
| // SchedulerWorker to a SingleThreadTaskRunner. Also, this code |
| // assumes that all SchedulerWorkers are alive. Eventually, we might |
| // decide to tear down threads that haven't run tasks for a long time. |
| size_t worker_index; |
| { |
| AutoSchedulerLock auto_lock(next_worker_index_lock_); |
| worker_index = next_worker_index_; |
| next_worker_index_ = (next_worker_index_ + 1) % workers_.size(); |
| } |
| return make_scoped_refptr(new SchedulerSingleThreadTaskRunner( |
| traits, this, workers_[worker_index].get())); |
| } |
| } |
| |
| NOTREACHED(); |
| return nullptr; |
| } |
| |
| void SchedulerWorkerPoolImpl::ReEnqueueSequence( |
| scoped_refptr<Sequence> sequence, |
| const SequenceSortKey& sequence_sort_key) { |
| shared_priority_queue_.BeginTransaction()->Push(std::move(sequence), |
| sequence_sort_key); |
| |
| // The thread calling this method just ran a Task from |sequence| and will |
| // soon try to get another Sequence from which to run a Task. If the thread |
| // belongs to this pool, it will get that Sequence from |
| // |shared_priority_queue_|. When that's the case, there is no need to wake up |
| // another worker after |sequence| is inserted in |shared_priority_queue_|. If |
| // we did wake up another worker, we would waste resources by having more |
| // workers trying to get a Sequence from |shared_priority_queue_| than the |
| // number of Sequences in it. |
| if (tls_current_worker_pool.Get().Get() != this) |
| WakeUpOneWorker(); |
| } |
| |
| bool SchedulerWorkerPoolImpl::PostTaskWithSequence( |
| std::unique_ptr<Task> task, |
| scoped_refptr<Sequence> sequence, |
| SchedulerWorker* worker) { |
| DCHECK(task); |
| DCHECK(sequence); |
| DCHECK(!worker || ContainsWorker(workers_, worker)); |
| |
| if (!task_tracker_->WillPostTask(task.get())) |
| return false; |
| |
| if (task->delayed_run_time.is_null()) { |
| PostTaskWithSequenceNow(std::move(task), std::move(sequence), worker); |
| } else { |
| delayed_task_manager_->AddDelayedTask(std::move(task), std::move(sequence), |
| worker, this); |
| } |
| |
| return true; |
| } |
| |
| void SchedulerWorkerPoolImpl::PostTaskWithSequenceNow( |
| std::unique_ptr<Task> task, |
| scoped_refptr<Sequence> sequence, |
| SchedulerWorker* worker) { |
| DCHECK(task); |
| DCHECK(sequence); |
| DCHECK(!worker || ContainsWorker(workers_, worker)); |
| |
| // Confirm that |task| is ready to run (its delayed run time is either null or |
| // in the past). |
| DCHECK_LE(task->delayed_run_time, delayed_task_manager_->Now()); |
| |
| // Because |worker| belongs to this worker pool, we know that the type |
| // of its delegate is SchedulerWorkerDelegateImpl. |
| PriorityQueue* const priority_queue = |
| worker |
| ? static_cast<SchedulerWorkerDelegateImpl*>(worker->delegate()) |
| ->single_threaded_priority_queue() |
| : &shared_priority_queue_; |
| DCHECK(priority_queue); |
| |
| const bool sequence_was_empty = sequence->PushTask(std::move(task)); |
| if (sequence_was_empty) { |
| // Insert |sequence| in |priority_queue| if it was empty before |task| was |
| // inserted into it. Otherwise, one of these must be true: |
| // - |sequence| is already in a PriorityQueue (not necessarily |
| // |shared_priority_queue_|), or, |
| // - A worker is running a Task from |sequence|. It will insert |sequence| |
| // in a PriorityQueue once it's done running the Task. |
| const auto sequence_sort_key = sequence->GetSortKey(); |
| priority_queue->BeginTransaction()->Push(std::move(sequence), |
| sequence_sort_key); |
| |
| // Wake up a worker to process |sequence|. |
| if (worker) |
| worker->WakeUp(); |
| else |
| WakeUpOneWorker(); |
| } |
| } |
| |
| SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner:: |
| SchedulerSingleThreadTaskRunner(const TaskTraits& traits, |
| SchedulerWorkerPool* worker_pool, |
| SchedulerWorker* worker) |
| : traits_(traits), |
| worker_pool_(worker_pool), |
| worker_(worker) { |
| DCHECK(worker_pool_); |
| DCHECK(worker_); |
| static_cast<SchedulerWorkerDelegateImpl*>(worker_->delegate())-> |
| RegisterSingleThreadTaskRunner(); |
| } |
| |
| SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner:: |
| ~SchedulerSingleThreadTaskRunner() { |
| static_cast<SchedulerWorkerDelegateImpl*>(worker_->delegate())-> |
| UnregisterSingleThreadTaskRunner(); |
| } |
| |
| SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
| SchedulerWorkerDelegateImpl( |
| SchedulerWorkerPoolImpl* outer, |
| const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, |
| const PriorityQueue* shared_priority_queue, |
| int index) |
| : outer_(outer), |
| re_enqueue_sequence_callback_(re_enqueue_sequence_callback), |
| single_threaded_priority_queue_(shared_priority_queue), |
| index_(index) {} |
| |
| SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
| ~SchedulerWorkerDelegateImpl() = default; |
| |
| void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnMainEntry( |
| SchedulerWorker* worker) { |
| #if DCHECK_IS_ON() |
| // Wait for |outer_->workers_created_| to avoid traversing |
| // |outer_->workers_| while it is being filled by Initialize(). |
| outer_->workers_created_.Wait(); |
| DCHECK(ContainsWorker(outer_->workers_, worker)); |
| #endif |
| |
| PlatformThread::SetName( |
| StringPrintf("%sWorker%d", outer_->name_.c_str(), index_)); |
| |
| DCHECK(!tls_current_worker.Get().Get()); |
| DCHECK(!tls_current_worker_pool.Get().Get()); |
| tls_current_worker.Get().Set(worker); |
| tls_current_worker_pool.Get().Set(outer_); |
| |
| // New threads haven't run GetWork() yet, so reset the idle_start_time_. |
| idle_start_time_ = TimeTicks(); |
| |
| ThreadRestrictions::SetIOAllowed( |
| outer_->io_restriction_ == |
| SchedulerWorkerPoolParams::IORestriction::ALLOWED); |
| } |
| |
| scoped_refptr<Sequence> |
| SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::GetWork( |
| SchedulerWorker* worker) { |
| DCHECK(ContainsWorker(outer_->workers_, worker)); |
| |
| scoped_refptr<Sequence> sequence; |
| { |
| std::unique_ptr<PriorityQueue::Transaction> shared_transaction( |
| outer_->shared_priority_queue_.BeginTransaction()); |
| std::unique_ptr<PriorityQueue::Transaction> single_threaded_transaction( |
| single_threaded_priority_queue_.BeginTransaction()); |
| |
| if (shared_transaction->IsEmpty() && |
| single_threaded_transaction->IsEmpty()) { |
| single_threaded_transaction.reset(); |
| |
| // |shared_transaction| is kept alive while |worker| is added to |
| // |idle_workers_stack_| to avoid this race: |
| // 1. This thread creates a Transaction, finds |shared_priority_queue_| |
| // empty and ends the Transaction. |
| // 2. Other thread creates a Transaction, inserts a Sequence into |
| // |shared_priority_queue_| and ends the Transaction. This can't happen |
| // if the Transaction of step 1 is still active because because there |
| // can only be one active Transaction per PriorityQueue at a time. |
| // 3. Other thread calls WakeUpOneWorker(). No thread is woken up because |
| // |idle_workers_stack_| is empty. |
| // 4. This thread adds itself to |idle_workers_stack_| and goes to sleep. |
| // No thread runs the Sequence inserted in step 2. |
| outer_->AddToIdleWorkersStack(worker); |
| if (idle_start_time_.is_null()) |
| idle_start_time_ = TimeTicks::Now(); |
| return nullptr; |
| } |
| |
| // True if both PriorityQueues have Sequences and the Sequence at the top of |
| // the shared PriorityQueue is more important. |
| const bool shared_sequence_is_more_important = |
| !shared_transaction->IsEmpty() && |
| !single_threaded_transaction->IsEmpty() && |
| shared_transaction->PeekSortKey() > |
| single_threaded_transaction->PeekSortKey(); |
| |
| if (single_threaded_transaction->IsEmpty() || |
| shared_sequence_is_more_important) { |
| sequence = shared_transaction->PopSequence(); |
| last_sequence_is_single_threaded_ = false; |
| } else { |
| DCHECK(!single_threaded_transaction->IsEmpty()); |
| sequence = single_threaded_transaction->PopSequence(); |
| last_sequence_is_single_threaded_ = true; |
| } |
| } |
| DCHECK(sequence); |
| |
| idle_start_time_ = TimeTicks(); |
| |
| outer_->RemoveFromIdleWorkersStack(worker); |
| return sequence; |
| } |
| |
| void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
| ReEnqueueSequence(scoped_refptr<Sequence> sequence) { |
| if (last_sequence_is_single_threaded_) { |
| // A single-threaded Sequence is always re-enqueued in the single-threaded |
| // PriorityQueue from which it was extracted. |
| const SequenceSortKey sequence_sort_key = sequence->GetSortKey(); |
| single_threaded_priority_queue_.BeginTransaction()->Push( |
| std::move(sequence), sequence_sort_key); |
| } else { |
| // |re_enqueue_sequence_callback_| will determine in which PriorityQueue |
| // |sequence| must be enqueued. |
| re_enqueue_sequence_callback_.Run(std::move(sequence)); |
| } |
| } |
| |
| TimeDelta SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
| GetSleepTimeout() { |
| return outer_->suggested_reclaim_time_; |
| } |
| |
| bool SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::CanDetach( |
| SchedulerWorker* worker) { |
| // It's not an issue if |num_single_threaded_runners_| is incremented after |
| // this because the newly created TaskRunner (from which no task has run yet) |
| // will simply run all its tasks on the next physical thread created by the |
| // worker. |
| const bool can_detach = |
| !idle_start_time_.is_null() && |
| (TimeTicks::Now() - idle_start_time_) > outer_->suggested_reclaim_time_ && |
| worker != outer_->PeekAtIdleWorkersStack() && |
| !subtle::Acquire_Load(&num_single_threaded_runners_) && |
| outer_->CanWorkerDetachForTesting(); |
| return can_detach; |
| } |
| |
| SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl( |
| StringPiece name, |
| SchedulerWorkerPoolParams::IORestriction io_restriction, |
| const TimeDelta& suggested_reclaim_time, |
| TaskTracker* task_tracker, |
| DelayedTaskManager* delayed_task_manager) |
| : name_(name.as_string()), |
| io_restriction_(io_restriction), |
| suggested_reclaim_time_(suggested_reclaim_time), |
| idle_workers_stack_lock_(shared_priority_queue_.container_lock()), |
| idle_workers_stack_cv_for_testing_( |
| idle_workers_stack_lock_.CreateConditionVariable()), |
| join_for_testing_returned_(WaitableEvent::ResetPolicy::MANUAL, |
| WaitableEvent::InitialState::NOT_SIGNALED), |
| #if DCHECK_IS_ON() |
| workers_created_(WaitableEvent::ResetPolicy::MANUAL, |
| WaitableEvent::InitialState::NOT_SIGNALED), |
| #endif |
| task_tracker_(task_tracker), |
| delayed_task_manager_(delayed_task_manager) { |
| DCHECK(task_tracker_); |
| DCHECK(delayed_task_manager_); |
| } |
| |
| bool SchedulerWorkerPoolImpl::Initialize( |
| ThreadPriority thread_priority, |
| size_t max_threads, |
| const ReEnqueueSequenceCallback& re_enqueue_sequence_callback) { |
| AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
| |
| DCHECK(workers_.empty()); |
| |
| for (size_t i = 0; i < max_threads; ++i) { |
| std::unique_ptr<SchedulerWorker> worker = |
| SchedulerWorker::Create( |
| thread_priority, WrapUnique(new SchedulerWorkerDelegateImpl( |
| this, re_enqueue_sequence_callback, |
| &shared_priority_queue_, static_cast<int>(i))), |
| task_tracker_, |
| i == 0 |
| ? SchedulerWorker::InitialState::ALIVE |
| : SchedulerWorker::InitialState::DETACHED); |
| if (!worker) |
| break; |
| idle_workers_stack_.Push(worker.get()); |
| workers_.push_back(std::move(worker)); |
| } |
| |
| #if DCHECK_IS_ON() |
| workers_created_.Signal(); |
| #endif |
| |
| return !workers_.empty(); |
| } |
| |
| void SchedulerWorkerPoolImpl::WakeUpOneWorker() { |
| SchedulerWorker* worker; |
| { |
| AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
| worker = idle_workers_stack_.Pop(); |
| } |
| if (worker) |
| worker->WakeUp(); |
| } |
| |
| void SchedulerWorkerPoolImpl::AddToIdleWorkersStack( |
| SchedulerWorker* worker) { |
| AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
| // Detachment may cause multiple attempts to add because the delegate cannot |
| // determine who woke it up. As a result, when it wakes up, it may conclude |
| // there's no work to be done and attempt to add itself to the idle stack |
| // again. |
| if (!idle_workers_stack_.Contains(worker)) |
| idle_workers_stack_.Push(worker); |
| |
| DCHECK_LE(idle_workers_stack_.Size(), workers_.size()); |
| |
| if (idle_workers_stack_.Size() == workers_.size()) |
| idle_workers_stack_cv_for_testing_->Broadcast(); |
| } |
| |
| const SchedulerWorker* SchedulerWorkerPoolImpl::PeekAtIdleWorkersStack() const { |
| AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
| return idle_workers_stack_.Peek(); |
| } |
| |
| void SchedulerWorkerPoolImpl::RemoveFromIdleWorkersStack( |
| SchedulerWorker* worker) { |
| AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
| idle_workers_stack_.Remove(worker); |
| } |
| |
| bool SchedulerWorkerPoolImpl::CanWorkerDetachForTesting() { |
| AutoSchedulerLock auto_lock(worker_detachment_allowed_lock_); |
| return worker_detachment_allowed_; |
| } |
| |
| } // namespace internal |
| } // namespace base |