| // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "jingle/glue/thread_wrapper.h" |
| |
| #include <stddef.h> |
| #include <stdint.h> |
| |
| #include "base/bind.h" |
| #include "base/bind_helpers.h" |
| #include "base/lazy_instance.h" |
| #include "base/macros.h" |
| #include "base/threading/thread_local.h" |
| #include "base/trace_event/trace_event.h" |
| #include "third_party/webrtc/rtc_base/nullsocketserver.h" |
| |
| namespace jingle_glue { |
| |
| struct JingleThreadWrapper::PendingSend { |
| PendingSend(const rtc::Message& message_value) |
| : sending_thread(JingleThreadWrapper::current()), |
| message(message_value), |
| done_event(base::WaitableEvent::ResetPolicy::MANUAL, |
| base::WaitableEvent::InitialState::NOT_SIGNALED) { |
| DCHECK(sending_thread); |
| } |
| |
| JingleThreadWrapper* sending_thread; |
| rtc::Message message; |
| base::WaitableEvent done_event; |
| }; |
| |
| base::LazyInstance<base::ThreadLocalPointer<JingleThreadWrapper>>:: |
| DestructorAtExit g_jingle_thread_wrapper = LAZY_INSTANCE_INITIALIZER; |
| |
| // static |
| void JingleThreadWrapper::EnsureForCurrentMessageLoop() { |
| if (JingleThreadWrapper::current() == nullptr) { |
| base::MessageLoopCurrent message_loop = base::MessageLoopCurrent::Get(); |
| std::unique_ptr<JingleThreadWrapper> wrapper = |
| JingleThreadWrapper::WrapTaskRunner(message_loop->task_runner()); |
| message_loop->AddDestructionObserver(wrapper.release()); |
| } |
| |
| DCHECK_EQ(rtc::Thread::Current(), current()); |
| } |
| |
| std::unique_ptr<JingleThreadWrapper> JingleThreadWrapper::WrapTaskRunner( |
| scoped_refptr<base::SingleThreadTaskRunner> task_runner) { |
| DCHECK(!JingleThreadWrapper::current()); |
| DCHECK(task_runner->BelongsToCurrentThread()); |
| |
| std::unique_ptr<JingleThreadWrapper> result( |
| new JingleThreadWrapper(task_runner)); |
| g_jingle_thread_wrapper.Get().Set(result.get()); |
| return result; |
| } |
| |
| // static |
| JingleThreadWrapper* JingleThreadWrapper::current() { |
| return g_jingle_thread_wrapper.Get().Get(); |
| } |
| |
| JingleThreadWrapper::JingleThreadWrapper( |
| scoped_refptr<base::SingleThreadTaskRunner> task_runner) |
| : task_runner_(task_runner), |
| send_allowed_(false), |
| last_task_id_(0), |
| pending_send_event_(base::WaitableEvent::ResetPolicy::MANUAL, |
| base::WaitableEvent::InitialState::NOT_SIGNALED), |
| weak_ptr_factory_(this) { |
| DCHECK(task_runner->BelongsToCurrentThread()); |
| DCHECK(!rtc::Thread::Current()); |
| weak_ptr_ = weak_ptr_factory_.GetWeakPtr(); |
| rtc::MessageQueueManager::Add(this); |
| SafeWrapCurrent(); |
| } |
| |
| JingleThreadWrapper::~JingleThreadWrapper() { |
| DCHECK_EQ(this, JingleThreadWrapper::current()); |
| DCHECK_EQ(this, rtc::Thread::Current()); |
| |
| UnwrapCurrent(); |
| rtc::ThreadManager::Instance()->SetCurrentThread(nullptr); |
| rtc::MessageQueueManager::Remove(this); |
| g_jingle_thread_wrapper.Get().Set(nullptr); |
| |
| Clear(nullptr, rtc::MQID_ANY, nullptr); |
| } |
| |
| void JingleThreadWrapper::WillDestroyCurrentMessageLoop() { |
| delete this; |
| } |
| |
| void JingleThreadWrapper::Post(const rtc::Location& posted_from, |
| rtc::MessageHandler* handler, |
| uint32_t message_id, |
| rtc::MessageData* data, |
| bool time_sensitive) { |
| PostTaskInternal(posted_from, 0, handler, message_id, data); |
| } |
| |
| void JingleThreadWrapper::PostDelayed(const rtc::Location& posted_from, |
| int delay_ms, |
| rtc::MessageHandler* handler, |
| uint32_t message_id, |
| rtc::MessageData* data) { |
| PostTaskInternal(posted_from, delay_ms, handler, message_id, data); |
| } |
| |
| void JingleThreadWrapper::Clear(rtc::MessageHandler* handler, |
| uint32_t id, |
| rtc::MessageList* removed) { |
| base::AutoLock auto_lock(lock_); |
| |
| for (MessagesQueue::iterator it = messages_.begin(); |
| it != messages_.end();) { |
| MessagesQueue::iterator next = it; |
| ++next; |
| |
| if (it->second.Match(handler, id)) { |
| if (removed) { |
| removed->push_back(it->second); |
| } else { |
| delete it->second.pdata; |
| } |
| messages_.erase(it); |
| } |
| |
| it = next; |
| } |
| |
| for (std::list<PendingSend*>::iterator it = pending_send_messages_.begin(); |
| it != pending_send_messages_.end();) { |
| std::list<PendingSend*>::iterator next = it; |
| ++next; |
| |
| if ((*it)->message.Match(handler, id)) { |
| if (removed) { |
| removed ->push_back((*it)->message); |
| } else { |
| delete (*it)->message.pdata; |
| } |
| (*it)->done_event.Signal(); |
| pending_send_messages_.erase(it); |
| } |
| |
| it = next; |
| } |
| } |
| |
| void JingleThreadWrapper::Dispatch(rtc::Message* message) { |
| TRACE_EVENT2("webrtc", "JingleThreadWrapper::Dispatch", "src_file_and_line", |
| message->posted_from.file_and_line(), "src_func", |
| message->posted_from.function_name()); |
| message->phandler->OnMessage(message); |
| } |
| |
| void JingleThreadWrapper::Send(const rtc::Location& posted_from, |
| rtc::MessageHandler* handler, |
| uint32_t id, |
| rtc::MessageData* data) { |
| JingleThreadWrapper* current_thread = JingleThreadWrapper::current(); |
| DCHECK(current_thread != nullptr) << "Send() can be called only from a " |
| "thread that has JingleThreadWrapper."; |
| |
| rtc::Message message; |
| message.posted_from = posted_from; |
| message.phandler = handler; |
| message.message_id = id; |
| message.pdata = data; |
| |
| if (current_thread == this) { |
| Dispatch(&message); |
| return; |
| } |
| |
| // Send message from a thread different than |this|. |
| |
| // Allow inter-thread send only from threads that have |
| // |send_allowed_| flag set. |
| DCHECK(current_thread->send_allowed_) << "Send()'ing synchronous " |
| "messages is not allowed from the current thread."; |
| |
| PendingSend pending_send(message); |
| { |
| base::AutoLock auto_lock(lock_); |
| pending_send_messages_.push_back(&pending_send); |
| } |
| |
| // Need to signal |pending_send_event_| here in case the thread is |
| // sending message to another thread. |
| pending_send_event_.Signal(); |
| task_runner_->PostTask(FROM_HERE, |
| base::Bind(&JingleThreadWrapper::ProcessPendingSends, |
| weak_ptr_)); |
| |
| |
| while (!pending_send.done_event.IsSignaled()) { |
| base::WaitableEvent* events[] = {&pending_send.done_event, |
| ¤t_thread->pending_send_event_}; |
| size_t event = base::WaitableEvent::WaitMany(events, arraysize(events)); |
| DCHECK(event == 0 || event == 1); |
| |
| if (event == 1) |
| current_thread->ProcessPendingSends(); |
| } |
| } |
| |
| void JingleThreadWrapper::ProcessPendingSends() { |
| while (true) { |
| PendingSend* pending_send = nullptr; |
| { |
| base::AutoLock auto_lock(lock_); |
| if (!pending_send_messages_.empty()) { |
| pending_send = pending_send_messages_.front(); |
| pending_send_messages_.pop_front(); |
| } else { |
| // Reset the event while |lock_| is still locked. |
| pending_send_event_.Reset(); |
| break; |
| } |
| } |
| if (pending_send) { |
| Dispatch(&pending_send->message); |
| pending_send->done_event.Signal(); |
| } |
| } |
| } |
| |
| void JingleThreadWrapper::PostTaskInternal(const rtc::Location& posted_from, |
| int delay_ms, |
| rtc::MessageHandler* handler, |
| uint32_t message_id, |
| rtc::MessageData* data) { |
| int task_id; |
| rtc::Message message; |
| message.posted_from = posted_from; |
| message.phandler = handler; |
| message.message_id = message_id; |
| message.pdata = data; |
| { |
| base::AutoLock auto_lock(lock_); |
| task_id = ++last_task_id_; |
| messages_.insert(std::pair<int, rtc::Message>(task_id, message)); |
| } |
| |
| if (delay_ms <= 0) { |
| task_runner_->PostTask(FROM_HERE, |
| base::Bind(&JingleThreadWrapper::RunTask, |
| weak_ptr_, task_id)); |
| } else { |
| task_runner_->PostDelayedTask(FROM_HERE, |
| base::Bind(&JingleThreadWrapper::RunTask, |
| weak_ptr_, task_id), |
| base::TimeDelta::FromMilliseconds(delay_ms)); |
| } |
| } |
| |
| void JingleThreadWrapper::RunTask(int task_id) { |
| bool have_message = false; |
| rtc::Message message; |
| { |
| base::AutoLock auto_lock(lock_); |
| MessagesQueue::iterator it = messages_.find(task_id); |
| if (it != messages_.end()) { |
| have_message = true; |
| message = it->second; |
| messages_.erase(it); |
| } |
| } |
| |
| if (have_message) { |
| if (message.message_id == rtc::MQID_DISPOSE) { |
| DCHECK(message.phandler == nullptr); |
| delete message.pdata; |
| } else { |
| Dispatch(&message); |
| } |
| } |
| } |
| |
| // All methods below are marked as not reached. See comments in the |
| // header for more details. |
| void JingleThreadWrapper::Quit() { |
| NOTREACHED(); |
| } |
| |
| bool JingleThreadWrapper::IsQuitting() { |
| NOTREACHED(); |
| return false; |
| } |
| |
| void JingleThreadWrapper::Restart() { |
| NOTREACHED(); |
| } |
| |
| bool JingleThreadWrapper::Get(rtc::Message*, int, bool) { |
| NOTREACHED(); |
| return false; |
| } |
| |
| bool JingleThreadWrapper::Peek(rtc::Message*, int) { |
| NOTREACHED(); |
| return false; |
| } |
| |
| void JingleThreadWrapper::PostAt(const rtc::Location& posted_from, |
| uint32_t, |
| rtc::MessageHandler*, |
| uint32_t, |
| rtc::MessageData*) { |
| NOTREACHED(); |
| } |
| |
| void JingleThreadWrapper::ReceiveSends() { |
| NOTREACHED(); |
| } |
| |
| int JingleThreadWrapper::GetDelay() { |
| NOTREACHED(); |
| return 0; |
| } |
| |
| void JingleThreadWrapper::Stop() { |
| NOTREACHED(); |
| } |
| |
| void JingleThreadWrapper::Run() { |
| NOTREACHED(); |
| } |
| |
| } // namespace jingle_glue |