|  | // Copyright 2016 The Chromium Authors. All rights reserved. | 
|  | // Use of this source code is governed by a BSD-style license that can be | 
|  | // found in the LICENSE file. | 
|  |  | 
|  | #include "base/task_scheduler/scheduler_worker_pool_impl.h" | 
|  |  | 
|  | #include <stddef.h> | 
|  |  | 
|  | #include <algorithm> | 
|  | #include <utility> | 
|  |  | 
|  | #include "base/bind.h" | 
|  | #include "base/bind_helpers.h" | 
|  | #include "base/lazy_instance.h" | 
|  | #include "base/memory/ptr_util.h" | 
|  | #include "base/sequenced_task_runner.h" | 
|  | #include "base/single_thread_task_runner.h" | 
|  | #include "base/strings/stringprintf.h" | 
|  | #include "base/task_scheduler/delayed_task_manager.h" | 
|  | #include "base/task_scheduler/task_tracker.h" | 
|  | #include "base/threading/platform_thread.h" | 
|  | #include "base/threading/thread_local.h" | 
|  | #include "base/threading/thread_restrictions.h" | 
|  |  | 
|  | namespace base { | 
|  | namespace internal { | 
|  |  | 
|  | namespace { | 
|  |  | 
|  | // SchedulerWorker that owns the current thread, if any. | 
|  | LazyInstance<ThreadLocalPointer<const SchedulerWorker>>::Leaky | 
|  | tls_current_worker = LAZY_INSTANCE_INITIALIZER; | 
|  |  | 
|  | // SchedulerWorkerPool that owns the current thread, if any. | 
|  | LazyInstance<ThreadLocalPointer<const SchedulerWorkerPool>>::Leaky | 
|  | tls_current_worker_pool = LAZY_INSTANCE_INITIALIZER; | 
|  |  | 
|  | // A task runner that runs tasks with the PARALLEL ExecutionMode. | 
|  | class SchedulerParallelTaskRunner : public TaskRunner { | 
|  | public: | 
|  | // Constructs a SchedulerParallelTaskRunner which can be used to post tasks so | 
|  | // long as |worker_pool| is alive. | 
|  | // TODO(robliao): Find a concrete way to manage |worker_pool|'s memory. | 
|  | SchedulerParallelTaskRunner(const TaskTraits& traits, | 
|  | SchedulerWorkerPool* worker_pool) | 
|  | : traits_(traits), worker_pool_(worker_pool) {} | 
|  |  | 
|  | // TaskRunner: | 
|  | bool PostDelayedTask(const tracked_objects::Location& from_here, | 
|  | const Closure& closure, | 
|  | TimeDelta delay) override { | 
|  | // Post the task as part of a one-off single-task Sequence. | 
|  | return worker_pool_->PostTaskWithSequence( | 
|  | WrapUnique(new Task(from_here, closure, traits_, delay)), | 
|  | make_scoped_refptr(new Sequence), nullptr); | 
|  | } | 
|  |  | 
|  | bool RunsTasksOnCurrentThread() const override { | 
|  | return tls_current_worker_pool.Get().Get() == worker_pool_; | 
|  | } | 
|  |  | 
|  | private: | 
|  | ~SchedulerParallelTaskRunner() override = default; | 
|  |  | 
|  | const TaskTraits traits_; | 
|  | SchedulerWorkerPool* const worker_pool_; | 
|  |  | 
|  | DISALLOW_COPY_AND_ASSIGN(SchedulerParallelTaskRunner); | 
|  | }; | 
|  |  | 
|  | // A task runner that runs tasks with the SEQUENCED ExecutionMode. | 
|  | class SchedulerSequencedTaskRunner : public SequencedTaskRunner { | 
|  | public: | 
|  | // Constructs a SchedulerSequencedTaskRunner which can be used to post tasks | 
|  | // so long as |worker_pool| is alive. | 
|  | // TODO(robliao): Find a concrete way to manage |worker_pool|'s memory. | 
|  | SchedulerSequencedTaskRunner(const TaskTraits& traits, | 
|  | SchedulerWorkerPool* worker_pool) | 
|  | : traits_(traits), worker_pool_(worker_pool) {} | 
|  |  | 
|  | // SequencedTaskRunner: | 
|  | bool PostDelayedTask(const tracked_objects::Location& from_here, | 
|  | const Closure& closure, | 
|  | TimeDelta delay) override { | 
|  | std::unique_ptr<Task> task(new Task(from_here, closure, traits_, delay)); | 
|  | task->sequenced_task_runner_ref = this; | 
|  |  | 
|  | // Post the task as part of |sequence_|. | 
|  | return worker_pool_->PostTaskWithSequence(std::move(task), sequence_, | 
|  | nullptr); | 
|  | } | 
|  |  | 
|  | bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, | 
|  | const Closure& closure, | 
|  | base::TimeDelta delay) override { | 
|  | // Tasks are never nested within the task scheduler. | 
|  | return PostDelayedTask(from_here, closure, delay); | 
|  | } | 
|  |  | 
|  | bool RunsTasksOnCurrentThread() const override { | 
|  | return tls_current_worker_pool.Get().Get() == worker_pool_; | 
|  | } | 
|  |  | 
|  | private: | 
|  | ~SchedulerSequencedTaskRunner() override = default; | 
|  |  | 
|  | // Sequence for all Tasks posted through this TaskRunner. | 
|  | const scoped_refptr<Sequence> sequence_ = new Sequence; | 
|  |  | 
|  | const TaskTraits traits_; | 
|  | SchedulerWorkerPool* const worker_pool_; | 
|  |  | 
|  | DISALLOW_COPY_AND_ASSIGN(SchedulerSequencedTaskRunner); | 
|  | }; | 
|  |  | 
|  | // A task runner that runs tasks with the SINGLE_THREADED ExecutionMode. | 
|  | class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner { | 
|  | public: | 
|  | // Constructs a SchedulerSingleThreadTaskRunner which can be used to post | 
|  | // tasks so long as |worker_pool| and |worker| are alive. | 
|  | // TODO(robliao): Find a concrete way to manage the memory of |worker_pool| | 
|  | // and |worker|. | 
|  | SchedulerSingleThreadTaskRunner(const TaskTraits& traits, | 
|  | SchedulerWorkerPool* worker_pool, | 
|  | SchedulerWorker* worker) | 
|  | : traits_(traits), | 
|  | worker_pool_(worker_pool), | 
|  | worker_(worker) {} | 
|  |  | 
|  | // SingleThreadTaskRunner: | 
|  | bool PostDelayedTask(const tracked_objects::Location& from_here, | 
|  | const Closure& closure, | 
|  | TimeDelta delay) override { | 
|  | std::unique_ptr<Task> task(new Task(from_here, closure, traits_, delay)); | 
|  | task->single_thread_task_runner_ref = this; | 
|  |  | 
|  | // Post the task to be executed by |worker_| as part of |sequence_|. | 
|  | return worker_pool_->PostTaskWithSequence(std::move(task), sequence_, | 
|  | worker_); | 
|  | } | 
|  |  | 
|  | bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, | 
|  | const Closure& closure, | 
|  | base::TimeDelta delay) override { | 
|  | // Tasks are never nested within the task scheduler. | 
|  | return PostDelayedTask(from_here, closure, delay); | 
|  | } | 
|  |  | 
|  | bool RunsTasksOnCurrentThread() const override { | 
|  | return tls_current_worker.Get().Get() == worker_; | 
|  | } | 
|  |  | 
|  | private: | 
|  | ~SchedulerSingleThreadTaskRunner() override = default; | 
|  |  | 
|  | // Sequence for all Tasks posted through this TaskRunner. | 
|  | const scoped_refptr<Sequence> sequence_ = new Sequence; | 
|  |  | 
|  | const TaskTraits traits_; | 
|  | SchedulerWorkerPool* const worker_pool_; | 
|  | SchedulerWorker* const worker_; | 
|  |  | 
|  | DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner); | 
|  | }; | 
|  |  | 
|  | // Only used in DCHECKs. | 
|  | bool ContainsWorker( | 
|  | const std::vector<std::unique_ptr<SchedulerWorker>>& workers, | 
|  | const SchedulerWorker* worker) { | 
|  | auto it = std::find_if(workers.begin(), workers.end(), | 
|  | [worker](const std::unique_ptr<SchedulerWorker>& i) { | 
|  | return i.get() == worker; | 
|  | }); | 
|  | return it != workers.end(); | 
|  | } | 
|  |  | 
|  | }  // namespace | 
|  |  | 
|  | class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl | 
|  | : public SchedulerWorker::Delegate { | 
|  | public: | 
|  | // |outer| owns the worker for which this delegate is constructed. | 
|  | // |re_enqueue_sequence_callback| is invoked when ReEnqueueSequence() is | 
|  | // called with a non-single-threaded Sequence. |shared_priority_queue| is a | 
|  | // PriorityQueue whose transactions may overlap with the worker's | 
|  | // single-threaded PriorityQueue's transactions. |index| will be appended to | 
|  | // the pool name to label the underlying worker threads. | 
|  | SchedulerWorkerDelegateImpl( | 
|  | SchedulerWorkerPoolImpl* outer, | 
|  | const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, | 
|  | const PriorityQueue* shared_priority_queue, | 
|  | int index); | 
|  | ~SchedulerWorkerDelegateImpl() override; | 
|  |  | 
|  | PriorityQueue* single_threaded_priority_queue() { | 
|  | return &single_threaded_priority_queue_; | 
|  | } | 
|  |  | 
|  | // SchedulerWorker::Delegate: | 
|  | void OnMainEntry(SchedulerWorker* worker) override; | 
|  | scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override; | 
|  | void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override; | 
|  | TimeDelta GetSleepTimeout() override; | 
|  | bool CanDetach(SchedulerWorker* worker) override; | 
|  |  | 
|  | private: | 
|  | SchedulerWorkerPoolImpl* outer_; | 
|  | const ReEnqueueSequenceCallback re_enqueue_sequence_callback_; | 
|  |  | 
|  | // Single-threaded PriorityQueue for the worker. | 
|  | PriorityQueue single_threaded_priority_queue_; | 
|  |  | 
|  | // True if the last Sequence returned by GetWork() was extracted from | 
|  | // |single_threaded_priority_queue_|. | 
|  | bool last_sequence_is_single_threaded_ = false; | 
|  |  | 
|  | const int index_; | 
|  |  | 
|  | DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegateImpl); | 
|  | }; | 
|  |  | 
|  | SchedulerWorkerPoolImpl::~SchedulerWorkerPoolImpl() { | 
|  | // SchedulerWorkerPool should never be deleted in production unless its | 
|  | // initialization failed. | 
|  | DCHECK(join_for_testing_returned_.IsSignaled() || workers_.empty()); | 
|  | } | 
|  |  | 
|  | // static | 
|  | std::unique_ptr<SchedulerWorkerPoolImpl> SchedulerWorkerPoolImpl::Create( | 
|  | StringPiece name, | 
|  | ThreadPriority thread_priority, | 
|  | size_t max_threads, | 
|  | IORestriction io_restriction, | 
|  | const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, | 
|  | TaskTracker* task_tracker, | 
|  | DelayedTaskManager* delayed_task_manager) { | 
|  | std::unique_ptr<SchedulerWorkerPoolImpl> worker_pool( | 
|  | new SchedulerWorkerPoolImpl(name, io_restriction, task_tracker, | 
|  | delayed_task_manager)); | 
|  | if (worker_pool->Initialize(thread_priority, max_threads, | 
|  | re_enqueue_sequence_callback)) { | 
|  | return worker_pool; | 
|  | } | 
|  | return nullptr; | 
|  | } | 
|  |  | 
|  | void SchedulerWorkerPoolImpl::WaitForAllWorkersIdleForTesting() { | 
|  | AutoSchedulerLock auto_lock(idle_workers_stack_lock_); | 
|  | while (idle_workers_stack_.Size() < workers_.size()) | 
|  | idle_workers_stack_cv_for_testing_->Wait(); | 
|  | } | 
|  |  | 
|  | void SchedulerWorkerPoolImpl::JoinForTesting() { | 
|  | for (const auto& worker : workers_) | 
|  | worker->JoinForTesting(); | 
|  |  | 
|  | DCHECK(!join_for_testing_returned_.IsSignaled()); | 
|  | join_for_testing_returned_.Signal(); | 
|  | } | 
|  |  | 
|  | scoped_refptr<TaskRunner> SchedulerWorkerPoolImpl::CreateTaskRunnerWithTraits( | 
|  | const TaskTraits& traits, | 
|  | ExecutionMode execution_mode) { | 
|  | switch (execution_mode) { | 
|  | case ExecutionMode::PARALLEL: | 
|  | return make_scoped_refptr(new SchedulerParallelTaskRunner(traits, this)); | 
|  |  | 
|  | case ExecutionMode::SEQUENCED: | 
|  | return make_scoped_refptr(new SchedulerSequencedTaskRunner(traits, this)); | 
|  |  | 
|  | case ExecutionMode::SINGLE_THREADED: { | 
|  | // TODO(fdoray): Find a way to take load into account when assigning a | 
|  | // SchedulerWorker to a SingleThreadTaskRunner. Also, this code | 
|  | // assumes that all SchedulerWorkers are alive. Eventually, we might | 
|  | // decide to tear down threads that haven't run tasks for a long time. | 
|  | size_t worker_index; | 
|  | { | 
|  | AutoSchedulerLock auto_lock(next_worker_index_lock_); | 
|  | worker_index = next_worker_index_; | 
|  | next_worker_index_ = (next_worker_index_ + 1) % workers_.size(); | 
|  | } | 
|  | return make_scoped_refptr(new SchedulerSingleThreadTaskRunner( | 
|  | traits, this, workers_[worker_index].get())); | 
|  | } | 
|  | } | 
|  |  | 
|  | NOTREACHED(); | 
|  | return nullptr; | 
|  | } | 
|  |  | 
|  | void SchedulerWorkerPoolImpl::ReEnqueueSequence( | 
|  | scoped_refptr<Sequence> sequence, | 
|  | const SequenceSortKey& sequence_sort_key) { | 
|  | shared_priority_queue_.BeginTransaction()->Push(std::move(sequence), | 
|  | sequence_sort_key); | 
|  |  | 
|  | // The thread calling this method just ran a Task from |sequence| and will | 
|  | // soon try to get another Sequence from which to run a Task. If the thread | 
|  | // belongs to this pool, it will get that Sequence from | 
|  | // |shared_priority_queue_|. When that's the case, there is no need to wake up | 
|  | // another worker after |sequence| is inserted in |shared_priority_queue_|. If | 
|  | // we did wake up another worker, we would waste resources by having more | 
|  | // workers trying to get a Sequence from |shared_priority_queue_| than the | 
|  | // number of Sequences in it. | 
|  | if (tls_current_worker_pool.Get().Get() != this) | 
|  | WakeUpOneWorker(); | 
|  | } | 
|  |  | 
|  | bool SchedulerWorkerPoolImpl::PostTaskWithSequence( | 
|  | std::unique_ptr<Task> task, | 
|  | scoped_refptr<Sequence> sequence, | 
|  | SchedulerWorker* worker) { | 
|  | DCHECK(task); | 
|  | DCHECK(sequence); | 
|  | DCHECK(!worker || ContainsWorker(workers_, worker)); | 
|  |  | 
|  | if (!task_tracker_->WillPostTask(task.get())) | 
|  | return false; | 
|  |  | 
|  | if (task->delayed_run_time.is_null()) { | 
|  | PostTaskWithSequenceNow(std::move(task), std::move(sequence), worker); | 
|  | } else { | 
|  | delayed_task_manager_->AddDelayedTask(std::move(task), std::move(sequence), | 
|  | worker, this); | 
|  | } | 
|  |  | 
|  | return true; | 
|  | } | 
|  |  | 
|  | void SchedulerWorkerPoolImpl::PostTaskWithSequenceNow( | 
|  | std::unique_ptr<Task> task, | 
|  | scoped_refptr<Sequence> sequence, | 
|  | SchedulerWorker* worker) { | 
|  | DCHECK(task); | 
|  | DCHECK(sequence); | 
|  | DCHECK(!worker || ContainsWorker(workers_, worker)); | 
|  |  | 
|  | // Confirm that |task| is ready to run (its delayed run time is either null or | 
|  | // in the past). | 
|  | DCHECK_LE(task->delayed_run_time, delayed_task_manager_->Now()); | 
|  |  | 
|  | // Because |worker| belongs to this worker pool, we know that the type | 
|  | // of its delegate is SchedulerWorkerDelegateImpl. | 
|  | PriorityQueue* const priority_queue = | 
|  | worker | 
|  | ? static_cast<SchedulerWorkerDelegateImpl*>(worker->delegate()) | 
|  | ->single_threaded_priority_queue() | 
|  | : &shared_priority_queue_; | 
|  | DCHECK(priority_queue); | 
|  |  | 
|  | const bool sequence_was_empty = sequence->PushTask(std::move(task)); | 
|  | if (sequence_was_empty) { | 
|  | // Insert |sequence| in |priority_queue| if it was empty before |task| was | 
|  | // inserted into it. Otherwise, one of these must be true: | 
|  | // - |sequence| is already in a PriorityQueue (not necessarily | 
|  | //   |shared_priority_queue_|), or, | 
|  | // - A worker is running a Task from |sequence|. It will insert |sequence| | 
|  | //   in a PriorityQueue once it's done running the Task. | 
|  | const auto sequence_sort_key = sequence->GetSortKey(); | 
|  | priority_queue->BeginTransaction()->Push(std::move(sequence), | 
|  | sequence_sort_key); | 
|  |  | 
|  | // Wake up a worker to process |sequence|. | 
|  | if (worker) | 
|  | worker->WakeUp(); | 
|  | else | 
|  | WakeUpOneWorker(); | 
|  | } | 
|  | } | 
|  |  | 
|  | SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: | 
|  | SchedulerWorkerDelegateImpl( | 
|  | SchedulerWorkerPoolImpl* outer, | 
|  | const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, | 
|  | const PriorityQueue* shared_priority_queue, | 
|  | int index) | 
|  | : outer_(outer), | 
|  | re_enqueue_sequence_callback_(re_enqueue_sequence_callback), | 
|  | single_threaded_priority_queue_(shared_priority_queue), | 
|  | index_(index) {} | 
|  |  | 
|  | SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: | 
|  | ~SchedulerWorkerDelegateImpl() = default; | 
|  |  | 
|  | void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnMainEntry( | 
|  | SchedulerWorker* worker) { | 
|  | #if DCHECK_IS_ON() | 
|  | // Wait for |outer_->workers_created_| to avoid traversing | 
|  | // |outer_->workers_| while it is being filled by Initialize(). | 
|  | outer_->workers_created_.Wait(); | 
|  | DCHECK(ContainsWorker(outer_->workers_, worker)); | 
|  | #endif | 
|  |  | 
|  | PlatformThread::SetName( | 
|  | StringPrintf("%sWorker%d", outer_->name_.c_str(), index_)); | 
|  |  | 
|  | DCHECK(!tls_current_worker.Get().Get()); | 
|  | DCHECK(!tls_current_worker_pool.Get().Get()); | 
|  | tls_current_worker.Get().Set(worker); | 
|  | tls_current_worker_pool.Get().Set(outer_); | 
|  |  | 
|  | ThreadRestrictions::SetIOAllowed(outer_->io_restriction_ == | 
|  | IORestriction::ALLOWED); | 
|  | } | 
|  |  | 
|  | scoped_refptr<Sequence> | 
|  | SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::GetWork( | 
|  | SchedulerWorker* worker) { | 
|  | DCHECK(ContainsWorker(outer_->workers_, worker)); | 
|  |  | 
|  | scoped_refptr<Sequence> sequence; | 
|  | { | 
|  | std::unique_ptr<PriorityQueue::Transaction> shared_transaction( | 
|  | outer_->shared_priority_queue_.BeginTransaction()); | 
|  | std::unique_ptr<PriorityQueue::Transaction> single_threaded_transaction( | 
|  | single_threaded_priority_queue_.BeginTransaction()); | 
|  |  | 
|  | if (shared_transaction->IsEmpty() && | 
|  | single_threaded_transaction->IsEmpty()) { | 
|  | single_threaded_transaction.reset(); | 
|  |  | 
|  | // |shared_transaction| is kept alive while |worker| is added to | 
|  | // |idle_workers_stack_| to avoid this race: | 
|  | // 1. This thread creates a Transaction, finds |shared_priority_queue_| | 
|  | //    empty and ends the Transaction. | 
|  | // 2. Other thread creates a Transaction, inserts a Sequence into | 
|  | //    |shared_priority_queue_| and ends the Transaction. This can't happen | 
|  | //    if the Transaction of step 1 is still active because because there | 
|  | //    can only be one active Transaction per PriorityQueue at a time. | 
|  | // 3. Other thread calls WakeUpOneWorker(). No thread is woken up because | 
|  | //    |idle_workers_stack_| is empty. | 
|  | // 4. This thread adds itself to |idle_workers_stack_| and goes to sleep. | 
|  | //    No thread runs the Sequence inserted in step 2. | 
|  | outer_->AddToIdleWorkersStack(worker); | 
|  | return nullptr; | 
|  | } | 
|  |  | 
|  | // True if both PriorityQueues have Sequences and the Sequence at the top of | 
|  | // the shared PriorityQueue is more important. | 
|  | const bool shared_sequence_is_more_important = | 
|  | !shared_transaction->IsEmpty() && | 
|  | !single_threaded_transaction->IsEmpty() && | 
|  | shared_transaction->PeekSortKey() > | 
|  | single_threaded_transaction->PeekSortKey(); | 
|  |  | 
|  | if (single_threaded_transaction->IsEmpty() || | 
|  | shared_sequence_is_more_important) { | 
|  | sequence = shared_transaction->PopSequence(); | 
|  | last_sequence_is_single_threaded_ = false; | 
|  | } else { | 
|  | DCHECK(!single_threaded_transaction->IsEmpty()); | 
|  | sequence = single_threaded_transaction->PopSequence(); | 
|  | last_sequence_is_single_threaded_ = true; | 
|  | } | 
|  | } | 
|  | DCHECK(sequence); | 
|  |  | 
|  | outer_->RemoveFromIdleWorkersStack(worker); | 
|  | return sequence; | 
|  | } | 
|  |  | 
|  | void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: | 
|  | ReEnqueueSequence(scoped_refptr<Sequence> sequence) { | 
|  | if (last_sequence_is_single_threaded_) { | 
|  | // A single-threaded Sequence is always re-enqueued in the single-threaded | 
|  | // PriorityQueue from which it was extracted. | 
|  | const SequenceSortKey sequence_sort_key = sequence->GetSortKey(); | 
|  | single_threaded_priority_queue_.BeginTransaction()->Push( | 
|  | std::move(sequence), sequence_sort_key); | 
|  | } else { | 
|  | // |re_enqueue_sequence_callback_| will determine in which PriorityQueue | 
|  | // |sequence| must be enqueued. | 
|  | re_enqueue_sequence_callback_.Run(std::move(sequence)); | 
|  | } | 
|  | } | 
|  |  | 
|  | TimeDelta SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: | 
|  | GetSleepTimeout() { | 
|  | return TimeDelta::Max(); | 
|  | } | 
|  |  | 
|  | bool SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::CanDetach( | 
|  | SchedulerWorker* worker) { | 
|  | return false; | 
|  | } | 
|  |  | 
|  | SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl( | 
|  | StringPiece name, | 
|  | IORestriction io_restriction, | 
|  | TaskTracker* task_tracker, | 
|  | DelayedTaskManager* delayed_task_manager) | 
|  | : name_(name.as_string()), | 
|  | io_restriction_(io_restriction), | 
|  | idle_workers_stack_lock_(shared_priority_queue_.container_lock()), | 
|  | idle_workers_stack_cv_for_testing_( | 
|  | idle_workers_stack_lock_.CreateConditionVariable()), | 
|  | join_for_testing_returned_(WaitableEvent::ResetPolicy::MANUAL, | 
|  | WaitableEvent::InitialState::NOT_SIGNALED), | 
|  | #if DCHECK_IS_ON() | 
|  | workers_created_(WaitableEvent::ResetPolicy::MANUAL, | 
|  | WaitableEvent::InitialState::NOT_SIGNALED), | 
|  | #endif | 
|  | task_tracker_(task_tracker), | 
|  | delayed_task_manager_(delayed_task_manager) { | 
|  | DCHECK(task_tracker_); | 
|  | DCHECK(delayed_task_manager_); | 
|  | } | 
|  |  | 
|  | bool SchedulerWorkerPoolImpl::Initialize( | 
|  | ThreadPriority thread_priority, | 
|  | size_t max_threads, | 
|  | const ReEnqueueSequenceCallback& re_enqueue_sequence_callback) { | 
|  | AutoSchedulerLock auto_lock(idle_workers_stack_lock_); | 
|  |  | 
|  | DCHECK(workers_.empty()); | 
|  |  | 
|  | for (size_t i = 0; i < max_threads; ++i) { | 
|  | std::unique_ptr<SchedulerWorker> worker = | 
|  | SchedulerWorker::Create( | 
|  | thread_priority, WrapUnique(new SchedulerWorkerDelegateImpl( | 
|  | this, re_enqueue_sequence_callback, | 
|  | &shared_priority_queue_, static_cast<int>(i))), | 
|  | task_tracker_, | 
|  | SchedulerWorker::InitialState::ALIVE); | 
|  | if (!worker) | 
|  | break; | 
|  | idle_workers_stack_.Push(worker.get()); | 
|  | workers_.push_back(std::move(worker)); | 
|  | } | 
|  |  | 
|  | #if DCHECK_IS_ON() | 
|  | workers_created_.Signal(); | 
|  | #endif | 
|  |  | 
|  | return !workers_.empty(); | 
|  | } | 
|  |  | 
|  | void SchedulerWorkerPoolImpl::WakeUpOneWorker() { | 
|  | SchedulerWorker* worker; | 
|  | { | 
|  | AutoSchedulerLock auto_lock(idle_workers_stack_lock_); | 
|  | worker = idle_workers_stack_.Pop(); | 
|  | } | 
|  | if (worker) | 
|  | worker->WakeUp(); | 
|  | } | 
|  |  | 
|  | void SchedulerWorkerPoolImpl::AddToIdleWorkersStack( | 
|  | SchedulerWorker* worker) { | 
|  | AutoSchedulerLock auto_lock(idle_workers_stack_lock_); | 
|  | idle_workers_stack_.Push(worker); | 
|  | DCHECK_LE(idle_workers_stack_.Size(), workers_.size()); | 
|  |  | 
|  | if (idle_workers_stack_.Size() == workers_.size()) | 
|  | idle_workers_stack_cv_for_testing_->Broadcast(); | 
|  | } | 
|  |  | 
|  | void SchedulerWorkerPoolImpl::RemoveFromIdleWorkersStack( | 
|  | SchedulerWorker* worker) { | 
|  | AutoSchedulerLock auto_lock(idle_workers_stack_lock_); | 
|  | idle_workers_stack_.Remove(worker); | 
|  | } | 
|  |  | 
|  | }  // namespace internal | 
|  | }  // namespace base |