TaskScheduler: Make SchedulerWorkerThread own its delegate.
This will allow us to have one delegate per SchedulerWorkerThread and
to maintain state in that delegate.
BUG=553459
Review URL: https://codereview.chromium.org/1903103007
Cr-Commit-Position: refs/heads/master@{#389663}
diff --git a/base/task_scheduler/scheduler_thread_pool_impl.cc b/base/task_scheduler/scheduler_thread_pool_impl.cc
index e72ea6e1..fe433f28 100644
--- a/base/task_scheduler/scheduler_thread_pool_impl.cc
+++ b/base/task_scheduler/scheduler_thread_pool_impl.cc
@@ -134,18 +134,18 @@
DCHECK(join_for_testing_returned_.IsSignaled() || worker_threads_.empty());
}
-std::unique_ptr<SchedulerThreadPoolImpl>
-SchedulerThreadPoolImpl::CreateThreadPool(
+std::unique_ptr<SchedulerThreadPoolImpl> SchedulerThreadPoolImpl::Create(
ThreadPriority thread_priority,
size_t max_threads,
const ReEnqueueSequenceCallback& re_enqueue_sequence_callback,
TaskTracker* task_tracker,
DelayedTaskManager* delayed_task_manager) {
std::unique_ptr<SchedulerThreadPoolImpl> thread_pool(
- new SchedulerThreadPoolImpl(re_enqueue_sequence_callback, task_tracker,
- delayed_task_manager));
- if (thread_pool->Initialize(thread_priority, max_threads))
+ new SchedulerThreadPoolImpl(task_tracker, delayed_task_manager));
+ if (thread_pool->Initialize(thread_priority, max_threads,
+ re_enqueue_sequence_callback)) {
return thread_pool;
+ }
return nullptr;
}
@@ -299,32 +299,32 @@
}
SchedulerThreadPoolImpl::SchedulerThreadPoolImpl(
- const ReEnqueueSequenceCallback& re_enqueue_sequence_callback,
TaskTracker* task_tracker,
DelayedTaskManager* delayed_task_manager)
: idle_worker_threads_stack_lock_(shared_priority_queue_.container_lock()),
idle_worker_threads_stack_cv_for_testing_(
idle_worker_threads_stack_lock_.CreateConditionVariable()),
join_for_testing_returned_(true, false),
- worker_thread_delegate_(
- new SchedulerWorkerThreadDelegateImpl(this,
- re_enqueue_sequence_callback)),
task_tracker_(task_tracker),
delayed_task_manager_(delayed_task_manager) {
DCHECK(task_tracker_);
DCHECK(delayed_task_manager_);
}
-bool SchedulerThreadPoolImpl::Initialize(ThreadPriority thread_priority,
- size_t max_threads) {
+bool SchedulerThreadPoolImpl::Initialize(
+ ThreadPriority thread_priority,
+ size_t max_threads,
+ const ReEnqueueSequenceCallback& re_enqueue_sequence_callback) {
AutoSchedulerLock auto_lock(idle_worker_threads_stack_lock_);
DCHECK(worker_threads_.empty());
for (size_t i = 0; i < max_threads; ++i) {
std::unique_ptr<SchedulerWorkerThread> worker_thread =
- SchedulerWorkerThread::CreateSchedulerWorkerThread(
- thread_priority, worker_thread_delegate_.get(), task_tracker_);
+ SchedulerWorkerThread::Create(
+ thread_priority, WrapUnique(new SchedulerWorkerThreadDelegateImpl(
+ this, re_enqueue_sequence_callback)),
+ task_tracker_);
if (!worker_thread)
break;
idle_worker_threads_stack_.Push(worker_thread.get());
diff --git a/base/task_scheduler/scheduler_thread_pool_impl.h b/base/task_scheduler/scheduler_thread_pool_impl.h
index 3ed91bd..b764f27 100644
--- a/base/task_scheduler/scheduler_thread_pool_impl.h
+++ b/base/task_scheduler/scheduler_thread_pool_impl.h
@@ -51,7 +51,7 @@
// handle shutdown behavior of Tasks. |delayed_task_manager| handles Tasks
// posted with a delay. Returns nullptr on failure to create a thread pool
// with at least one thread.
- static std::unique_ptr<SchedulerThreadPoolImpl> CreateThreadPool(
+ static std::unique_ptr<SchedulerThreadPoolImpl> Create(
ThreadPriority thread_priority,
size_t max_threads,
const ReEnqueueSequenceCallback& re_enqueue_sequence_callback,
@@ -80,11 +80,13 @@
class SchedulerWorkerThreadDelegateImpl;
SchedulerThreadPoolImpl(
- const ReEnqueueSequenceCallback& re_enqueue_sequence_callback,
TaskTracker* task_tracker,
DelayedTaskManager* delayed_task_manager);
- bool Initialize(ThreadPriority thread_priority, size_t max_threads);
+ bool Initialize(
+ ThreadPriority thread_priority,
+ size_t max_threads,
+ const ReEnqueueSequenceCallback& re_enqueue_sequence_callback);
// Wakes up the last thread from this thread pool to go idle, if any.
void WakeUpOneThread();
@@ -115,9 +117,6 @@
// Signaled once JoinForTesting() has returned.
WaitableEvent join_for_testing_returned_;
- // Delegate for all worker threads in this pool.
- std::unique_ptr<SchedulerWorkerThread::Delegate> worker_thread_delegate_;
-
TaskTracker* const task_tracker_;
DelayedTaskManager* const delayed_task_manager_;
diff --git a/base/task_scheduler/scheduler_thread_pool_impl_unittest.cc b/base/task_scheduler/scheduler_thread_pool_impl_unittest.cc
index ae36e69a..691d0dd 100644
--- a/base/task_scheduler/scheduler_thread_pool_impl_unittest.cc
+++ b/base/task_scheduler/scheduler_thread_pool_impl_unittest.cc
@@ -57,7 +57,7 @@
TaskSchedulerThreadPoolImplTest() = default;
void SetUp() override {
- thread_pool_ = SchedulerThreadPoolImpl::CreateThreadPool(
+ thread_pool_ = SchedulerThreadPoolImpl::Create(
ThreadPriority::NORMAL, kNumThreadsInThreadPool,
Bind(&TaskSchedulerThreadPoolImplTest::ReEnqueueSequenceCallback,
Unretained(this)),
diff --git a/base/task_scheduler/scheduler_worker_thread.cc b/base/task_scheduler/scheduler_worker_thread.cc
index b571ebc..11c88af 100644
--- a/base/task_scheduler/scheduler_worker_thread.cc
+++ b/base/task_scheduler/scheduler_worker_thread.cc
@@ -14,13 +14,13 @@
namespace base {
namespace internal {
-std::unique_ptr<SchedulerWorkerThread>
-SchedulerWorkerThread::CreateSchedulerWorkerThread(
+std::unique_ptr<SchedulerWorkerThread> SchedulerWorkerThread::Create(
ThreadPriority thread_priority,
- Delegate* delegate,
+ std::unique_ptr<Delegate> delegate,
TaskTracker* task_tracker) {
std::unique_ptr<SchedulerWorkerThread> worker_thread(
- new SchedulerWorkerThread(thread_priority, delegate, task_tracker));
+ new SchedulerWorkerThread(thread_priority, std::move(delegate),
+ task_tracker));
if (worker_thread->thread_handle_.is_null())
return nullptr;
@@ -45,10 +45,10 @@
}
SchedulerWorkerThread::SchedulerWorkerThread(ThreadPriority thread_priority,
- Delegate* delegate,
+ std::unique_ptr<Delegate> delegate,
TaskTracker* task_tracker)
: wake_up_event_(false, false),
- delegate_(delegate),
+ delegate_(std::move(delegate)),
task_tracker_(task_tracker) {
DCHECK(delegate_);
DCHECK(task_tracker_);
diff --git a/base/task_scheduler/scheduler_worker_thread.h b/base/task_scheduler/scheduler_worker_thread.h
index 87941f8d..3022d100 100644
--- a/base/task_scheduler/scheduler_worker_thread.h
+++ b/base/task_scheduler/scheduler_worker_thread.h
@@ -53,9 +53,9 @@
// Tasks from Sequences returned by |delegate|. |task_tracker| is used to
// handle shutdown behavior of Tasks. Returns nullptr if creating the
// underlying platform thread fails.
- static std::unique_ptr<SchedulerWorkerThread> CreateSchedulerWorkerThread(
+ static std::unique_ptr<SchedulerWorkerThread> Create(
ThreadPriority thread_priority,
- Delegate* delegate,
+ std::unique_ptr<Delegate> delegate,
TaskTracker* task_tracker);
// Destroying a SchedulerWorkerThread in production is not allowed; it is
@@ -74,7 +74,7 @@
private:
SchedulerWorkerThread(ThreadPriority thread_priority,
- Delegate* delegate,
+ std::unique_ptr<Delegate> delegate,
TaskTracker* task_tracker);
// PlatformThread::Delegate:
@@ -88,7 +88,7 @@
// Event signaled to wake up this SchedulerWorkerThread.
WaitableEvent wake_up_event_;
- Delegate* const delegate_;
+ const std::unique_ptr<Delegate> delegate_;
TaskTracker* const task_tracker_;
// Synchronizes access to |should_exit_for_testing_|.
diff --git a/base/task_scheduler/scheduler_worker_thread_stack_unittest.cc b/base/task_scheduler/scheduler_worker_thread_stack_unittest.cc
index 9035428..b39243d0 100644
--- a/base/task_scheduler/scheduler_worker_thread_stack_unittest.cc
+++ b/base/task_scheduler/scheduler_worker_thread_stack_unittest.cc
@@ -5,6 +5,7 @@
#include "base/task_scheduler/scheduler_worker_thread_stack.h"
#include "base/logging.h"
+#include "base/memory/ptr_util.h"
#include "base/memory/ref_counted.h"
#include "base/task_scheduler/scheduler_worker_thread.h"
#include "base/task_scheduler/sequence.h"
@@ -34,14 +35,17 @@
class TaskSchedulerWorkerThreadStackTest : public testing::Test {
protected:
void SetUp() override {
- thread_a_ = SchedulerWorkerThread::CreateSchedulerWorkerThread(
- ThreadPriority::NORMAL, &delegate_, &task_tracker_);
+ thread_a_ = SchedulerWorkerThread::Create(
+ ThreadPriority::NORMAL,
+ WrapUnique(new MockSchedulerWorkerThreadDelegate), &task_tracker_);
ASSERT_TRUE(thread_a_);
- thread_b_ = SchedulerWorkerThread::CreateSchedulerWorkerThread(
- ThreadPriority::NORMAL, &delegate_, &task_tracker_);
+ thread_b_ = SchedulerWorkerThread::Create(
+ ThreadPriority::NORMAL,
+ WrapUnique(new MockSchedulerWorkerThreadDelegate), &task_tracker_);
ASSERT_TRUE(thread_b_);
- thread_c_ = SchedulerWorkerThread::CreateSchedulerWorkerThread(
- ThreadPriority::NORMAL, &delegate_, &task_tracker_);
+ thread_c_ = SchedulerWorkerThread::Create(
+ ThreadPriority::NORMAL,
+ WrapUnique(new MockSchedulerWorkerThreadDelegate), &task_tracker_);
ASSERT_TRUE(thread_c_);
}
@@ -56,7 +60,6 @@
std::unique_ptr<SchedulerWorkerThread> thread_c_;
private:
- MockSchedulerWorkerThreadDelegate delegate_;
TaskTracker task_tracker_;
};
diff --git a/base/task_scheduler/scheduler_worker_thread_unittest.cc b/base/task_scheduler/scheduler_worker_thread_unittest.cc
index dd414d0a..9e9e2886 100644
--- a/base/task_scheduler/scheduler_worker_thread_unittest.cc
+++ b/base/task_scheduler/scheduler_worker_thread_unittest.cc
@@ -27,16 +27,17 @@
const size_t kNumSequencesPerTest = 150;
// The test parameter is the number of Tasks per Sequence returned by GetWork().
-class TaskSchedulerWorkerThreadTest : public testing::TestWithParam<size_t>,
- public SchedulerWorkerThread::Delegate {
+class TaskSchedulerWorkerThreadTest : public testing::TestWithParam<size_t> {
protected:
TaskSchedulerWorkerThreadTest()
: main_entry_called_(true, false),
num_get_work_cv_(lock_.CreateConditionVariable()) {}
void SetUp() override {
- worker_thread_ = SchedulerWorkerThread::CreateSchedulerWorkerThread(
- ThreadPriority::NORMAL, this, &task_tracker_);
+ worker_thread_ = SchedulerWorkerThread::Create(
+ ThreadPriority::NORMAL,
+ WrapUnique(new TestSchedulerWorkerThreadDelegate(this)),
+ &task_tracker_);
ASSERT_TRUE(worker_thread_);
main_entry_called_.Wait();
}
@@ -83,74 +84,85 @@
std::unique_ptr<SchedulerWorkerThread> worker_thread_;
private:
- // SchedulerWorkerThread::Delegate:
- void OnMainEntry() override {
- // Without this |auto_lock|, OnMainEntry() could be called twice without
- // generating an error.
- AutoSchedulerLock auto_lock(lock_);
- EXPECT_FALSE(main_entry_called_.IsSignaled());
- main_entry_called_.Signal();
- }
+ class TestSchedulerWorkerThreadDelegate
+ : public SchedulerWorkerThread::Delegate {
+ public:
+ TestSchedulerWorkerThreadDelegate(TaskSchedulerWorkerThreadTest* outer)
+ : outer_(outer) {}
- scoped_refptr<Sequence> GetWork(
- SchedulerWorkerThread* worker_thread) override {
- EXPECT_EQ(worker_thread_.get(), worker_thread);
-
- {
- AutoSchedulerLock auto_lock(lock_);
-
- // Increment the number of times that this method has been called.
- ++num_get_work_;
- num_get_work_cv_->Signal();
-
- // Verify that this method isn't called more times than expected.
- EXPECT_LE(num_get_work_, max_get_work_);
-
- // Check if a Sequence should be returned.
- if (num_sequences_to_create_ == 0)
- return nullptr;
- --num_sequences_to_create_;
+ // SchedulerWorkerThread::Delegate:
+ void OnMainEntry() override {
+ // Without synchronization, OnMainEntry() could be called twice without
+ // generating an error.
+ AutoSchedulerLock auto_lock(outer_->lock_);
+ EXPECT_FALSE(outer_->main_entry_called_.IsSignaled());
+ outer_->main_entry_called_.Signal();
}
- // Create a Sequence with TasksPerSequence() Tasks.
- scoped_refptr<Sequence> sequence(new Sequence);
- for (size_t i = 0; i < TasksPerSequence(); ++i) {
- std::unique_ptr<Task> task(new Task(
- FROM_HERE, Bind(&TaskSchedulerWorkerThreadTest::RunTaskCallback,
- Unretained(this)),
- TaskTraits(), TimeTicks()));
- EXPECT_TRUE(task_tracker_.WillPostTask(task.get()));
- sequence->PushTask(std::move(task));
+ scoped_refptr<Sequence> GetWork(
+ SchedulerWorkerThread* worker_thread) override {
+ EXPECT_EQ(outer_->worker_thread_.get(), worker_thread);
+
+ {
+ AutoSchedulerLock auto_lock(outer_->lock_);
+
+ // Increment the number of times that this method has been called.
+ ++outer_->num_get_work_;
+ outer_->num_get_work_cv_->Signal();
+
+ // Verify that this method isn't called more times than expected.
+ EXPECT_LE(outer_->num_get_work_, outer_->max_get_work_);
+
+ // Check if a Sequence should be returned.
+ if (outer_->num_sequences_to_create_ == 0)
+ return nullptr;
+ --outer_->num_sequences_to_create_;
+ }
+
+ // Create a Sequence with TasksPerSequence() Tasks.
+ scoped_refptr<Sequence> sequence(new Sequence);
+ for (size_t i = 0; i < outer_->TasksPerSequence(); ++i) {
+ std::unique_ptr<Task> task(new Task(
+ FROM_HERE, Bind(&TaskSchedulerWorkerThreadTest::RunTaskCallback,
+ Unretained(outer_)),
+ TaskTraits(), TimeTicks()));
+ EXPECT_TRUE(outer_->task_tracker_.WillPostTask(task.get()));
+ sequence->PushTask(std::move(task));
+ }
+
+ {
+ // Add the Sequence to the vector of created Sequences.
+ AutoSchedulerLock auto_lock(outer_->lock_);
+ outer_->created_sequences_.push_back(sequence);
+ }
+
+ return sequence;
}
- {
- // Add the Sequence to the vector of created Sequences.
- AutoSchedulerLock auto_lock(lock_);
- created_sequences_.push_back(sequence);
+ // This override verifies that |sequence| contains the expected number of
+ // Tasks and adds it to |enqueued_sequences_|. Unlike a normal
+ // EnqueueSequence implementation, it doesn't reinsert |sequence| into a
+ // queue for further execution.
+ void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override {
+ EXPECT_GT(outer_->TasksPerSequence(), 1U);
+
+ // Verify that |sequence| contains TasksPerSequence() - 1 Tasks.
+ for (size_t i = 0; i < outer_->TasksPerSequence() - 1; ++i) {
+ EXPECT_TRUE(sequence->PeekTask());
+ sequence->PopTask();
+ }
+ EXPECT_FALSE(sequence->PeekTask());
+
+ // Add |sequence| to |re_enqueued_sequences_|.
+ AutoSchedulerLock auto_lock(outer_->lock_);
+ outer_->re_enqueued_sequences_.push_back(std::move(sequence));
+ EXPECT_LE(outer_->re_enqueued_sequences_.size(),
+ outer_->created_sequences_.size());
}
- return sequence;
- }
-
- // This override verifies that |sequence| contains the expected number of
- // Tasks and adds it to |re_enqueued_sequences_|. Unlike a normal
- // ReEnqueueSequence implementation, it doesn't reinsert |sequence| into a
- // queue for further execution.
- void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override {
- EXPECT_GT(TasksPerSequence(), 1U);
-
- // Verify that |sequence| contains TasksPerSequence() - 1 Tasks.
- for (size_t i = 0; i < TasksPerSequence() - 1; ++i) {
- EXPECT_TRUE(sequence->PeekTask());
- sequence->PopTask();
- }
- EXPECT_FALSE(sequence->PeekTask());
-
- // Add |sequence| to |re_enqueued_sequences_|.
- AutoSchedulerLock auto_lock(lock_);
- re_enqueued_sequences_.push_back(std::move(sequence));
- EXPECT_LE(re_enqueued_sequences_.size(), created_sequences_.size());
- }
+ private:
+ TaskSchedulerWorkerThreadTest* outer_;
+ };
void RunTaskCallback() {
AutoSchedulerLock auto_lock(lock_);