blob: dca78ea4d9e7fd0b57e0d86847d261237e09aa28 [file] [log] [blame]
// Copyright 2015 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "base/bind.h"
#include "base/location.h"
#include "base/logging.h"
#include "base/single_thread_task_runner.h"
#include "base/thread_task_runner_handle.h"
#include "chrome/browser/chromeos/file_system_provider/queue.h"
namespace chromeos {
namespace file_system_provider {
// Creates an empty task whose token is 0. Both callbacks are left in their
// default-constructed state.
Queue::Task::Task() : token(0) {}
// Creates a task identified by |token| which will invoke |callback| when it
// gets executed. The abort callback is not set here; MaybeRun() stores it once
// the task has been started.
Queue::Task::Task(size_t token, const AbortableCallback& callback)
    : token(token),
      callback(callback) {}
// Nothing to release explicitly; members clean up after themselves.
Queue::Task::~Task() = default;
// Creates a queue which runs at most |max_in_parallel| tasks at the same time.
// |max_in_parallel| must be greater than zero; this is enforced with a CHECK.
// Tokens handed out by NewToken() start at 1.
Queue::Queue(size_t max_in_parallel)
    : max_in_parallel_(max_in_parallel),
      next_token_(1),
      weak_ptr_factory_(this) {
  // A queue which is not allowed to run anything would never make progress.
  CHECK_LT(0u, max_in_parallel);
}
// Nothing to release explicitly; members clean up after themselves.
Queue::~Queue() = default;
// Returns the next unused token and advances the internal counter, so that
// consecutive calls yield consecutive values.
size_t Queue::NewToken() {
  const size_t issued_token = next_token_;
  ++next_token_;
  return issued_token;
}
// Appends a task identified by |token| to the back of the pending list and
// posts a MaybeRun() call to the current thread's task runner. The task itself
// is never started synchronously from here.
//
// |token| must not belong to a task that is currently running or still
// pending. In builds without NDEBUG this is verified with CHECKs.
void Queue::Enqueue(size_t token, const AbortableCallback& callback) {
// Use defined() rather than evaluating NDEBUG: the macro is commonly defined
// with no value, in which case "#if !NDEBUG" does not preprocess.
#if !defined(NDEBUG)
  CHECK(executed_.find(token) == executed_.end());
  // Linear scan over pending tasks; only performed in debug builds.
  for (const auto& task : pending_) {
    CHECK(token != task.token);
  }
#endif
  pending_.push_back(Task(token, callback));
  // MaybeRun() is bound to a weak pointer to this queue.
  base::ThreadTaskRunnerHandle::Get()->PostTask(
      FROM_HERE, base::Bind(&Queue::MaybeRun, weak_ptr_factory_.GetWeakPtr()));
}
// Marks the running task identified by |token| as finished: removes it from
// the set of executed tasks and posts a MaybeRun() call so that a pending task
// can take the freed slot.
//
// |token| must refer to a task that is currently being executed (DCHECKed).
void Queue::Complete(size_t token) {
  const auto it = executed_.find(token);
  DCHECK(it != executed_.end());
  executed_.erase(it);

  const auto task_runner = base::ThreadTaskRunnerHandle::Get();
  task_runner->PostTask(
      FROM_HERE, base::Bind(&Queue::MaybeRun, weak_ptr_factory_.GetWeakPtr()));
}
void Queue::MaybeRun() {
if (executed_.size() == max_in_parallel_ || !pending_.size())
return;
CHECK_GT(max_in_parallel_, executed_.size());
Task task = pending_.front();
pending_.pop_front();
executed_[task.token] = task;
AbortCallback abort_callback = task.callback.Run();
// It may happen that the task is completed and removed synchronously. Hence,
// we need to check if the task is still in the executed collection.
const auto executed_task_it = executed_.find(task.token);
if (executed_task_it != executed_.end())
executed_task_it->second.abort_callback = abort_callback;
}
void Queue::Abort(size_t token) {
// Check if it's running. If so, then abort and expect a Complete() call soon.
const auto it = executed_.find(token);
if (it != executed_.end()) {
Task& task = it->second;
AbortCallback abort_callback = task.abort_callback;
task.abort_callback = AbortCallback();
DCHECK(!abort_callback.is_null());
abort_callback.Run();
return;
}
// Aborting not running tasks is linear. TODO(mtomasz): Optimize if feasible.
for (auto it = pending_.begin(); it != pending_.end(); ++it) {
if (token == it->token) {
pending_.erase(it);
base::ThreadTaskRunnerHandle::Get()->PostTask(
FROM_HERE,
base::Bind(&Queue::MaybeRun, weak_ptr_factory_.GetWeakPtr()));
return;
}
}
// The task is already removed, marked as completed or aborted.
NOTREACHED();
}
} // namespace file_system_provider
} // namespace chromeos