| // Copyright 2019 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "base/task/thread_pool/job_task_source.h" |
| |
| #include <type_traits> |
| #include <utility> |
| |
| #include "base/bind.h" |
| #include "base/bind_helpers.h" |
| #include "base/bits.h" |
| #include "base/check_op.h" |
| #include "base/memory/ptr_util.h" |
| #include "base/task/common/checked_lock.h" |
| #include "base/task/task_features.h" |
| #include "base/task/thread_pool/pooled_task_runner_delegate.h" |
| #include "base/threading/thread_restrictions.h" |
| #include "base/time/time.h" |
| #include "base/time/time_override.h" |
| #include "base/trace_event/base_tracing.h" |
| |
| namespace base { |
| namespace internal { |
| |
| namespace { |
| |
| // Capped to allow assigning task_ids from a bitfield. |
| constexpr size_t kMaxWorkersPerJob = 32; |
| static_assert( |
| kMaxWorkersPerJob <= |
| std::numeric_limits<std::result_of< |
| decltype (&JobDelegate::GetTaskId)(JobDelegate)>::type>::max(), |
| "AcquireTaskId return type isn't big enough to fit kMaxWorkersPerJob"); |
| |
| } // namespace |
| |
| JobTaskSource::State::State() = default; |
| JobTaskSource::State::~State() = default; |
| |
| JobTaskSource::State::Value JobTaskSource::State::Cancel() { |
| return {value_.fetch_or(kCanceledMask, std::memory_order_relaxed)}; |
| } |
| |
| JobTaskSource::State::Value JobTaskSource::State::DecrementWorkerCount() { |
| const size_t value_before_sub = |
| value_.fetch_sub(kWorkerCountIncrement, std::memory_order_relaxed); |
| DCHECK((value_before_sub >> kWorkerCountBitOffset) > 0); |
| return {value_before_sub}; |
| } |
| |
| JobTaskSource::State::Value JobTaskSource::State::IncrementWorkerCount() { |
| size_t value_before_add = |
| value_.fetch_add(kWorkerCountIncrement, std::memory_order_relaxed); |
| return {value_before_add}; |
| } |
| |
| JobTaskSource::State::Value JobTaskSource::State::Load() const { |
| return {value_.load(std::memory_order_relaxed)}; |
| } |
| |
| JobTaskSource::JoinFlag::JoinFlag() = default; |
| JobTaskSource::JoinFlag::~JoinFlag() = default; |
| |
| void JobTaskSource::JoinFlag::SetWaiting() { |
| value_.store(kWaitingForWorkerToYield, std::memory_order_relaxed); |
| } |
| |
| bool JobTaskSource::JoinFlag::ShouldWorkerYield() { |
| // The fetch_and() sets the state to kWaitingForWorkerToSignal if it was |
| // previously kWaitingForWorkerToYield, otherwise it leaves it unchanged. |
| return value_.fetch_and(kWaitingForWorkerToSignal, |
| std::memory_order_relaxed) == |
| kWaitingForWorkerToYield; |
| } |
| |
| bool JobTaskSource::JoinFlag::ShouldWorkerSignal() { |
| return value_.exchange(kNotWaiting, std::memory_order_relaxed) != kNotWaiting; |
| } |
| |
| JobTaskSource::JobTaskSource(const Location& from_here, |
| const TaskTraits& traits, |
| RepeatingCallback<void(JobDelegate*)> worker_task, |
| MaxConcurrencyCallback max_concurrency_callback, |
| PooledTaskRunnerDelegate* delegate) |
| : TaskSource(traits, nullptr, TaskSourceExecutionMode::kJob), |
| from_here_(from_here), |
| max_concurrency_callback_(std::move(max_concurrency_callback)), |
| worker_task_(std::move(worker_task)), |
| primary_task_(base::BindRepeating( |
| [](JobTaskSource* self) { |
| CheckedLock::AssertNoLockHeldOnCurrentThread(); |
| // Each worker task has its own delegate with associated state. |
| JobDelegate job_delegate{self, self->delegate_}; |
| self->worker_task_.Run(&job_delegate); |
| }, |
| base::Unretained(this))), |
| queue_time_(TimeTicks::Now()), |
| delegate_(delegate) { |
| DCHECK(delegate_); |
| } |
| |
| JobTaskSource::~JobTaskSource() { |
| // Make sure there's no outstanding active run operation left. |
| DCHECK_EQ(state_.Load().worker_count(), 0U); |
| } |
| |
| ExecutionEnvironment JobTaskSource::GetExecutionEnvironment() { |
| return {SequenceToken::Create(), nullptr}; |
| } |
| |
| bool JobTaskSource::WillJoin() { |
| TRACE_EVENT0("base", "Job.WaitForParticipationOpportunity"); |
| CheckedAutoLock auto_lock(worker_lock_); |
| DCHECK(!worker_released_condition_); // This may only be called once. |
| worker_released_condition_ = worker_lock_.CreateConditionVariable(); |
| // Prevent wait from triggering a ScopedBlockingCall as this would cause |
| // |ThreadGroup::lock_| to be acquired, causing lock inversion. |
| worker_released_condition_->declare_only_used_while_idle(); |
| const auto state_before_add = state_.IncrementWorkerCount(); |
| |
| if (!state_before_add.is_canceled() && |
| state_before_add.worker_count() < |
| GetMaxConcurrency(state_before_add.worker_count())) { |
| return true; |
| } |
| return WaitForParticipationOpportunity(); |
| } |
| |
| bool JobTaskSource::RunJoinTask() { |
| JobDelegate job_delegate{this, nullptr}; |
| worker_task_.Run(&job_delegate); |
| |
| // It is safe to read |state_| without a lock since this variable is atomic |
| // and the call to GetMaxConcurrency() is used for a best effort early exit. |
| // Stale values will only cause WaitForParticipationOpportunity() to be |
| // called. |
| const auto state = TS_UNCHECKED_READ(state_).Load(); |
| // The condition is slightly different from the one in WillJoin() since we're |
| // using |state| that was already incremented to include the joining thread. |
| if (!state.is_canceled() && |
| state.worker_count() <= GetMaxConcurrency(state.worker_count() - 1)) { |
| return true; |
| } |
| |
| TRACE_EVENT0("base", "Job.WaitForParticipationOpportunity"); |
| CheckedAutoLock auto_lock(worker_lock_); |
| return WaitForParticipationOpportunity(); |
| } |
| |
| void JobTaskSource::Cancel(TaskSource::Transaction* transaction) { |
| CheckedAutoLock auto_lock(worker_lock_); |
| // Sets the kCanceledMask bit on |state_| so that further calls to |
| // WillRunTask() never succeed. std::memory_order_relaxed is sufficient |
| // because this task source never needs to be re-enqueued after Cancel(). |
| state_.Cancel(); |
| } |
| |
| // EXCLUSIVE_LOCK_REQUIRED(worker_lock_) |
| bool JobTaskSource::WaitForParticipationOpportunity() { |
| DCHECK(!join_flag_.IsWaiting()); |
| |
| // std::memory_order_relaxed is sufficient because no other state is |
| // synchronized with |state_| outside of |lock_|. |
| auto state = state_.Load(); |
| // |worker_count - 1| to exclude the joining thread which is not active. |
| size_t max_concurrency = GetMaxConcurrency(state.worker_count() - 1); |
| |
| // Wait until either: |
| // A) |worker_count| is below or equal to max concurrency and state is not |
| // canceled. |
| // B) All other workers returned and |worker_count| is 1. |
| while (!((state.worker_count() <= max_concurrency && !state.is_canceled()) || |
| state.worker_count() == 1)) { |
| // std::memory_order_relaxed is sufficient because no other state is |
| // synchronized with |join_flag_| outside of |lock_|. |
| join_flag_.SetWaiting(); |
| |
| // To avoid unnecessarily waiting, if either condition A) or B) change |
| // |lock_| is taken and |worker_released_condition_| signaled if necessary: |
| // 1- In DidProcessTask(), after worker count is decremented. |
| // 2- In NotifyConcurrencyIncrease(), following a max_concurrency increase. |
| worker_released_condition_->Wait(); |
| state = state_.Load(); |
| // |worker_count - 1| to exclude the joining thread which is not active. |
| max_concurrency = GetMaxConcurrency(state.worker_count() - 1); |
| } |
| // Case A: |
| if (state.worker_count() <= max_concurrency && !state.is_canceled()) |
| return true; |
| // Case B: |
| // Only the joining thread remains. |
| DCHECK_EQ(state.worker_count(), 1U); |
| DCHECK(state.is_canceled() || max_concurrency == 0U); |
| state_.DecrementWorkerCount(); |
| // Prevent subsequent accesses to user callbacks. |
| state_.Cancel(); |
| return false; |
| } |
| |
| TaskSource::RunStatus JobTaskSource::WillRunTask() { |
| CheckedAutoLock auto_lock(worker_lock_); |
| auto state_before_add = state_.Load(); |
| |
| // Don't allow this worker to run the task if either: |
| // A) |state_| was canceled. |
| // B) |worker_count| is already at |max_concurrency|. |
| // C) |max_concurrency| was lowered below or to |worker_count|. |
| // Case A: |
| if (state_before_add.is_canceled()) |
| return RunStatus::kDisallowed; |
| |
| const size_t max_concurrency = |
| GetMaxConcurrency(state_before_add.worker_count()); |
| if (state_before_add.worker_count() < max_concurrency) |
| state_before_add = state_.IncrementWorkerCount(); |
| const size_t worker_count_before_add = state_before_add.worker_count(); |
| // Case B) or C): |
| if (worker_count_before_add >= max_concurrency) |
| return RunStatus::kDisallowed; |
| |
| DCHECK_LT(worker_count_before_add, max_concurrency); |
| return max_concurrency == worker_count_before_add + 1 |
| ? RunStatus::kAllowedSaturated |
| : RunStatus::kAllowedNotSaturated; |
| } |
| |
| size_t JobTaskSource::GetRemainingConcurrency() const { |
| // It is safe to read |state_| without a lock since this variable is atomic, |
| // and no other state is synchronized with GetRemainingConcurrency(). |
| const auto state = TS_UNCHECKED_READ(state_).Load(); |
| if (state.is_canceled()) |
| return 0; |
| const size_t max_concurrency = GetMaxConcurrency(state.worker_count()); |
| // Avoid underflows. |
| if (state.worker_count() > max_concurrency) |
| return 0; |
| return max_concurrency - state.worker_count(); |
| } |
| |
| bool JobTaskSource::IsCompleted() const { |
| CheckedAutoLock auto_lock(worker_lock_); |
| auto state = state_.Load(); |
| return GetMaxConcurrency(state.worker_count()) == 0 && |
| state.worker_count() == 0; |
| } |
| |
| size_t JobTaskSource::GetWorkerCount() const { |
| return TS_UNCHECKED_READ(state_).Load().worker_count(); |
| } |
| |
| void JobTaskSource::NotifyConcurrencyIncrease() { |
| // Avoid unnecessary locks when NotifyConcurrencyIncrease() is spuriously |
| // called. |
| if (GetRemainingConcurrency() == 0) |
| return; |
| |
| { |
| // Lock is taken to access |join_flag_| below and signal |
| // |worker_released_condition_|. |
| CheckedAutoLock auto_lock(worker_lock_); |
| if (join_flag_.ShouldWorkerSignal()) |
| worker_released_condition_->Signal(); |
| } |
| |
| // Make sure the task source is in the queue if not already. |
| // Caveat: it's possible but unlikely that the task source has already reached |
| // its intended concurrency and doesn't need to be enqueued if there |
| // previously were too many worker. For simplicity, the task source is always |
| // enqueued and will get discarded if already saturated when it is popped from |
| // the priority queue. |
| delegate_->EnqueueJobTaskSource(this); |
| } |
| |
| size_t JobTaskSource::GetMaxConcurrency() const { |
| return GetMaxConcurrency(TS_UNCHECKED_READ(state_).Load().worker_count()); |
| } |
| |
| size_t JobTaskSource::GetMaxConcurrency(size_t worker_count) const { |
| return std::min(max_concurrency_callback_.Run(worker_count), |
| kMaxWorkersPerJob); |
| } |
| |
| uint8_t JobTaskSource::AcquireTaskId() { |
| static_assert(kMaxWorkersPerJob <= sizeof(assigned_task_ids_) * 8, |
| "TaskId bitfield isn't big enough to fit kMaxWorkersPerJob."); |
| uint32_t assigned_task_ids = |
| assigned_task_ids_.load(std::memory_order_relaxed); |
| uint32_t new_assigned_task_ids = 0; |
| uint8_t task_id = 0; |
| // memory_order_acquire on success, matched with memory_order_release in |
| // ReleaseTaskId() so that operations done by previous threads that had |
| // the same task_id become visible to the current thread. |
| do { |
| // Count trailing one bits. This is the id of the right-most 0-bit in |
| // |assigned_task_ids|. |
| task_id = bits::CountTrailingZeroBits(~assigned_task_ids); |
| new_assigned_task_ids = assigned_task_ids | (uint32_t(1) << task_id); |
| } while (!assigned_task_ids_.compare_exchange_weak( |
| assigned_task_ids, new_assigned_task_ids, std::memory_order_acquire, |
| std::memory_order_relaxed)); |
| return task_id; |
| } |
| |
| void JobTaskSource::ReleaseTaskId(uint8_t task_id) { |
| // memory_order_release to match AcquireTaskId(). |
| uint32_t previous_task_ids = assigned_task_ids_.fetch_and( |
| ~(uint32_t(1) << task_id), std::memory_order_release); |
| DCHECK(previous_task_ids & (uint32_t(1) << task_id)); |
| } |
| |
| bool JobTaskSource::ShouldYield() { |
| // It is safe to read |join_flag_| and |state_| without a lock since these |
| // variables are atomic, keeping in mind that threads may not immediately see |
| // the new value when it is updated. |
| return TS_UNCHECKED_READ(join_flag_).ShouldWorkerYield() || |
| TS_UNCHECKED_READ(state_).Load().is_canceled(); |
| } |
| |
| Task JobTaskSource::TakeTask(TaskSource::Transaction* transaction) { |
| // JobTaskSource members are not lock-protected so no need to acquire a lock |
| // if |transaction| is nullptr. |
| DCHECK_GT(TS_UNCHECKED_READ(state_).Load().worker_count(), 0U); |
| DCHECK(primary_task_); |
| return Task(from_here_, primary_task_, TimeDelta()); |
| } |
| |
| bool JobTaskSource::DidProcessTask(TaskSource::Transaction* /*transaction*/) { |
| // Lock is needed to access |join_flag_| below and signal |
| // |worker_released_condition_|. |
| CheckedAutoLock auto_lock(worker_lock_); |
| const auto state_before_sub = state_.DecrementWorkerCount(); |
| |
| if (join_flag_.ShouldWorkerSignal()) |
| worker_released_condition_->Signal(); |
| |
| // A canceled task source should never get re-enqueued. |
| if (state_before_sub.is_canceled()) |
| return false; |
| |
| DCHECK_GT(state_before_sub.worker_count(), 0U); |
| |
| // Re-enqueue the TaskSource if the task ran and the worker count is below the |
| // max concurrency. |
| // |worker_count - 1| to exclude the returning thread. |
| return state_before_sub.worker_count() <= |
| GetMaxConcurrency(state_before_sub.worker_count() - 1); |
| } |
| |
| TaskSourceSortKey JobTaskSource::GetSortKey() const { |
| return TaskSourceSortKey(traits_.priority(), queue_time_); |
| } |
| |
| Task JobTaskSource::Clear(TaskSource::Transaction* transaction) { |
| Cancel(); |
| // Nothing is cleared since other workers might still racily run tasks. For |
| // simplicity, the destructor will take care of it once all references are |
| // released. |
| return Task(from_here_, DoNothing(), TimeDelta()); |
| } |
| |
| } // namespace internal |
| } // namespace base |