blob: cb58d9b4f3a586dc31fdfa6dffe8802bdd0953f9 [file] [log] [blame]
// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "base/task/sequence_manager/task_queue_selector.h"
#include <utility>
#include "base/logging.h"
#include "base/task/sequence_manager/associated_thread_id.h"
#include "base/task/sequence_manager/task_queue_impl.h"
#include "base/task/sequence_manager/work_queue.h"
#include "base/threading/thread_checker.h"
#include "base/trace_event/traced_value.h"
namespace base {
namespace sequence_manager {
namespace internal {
constexpr const int64_t TaskQueueSelector::per_priority_starvation_tolerance_[];
TaskQueueSelector::TaskQueueSelector(
scoped_refptr<AssociatedThreadId> associated_thread,
const SequenceManager::Settings& settings)
: associated_thread_(std::move(associated_thread)),
#if DCHECK_IS_ON()
random_task_selection_(settings.random_task_selection_seed != 0),
#endif
anti_starvation_logic_for_priorities_disabled_(
settings.anti_starvation_logic_for_priorities_disabled),
delayed_work_queue_sets_("delayed", this, settings),
immediate_work_queue_sets_("immediate", this, settings) {
}
TaskQueueSelector::~TaskQueueSelector() = default;
void TaskQueueSelector::AddQueue(internal::TaskQueueImpl* queue) {
DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
DCHECK(queue->IsQueueEnabled());
AddQueueImpl(queue, TaskQueue::kNormalPriority);
}
void TaskQueueSelector::RemoveQueue(internal::TaskQueueImpl* queue) {
DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
if (queue->IsQueueEnabled()) {
RemoveQueueImpl(queue);
}
}
void TaskQueueSelector::EnableQueue(internal::TaskQueueImpl* queue) {
DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
DCHECK(queue->IsQueueEnabled());
AddQueueImpl(queue, queue->GetQueuePriority());
if (task_queue_selector_observer_)
task_queue_selector_observer_->OnTaskQueueEnabled(queue);
}
void TaskQueueSelector::DisableQueue(internal::TaskQueueImpl* queue) {
DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
DCHECK(!queue->IsQueueEnabled());
RemoveQueueImpl(queue);
}
void TaskQueueSelector::SetQueuePriority(internal::TaskQueueImpl* queue,
TaskQueue::QueuePriority priority) {
DCHECK_LT(priority, TaskQueue::kQueuePriorityCount);
DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
if (queue->IsQueueEnabled()) {
ChangeSetIndex(queue, priority);
} else {
// Disabled queue is not in any set so we can't use ChangeSetIndex here
// and have to assign priority for the queue itself.
queue->delayed_work_queue()->AssignSetIndex(priority);
queue->immediate_work_queue()->AssignSetIndex(priority);
}
DCHECK_EQ(priority, queue->GetQueuePriority());
}
TaskQueue::QueuePriority TaskQueueSelector::NextPriority(
TaskQueue::QueuePriority priority) {
DCHECK(priority < TaskQueue::kQueuePriorityCount);
return static_cast<TaskQueue::QueuePriority>(static_cast<int>(priority) + 1);
}
void TaskQueueSelector::AddQueueImpl(internal::TaskQueueImpl* queue,
TaskQueue::QueuePriority priority) {
#if DCHECK_IS_ON()
DCHECK(!CheckContainsQueueForTest(queue));
#endif
delayed_work_queue_sets_.AddQueue(queue->delayed_work_queue(), priority);
immediate_work_queue_sets_.AddQueue(queue->immediate_work_queue(), priority);
#if DCHECK_IS_ON()
DCHECK(CheckContainsQueueForTest(queue));
#endif
}
void TaskQueueSelector::ChangeSetIndex(internal::TaskQueueImpl* queue,
TaskQueue::QueuePriority priority) {
#if DCHECK_IS_ON()
DCHECK(CheckContainsQueueForTest(queue));
#endif
delayed_work_queue_sets_.ChangeSetIndex(queue->delayed_work_queue(),
priority);
immediate_work_queue_sets_.ChangeSetIndex(queue->immediate_work_queue(),
priority);
#if DCHECK_IS_ON()
DCHECK(CheckContainsQueueForTest(queue));
#endif
}
void TaskQueueSelector::RemoveQueueImpl(internal::TaskQueueImpl* queue) {
#if DCHECK_IS_ON()
DCHECK(CheckContainsQueueForTest(queue));
#endif
delayed_work_queue_sets_.RemoveQueue(queue->delayed_work_queue());
immediate_work_queue_sets_.RemoveQueue(queue->immediate_work_queue());
#if DCHECK_IS_ON()
DCHECK(!CheckContainsQueueForTest(queue));
#endif
}
int64_t TaskQueueSelector::GetSortKeyForPriority(
TaskQueue::QueuePriority priority) const {
switch (priority) {
case TaskQueue::kControlPriority:
return std::numeric_limits<int64_t>::min();
case TaskQueue::kBestEffortPriority:
return std::numeric_limits<int64_t>::max();
default:
if (anti_starvation_logic_for_priorities_disabled_)
return per_priority_starvation_tolerance_[priority];
return selection_count_ + per_priority_starvation_tolerance_[priority];
}
}
void TaskQueueSelector::WorkQueueSetBecameEmpty(size_t set_index) {
non_empty_set_counts_[set_index]--;
DCHECK_GE(non_empty_set_counts_[set_index], 0);
// There are no delayed or immediate tasks for |set_index| so remove from
// |active_priorities_|.
if (non_empty_set_counts_[set_index] == 0)
active_priorities_.erase(static_cast<TaskQueue::QueuePriority>(set_index));
}
void TaskQueueSelector::WorkQueueSetBecameNonEmpty(size_t set_index) {
non_empty_set_counts_[set_index]++;
DCHECK_LE(non_empty_set_counts_[set_index], kMaxNonEmptySetCount);
// There is now a delayed or an immediate task for |set_index|, so add to
// |active_priorities_|.
if (non_empty_set_counts_[set_index] == 1) {
TaskQueue::QueuePriority priority =
static_cast<TaskQueue::QueuePriority>(set_index);
active_priorities_.insert(GetSortKeyForPriority(priority), priority);
}
}
void TaskQueueSelector::CollectSkippedOverLowerPriorityTasks(
const internal::WorkQueue* selected_work_queue,
std::vector<const Task*>* result) const {
delayed_work_queue_sets_.CollectSkippedOverLowerPriorityTasks(
selected_work_queue, result);
immediate_work_queue_sets_.CollectSkippedOverLowerPriorityTasks(
selected_work_queue, result);
}
#if DCHECK_IS_ON() || !defined(NDEBUG)
bool TaskQueueSelector::CheckContainsQueueForTest(
const internal::TaskQueueImpl* queue) const {
bool contains_delayed_work_queue =
delayed_work_queue_sets_.ContainsWorkQueueForTest(
queue->delayed_work_queue());
bool contains_immediate_work_queue =
immediate_work_queue_sets_.ContainsWorkQueueForTest(
queue->immediate_work_queue());
DCHECK_EQ(contains_delayed_work_queue, contains_immediate_work_queue);
return contains_delayed_work_queue;
}
#endif
WorkQueue* TaskQueueSelector::SelectWorkQueueToService() {
DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
if (active_priorities_.empty())
return nullptr;
// Select the priority from which we will select a task. Usually this is
// the highest priority for which we have work, unless we are starving a lower
// priority.
TaskQueue::QueuePriority priority = active_priorities_.min_id();
bool chose_delayed_over_immediate;
// Control tasks are allowed to indefinitely stave out other work and any
// control tasks we run should not be counted for task starvation purposes.
if (priority != TaskQueue::kControlPriority)
selection_count_++;
WorkQueue* queue =
#if DCHECK_IS_ON()
random_task_selection_ ? ChooseWithPriority<SetOperationRandom>(
priority, &chose_delayed_over_immediate)
:
#endif
ChooseWithPriority<SetOperationOldest>(
priority, &chose_delayed_over_immediate);
// If we still have any tasks remaining for |set_index| then adjust it's
// sort key.
if (active_priorities_.IsInQueue(priority))
active_priorities_.ChangeMinKey(GetSortKeyForPriority(priority));
if (chose_delayed_over_immediate) {
immediate_starvation_count_++;
} else {
immediate_starvation_count_ = 0;
}
return queue;
}
void TaskQueueSelector::AsValueInto(trace_event::TracedValue* state) const {
DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
state->SetInteger("immediate_starvation_count", immediate_starvation_count_);
}
void TaskQueueSelector::SetTaskQueueSelectorObserver(Observer* observer) {
task_queue_selector_observer_ = observer;
}
Optional<TaskQueue::QueuePriority>
TaskQueueSelector::GetHighestPendingPriority() const {
DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
if (active_priorities_.empty())
return nullopt;
return active_priorities_.min_id();
}
void TaskQueueSelector::SetImmediateStarvationCountForTest(
size_t immediate_starvation_count) {
immediate_starvation_count_ = immediate_starvation_count;
}
bool TaskQueueSelector::HasTasksWithPriority(
TaskQueue::QueuePriority priority) {
return !delayed_work_queue_sets_.IsSetEmpty(priority) ||
!immediate_work_queue_sets_.IsSetEmpty(priority);
}
TaskQueueSelector::SmallPriorityQueue::SmallPriorityQueue() {
for (size_t i = 0; i < TaskQueue::kQueuePriorityCount; i++) {
id_to_index_[i] = kInvalidIndex;
}
}
void TaskQueueSelector::SmallPriorityQueue::insert(
int64_t key,
TaskQueue::QueuePriority id) {
DCHECK_LE(size_, TaskQueue::kQueuePriorityCount);
DCHECK_LT(id, TaskQueue::kQueuePriorityCount);
DCHECK(!IsInQueue(id));
// Insert while keeping |keys_| sorted.
size_t i = size_;
while (i > 0 && key < keys_[i - 1]) {
keys_[i] = keys_[i - 1];
TaskQueue::QueuePriority moved_id = index_to_id_[i - 1];
index_to_id_[i] = moved_id;
id_to_index_[moved_id] = i;
i--;
}
keys_[i] = key;
index_to_id_[i] = id;
id_to_index_[id] = i;
size_++;
}
void TaskQueueSelector::SmallPriorityQueue::erase(TaskQueue::QueuePriority id) {
DCHECK_NE(size_, 0u);
DCHECK_LT(id, TaskQueue::kQueuePriorityCount);
DCHECK(IsInQueue(id));
// Erase while keeping |keys_| sorted.
size_--;
for (size_t i = id_to_index_[id]; i < size_; i++) {
keys_[i] = keys_[i + 1];
TaskQueue::QueuePriority moved_id = index_to_id_[i + 1];
index_to_id_[i] = moved_id;
id_to_index_[moved_id] = i;
}
id_to_index_[id] = kInvalidIndex;
}
void TaskQueueSelector::SmallPriorityQueue::ChangeMinKey(int64_t new_key) {
DCHECK_NE(size_, 0u);
TaskQueue::QueuePriority id = index_to_id_[0];
size_t i = 0;
while ((i + 1) < size_ && keys_[i + 1] < new_key) {
keys_[i] = keys_[i + 1];
TaskQueue::QueuePriority moved_id = index_to_id_[i + 1];
index_to_id_[i] = moved_id;
id_to_index_[moved_id] = i;
i++;
}
keys_[i] = new_key;
index_to_id_[i] = id;
id_to_index_[id] = i;
}
} // namespace internal
} // namespace sequence_manager
} // namespace base