// Copyright 2015 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "content/renderer/categorized_worker_pool.h"

#include <algorithm>
#include <string>
#include <utility>
#include <vector>

#include "base/strings/stringprintf.h"
#include "base/threading/thread_restrictions.h"
#include "base/trace_event/trace_event.h"
#include "cc/base/math_util.h"
#include "cc/raster/task_category.h"

namespace content {
namespace {

// A thread which forwards to CategorizedWorkerPool::Run with the runnable
// categories.
//
// Each instance remembers the pool it serves, the task categories it is
// allowed to run and the condition variable it should wait on; all three are
// handed to CategorizedWorkerPool::Run() from Run().
class CategorizedWorkerPoolThread : public base::SimpleThread {
 public:
  // |name_prefix| and |options| are forwarded to base::SimpleThread.
  // |pool| and |has_ready_to_run_tasks_cv| are kept as raw pointers and are
  // never deleted by this class.
  CategorizedWorkerPoolThread(
      const std::string& name_prefix,
      const Options& options,
      CategorizedWorkerPool* pool,
      std::vector<cc::TaskCategory> categories,
      base::ConditionVariable* has_ready_to_run_tasks_cv)
      : SimpleThread(name_prefix, options),
        pool_(pool),
        // |categories| is a by-value sink parameter: move it into the member
        // instead of copying the vector a second time.
        categories_(std::move(categories)),
        has_ready_to_run_tasks_cv_(has_ready_to_run_tasks_cv) {}

  // Thread body: delegates to the pool's worker loop.
  void Run() override { pool_->Run(categories_, has_ready_to_run_tasks_cv_); }

 private:
  CategorizedWorkerPool* const pool_;
  const std::vector<cc::TaskCategory> categories_;
  base::ConditionVariable* const has_ready_to_run_tasks_cv_;
};

}  // namespace

// A sequenced task runner which posts tasks to a CategorizedWorkerPool.
//
// On every post the runner rebuilds a task graph in which each pending task
// has an edge from the task queued just before it, and reschedules that graph
// under its own namespace token.
class CategorizedWorkerPool::CategorizedWorkerPoolSequencedTaskRunner
    : public base::SequencedTaskRunner {
 public:
  // |task_graph_runner| is kept as a raw pointer and is used again by the
  // destructor. A namespace token private to this runner is generated here.
  explicit CategorizedWorkerPoolSequencedTaskRunner(
      cc::TaskGraphRunner* task_graph_runner)
      : task_graph_runner_(task_graph_runner),
        namespace_token_(task_graph_runner->GenerateNamespaceToken()) {}

  // Overridden from base::TaskRunner:
  // Simply forwards to PostNonNestableDelayedTask().
  bool PostDelayedTask(const tracked_objects::Location& from_here,
                       const base::Closure& task,
                       base::TimeDelta delay) override {
    return PostNonNestableDelayedTask(from_here, task, delay);
  }
  // Always reports true, whatever the calling thread is.
  bool RunsTasksOnCurrentThread() const override { return true; }

  // Overridden from base::SequencedTaskRunner:
  // Appends |task| to the pending list and reschedules the whole chain.
  // |from_here| and |delay| are not consulted by this implementation.
  // Always returns true.
  bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
                                  const base::Closure& task,
                                  base::TimeDelta delay) override {
    base::AutoLock lock(lock_);

    // Remove completed tasks.
    DCHECK(completed_tasks_.empty());
    task_graph_runner_->CollectCompletedTasks(namespace_token_,
                                              &completed_tasks_);

    // The completed tasks are dropped as a prefix of |tasks_|, so there must
    // never be more of them than pending tasks; otherwise the iterator
    // arithmetic below would run past the end of the vector.
    DCHECK(completed_tasks_.size() <= tasks_.size());
    tasks_.erase(tasks_.begin(), tasks_.begin() + completed_tasks_.size());

    tasks_.push_back(make_scoped_refptr(new ClosureTask(task)));
    graph_.Reset();
    for (const auto& graph_task : tasks_) {
      // Every node except the first one gets exactly one dependency: the node
      // that was added right before it.
      int dependencies = 0;
      if (!graph_.nodes.empty())
        dependencies = 1;

      // Treat any tasks that are enqueued through the SequencedTaskRunner as
      // FOREGROUND priority. We don't have enough information to know the
      // actual priority of such tasks, so we run them as soon as possible.
      cc::TaskGraph::Node node(graph_task, cc::TASK_CATEGORY_FOREGROUND,
                               0u /* priority */, dependencies);
      if (dependencies) {
        graph_.edges.push_back(cc::TaskGraph::Edge(
            graph_.nodes.back().task.get(), node.task.get()));
      }
      graph_.nodes.push_back(std::move(node));
    }
    task_graph_runner_->ScheduleTasks(namespace_token_, &graph_);
    // |completed_tasks_| is only a scratch buffer; empty it for the next post.
    completed_tasks_.clear();
    return true;
  }

 private:
  // Blocks until every task scheduled under this runner's namespace has
  // finished running, then collects them.
  ~CategorizedWorkerPoolSequencedTaskRunner() override {
    task_graph_runner_->WaitForTasksToFinishRunning(namespace_token_);
    task_graph_runner_->CollectCompletedTasks(namespace_token_,
                                              &completed_tasks_);
  }

  // Lock to exclusively access all the following members that are used to
  // implement the SequencedTaskRunner interfaces.
  base::Lock lock_;

  cc::TaskGraphRunner* task_graph_runner_;
  // Namespace used to schedule tasks in the task graph runner.
  cc::NamespaceToken namespace_token_;
  // List of tasks currently queued up for execution.
  cc::Task::Vector tasks_;
  // Graph object used for scheduling tasks.
  cc::TaskGraph graph_;
  // Cached vector to avoid allocation when getting the list of complete
  // tasks.
  cc::Task::Vector completed_tasks_;
};

// Builds an idle pool: a namespace token is generated for the tasks the pool
// posts on its own behalf, the three condition variables are bound to |lock_|,
// and |shutdown_| starts out false. No worker thread is created here; that is
// done by Start().
// NOTE(review): GenerateNamespaceToken() acquires |lock_| and uses
// |work_queue_|, so this initializer presumably relies on both being declared
// before |namespace_token_| in the header -- confirm.
CategorizedWorkerPool::CategorizedWorkerPool()
    : namespace_token_(GenerateNamespaceToken()),
      has_ready_to_run_foreground_tasks_cv_(&lock_),
      has_ready_to_run_background_tasks_cv_(&lock_),
      has_namespaces_with_finished_running_tasks_cv_(&lock_),
      shutdown_(false) {}

void CategorizedWorkerPool::Start(int num_threads) {
  DCHECK(threads_.empty());

  // Start |num_threads| threads for foreground work, including nonconcurrent
  // foreground work.
  std::vector<cc::TaskCategory> foreground_categories;
  foreground_categories.push_back(cc::TASK_CATEGORY_NONCONCURRENT_FOREGROUND);
  foreground_categories.push_back(cc::TASK_CATEGORY_FOREGROUND);

  for (int i = 0; i < num_threads; i++) {
    std::unique_ptr<base::SimpleThread> thread(new CategorizedWorkerPoolThread(
        base::StringPrintf("CompositorTileWorker%d", i + 1).c_str(),
        base::SimpleThread::Options(), this, foreground_categories,
        &has_ready_to_run_foreground_tasks_cv_));
    thread->Start();
    threads_.push_back(std::move(thread));
  }

  // Start a single thread for background work.
  std::vector<cc::TaskCategory> background_categories;
  background_categories.push_back(cc::TASK_CATEGORY_BACKGROUND);

  // Use background priority for background thread.
  base::SimpleThread::Options thread_options;
#if !defined(OS_MACOSX)
  thread_options.priority = base::ThreadPriority::BACKGROUND;
#endif

  std::unique_ptr<base::SimpleThread> thread(new CategorizedWorkerPoolThread(
      "CompositorTileWorkerBackground", thread_options, this,
      background_categories, &has_ready_to_run_background_tasks_cv_));
  thread->Start();
  threads_.push_back(std::move(thread));
}

// Stops the pool. First waits for every task scheduled under the pool's own
// namespace token to finish and collects them, then sets |shutdown_| and wakes
// every worker so that Run() can return, and finally joins the worker threads.
void CategorizedWorkerPool::Shutdown() {
  WaitForTasksToFinishRunning(namespace_token_);
  // The collected tasks stay in |completed_tasks_|; nothing in this function
  // clears it afterwards.
  CollectCompletedTasks(namespace_token_, &completed_tasks_);
  // Shutdown raster threads.
  {
    base::AutoLock lock(lock_);

    // At this point no work may be left in any namespace.
    DCHECK(!work_queue_.HasReadyToRunTasks());
    DCHECK(!work_queue_.HasAnyNamespaces());

    // Shutdown() is only expected to run once.
    DCHECK(!shutdown_);
    shutdown_ = true;

    // Wake up all workers so they exit.
    has_ready_to_run_foreground_tasks_cv_.Broadcast();
    has_ready_to_run_background_tasks_cv_.Broadcast();
  }
  // The lock has been released before joining. Workers hold |lock_| inside
  // Run(), so joining with it held would presumably deadlock.
  // Threads are joined and dropped starting from the most recently created.
  while (!threads_.empty()) {
    threads_.back()->Join();
    threads_.pop_back();
  }
}

// Overridden from base::TaskRunner:
// Queues |task| under the pool's own namespace token and reschedules all of
// the pool's pending closure tasks. |from_here| and |delay| are not consulted
// by this implementation. Always returns true.
bool CategorizedWorkerPool::PostDelayedTask(
    const tracked_objects::Location& from_here,
    const base::Closure& task,
    base::TimeDelta delay) {
  base::AutoLock lock(lock_);

  // Fetch whatever finished since the previous post. |completed_tasks_| is
  // only a scratch buffer and has to be empty on entry.
  DCHECK(completed_tasks_.empty());
  CollectCompletedTasksWithLockAcquired(namespace_token_, &completed_tasks_);

  // Forget every tracked task that shows up in the completed list.
  const auto has_completed = [this](const scoped_refptr<cc::Task>& candidate) {
    const auto first_done = completed_tasks_.begin();
    const auto last_done = completed_tasks_.end();
    return std::find(first_done, last_done, candidate) != last_done;
  };
  tasks_.erase(std::remove_if(tasks_.begin(), tasks_.end(), has_completed),
               tasks_.end());

  tasks_.push_back(make_scoped_refptr(new ClosureTask(task)));

  // Rebuild the graph from scratch with one node per pending task. Every node
  // is FOREGROUND with no dependencies, so each can run as soon as a worker
  // picks it up.
  graph_.Reset();
  for (const auto& pending_task : tasks_) {
    graph_.nodes.push_back(
        cc::TaskGraph::Node(pending_task.get(), cc::TASK_CATEGORY_FOREGROUND,
                            0u /* priority */, 0u /* dependencies */));
  }
  ScheduleTasksWithLockAcquired(namespace_token_, &graph_);

  // Leave the scratch buffer empty for the next call.
  completed_tasks_.clear();
  return true;
}

// Always reports true; the calling thread is not inspected.
bool CategorizedWorkerPool::RunsTasksOnCurrentThread() const {
  return true;
}

// Worker loop executed by each pool thread. Keeps running tasks from
// |categories| for as long as one is runnable; otherwise sleeps on
// |has_ready_to_run_tasks_cv| until woken. Returns once |shutdown_| is set and
// nothing is left to run. |lock_| is held for the whole loop.
void CategorizedWorkerPool::Run(
    const std::vector<cc::TaskCategory>& categories,
    base::ConditionVariable* has_ready_to_run_tasks_cv) {
  base::AutoLock lock(lock_);

  for (;;) {
    // Fast path: a task was found and run, so look for the next one at once.
    if (RunTaskWithLockAcquired(categories))
      continue;

    // This worker is idle now, which may unblock a different category; let
    // the other workers know.
    SignalHasReadyToRunTasksWithLockAcquired();

    // Nothing to run and the pool is shutting down: leave the loop.
    if (shutdown_)
      return;

    // Sleep until more work is signalled.
    has_ready_to_run_tasks_cv->Wait();
  }
}

// Blocks the caller until the work queue reports that every namespace has
// finished running its tasks.
// NOTE(review): unlike WaitForTasksToFinishRunning(), no ScopedAllowWait is
// created here -- presumably fine for test-only callers, confirm.
void CategorizedWorkerPool::FlushForTesting() {
  base::AutoLock lock(lock_);

  // Re-check the condition after every wake-up.
  while (!work_queue_.HasFinishedRunningTasksInAllNamespaces()) {
    has_namespaces_with_finished_running_tasks_cv_.Wait();
  }
}

// Returns a new sequenced task runner that schedules its tasks on this pool.
// Each call creates a separate runner.
scoped_refptr<base::SequencedTaskRunner>
CategorizedWorkerPool::CreateSequencedTaskRunner() {
  return new CategorizedWorkerPoolSequencedTaskRunner(this);
}

// Does no work of its own; stopping and joining the worker threads is done by
// Shutdown(). NOTE(review): presumably Shutdown() has to be called before the
// pool is destroyed -- confirm against callers.
CategorizedWorkerPool::~CategorizedWorkerPool() {}

// Returns a namespace token obtained from the work queue. |lock_| is taken for
// the duration of the call.
cc::NamespaceToken CategorizedWorkerPool::GenerateNamespaceToken() {
  base::AutoLock lock(lock_);
  return work_queue_.GenerateNamespaceToken();
}

// Schedules |graph| under |token|. Emits a debug trace event carrying the
// node and edge counts, then takes |lock_| and defers to
// ScheduleTasksWithLockAcquired().
void CategorizedWorkerPool::ScheduleTasks(cc::NamespaceToken token,
                                          cc::TaskGraph* graph) {
  TRACE_EVENT2("disabled-by-default-cc.debug",
               "CategorizedWorkerPool::ScheduleTasks", "num_nodes",
               graph->nodes.size(), "num_edges", graph->edges.size());

  // Held until the function returns.
  base::AutoLock lock(lock_);
  ScheduleTasksWithLockAcquired(token, graph);
}

// Hands |graph| to the work queue under |token| and then signals workers that
// may now have something to run. The caller must already hold |lock_|, and
// the pool must not have been shut down.
void CategorizedWorkerPool::ScheduleTasksWithLockAcquired(
    cc::NamespaceToken token,
    cc::TaskGraph* graph) {
  // Same precondition check as the other *WithLockAcquired helpers.
  lock_.AssertAcquired();

  DCHECK(token.IsValid());
  DCHECK(!cc::TaskGraphWorkQueue::DependencyMismatch(graph));
  DCHECK(!shutdown_);

  work_queue_.ScheduleTasks(token, graph);

  // There may be more work available, so wake up another worker thread.
  SignalHasReadyToRunTasksWithLockAcquired();
}

// Blocks the caller until the namespace identified by |token| has finished
// running its tasks. Returns immediately when the work queue has no namespace
// for |token|.
void CategorizedWorkerPool::WaitForTasksToFinishRunning(
    cc::NamespaceToken token) {
  TRACE_EVENT0("disabled-by-default-cc.debug",
               "CategorizedWorkerPool::WaitForTasksToFinishRunning");

  DCHECK(token.IsValid());

  base::AutoLock lock(lock_);
  base::ThreadRestrictions::ScopedAllowWait allow_wait;

  auto* awaited_namespace = work_queue_.GetNamespaceForToken(token);
  if (awaited_namespace) {
    // Sleep until the namespace has drained, re-checking after each wake-up.
    while (!work_queue_.HasFinishedRunningTasksInNamespace(awaited_namespace))
      has_namespaces_with_finished_running_tasks_cv_.Wait();

    // Other namespaces may have finished as well; pass the wake-up on to
    // another origin thread.
    has_namespaces_with_finished_running_tasks_cv_.Signal();
  }
}

// Collects the completed tasks of |token| into |completed_tasks|. Emits a
// debug trace event, then takes |lock_| and defers to
// CollectCompletedTasksWithLockAcquired().
void CategorizedWorkerPool::CollectCompletedTasks(
    cc::NamespaceToken token,
    cc::Task::Vector* completed_tasks) {
  TRACE_EVENT0("disabled-by-default-cc.debug",
               "CategorizedWorkerPool::CollectCompletedTasks");

  // Held until the function returns.
  base::AutoLock lock(lock_);
  CollectCompletedTasksWithLockAcquired(token, completed_tasks);
}

// Asks the work queue to collect the completed tasks of |token| into
// |completed_tasks|. The caller must already hold |lock_|.
void CategorizedWorkerPool::CollectCompletedTasksWithLockAcquired(
    cc::NamespaceToken token,
    cc::Task::Vector* completed_tasks) {
  // Same precondition check as the other *WithLockAcquired helpers.
  lock_.AssertAcquired();

  DCHECK(token.IsValid());
  work_queue_.CollectCompletedTasks(token, completed_tasks);
}

// Runs at most one task: |categories| is scanned in order and the first
// category that is currently allowed to run gets one task executed.
// Returns true when a task was run, false when no category was runnable.
bool CategorizedWorkerPool::RunTaskWithLockAcquired(
    const std::vector<cc::TaskCategory>& categories) {
  for (const auto& candidate : categories) {
    if (!ShouldRunTaskForCategoryWithLockAcquired(candidate))
      continue;

    RunTaskInCategoryWithLockAcquired(candidate);
    return true;
  }

  // Every category was either empty or blocked.
  return false;
}

// Takes the next task of |category| from the work queue and runs it on the
// calling thread. |lock_| must be held on entry; it is released while the task
// body executes and held again for the completion bookkeeping.
void CategorizedWorkerPool::RunTaskInCategoryWithLockAcquired(
    cc::TaskCategory category) {
  TRACE_EVENT0("toplevel", "TaskGraphRunner::RunTask");

  lock_.AssertAcquired();

  auto prioritized_task = work_queue_.GetNextTaskToRun(category);

  // There may be more work available, so wake up another worker thread.
  SignalHasReadyToRunTasksWithLockAcquired();

  {
    // |lock_| is dropped for this scope only, so the task itself runs without
    // the pool lock held.
    base::AutoUnlock unlock(lock_);

    prioritized_task.task->RunOnWorkerThread();
  }

  // Read the namespace before |prioritized_task| is moved from on the next
  // line; it is still needed for the check below.
  auto* task_namespace = prioritized_task.task_namespace;
  work_queue_.CompleteTask(std::move(prioritized_task));

  // If namespace has finished running all tasks, wake up origin threads.
  if (work_queue_.HasFinishedRunningTasksInNamespace(task_namespace))
    has_namespaces_with_finished_running_tasks_cv_.Signal();
}

// Decides whether a task of |category| may be started right now. |lock_| must
// be held. Returns false when the category has nothing ready, when it is
// BACKGROUND and any foreground work is running or ready, or when it is
// NONCONCURRENT_FOREGROUND and such a task is already running.
bool CategorizedWorkerPool::ShouldRunTaskForCategoryWithLockAcquired(
    cc::TaskCategory category) {
  lock_.AssertAcquired();

  // Nothing queued for this category: nothing to decide.
  if (!work_queue_.HasReadyToRunTasksForCategory(category))
    return false;

  if (category == cc::TASK_CATEGORY_BACKGROUND) {
    // Background work yields to foreground work of either kind, whether that
    // work is already running or merely ready to run.
    const size_t running_foreground_count =
        work_queue_.NumRunningTasksForCategory(
            cc::TASK_CATEGORY_NONCONCURRENT_FOREGROUND) +
        work_queue_.NumRunningTasksForCategory(cc::TASK_CATEGORY_FOREGROUND);
    const bool foreground_is_ready =
        work_queue_.HasReadyToRunTasksForCategory(
            cc::TASK_CATEGORY_NONCONCURRENT_FOREGROUND) ||
        work_queue_.HasReadyToRunTasksForCategory(cc::TASK_CATEGORY_FOREGROUND);

    const bool foreground_is_idle =
        running_foreground_count == 0 && !foreground_is_ready;
    if (!foreground_is_idle)
      return false;
  }

  // At most one nonconcurrent task may be in flight at any time.
  const bool nonconcurrent_slot_taken =
      category == cc::TASK_CATEGORY_NONCONCURRENT_FOREGROUND &&
      work_queue_.NumRunningTasksForCategory(
          cc::TASK_CATEGORY_NONCONCURRENT_FOREGROUND) > 0;
  return !nonconcurrent_slot_taken;
}

// Wakes one waiter on the foreground and/or background condition variable,
// depending on which kinds of task are currently allowed to run. |lock_| must
// be held.
void CategorizedWorkerPool::SignalHasReadyToRunTasksWithLockAcquired() {
  lock_.AssertAcquired();

  // Both foreground categories share the same condition variable.
  const bool foreground_runnable =
      ShouldRunTaskForCategoryWithLockAcquired(cc::TASK_CATEGORY_FOREGROUND) ||
      ShouldRunTaskForCategoryWithLockAcquired(
          cc::TASK_CATEGORY_NONCONCURRENT_FOREGROUND);
  if (foreground_runnable)
    has_ready_to_run_foreground_tasks_cv_.Signal();

  const bool background_runnable =
      ShouldRunTaskForCategoryWithLockAcquired(cc::TASK_CATEGORY_BACKGROUND);
  if (background_runnable)
    has_ready_to_run_background_tasks_cv_.Signal();
}

// Wraps |closure| in a task; a copy of the closure is stored in |closure_|.
CategorizedWorkerPool::ClosureTask::ClosureTask(const base::Closure& closure)
    : closure_(closure) {}

// Overridden from cc::Task:
// Invokes the stored closure once and then resets it. Presumably the reset is
// there so that whatever the closure holds is released right after the run
// rather than when the task object is destroyed -- confirm.
void CategorizedWorkerPool::ClosureTask::RunOnWorkerThread() {
  closure_.Run();
  closure_.Reset();
}

// Nothing to do beyond destroying the members.
CategorizedWorkerPool::ClosureTask::~ClosureTask() {}

}  // namespace content
