// Copyright 2019 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "base/task/thread_pool/thread_group_native.h"
#include <algorithm>
#include <utility>
#include "base/system/sys_info.h"
#include "base/task/thread_pool/task_tracker.h"
namespace base {
namespace internal {
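
// Helper whose destructor submits pending work items to the underlying native
// thread pool. It is always destroyed after |lock_| is released (the
// destructor asserts that no CheckedLock is held), so the platform submission
// call is never made while this thread group's lock is held.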
class ThreadGroupNative::ScopedWorkersExecutor
    : public ThreadGroup::BaseScopedWorkersExecutor {
 public:
  ScopedWorkersExecutor(ThreadGroupNative* outer) : outer_(outer) {}
  ~ScopedWorkersExecutor() {
    CheckedLock::AssertNoLockHeldOnCurrentThread();

    for (size_t i = 0; i < num_threadpool_work_to_submit_; ++i)
      outer_->SubmitWork();
  }

  // Sets the number of threadpool work items to submit upon destruction.
  void set_num_threadpool_work_to_submit(size_t num) {
    DCHECK_EQ(num_threadpool_work_to_submit_, 0U);
    num_threadpool_work_to_submit_ = num;
  }

 private:
  ThreadGroupNative* const outer_;
  size_t num_threadpool_work_to_submit_ = 0;

  DISALLOW_COPY_AND_ASSIGN(ScopedWorkersExecutor);
};
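
// Typical usage, mirroring Start() and DidUpdateCanRunPolicy() below:
// instantiate the executor before acquiring |lock_|, decide how much work to
// submit while the lock is held, and let the destructor perform the actual
// submission once the lock has been released:
//
//   ScopedWorkersExecutor executor(this);
//   CheckedAutoLock auto_lock(lock_);
//   EnsureEnoughWorkersLockRequired(&executor);
//   // |auto_lock|, then |executor|, go out of scope: work is submitted to
//   // the native thread pool only after |lock_| is released.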

ThreadGroupNative::ThreadGroupNative(TrackedRef<TaskTracker> task_tracker,
                                     TrackedRef<Delegate> delegate,
                                     ThreadGroup* predecessor_thread_group)
    : ThreadGroup(std::move(task_tracker),
                  std::move(delegate),
                  predecessor_thread_group) {}

ThreadGroupNative::~ThreadGroupNative() {
#if DCHECK_IS_ON()
  // Verify JoinForTesting() has been called to ensure that there is no more
  // outstanding work. Otherwise, work may try to dereference an invalid
  // pointer to this class.
  DCHECK(join_for_testing_returned_);
#endif
}
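
// Starts accepting and scheduling work. Task sources may already have been
// enqueued before Start() (EnsureEnoughWorkersLockRequired() is a no-op until
// |started_| is set), so a work item is submitted here for each of them.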
void ThreadGroupNative::Start(WorkerEnvironment worker_environment) {
  worker_environment_ = worker_environment;

  StartImpl();

  ScopedWorkersExecutor executor(this);
  CheckedAutoLock auto_lock(lock_);
  DCHECK(!started_);
  started_ = true;
  EnsureEnoughWorkersLockRequired(&executor);
}
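
// Waits for outstanding threadpool work to complete: |priority_queue_| is
// told to flush remaining task sources when destroyed, and the
// platform-specific JoinImpl() waits for work items already in flight.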
void ThreadGroupNative::JoinForTesting() {
  {
    CheckedAutoLock auto_lock(lock_);
    priority_queue_.EnableFlushTaskSourcesOnDestroyForTesting();
  }

  JoinImpl();
#if DCHECK_IS_ON()
  DCHECK(!join_for_testing_returned_);
  join_for_testing_returned_ = true;
#endif
}
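
// Invoked by platform-specific code for each work item submitted via
// SubmitWork(): runs one task from the next runnable task source, then
// re-enqueues that task source if it still has pending tasks (which may in
// turn cause more work items to be submitted).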
void ThreadGroupNative::RunNextTaskSourceImpl() {
  RegisteredTaskSource task_source = GetWork();

  if (task_source) {
    BindToCurrentThread();
    task_source = task_tracker_->RunAndPopNextTask(std::move(task_source));
    UnbindFromCurrentThread();

    if (task_source) {
      ScopedWorkersExecutor workers_executor(this);
      ScopedReenqueueExecutor reenqueue_executor;
      auto transaction_with_task_source =
          TransactionWithRegisteredTaskSource::FromTaskSource(
              std::move(task_source));
      CheckedAutoLock auto_lock(lock_);
      ReEnqueueTaskSourceLockRequired(&workers_executor, &reenqueue_executor,
                                      std::move(transaction_with_task_source));
    }
  }
}

void ThreadGroupNative::UpdateMinAllowedPriorityLockRequired() {
  // Tasks should yield as soon as there is work of higher priority in
  // |priority_queue_|.
  min_allowed_priority_.store(priority_queue_.IsEmpty()
                                  ? TaskPriority::BEST_EFFORT
                                  : priority_queue_.PeekSortKey().priority(),
                              std::memory_order_relaxed);
}
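
// Returns the next task source to run, or null if the queue is empty or the
// CanRunPolicy forbids running at the current highest priority. The loop is
// needed because TakeRegisteredTaskSource() can come up empty for the task
// source at the head of the queue (e.g. when it cannot commit to running an
// additional task), in which case the next one is considered.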
RegisteredTaskSource ThreadGroupNative::GetWork() {
  ScopedWorkersExecutor workers_executor(this);
  CheckedAutoLock auto_lock(lock_);
  DCHECK_GT(num_pending_threadpool_work_, 0U);
  --num_pending_threadpool_work_;

  RegisteredTaskSource task_source;
  TaskPriority priority;
  while (!task_source && !priority_queue_.IsEmpty()) {
    priority = priority_queue_.PeekSortKey().priority();

    // Enforce the CanRunPolicy.
    if (!task_tracker_->CanRunPriority(priority))
      return nullptr;

    task_source = TakeRegisteredTaskSource(&workers_executor);
  }
  UpdateMinAllowedPriorityLockRequired();
  return task_source;
}

void ThreadGroupNative::UpdateSortKey(TaskSource::Transaction transaction) {
  ScopedWorkersExecutor executor(this);
  UpdateSortKeyImpl(&executor, std::move(transaction));
}

void ThreadGroupNative::PushTaskSourceAndWakeUpWorkers(
    TransactionWithRegisteredTaskSource transaction_with_task_source) {
  ScopedWorkersExecutor executor(this);
  PushTaskSourceAndWakeUpWorkersImpl(&executor,
                                     std::move(transaction_with_task_source));
}
void ThreadGroupNative::EnsureEnoughWorkersLockRequired(
    BaseScopedWorkersExecutor* executor) {
  if (!started_)
    return;
  // Ensure that there is at least one pending threadpool work item per
  // TaskSource in the PriorityQueue.
  const size_t desired_num_pending_threadpool_work =
      GetNumAdditionalWorkersForBestEffortTaskSourcesLockRequired() +
      GetNumAdditionalWorkersForForegroundTaskSourcesLockRequired();

  if (desired_num_pending_threadpool_work > num_pending_threadpool_work_) {
    static_cast<ScopedWorkersExecutor*>(executor)
        ->set_num_threadpool_work_to_submit(
            desired_num_pending_threadpool_work - num_pending_threadpool_work_);
    num_pending_threadpool_work_ = desired_num_pending_threadpool_work;
  }
  // This function is called every time a task source is queued or re-enqueued;
  // hence, the minimum allowed priority needs to be updated.
  UpdateMinAllowedPriorityLockRequired();
}

size_t ThreadGroupNative::GetMaxConcurrentNonBlockedTasksDeprecated() const {
  // Native thread pools give us no control over the number of workers that
  // are active at one time. Consequently, we cannot report a true value here.
  // Instead, the values were chosen to match
  // ThreadPoolInstance::StartWithDefaultParams.
  const int num_cores = SysInfo::NumberOfProcessors();
  return std::max(3, num_cores - 1);
}

void ThreadGroupNative::ReportHeartbeatMetrics() const {
  // Native thread pools do not provide the capability to determine the
  // number of worker threads created.
}
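
// Called when the CanRunPolicy changes: task sources that were previously
// held back by the policy may have become runnable, so re-evaluate how many
// work items should be pending.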
void ThreadGroupNative::DidUpdateCanRunPolicy() {
  ScopedWorkersExecutor executor(this);
  CheckedAutoLock auto_lock(lock_);
  EnsureEnoughWorkersLockRequired(&executor);
}

} // namespace internal
} // namespace base