blob: c1fc6c38e5b440ac09ef5b0425036b9eed18b9e4 [file] [log] [blame]
// Copyright 2019 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef BASE_TASK_THREAD_POOL_JOB_TASK_SOURCE_H_
#define BASE_TASK_THREAD_POOL_JOB_TASK_SOURCE_H_
#include <stddef.h>
#include <atomic>
#include <limits>
#include "base/base_export.h"
#include "base/callback.h"
#include "base/macros.h"
#include "base/optional.h"
#include "base/synchronization/condition_variable.h"
#include "base/synchronization/lock.h"
#include "base/task/post_job.h"
#include "base/task/task_traits.h"
#include "base/task/thread_pool/sequence_sort_key.h"
#include "base/task/thread_pool/task.h"
#include "base/task/thread_pool/task_source.h"
namespace base {
namespace internal {
class PooledTaskRunnerDelegate;
// A JobTaskSource generates many Tasks from a single RepeatingClosure.
//
// Derived classes control the intended concurrency with GetMaxConcurrency().
class BASE_EXPORT JobTaskSource : public TaskSource {
public:
JobTaskSource(const Location& from_here,
const TaskTraits& traits,
RepeatingCallback<void(experimental::JobDelegate*)> worker_task,
RepeatingCallback<size_t()> max_concurrency_callback,
PooledTaskRunnerDelegate* delegate);
static experimental::JobHandle CreateJobHandle(
scoped_refptr<internal::JobTaskSource> task_source) {
return experimental::JobHandle(std::move(task_source));
}
// Notifies this task source that max concurrency was increased, and the
// number of worker should be adjusted.
void NotifyConcurrencyIncrease();
// Informs this JobTaskSource that the current thread would like to join and
// contribute to running |worker_task|. Returns true if the joining thread can
// contribute (RunJoinTask() can be called), or false if joining was completed
// and all other workers returned because either there's no work remaining or
// Job was cancelled.
bool WillJoin();
// Contributes to running |worker_task| and returns true if the joining thread
// can contribute again (RunJoinTask() can be called again), or false if
// joining was completed and all other workers returned because either there's
// no work remaining or Job was cancelled. This should be called only after
// WillJoin() or RunJoinTask() previously returned true.
bool RunJoinTask();
// Cancels this JobTaskSource, causing all workers to yield and WillRunTask()
// to return RunStatus::kDisallowed.
void Cancel(TaskSource::Transaction* transaction = nullptr);
// TaskSource:
ExecutionEnvironment GetExecutionEnvironment() override;
size_t GetRemainingConcurrency() const override;
// Returns the maximum number of tasks from this TaskSource that can run
// concurrently.
size_t GetMaxConcurrency() const;
// Returns true if a worker should return from the worker task on the current
// thread ASAP.
bool ShouldYield();
PooledTaskRunnerDelegate* delegate() const { return delegate_; }
#if DCHECK_IS_ON()
size_t GetConcurrencyIncreaseVersion() const;
// Returns true if the concurrency version was updated above
// |recorded_version|, or false on timeout.
bool WaitForConcurrencyIncreaseUpdate(size_t recorded_version);
#endif // DCHECK_IS_ON()
private:
// Atomic internal state to track the number of workers running a task from
// this JobTaskSource and whether this JobTaskSource is canceled.
class State {
public:
static constexpr size_t kCanceledMask = 1;
static constexpr size_t kWorkerCountBitOffset = 1;
static constexpr size_t kWorkerCountIncrement = 1 << kWorkerCountBitOffset;
struct Value {
size_t worker_count() const { return value >> kWorkerCountBitOffset; }
// Returns true if canceled.
bool is_canceled() const { return value & kCanceledMask; }
uint32_t value;
};
State();
~State();
// Sets as canceled using std::memory_order_relaxed. Returns the state
// before the operation.
Value Cancel();
// Increments the worker count by 1 if smaller than |max_concurrency| and if
// |!is_canceled()|, using std::memory_order_release, and returns the state
// before the operation. Equivalent to Load() otherwise.
Value TryIncrementWorkerCountFromWorkerRelease(size_t max_concurrency);
// Decrements the worker count by 1 using std::memory_order_acquire. Returns
// the state before the operation.
Value DecrementWorkerCountFromWorkerAcquire();
// Increments the worker count by 1 using std::memory_order_relaxed. Returns
// the state before the operation.
Value IncrementWorkerCountFromJoiningThread();
// Decrements the worker count by 1 using std::memory_order_relaxed. Returns
// the state before the operation.
Value DecrementWorkerCountFromJoiningThread();
// Loads and returns the state, using std::memory_order_relaxed.
Value Load() const;
private:
std::atomic<uint32_t> value_{0};
};
// Atomic flag that indicates if the joining thread is currently waiting on
// another worker to yield or to signal.
class JoinFlag {
public:
static constexpr uint32_t kNotWaiting = 0;
static constexpr uint32_t kWaitingForWorkerToSignal = 1;
static constexpr uint32_t kWaitingForWorkerToYield = 3;
// kWaitingForWorkerToYield is 3 because the impl relies on the following
// property.
static_assert((kWaitingForWorkerToYield & kWaitingForWorkerToSignal) ==
kWaitingForWorkerToSignal,
"");
JoinFlag();
~JoinFlag();
// Sets the status as kWaitingForWorkerToYield using
// std::memory_order_relaxed.
void SetWaiting();
// If the flag is kWaitingForWorkerToYield, returns true indicating that the
// worker should yield, and atomically updates to kWaitingForWorkerToSignal
// (using std::memory_order_relaxed) to ensure that a single worker yields
// in response to SetWaiting().
bool ShouldWorkerYield();
// If the flag is kWaiting*, returns true indicating that the worker should
// signal, and atomically updates to kNotWaiting (using
// std::memory_order_relaxed) to ensure that a single worker signals in
// response to SetWaiting().
bool ShouldWorkerSignal();
private:
std::atomic<uint32_t> value_{kNotWaiting};
};
~JobTaskSource() override;
// Called from the joining thread. Waits for the worker count to be below or
// equal to max concurrency (will happen when a worker calls
// DidProcessTask()). Returns true if the joining thread should run a task, or
// false if joining was completed and all other workers returned because
// either there's no work remaining or Job was cancelled.
bool WaitForParticipationOpportunity();
// TaskSource:
RunStatus WillRunTask() override;
Task TakeTask(TaskSource::Transaction* transaction) override;
Task Clear(TaskSource::Transaction* transaction) override;
bool DidProcessTask(TaskSource::Transaction* transaction) override;
SequenceSortKey GetSortKey() const override;
// Current atomic state.
State state_;
// Normally, |join_flag_| is protected by |lock_|, except in ShouldYield()
// hence the use of atomics.
JoinFlag join_flag_ GUARDED_BY(lock_);
// Signaled when |join_flag_| is kWaiting* and a worker returns.
std::unique_ptr<ConditionVariable> worker_released_condition_
GUARDED_BY(lock_);
const Location from_here_;
RepeatingCallback<size_t()> max_concurrency_callback_;
// Worker task set by the job owner.
RepeatingCallback<void(experimental::JobDelegate*)> worker_task_;
// Task returned from TakeTask(), that calls |worker_task_| internally.
RepeatingClosure primary_task_;
const TimeTicks queue_time_;
PooledTaskRunnerDelegate* delegate_;
#if DCHECK_IS_ON()
// Synchronizes accesses to |increase_version_|.
mutable Lock version_lock_;
// Signaled whenever increase_version_ is updated.
ConditionVariable version_condition_{&version_lock_};
// Incremented every time max concurrency is increased.
size_t increase_version_ GUARDED_BY(version_lock_) = 0;
#endif // DCHECK_IS_ON()
DISALLOW_COPY_AND_ASSIGN(JobTaskSource);
};
} // namespace internal
} // namespace base
#endif // BASE_TASK_THREAD_POOL_JOB_TASK_SOURCE_H_