blob: c6e5f9733c5605113c3507b1a92dd8ebdfdcbc15 [file] [log] [blame]
// Copyright 2019 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "base/task/thread_pool/job_task_source.h"
#include <utility>
#include "base/bind.h"
#include "base/bind_helpers.h"
#include "base/logging.h"
#include "base/memory/ptr_util.h"
#include "base/task/task_features.h"
#include "base/task/thread_pool/pooled_task_runner_delegate.h"
#include "base/threading/thread_restrictions.h"
#include "base/time/time.h"
#include "base/time/time_override.h"
namespace base {
namespace internal {
JobTaskSource::JobTaskSource(
const Location& from_here,
const TaskTraits& traits,
RepeatingCallback<void(experimental::JobDelegate*)> worker_task,
RepeatingCallback<size_t()> max_concurrency_callback,
PooledTaskRunnerDelegate* delegate)
: TaskSource(traits, nullptr, TaskSourceExecutionMode::kJob),
from_here_(from_here),
max_concurrency_callback_(std::move(max_concurrency_callback)),
worker_task_(base::BindRepeating(
[](JobTaskSource* self,
const RepeatingCallback<void(experimental::JobDelegate*)>&
worker_task) {
// Each worker task has its own delegate with associated state.
experimental::JobDelegate job_delegate{self, self->delegate_};
worker_task.Run(&job_delegate);
},
base::Unretained(this),
std::move(worker_task))),
queue_time_(TimeTicks::Now()),
delegate_(delegate) {
DCHECK(delegate_);
}
JobTaskSource::~JobTaskSource() {
#if DCHECK_IS_ON()
auto worker_count = worker_count_.load(std::memory_order_relaxed);
// Make sure there's no outstanding active run operation left.
DCHECK(worker_count == 0U || worker_count == kInvalidWorkerCount)
<< worker_count;
#endif
}
ExecutionEnvironment JobTaskSource::GetExecutionEnvironment() {
return {SequenceToken::Create(), nullptr};
}
TaskSource::RunStatus JobTaskSource::WillRunTask() {
// When this call is caused by an increase of max concurrency followed by an
// associated NotifyConcurrencyIncrease(), the priority queue lock guarantees
// an happens-after relation with NotifyConcurrencyIncrease(). The memory
// operations on |worker_count| below and in DidProcessTask() use
// std::memory_order_release and std::memory_order_acquire respectively to
// establish a Release-Acquire ordering. This ensures that all memory
// side-effects made before this point, including an increase of max
// concurrency followed by NotifyConcurrencyIncrease() are visible to a
// DidProcessTask() call which is ordered after this one.
const size_t max_concurrency = GetMaxConcurrency();
size_t worker_count_before_add =
worker_count_.load(std::memory_order_relaxed);
// std::memory_order_release on success to make the newest |max_concurrency|
// visible to a thread that calls DidProcessTask() containing a matching
// std::memory_order_acquire.
while (worker_count_before_add < max_concurrency &&
!worker_count_.compare_exchange_weak(
worker_count_before_add, worker_count_before_add + 1,
std::memory_order_release, std::memory_order_relaxed)) {
}
// Don't allow this worker to run the task if either:
// A) |worker_count_| is already at |max_concurrency|.
// B) |max_concurrency| was lowered below or to |worker_count_|.
// C) |worker_count_| was invalidated.
if (worker_count_before_add >= max_concurrency) {
// The caller is prevented from running a task from this TaskSource.
return RunStatus::kDisallowed;
}
DCHECK_LT(worker_count_before_add, max_concurrency);
return max_concurrency == worker_count_before_add + 1
? RunStatus::kAllowedSaturated
: RunStatus::kAllowedNotSaturated;
}
size_t JobTaskSource::GetRemainingConcurrency() const {
// std::memory_order_relaxed is sufficient because no other state is
// synchronized with GetRemainingConcurrency().
const size_t worker_count = worker_count_.load(std::memory_order_relaxed);
const size_t max_concurrency = GetMaxConcurrency();
// Avoid underflows.
if (worker_count > max_concurrency)
return 0;
return max_concurrency - worker_count;
}
void JobTaskSource::NotifyConcurrencyIncrease() {
#if DCHECK_IS_ON()
{
AutoLock auto_lock(version_lock_);
++increase_version_;
version_condition_.Broadcast();
}
#endif // DCHECK_IS_ON()
// Make sure the task source is in the queue if not already.
// Caveat: it's possible but unlikely that the task source has already reached
// its intended concurrency and doesn't need to be enqueued if there
// previously were too many worker. For simplicity, the task source is always
// enqueued and will get discarded if already saturated when it is popped from
// the priority queue.
delegate_->EnqueueJobTaskSource(this);
}
size_t JobTaskSource::GetMaxConcurrency() const {
return max_concurrency_callback_.Run();
}
#if DCHECK_IS_ON()
size_t JobTaskSource::GetConcurrencyIncreaseVersion() const {
AutoLock auto_lock(version_lock_);
return increase_version_;
}
bool JobTaskSource::WaitForConcurrencyIncreaseUpdate(size_t recorded_version) {
AutoLock auto_lock(version_lock_);
constexpr TimeDelta timeout = TimeDelta::FromSeconds(1);
const base::TimeTicks start_time = subtle::TimeTicksNowIgnoringOverride();
do {
DCHECK_LE(recorded_version, increase_version_);
if (recorded_version != increase_version_)
return true;
// Waiting is acceptable because it is in DCHECK-only code.
ScopedAllowBaseSyncPrimitivesOutsideBlockingScope
allow_base_sync_primitives;
version_condition_.TimedWait(timeout);
} while (subtle::TimeTicksNowIgnoringOverride() - start_time < timeout);
return false;
}
#endif // DCHECK_IS_ON()
Optional<Task> JobTaskSource::TakeTask(TaskSource::Transaction* transaction) {
DCHECK_GT(worker_count_.load(std::memory_order_relaxed), 0U);
DCHECK(worker_task_);
return base::make_optional<Task>(from_here_, worker_task_, TimeDelta());
}
bool JobTaskSource::DidProcessTask(TaskSource::Transaction* transaction) {
size_t worker_count_before_sub =
worker_count_.load(std::memory_order_relaxed);
// std::memory_order_acquire on |worker_count_| is necessary to establish
// Release-Acquire ordering (see WillRunTask()).
// When the task source needs to be queued, either because the current task
// yielded or because of NotifyConcurrencyIncrease(), one of the following is
// true:
// A) The JobTaskSource is already in the queue (no worker picked up the
// extra work yet): Incorrectly returning false is fine and the memory
// barrier may be ineffective.
// B) The JobTaskSource() is no longer in the queue: The Release-Acquire
// ordering with WillRunTask() established by |worker_count| ensures that
// the upcoming call for GetMaxConcurrency() happens-after any
// NotifyConcurrencyIncrease() that happened-before WillRunTask(). If
// this task completed because it yielded, this barrier guarantees that
// it sees an up-to-date concurrency value and correctly re-enqueues.
//
// Note that stale values the other way around (incorrectly re-enqueuing) are
// not an issue because the queues support empty task sources.
while (worker_count_before_sub != kInvalidWorkerCount &&
!worker_count_.compare_exchange_weak(
worker_count_before_sub, worker_count_before_sub - 1,
std::memory_order_acquire, std::memory_order_relaxed)) {
}
if (worker_count_before_sub == kInvalidWorkerCount)
return false;
DCHECK_GT(worker_count_before_sub, 0U);
// Re-enqueue the TaskSource if the task ran and the worker count is below the
// max concurrency.
return worker_count_before_sub <= GetMaxConcurrency();
}
SequenceSortKey JobTaskSource::GetSortKey() const {
return SequenceSortKey(traits_.priority(), queue_time_);
}
Optional<Task> JobTaskSource::Clear(TaskSource::Transaction* transaction) {
// Invalidate |worker_count_| so that further calls to WillRunTask() never
// succeed. std::memory_order_relaxed is sufficient because this task source
// never needs to be re-enqueued after Clear().
size_t worker_count_before_store =
worker_count_.exchange(kInvalidWorkerCount, std::memory_order_relaxed);
DCHECK_GT(worker_count_before_store, 0U);
// Nothing is cleared since other workers might still racily run tasks. For
// simplicity, the destructor will take care of it once all references are
// released.
return base::make_optional<Task>(from_here_, DoNothing(), TimeDelta());
}
} // namespace internal
} // namespace base