| // Copyright 2013 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "mojo/common/handle_watcher.h" |
| |
| #include <map> |
| |
| #include "base/atomic_sequence_num.h" |
| #include "base/bind.h" |
| #include "base/lazy_instance.h" |
| #include "base/memory/weak_ptr.h" |
| #include "base/message_loop/message_loop.h" |
| #include "base/message_loop/message_loop_proxy.h" |
| #include "base/threading/thread.h" |
| #include "base/time/time.h" |
| #include "mojo/common/message_pump_mojo.h" |
| #include "mojo/common/message_pump_mojo_handler.h" |
| #include "mojo/common/time_helper.h" |
| |
| namespace mojo { |
| namespace common { |
| |
| typedef int WatcherID; |
| |
| namespace { |
| |
| const char kWatcherThreadName[] = "handle-watcher-thread"; |
| |
| // TODO(sky): this should be unnecessary once MessageLoop has been refactored. |
| MessagePumpMojo* message_pump_mojo = NULL; |
| |
| scoped_ptr<base::MessagePump> CreateMessagePumpMojo() { |
| message_pump_mojo = new MessagePumpMojo; |
| return scoped_ptr<base::MessagePump>(message_pump_mojo).Pass(); |
| } |
| |
| base::TimeTicks MojoDeadlineToTimeTicks(MojoDeadline deadline) { |
| return deadline == MOJO_DEADLINE_INDEFINITE ? base::TimeTicks() : |
| internal::NowTicks() + base::TimeDelta::FromMicroseconds(deadline); |
| } |
| |
| // Tracks the data for a single call to Start(). |
| struct WatchData { |
| WatchData() |
| : id(0), |
| wait_flags(MOJO_WAIT_FLAG_NONE), |
| message_loop(NULL) {} |
| |
| WatcherID id; |
| Handle handle; |
| MojoWaitFlags wait_flags; |
| base::TimeTicks deadline; |
| base::Callback<void(MojoResult)> callback; |
| scoped_refptr<base::MessageLoopProxy> message_loop; |
| }; |
| |
| // WatcherBackend -------------------------------------------------------------- |
| |
| // WatcherBackend is responsible for managing the requests and interacting with |
| // MessagePumpMojo. All access (outside of creation/destruction) is done on the |
| // thread WatcherThreadManager creates. |
| class WatcherBackend : public MessagePumpMojoHandler { |
| public: |
| WatcherBackend(); |
| virtual ~WatcherBackend(); |
| |
| void StartWatching(const WatchData& data); |
| void StopWatching(WatcherID watcher_id); |
| |
| private: |
| typedef std::map<Handle, WatchData> HandleToWatchDataMap; |
| |
| // Invoked when a handle needs to be removed and notified. |
| void RemoveAndNotify(const Handle& handle, MojoResult result); |
| |
| // Searches through |handle_to_data_| for |watcher_id|. Returns true if found |
| // and sets |handle| to the Handle. Returns false if not a known id. |
| bool GetMojoHandleByWatcherID(WatcherID watcher_id, Handle* handle) const; |
| |
| // MessagePumpMojoHandler overrides: |
| virtual void OnHandleReady(const Handle& handle) OVERRIDE; |
| virtual void OnHandleError(const Handle& handle, MojoResult result) OVERRIDE; |
| |
| // Maps from assigned id to WatchData. |
| HandleToWatchDataMap handle_to_data_; |
| |
| DISALLOW_COPY_AND_ASSIGN(WatcherBackend); |
| }; |
| |
| WatcherBackend::WatcherBackend() { |
| } |
| |
| WatcherBackend::~WatcherBackend() { |
| } |
| |
| void WatcherBackend::StartWatching(const WatchData& data) { |
| RemoveAndNotify(data.handle, MOJO_RESULT_CANCELLED); |
| |
| DCHECK_EQ(0u, handle_to_data_.count(data.handle)); |
| |
| handle_to_data_[data.handle] = data; |
| message_pump_mojo->AddHandler(this, data.handle, |
| data.wait_flags, |
| data.deadline); |
| } |
| |
| void WatcherBackend::StopWatching(WatcherID watcher_id) { |
| // Because of the thread hop it is entirely possible to get here and not |
| // have a valid handle registered for |watcher_id|. |
| Handle handle; |
| if (!GetMojoHandleByWatcherID(watcher_id, &handle)) |
| return; |
| |
| handle_to_data_.erase(handle); |
| message_pump_mojo->RemoveHandler(handle); |
| } |
| |
| void WatcherBackend::RemoveAndNotify(const Handle& handle, |
| MojoResult result) { |
| if (handle_to_data_.count(handle) == 0) |
| return; |
| |
| const WatchData data(handle_to_data_[handle]); |
| handle_to_data_.erase(handle); |
| message_pump_mojo->RemoveHandler(handle); |
| data.message_loop->PostTask(FROM_HERE, base::Bind(data.callback, result)); |
| } |
| |
| bool WatcherBackend::GetMojoHandleByWatcherID(WatcherID watcher_id, |
| Handle* handle) const { |
| for (HandleToWatchDataMap::const_iterator i = handle_to_data_.begin(); |
| i != handle_to_data_.end(); ++i) { |
| if (i->second.id == watcher_id) { |
| *handle = i->second.handle; |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| void WatcherBackend::OnHandleReady(const Handle& handle) { |
| RemoveAndNotify(handle, MOJO_RESULT_OK); |
| } |
| |
| void WatcherBackend::OnHandleError(const Handle& handle, MojoResult result) { |
| RemoveAndNotify(handle, result); |
| } |
| |
| // WatcherThreadManager -------------------------------------------------------- |
| |
| // WatcherThreadManager manages the background thread that listens for handles |
| // to be ready. All requests are handled by WatcherBackend. |
| class WatcherThreadManager { |
| public: |
| // Returns the shared instance. |
| static WatcherThreadManager* GetInstance(); |
| |
| // Starts watching the requested handle. Returns a unique ID that is used to |
| // stop watching the handle. When the handle is ready |callback| is notified |
| // on the thread StartWatching() was invoked on. |
| // This may be invoked on any thread. |
| WatcherID StartWatching(const Handle& handle, |
| MojoWaitFlags wait_flags, |
| base::TimeTicks deadline, |
| const base::Callback<void(MojoResult)>& callback); |
| |
| // Stops watching a handle. |
| // This may be invoked on any thread. |
| void StopWatching(WatcherID watcher_id); |
| |
| private: |
| friend struct base::DefaultLazyInstanceTraits<WatcherThreadManager>; |
| |
| WatcherThreadManager(); |
| ~WatcherThreadManager(); |
| |
| base::Thread thread_; |
| |
| base::AtomicSequenceNumber watcher_id_generator_; |
| |
| WatcherBackend backend_; |
| |
| DISALLOW_COPY_AND_ASSIGN(WatcherThreadManager); |
| }; |
| |
| WatcherThreadManager* WatcherThreadManager::GetInstance() { |
| static base::LazyInstance<WatcherThreadManager> instance = |
| LAZY_INSTANCE_INITIALIZER; |
| return &instance.Get(); |
| } |
| |
| WatcherID WatcherThreadManager::StartWatching( |
| const Handle& handle, |
| MojoWaitFlags wait_flags, |
| base::TimeTicks deadline, |
| const base::Callback<void(MojoResult)>& callback) { |
| WatchData data; |
| data.id = watcher_id_generator_.GetNext(); |
| data.handle = handle; |
| data.callback = callback; |
| data.wait_flags = wait_flags; |
| data.deadline = deadline; |
| data.message_loop = base::MessageLoopProxy::current(); |
| DCHECK_NE(static_cast<base::MessageLoopProxy*>(NULL), |
| data.message_loop.get()); |
| // We outlive |thread_|, so it's safe to use Unretained() here. |
| thread_.message_loop()->PostTask( |
| FROM_HERE, |
| base::Bind(&WatcherBackend::StartWatching, |
| base::Unretained(&backend_), |
| data)); |
| return data.id; |
| } |
| |
| void WatcherThreadManager::StopWatching(WatcherID watcher_id) { |
| // We outlive |thread_|, so it's safe to use Unretained() here. |
| thread_.message_loop()->PostTask( |
| FROM_HERE, |
| base::Bind(&WatcherBackend::StopWatching, |
| base::Unretained(&backend_), |
| watcher_id)); |
| } |
| |
| WatcherThreadManager::WatcherThreadManager() |
| : thread_(kWatcherThreadName) { |
| base::Thread::Options thread_options; |
| thread_options.message_pump_factory = base::Bind(&CreateMessagePumpMojo); |
| thread_.StartWithOptions(thread_options); |
| } |
| |
| WatcherThreadManager::~WatcherThreadManager() { |
| thread_.Stop(); |
| } |
| |
| } // namespace |
| |
| // HandleWatcher::StartState --------------------------------------------------- |
| |
| // Contains the information passed to Start(). |
| struct HandleWatcher::StartState { |
| explicit StartState(HandleWatcher* watcher) : weak_factory(watcher) { |
| } |
| |
| ~StartState() { |
| } |
| |
| // ID assigned by WatcherThreadManager. |
| WatcherID watcher_id; |
| |
| // Callback to notify when done. |
| base::Callback<void(MojoResult)> callback; |
| |
| // When Start() is invoked a callback is passed to WatcherThreadManager |
| // using a WeakRef from |weak_refactory_|. The callback invokes |
| // OnHandleReady() (on the thread Start() is invoked from) which in turn |
| // notifies |callback_|. Doing this allows us to reset state when the handle |
| // is ready, and then notify the callback. Doing this also means Stop() |
| // cancels any pending callbacks that may be inflight. |
| base::WeakPtrFactory<HandleWatcher> weak_factory; |
| }; |
| |
| // HandleWatcher --------------------------------------------------------------- |
| |
| HandleWatcher::HandleWatcher() { |
| } |
| |
| HandleWatcher::~HandleWatcher() { |
| Stop(); |
| } |
| |
| void HandleWatcher::Start(const Handle& handle, |
| MojoWaitFlags wait_flags, |
| MojoDeadline deadline, |
| const base::Callback<void(MojoResult)>& callback) { |
| DCHECK(handle.is_valid()); |
| DCHECK_NE(MOJO_WAIT_FLAG_NONE, wait_flags); |
| |
| Stop(); |
| |
| start_state_.reset(new StartState(this)); |
| start_state_->callback = callback; |
| start_state_->watcher_id = |
| WatcherThreadManager::GetInstance()->StartWatching( |
| handle, |
| wait_flags, |
| MojoDeadlineToTimeTicks(deadline), |
| base::Bind(&HandleWatcher::OnHandleReady, |
| start_state_->weak_factory.GetWeakPtr())); |
| } |
| |
| void HandleWatcher::Stop() { |
| if (!start_state_.get()) |
| return; |
| |
| scoped_ptr<StartState> old_state(start_state_.Pass()); |
| WatcherThreadManager::GetInstance()->StopWatching(old_state->watcher_id); |
| } |
| |
| void HandleWatcher::OnHandleReady(MojoResult result) { |
| DCHECK(start_state_.get()); |
| scoped_ptr<StartState> old_state(start_state_.Pass()); |
| old_state->callback.Run(result); |
| |
| // NOTE: We may have been deleted during callback execution. |
| } |
| |
| } // namespace common |
| } // namespace mojo |