blob: 3550b190fed836e830b02aee16dfa028c8d69d5d [file]
// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef BASE_TASK_THREAD_POOL_THREAD_GROUP_H_
#define BASE_TASK_THREAD_POOL_THREAD_GROUP_H_
#include "base/base_export.h"
#include "base/memory/ref_counted.h"
#include "base/task/common/checked_lock.h"
#include "base/task/thread_pool/priority_queue.h"
#include "base/task/thread_pool/task.h"
#include "base/task/thread_pool/task_source.h"
#include "base/task/thread_pool/tracked_ref.h"
#include "build/build_config.h"
namespace base {
namespace internal {
class TaskTracker;
// Interface and base implementation for a thread group. A thread group is a
// subset of the threads in the thread pool (see GetThreadGroupForTraits() for
// thread group selection logic when posting tasks and creating task runners).
class BASE_EXPORT ThreadGroup {
public:
// Delegate interface for ThreadGroup.
class BASE_EXPORT Delegate {
public:
virtual ~Delegate() = default;
// Invoked when the TaskSource in |task_source_and_transaction| is non-empty
// after the ThreadGroup has run a task from it. The implementation must
// return the thread group in which the TaskSource should be reenqueued.
virtual ThreadGroup* GetThreadGroupForTraits(const TaskTraits& traits) = 0;
};
enum class WorkerEnvironment {
// No special worker environment required.
NONE,
#if defined(OS_WIN)
// Initialize a COM MTA on the worker.
COM_MTA,
#endif // defined(OS_WIN)
};
virtual ~ThreadGroup();
// Registers the thread group in TLS.
void BindToCurrentThread();
// Resets the thread group in TLS.
void UnbindFromCurrentThread();
// Returns true if the thread group is registered in TLS.
bool IsBoundToCurrentThread() const;
// Removes |task_source| from |priority_queue_|. Returns true if successful,
// or false if |task_source| is not currently in |priority_queue_|, such as
// when a worker is running a task from it.
bool RemoveTaskSource(scoped_refptr<TaskSource> task_source);
// Updates the position of the TaskSource in |task_source_and_transaction| in
// this ThreadGroup's PriorityQueue based on the TaskSource's current traits.
//
// Implementations should instantiate a concrete ScopedWorkersExecutor and
// invoke UpdateSortKeyImpl().
virtual void UpdateSortKey(
TaskSourceAndTransaction task_source_and_transaction) = 0;
// Pushes the TaskSource in |task_source_and_transaction| into this
// ThreadGroup's PriorityQueue and wakes up workers as appropriate.
//
// Implementations should instantiate a concrete ScopedWorkersExecutor and
// invoke PushTaskSourceAndWakeUpWorkersImpl().
virtual void PushTaskSourceAndWakeUpWorkers(
TaskSourceAndTransaction task_source_and_transaction) = 0;
// Removes all task sources from this ThreadGroup's PriorityQueue and enqueues
// them in another |destination_thread_group|. After this method is called,
// any task sources posted to this ThreadGroup will be forwarded to
// |destination_thread_group|.
//
// TODO(crbug.com/756547): Remove this method once the UseNativeThreadPool
// experiment is complete.
void InvalidateAndHandoffAllTaskSourcesToOtherThreadGroup(
ThreadGroup* destination_thread_group);
// Prevents new tasks from starting to run and waits for currently running
// tasks to complete their execution. It is guaranteed that no thread will do
// work on behalf of this ThreadGroup after this returns. It is
// invalid to post a task once this is called. TaskTracker::Flush() can be
// called before this to complete existing tasks, which might otherwise post a
// task during JoinForTesting(). This can only be called once.
virtual void JoinForTesting() = 0;
// Returns the maximum number of non-blocked tasks that can run concurrently
// in this ThreadGroup.
//
// TODO(fdoray): Remove this method. https://crbug.com/687264
virtual size_t GetMaxConcurrentNonBlockedTasksDeprecated() const = 0;
// Reports relevant metrics per implementation.
virtual void ReportHeartbeatMetrics() const = 0;
// Wakes up workers as appropriate for the new CanRunPolicy policy. Must be
// called after an update to CanRunPolicy in TaskTracker.
virtual void DidUpdateCanRunPolicy() = 0;
protected:
// Derived classes must implement a ScopedWorkersExecutor that derives from
// this to perform operations on workers at the end of a scope, when all locks
// have been released.
class BaseScopedWorkersExecutor {
protected:
BaseScopedWorkersExecutor() = default;
~BaseScopedWorkersExecutor() = default;
DISALLOW_COPY_AND_ASSIGN(BaseScopedWorkersExecutor);
};
// Allows a task source to be pushed to a ThreadGroup's PriorityQueue at the
// end of a scope, when all locks have been released.
class ScopedReenqueueExecutor {
public:
ScopedReenqueueExecutor();
~ScopedReenqueueExecutor();
// A TaskSourceAndTransaction and the ThreadGroup in which it should be
// enqueued.
void SchedulePushTaskSourceAndWakeUpWorkers(
TaskSourceAndTransaction task_source_and_transaction,
ThreadGroup* destination_thread_group);
private:
// A TaskSourceAndTransaction and the thread group in which it should be
// enqueued.
Optional<TaskSourceAndTransaction> task_source_and_transaction_;
ThreadGroup* destination_thread_group_ = nullptr;
DISALLOW_COPY_AND_ASSIGN(ScopedReenqueueExecutor);
};
// |predecessor_thread_group| is a ThreadGroup whose lock can be acquired
// before the constructed ThreadGroup's lock. This is necessary to move all
// task sources from |predecessor_thread_group| to the constructed ThreadGroup
// and support the UseNativeThreadPool experiment.
//
// TODO(crbug.com/756547): Remove |predecessor_thread_group| once the
// experiment is complete.
ThreadGroup(TrackedRef<TaskTracker> task_tracker,
TrackedRef<Delegate> delegate,
ThreadGroup* predecessor_thread_group = nullptr);
const TrackedRef<TaskTracker> task_tracker_;
const TrackedRef<Delegate> delegate_;
// Returns the number of queued BEST_EFFORT task sources allowed to run by the
// current CanRunPolicy.
size_t GetNumQueuedCanRunBestEffortTaskSources() const
EXCLUSIVE_LOCKS_REQUIRED(lock_);
// Returns the number of queued USER_VISIBLE/USER_BLOCKING task sources
// allowed to run by the current CanRunPolicy.
size_t GetNumQueuedCanRunForegroundTaskSources() const
EXCLUSIVE_LOCKS_REQUIRED(lock_);
// Ensures that there are enough workers to run queued task sources.
// |executor| is forwarded from the one received in
// PushTaskSourceAndWakeUpWorkersImpl()
virtual void EnsureEnoughWorkersLockRequired(
BaseScopedWorkersExecutor* executor) EXCLUSIVE_LOCKS_REQUIRED(lock_) = 0;
// Reenqueues a |task_source_and_transaction| from which a Task just ran in
// the current ThreadGroup into the appropriate ThreadGroup.
void ReEnqueueTaskSourceLockRequired(
BaseScopedWorkersExecutor* workers_executor,
ScopedReenqueueExecutor* reenqueue_executor,
TaskSourceAndTransaction task_source_and_transaction)
EXCLUSIVE_LOCKS_REQUIRED(lock_);
// Must be invoked by implementations of the corresponding non-Impl() methods.
void UpdateSortKeyImpl(BaseScopedWorkersExecutor* executor,
TaskSourceAndTransaction task_source_and_transaction);
void PushTaskSourceAndWakeUpWorkersImpl(
BaseScopedWorkersExecutor* executor,
TaskSourceAndTransaction task_source_and_transaction);
// Synchronizes accesses to all members of this class which are neither const,
// atomic, nor immutable after start. Since this lock is a bottleneck to post
// and schedule work, only simple data structure manipulations are allowed
// within its scope (no thread creation or wake up).
mutable CheckedLock lock_;
// PriorityQueue from which all threads of this ThreadGroup get work.
PriorityQueue priority_queue_ GUARDED_BY(lock_);
// If |replacement_thread_group_| is non-null, this ThreadGroup is invalid and
// all task sources should be scheduled on |replacement_thread_group_|. Used
// to support the UseNativeThreadPool experiment.
ThreadGroup* replacement_thread_group_ = nullptr;
private:
DISALLOW_COPY_AND_ASSIGN(ThreadGroup);
};
} // namespace internal
} // namespace base
#endif // BASE_TASK_THREAD_POOL_THREAD_GROUP_H_