blob: ca4091532fbd3d588f9ad659f5f868ae9f5a5fff [file] [log] [blame]
// Copyright 2012 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "ipc/ipc_sync_channel.h"
#include <stddef.h>
#include <stdint.h>
#include <memory>
#include <utility>
#include "base/functional/bind.h"
#include "base/location.h"
#include "base/logging.h"
#include "base/memory/ptr_util.h"
#include "base/memory/raw_ptr.h"
#include "base/run_loop.h"
#include "base/synchronization/waitable_event.h"
#include "base/task/sequenced_task_runner.h"
#include "base/task/single_thread_task_runner.h"
#include "base/trace_event/trace_event.h"
#include "build/build_config.h"
#include "ipc/ipc_channel_factory.h"
#include "ipc/param_traits_macros.h"
#include "mojo/public/cpp/bindings/sync_event_watcher.h"
using base::WaitableEvent;
namespace IPC {
namespace {
// Holds a pointer to the per-thread ReceivedSyncMsgQueue object.
constinit thread_local SyncChannel::ReceivedSyncMsgQueue* received_queue =
nullptr;
} // namespace
// Currently, this class remains to manage a shared waitable event on
// a thread across a number of sync channels on that thread.
class SyncChannel::ReceivedSyncMsgQueue :
public base::RefCountedThreadSafe<ReceivedSyncMsgQueue> {
public:
// Returns the ReceivedSyncMsgQueue instance for this thread, creating one
// if necessary. Call RemoveContext on the same thread when done.
static ReceivedSyncMsgQueue* AddContext() {
// We want one ReceivedSyncMsgQueue per listener thread (i.e. since multiple
// SyncChannel objects can block the same thread).
if (!received_queue) {
received_queue = new ReceivedSyncMsgQueue();
}
++received_queue->listener_count_;
return received_queue;
}
// SyncChannel calls this in its destructor.
void RemoveContext(SyncContext* context) {
base::AutoLock auto_lock(message_lock_);
if (--listener_count_ == 0) {
DCHECK(received_queue);
received_queue = nullptr;
sync_dispatch_watcher_.reset();
}
}
base::WaitableEvent* dispatch_event() { return &dispatch_event_; }
private:
friend class base::RefCountedThreadSafe<ReceivedSyncMsgQueue>;
// See the comment in SyncChannel::SyncChannel for why this event is created
// as manual reset.
ReceivedSyncMsgQueue()
: dispatch_event_(base::WaitableEvent::ResetPolicy::MANUAL,
base::WaitableEvent::InitialState::NOT_SIGNALED),
sync_dispatch_watcher_(std::make_unique<mojo::SyncEventWatcher>(
&dispatch_event_,
base::BindRepeating(&ReceivedSyncMsgQueue::OnDispatchEventReady,
base::Unretained(this)))) {
sync_dispatch_watcher_->AllowWokenUpBySyncWatchOnSameThread();
}
~ReceivedSyncMsgQueue() = default;
void OnDispatchEventReady() {}
// Signaled when we get a synchronous message that we must respond to, as the
// sender needs its reply before it can reply to our original synchronous
// message.
base::WaitableEvent dispatch_event_;
base::Lock message_lock_;
int listener_count_ = 0;
// Watches |dispatch_event_| during all sync handle watches on this thread.
std::unique_ptr<mojo::SyncEventWatcher> sync_dispatch_watcher_;
};
SyncChannel::SyncContext::SyncContext(
Listener* listener,
const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
const scoped_refptr<base::SingleThreadTaskRunner>& listener_task_runner,
WaitableEvent* shutdown_event)
: ChannelProxy::Context(listener, ipc_task_runner, listener_task_runner),
received_sync_msgs_(ReceivedSyncMsgQueue::AddContext()),
shutdown_event_(shutdown_event) {}
SyncChannel::SyncContext::~SyncContext() = default;
base::WaitableEvent* SyncChannel::SyncContext::GetDispatchEvent() {
return received_sync_msgs_->dispatch_event();
}
void SyncChannel::SyncContext::Clear() {
received_sync_msgs_->RemoveContext(this);
Context::Clear();
}
void SyncChannel::SyncContext::OnChannelError() {
shutdown_watcher_.StopWatching();
Context::OnChannelError();
}
void SyncChannel::SyncContext::OnChannelOpened() {
if (shutdown_event_) {
shutdown_watcher_.StartWatching(
shutdown_event_,
base::BindOnce(&SyncChannel::SyncContext::OnShutdownEventSignaled,
base::Unretained(this)),
base::SequencedTaskRunner::GetCurrentDefault());
}
Context::OnChannelOpened();
}
void SyncChannel::SyncContext::OnChannelClosed() {
shutdown_watcher_.StopWatching();
Context::OnChannelClosed();
}
void SyncChannel::SyncContext::OnShutdownEventSignaled(WaitableEvent* event) {
DCHECK_EQ(event, shutdown_event_);
}
// static
std::unique_ptr<SyncChannel> SyncChannel::Create(
Listener* listener,
const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
const scoped_refptr<base::SingleThreadTaskRunner>& listener_task_runner,
WaitableEvent* shutdown_event) {
return base::WrapUnique(new SyncChannel(
listener, ipc_task_runner, listener_task_runner, shutdown_event));
}
SyncChannel::SyncChannel(
Listener* listener,
const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
const scoped_refptr<base::SingleThreadTaskRunner>& listener_task_runner,
WaitableEvent* shutdown_event)
: ChannelProxy(new SyncContext(listener,
ipc_task_runner,
listener_task_runner,
shutdown_event)),
sync_handle_registry_(mojo::SyncHandleRegistry::current()) {
// The current (listener) thread must be distinct from the IPC thread, or else
// sending synchronous messages will deadlock.
DCHECK_NE(ipc_task_runner.get(),
base::SingleThreadTaskRunner::GetCurrentDefault().get());
StartWatching();
}
SyncChannel::~SyncChannel() = default;
void SyncChannel::OnDispatchEventSignaled(base::WaitableEvent* event) {
DCHECK_EQ(sync_context()->GetDispatchEvent(), event);
sync_context()->GetDispatchEvent()->Reset();
StartWatching();
}
void SyncChannel::StartWatching() {
// |dispatch_watcher_| watches the event asynchronously, only dispatching
// messages once the listener thread is unblocked and pumping its task queue.
// The ReceivedSyncMsgQueue also watches this event and may dispatch
// immediately if woken up by a message which it's allowed to dispatch.
dispatch_watcher_.StartWatching(
sync_context()->GetDispatchEvent(),
base::BindOnce(&SyncChannel::OnDispatchEventSignaled,
base::Unretained(this)),
sync_context()->listener_task_runner());
}
} // namespace IPC