| // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
#include "remoting/base/plugin_thread_task_runner.h"

#include <algorithm>

#include "base/bind.h"
| |
| namespace { |
| |
| base::TimeDelta CalcTimeDelta(base::TimeTicks when) { |
| return std::max(when - base::TimeTicks::Now(), base::TimeDelta()); |
| } |
| |
| } // namespace |
| |
| namespace remoting { |
| |
| PluginThreadTaskRunner::Delegate::~Delegate() { |
| } |
| |
| PluginThreadTaskRunner::PluginThreadTaskRunner(Delegate* delegate) |
| : plugin_thread_id_(base::PlatformThread::CurrentId()), |
| event_(false, false), |
| delegate_(delegate), |
| next_sequence_num_(0), |
| quit_received_(false), |
| stopped_(false) { |
| } |
| |
| PluginThreadTaskRunner::~PluginThreadTaskRunner() { |
| DCHECK(delegate_ == NULL); |
| DCHECK(stopped_); |
| } |
| |
| void PluginThreadTaskRunner::DetachAndRunShutdownLoop() { |
| DCHECK(BelongsToCurrentThread()); |
| |
| // Detach from the plugin thread and redirect all tasks posted after this |
| // point to the shutdown task loop. |
| { |
| base::AutoLock auto_lock(lock_); |
| |
| DCHECK(delegate_ != NULL); |
| DCHECK(!stopped_); |
| |
| delegate_ = NULL; |
| stopped_ = quit_received_; |
| } |
| |
| // When DetachAndRunShutdownLoop() is called from NPP_Destroy() all scheduled |
| // timers are cancelled. It is OK to clear |scheduled_timers_| even if |
| // the timers weren't actually cancelled (i.e. DetachAndRunShutdownLoop() is |
| // called before NPP_Destroy()). |
| scheduled_timers_.clear(); |
| |
| // Run all tasks that are due. |
| ProcessIncomingTasks(); |
| RunDueTasks(base::TimeTicks::Now()); |
| |
| while (!stopped_) { |
| if (delayed_queue_.empty()) { |
| event_.Wait(); |
| } else { |
| event_.TimedWait(CalcTimeDelta(delayed_queue_.top().delayed_run_time)); |
| } |
| |
| // Run all tasks that are due. |
| ProcessIncomingTasks(); |
| RunDueTasks(base::TimeTicks::Now()); |
| |
| base::AutoLock auto_lock(lock_); |
| stopped_ = quit_received_; |
| } |
| } |
| |
| void PluginThreadTaskRunner::Quit() { |
| base::AutoLock auto_lock(lock_); |
| |
| if (!quit_received_) { |
| quit_received_ = true; |
| event_.Signal(); |
| } |
| } |
| |
| bool PluginThreadTaskRunner::PostDelayedTask( |
| const tracked_objects::Location& from_here, |
| const base::Closure& task, |
| base::TimeDelta delay) { |
| |
| // Wrap the task into |base::PendingTask|. |
| base::TimeTicks delayed_run_time; |
| if (delay > base::TimeDelta()) { |
| delayed_run_time = base::TimeTicks::Now() + delay; |
| } else { |
| DCHECK_EQ(delay.InMilliseconds(), 0) << "delay should not be negative"; |
| } |
| |
| base::PendingTask pending_task(from_here, task, delayed_run_time, false); |
| |
| // Push the task to the incoming queue. |
| base::AutoLock locked(lock_); |
| |
| // Initialize the sequence number. The sequence number provides FIFO ordering |
| // for tasks with the same |delayed_run_time|. |
| pending_task.sequence_num = next_sequence_num_++; |
| |
| // Post an asynchronous call on the plugin thread to process the task. |
| if (incoming_queue_.empty()) { |
| PostRunTasks(); |
| } |
| |
| incoming_queue_.push(pending_task); |
| pending_task.task.Reset(); |
| |
| // No tasks should be posted after Quit() has been called. |
| DCHECK(!quit_received_); |
| return true; |
| } |
| |
| bool PluginThreadTaskRunner::PostNonNestableDelayedTask( |
| const tracked_objects::Location& from_here, |
| const base::Closure& task, |
| base::TimeDelta delay) { |
| // All tasks running on this task loop are non-nestable. |
| return PostDelayedTask(from_here, task, delay); |
| } |
| |
| bool PluginThreadTaskRunner::RunsTasksOnCurrentThread() const { |
| // In pepper plugins ideally we should use pp::Core::IsMainThread, |
| // but it is problematic because we would need to keep reference to |
| // Core somewhere, e.g. make the delegate ref-counted. |
| return base::PlatformThread::CurrentId() == plugin_thread_id_; |
| } |
| |
| void PluginThreadTaskRunner::PostRunTasks() { |
| // Post tasks to the plugin thread when it is availabe or spin the shutdown |
| // task loop. |
| if (delegate_ != NULL) { |
| base::Closure closure = base::Bind(&PluginThreadTaskRunner::RunTasks, this); |
| delegate_->RunOnPluginThread( |
| base::TimeDelta(), |
| &PluginThreadTaskRunner::TaskSpringboard, |
| new base::Closure(closure)); |
| } else { |
| event_.Signal(); |
| } |
| } |
| |
| void PluginThreadTaskRunner::PostDelayedRunTasks(base::TimeTicks when) { |
| DCHECK(BelongsToCurrentThread()); |
| |
| // |delegate_| is updated from the plugin thread only, so it is safe to access |
| // it here without taking the lock. |
| if (delegate_ != NULL) { |
| // Schedule RunDelayedTasks() to be called at |when| if it hasn't been |
| // scheduled already. |
| if (scheduled_timers_.insert(when).second) { |
| base::TimeDelta delay = CalcTimeDelta(when); |
| base::Closure closure = |
| base::Bind(&PluginThreadTaskRunner::RunDelayedTasks, this, when); |
| delegate_->RunOnPluginThread( |
| delay, |
| &PluginThreadTaskRunner::TaskSpringboard, |
| new base::Closure(closure)); |
| } |
| } else { |
| // Spin the shutdown loop if the task runner has already been detached. |
| // The shutdown loop will pick the tasks to run itself. |
| event_.Signal(); |
| } |
| } |
| |
| void PluginThreadTaskRunner::ProcessIncomingTasks() { |
| DCHECK(BelongsToCurrentThread()); |
| |
| // Grab all unsorted tasks accomulated so far. |
| base::TaskQueue work_queue; |
| { |
| base::AutoLock locked(lock_); |
| incoming_queue_.Swap(&work_queue); |
| } |
| |
| while (!work_queue.empty()) { |
| base::PendingTask pending_task = work_queue.front(); |
| work_queue.pop(); |
| |
| if (pending_task.delayed_run_time.is_null()) { |
| pending_task.task.Run(); |
| } else { |
| delayed_queue_.push(pending_task); |
| } |
| } |
| } |
| |
| void PluginThreadTaskRunner::RunDelayedTasks(base::TimeTicks when) { |
| DCHECK(BelongsToCurrentThread()); |
| |
| scheduled_timers_.erase(when); |
| |
| // |stopped_| is updated by the plugin thread only, so it is safe to access |
| // it here without taking the lock. |
| if (!stopped_) { |
| ProcessIncomingTasks(); |
| RunDueTasks(base::TimeTicks::Now()); |
| } |
| } |
| |
| void PluginThreadTaskRunner::RunDueTasks(base::TimeTicks now) { |
| DCHECK(BelongsToCurrentThread()); |
| |
| // Run all due tasks. |
| while (!delayed_queue_.empty() && |
| delayed_queue_.top().delayed_run_time <= now) { |
| delayed_queue_.top().task.Run(); |
| delayed_queue_.pop(); |
| } |
| |
| // Post a delayed asynchronous call to the plugin thread to process tasks from |
| // the delayed queue. |
| if (!delayed_queue_.empty()) { |
| base::TimeTicks when = delayed_queue_.top().delayed_run_time; |
| if (scheduled_timers_.empty() || when < *scheduled_timers_.begin()) { |
| PostDelayedRunTasks(when); |
| } |
| } |
| } |
| |
| void PluginThreadTaskRunner::RunTasks() { |
| DCHECK(BelongsToCurrentThread()); |
| |
| // |stopped_| is updated by the plugin thread only, so it is safe to access |
| // it here without taking the lock. |
| if (!stopped_) { |
| ProcessIncomingTasks(); |
| RunDueTasks(base::TimeTicks::Now()); |
| } |
| } |
| |
| // static |
| void PluginThreadTaskRunner::TaskSpringboard(void* data) { |
| base::Closure* task = reinterpret_cast<base::Closure*>(data); |
| task->Run(); |
| delete task; |
| } |
| |
| } // namespace remoting |