blob: 437bbedfdb5472dd74a6e45def138c4667dee6b9 [file] [log] [blame]
// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "base/task_scheduler/scheduler_worker.h"
#include <stddef.h>
#include <memory>
#include <vector>
#include "base/bind.h"
#include "base/bind_helpers.h"
#include "base/macros.h"
#include "base/memory/ptr_util.h"
#include "base/synchronization/condition_variable.h"
#include "base/task_scheduler/scheduler_lock.h"
#include "base/task_scheduler/sequence.h"
#include "base/task_scheduler/task.h"
#include "base/task_scheduler/task_tracker.h"
#include "base/time/time.h"
#include "testing/gtest/include/gtest/gtest.h"
namespace base {
namespace internal {
namespace {
// Number of Sequences that each parameterized test below has GetWork() create.
constexpr size_t kNumSequencesPerTest = 150;
// Fixture that owns a SchedulerWorker driven by a scripted delegate.
// The test parameter is the number of Tasks per Sequence returned by GetWork().
class TaskSchedulerWorkerTest : public testing::TestWithParam<size_t> {
 protected:
  TaskSchedulerWorkerTest()
      : main_entry_called_(WaitableEvent::ResetPolicy::MANUAL,
                           WaitableEvent::InitialState::NOT_SIGNALED),
        num_get_work_cv_(lock_.CreateConditionVariable()),
        worker_set_(WaitableEvent::ResetPolicy::MANUAL,
                    WaitableEvent::InitialState::NOT_SIGNALED) {}

  // Creates |worker_| and blocks until the delegate's OnMainEntry() has
  // signaled |main_entry_called_|.
  void SetUp() override {
    worker_ = SchedulerWorker::Create(
        ThreadPriority::NORMAL,
        WrapUnique(new TestSchedulerWorkerDelegate(this)),
        &task_tracker_,
        SchedulerWorker::InitialState::ALIVE);
    ASSERT_TRUE(worker_);
    // OnMainEntry() waits on |worker_set_| before it compares against
    // |worker_|, so only signal once the assignment above is done.
    worker_set_.Signal();
    main_entry_called_.Wait();
  }

  void TearDown() override {
    // gtest still invokes TearDown() after a fatal failure in SetUp(), so
    // |worker_| is null here if ASSERT_TRUE(worker_) fired.
    if (worker_)
      worker_->JoinForTesting();
  }

  // Number of Tasks pushed into each Sequence created by GetWork().
  size_t TasksPerSequence() const { return GetParam(); }

  // Wait until GetWork() has been called |num_get_work| times.
  void WaitForNumGetWork(size_t num_get_work) {
    AutoSchedulerLock auto_lock(lock_);
    while (num_get_work_ < num_get_work)
      num_get_work_cv_->Wait();
  }

  // Sets the maximum number of GetWork() calls tolerated before GetWork()
  // reports an expectation failure.
  void SetMaxGetWork(size_t max_get_work) {
    AutoSchedulerLock auto_lock(lock_);
    max_get_work_ = max_get_work;
  }

  // Sets how many Sequences GetWork() creates before it starts returning
  // nullptr. Expects the previous budget to be fully consumed.
  void SetNumSequencesToCreate(size_t num_sequences_to_create) {
    AutoSchedulerLock auto_lock(lock_);
    EXPECT_EQ(0U, num_sequences_to_create_);
    num_sequences_to_create_ = num_sequences_to_create;
  }

  // Returns the number of times RunTaskCallback() has been invoked.
  size_t NumRunTasks() {
    AutoSchedulerLock auto_lock(lock_);
    return num_run_tasks_;
  }

  // Returns a copy of the Sequences created by GetWork(), in creation order.
  std::vector<scoped_refptr<Sequence>> CreatedSequences() {
    AutoSchedulerLock auto_lock(lock_);
    return created_sequences_;
  }

  // Returns a copy of the Sequences passed to ReEnqueueSequence(), in call
  // order.
  std::vector<scoped_refptr<Sequence>> EnqueuedSequences() {
    AutoSchedulerLock auto_lock(lock_);
    return re_enqueued_sequences_;
  }

  std::unique_ptr<SchedulerWorker> worker_;

 private:
  // Delegate that creates Sequences on demand and records every callback in
  // the owning fixture's counters.
  class TestSchedulerWorkerDelegate
      : public SchedulerWorker::Delegate {
   public:
    explicit TestSchedulerWorkerDelegate(TaskSchedulerWorkerTest* outer)
        : outer_(outer) {}

    // SchedulerWorker::Delegate:
    void OnMainEntry(SchedulerWorker* worker) override {
      outer_->worker_set_.Wait();
      EXPECT_EQ(outer_->worker_.get(), worker);
      // Without synchronization, OnMainEntry() could be called twice without
      // generating an error.
      AutoSchedulerLock auto_lock(outer_->lock_);
      EXPECT_FALSE(outer_->main_entry_called_.IsSignaled());
      outer_->main_entry_called_.Signal();
    }

    scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override {
      EXPECT_EQ(outer_->worker_.get(), worker);
      {
        AutoSchedulerLock auto_lock(outer_->lock_);
        // Increment the number of times that this method has been called.
        ++outer_->num_get_work_;
        outer_->num_get_work_cv_->Signal();
        // Verify that this method isn't called more times than expected.
        EXPECT_LE(outer_->num_get_work_, outer_->max_get_work_);
        // Check if a Sequence should be returned.
        if (outer_->num_sequences_to_create_ == 0)
          return nullptr;
        --outer_->num_sequences_to_create_;
      }
      // Create a Sequence with TasksPerSequence() Tasks.
      scoped_refptr<Sequence> sequence(new Sequence);
      for (size_t i = 0; i < outer_->TasksPerSequence(); ++i) {
        std::unique_ptr<Task> task(new Task(
            FROM_HERE, Bind(&TaskSchedulerWorkerTest::RunTaskCallback,
                            Unretained(outer_)),
            TaskTraits(), TimeDelta()));
        EXPECT_TRUE(outer_->task_tracker_.WillPostTask(task.get()));
        sequence->PushTask(std::move(task));
      }
      {
        // Add the Sequence to the vector of created Sequences.
        AutoSchedulerLock auto_lock(outer_->lock_);
        outer_->created_sequences_.push_back(sequence);
      }
      return sequence;
    }

    // This override verifies that |sequence| contains the expected number of
    // Tasks and adds it to |re_enqueued_sequences_|. Unlike a normal
    // ReEnqueueSequence() implementation, it doesn't reinsert |sequence| into
    // a queue for further execution.
    void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override {
      EXPECT_GT(outer_->TasksPerSequence(), 1U);
      // Verify that |sequence| contains TasksPerSequence() - 1 Tasks.
      for (size_t i = 0; i < outer_->TasksPerSequence() - 1; ++i) {
        EXPECT_TRUE(sequence->PeekTask());
        sequence->PopTask();
      }
      EXPECT_FALSE(sequence->PeekTask());
      // Add |sequence| to |re_enqueued_sequences_|.
      AutoSchedulerLock auto_lock(outer_->lock_);
      outer_->re_enqueued_sequences_.push_back(std::move(sequence));
      EXPECT_LE(outer_->re_enqueued_sequences_.size(),
                outer_->created_sequences_.size());
    }

    TimeDelta GetSleepTimeout() override {
      return TimeDelta::Max();
    }

    // This delegate never allows detachment.
    bool CanDetach(SchedulerWorker* worker) override {
      return false;
    }

   private:
    // The fixture whose state this delegate reads and updates. Not owned.
    TaskSchedulerWorkerTest* outer_;
  };

  // Bound into every Task created by GetWork(); counts how many Tasks ran.
  void RunTaskCallback() {
    AutoSchedulerLock auto_lock(lock_);
    ++num_run_tasks_;
    // Expect no more run Tasks than created Sequences.
    EXPECT_LE(num_run_tasks_, created_sequences_.size());
  }

  TaskTracker task_tracker_;

  // Synchronizes access to all members below.
  mutable SchedulerLock lock_;

  // Signaled once OnMainEntry() has been called.
  WaitableEvent main_entry_called_;

  // Number of Sequences that should be created by GetWork(). When this
  // is 0, GetWork() returns nullptr.
  size_t num_sequences_to_create_ = 0;

  // Number of times that GetWork() has been called.
  size_t num_get_work_ = 0;

  // Maximum number of times that GetWork() can be called.
  size_t max_get_work_ = 0;

  // Condition variable signaled when |num_get_work_| is incremented.
  std::unique_ptr<ConditionVariable> num_get_work_cv_;

  // Sequences created by GetWork().
  std::vector<scoped_refptr<Sequence>> created_sequences_;

  // Sequences passed to ReEnqueueSequence().
  std::vector<scoped_refptr<Sequence>> re_enqueued_sequences_;

  // Number of times that RunTaskCallback() has been called.
  size_t num_run_tasks_ = 0;

  // Signaled after |worker_| is set.
  WaitableEvent worker_set_;

  DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerTest);
};
// Drives the worker with a single WakeUp() while GetWork() keeps handing out
// Sequences, then checks the run-Task count and which Sequences were handed
// back through ReEnqueueSequence().
TEST_P(TaskSchedulerWorkerTest, ContinuousWork) {
  // GetWork() is expected to be called once per created Sequence, plus one
  // final call in which it returns nullptr.
  const size_t expected_get_work_calls = kNumSequencesPerTest + 1;

  // Let GetWork() create |kNumSequencesPerTest| Sequences before it starts
  // returning nullptr, and cap the number of calls it tolerates.
  SetNumSequencesToCreate(kNumSequencesPerTest);
  SetMaxGetWork(expected_get_work_calls);

  // A single wake-up; block until GetWork() has been called the expected
  // number of times.
  worker_->WakeUp();
  WaitForNumGetWork(expected_get_work_calls);

  // Exactly one Task per created Sequence is expected to have run.
  EXPECT_EQ(kNumSequencesPerTest, NumRunTasks());

  // A Sequence that holds a single Task is not expected to be re-enqueued. A
  // Sequence that holds more than one Task is expected to be passed to
  // ReEnqueueSequence(), in creation order.
  if (TasksPerSequence() <= 1) {
    EXPECT_TRUE(EnqueuedSequences().empty());
  } else {
    EXPECT_EQ(CreatedSequences(), EnqueuedSequences());
  }
}
// Wakes the worker once per Sequence while GetWork() alternates between
// returning one Sequence and returning nullptr, checking the run-Task count
// and the re-enqueued Sequences after every wake-up.
TEST_P(TaskSchedulerWorkerTest, IntermittentWork) {
  for (size_t i = 0; i < kNumSequencesPerTest; ++i) {
    // Each wake-up so far accounts for two GetWork() calls: one that returned
    // a Sequence and one that returned nullptr.
    const size_t wake_ups_so_far = i + 1;
    const size_t expected_get_work_calls = 2 * wake_ups_so_far;

    // Let GetWork() create exactly one more Sequence.
    SetNumSequencesToCreate(1);
    SetMaxGetWork(expected_get_work_calls);

    // Wake up |worker_| and block until GetWork() has been called the
    // expected number of times.
    worker_->WakeUp();
    WaitForNumGetWork(expected_get_work_calls);

    // One Task per wake-up is expected to have run.
    EXPECT_EQ(i + 1, NumRunTasks());

    // A Sequence that holds a single Task is not expected to be re-enqueued.
    // A Sequence that holds more than one Task is expected to be passed to
    // ReEnqueueSequence(), in creation order.
    if (TasksPerSequence() <= 1) {
      EXPECT_TRUE(EnqueuedSequences().empty());
    } else {
      EXPECT_EQ(CreatedSequences(), EnqueuedSequences());
    }
  }
}
// Instantiate the parameterized tests with one Task per Sequence, for which
// the tests expect no Sequence to be re-enqueued.
INSTANTIATE_TEST_CASE_P(OneTaskPerSequence,
TaskSchedulerWorkerTest,
::testing::Values(1));
// Instantiate the parameterized tests with two Tasks per Sequence, for which
// the tests expect every created Sequence to be re-enqueued.
INSTANTIATE_TEST_CASE_P(TwoTasksPerSequence,
TaskSchedulerWorkerTest,
::testing::Values(2));
namespace {
// Delegate whose CanDetach() answer is set by the test. It hands out a single
// one-Task Sequence per ResetState() cycle and exposes events that let the
// test wait for that Task to run and for CanDetach() to be called.
class ControllableDetachDelegate : public SchedulerWorker::Delegate {
 public:
  ControllableDetachDelegate()
      : work_processed_(WaitableEvent::ResetPolicy::MANUAL,
                        WaitableEvent::InitialState::NOT_SIGNALED),
        detach_requested_(WaitableEvent::ResetPolicy::MANUAL,
                          WaitableEvent::InitialState::NOT_SIGNALED) {}

  ~ControllableDetachDelegate() override = default;

  // SchedulerWorker::Delegate:
  void OnMainEntry(SchedulerWorker* worker) override {}

  // Returns a Sequence holding one Task that signals |work_processed_| on the
  // first call. Returns nullptr on every later call until ResetState().
  scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override {
    if (work_requested_)
      return nullptr;
    work_requested_ = true;

    std::unique_ptr<Task> signal_task(new Task(
        FROM_HERE, Bind(&WaitableEvent::Signal, Unretained(&work_processed_)),
        TaskTraits(), TimeDelta()));
    scoped_refptr<Sequence> single_task_sequence(new Sequence);
    single_task_sequence->PushTask(std::move(signal_task));
    return single_task_sequence;
  }

  // Not expected to be called: the Sequence from GetWork() holds one Task.
  void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override {
    ADD_FAILURE() <<
        "GetWork() returns a sequence of one, so there's nothing to reenqueue.";
  }

  TimeDelta GetSleepTimeout() override { return TimeDelta::Max(); }

  // Records that a detach was requested, then answers with the value most
  // recently given to set_can_detach().
  bool CanDetach(SchedulerWorker* worker) override {
    detach_requested_.Signal();
    return can_detach_;
  }

  // Blocks until the Task created by GetWork() has signaled
  // |work_processed_|.
  void WaitForWorkToRun() { work_processed_.Wait(); }

  // Blocks until CanDetach() has been called.
  void WaitForDetachRequest() { detach_requested_.Wait(); }

  // Re-arms the delegate: GetWork() will create one more Sequence and both
  // events go back to the non-signaled state.
  void ResetState() {
    work_requested_ = false;
    work_processed_.Reset();
    detach_requested_.Reset();
  }

  void set_can_detach(bool can_detach) { can_detach_ = can_detach; }

 private:
  // True once GetWork() has created its Sequence for the current cycle.
  bool work_requested_ = false;

  // Value returned by CanDetach().
  bool can_detach_ = false;

  // Signaled by the Task created in GetWork().
  WaitableEvent work_processed_;

  // Signaled each time CanDetach() is called.
  WaitableEvent detach_requested_;

  DISALLOW_COPY_AND_ASSIGN(ControllableDetachDelegate);
};
} // namespace
// Checks that a worker whose delegate allows detachment reports no live
// thread shortly after it has run its work and asked to detach.
TEST(TaskSchedulerWorkerTest, WorkerDetaches) {
  TaskTracker task_tracker;
  // Will be owned by SchedulerWorker.
  ControllableDetachDelegate* delegate = new ControllableDetachDelegate;
  delegate->set_can_detach(true);
  std::unique_ptr<SchedulerWorker> worker =
      SchedulerWorker::Create(
          ThreadPriority::NORMAL, WrapUnique(delegate), &task_tracker,
          SchedulerWorker::InitialState::ALIVE);
  // Fail here instead of dereferencing a null |worker| below; the fixture's
  // SetUp() checks the result of Create() the same way.
  ASSERT_TRUE(worker);
  worker->WakeUp();
  delegate->WaitForWorkToRun();
  delegate->WaitForDetachRequest();
  // Sleep to give a chance for the detach to happen. A yield is too short.
  PlatformThread::Sleep(TimeDelta::FromMilliseconds(50));
  ASSERT_FALSE(worker->ThreadAliveForTesting());
}
// Checks that a worker that has detached reports a live thread again after a
// later WakeUp(), and keeps it once the delegate stops allowing detachment.
TEST(TaskSchedulerWorkerTest, WorkerDetachesAndWakes) {
  TaskTracker task_tracker;
  // Will be owned by SchedulerWorker.
  ControllableDetachDelegate* delegate = new ControllableDetachDelegate;
  delegate->set_can_detach(true);
  std::unique_ptr<SchedulerWorker> worker =
      SchedulerWorker::Create(
          ThreadPriority::NORMAL, WrapUnique(delegate), &task_tracker,
          SchedulerWorker::InitialState::ALIVE);
  // Fail here instead of dereferencing a null |worker| below; the fixture's
  // SetUp() checks the result of Create() the same way.
  ASSERT_TRUE(worker);

  // Phase 1: detachment is allowed, so no thread is expected to remain.
  worker->WakeUp();
  delegate->WaitForWorkToRun();
  delegate->WaitForDetachRequest();
  // Sleep to give a chance for the detach to happen. A yield is too short.
  PlatformThread::Sleep(TimeDelta::FromMilliseconds(50));
  ASSERT_FALSE(worker->ThreadAliveForTesting());

  // Phase 2: re-arm the delegate and disallow detachment, then wake the
  // worker again. This time the thread is expected to stay alive.
  delegate->ResetState();
  delegate->set_can_detach(false);
  worker->WakeUp();
  delegate->WaitForWorkToRun();
  delegate->WaitForDetachRequest();
  // Same grace period as above, so that a wrongly-started detach would show.
  PlatformThread::Sleep(TimeDelta::FromMilliseconds(50));
  ASSERT_TRUE(worker->ThreadAliveForTesting());
  worker->JoinForTesting();
}
// Checks that a worker created in the DETACHED state reports no live thread
// until WakeUp() is called, and a live thread after it has run its work.
TEST(TaskSchedulerWorkerTest, CreateDetached) {
  TaskTracker task_tracker;
  // Will be owned by SchedulerWorker. CanDetach() returns false because
  // set_can_detach() is never called on this delegate.
  ControllableDetachDelegate* delegate = new ControllableDetachDelegate;
  std::unique_ptr<SchedulerWorker> worker =
      SchedulerWorker::Create(
          ThreadPriority::NORMAL, WrapUnique(delegate), &task_tracker,
          SchedulerWorker::InitialState::DETACHED);
  // Fail here instead of dereferencing a null |worker| below; the fixture's
  // SetUp() checks the result of Create() the same way.
  ASSERT_TRUE(worker);
  ASSERT_FALSE(worker->ThreadAliveForTesting());
  worker->WakeUp();
  delegate->WaitForWorkToRun();
  delegate->WaitForDetachRequest();
  ASSERT_TRUE(worker->ThreadAliveForTesting());
  worker->JoinForTesting();
}
} // namespace
} // namespace internal
} // namespace base