blob: 37063794e0c2a85cb8840c6361e545e6537a892d [file] [log] [blame]
// Copyright 2015 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "cc/raster/categorized_worker_pool.h"
#include <algorithm>
#include <memory>
#include <string>
#include <utility>
#include "base/command_line.h"
#include "base/containers/contains.h"
#include "base/functional/bind.h"
#include "base/no_destructor.h"
#include "base/strings/string_number_conversions.h"
#include "base/task/sequence_manager/task_time_observer.h"
#include "base/task/sequenced_task_runner.h"
#include "base/task/single_thread_task_runner.h"
#include "base/task/task_traits.h"
#include "base/threading/thread_restrictions.h"
#include "base/trace_event/typed_macros.h"
#include "build/build_config.h"
#include "cc/base/math_util.h"
#include "cc/base/switches.h"
#include "cc/raster/task_category.h"
namespace cc {
namespace {
// Task categories running at normal thread priority.
constexpr TaskCategory kNormalThreadPriorityCategories[] = {
    TASK_CATEGORY_NONCONCURRENT_FOREGROUND, TASK_CATEGORY_FOREGROUND,
    TASK_CATEGORY_BACKGROUND_WITH_NORMAL_THREAD_PRIORITY};

// Task categories running at background thread priority.
constexpr TaskCategory kBackgroundThreadPriorityCategories[] = {
    TASK_CATEGORY_BACKGROUND};

// Foreground task categories.
constexpr TaskCategory kForegroundCategories[] = {
    TASK_CATEGORY_NONCONCURRENT_FOREGROUND, TASK_CATEGORY_FOREGROUND};

// Background task categories. Tasks in these categories cannot start running
// when a task with a category in |kForegroundCategories| is running or ready to
// run.
constexpr TaskCategory kBackgroundCategories[] = {
    TASK_CATEGORY_BACKGROUND,
    TASK_CATEGORY_BACKGROUND_WITH_NORMAL_THREAD_PRIORITY};

// Storage slot for the process-wide pool handed out by
// CategorizedWorkerPool::GetOrCreate(). Wrapped in base::NoDestructor, so the
// pool is intentionally never destroyed at process shutdown.
scoped_refptr<CategorizedWorkerPool>& GetWorkerPool() {
  static base::NoDestructor<scoped_refptr<CategorizedWorkerPool>> worker_pool;
  return *worker_pool;
}
} // namespace
// A sequenced task runner which posts tasks to a CategorizedWorkerPool.
// Sequencing is enforced by chaining each scheduled task to its predecessor
// with a task-graph edge, so at most one task of this runner executes at a
// time and tasks run in posting order.
class CategorizedWorkerPool::CategorizedWorkerPoolSequencedTaskRunner
    : public base::SequencedTaskRunner {
 public:
  explicit CategorizedWorkerPoolSequencedTaskRunner(
      TaskGraphRunner* task_graph_runner)
      : task_graph_runner_(task_graph_runner),
        namespace_token_(task_graph_runner->GenerateNamespaceToken()) {}

  // Overridden from base::TaskRunner:
  bool PostDelayedTask(const base::Location& from_here,
                       base::OnceClosure task,
                       base::TimeDelta delay) override {
    return PostNonNestableDelayedTask(from_here, std::move(task), delay);
  }

  // Overridden from base::SequencedTaskRunner:
  // NOTE(review): |delay| is never read below — the task is scheduled
  // immediately regardless of the requested delay. Confirm no caller relies
  // on actual delayed execution.
  bool PostNonNestableDelayedTask(const base::Location& from_here,
                                  base::OnceClosure task,
                                  base::TimeDelta delay) override {
    // Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167
    // for details.
    CHECK(task);
    base::AutoLock lock(lock_);

    // Remove completed tasks.
    DCHECK(completed_tasks_.empty());
    task_graph_runner_->CollectCompletedTasks(namespace_token_,
                                              &completed_tasks_);
    // Because tasks are chained linearly (see the edge construction below),
    // they finish in posting order, so the completed ones form a prefix of
    // |tasks_|.
    tasks_.erase(tasks_.begin(), tasks_.begin() + completed_tasks_.size());

    tasks_.push_back(base::MakeRefCounted<ClosureTask>(std::move(task)));

    // Rebuild the graph from every still-pending task: each node (except the
    // first) depends on the previous node, which is what serializes the
    // sequence.
    graph_.Reset();
    for (const auto& graph_task : tasks_) {
      int dependencies = 0;
      if (!graph_.nodes.empty()) {
        dependencies = 1;
      }

      // Treat any tasks that are enqueued through the SequencedTaskRunner as
      // FOREGROUND priority. We don't have enough information to know the
      // actual priority of such tasks, so we run them as soon as possible.
      TaskGraph::Node node(graph_task, TASK_CATEGORY_FOREGROUND,
                           0u /* priority */, dependencies);
      if (dependencies) {
        graph_.edges.push_back(
            TaskGraph::Edge(graph_.nodes.back().task.get(), node.task.get()));
      }
      graph_.nodes.push_back(std::move(node));
    }

    task_graph_runner_->ScheduleTasks(namespace_token_, &graph_);
    completed_tasks_.clear();
    return true;
  }

  // NOTE(review): unconditionally true, so callers cannot use this to detect
  // cross-sequence access.
  bool RunsTasksInCurrentSequence() const override { return true; }

 private:
  ~CategorizedWorkerPoolSequencedTaskRunner() override {
    // Block until everything scheduled under our namespace has run, then
    // drain the completed list so nothing is left behind in the runner.
    {
      base::ScopedAllowBaseSyncPrimitivesOutsideBlockingScope allow_wait;
      task_graph_runner_->WaitForTasksToFinishRunning(namespace_token_);
    }
    task_graph_runner_->CollectCompletedTasks(namespace_token_,
                                              &completed_tasks_);
  }

  // Lock to exclusively access all the following members that are used to
  // implement the SequencedTaskRunner interfaces.
  base::Lock lock_;

  raw_ptr<TaskGraphRunner> task_graph_runner_;
  // Namespace used to schedule tasks in the task graph runner.
  NamespaceToken namespace_token_;
  // List of tasks currently queued up for execution.
  Task::Vector tasks_;
  // Graph object used for scheduling tasks.
  TaskGraph graph_;
  // Cached vector to avoid allocation when getting the list of complete
  // tasks.
  Task::Vector completed_tasks_;
};
// Construction does not create any workers; callers must invoke Start().
CategorizedWorkerPoolJob::CategorizedWorkerPoolJob() = default;
CategorizedWorkerPoolJob::~CategorizedWorkerPoolJob() = default;
// Creates the two base::Jobs that drain the pool:
//  - a BEST_EFFORT job (preferring background threads) serving
//    |kBackgroundThreadPriorityCategories|, capped at one concurrent worker;
//  - a USER_BLOCKING job serving |kNormalThreadPriorityCategories| with up to
//    |max_concurrency_foreground| concurrent workers.
// NOTE(review): base::Unretained(this) assumes |this| outlives both jobs;
// Shutdown() cancels the handles — confirm callers always run Shutdown()
// before destruction.
void CategorizedWorkerPoolJob::Start(int max_concurrency_foreground) {
  max_concurrency_foreground_ = max_concurrency_foreground;

  // Background work: at most one worker at a time, regardless of how many
  // background tasks are queued (hence the min with 1).
  background_job_handle_ = base::CreateJob(
      FROM_HERE,
      {base::TaskPriority::BEST_EFFORT, base::ThreadPolicy::PREFER_BACKGROUND,
       base::MayBlock()},
      base::BindRepeating(
          &CategorizedWorkerPoolJob::Run, base::Unretained(this),
          base::span<const TaskCategory>(kBackgroundThreadPriorityCategories)),
      base::BindRepeating(
          [](CategorizedWorkerPoolJob* self, size_t) {
            return std::min<size_t>(1U,
                                    self->GetMaxJobConcurrency(
                                        kBackgroundThreadPriorityCategories));
          },
          base::Unretained(this)));

  // Foreground work: concurrency limited both by the configured thread count
  // and by how much runnable foreground work actually exists.
  foreground_job_handle_ = base::CreateJob(
      FROM_HERE, {base::TaskPriority::USER_BLOCKING, base::MayBlock()},
      base::BindRepeating(
          &CategorizedWorkerPoolJob::Run, base::Unretained(this),
          base::span<const TaskCategory>(kNormalThreadPriorityCategories)),
      base::BindRepeating(
          [](CategorizedWorkerPoolJob* self, size_t) {
            return std::min(
                self->max_concurrency_foreground_,
                self->GetMaxJobConcurrency(kNormalThreadPriorityCategories));
          },
          base::Unretained(this)));
}
// Waits for all outstanding work to finish, then cancels both job handles.
void CategorizedWorkerPoolJob::Shutdown() {
  {
    // Blocking on worker completion requires explicitly allowing sync
    // primitives here.
    base::ScopedAllowBaseSyncPrimitivesOutsideBlockingScope allow;
    WaitForTasksToFinishRunning(namespace_token_);
  }
  CollectCompletedTasks(namespace_token_, &completed_tasks_);
  // Shutdown raster threads.
  {
    base::AutoLock lock(lock_);

    // By now the queue must be fully drained: no runnable tasks and no live
    // namespaces.
    DCHECK(!work_queue_.HasReadyToRunTasks());
    DCHECK(!work_queue_.HasAnyNamespaces());
  }
  if (foreground_job_handle_) {
    foreground_job_handle_.Cancel();
  }
  if (background_job_handle_) {
    background_job_handle_.Cancel();
  }
  // Wake anyone blocked in RunTasksUntilIdleForTest().
  workers_are_idle_cv_.Broadcast();
}
// Overridden from base::TaskRunner:
// Queues |task| as a FOREGROUND-category graph node.
// NOTE(review): |delay| is never read — the task is scheduled immediately
// regardless of the requested delay. Confirm no caller depends on delayed
// execution.
bool CategorizedWorkerPoolJob::PostDelayedTask(const base::Location& from_here,
                                               base::OnceClosure task,
                                               base::TimeDelta delay) {
  base::JobHandle* job_handle_to_notify = nullptr;
  {
    base::AutoLock lock(lock_);
    // Remove completed tasks.
    DCHECK(completed_tasks_.empty());
    CollectCompletedTasksWithLockAcquired(namespace_token_, &completed_tasks_);
    std::erase_if(tasks_,
                  [this](const scoped_refptr<Task>& e)
                      EXCLUSIVE_LOCKS_REQUIRED(lock_) {
                        return base::Contains(this->completed_tasks_, e);
                      });

    tasks_.push_back(base::MakeRefCounted<ClosureTask>(std::move(task)));

    // Rebuild the graph from every still-pending task. Unlike the sequenced
    // task runner, nodes have no dependency edges, so these tasks may run
    // concurrently. FOREGROUND category makes them run as soon as possible.
    graph_.Reset();
    for (const auto& graph_task : tasks_) {
      graph_.nodes.push_back(
          TaskGraph::Node(graph_task.get(), TASK_CATEGORY_FOREGROUND,
                          0u /* priority */, 0u /* dependencies */));
    }

    job_handle_to_notify =
        ScheduleTasksWithLockAcquired(namespace_token_, &graph_);
    completed_tasks_.clear();
  }
  // Notify the job system outside of |lock_|.
  if (job_handle_to_notify) {
    job_handle_to_notify->NotifyConcurrencyIncrease();
  }
  return true;
}
// Worker entry point shared by the foreground and background jobs. Pops and
// runs tasks from |categories| until the job system asks this worker to yield
// or no runnable task remains.
void CategorizedWorkerPoolJob::Run(base::span<const TaskCategory> categories,
                                   base::JobDelegate* job_delegate) {
  std::optional<TaskGraphWorkQueue::PrioritizedTask> prioritized_task;
  while (!job_delegate->ShouldYield()) {
    base::JobHandle* job_handle_to_notify = nullptr;
    {
      base::AutoLock lock(lock_);
      // Pop a task for |categories|.
      prioritized_task = GetNextTaskToRunWithLockAcquired(categories);
      if (!prioritized_task) {
        // We are no longer running tasks, which may allow another category to
        // start running. Notify other worker jobs outside of |lock| below.
        job_handle_to_notify =
            ScheduleTasksWithLockAcquired(namespace_token_, &graph_);
      }
    }
    if (job_handle_to_notify) {
      job_handle_to_notify->NotifyConcurrencyIncrease();
    }
    // There's no pending task to run, quit the worker until notified again.
    if (!prioritized_task) {
      if (!job_handle_to_notify) {
        // No pending task to run and no other category or job handle with tasks
        // to run, so the workers are going idle.
        workers_are_idle_cv_.Signal();
      }
      return;
    }

    TRACE_EVENT(
        "toplevel", "TaskGraphRunner::RunTask",
        perfetto::TerminatingFlow::Global(
            prioritized_task->task->trace_task_id()),
        [&](perfetto::EventContext ctx) {
          ctx.event<perfetto::protos::pbzero::ChromeTrackEvent>()
              ->set_chrome_raster_task()
              ->set_source_frame_number(prioritized_task->task->frame_number());
        });
    // The task may use blocking sync primitives; allow that for the rest of
    // this loop iteration (the scope intentionally extends over CompleteTask).
    base::ScopedAllowBaseSyncPrimitives allow;
    prioritized_task->task->RunOnWorkerThread();

    {
      base::AutoLock lock(lock_);
      auto* task_namespace = prioritized_task->task_namespace.get();
      work_queue_.CompleteTask(std::move(*prioritized_task));

      // If namespace has finished running all tasks, wake up origin threads.
      if (work_queue_.HasFinishedRunningTasksInNamespace(task_namespace)) {
        has_namespaces_with_finished_running_tasks_cv_.Signal();
      }
    }
  }
}
// Walks |categories| in priority order and pops the first task whose category
// is currently allowed to run. Returns nullopt when nothing is runnable.
// |lock_| must be held by the caller.
std::optional<TaskGraphWorkQueue::PrioritizedTask>
CategorizedWorkerPoolJob::GetNextTaskToRunWithLockAcquired(
    base::span<const TaskCategory> categories) {
  lock_.AssertAcquired();

  for (TaskCategory category : categories) {
    if (!ShouldRunTaskForCategoryWithLockAcquired(category)) {
      continue;
    }
    return work_queue_.GetNextTaskToRun(category);
  }
  return std::nullopt;
}
// Test-only: blocks until both jobs have run all their outstanding work
// (base::JobHandle::Join() waits for the job to report no remaining
// concurrency).
void CategorizedWorkerPoolJob::FlushForTesting() {
  foreground_job_handle_.Join();
  background_job_handle_.Join();
}
// Schedules |graph|'s tasks under |token|, then wakes whichever job (if any)
// now has runnable work.
void CategorizedWorkerPoolJob::ScheduleTasks(NamespaceToken token,
                                             TaskGraph* graph) {
  TRACE_EVENT2("disabled-by-default-cc.debug",
               "CategorizedWorkerPool::ScheduleTasks", "num_nodes",
               graph->nodes.size(), "num_edges", graph->edges.size());

  base::JobHandle* handle_needing_workers = nullptr;
  {
    base::AutoLock lock(lock_);
    handle_needing_workers = ScheduleTasksWithLockAcquired(token, graph);
  }
  // Notify outside the lock, matching the pattern used throughout this file.
  if (handle_needing_workers) {
    handle_needing_workers->NotifyConcurrencyIncrease();
  }
}
// Records that an external dependency of |task| completed; if that made the
// task runnable, wakes the job responsible for its category.
void CategorizedWorkerPoolJob::ExternalDependencyCompletedForTask(
    NamespaceToken token,
    scoped_refptr<Task> task) {
  base::JobHandle* handle_to_wake = nullptr;
  {
    base::AutoLock lock(lock_);
    const bool task_became_ready =
        work_queue_.ExternalDependencyCompletedForTask(token, std::move(task));
    if (task_became_ready) {
      // A task just became runnable; pick the job handle (if any) that should
      // spin up a worker for it.
      handle_to_wake = GetJobHandleToNotifyWithLockAcquired();
    }
  }
  // Notify outside the lock, matching the pattern used throughout this file.
  if (handle_to_wake) {
    handle_to_wake->NotifyConcurrencyIncrease();
  }
}
// Adds |graph|'s tasks to the work queue and returns the job handle (if any)
// that now has runnable work. Caller must hold |lock_|; callers in this file
// invoke NotifyConcurrencyIncrease() on the returned handle only after
// releasing the lock.
base::JobHandle* CategorizedWorkerPoolJob::ScheduleTasksWithLockAcquired(
    NamespaceToken token,
    TaskGraph* graph) {
  DCHECK(token.IsValid());
  DCHECK(!TaskGraphWorkQueue::DependencyMismatch(graph));

  work_queue_.ScheduleTasks(token, graph);
  return GetJobHandleToNotifyWithLockAcquired();
}
// Picks which job handle (if any) should be told more concurrency is
// available. Normal-priority categories win; the background job is only
// considered when no normal-priority category has runnable work.
base::JobHandle*
CategorizedWorkerPoolJob::GetJobHandleToNotifyWithLockAcquired() {
  lock_.AssertAcquired();

  auto has_runnable_category = [this](base::span<const TaskCategory> categories)
                                   EXCLUSIVE_LOCKS_REQUIRED(lock_) {
                                     for (TaskCategory category : categories) {
                                       if (ShouldRunTaskForCategoryWithLockAcquired(
                                               category)) {
                                         return true;
                                       }
                                     }
                                     return false;
                                   };

  if (has_runnable_category(kNormalThreadPriorityCategories)) {
    return &foreground_job_handle_;
  }
  // Only reached when no normal-priority category has work to run.
  if (has_runnable_category(kBackgroundThreadPriorityCategories)) {
    return &background_job_handle_;
  }
  return nullptr;
}
// Computes the max-concurrency value reported to the job system for a job
// serving |categories|: the number of foreground tasks that are running or
// ready, plus at most one background task (background work is limited to a
// single concurrent task, and only when no foreground work is pending).
size_t CategorizedWorkerPoolJob::GetMaxJobConcurrency(
    base::span<const TaskCategory> categories) const {
  base::AutoLock lock(lock_);

  // True when any foreground category has work running or ready to run;
  // foreground work blocks all background work.
  bool has_foreground_tasks = false;
  for (TaskCategory foreground_category : kForegroundCategories) {
    if (work_queue_.NumRunningTasksForCategory(foreground_category) > 0 ||
        work_queue_.HasReadyToRunTasksForCategory(foreground_category)) {
      has_foreground_tasks = true;
      break;
    }
  }

  // True when a background task is already in flight. The explicit `> 0`
  // avoids an implicit size_t -> bool conversion (and matches the comparison
  // style used elsewhere in this function).
  bool has_running_background_tasks = false;
  for (TaskCategory background_category : kBackgroundCategories) {
    has_running_background_tasks |=
        work_queue_.NumRunningTasksForCategory(background_category) > 0;
  }

  size_t num_foreground_tasks = 0;
  size_t num_background_tasks = 0;
  for (TaskCategory category : categories) {
    if (base::Contains(kBackgroundCategories, category)) {
      if (work_queue_.NumRunningTasksForCategory(category) > 0) {
        num_background_tasks = 1;
      }
      // Enforce that only one background task is allowed to run at a time, and
      // only if there are no foreground tasks running or ready to run.
      if (!has_running_background_tasks && !has_foreground_tasks &&
          work_queue_.HasReadyToRunTasksForCategory(category)) {
        num_background_tasks = 1;
      }
    } else if (category == TASK_CATEGORY_NONCONCURRENT_FOREGROUND) {
      // Enforce that only one nonconcurrent task is allowed to run at a time.
      if (work_queue_.NumRunningTasksForCategory(category) > 0 ||
          work_queue_.HasReadyToRunTasksForCategory(category)) {
        ++num_foreground_tasks;
      }
    } else {
      num_foreground_tasks += work_queue_.NumRunningTasksForCategory(category) +
                              work_queue_.NumReadyTasksForCategory(category);
    }
  }
  return num_foreground_tasks + num_background_tasks;
}
// Returns the process-wide worker pool, creating and starting it on first
// use. The raster-thread count defaults to 1 and can be overridden by the
// --num-raster-threads command-line switch.
CategorizedWorkerPool* CategorizedWorkerPool::GetOrCreate(Delegate* delegate) {
  scoped_refptr<CategorizedWorkerPool>& pool_slot = GetWorkerPool();
  if (pool_slot) {
    return pool_slot.get();
  }

  int num_raster_threads = 1;
  const base::CommandLine& command_line =
      *base::CommandLine::ForCurrentProcess();
  if (command_line.HasSwitch(switches::kNumRasterThreads)) {
    const std::string switch_value =
        command_line.GetSwitchValueASCII(switches::kNumRasterThreads);
    // Crash (rather than silently falling back) on a malformed or
    // non-positive switch value.
    const bool parsed = base::StringToInt(switch_value, &num_raster_threads);
    CHECK(parsed) << switch_value;
    CHECK_GT(num_raster_threads, 0);
  }

  // Start the pool before publishing it in the global slot.
  scoped_refptr<CategorizedWorkerPool> new_pool(new CategorizedWorkerPoolJob());
  new_pool->Start(num_raster_threads);
  pool_slot = std::move(new_pool);
  return pool_slot.get();
}
// Both condition variables are bound to |lock_|, which also guards the work
// queue state they are signaled about.
CategorizedWorkerPool::CategorizedWorkerPool()
    : namespace_token_(GenerateNamespaceToken()),
      has_namespaces_with_finished_running_tasks_cv_(&lock_),
      workers_are_idle_cv_(&lock_) {}
// Creates a task runner whose closures execute one at a time, in posting
// order, on this pool's workers. Each runner gets its own namespace token, so
// its tasks are tracked independently of other runners.
scoped_refptr<base::SequencedTaskRunner>
CategorizedWorkerPool::CreateSequencedTaskRunner() {
  // Use base::MakeRefCounted instead of a bare |new| for consistency with the
  // rest of this file and to make the ref-count adoption explicit.
  return base::MakeRefCounted<CategorizedWorkerPoolSequencedTaskRunner>(this);
}
CategorizedWorkerPool::~CategorizedWorkerPool() = default;

// Returns a fresh namespace token for an independent stream of tasks. Takes
// |lock_| because |work_queue_| is shared with worker threads.
NamespaceToken CategorizedWorkerPool::GenerateNamespaceToken() {
  base::AutoLock lock(lock_);
  return work_queue_.GenerateNamespaceToken();
}
// Blocks the calling (origin) thread until all tasks scheduled under |token|
// have finished running.
void CategorizedWorkerPool::WaitForTasksToFinishRunning(NamespaceToken token) {
  TRACE_EVENT0("disabled-by-default-cc.debug",
               "CategorizedWorkerPool::WaitForTasksToFinishRunning");

  DCHECK(token.IsValid());
  {
    base::AutoLock lock(lock_);
    auto* task_namespace = work_queue_.GetNamespaceForToken(token);

    // No namespace for |token| means there is nothing to wait for.
    if (!task_namespace) {
      return;
    }

    // Workers signal this condition variable when a namespace finishes all of
    // its tasks (see Run()).
    while (!work_queue_.HasFinishedRunningTasksInNamespace(task_namespace)) {
      has_namespaces_with_finished_running_tasks_cv_.Wait();
    }

    // There may be other namespaces that have finished running tasks, so wake
    // up another origin thread.
    has_namespaces_with_finished_running_tasks_cv_.Signal();
  }
}
// Collects tasks under |token| that have finished running into
// |completed_tasks|.
void CategorizedWorkerPool::CollectCompletedTasks(
    NamespaceToken token,
    Task::Vector* completed_tasks) {
  TRACE_EVENT0("disabled-by-default-cc.debug",
               "CategorizedWorkerPool::CollectCompletedTasks");

  // The lock is held for the remainder of the function; no extra scope is
  // needed since nothing follows the locked section.
  base::AutoLock lock(lock_);
  CollectCompletedTasksWithLockAcquired(token, completed_tasks);
}
// Test-only: blocks until no task is ready to run and none is in flight.
// Relies on workers signaling |workers_are_idle_cv_| when they go idle (see
// Run()) and on Shutdown() broadcasting it.
void CategorizedWorkerPool::RunTasksUntilIdleForTest() {
  base::AutoLock lock(lock_);
  while (work_queue_.HasReadyToRunTasks() ||
         work_queue_.NumRunningTasks() > 0) {
    workers_are_idle_cv_.Wait();
  }
}
// Lock-held variant of CollectCompletedTasks(). Caller must hold |lock_|.
void CategorizedWorkerPool::CollectCompletedTasksWithLockAcquired(
    NamespaceToken token,
    Task::Vector* completed_tasks) {
  DCHECK(token.IsValid());
  work_queue_.CollectCompletedTasks(token, completed_tasks);
}
// Decides whether a task of |category| may start right now, implementing the
// scheduling policy: background work yields to any pending foreground work
// and is limited to one concurrent task; nonconcurrent-foreground work is
// limited to one concurrent task. Caller must hold |lock_|.
bool CategorizedWorkerPool::ShouldRunTaskForCategoryWithLockAcquired(
    TaskCategory category) {
  lock_.AssertAcquired();

  // Nothing to start if this category has no task waiting.
  if (!work_queue_.HasReadyToRunTasksForCategory(category)) {
    return false;
  }

  if (base::Contains(kBackgroundCategories, category)) {
    // A background task may only start when no foreground work is running or
    // ready to run...
    for (TaskCategory foreground : kForegroundCategories) {
      const bool foreground_busy =
          work_queue_.NumRunningTasksForCategory(foreground) > 0 ||
          work_queue_.HasReadyToRunTasksForCategory(foreground);
      if (foreground_busy) {
        return false;
      }
    }
    // ...and when no other background task is already running.
    for (TaskCategory background : kBackgroundCategories) {
      if (work_queue_.NumRunningTasksForCategory(background) > 0) {
        return false;
      }
    }
  }

  // At most one nonconcurrent foreground task may run at a time.
  const bool nonconcurrent_already_running =
      category == TASK_CATEGORY_NONCONCURRENT_FOREGROUND &&
      work_queue_.NumRunningTasksForCategory(
          TASK_CATEGORY_NONCONCURRENT_FOREGROUND) > 0;
  return !nonconcurrent_already_running;
}
// Adapts a base::OnceClosure to the Task interface so it can be scheduled in
// a task graph.
CategorizedWorkerPool::ClosureTask::ClosureTask(base::OnceClosure closure)
    : closure_(std::move(closure)) {}

// Overridden from Task:
void CategorizedWorkerPool::ClosureTask::RunOnWorkerThread() {
  // Moving out of |closure_| before running leaves it null, so the task can
  // only ever execute once.
  std::move(closure_).Run();
}

// `= default` (instead of an empty body) per modernize-use-equals-default.
CategorizedWorkerPool::ClosureTask::~ClosureTask() = default;
} // namespace cc