| // Copyright 2013 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "cc/worker_pool.h" |
| |
| #include "base/bind.h" |
| #include "base/debug/trace_event.h" |
| #include "base/stringprintf.h" |
| #include "base/synchronization/condition_variable.h" |
| #include "base/threading/simple_thread.h" |
| #include "cc/rendering_stats.h" |
| |
| #if defined(OS_ANDROID) |
| // TODO(epenner): Move thread priorities to base. (crbug.com/170549) |
| #include <sys/resource.h> |
| #endif |
| |
| namespace cc { |
| |
| namespace { |
| |
| class WorkerPoolTaskImpl : public internal::WorkerPoolTask { |
| public: |
| WorkerPoolTaskImpl(const WorkerPool::Callback& task, |
| const base::Closure& reply) |
| : internal::WorkerPoolTask(reply), |
| task_(task) {} |
| |
| virtual bool IsCheap() OVERRIDE { return false; } |
| |
| virtual void WillRunOnThread(unsigned thread_index) OVERRIDE {} |
| |
| virtual void Run(RenderingStats* rendering_stats) OVERRIDE { |
| task_.Run(rendering_stats); |
| } |
| |
| private: |
| WorkerPool::Callback task_; |
| }; |
| |
| } // namespace |
| |
| namespace internal { |
| |
| WorkerPoolTask::WorkerPoolTask(const base::Closure& reply) : reply_(reply) { |
| } |
| |
| WorkerPoolTask::~WorkerPoolTask() { |
| } |
| |
| void WorkerPoolTask::DidComplete() { |
| reply_.Run(); |
| } |
| |
| } // namespace internal |
| |
| // Internal to the worker pool. Any data or logic that needs to be |
| // shared between threads lives in this class. All members are guarded |
| // by |lock_|. |
| class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate { |
| public: |
| Inner(WorkerPool* worker_pool, |
| size_t num_threads, |
| const std::string& thread_name_prefix, |
| bool need_on_task_completed_callback); |
| ~Inner(); |
| |
| void Shutdown(); |
| |
| void SetRecordRenderingStats(bool record_rendering_stats); |
| |
| void GetRenderingStats(RenderingStats* stats); |
| |
| void PostTask(scoped_ptr<internal::WorkerPoolTask> task); |
| |
| // Appends all completed tasks to worker pool's completed tasks queue |
| // and returns true if idle. |
| bool CollectCompletedTasks(); |
| |
| // Runs cheap tasks on caller thread until |time_limit| is reached |
| // and returns true if idle. |
| bool RunCheapTasksUntilTimeLimit(base::TimeTicks time_limit); |
| |
| private: |
| // Appends all completed tasks to |completed_tasks|. Lock must |
| // already be acquired before calling this function. |
| bool AppendCompletedTasksWithLockAcquired( |
| ScopedPtrDeque<internal::WorkerPoolTask>* completed_tasks); |
| |
| // Schedule a OnTaskCompletedOnOriginThread callback if not already |
| // pending. Lock must already be acquired before calling this function. |
| void ScheduleOnTaskCompletedWithLockAcquired(); |
| void OnTaskCompletedOnOriginThread(); |
| |
| // Schedule an OnIdleOnOriginThread callback if not already pending. |
| // Lock must already be acquired before calling this function. |
| void ScheduleOnIdleWithLockAcquired(); |
| void OnIdleOnOriginThread(); |
| |
| // Overridden from base::DelegateSimpleThread: |
| virtual void Run() OVERRIDE; |
| |
| // Pointer to worker pool. Can only be used on origin thread. |
| // Not guarded by |lock_|. |
| WorkerPool* worker_pool_on_origin_thread_; |
| |
| // This lock protects all members of this class except |
| // |worker_pool_on_origin_thread_|. Do not read or modify anything |
| // without holding this lock. Do not block while holding this lock. |
| mutable base::Lock lock_; |
| |
| // Condition variable that is waited on by worker threads until new |
| // tasks are posted or shutdown starts. |
| base::ConditionVariable has_pending_tasks_cv_; |
| |
| // Target message loop used for posting callbacks. |
| scoped_refptr<base::MessageLoopProxy> origin_loop_; |
| |
| base::WeakPtrFactory<Inner> weak_ptr_factory_; |
| |
| // Set to true when worker pool requires a callback for each |
| // completed task. |
| bool need_on_task_completed_callback_; |
| |
| const base::Closure on_task_completed_callback_; |
| // Set when a OnTaskCompletedOnOriginThread() callback is pending. |
| bool on_task_completed_pending_; |
| |
| const base::Closure on_idle_callback_; |
| // Set when a OnIdleOnOriginThread() callback is pending. |
| bool on_idle_pending_; |
| |
| // Provides each running thread loop with a unique index. First thread |
| // loop index is 0. |
| unsigned next_thread_index_; |
| |
| // Number of tasks currently running. |
| unsigned running_task_count_; |
| |
| // Set during shutdown. Tells workers to exit when no more tasks |
| // are pending. |
| bool shutdown_; |
| |
| typedef ScopedPtrDeque<internal::WorkerPoolTask> TaskDeque; |
| TaskDeque pending_tasks_; |
| TaskDeque completed_tasks_; |
| |
| scoped_ptr<RenderingStats> rendering_stats_; |
| |
| ScopedPtrDeque<base::DelegateSimpleThread> workers_; |
| |
| DISALLOW_COPY_AND_ASSIGN(Inner); |
| }; |
| |
| WorkerPool::Inner::Inner(WorkerPool* worker_pool, |
| size_t num_threads, |
| const std::string& thread_name_prefix, |
| bool need_on_task_completed_callback) |
| : worker_pool_on_origin_thread_(worker_pool), |
| lock_(), |
| has_pending_tasks_cv_(&lock_), |
| origin_loop_(base::MessageLoopProxy::current()), |
| weak_ptr_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)), |
| need_on_task_completed_callback_(need_on_task_completed_callback), |
| on_task_completed_callback_( |
| base::Bind(&WorkerPool::Inner::OnTaskCompletedOnOriginThread, |
| weak_ptr_factory_.GetWeakPtr())), |
| on_task_completed_pending_(false), |
| on_idle_callback_(base::Bind(&WorkerPool::Inner::OnIdleOnOriginThread, |
| weak_ptr_factory_.GetWeakPtr())), |
| on_idle_pending_(false), |
| next_thread_index_(0), |
| running_task_count_(0), |
| shutdown_(false) { |
| base::AutoLock lock(lock_); |
| |
| while (workers_.size() < num_threads) { |
| scoped_ptr<base::DelegateSimpleThread> worker = make_scoped_ptr( |
| new base::DelegateSimpleThread( |
| this, |
| thread_name_prefix + |
| StringPrintf("Worker%lu", workers_.size() + 1).c_str())); |
| worker->Start(); |
| workers_.push_back(worker.Pass()); |
| } |
| } |
| |
| WorkerPool::Inner::~Inner() { |
| base::AutoLock lock(lock_); |
| |
| DCHECK(shutdown_); |
| |
| // Cancel all pending callbacks. |
| weak_ptr_factory_.InvalidateWeakPtrs(); |
| |
| DCHECK_EQ(pending_tasks_.size(), 0); |
| DCHECK_EQ(completed_tasks_.size(), 0); |
| DCHECK_EQ(running_task_count_, 0); |
| } |
| |
| void WorkerPool::Inner::Shutdown() { |
| { |
| base::AutoLock lock(lock_); |
| |
| DCHECK(!shutdown_); |
| shutdown_ = true; |
| |
| // Wake up a worker so it knows it should exit. This will cause all workers |
| // to exit as each will wake up another worker before exiting. |
| has_pending_tasks_cv_.Signal(); |
| } |
| |
| while (workers_.size()) { |
| scoped_ptr<base::DelegateSimpleThread> worker = workers_.take_front(); |
| worker->Join(); |
| } |
| } |
| |
| void WorkerPool::Inner::SetRecordRenderingStats(bool record_rendering_stats) { |
| base::AutoLock lock(lock_); |
| |
| if (record_rendering_stats) |
| rendering_stats_.reset(new RenderingStats); |
| else |
| rendering_stats_.reset(); |
| } |
| |
| void WorkerPool::Inner::GetRenderingStats(RenderingStats* stats) { |
| base::AutoLock lock(lock_); |
| |
| if (rendering_stats_) |
| stats->Add(*rendering_stats_); |
| } |
| |
| void WorkerPool::Inner::PostTask(scoped_ptr<internal::WorkerPoolTask> task) { |
| base::AutoLock lock(lock_); |
| |
| pending_tasks_.push_back(task.Pass()); |
| |
| // There is more work available, so wake up worker thread. |
| has_pending_tasks_cv_.Signal(); |
| } |
| |
| bool WorkerPool::Inner::CollectCompletedTasks() { |
| base::AutoLock lock(lock_); |
| |
| return AppendCompletedTasksWithLockAcquired( |
| &worker_pool_on_origin_thread_->completed_tasks_); |
| } |
| |
| bool WorkerPool::Inner::RunCheapTasksUntilTimeLimit( |
| base::TimeTicks time_limit) { |
| base::AutoLock lock(lock_); |
| |
| while (base::TimeTicks::Now() < time_limit) { |
| scoped_ptr<internal::WorkerPoolTask> task; |
| |
| // Find next cheap task. |
| for (TaskDeque::iterator iter = pending_tasks_.begin(); |
| iter != pending_tasks_.end(); ++iter) { |
| if ((*iter)->IsCheap()) { |
| task = pending_tasks_.take(iter); |
| break; |
| } |
| } |
| |
| if (!task) { |
| // Schedule an idle callback if requested and not pending. |
| if (!running_task_count_ && pending_tasks_.empty()) |
| ScheduleOnIdleWithLockAcquired(); |
| |
| // Exit when no more cheap tasks are pending. |
| break; |
| } |
| |
| scoped_ptr<RenderingStats> rendering_stats; |
| // Collect rendering stats if |rendering_stats_| is set. |
| if (rendering_stats_) |
| rendering_stats = make_scoped_ptr(new RenderingStats); |
| |
| // Increment |running_task_count_| before starting to run task. |
| running_task_count_++; |
| |
| { |
| base::AutoUnlock unlock(lock_); |
| |
| task->Run(rendering_stats.get()); |
| |
| // Append tasks directly to worker pool's completed tasks queue. |
| worker_pool_on_origin_thread_->completed_tasks_.push_back(task.Pass()); |
| if (need_on_task_completed_callback_) |
| worker_pool_on_origin_thread_->OnTaskCompleted(); |
| } |
| |
| // Add rendering stat results to |rendering_stats_|. |
| if (rendering_stats && rendering_stats_) |
| rendering_stats_->Add(*rendering_stats); |
| |
| // Decrement |running_task_count_| now that we are done running task. |
| running_task_count_--; |
| } |
| |
| // Append any other completed tasks before releasing lock. |
| return AppendCompletedTasksWithLockAcquired( |
| &worker_pool_on_origin_thread_->completed_tasks_); |
| } |
| |
| bool WorkerPool::Inner::AppendCompletedTasksWithLockAcquired( |
| ScopedPtrDeque<internal::WorkerPoolTask>* completed_tasks) { |
| lock_.AssertAcquired(); |
| |
| while (completed_tasks_.size()) |
| completed_tasks->push_back(completed_tasks_.take_front().Pass()); |
| |
| return !running_task_count_ && pending_tasks_.empty(); |
| } |
| |
| void WorkerPool::Inner::ScheduleOnTaskCompletedWithLockAcquired() { |
| lock_.AssertAcquired(); |
| |
| if (on_task_completed_pending_ || !need_on_task_completed_callback_) |
| return; |
| origin_loop_->PostTask(FROM_HERE, on_task_completed_callback_); |
| on_task_completed_pending_ = true; |
| } |
| |
| void WorkerPool::Inner::OnTaskCompletedOnOriginThread() { |
| { |
| base::AutoLock lock(lock_); |
| |
| DCHECK(on_task_completed_pending_); |
| on_task_completed_pending_ = false; |
| |
| AppendCompletedTasksWithLockAcquired( |
| &worker_pool_on_origin_thread_->completed_tasks_); |
| } |
| |
| worker_pool_on_origin_thread_->OnTaskCompleted(); |
| } |
| |
| void WorkerPool::Inner::ScheduleOnIdleWithLockAcquired() { |
| lock_.AssertAcquired(); |
| |
| if (on_idle_pending_) |
| return; |
| origin_loop_->PostTask(FROM_HERE, on_idle_callback_); |
| on_idle_pending_ = true; |
| } |
| |
| void WorkerPool::Inner::OnIdleOnOriginThread() { |
| { |
| base::AutoLock lock(lock_); |
| |
| DCHECK(on_idle_pending_); |
| on_idle_pending_ = false; |
| |
| // Early out if no longer idle. |
| if (running_task_count_ || !pending_tasks_.empty()) |
| return; |
| |
| AppendCompletedTasksWithLockAcquired( |
| &worker_pool_on_origin_thread_->completed_tasks_); |
| } |
| |
| worker_pool_on_origin_thread_->OnIdle(); |
| } |
| |
| void WorkerPool::Inner::Run() { |
| #if defined(OS_ANDROID) |
| // TODO(epenner): Move thread priorities to base. (crbug.com/170549) |
| int nice_value = 10; // Idle priority. |
| setpriority(PRIO_PROCESS, base::PlatformThread::CurrentId(), nice_value); |
| #endif |
| |
| { |
| base::AutoLock lock(lock_); |
| |
| // Get a unique thread index. |
| int thread_index = next_thread_index_++; |
| |
| while (true) { |
| if (pending_tasks_.empty()) { |
| // Exit when shutdown is set and no more tasks are pending. |
| if (shutdown_) |
| break; |
| |
| // Schedule an idle callback if requested and not pending. |
| if (!running_task_count_) |
| ScheduleOnIdleWithLockAcquired(); |
| |
| // Wait for new pending tasks. |
| has_pending_tasks_cv_.Wait(); |
| continue; |
| } |
| |
| // Get next task. |
| scoped_ptr<internal::WorkerPoolTask> task = pending_tasks_.take_front(); |
| |
| scoped_ptr<RenderingStats> rendering_stats; |
| // Collect rendering stats if |rendering_stats_| is set. |
| if (rendering_stats_) |
| rendering_stats = make_scoped_ptr(new RenderingStats); |
| |
| // Increment |running_task_count_| before starting to run task. |
| running_task_count_++; |
| |
| // There may be more work available, so wake up another |
| // worker thread. |
| has_pending_tasks_cv_.Signal(); |
| |
| { |
| base::AutoUnlock unlock(lock_); |
| |
| task->WillRunOnThread(thread_index); |
| task->Run(rendering_stats.get()); |
| } |
| |
| completed_tasks_.push_back(task.Pass()); |
| |
| // Add rendering stat results to |rendering_stats_|. |
| if (rendering_stats && rendering_stats_) |
| rendering_stats_->Add(*rendering_stats); |
| |
| // Decrement |running_task_count_| now that we are done running task. |
| running_task_count_--; |
| |
| // Schedule a task completed callback if requested and not pending. |
| ScheduleOnTaskCompletedWithLockAcquired(); |
| } |
| |
| // We noticed we should exit. Wake up the next worker so it knows it should |
| // exit as well (because the Shutdown() code only signals once). |
| has_pending_tasks_cv_.Signal(); |
| } |
| } |
| |
| WorkerPool::WorkerPool(WorkerPoolClient* client, |
| size_t num_threads, |
| base::TimeDelta check_for_completed_tasks_delay, |
| const std::string& thread_name_prefix) |
| : client_(client), |
| origin_loop_(base::MessageLoopProxy::current()), |
| weak_ptr_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)), |
| check_for_completed_tasks_delay_(check_for_completed_tasks_delay), |
| check_for_completed_tasks_pending_(false), |
| run_cheap_tasks_callback_( |
| base::Bind(&WorkerPool::RunCheapTasks, |
| weak_ptr_factory_.GetWeakPtr())), |
| run_cheap_tasks_pending_(false), |
| inner_(make_scoped_ptr( |
| new Inner( |
| this, |
| num_threads, |
| thread_name_prefix, |
| // Request OnTaskCompleted() callback when check |
| // for completed tasks delay is 0. |
| check_for_completed_tasks_delay == base::TimeDelta()))) { |
| } |
| |
| WorkerPool::~WorkerPool() { |
| Shutdown(); |
| |
| // Cancel all pending callbacks. |
| weak_ptr_factory_.InvalidateWeakPtrs(); |
| |
| DCHECK_EQ(completed_tasks_.size(), 0); |
| } |
| |
| void WorkerPool::Shutdown() { |
| inner_->Shutdown(); |
| DispatchCompletionCallbacks(); |
| } |
| |
| void WorkerPool::PostTaskAndReply( |
| const Callback& task, const base::Closure& reply) { |
| PostTask(make_scoped_ptr(new WorkerPoolTaskImpl( |
| task, |
| reply)).PassAs<internal::WorkerPoolTask>()); |
| } |
| |
| void WorkerPool::SetRunCheapTasksTimeLimit( |
| base::TimeTicks run_cheap_tasks_time_limit) { |
| run_cheap_tasks_time_limit_ = run_cheap_tasks_time_limit; |
| ScheduleRunCheapTasks(); |
| } |
| |
| void WorkerPool::SetRecordRenderingStats(bool record_rendering_stats) { |
| inner_->SetRecordRenderingStats(record_rendering_stats); |
| } |
| |
| void WorkerPool::GetRenderingStats(RenderingStats* stats) { |
| inner_->GetRenderingStats(stats); |
| } |
| |
| void WorkerPool::OnIdle() { |
| TRACE_EVENT0("cc", "WorkerPool::OnIdle"); |
| |
| DispatchCompletionCallbacks(); |
| } |
| |
| void WorkerPool::OnTaskCompleted() { |
| TRACE_EVENT0("cc", "WorkerPool::OnTaskCompleted"); |
| |
| DispatchCompletionCallbacks(); |
| } |
| |
| void WorkerPool::ScheduleCheckForCompletedTasks() { |
| if (check_for_completed_tasks_pending_ || |
| check_for_completed_tasks_delay_ == base::TimeDelta()) |
| return; |
| check_for_completed_tasks_callback_.Reset( |
| base::Bind(&WorkerPool::CheckForCompletedTasks, |
| weak_ptr_factory_.GetWeakPtr())); |
| check_for_completed_tasks_time_ = base::TimeTicks::Now() + |
| check_for_completed_tasks_delay_; |
| origin_loop_->PostDelayedTask( |
| FROM_HERE, |
| check_for_completed_tasks_callback_.callback(), |
| check_for_completed_tasks_delay_); |
| check_for_completed_tasks_pending_ = true; |
| } |
| |
| void WorkerPool::CheckForCompletedTasks() { |
| TRACE_EVENT0("cc", "WorkerPool::CheckForCompletedTasks"); |
| DCHECK(check_for_completed_tasks_pending_); |
| check_for_completed_tasks_pending_ = false; |
| |
| // Schedule another check for completed tasks if not idle. |
| if (!inner_->CollectCompletedTasks()) |
| ScheduleCheckForCompletedTasks(); |
| |
| DispatchCompletionCallbacks(); |
| } |
| |
| void WorkerPool::CancelCheckForCompletedTasks() { |
| if (!check_for_completed_tasks_pending_) |
| return; |
| |
| check_for_completed_tasks_callback_.Cancel(); |
| check_for_completed_tasks_pending_ = false; |
| } |
| |
| void WorkerPool::DispatchCompletionCallbacks() { |
| TRACE_EVENT0("cc", "WorkerPool::DispatchCompletionCallbacks"); |
| |
| if (completed_tasks_.empty()) |
| return; |
| |
| while (completed_tasks_.size()) { |
| scoped_ptr<internal::WorkerPoolTask> task = completed_tasks_.take_front(); |
| task->DidComplete(); |
| } |
| |
| client_->DidFinishDispatchingWorkerPoolCompletionCallbacks(); |
| } |
| |
| void WorkerPool::PostTask(scoped_ptr<internal::WorkerPoolTask> task) { |
| if (task->IsCheap()) |
| ScheduleRunCheapTasks(); |
| |
| // Schedule check for completed tasks if not pending. |
| ScheduleCheckForCompletedTasks(); |
| |
| inner_->PostTask(task.Pass()); |
| } |
| |
| void WorkerPool::ScheduleRunCheapTasks() { |
| if (run_cheap_tasks_pending_) |
| return; |
| origin_loop_->PostTask(FROM_HERE, run_cheap_tasks_callback_); |
| run_cheap_tasks_pending_ = true; |
| } |
| |
| void WorkerPool::RunCheapTasks() { |
| TRACE_EVENT0("cc", "WorkerPool::RunCheapTasks"); |
| DCHECK(run_cheap_tasks_pending_); |
| run_cheap_tasks_pending_ = false; |
| |
| while (true) { |
| base::TimeTicks time_limit = run_cheap_tasks_time_limit_; |
| |
| if (!check_for_completed_tasks_time_.is_null()) |
| time_limit = std::min(time_limit, check_for_completed_tasks_time_); |
| |
| bool is_idle = inner_->RunCheapTasksUntilTimeLimit(time_limit); |
| |
| if (base::TimeTicks::Now() >= run_cheap_tasks_time_limit_) { |
| TRACE_EVENT_INSTANT0("cc", "WorkerPool::RunCheapTasks out of time"); |
| break; |
| } |
| |
| // We must be out of cheap tasks if this happens. |
| if (check_for_completed_tasks_time_.is_null() || |
| base::TimeTicks::Now() < run_cheap_tasks_time_limit_) |
| break; |
| |
| TRACE_EVENT_INSTANT0("cc", "WorkerPool::RunCheapTasks check time"); |
| CancelCheckForCompletedTasks(); |
| DispatchCompletionCallbacks(); |
| // Schedule another check for completed tasks if not idle. |
| if (!is_idle) |
| ScheduleCheckForCompletedTasks(); |
| } |
| } |
| |
| } // namespace cc |