Add multiplexing of message pipes in the new EDK.
This dramatically cuts down on the number of OS pipes used. The key is that the vast majority (all for now) of message pipes are only read or written in one process. If they're passed to another process, that is before they're interacted with. So by default, create message pipes such that they're not transferable after they're read or written. A non-transferable pipe is then just a unique ID. When two process "bind" their end by reading/writing to that ID, the parent process ensures that the two processes have a channel between them and tells them to connect to each other using it.
"Transferable" message pipes can still be created and these can be sent after they're read or written to, by specifying MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE.
BUG=556259
R=tsepez@chromium.org, yzshen@chromium.org
Review URL: https://codereview.chromium.org/1488853002 .
Cr-Commit-Position: refs/heads/master@{#363386}
diff --git a/components/web_view/frame.cc b/components/web_view/frame.cc
index 39077ae..de267a3 100644
--- a/components/web_view/frame.cc
+++ b/components/web_view/frame.cc
@@ -387,12 +387,12 @@
// case we do not use the WindowTreeClient (because the app has a Window
// already
// and ends up reusing it).
- DCHECK(!window_tree_client.get());
+ DCHECK(!window_tree_client);
ChangeClient(frame_client, user_data.Pass(), window_tree_client.Pass(),
app_id, navigation_start_time);
} else {
waiting_for_on_will_navigate_ack_ = true;
- DCHECK(window_tree_client.get());
+ DCHECK(window_tree_client);
// TODO(sky): url isn't correct here, it should be a security origin.
frame_client_->OnWillNavigate(
url.spec(),
diff --git a/content/browser/browser_child_process_host_impl.cc b/content/browser/browser_child_process_host_impl.cc
index e297d778..671708a 100644
--- a/content/browser/browser_child_process_host_impl.cc
+++ b/content/browser/browser_child_process_host_impl.cc
@@ -401,8 +401,6 @@
const base::Process& process = child_process_->GetProcess();
DCHECK(process.IsValid());
-#if defined(OS_WIN)
- // TODO(jam): enable on POSIX
if (base::CommandLine::ForCurrentProcess()->HasSwitch("use-new-edk")) {
mojo::embedder::ScopedPlatformHandle client_pipe =
mojo::embedder::ChildProcessLaunched(process.Handle());
@@ -415,7 +413,6 @@
#endif
process.Handle(), true)));
}
-#endif
#if defined(OS_WIN)
// Start a WaitableEventWatcher that will invoke OnProcessExitedEarly if the
diff --git a/content/browser/renderer_host/render_process_host_impl.cc b/content/browser/renderer_host/render_process_host_impl.cc
index c695f6f..12fb07aa 100644
--- a/content/browser/renderer_host/render_process_host_impl.cc
+++ b/content/browser/renderer_host/render_process_host_impl.cc
@@ -2478,8 +2478,6 @@
SendExternalMojoShellHandleToChild(GetHandle(), this);
#endif
-#if defined(OS_WIN)
- // TODO(jam): enable on POSIX
if (base::CommandLine::ForCurrentProcess()->HasSwitch("use-new-edk") &&
child_process_launcher_.get()) {
base::ProcessHandle process_handle =
@@ -2495,7 +2493,6 @@
#endif
process_handle, true)));
}
-#endif
// Allow Mojo to be setup before the renderer sees any Chrome IPC messages.
// This way, Mojo can be safely used from the renderer in response to any
diff --git a/content/child/child_thread_impl.cc b/content/child/child_thread_impl.cc
index cf466cc..07977ba 100644
--- a/content/child/child_thread_impl.cc
+++ b/content/child/child_thread_impl.cc
@@ -414,6 +414,7 @@
IPC::Logging::GetInstance()->SetIPCSender(this);
#endif
+ mojo_ipc_support_.reset(new IPC::ScopedIPCSupport(GetIOTaskRunner()));
mojo_application_.reset(new MojoApplication(GetIOTaskRunner()));
sync_message_filter_ = channel_->CreateSyncMessageFilter();
@@ -653,10 +654,8 @@
OnProcessBackgrounded)
IPC_MESSAGE_HANDLER(MojoMsg_BindExternalMojoShellHandle,
OnBindExternalMojoShellHandle)
-#if defined(OS_WIN)
IPC_MESSAGE_HANDLER(ChildProcessMsg_SetMojoParentPipeHandle,
OnSetMojoParentPipeHandle)
-#endif
IPC_MESSAGE_UNHANDLED(handled = false)
IPC_END_MESSAGE_MAP()
diff --git a/content/child/child_thread_impl.h b/content/child/child_thread_impl.h
index 09a58b8..9f4df7f 100644
--- a/content/child/child_thread_impl.h
+++ b/content/child/child_thread_impl.h
@@ -235,6 +235,7 @@
void EnsureConnected();
+ scoped_ptr<IPC::ScopedIPCSupport> mojo_ipc_support_;
scoped_ptr<MojoApplication> mojo_application_;
std::string channel_name_;
diff --git a/content/renderer/DEPS b/content/renderer/DEPS
index b070955..3a49854 100644
--- a/content/renderer/DEPS
+++ b/content/renderer/DEPS
@@ -29,5 +29,8 @@
'.*_[a-z]*browsertest.*': [
"+content/public/browser",
"+content/shell",
+ ],
+ "render_thread_impl_browsertest\.cc": [
+ "+content/app/mojo/mojo_init.h",
],
}
diff --git a/content/renderer/render_thread_impl_browsertest.cc b/content/renderer/render_thread_impl_browsertest.cc
index c31cc14..d814e56 100644
--- a/content/renderer/render_thread_impl_browsertest.cc
+++ b/content/renderer/render_thread_impl_browsertest.cc
@@ -9,7 +9,8 @@
#include "base/single_thread_task_runner.h"
#include "base/strings/string_number_conversions.h"
#include "base/thread_task_runner_handle.h"
-#include "components/scheduler/renderer/renderer_scheduler.h"
+#include "components/scheduler/renderer/renderer_scheduler.h"
+#include "content/app/mojo/mojo_init.h"
#include "content/common/in_process_child_thread_params.h"
#include "content/common/resource_messages.h"
#include "content/common/websocket_messages.h"
@@ -173,6 +174,7 @@
scoped_ptr<scheduler::RendererScheduler> renderer_scheduler =
scheduler::RendererScheduler::Create();
+ InitializeMojo();
thread_ = new RenderThreadImplForTest(
InProcessChildThreadParams(test_helper_->GetChannelId(),
test_helper_->GetIOTaskRunner()),
diff --git a/mojo/edk/embedder/embedder.cc b/mojo/edk/embedder/embedder.cc
index eb572c57..35e55ba 100644
--- a/mojo/edk/embedder/embedder.cc
+++ b/mojo/edk/embedder/embedder.cc
@@ -65,15 +65,9 @@
}
ScopedPlatformHandle ChildProcessLaunched(base::ProcessHandle child_process) {
-#if defined(OS_WIN)
PlatformChannelPair token_channel;
new ChildBrokerHost(child_process, token_channel.PassServerHandle());
return token_channel.PassClientHandle();
-#else
- // TODO(jam): create this for POSIX. Need to implement channel reading first
- // so we don't leak handles.
- return ScopedPlatformHandle();
-#endif
}
void ChildProcessLaunched(base::ProcessHandle child_process,
@@ -165,9 +159,11 @@
ScopedMessagePipeHandle CreateMessagePipe(
ScopedPlatformHandle platform_handle) {
+ MojoCreateMessagePipeOptions options = {
+ static_cast<uint32_t>(sizeof(MojoCreateMessagePipeOptions)),
+ MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE};
scoped_refptr<MessagePipeDispatcher> dispatcher =
- MessagePipeDispatcher::Create(
- MessagePipeDispatcher::kDefaultCreateOptions);
+ MessagePipeDispatcher::Create(options);
ScopedMessagePipeHandle rv(
MessagePipeHandle(internal::g_core->AddDispatcher(dispatcher)));
diff --git a/mojo/edk/embedder/embedder_unittest.cc b/mojo/edk/embedder/embedder_unittest.cc
index 60a2b801..4f6493f 100644
--- a/mojo/edk/embedder/embedder_unittest.cc
+++ b/mojo/edk/embedder/embedder_unittest.cc
@@ -34,14 +34,7 @@
MOJO_HANDLE_SIGNAL_WRITABLE |
MOJO_HANDLE_SIGNAL_PEER_CLOSED;
-class EmbedderTest : public test::MojoSystemTest {
- public:
- EmbedderTest() {}
- ~EmbedderTest() override {}
-
- private:
- MOJO_DISALLOW_COPY_AND_ASSIGN(EmbedderTest);
-};
+typedef testing::Test EmbedderTest;
TEST_F(EmbedderTest, ChannelBasic) {
MojoHandle server_mp, client_mp;
@@ -85,8 +78,11 @@
MojoCreateMessagePipe(nullptr, &server_mp, &client_mp));
MojoHandle server_mp2, client_mp2;
+ MojoCreateMessagePipeOptions options;
+ options.struct_size = sizeof(MojoCreateMessagePipeOptions);
+ options.flags = MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE;
ASSERT_EQ(MOJO_RESULT_OK,
- MojoCreateMessagePipe(nullptr, &server_mp2, &client_mp2));
+ MojoCreateMessagePipe(&options, &server_mp2, &client_mp2));
// Write to server2 and wait for client2 to be readable before sending it.
// client2's MessagePipeDispatcher will have the message below in its
@@ -165,9 +161,12 @@
ASSERT_EQ(MOJO_RESULT_OK,
MojoCreateMessagePipe(nullptr, &server_mp, &client_mp));
+ MojoCreateMessagePipeOptions options;
+ options.struct_size = sizeof(MojoCreateMessagePipeOptions);
+ options.flags = MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE;
MojoHandle server_mp2, client_mp2;
ASSERT_EQ(MOJO_RESULT_OK,
- MojoCreateMessagePipe(nullptr, &server_mp2, &client_mp2));
+ MojoCreateMessagePipe(&options, &server_mp2, &client_mp2));
static const size_t kNumMessages = 1001;
for (size_t i = 0; i < kNumMessages; i++) {
@@ -465,114 +464,108 @@
}
MOJO_MULTIPROCESS_TEST_CHILD_TEST(MultiprocessChannelsClient) {
- base::MessageLoop message_loop;
ScopedPlatformHandle client_platform_handle =
test::MultiprocessTestHelper::client_platform_handle.Pass();
EXPECT_TRUE(client_platform_handle.is_valid());
- base::TestIOThread test_io_thread(base::TestIOThread::kAutoStart);
+ MojoHandle client_mp = CreateMessagePipe(
+ client_platform_handle.Pass()).release().value();
- {
- test::ScopedIPCSupport ipc_support(test_io_thread.task_runner());
- MojoHandle client_mp = CreateMessagePipe(
- client_platform_handle.Pass()).release().value();
+ // 1. Read the first message from |client_mp|.
+ MojoHandleSignalsState state;
+ ASSERT_EQ(MOJO_RESULT_OK, MojoWait(client_mp, MOJO_HANDLE_SIGNAL_READABLE,
+ MOJO_DEADLINE_INDEFINITE, &state));
+ ASSERT_EQ(kSignalReadadableWritable, state.satisfied_signals);
+ ASSERT_EQ(kSignalAll, state.satisfiable_signals);
- // 1. Read the first message from |client_mp|.
- MojoHandleSignalsState state;
- ASSERT_EQ(MOJO_RESULT_OK, MojoWait(client_mp, MOJO_HANDLE_SIGNAL_READABLE,
- MOJO_DEADLINE_INDEFINITE, &state));
- ASSERT_EQ(kSignalReadadableWritable, state.satisfied_signals);
- ASSERT_EQ(kSignalAll, state.satisfiable_signals);
+ char buffer[1000] = {};
+ uint32_t num_bytes = static_cast<uint32_t>(sizeof(buffer));
+ ASSERT_EQ(MOJO_RESULT_OK,
+ MojoReadMessage(client_mp, buffer, &num_bytes, nullptr, nullptr,
+ MOJO_READ_MESSAGE_FLAG_NONE));
+ const char kHello[] = "hello";
+ ASSERT_EQ(sizeof(kHello), num_bytes);
+ EXPECT_STREQ(kHello, buffer);
- char buffer[1000] = {};
- uint32_t num_bytes = static_cast<uint32_t>(sizeof(buffer));
- ASSERT_EQ(MOJO_RESULT_OK,
- MojoReadMessage(client_mp, buffer, &num_bytes, nullptr, nullptr,
- MOJO_READ_MESSAGE_FLAG_NONE));
- const char kHello[] = "hello";
- ASSERT_EQ(sizeof(kHello), num_bytes);
- EXPECT_STREQ(kHello, buffer);
+ // 2. Write a message to |client_mp| (attaching nothing).
+ const char kWorld[] = "world!";
+ ASSERT_EQ(MOJO_RESULT_OK,
+ MojoWriteMessage(client_mp, kWorld,
+ static_cast<uint32_t>(sizeof(kWorld)), nullptr,
+ 0, MOJO_WRITE_MESSAGE_FLAG_NONE));
- // 2. Write a message to |client_mp| (attaching nothing).
- const char kWorld[] = "world!";
- ASSERT_EQ(MOJO_RESULT_OK,
- MojoWriteMessage(client_mp, kWorld,
- static_cast<uint32_t>(sizeof(kWorld)), nullptr,
- 0, MOJO_WRITE_MESSAGE_FLAG_NONE));
+ // 4. Read a message from |client_mp|, which should have |mp1| attached.
+ ASSERT_EQ(MOJO_RESULT_OK, MojoWait(client_mp, MOJO_HANDLE_SIGNAL_READABLE,
+ MOJO_DEADLINE_INDEFINITE, &state));
+ // The other end of the handle may or may not be closed at this point, so we
+ // can't test MOJO_HANDLE_SIGNAL_WRITABLE or MOJO_HANDLE_SIGNAL_PEER_CLOSED.
+ ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE,
+ state.satisfied_signals & MOJO_HANDLE_SIGNAL_READABLE);
+ ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE,
+ state.satisfiable_signals & MOJO_HANDLE_SIGNAL_READABLE);
+ // TODO(vtl): If the scope were to end here (and |client_mp| closed), we'd
+ // die (again due to |Channel::HandleLocalError()|).
+ memset(buffer, 0, sizeof(buffer));
+ num_bytes = static_cast<uint32_t>(sizeof(buffer));
+ MojoHandle mp1 = MOJO_HANDLE_INVALID;
+ uint32_t num_handles = 1;
+ ASSERT_EQ(MOJO_RESULT_OK,
+ MojoReadMessage(client_mp, buffer, &num_bytes, &mp1, &num_handles,
+ MOJO_READ_MESSAGE_FLAG_NONE));
+ const char kBar[] = "Bar";
+ ASSERT_EQ(sizeof(kBar), num_bytes);
+ EXPECT_STREQ(kBar, buffer);
+ ASSERT_EQ(1u, num_handles);
+ EXPECT_NE(mp1, MOJO_HANDLE_INVALID);
+ // TODO(vtl): If the scope were to end here (and the two handles closed),
+ // we'd die due to |Channel::RunRemoteMessagePipeEndpoint()| not handling
+ // write errors (assuming the parent had closed the pipe).
- // 4. Read a message from |client_mp|, which should have |mp1| attached.
- ASSERT_EQ(MOJO_RESULT_OK, MojoWait(client_mp, MOJO_HANDLE_SIGNAL_READABLE,
- MOJO_DEADLINE_INDEFINITE, &state));
- // The other end of the handle may or may not be closed at this point, so we
- // can't test MOJO_HANDLE_SIGNAL_WRITABLE or MOJO_HANDLE_SIGNAL_PEER_CLOSED.
- ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE,
- state.satisfied_signals & MOJO_HANDLE_SIGNAL_READABLE);
- ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE,
- state.satisfiable_signals & MOJO_HANDLE_SIGNAL_READABLE);
- // TODO(vtl): If the scope were to end here (and |client_mp| closed), we'd
- // die (again due to |Channel::HandleLocalError()|).
- memset(buffer, 0, sizeof(buffer));
- num_bytes = static_cast<uint32_t>(sizeof(buffer));
- MojoHandle mp1 = MOJO_HANDLE_INVALID;
- uint32_t num_handles = 1;
- ASSERT_EQ(MOJO_RESULT_OK,
- MojoReadMessage(client_mp, buffer, &num_bytes, &mp1, &num_handles,
- MOJO_READ_MESSAGE_FLAG_NONE));
- const char kBar[] = "Bar";
- ASSERT_EQ(sizeof(kBar), num_bytes);
- EXPECT_STREQ(kBar, buffer);
- ASSERT_EQ(1u, num_handles);
- EXPECT_NE(mp1, MOJO_HANDLE_INVALID);
- // TODO(vtl): If the scope were to end here (and the two handles closed),
- // we'd die due to |Channel::RunRemoteMessagePipeEndpoint()| not handling
- // write errors (assuming the parent had closed the pipe).
+ // 6. Close |client_mp|.
+ ASSERT_EQ(MOJO_RESULT_OK, MojoClose(client_mp));
- // 6. Close |client_mp|.
- ASSERT_EQ(MOJO_RESULT_OK, MojoClose(client_mp));
+ // Create a new message pipe (endpoints |mp2| and |mp3|).
+ MojoHandle mp2, mp3;
+ ASSERT_EQ(MOJO_RESULT_OK, MojoCreateMessagePipe(nullptr, &mp2, &mp3));
- // Create a new message pipe (endpoints |mp2| and |mp3|).
- MojoHandle mp2, mp3;
- ASSERT_EQ(MOJO_RESULT_OK, MojoCreateMessagePipe(nullptr, &mp2, &mp3));
+ // 7. Write a message to |mp3|.
+ const char kBaz[] = "baz";
+ ASSERT_EQ(MOJO_RESULT_OK,
+ MojoWriteMessage(mp3, kBaz, static_cast<uint32_t>(sizeof(kBaz)),
+ nullptr, 0, MOJO_WRITE_MESSAGE_FLAG_NONE));
- // 7. Write a message to |mp3|.
- const char kBaz[] = "baz";
- ASSERT_EQ(MOJO_RESULT_OK,
- MojoWriteMessage(mp3, kBaz, static_cast<uint32_t>(sizeof(kBaz)),
- nullptr, 0, MOJO_WRITE_MESSAGE_FLAG_NONE));
+ // 8. Close |mp3|.
+ ASSERT_EQ(MOJO_RESULT_OK, MojoClose(mp3));
- // 8. Close |mp3|.
- ASSERT_EQ(MOJO_RESULT_OK, MojoClose(mp3));
+ // 9. Write a message to |mp1|, attaching |mp2|.
+ const char kQuux[] = "quux";
+ ASSERT_EQ(MOJO_RESULT_OK,
+ MojoWriteMessage(mp1, kQuux, static_cast<uint32_t>(sizeof(kQuux)),
+ &mp2, 1, MOJO_WRITE_MESSAGE_FLAG_NONE));
+ mp2 = MOJO_HANDLE_INVALID;
- // 9. Write a message to |mp1|, attaching |mp2|.
- const char kQuux[] = "quux";
- ASSERT_EQ(MOJO_RESULT_OK,
- MojoWriteMessage(mp1, kQuux, static_cast<uint32_t>(sizeof(kQuux)),
- &mp2, 1, MOJO_WRITE_MESSAGE_FLAG_NONE));
- mp2 = MOJO_HANDLE_INVALID;
+ // 3. Read a message from |mp1|.
+ ASSERT_EQ(MOJO_RESULT_OK, MojoWait(mp1, MOJO_HANDLE_SIGNAL_READABLE,
+ MOJO_DEADLINE_INDEFINITE, &state));
+ ASSERT_EQ(kSignalReadadableWritable, state.satisfied_signals);
+ ASSERT_EQ(kSignalAll, state.satisfiable_signals);
- // 3. Read a message from |mp1|.
- ASSERT_EQ(MOJO_RESULT_OK, MojoWait(mp1, MOJO_HANDLE_SIGNAL_READABLE,
- MOJO_DEADLINE_INDEFINITE, &state));
- ASSERT_EQ(kSignalReadadableWritable, state.satisfied_signals);
- ASSERT_EQ(kSignalAll, state.satisfiable_signals);
+ memset(buffer, 0, sizeof(buffer));
+ num_bytes = static_cast<uint32_t>(sizeof(buffer));
+ ASSERT_EQ(MOJO_RESULT_OK,
+ MojoReadMessage(mp1, buffer, &num_bytes, nullptr, nullptr,
+ MOJO_READ_MESSAGE_FLAG_NONE));
+ const char kFoo[] = "FOO";
+ ASSERT_EQ(sizeof(kFoo), num_bytes);
+ EXPECT_STREQ(kFoo, buffer);
- memset(buffer, 0, sizeof(buffer));
- num_bytes = static_cast<uint32_t>(sizeof(buffer));
- ASSERT_EQ(MOJO_RESULT_OK,
- MojoReadMessage(mp1, buffer, &num_bytes, nullptr, nullptr,
- MOJO_READ_MESSAGE_FLAG_NONE));
- const char kFoo[] = "FOO";
- ASSERT_EQ(sizeof(kFoo), num_bytes);
- EXPECT_STREQ(kFoo, buffer);
-
- // 11. Wait on |mp1| (which should eventually fail) and then close it.
- ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
- MojoWait(mp1, MOJO_HANDLE_SIGNAL_READABLE,
- MOJO_DEADLINE_INDEFINITE, &state));
- ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, state.satisfied_signals);
- ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, state.satisfiable_signals);
- ASSERT_EQ(MOJO_RESULT_OK, MojoClose(mp1));
- }
+ // 11. Wait on |mp1| (which should eventually fail) and then close it.
+ ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
+ MojoWait(mp1, MOJO_HANDLE_SIGNAL_READABLE,
+ MOJO_DEADLINE_INDEFINITE, &state));
+ ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, state.satisfied_signals);
+ ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, state.satisfiable_signals);
+ ASSERT_EQ(MOJO_RESULT_OK, MojoClose(mp1));
}
// TODO(vtl): Test immediate write & close.
diff --git a/mojo/edk/system/BUILD.gn b/mojo/edk/system/BUILD.gn
index fe113d2..54e3e3e 100644
--- a/mojo/edk/system/BUILD.gn
+++ b/mojo/edk/system/BUILD.gn
@@ -69,6 +69,8 @@
"raw_channel.h",
"raw_channel_posix.cc",
"raw_channel_win.cc",
+ "routed_raw_channel.cc",
+ "routed_raw_channel.h",
"shared_buffer_dispatcher.cc",
"shared_buffer_dispatcher.h",
"simple_dispatcher.cc",
diff --git a/mojo/edk/system/broker.h b/mojo/edk/system/broker.h
index ce4c9565..f78fff7 100644
--- a/mojo/edk/system/broker.h
+++ b/mojo/edk/system/broker.h
@@ -6,23 +6,26 @@
#define MOJO_EDK_SYSTEM_BROKER_H_
#include <stdint.h>
-#include <vector>
#include "mojo/edk/embedder/scoped_platform_handle.h"
namespace mojo {
namespace edk {
+class MessagePipeDispatcher;
+class RawChannel;
// An interface for communicating to a central "broker" process from each
-// process using the EDK. This is needed because child processes are sandboxed.
-// It is safe to call from any thread.
+// process using the EDK. It serves two purposes:
+// 1) Windows only: brokering to help child processes as they can't create
+// named pipes or duplicate handles.
+// 2) All platforms: support multiplexed messages pipes.
+
class MOJO_SYSTEM_IMPL_EXPORT Broker {
public:
virtual ~Broker() {}
#if defined(OS_WIN)
- // All these methods are needed because sandboxed Windows processes can't
- // create named pipes or duplicate handles.
+ // It is safe to call these three methods from any thread.
// Create a PlatformChannelPair.
virtual void CreatePlatformChannelPair(ScopedPlatformHandle* server,
@@ -40,6 +43,20 @@
size_t count,
PlatformHandle* handles) = 0;
#endif
+
+ // Multiplexing related methods. They are called from the IO thread only.
+
+ // Called by |message_pipe| so that it receives messages for the given
+ // globally unique |pipe_id|. When the connection is established,
+ // MessagePipeDispatcher::GotNonTransferableChannel is called with the channel
+ // that it can use for sending messages.
+ virtual void ConnectMessagePipe(uint64_t pipe_id,
+ MessagePipeDispatcher* message_pipe) = 0;
+
+ // Called by |message_pipe| when it's closing so that its route can be
+ // unregistered.
+ virtual void CloseMessagePipe(uint64_t pipe_id,
+ MessagePipeDispatcher* message_pipe) = 0;
};
} // namespace edk
diff --git a/mojo/edk/system/broker_messages.h b/mojo/edk/system/broker_messages.h
index 351cbe6..7d8946d 100644
--- a/mojo/edk/system/broker_messages.h
+++ b/mojo/edk/system/broker_messages.h
@@ -8,6 +8,7 @@
#include <stdint.h>
#include "base/compiler_specific.h"
+#include "base/process/process_handle.h"
namespace mojo {
namespace edk {
@@ -15,7 +16,11 @@
// This header defines the message format between ChildBroker and
// ChildBrokerHost.
-enum MessageId {
+#if defined(OS_WIN)
+// Windows only messages needed because sandboxed child processes need the
+// parent's help. They are sent synchronously from child to parent and each have
+// a response. They are sent over a raw pipe.
+enum WindowsSandboxMessages {
// The reply is two HANDLEs.
CREATE_PLATFORM_CHANNEL_PAIR = 0,
// The reply is tokens of the same count of passed in handles.
@@ -28,17 +33,48 @@
struct BrokerMessage {
uint32_t size;
- MessageId id;
+ WindowsSandboxMessages id;
// Data, if any, follows.
union {
-#if defined(OS_WIN)
HANDLE handles[1]; // If HANDLE_TO_TOKEN.
uint64_t tokens[1]; // If TOKEN_TO_HANDLE.
-#endif
};
};
-const int kBrokerMessageHeaderSize = sizeof(uint32_t) + sizeof(MessageId);
+const int kBrokerMessageHeaderSize =
+ sizeof(uint32_t) + sizeof(WindowsSandboxMessages);
+
+#endif
+
+// Multiplexing related messages. They are all asynchronous messages.
+// They are sent over RawChannel.
+enum MultiplexMessages {
+ // Messages from child to parent.
+ CONNECT_MESSAGE_PIPE = 0,
+ CANCEL_CONNECT_MESSAGE_PIPE,
+
+ // Messages from parent to child.
+ CONNECT_TO_PROCESS,
+ PEER_PIPE_CONNECTED,
+};
+
+struct ConnectMessagePipeMessage {
+ // CONNECT_MESSAGE_PIPE or CANCEL_CONNECT_MESSAGE_PIPE
+ MultiplexMessages type;
+ uint64_t pipe_id;
+};
+
+struct ConnectToProcessMessage {
+ MultiplexMessages type; // CONNECT_TO_PROCESS
+ base::ProcessId process_id;
+ // Also has an attached platform handle.
+};
+
+struct PeerPipeConnectedMessage {
+ MultiplexMessages type; // PEER_PIPE_CONNECTED
+ uint64_t pipe_id;
+ base::ProcessId process_id;
+};
} // namespace edk
} // namespace mojo
diff --git a/mojo/edk/system/broker_state.cc b/mojo/edk/system/broker_state.cc
index 6eb4ae9..b875b31 100644
--- a/mojo/edk/system/broker_state.cc
+++ b/mojo/edk/system/broker_state.cc
@@ -4,9 +4,13 @@
#include "mojo/edk/system/broker_state.h"
+#include "base/bind.h"
#include "base/rand_util.h"
#include "mojo/edk/embedder/embedder_internal.h"
#include "mojo/edk/embedder/platform_channel_pair.h"
+#include "mojo/edk/system/child_broker_host.h"
+#include "mojo/edk/system/message_pipe_dispatcher.h"
+#include "mojo/edk/system/routed_raw_channel.h"
namespace mojo {
namespace edk {
@@ -28,7 +32,7 @@
const PlatformHandle* platform_handles,
size_t count,
uint64_t* tokens) {
- base::AutoLock auto_locker(lock_);
+ base::AutoLock auto_locker(token_map_lock_);
for (size_t i = 0; i < count; ++i) {
if (platform_handles[i].is_valid()) {
uint64_t token;
@@ -47,7 +51,7 @@
void BrokerState::TokenToHandle(const uint64_t* tokens,
size_t count,
PlatformHandle* handles) {
- base::AutoLock auto_locker(lock_);
+ base::AutoLock auto_locker(token_map_lock_);
for (size_t i = 0; i < count; ++i) {
auto it = token_map_.find(tokens[i]);
if (it == token_map_.end()) {
@@ -60,9 +64,155 @@
}
#endif
-BrokerState::BrokerState() : broker_thread_("Mojo Broker Thread") {
- base::Thread::Options options(base::MessageLoop::TYPE_IO, 0);
- broker_thread_.StartWithOptions(options);
+void BrokerState::ConnectMessagePipe(uint64_t pipe_id,
+ MessagePipeDispatcher* message_pipe) {
+ DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
+ base::AutoLock auto_lock(lock_);
+ if (pending_connects_.find(pipe_id) != pending_connects_.end()) {
+ // Both ends of the message pipe are in this process.
+ if (!in_process_pipes_channel1_) {
+ PlatformChannelPair channel_pair;
+ in_process_pipes_channel1_ = new RoutedRawChannel(
+ channel_pair.PassServerHandle(),
+ base::Bind(&BrokerState::ChannelDestructed, base::Unretained(this)));
+ in_process_pipes_channel2_ = new RoutedRawChannel(
+ channel_pair.PassClientHandle(),
+ base::Bind(&BrokerState::ChannelDestructed, base::Unretained(this)));
+ }
+
+ connected_pipes_[pending_connects_[pipe_id]] = in_process_pipes_channel1_;
+ connected_pipes_[message_pipe] = in_process_pipes_channel2_;
+ in_process_pipes_channel1_->AddRoute(pipe_id, pending_connects_[pipe_id]);
+ in_process_pipes_channel2_->AddRoute(pipe_id, message_pipe);
+ pending_connects_[pipe_id]->GotNonTransferableChannel(
+ in_process_pipes_channel1_->channel());
+ message_pipe->GotNonTransferableChannel(
+ in_process_pipes_channel2_->channel());
+
+ pending_connects_.erase(pipe_id);
+ return;
+ }
+
+ if (pending_child_connects_.find(pipe_id) != pending_child_connects_.end()) {
+ // A child process has already tried to connect.
+ EnsureProcessesConnected(base::GetCurrentProcId(),
+ pending_child_connects_[pipe_id]->GetProcessId());
+ pending_child_connects_[pipe_id]->ConnectMessagePipe(
+ pipe_id, base::GetCurrentProcId());
+ base::ProcessId peer_pid = pending_child_connects_[pipe_id]->GetProcessId();
+ pending_child_connects_.erase(pipe_id);
+ connected_pipes_[message_pipe] = child_channels_[peer_pid];
+ child_channels_[peer_pid]->AddRoute(pipe_id, message_pipe);
+ message_pipe->GotNonTransferableChannel(
+ child_channels_[peer_pid]->channel());
+ return;
+ }
+
+ pending_connects_[pipe_id] = message_pipe;
+}
+
+void BrokerState::CloseMessagePipe(uint64_t pipe_id,
+ MessagePipeDispatcher* message_pipe) {
+ DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
+ base::AutoLock auto_lock(lock_);
+
+ CHECK(connected_pipes_.find(message_pipe) != connected_pipes_.end());
+ connected_pipes_[message_pipe]->RemoveRoute(pipe_id, message_pipe);
+ connected_pipes_.erase(message_pipe);
+}
+
+void BrokerState::ChildBrokerHostCreated(ChildBrokerHost* child_broker_host) {
+ base::AutoLock auto_lock(lock_);
+ CHECK(child_processes_.find(child_broker_host->GetProcessId()) ==
+ child_processes_.end());
+ child_processes_[child_broker_host->GetProcessId()] = child_broker_host;
+}
+
+void BrokerState::ChildBrokerHostDestructed(
+ ChildBrokerHost* child_broker_host) {
+ DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
+ base::AutoLock auto_lock(lock_);
+
+ for (auto it = pending_child_connects_.begin();
+ it != pending_child_connects_.end();) {
+ if (it->second == child_broker_host) {
+ // Since we can't do it = pending_child_connects_.erase(it); until
+ // hash_map uses unordered_map on posix.
+ auto cur = it++;
+ pending_child_connects_.erase(cur);
+ } else {
+ it++;
+ }
+ }
+
+ base::ProcessId pid = child_broker_host->GetProcessId();
+ for (auto it = connected_processes_.begin();
+ it != connected_processes_.end();) {
+ if ((*it).first == pid || (*it).second == pid) {
+ // Since we can't do it = pending_child_connects_.erase(it); until
+ // hash_map uses unordered_map on posix.
+ auto cur = it++;
+ connected_processes_.erase(cur);
+ } else {
+ it++;
+ }
+ }
+
+ CHECK(child_processes_.find(pid) != child_processes_.end());
+ child_processes_.erase(pid);
+}
+
+void BrokerState::HandleConnectMessagePipe(ChildBrokerHost* pipe_process,
+ uint64_t pipe_id) {
+ DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
+ base::AutoLock auto_lock(lock_);
+ if (pending_child_connects_.find(pipe_id) != pending_child_connects_.end()) {
+ // Another child process is waiting to connect to the given pipe.
+ ChildBrokerHost* pending_pipe_process = pending_child_connects_[pipe_id];
+ EnsureProcessesConnected(pipe_process->GetProcessId(),
+ pending_pipe_process->GetProcessId());
+ pending_pipe_process->ConnectMessagePipe(
+ pipe_id, pipe_process->GetProcessId());
+ pipe_process->ConnectMessagePipe(
+ pipe_id, pending_pipe_process->GetProcessId());
+ pending_child_connects_.erase(pipe_id);
+ return;
+ }
+
+ if (pending_connects_.find(pipe_id) != pending_connects_.end()) {
+ // This parent process is the other side of the given pipe.
+ EnsureProcessesConnected(base::GetCurrentProcId(),
+ pipe_process->GetProcessId());
+ MessagePipeDispatcher* pending_pipe = pending_connects_[pipe_id];
+ connected_pipes_[pending_pipe] =
+ child_channels_[pipe_process->GetProcessId()];
+ child_channels_[pipe_process->GetProcessId()]->AddRoute(
+ pipe_id, pending_pipe);
+ pending_pipe->GotNonTransferableChannel(
+ child_channels_[pipe_process->GetProcessId()]->channel());
+ pipe_process->ConnectMessagePipe(
+ pipe_id, base::GetCurrentProcId());
+ pending_connects_.erase(pipe_id);
+ return;
+ }
+
+ // This is the first connection request for pipe_id to reach the parent.
+ pending_child_connects_[pipe_id] = pipe_process;
+}
+
+void BrokerState::HandleCancelConnectMessagePipe(uint64_t pipe_id) {
+ DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
+ base::AutoLock auto_lock(lock_);
+ if (pending_child_connects_.find(pipe_id) == pending_child_connects_.end()) {
+ NOTREACHED() << "Can't find entry for pipe_id " << pipe_id;
+ } else {
+ pending_child_connects_.erase(pipe_id);
+ }
+}
+
+BrokerState::BrokerState()
+ : in_process_pipes_channel1_(nullptr),
+ in_process_pipes_channel2_(nullptr) {
DCHECK(!internal::g_broker);
internal::g_broker = this;
}
@@ -70,5 +220,49 @@
BrokerState::~BrokerState() {
}
+void BrokerState::EnsureProcessesConnected(base::ProcessId pid1,
+ base::ProcessId pid2) {
+ DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
+ lock_.AssertAcquired();
+ CHECK_NE(pid1, pid2);
+ CHECK_NE(pid2, base::GetCurrentProcId());
+ std::pair<base::ProcessId, base::ProcessId> processes;
+ processes.first = std::min(pid1, pid2);
+ processes.second = std::max(pid1, pid2);
+ if (connected_processes_.find(processes) != connected_processes_.end())
+ return;
+
+ connected_processes_.insert(processes);
+ PlatformChannelPair channel_pair;
+ if (pid1 == base::GetCurrentProcId()) {
+ CHECK(child_channels_.find(pid2) == child_channels_.end());
+ CHECK(child_processes_.find(pid2) != child_processes_.end());
+ child_channels_[pid2] = new RoutedRawChannel(
+ channel_pair.PassServerHandle(),
+ base::Bind(&BrokerState::ChannelDestructed, base::Unretained(this)));
+ child_processes_[pid2]->ConnectToProcess(base::GetCurrentProcId(),
+ channel_pair.PassClientHandle());
+ return;
+ }
+
+ CHECK(child_processes_.find(pid1) != child_processes_.end());
+ CHECK(child_processes_.find(pid2) != child_processes_.end());
+ child_processes_[pid1]->ConnectToProcess(pid2,
+ channel_pair.PassServerHandle());
+ child_processes_[pid2]->ConnectToProcess(pid1,
+ channel_pair.PassClientHandle());
+}
+
+void BrokerState::ChannelDestructed(RoutedRawChannel* channel) {
+ DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
+ base::AutoLock auto_lock(lock_);
+ for (auto it : child_channels_) {
+ if (it.second == channel) {
+ child_channels_.erase(it.first);
+ break;
+ }
+ }
+}
+
} // namespace edk
} // namespace mojo
diff --git a/mojo/edk/system/broker_state.h b/mojo/edk/system/broker_state.h
index cf982c7..29f3ffe 100644
--- a/mojo/edk/system/broker_state.h
+++ b/mojo/edk/system/broker_state.h
@@ -7,15 +7,18 @@
#include "base/compiler_specific.h"
#include "base/containers/hash_tables.h"
+#include "base/macros.h"
#include "base/memory/singleton.h"
+#include "base/process/process_handle.h"
#include "base/synchronization/lock.h"
-#include "base/threading/thread.h"
#include "mojo/edk/embedder/scoped_platform_handle.h"
#include "mojo/edk/system/broker.h"
#include "mojo/edk/system/system_impl_export.h"
namespace mojo {
namespace edk {
+class ChildBrokerHost;
+class RoutedRawChannel;
// Common broker state that has to live in a parent process. There is one
// instance of this class in the parent process. This class implements the
@@ -24,7 +27,7 @@
public:
static BrokerState* GetInstance();
- // Broker implementation.
+ // Broker implementation:
#if defined(OS_WIN)
void CreatePlatformChannelPair(ScopedPlatformHandle* server,
ScopedPlatformHandle* client) override;
@@ -35,10 +38,20 @@
size_t count,
PlatformHandle* handles) override;
#endif
+ void ConnectMessagePipe(uint64_t pipe_id,
+ MessagePipeDispatcher* message_pipe) override;
+ void CloseMessagePipe(uint64_t pipe_id,
+ MessagePipeDispatcher* message_pipe) override;
- scoped_refptr<base::TaskRunner> broker_thread() {
- return broker_thread_.task_runner();
- }
+ // Called by ChildBrokerHost on construction and destruction.
+ void ChildBrokerHostCreated(ChildBrokerHost* child_broker_host);
+ void ChildBrokerHostDestructed(ChildBrokerHost* child_broker_host);
+
+ // These are called by ChildBrokerHost as they dispatch IPCs from ChildBroker.
+ // They are called on the IO thread.
+ void HandleConnectMessagePipe(ChildBrokerHost* pipe_process,
+ uint64_t pipe_id);
+ void HandleCancelConnectMessagePipe(uint64_t pipe_id);
private:
friend struct base::DefaultSingletonTraits<BrokerState>;
@@ -46,20 +59,58 @@
BrokerState();
~BrokerState() override;
- // A separate thread to handle sync IPCs from child processes for exchanging
- // platform handles with tokens. We use a separate thread because latency is
- // very sensitive (since any time a pipe is created or sent, a child process
- // makes a sync call to this class).
- base::Thread broker_thread_;
+ // Checks if there's a direct channel between the two processes, and if not
+ // creates one and tells them about it.
+ // If one of the processes is the current one, it should be pid1.
+ // Called on the IO thread.
+ void EnsureProcessesConnected(base::ProcessId pid1, base::ProcessId pid2);
+
+ // Callback when a RoutedRawChannel is destroyed for cleanup.
+ // Called on the IO thread.
+ void ChannelDestructed(RoutedRawChannel* channel);
#if defined(OS_WIN)
// Used in the parent (unsandboxed) process to hold a mapping between HANDLES
// and tokens. When a child process wants to send a HANDLE to another process,
// it exchanges it to a token and then the other process exchanges that token
// back to a HANDLE.
- base::Lock lock_; // Guards access to below.
+ base::Lock token_map_lock_;
base::hash_map<uint64_t, HANDLE> token_map_;
#endif
+
+ // For pending connects originiating in this process.
+ // Only accessed on the IO thread.
+ base::hash_map<uint64_t, MessagePipeDispatcher*> pending_connects_;
+
+ // For connected message pipes in this process. This is needed so that when a
+ // MessagePipeDispatcher is closed we can remove the route for the
+ // corresponding RoutedRawChannel.
+ // Only accessed on the IO thread.
+ base::hash_map<MessagePipeDispatcher*, RoutedRawChannel*> connected_pipes_;
+
+ base::Lock lock_; // Guards access to below.
+
+ // Holds a map of all the RoutedRawChannel that connect this parent process to
+ // a child process. The key is the child process'd pid.
+ base::hash_map<base::ProcessId, RoutedRawChannel*> child_channels_;
+
+ base::hash_map<uint64_t, ChildBrokerHost*> pending_child_connects_;
+
+ // Each entry is an std::pair of ints of processe IDs that have
+ // RoutedRawChannel objects between them. The pair always has the smaller
+ // process id value first.
+ // For now, we don't reap connections if there are no more routes between two
+ // processes.
+ base::hash_set<std::pair<base::ProcessId, base::ProcessId>>
+ connected_processes_;
+
+ base::hash_map<base::ProcessId, ChildBrokerHost*> child_processes_;
+
+ // Used for message pipes in the same process.
+ RoutedRawChannel* in_process_pipes_channel1_;
+ RoutedRawChannel* in_process_pipes_channel2_;
+
+ DISALLOW_COPY_AND_ASSIGN(BrokerState);
};
} // namespace edk
diff --git a/mojo/edk/system/child_broker.cc b/mojo/edk/system/child_broker.cc
index 7d7b4c6..fbfeb3d 100644
--- a/mojo/edk/system/child_broker.cc
+++ b/mojo/edk/system/child_broker.cc
@@ -4,9 +4,13 @@
#include "mojo/edk/system/child_broker.h"
+#include "base/bind.h"
#include "base/logging.h"
#include "mojo/edk/embedder/embedder_internal.h"
+#include "mojo/edk/embedder/platform_channel_pair.h"
#include "mojo/edk/system/broker_messages.h"
+#include "mojo/edk/system/message_pipe_dispatcher.h"
+#include "mojo/edk/system/routed_raw_channel.h"
namespace mojo {
namespace edk {
@@ -17,23 +21,41 @@
}
void ChildBroker::SetChildBrokerHostHandle(ScopedPlatformHandle handle) {
- handle_ = handle.Pass();
+ ScopedPlatformHandle parent_async_channel_handle;
+#if defined(OS_POSIX)
+ parent_async_channel_handle = handle.Pass();
+#else
+ // On Windows we have two pipes to the parent. The first is for the token
+ // exchange for creating and passing handles, since the child needs the
+ // parent's help if it is sandboxed. The second is the same as POSIX, which is
+ // used for multiplexing related messages. So on Windows, we send the second
+ // pipe as the first string over the first one.
+ parent_sync_channel_ = handle.Pass();
+
+ HANDLE parent_handle = INVALID_HANDLE_VALUE;
+ DWORD bytes_read = 0;
+ BOOL rv = ReadFile(parent_sync_channel_.get().handle, &parent_handle,
+ sizeof(parent_handle), &bytes_read, NULL);
+ CHECK(rv);
+ parent_async_channel_handle.reset(PlatformHandle(parent_handle));
+#endif
+
+ parent_async_channel_ =
+ RawChannel::Create(parent_async_channel_handle.Pass());
+ internal::g_io_thread_task_runner->PostTask(
+ FROM_HERE,
+ base::Bind(&RawChannel::Init, base::Unretained(parent_async_channel_),
+ this));
+
lock_.Unlock();
}
#if defined(OS_WIN)
void ChildBroker::CreatePlatformChannelPair(
ScopedPlatformHandle* server, ScopedPlatformHandle* client) {
- BrokerMessage message;
- message.size = kBrokerMessageHeaderSize;
- message.id = CREATE_PLATFORM_CHANNEL_PAIR;
-
- uint32_t response_size = 2 * sizeof(HANDLE);
- HANDLE handles[2];
- if (WriteAndReadResponse(&message, handles, response_size)) {
- server->reset(PlatformHandle(handles[0]));
- client->reset(PlatformHandle(handles[1]));
- }
+ lock_.Lock();
+ CreatePlatformChannelPairNoLock(server, client);
+ lock_.Unlock();
}
void ChildBroker::HandleToToken(const PlatformHandle* platform_handles,
@@ -49,7 +71,9 @@
message->handles[i] = platform_handles[i].handle;
uint32_t response_size = static_cast<int>(count) * sizeof(uint64_t);
+ lock_.Lock();
WriteAndReadResponse(message, tokens, response_size);
+ lock_.Unlock();
}
void ChildBroker::TokenToHandle(const uint64_t* tokens,
@@ -67,14 +91,85 @@
std::vector<HANDLE> handles_temp(count);
uint32_t response_size =
static_cast<uint32_t>(handles_temp.size()) * sizeof(HANDLE);
+ lock_.Lock();
if (WriteAndReadResponse(message, &handles_temp[0], response_size)) {
for (uint32_t i = 0; i < count; ++i)
handles[i].handle = handles_temp[i];
+ lock_.Unlock();
}
}
#endif
-ChildBroker::ChildBroker() {
+void ChildBroker::ConnectMessagePipe(uint64_t pipe_id,
+ MessagePipeDispatcher* message_pipe) {
+ DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
+ lock_.Lock();
+
+ ConnectMessagePipeMessage data;
+ data.pipe_id = pipe_id;
+ if (pending_connects_.find(pipe_id) != pending_connects_.end()) {
+ // Both ends of the message pipe are in the same process.
+ // First, tell the browser side that to remove its bookkeeping for a pending
+ // connect, since it'll never get the other side.
+
+ data.type = CANCEL_CONNECT_MESSAGE_PIPE;
+ scoped_ptr<MessageInTransit> message(new MessageInTransit(
+ MessageInTransit::Type::MESSAGE, sizeof(data), &data));
+ parent_async_channel_->WriteMessage(message.Pass());
+
+ if (!in_process_pipes_channel1_) {
+ ScopedPlatformHandle server_handle, client_handle;
+#if defined(OS_WIN)
+ CreatePlatformChannelPairNoLock(&server_handle, &client_handle);
+#else
+ PlatformChannelPair channel_pair;
+ server_handle = channel_pair.PassServerHandle();
+ client_handle = channel_pair.PassClientHandle();
+#endif
+ in_process_pipes_channel1_ = new RoutedRawChannel(
+ server_handle.Pass(),
+ base::Bind(&ChildBroker::ChannelDestructed, base::Unretained(this)));
+ in_process_pipes_channel2_ = new RoutedRawChannel(
+ client_handle.Pass(),
+ base::Bind(&ChildBroker::ChannelDestructed, base::Unretained(this)));
+ }
+
+ connected_pipes_[pending_connects_[pipe_id]] = in_process_pipes_channel1_;
+ connected_pipes_[message_pipe] = in_process_pipes_channel2_;
+ in_process_pipes_channel1_->AddRoute(pipe_id, pending_connects_[pipe_id]);
+ in_process_pipes_channel2_->AddRoute(pipe_id, message_pipe);
+ pending_connects_[pipe_id]->GotNonTransferableChannel(
+ in_process_pipes_channel1_->channel());
+ message_pipe->GotNonTransferableChannel(
+ in_process_pipes_channel2_->channel());
+
+ pending_connects_.erase(pipe_id);
+ lock_.Unlock();
+ return;
+ }
+
+ data.type = CONNECT_MESSAGE_PIPE;
+ scoped_ptr<MessageInTransit> message(new MessageInTransit(
+ MessageInTransit::Type::MESSAGE, sizeof(data), &data));
+ pending_connects_[pipe_id] = message_pipe;
+ parent_async_channel_->WriteMessage(message.Pass());
+
+ lock_.Unlock();
+}
+
+void ChildBroker::CloseMessagePipe(
+ uint64_t pipe_id, MessagePipeDispatcher* message_pipe) {
+ DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
+ lock_.Lock();
+ CHECK(connected_pipes_.find(message_pipe) != connected_pipes_.end());
+ connected_pipes_[message_pipe]->RemoveRoute(pipe_id, message_pipe);
+ connected_pipes_.erase(message_pipe);
+ lock_.Unlock();
+}
+
+ChildBroker::ChildBroker()
+ : in_process_pipes_channel1_(nullptr),
+ in_process_pipes_channel2_(nullptr) {
DCHECK(!internal::g_broker);
internal::g_broker = this;
// Block any threads from calling this until we have a pipe to the parent.
@@ -84,18 +179,80 @@
ChildBroker::~ChildBroker() {
}
+void ChildBroker::OnReadMessage(
+ const MessageInTransit::View& message_view,
+ ScopedPlatformHandleVectorPtr platform_handles) {
+ DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
+ lock_.Lock();
+ MultiplexMessages type =
+ *static_cast<const MultiplexMessages*>(message_view.bytes());
+ if (type == CONNECT_TO_PROCESS) {
+ DCHECK_EQ(platform_handles->size(), 1u);
+ ScopedPlatformHandle handle((*platform_handles.get())[0]);
+ (*platform_handles.get())[0] = PlatformHandle();
+
+ const ConnectToProcessMessage* message =
+ static_cast<const ConnectToProcessMessage*>(message_view.bytes());
+
+ CHECK(channels_.find(message->process_id) == channels_.end());
+ channels_[message->process_id] = new RoutedRawChannel(
+ handle.Pass(),
+ base::Bind(&ChildBroker::ChannelDestructed, base::Unretained(this)));
+ } else if (type == PEER_PIPE_CONNECTED) {
+ DCHECK(!platform_handles);
+ const PeerPipeConnectedMessage* message =
+ static_cast<const PeerPipeConnectedMessage*>(message_view.bytes());
+
+ uint64_t pipe_id = message->pipe_id;
+ uint64_t peer_pid = message->process_id;
+
+ CHECK(pending_connects_.find(pipe_id) != pending_connects_.end());
+ MessagePipeDispatcher* pipe = pending_connects_[pipe_id];
+ pending_connects_.erase(pipe_id);
+ if (channels_.find(peer_pid) == channels_.end()) {
+ // We saw the peer process die before we got the reply from the parent.
+ pipe->OnError(ERROR_READ_SHUTDOWN);
+ } else {
+ CHECK(connected_pipes_.find(pipe) == connected_pipes_.end());
+ connected_pipes_[pipe] = channels_[peer_pid];
+ channels_[peer_pid]->AddRoute(pipe_id, pipe);
+ pipe->GotNonTransferableChannel(channels_[peer_pid]->channel());
+ }
+ } else {
+ NOTREACHED();
+ }
+
+ lock_.Unlock();
+}
+
+void ChildBroker::OnError(Error error) {
+ // The parent process shut down.
+}
+
+void ChildBroker::ChannelDestructed(RoutedRawChannel* channel) {
+ DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
+ lock_.Lock();
+ for (auto it : channels_) {
+ if (it.second == channel) {
+ channels_.erase(it.first);
+ break;
+ }
+ }
+ lock_.Unlock();
+}
+
+#if defined(OS_WIN)
+
bool ChildBroker::WriteAndReadResponse(BrokerMessage* message,
void* response,
uint32_t response_size) {
- lock_.Lock();
- CHECK(handle_.is_valid());
+ CHECK(parent_sync_channel_.is_valid());
bool result = true;
-#if defined(OS_WIN)
DWORD bytes_written = 0;
// This will always write in one chunk per
// https://msdn.microsoft.com/en-us/library/windows/desktop/aa365150.aspx.
- BOOL rv = WriteFile(handle_.get().handle, message, message->size,
+ BOOL rv = WriteFile(parent_sync_channel_.get().handle, message, message->size,
&bytes_written, NULL);
if (!rv || bytes_written != message->size) {
LOG(ERROR) << "Child token serializer couldn't write message.";
@@ -103,8 +260,8 @@
} else {
while (response_size) {
DWORD bytes_read = 0;
- rv = ReadFile(handle_.get().handle, response, response_size, &bytes_read,
- NULL);
+ rv = ReadFile(parent_sync_channel_.get().handle, response, response_size,
+ &bytes_read, NULL);
if (!rv) {
LOG(ERROR) << "Child token serializer couldn't read result.";
result = false;
@@ -114,12 +271,25 @@
response = static_cast<char*>(response) + bytes_read;
}
}
-#endif
-
- lock_.Unlock();
return result;
}
+void ChildBroker::CreatePlatformChannelPairNoLock(
+ ScopedPlatformHandle* server, ScopedPlatformHandle* client) {
+ BrokerMessage message;
+ message.size = kBrokerMessageHeaderSize;
+ message.id = CREATE_PLATFORM_CHANNEL_PAIR;
+
+ uint32_t response_size = 2 * sizeof(HANDLE);
+ HANDLE handles[2];
+ if (WriteAndReadResponse(&message, handles, response_size)) {
+ server->reset(PlatformHandle(handles[0]));
+ client->reset(PlatformHandle(handles[1]));
+ }
+}
+
+#endif
+
} // namespace edk
} // namespace mojo
diff --git a/mojo/edk/system/child_broker.h b/mojo/edk/system/child_broker.h
index 689739a..366daa74 100644
--- a/mojo/edk/system/child_broker.h
+++ b/mojo/edk/system/child_broker.h
@@ -5,20 +5,28 @@
#ifndef MOJO_EDK_SYSTEM_CHILD_BROKER_H_
#define MOJO_EDK_SYSTEM_CHILD_BROKER_H_
+#include "base/compiler_specific.h"
+#include "base/containers/hash_tables.h"
+#include "base/macros.h"
#include "base/memory/singleton.h"
+#include "base/process/process_handle.h"
#include "base/synchronization/lock_impl.h"
#include "mojo/edk/embedder/scoped_platform_handle.h"
#include "mojo/edk/system/broker.h"
+#include "mojo/edk/system/raw_channel.h"
#include "mojo/edk/system/system_impl_export.h"
namespace mojo {
namespace edk {
+class RoutedRawChannel;
struct BrokerMessage;
// An implementation of Broker used in (sandboxed) child processes. It talks
// over sync IPCs to the (unsandboxed) parent process (specifically,
-// ParentBroker) to convert handles to tokens and vice versa.
-class MOJO_SYSTEM_IMPL_EXPORT ChildBroker : public Broker {
+// ChildBrokerHost) to convert handles to tokens and vice versa. It also sends
+// async messages to handle message pipe multiplexing.
+class MOJO_SYSTEM_IMPL_EXPORT ChildBroker
+ : public Broker, public RawChannel::Delegate {
public:
static ChildBroker* GetInstance();
@@ -36,6 +44,10 @@
size_t count,
PlatformHandle* handles) override;
#endif
+ void ConnectMessagePipe(uint64_t pipe_id,
+ MessagePipeDispatcher* message_pipe) override;
+ void CloseMessagePipe(uint64_t pipe_id,
+ MessagePipeDispatcher* message_pipe) override;
private:
friend struct base::DefaultSingletonTraits<ChildBroker>;
@@ -43,17 +55,68 @@
ChildBroker();
~ChildBroker() override;
+ // RawChannel::Delegate implementation:
+ void OnReadMessage(
+ const MessageInTransit::View& message_view,
+ ScopedPlatformHandleVectorPtr platform_handles) override;
+ void OnError(Error error) override;
+
+ // Callback for when a RoutedRawChannel is destroyed for cleanup.
+ void ChannelDestructed(RoutedRawChannel* channel);
+
+#if defined(OS_WIN)
// Helper method to write the given message and read back the result.
bool WriteAndReadResponse(BrokerMessage* message,
void* response,
uint32_t response_size);
+ void CreatePlatformChannelPairNoLock(ScopedPlatformHandle* server,
+ ScopedPlatformHandle* client);
+
+ // Pipe used for communication to the parent process. We use a pipe directly
+ // instead of bindings or RawChannel because we need to send synchronous
+ // messages with replies from any thread.
+ ScopedPlatformHandle parent_sync_channel_;
+#endif
+
// Guards access to below.
// We use LockImpl instead of Lock because the latter adds thread checking
// that we don't want (since we lock in the constructor and unlock on another
// thread.
base::internal::LockImpl lock_;
- ScopedPlatformHandle handle_;
+
+ // RawChannel used for asynchronous communication to and from the parent
+ // process. Since these messages are bidirectional, we can't use
+ // |parent_sync_channel_| which is only used for sync messages to the parent.
+ // However since the messages are asynchronous, we can use RawChannel for
+ // convenience instead of writing and reading from pipes manually. Although it
+ // would be convenient, we don't use Mojo IPC because it would be a layering
+ // violation (and cirular dependency) if the system layer depended on
+ // bindings.
+ RawChannel* parent_async_channel_;
+
+ // Maps from routing ids to the MessagePipeDispatcher that have requested to
+ // connect to them. When the parent replies with which process they should be
+ // connected to, they will migrate to |connected_pipes_|.
+ base::hash_map<uint64_t, MessagePipeDispatcher*> pending_connects_;
+
+ // Map from MessagePipeDispatcher to its RoutedRawChannel. This is needed so
+ // that when a MessagePipeDispatcher is closed we can remove the route for the
+ // corresponding RoutedRawChannel.
+ // Note the key is MessagePipeDispatcher*, instead of pipe_id, because when
+ // the two pipes are in the same process they will have one pipe_id but be
+ // connected to the two RoutedRawChannel objects below.
+ base::hash_map<MessagePipeDispatcher*, RoutedRawChannel*> connected_pipes_;
+
+ // Holds a map of all the RoutedRawChannel that connect this child process to
+ // any other process. The key is the peer process'd pid.
+ base::hash_map<base::ProcessId, RoutedRawChannel*> channels_;
+
+ // Used for message pipes in the same process.
+ RoutedRawChannel* in_process_pipes_channel1_;
+ RoutedRawChannel* in_process_pipes_channel2_;
+
+ DISALLOW_COPY_AND_ASSIGN(ChildBroker);
};
} // namespace edk
diff --git a/mojo/edk/system/child_broker_host.cc b/mojo/edk/system/child_broker_host.cc
index b7f055a..ae7236f 100644
--- a/mojo/edk/system/child_broker_host.cc
+++ b/mojo/edk/system/child_broker_host.cc
@@ -6,77 +6,177 @@
#include "base/bind.h"
#include "base/lazy_instance.h"
+#include "mojo/edk/embedder/embedder_internal.h"
#include "mojo/edk/embedder/platform_channel_pair.h"
#include "mojo/edk/system/broker_messages.h"
#include "mojo/edk/system/broker_state.h"
#include "mojo/edk/system/configuration.h"
+#include "mojo/edk/system/core.h"
+#include "mojo/edk/system/platform_handle_dispatcher.h"
namespace mojo {
namespace edk {
namespace {
+#if defined(OS_WIN)
static const int kDefaultReadBufferSize = 256;
+#endif
}
ChildBrokerHost::ChildBrokerHost(base::ProcessHandle child_process,
ScopedPlatformHandle pipe)
- : child_process_(child_process),
- pipe_(pipe.Pass()),
- num_bytes_read_(0) {
-#if defined(OS_WIN)
+ : process_id_(base::GetProcId(child_process)) {
+ ScopedPlatformHandle parent_async_channel_handle;
+#if defined(OS_POSIX)
+ parent_async_channel_handle = pipe.Pass();
+#else
+ child_process_ = child_process;
+ sync_channel_ = pipe.Pass();
memset(&read_context_.overlapped, 0, sizeof(read_context_.overlapped));
read_context_.handler = this;
memset(&write_context_.overlapped, 0, sizeof(write_context_.overlapped));
write_context_.handler = this;
-#else
- // TODO(jam)
- (void)child_process_; // Suppress -Wunused-private-field.
- (void)num_bytes_read_; // Suppress -Wunused-private-field.
-#endif
-
read_data_.resize(kDefaultReadBufferSize);
- BrokerState::GetInstance()->broker_thread()->PostTask(
+ num_bytes_read_ = 0;
+
+ // See comment in ChildBroker::SetChildBrokerHostHandle. Summary is we need
+ // two pipes on Windows, so send the second one over the first one.
+ PlatformChannelPair parent_pipe;
+ parent_async_channel_handle = parent_pipe.PassServerHandle();
+
+ HANDLE duplicated_child_handle =
+ DuplicateToChild(parent_pipe.PassClientHandle().release().handle);
+ BOOL rv = WriteFile(sync_channel_.get().handle,
+ &duplicated_child_handle, sizeof(duplicated_child_handle),
+ NULL, &write_context_.overlapped);
+ DCHECK(rv || GetLastError() == ERROR_IO_PENDING);
+
+ internal::g_io_thread_task_runner->PostTask(
FROM_HERE,
base::Bind(&ChildBrokerHost::RegisterIOHandler, base::Unretained(this)));
+#endif
+
+ child_channel_ = RawChannel::Create(parent_async_channel_handle.Pass());
+ internal::g_io_thread_task_runner->PostTask(
+ FROM_HERE,
+ base::Bind(&RawChannel::Init, base::Unretained(child_channel_), this));
+ internal::g_io_thread_task_runner->PostTask(
+ FROM_HERE,
+ base::Bind(&RawChannel::EnsureLazyInitialized,
+ base::Unretained(child_channel_)));
+
+ BrokerState::GetInstance()->ChildBrokerHostCreated(this);
+}
+
+base::ProcessId ChildBrokerHost::GetProcessId() {
+ return process_id_;
+}
+
+void ChildBrokerHost::ConnectToProcess(base::ProcessId process_id,
+ ScopedPlatformHandle pipe) {
+ if (!child_channel_)
+ return; // Can happen at process shutdown on Windows.
+ ConnectToProcessMessage data;
+ data.type = CONNECT_TO_PROCESS;
+ data.process_id = process_id;
+ scoped_ptr<MessageInTransit> message(new MessageInTransit(
+ MessageInTransit::Type::MESSAGE, sizeof(data), &data));
+ scoped_refptr<Dispatcher> dispatcher =
+ PlatformHandleDispatcher::Create(pipe.Pass());
+ internal::g_core->AddDispatcher(dispatcher);
+ scoped_ptr<DispatcherVector> dispatchers(new DispatcherVector);
+ dispatchers->push_back(dispatcher);
+ message->SetDispatchers(dispatchers.Pass());
+ message->SerializeAndCloseDispatchers();
+ child_channel_->WriteMessage(message.Pass());
+}
+
+void ChildBrokerHost::ConnectMessagePipe(uint64_t pipe_id,
+ base::ProcessId process_id) {
+ if (!child_channel_)
+ return; // Can happen at process shutdown on Windows.
+ PeerPipeConnectedMessage data;
+ data.type = PEER_PIPE_CONNECTED;
+ data.pipe_id = pipe_id;
+ data.process_id = process_id;
+ scoped_ptr<MessageInTransit> message(new MessageInTransit(
+ MessageInTransit::Type::MESSAGE, sizeof(data), &data));
+ child_channel_->WriteMessage(message.Pass());
}
ChildBrokerHost::~ChildBrokerHost() {
+ DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
+ BrokerState::GetInstance()->ChildBrokerHostDestructed(this);
+ if (child_channel_)
+ child_channel_->Shutdown();
}
-void ChildBrokerHost::RegisterIOHandler() {
-#if defined(OS_WIN)
- base::MessageLoopForIO::current()->RegisterIOHandler(
- pipe_.get().handle, this);
- BeginRead();
-#elif defined(OS_POSIX)
- // TOOD(jam): setup
+void ChildBrokerHost::OnReadMessage(
+ const MessageInTransit::View& message_view,
+ ScopedPlatformHandleVectorPtr platform_handles) {
+ DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
+ CHECK(!platform_handles);
+ if (message_view.num_bytes() !=
+ static_cast<uint32_t>(sizeof(ConnectMessagePipeMessage))) {
+ NOTREACHED();
+ delete this;
+ }
+
+ const ConnectMessagePipeMessage* message =
+ static_cast<const ConnectMessagePipeMessage*>(message_view.bytes());
+ switch(message->type) {
+ case CONNECT_MESSAGE_PIPE:
+ BrokerState::GetInstance()->HandleConnectMessagePipe(this,
+ message->pipe_id);
+ break;
+ case CANCEL_CONNECT_MESSAGE_PIPE:
+ BrokerState::GetInstance()->HandleCancelConnectMessagePipe(
+ message->pipe_id);
+ break;
+ default:
+ NOTREACHED();
+ delete this;
+ }
+}
+
+void ChildBrokerHost::OnError(Error error) {
+ DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
+ child_channel_->Shutdown();
+ child_channel_ = nullptr;
+ // On Windows, we have two pipes to the child process. It's easier to wait
+ // until we get the error from the pipe that uses asynchronous I/O.
+#if !defined(OS_WIN)
+ delete this;
#endif
}
-void ChildBrokerHost::BeginRead() {
#if defined(OS_WIN)
- BOOL rv = ReadFile(pipe_.get().handle, &read_data_[num_bytes_read_],
+void ChildBrokerHost::RegisterIOHandler() {
+ base::MessageLoopForIO::current()->RegisterIOHandler(
+ sync_channel_.get().handle, this);
+ BeginRead();
+}
+
+void ChildBrokerHost::BeginRead() {
+ BOOL rv = ReadFile(sync_channel_.get().handle,
+ &read_data_[num_bytes_read_],
static_cast<int>(read_data_.size() - num_bytes_read_),
nullptr, &read_context_.overlapped);
if (rv || GetLastError() == ERROR_IO_PENDING)
return;
- if (rv == ERROR_BROKEN_PIPE) {
+ if (GetLastError() == ERROR_BROKEN_PIPE) {
delete this;
return;
}
NOTREACHED() << "Unknown error in ChildBrokerHost " << rv;
-#endif
}
-#if defined(OS_WIN)
void ChildBrokerHost::OnIOCompleted(base::MessageLoopForIO::IOContext* context,
DWORD bytes_transferred,
DWORD error) {
- if (context != &read_context_)
- return;
-
+ DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
if (error == ERROR_BROKEN_PIPE) {
delete this;
return; // Child process exited or crashed.
@@ -88,6 +188,11 @@
return;
}
+ if (context == &write_context_) {
+ write_data_.clear();
+ return;
+ }
+
num_bytes_read_ += bytes_transferred;
CHECK_GE(num_bytes_read_, sizeof(uint32_t));
BrokerMessage* message = reinterpret_cast<BrokerMessage*>(&read_data_[0]);
@@ -97,6 +202,15 @@
return;
}
+ // This should never fire because we only get new requests from a child
+ // process after it has read all the previous data we wrote.
+ if (!write_data_.empty()) {
+ NOTREACHED() << "ChildBrokerHost shouldn't have data to write when it gets "
+ << " a new request";
+ delete this;
+ return;
+ }
+
if (message->id == CREATE_PLATFORM_CHANNEL_PAIR) {
PlatformChannelPair channel_pair;
uint32_t response_size = 2 * sizeof(HANDLE);
@@ -152,7 +266,7 @@
return;
}
- BOOL rv = WriteFile(pipe_.get().handle, &write_data_[0],
+ BOOL rv = WriteFile(sync_channel_.get().handle, &write_data_[0],
static_cast<int>(write_data_.size()), NULL,
&write_context_.overlapped);
DCHECK(rv || GetLastError() == ERROR_IO_PENDING);
diff --git a/mojo/edk/system/child_broker_host.h b/mojo/edk/system/child_broker_host.h
index 7f4bc61..a91218a 100644
--- a/mojo/edk/system/child_broker_host.h
+++ b/mojo/edk/system/child_broker_host.h
@@ -8,20 +8,24 @@
#include <vector>
#include "base/compiler_specific.h"
+#include "base/macros.h"
#include "base/message_loop/message_loop.h"
#include "base/process/process_handle.h"
#include "mojo/edk/embedder/scoped_platform_handle.h"
+#include "mojo/edk/system/raw_channel.h"
#include "mojo/edk/system/system_impl_export.h"
namespace mojo {
namespace edk {
-// Responds to requests from a child process to exchange handles to tokens and
-// vice versa. There is one object of this class per child process host object.
+// Responds to requests from ChildBroker. This is used to handle message pipe
+// multiplexing and Windows sandbox messages. There is one object of this class
+// per child process host object.
// This object will delete itself when it notices that the pipe is broken.
class MOJO_SYSTEM_IMPL_EXPORT ChildBrokerHost
+ : public RawChannel::Delegate
#if defined(OS_WIN)
- : NON_EXPORTED_BASE(public base::MessageLoopForIO::IOHandler) {
+ , NON_EXPORTED_BASE(public base::MessageLoopForIO::IOHandler) {
#else
{
#endif
@@ -33,17 +37,29 @@
// this class.
ChildBrokerHost(base::ProcessHandle child_process, ScopedPlatformHandle pipe);
- private:
-#if defined(OS_WIN)
- ~ChildBrokerHost() override;
-#else
- ~ChildBrokerHost();
-#endif
+ base::ProcessId GetProcessId();
+ // Sends a message to the child process to connect to |process_id| via |pipe|.
+ void ConnectToProcess(base::ProcessId process_id, ScopedPlatformHandle pipe);
+
+ // Sends a message to the child process that |pipe_id|'s other end is in
+ // |process_id|.
+ void ConnectMessagePipe(uint64_t pipe_id, base::ProcessId process_id);
+
+ private:
+ ~ChildBrokerHost() override;
+
+ // RawChannel::Delegate implementation:
+ void OnReadMessage(
+ const MessageInTransit::View& message_view,
+ ScopedPlatformHandleVectorPtr platform_handles) override;
+ void OnError(Error error) override;
+
+#if defined(OS_WIN)
void RegisterIOHandler();
void BeginRead();
-#if defined(OS_WIN)
+ // base::MessageLoopForIO::IOHandler implementation:
void OnIOCompleted(base::MessageLoopForIO::IOContext* context,
DWORD bytes_transferred,
DWORD error) override;
@@ -53,18 +69,29 @@
HANDLE DuplicateFromChild(HANDLE handle);
#endif
- base::ProcessHandle child_process_;
- ScopedPlatformHandle pipe_;
+ base::ProcessId process_id_;
+
+ // Channel used to receive and send multiplexing related messages.
+ RawChannel* child_channel_;
#if defined(OS_WIN)
+ // Handle to the child process, used for duplication of handles.
+ base::ProcessHandle child_process_;
+
+ // Pipe used for synchronous messages from the child. Responses are written to
+ // it as well.
+ ScopedPlatformHandle sync_channel_;
+
base::MessageLoopForIO::IOContext read_context_;
base::MessageLoopForIO::IOContext write_context_;
-#endif
std::vector<char> read_data_;
// How many bytes in read_data_ we already read.
uint32_t num_bytes_read_;
std::vector<char> write_data_;
+#endif
+
+ DISALLOW_COPY_AND_ASSIGN(ChildBrokerHost);
};
} // namespace edk
diff --git a/mojo/edk/system/core.cc b/mojo/edk/system/core.cc
index 77b06f0cc..454b487 100644
--- a/mojo/edk/system/core.cc
+++ b/mojo/edk/system/core.cc
@@ -7,6 +7,7 @@
#include <vector>
#include "base/logging.h"
+#include "base/rand_util.h"
#include "base/time/time.h"
#include "mojo/edk/embedder/embedder_internal.h"
#include "mojo/edk/embedder/platform_channel_pair.h"
@@ -206,18 +207,28 @@
return MOJO_RESULT_RESOURCE_EXHAUSTED;
}
- ScopedPlatformHandle server_handle, client_handle;
+ if (validated_options.flags &
+ MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE) {
+ ScopedPlatformHandle server_handle, client_handle;
#if defined(OS_WIN)
- internal::g_broker->CreatePlatformChannelPair(&server_handle, &client_handle);
+ internal::g_broker->CreatePlatformChannelPair(&server_handle,
+ &client_handle);
#else
- PlatformChannelPair channel_pair;
- server_handle = channel_pair.PassServerHandle();
- client_handle = channel_pair.PassClientHandle();
+ PlatformChannelPair channel_pair;
+ server_handle = channel_pair.PassServerHandle();
+ client_handle = channel_pair.PassClientHandle();
#endif
- dispatcher0->Init(server_handle.Pass(), nullptr, 0u, nullptr, 0u, nullptr,
- nullptr);
- dispatcher1->Init(client_handle.Pass(), nullptr, 0u, nullptr, 0u, nullptr,
- nullptr);
+ dispatcher0->Init(server_handle.Pass(), nullptr, 0u, nullptr, 0u, nullptr,
+ nullptr);
+ dispatcher1->Init(client_handle.Pass(), nullptr, 0u, nullptr, 0u, nullptr,
+ nullptr);
+ } else {
+ uint64_t pipe_id = 0;
+ while (pipe_id == 0)
+ pipe_id = base::RandUint64();
+ dispatcher0->InitNonTransferable(pipe_id);
+ dispatcher1->InitNonTransferable(pipe_id);
+ }
*message_pipe_handle0 = handle_pair.first;
*message_pipe_handle1 = handle_pair.second;
diff --git a/mojo/edk/system/core_test_base.h b/mojo/edk/system/core_test_base.h
index 774dc95..7ed8a7a 100644
--- a/mojo/edk/system/core_test_base.h
+++ b/mojo/edk/system/core_test_base.h
@@ -22,7 +22,7 @@
class CoreTestBase_MockHandleInfo;
-class CoreTestBase : public MojoSystemTest {
+class CoreTestBase : public testing::Test {
public:
using MockHandleInfo = CoreTestBase_MockHandleInfo;
diff --git a/mojo/edk/system/core_unittest.cc b/mojo/edk/system/core_unittest.cc
index cbbd905..9604bbd 100644
--- a/mojo/edk/system/core_unittest.cc
+++ b/mojo/edk/system/core_unittest.cc
@@ -730,8 +730,11 @@
#endif
MojoHandle h_passed[2];
+ MojoCreateMessagePipeOptions options;
+ options.struct_size = sizeof(MojoCreateMessagePipeOptions);
+ options.flags = MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE;
ASSERT_EQ(MOJO_RESULT_OK,
- core()->CreateMessagePipe(nullptr, &h_passed[0], &h_passed[1]));
+ core()->CreateMessagePipe(&options, &h_passed[0], &h_passed[1]));
// Make sure that |h_passed[]| work properly.
ASSERT_EQ(MOJO_RESULT_OK,
diff --git a/mojo/edk/system/data_pipe_unittest.cc b/mojo/edk/system/data_pipe_unittest.cc
index 5c8f0f8..0880f6084 100644
--- a/mojo/edk/system/data_pipe_unittest.cc
+++ b/mojo/edk/system/data_pipe_unittest.cc
@@ -32,7 +32,7 @@
// TODO(vtl): Get rid of this.
const size_t kMaxPoll = 100;
-class DataPipeTest : public test::MojoSystemTest {
+class DataPipeTest : public testing::Test {
public:
DataPipeTest() : producer_(MOJO_HANDLE_INVALID),
consumer_(MOJO_HANDLE_INVALID) {}
diff --git a/mojo/edk/system/dispatcher.h b/mojo/edk/system/dispatcher.h
index fb757c7..d1c7b1f6 100644
--- a/mojo/edk/system/dispatcher.h
+++ b/mojo/edk/system/dispatcher.h
@@ -300,6 +300,7 @@
// Available to subclasses. (Note: Returns a non-const reference, just like
// |base::AutoLock|'s constructor takes a non-const reference.)
base::Lock& lock() const { return lock_; }
+ bool is_closed() const { return is_closed_; }
private:
friend class DispatcherTransport;
diff --git a/mojo/edk/system/message_in_transit.cc b/mojo/edk/system/message_in_transit.cc
index b85efc7..05fd98f 100644
--- a/mojo/edk/system/message_in_transit.cc
+++ b/mojo/edk/system/message_in_transit.cc
@@ -166,6 +166,7 @@
header()->type = type;
header()->num_bytes = num_bytes;
header()->unused = 0;
+ header()->route_id = 0;
// Note: If dispatchers are subsequently attached, then |total_size| will have
// to be adjusted.
UpdateTotalSize();
diff --git a/mojo/edk/system/message_in_transit.h b/mojo/edk/system/message_in_transit.h
index 78dc40a..bebaaf7 100644
--- a/mojo/edk/system/message_in_transit.h
+++ b/mojo/edk/system/message_in_transit.h
@@ -103,6 +103,7 @@
return static_cast<const char*>(buffer_) + sizeof(Header);
}
Type type() const { return header()->type; }
+ uint64_t route_id() const { return header()->route_id; }
private:
const Header* header() const { return static_cast<const Header*>(buffer_); }
@@ -171,6 +172,9 @@
Type type() const { return header()->type; }
+ void set_route_id(uint64_t route_id) { header()->route_id = route_id; }
+ uint64_t route_id() const { return header()->route_id; }
+
// Gets the dispatchers attached to this message; this may return null if
// there are none. Note that the caller may mutate the set of dispatchers
// (e.g., take ownership of all the dispatchers, leaving the vector empty).
@@ -200,9 +204,10 @@
// |SerializeAndCloseDispatchers()| has not been called.
uint32_t total_size;
Type type; // 2 bytes.
- Type unusedforalignment; // 2 bytes.
+ uint16_t unusedforalignment; // 2 bytes.
uint32_t num_bytes;
uint32_t unused;
+ uint64_t route_id;
};
const Header* header() const {
diff --git a/mojo/edk/system/message_pipe_dispatcher.cc b/mojo/edk/system/message_pipe_dispatcher.cc
index 6ebab748..55e03fa 100644
--- a/mojo/edk/system/message_pipe_dispatcher.cc
+++ b/mojo/edk/system/message_pipe_dispatcher.cc
@@ -25,10 +25,13 @@
const size_t kInvalidMessagePipeHandleIndex = static_cast<size_t>(-1);
struct MOJO_ALIGNAS(8) SerializedMessagePipeHandleDispatcher {
+ bool transferable;
+ bool write_error;
+ uint64_t pipe_id; // If transferable is false.
+ // The following members are only set if transferable is true.
// Could be |kInvalidMessagePipeHandleIndex| if the other endpoint of the MP
// was closed.
size_t platform_handle_index;
- bool write_error;
size_t shared_memory_handle_index; // (Or |kInvalidMessagePipeHandleIndex|.)
uint32_t shared_memory_size;
@@ -90,7 +93,8 @@
const MojoCreateMessagePipeOptions* in_options,
MojoCreateMessagePipeOptions* out_options) {
const MojoCreateMessagePipeOptionsFlags kKnownFlags =
- MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE;
+ MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE |
+ MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE;
*out_options = kDefaultCreateOptions;
if (!in_options)
@@ -119,6 +123,7 @@
char* serialized_write_buffer, size_t serialized_write_buffer_size,
std::vector<int>* serialized_read_fds,
std::vector<int>* serialized_write_fds) {
+ CHECK(transferable_);
if (message_pipe.get().is_valid()) {
channel_ = RawChannel::Create(message_pipe.Pass());
@@ -132,8 +137,14 @@
}
}
+void MessagePipeDispatcher::InitNonTransferable(uint64_t pipe_id) {
+ CHECK(!transferable_);
+ pipe_id_ = pipe_id;
+}
+
void MessagePipeDispatcher::InitOnIO() {
base::AutoLock locker(lock());
+ CHECK(transferable_);
calling_init_ = true;
if (channel_)
channel_->Init(this);
@@ -142,10 +153,28 @@
void MessagePipeDispatcher::CloseOnIO() {
base::AutoLock locker(lock());
+ if (transferable_) {
+ if (channel_) {
+ channel_->Shutdown();
+ channel_ = nullptr;
+ }
+ } else {
+ if (non_transferable_state_ == CONNECT_CALLED ||
+ non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) {
+ if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE)
+ RequestNontransferableChannel();
- if (channel_) {
- channel_->Shutdown();
- channel_ = nullptr;
+ // We can't cancel the pending request yet, since the other side of the
+ // message pipe would want to get pending outgoing messages (if any) or
+ // at least know that this end was closed. So keep this object alive until
+ // then.
+ non_transferable_state_ = WAITING_FOR_CONNECT_TO_CLOSE;
+ AddRef();
+ } else if (non_transferable_state_ == CONNECTED) {
+ internal::g_broker->CloseMessagePipe(pipe_id_, this);
+ non_transferable_state_ = CLOSED;
+ channel_ = nullptr;
+ }
}
}
@@ -153,6 +182,30 @@
return Type::MESSAGE_PIPE;
}
+void MessagePipeDispatcher::GotNonTransferableChannel(RawChannel* channel) {
+ base::AutoLock locker(lock());
+ channel_ = channel;
+ while (!non_transferable_outgoing_message_queue_.IsEmpty()) {
+ channel_->WriteMessage(
+ non_transferable_outgoing_message_queue_.GetMessage());
+ }
+
+ if (non_transferable_state_ == WAITING_FOR_CONNECT_TO_CLOSE) {
+ // We kept this object alive until it's connected, we can release it now.
+ // Since we're in a callback from the Broker, call it asynchronously.
+ internal::g_io_thread_task_runner->PostTask(
+ FROM_HERE,
+ base::Bind(&Broker::CloseMessagePipe,
+ base::Unretained(internal::g_broker), pipe_id_,
+ base::Unretained(this)));
+ non_transferable_state_ = CLOSED;
+ channel_ = nullptr;
+ base::MessageLoop::current()->ReleaseSoon(FROM_HERE, this);
+ } else {
+ non_transferable_state_ = CONNECTED;
+ }
+}
+
#if defined(OS_WIN)
// TODO(jam): this is copied from RawChannelWin till I figure out what's the
// best way we want to share this.
@@ -186,6 +239,14 @@
const SerializedMessagePipeHandleDispatcher* serialization =
static_cast<const SerializedMessagePipeHandleDispatcher*>(source);
+
+ scoped_refptr<MessagePipeDispatcher> rv(
+ new MessagePipeDispatcher(serialization->transferable));
+ if (!rv->transferable_) {
+ rv->InitNonTransferable(serialization->pipe_id);
+ return rv;
+ }
+
if (serialization->shared_memory_size !=
(serialization->serialized_read_buffer_size +
serialization->serialized_write_buffer_size +
@@ -233,8 +294,6 @@
}
}
- scoped_refptr<MessagePipeDispatcher> rv(
- Create(MessagePipeDispatcher::kDefaultCreateOptions));
rv->write_error_ = serialization->write_error;
std::vector<int> serialized_read_fds;
@@ -269,8 +328,15 @@
while (message_queue_size) {
size_t message_size;
- CHECK(MessageInTransit::GetNextMessageSize(
- message_queue_data, message_queue_size, &message_size));
+ if (!MessageInTransit::GetNextMessageSize(
+ message_queue_data, message_queue_size, &message_size)) {
+ NOTREACHED() << "Couldn't read message size from serialized data.";
+ return nullptr;
+ }
+ if (message_size > message_queue_size) {
+ NOTREACHED() << "Invalid serialized message size.";
+ return nullptr;
+ }
MessageInTransit::View message_view(message_size, message_queue_data);
message_queue_size -= message_size;
message_queue_data += message_size;
@@ -333,14 +399,17 @@
return rv;
}
-MessagePipeDispatcher::MessagePipeDispatcher()
+MessagePipeDispatcher::MessagePipeDispatcher(bool transferable)
: channel_(nullptr),
- serialized_(false),
serialized_read_fds_length_(0u),
serialized_write_fds_length_(0u),
serialized_message_fds_length_(0u),
+ pipe_id_(0),
+ non_transferable_state_(WAITING_FOR_READ_OR_WRITE),
+ serialized_(false),
calling_init_(false),
- write_error_(false) {
+ write_error_(false),
+ transferable_(transferable) {
}
MessagePipeDispatcher::~MessagePipeDispatcher() {
@@ -369,6 +438,16 @@
}
void MessagePipeDispatcher::SerializeInternal() {
+ serialized_ = true;
+ if (!transferable_) {
+ CHECK(non_transferable_state_ == WAITING_FOR_READ_OR_WRITE)
+ << "Non transferable message pipe being sent after read/write. "
+ << "MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE must be used if "
+ << "the pipe can be sent after it's read or written.";
+ non_transferable_state_ = SERIALISED;
+ return;
+ }
+
// We need to stop watching handle immediately, even though not on IO thread,
// so that other messages aren't read after this.
std::vector<int> serialized_read_fds, serialized_write_fds;
@@ -385,8 +464,6 @@
serialized_write_fds.end());
serialized_write_fds_length_ = serialized_write_fds.size();
channel_ = nullptr;
- if (write_error)
- write_error = true;
} else {
// It's valid that the other side wrote some data and closed its end.
}
@@ -441,20 +518,18 @@
all_platform_handles->at(i) = PlatformHandle();
}
#endif
+ }
serialized_message_queue_.insert(
serialized_message_queue_.end(),
static_cast<const char*>(message->transport_data()->buffer()),
static_cast<const char*>(message->transport_data()->buffer()) +
transport_data_buffer_size);
- }
}
for (size_t i = 0; i < dispatchers.size(); ++i)
dispatchers[i]->TransportEnded();
}
-
- serialized_ = true;
}
scoped_refptr<Dispatcher>
@@ -463,21 +538,24 @@
SerializeInternal();
- // TODO(vtl): Currently, there are no options, so we just use
- // |kDefaultCreateOptions|. Eventually, we'll have to duplicate the options
- // too.
- scoped_refptr<MessagePipeDispatcher> rv = Create(kDefaultCreateOptions);
- rv->serialized_platform_handle_ = serialized_platform_handle_.Pass();
- serialized_message_queue_.swap(rv->serialized_message_queue_);
- serialized_read_buffer_.swap(rv->serialized_read_buffer_);
- serialized_write_buffer_.swap(rv->serialized_write_buffer_);
- serialized_fds_.swap(rv->serialized_fds_);
- rv->serialized_read_fds_length_ = serialized_read_fds_length_;
- rv->serialized_write_fds_length_ = serialized_write_fds_length_;
- rv->serialized_message_fds_length_ = serialized_message_fds_length_;
+ scoped_refptr<MessagePipeDispatcher> rv(
+ new MessagePipeDispatcher(transferable_));
rv->serialized_ = true;
- rv->write_error_ = write_error_;
- return scoped_refptr<Dispatcher>(rv.get());
+ if (transferable_) {
+ rv->serialized_platform_handle_ = serialized_platform_handle_.Pass();
+ serialized_message_queue_.swap(rv->serialized_message_queue_);
+ serialized_read_buffer_.swap(rv->serialized_read_buffer_);
+ serialized_write_buffer_.swap(rv->serialized_write_buffer_);
+ serialized_fds_.swap(rv->serialized_fds_);
+ rv->serialized_read_fds_length_ = serialized_read_fds_length_;
+ rv->serialized_write_fds_length_ = serialized_write_fds_length_;
+ rv->serialized_message_fds_length_ = serialized_message_fds_length_;
+ rv->write_error_ = write_error_;
+ } else {
+ rv->pipe_id_ = pipe_id_;
+ rv->non_transferable_state_ = non_transferable_state_;
+ }
+ return rv;
}
MojoResult MessagePipeDispatcher::WriteMessageImplNoLock(
@@ -485,15 +563,17 @@
uint32_t num_bytes,
std::vector<DispatcherTransport>* transports,
MojoWriteMessageFlags flags) {
+ lock().AssertAcquired();
DCHECK(!transports ||
(transports->size() > 0 &&
transports->size() <= GetConfiguration().max_message_num_handles));
- lock().AssertAcquired();
-
- if (!channel_ || write_error_)
+ if (write_error_ ||
+ (transferable_ && !channel_) ||
+ (!transferable_ && non_transferable_state_ == CLOSED)) {
return MOJO_RESULT_FAILED_PRECONDITION;
+ }
if (num_bytes > GetConfiguration().max_message_num_bytes)
return MOJO_RESULT_RESOURCE_EXHAUSTED;
@@ -506,7 +586,17 @@
}
message->SerializeAndCloseDispatchers();
- channel_->WriteMessage(message.Pass());
+ if (!transferable_)
+ message->set_route_id(pipe_id_);
+ if (!transferable_ &&
+ (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE ||
+ non_transferable_state_ == CONNECT_CALLED)) {
+ if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE)
+ RequestNontransferableChannel();
+ non_transferable_outgoing_message_queue_.AddMessage(message.Pass());
+ } else {
+ channel_->WriteMessage(message.Pass());
+ }
return MOJO_RESULT_OK;
}
@@ -518,8 +608,17 @@
uint32_t* num_dispatchers,
MojoReadMessageFlags flags) {
lock().AssertAcquired();
- if (channel_)
+ if (channel_) {
channel_->EnsureLazyInitialized();
+ } else if (!transferable_) {
+ if (non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) {
+ RequestNontransferableChannel();
+ return MOJO_RESULT_SHOULD_WAIT;
+ } else if (non_transferable_state_ == CONNECT_CALLED) {
+ return MOJO_RESULT_SHOULD_WAIT;
+ }
+ }
+
DCHECK(!dispatchers || dispatchers->empty());
const uint32_t max_bytes = !num_bytes ? 0 : *num_bytes;
@@ -583,14 +682,22 @@
HandleSignalsState rv;
if (!message_queue_.IsEmpty())
rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE;
- if (channel_ || !message_queue_.IsEmpty())
+ if (!message_queue_.IsEmpty() ||
+ (transferable_ && channel_) ||
+ (!transferable_ && non_transferable_state_ != CLOSED))
rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
- if (channel_ && !write_error_) {
+ if (!write_error_ &&
+ ((transferable_ && channel_) ||
+ (!transferable_ && non_transferable_state_ != CLOSED))) {
rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
}
- if (!channel_ || write_error_)
+ if (write_error_ ||
+ (transferable_ && !channel_) ||
+ (!transferable_ &&
+ ((non_transferable_state_ == CLOSED) || is_closed()))) {
rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
+ }
rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
return rv;
}
@@ -601,8 +708,13 @@
uintptr_t context,
HandleSignalsState* signals_state) {
lock().AssertAcquired();
- if (channel_)
+ if (channel_) {
channel_->EnsureLazyInitialized();
+ } else if (!transferable_ &&
+ non_transferable_state_ == WAITING_FOR_READ_OR_WRITE) {
+ RequestNontransferableChannel();
+ }
+
HandleSignalsState state = GetHandleSignalsStateImplNoLock();
if (state.satisfies(signals)) {
if (signals_state)
@@ -653,6 +765,8 @@
CloseImplNoLock();
SerializedMessagePipeHandleDispatcher* serialization =
static_cast<SerializedMessagePipeHandleDispatcher*>(destination);
+ serialization->transferable = transferable_;
+ serialization->pipe_id = pipe_id_;
if (serialized_platform_handle_.is_valid()) {
serialization->platform_handle_index = platform_handles->size();
platform_handles->push_back(serialized_platform_handle_.release());
@@ -796,7 +910,18 @@
// called, that is safe since this class always does a PostTask to the IO
// thread to self destruct.
if (channel_ && error != ERROR_WRITE) {
- channel_->Shutdown();
+ if (transferable_) {
+ channel_->Shutdown();
+ } else {
+ CHECK_NE(non_transferable_state_, CLOSED);
+ // Since we're in a callback from the Broker, call it asynchronously.
+ internal::g_io_thread_task_runner->PostTask(
+ FROM_HERE,
+ base::Bind(&Broker::CloseMessagePipe,
+ base::Unretained(internal::g_broker), pipe_id_,
+ base::Unretained(this)));
+ non_transferable_state_ = CLOSED;
+ }
channel_ = nullptr;
}
awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
@@ -824,10 +949,14 @@
if ((*transports)[i].GetType() == Dispatcher::Type::MESSAGE_PIPE) {
MessagePipeDispatcher* mp =
static_cast<MessagePipeDispatcher*>(((*transports)[i]).dispatcher());
- if (channel_ && mp->channel_ && channel_->IsOtherEndOf(mp->channel_)) {
+ if (transferable_ && mp->transferable_ &&
+ channel_ && mp->channel_ && channel_->IsOtherEndOf(mp->channel_)) {
// The other case should have been disallowed by |Core|. (Note: |port|
// is the peer port of the handle given to |WriteMessage()|.)
return MOJO_RESULT_INVALID_ARGUMENT;
+ } else if (!transferable_ && !mp->transferable_ &&
+ pipe_id_ == mp->pipe_id_) {
+ return MOJO_RESULT_INVALID_ARGUMENT;
}
}
}
@@ -849,5 +978,19 @@
return MOJO_RESULT_OK;
}
+void MessagePipeDispatcher::RequestNontransferableChannel() {
+ lock().AssertAcquired();
+ CHECK(!transferable_);
+ CHECK_EQ(non_transferable_state_, WAITING_FOR_READ_OR_WRITE);
+ non_transferable_state_ = CONNECT_CALLED;
+
+ // PostTask since the broker can call us back synchronously.
+ internal::g_io_thread_task_runner->PostTask(
+ FROM_HERE,
+ base::Bind(&Broker::ConnectMessagePipe,
+ base::Unretained(internal::g_broker), pipe_id_,
+ base::Unretained(this)));
+}
+
} // namespace edk
} // namespace mojo
diff --git a/mojo/edk/system/message_pipe_dispatcher.h b/mojo/edk/system/message_pipe_dispatcher.h
index 5248583..d70d3bc 100644
--- a/mojo/edk/system/message_pipe_dispatcher.h
+++ b/mojo/edk/system/message_pipe_dispatcher.h
@@ -9,6 +9,7 @@
#include "mojo/edk/embedder/platform_channel_pair.h"
#include "mojo/edk/system/awakable_list.h"
#include "mojo/edk/system/dispatcher.h"
+#include "mojo/edk/system/message_in_transit_queue.h"
#include "mojo/edk/system/raw_channel.h"
#include "mojo/edk/system/system_impl_export.h"
#include "mojo/public/cpp/system/macros.h"
@@ -27,8 +28,10 @@
static const MojoCreateMessagePipeOptions kDefaultCreateOptions;
static scoped_refptr<MessagePipeDispatcher> Create(
- const MojoCreateMessagePipeOptions& /*validated_options*/) {
- return make_scoped_refptr(new MessagePipeDispatcher());
+ const MojoCreateMessagePipeOptions& validated_options) {
+ return make_scoped_refptr(new MessagePipeDispatcher(
+ !!(validated_options.flags &
+ MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE)));
}
// Validates and/or sets default options for |MojoCreateMessagePipeOptions|.
@@ -40,6 +43,7 @@
const MojoCreateMessagePipeOptions* in_options,
MojoCreateMessagePipeOptions* out_options);
+ // Initializes a transferable message pipe.
// Must be called before any other methods. (This method is not thread-safe.)
void Init(
ScopedPlatformHandle message_pipe,
@@ -48,9 +52,24 @@
std::vector<int>* serialized_read_fds,
std::vector<int>* serialized_write_fds);
+ // Initializes a nontransferable message pipe.
+ void InitNonTransferable(uint64_t pipe_id);
+
// |Dispatcher| public methods:
Type GetType() const override;
+ // RawChannel::Delegate methods:
+ void OnReadMessage(
+ const MessageInTransit::View& message_view,
+ ScopedPlatformHandleVectorPtr platform_handles) override;
+ void OnError(Error error) override;
+
+ // Called by broker when a route is established between this
+ // MessagePipeDispatcher and another one. This object will receive messages
+ // sent to its pipe_id. It should tag all outgoing messages by calling
+ // MessageInTransit::set_route_id with pipe_id_.
+ void GotNonTransferableChannel(RawChannel* channel);
+
// The "opposite" of |SerializeAndClose()|. (Typically this is called by
// |Dispatcher::Deserialize()|.)
static scoped_refptr<MessagePipeDispatcher> Deserialize(
@@ -59,7 +78,9 @@
PlatformHandleVector* platform_handles);
private:
- MessagePipeDispatcher();
+ // See MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE's definition for an
+ // explanation of what is a transferable pipe.
+ explicit MessagePipeDispatcher(bool transferable);
~MessagePipeDispatcher() override;
void InitOnIO();
@@ -96,12 +117,6 @@
void TransportStarted() override;
void TransportEnded() override;
- // |RawChannel::Delegate methods:
- void OnReadMessage(
- const MessageInTransit::View& message_view,
- ScopedPlatformHandleVectorPtr platform_handles) override;
- void OnError(Error error) override;
-
// Calls ReleaseHandle and serializes the raw channel. This is split into a
// function because it's called in two different ways:
// 1) When serializing "live" dispatchers that are passed to MojoWriteMessage,
@@ -115,14 +130,20 @@
MessageInTransit* message,
std::vector<DispatcherTransport>* transports);
+ // Called whenever a read or write is done on a non-transferable pipe, which
+ // "binds" the pipe id to this object.
+ void RequestNontransferableChannel();
+
// Protected by |lock()|:
RawChannel* channel_;
// Queue of incoming messages that we read from RawChannel but haven't been
// consumed through MojoReadMessage yet.
MessageInTransitQueue message_queue_;
+
+ // The following members are only used when transferable_ is false;
+
// When sending MP, contains serialized message_queue_.
- bool serialized_;
std::vector<char> serialized_message_queue_;
std::vector<char> serialized_read_buffer_;
std::vector<char> serialized_write_buffer_;
@@ -133,14 +154,40 @@
size_t serialized_write_fds_length_;
size_t serialized_message_fds_length_;
ScopedPlatformHandle serialized_platform_handle_;
+
+ // The following members are only used when transferable_ is true;
+
+ // The unique id shared by both ends of a non-transferable message pipe. This
+ // is held on until a read or write are done, and at that point it's used to
+ // get a RawChannel.
+ uint64_t pipe_id_;
+ enum NonTransferableState {
+ WAITING_FOR_READ_OR_WRITE,
+ CONNECT_CALLED,
+ CONNECTED,
+ WAITING_FOR_CONNECT_TO_CLOSE,
+ CLOSED,
+ SERIALISED,
+ };
+
+ NonTransferableState non_transferable_state_;
+ // Messages that were written while we were waiting to get a RawChannel.
+ MessageInTransitQueue non_transferable_outgoing_message_queue_;
+
+
+ // The following members are used for both modes of transferable_.
+
AwakableList awakable_list_;
// If DispatcherTransport is created. Must be set before lock() is called to
// avoid deadlocks with RawChannel calling us.
base::Lock started_transport_;
+ bool serialized_;
bool calling_init_;
bool write_error_;
+ // Whether it can be sent after read or write.
+ bool transferable_;
MOJO_DISALLOW_COPY_AND_ASSIGN(MessagePipeDispatcher);
};
diff --git a/mojo/edk/system/message_pipe_perftest.cc b/mojo/edk/system/message_pipe_perftest.cc
index 5d59e656..e89143b9 100644
--- a/mojo/edk/system/message_pipe_perftest.cc
+++ b/mojo/edk/system/message_pipe_perftest.cc
@@ -24,8 +24,7 @@
: public test::MultiprocessMessagePipeTestBase {
public:
MultiprocessMessagePipePerfTest()
- : test::MultiprocessMessagePipeTestBase(base::MessageLoop::TYPE_IO),
- message_count_(0),
+ : message_count_(0),
message_size_(0) {}
void SetUpMeasurement(int message_count, size_t message_size) {
@@ -86,11 +85,6 @@
// (which it doesn't reply to). It'll return the number of messages received,
// not including any "quitquitquit" message, modulo 100.
MOJO_MULTIPROCESS_TEST_CHILD_MAIN(PingPongClient) {
- SimplePlatformSupport platform_support;
- base::MessageLoop message_loop(base::MessageLoop::TYPE_IO);
- base::TestIOThread test_io_thread(base::TestIOThread::kAutoStart);
- test::ScopedIPCSupport ipc_support(test_io_thread.task_runner());
-
ScopedPlatformHandle client_platform_handle =
test::MultiprocessTestHelper::client_platform_handle.Pass();
CHECK(client_platform_handle.is_valid());
diff --git a/mojo/edk/system/message_pipe_test_utils.cc b/mojo/edk/system/message_pipe_test_utils.cc
index 1e337fb..c752abc 100644
--- a/mojo/edk/system/message_pipe_test_utils.cc
+++ b/mojo/edk/system/message_pipe_test_utils.cc
@@ -11,16 +11,7 @@
namespace test {
#if !defined(OS_IOS)
-MultiprocessMessagePipeTestBase::MultiprocessMessagePipeTestBase()
- : test_io_thread_(base::TestIOThread::kAutoStart),
- ipc_support_(test_io_thread_.task_runner()) {
-}
-
-MultiprocessMessagePipeTestBase::MultiprocessMessagePipeTestBase(
- base::MessageLoop::Type main_message_loop_type)
- : message_loop_(main_message_loop_type),
- test_io_thread_(base::TestIOThread::kAutoStart),
- ipc_support_(test_io_thread_.task_runner()) {
+MultiprocessMessagePipeTestBase::MultiprocessMessagePipeTestBase() {
}
MultiprocessMessagePipeTestBase::~MultiprocessMessagePipeTestBase() {
diff --git a/mojo/edk/system/message_pipe_test_utils.h b/mojo/edk/system/message_pipe_test_utils.h
index e067329d..837d534a6 100644
--- a/mojo/edk/system/message_pipe_test_utils.h
+++ b/mojo/edk/system/message_pipe_test_utils.h
@@ -5,12 +5,7 @@
#ifndef MOJO_EDK_SYSTEM_MESSAGE_PIPE_TEST_UTILS_H_
#define MOJO_EDK_SYSTEM_MESSAGE_PIPE_TEST_UTILS_H_
-#include "base/message_loop/message_loop.h"
-#include "base/test/test_io_thread.h"
-#include "mojo/edk/embedder/simple_platform_support.h"
-#include "mojo/edk/system/test_utils.h"
#include "mojo/edk/test/multiprocess_test_helper.h"
-#include "mojo/edk/test/scoped_ipc_support.h"
#include "mojo/public/cpp/system/macros.h"
namespace mojo {
@@ -25,19 +20,12 @@
class MultiprocessMessagePipeTestBase : public testing::Test {
public:
MultiprocessMessagePipeTestBase();
- MultiprocessMessagePipeTestBase(
- base::MessageLoop::Type main_message_loop_type);
~MultiprocessMessagePipeTestBase() override;
protected:
- PlatformSupport* platform_support() { return &platform_support_; }
test::MultiprocessTestHelper* helper() { return &helper_; }
private:
- SimplePlatformSupport platform_support_;
- base::MessageLoop message_loop_;
- base::TestIOThread test_io_thread_;
- test::ScopedIPCSupport ipc_support_;
test::MultiprocessTestHelper helper_;
MOJO_DISALLOW_COPY_AND_ASSIGN(MultiprocessMessagePipeTestBase);
diff --git a/mojo/edk/system/message_pipe_unittest.cc b/mojo/edk/system/message_pipe_unittest.cc
index 9283acc..0498b55 100644
--- a/mojo/edk/system/message_pipe_unittest.cc
+++ b/mojo/edk/system/message_pipe_unittest.cc
@@ -16,7 +16,7 @@
MOJO_HANDLE_SIGNAL_PEER_CLOSED;
static const char kHelloWorld[] = "hello world";
-class MessagePipeTest : public test::MojoSystemTest {
+class MessagePipeTest : public testing::Test {
public:
MessagePipeTest() {
CHECK_EQ(MOJO_RESULT_OK, MojoCreateMessagePipe(nullptr, &pipe0_, &pipe1_));
diff --git a/mojo/edk/system/multiprocess_message_pipe_unittest.cc b/mojo/edk/system/multiprocess_message_pipe_unittest.cc
index 4bb3f133..f1afa0e1 100644
--- a/mojo/edk/system/multiprocess_message_pipe_unittest.cc
+++ b/mojo/edk/system/multiprocess_message_pipe_unittest.cc
@@ -16,8 +16,7 @@
#include "base/files/scoped_temp_dir.h"
#include "base/location.h"
#include "base/logging.h"
-#include "base/test/test_io_thread.h"
-#include "build/build_config.h" // TODO(vtl): Remove this.
+#include "build/build_config.h"
#include "mojo/edk/embedder/embedder.h"
#include "mojo/edk/embedder/platform_channel_pair.h"
#include "mojo/edk/embedder/platform_shared_buffer.h"
@@ -44,11 +43,6 @@
// (which it doesn't reply to). It'll return the number of messages received,
// not including any "quitquitquit" message, modulo 100.
MOJO_MULTIPROCESS_TEST_CHILD_MAIN(EchoEcho) {
- SimplePlatformSupport platform_support;
- base::MessageLoop message_loop;
- base::TestIOThread test_io_thread(base::TestIOThread::kAutoStart);
- test::ScopedIPCSupport ipc_support(test_io_thread.task_runner());
-
ScopedPlatformHandle client_platform_handle =
test::MultiprocessTestHelper::client_platform_handle.Pass();
CHECK(client_platform_handle.is_valid());
@@ -208,11 +202,6 @@
}
MOJO_MULTIPROCESS_TEST_CHILD_MAIN(CheckSharedBuffer) {
- SimplePlatformSupport platform_support;
- base::MessageLoop message_loop;
- base::TestIOThread test_io_thread(base::TestIOThread::kAutoStart);
- test::ScopedIPCSupport ipc_support(test_io_thread.task_runner());
-
ScopedPlatformHandle client_platform_handle =
test::MultiprocessTestHelper::client_platform_handle.Pass();
CHECK(client_platform_handle.is_valid());
@@ -382,11 +371,6 @@
}
MOJO_MULTIPROCESS_TEST_CHILD_MAIN(CheckPlatformHandleFile) {
- SimplePlatformSupport platform_support;
- base::MessageLoop message_loop;
- base::TestIOThread test_io_thread(base::TestIOThread::kAutoStart);
- test::ScopedIPCSupport ipc_support(test_io_thread.task_runner());
-
ScopedPlatformHandle client_platform_handle =
test::MultiprocessTestHelper::client_platform_handle.Pass();
CHECK(client_platform_handle.is_valid());
@@ -501,11 +485,6 @@
#endif
MOJO_MULTIPROCESS_TEST_CHILD_MAIN(CheckMessagePipe) {
- SimplePlatformSupport platform_support;
- base::MessageLoop message_loop;
- base::TestIOThread test_io_thread(base::TestIOThread::kAutoStart);
- test::ScopedIPCSupport ipc_support(test_io_thread.task_runner());
-
ScopedPlatformHandle client_platform_handle =
test::MultiprocessTestHelper::client_platform_handle.Pass();
CHECK(client_platform_handle.is_valid());
@@ -666,11 +645,6 @@
}
MOJO_MULTIPROCESS_TEST_CHILD_MAIN(DataPipeConsumer) {
- SimplePlatformSupport platform_support;
- base::MessageLoop message_loop;
- base::TestIOThread test_io_thread(base::TestIOThread::kAutoStart);
- test::ScopedIPCSupport ipc_support(test_io_thread.task_runner());
-
ScopedPlatformHandle client_platform_handle =
test::MultiprocessTestHelper::client_platform_handle.Pass();
CHECK(client_platform_handle.is_valid());
diff --git a/mojo/edk/system/raw_channel_unittest.cc b/mojo/edk/system/raw_channel_unittest.cc
index fdc77d8..c2298d6 100644
--- a/mojo/edk/system/raw_channel_unittest.cc
+++ b/mojo/edk/system/raw_channel_unittest.cc
@@ -23,7 +23,9 @@
#include "base/synchronization/waitable_event.h"
#include "base/test/test_io_thread.h"
#include "base/threading/simple_thread.h"
-#include "build/build_config.h" // TODO(vtl): Remove this.
+#include "build/build_config.h"
+#include "mojo/edk/embedder/embedder.h"
+#include "mojo/edk/embedder/embedder_internal.h"
#include "mojo/edk/embedder/platform_channel_pair.h"
#include "mojo/edk/embedder/platform_handle.h"
#include "mojo/edk/embedder/scoped_platform_handle.h"
@@ -73,7 +75,7 @@
// -----------------------------------------------------------------------------
-class RawChannelTest : public test::MojoSystemTest {
+class RawChannelTest : public testing::Test {
public:
RawChannelTest() {}
~RawChannelTest() override {}
@@ -89,6 +91,14 @@
handles[1].reset();
}
+ void FlushIOThread() {
+ base::WaitableEvent event(false, false);
+ internal::g_io_thread_task_runner->PostTask(
+ FROM_HERE,
+ base::Bind(&base::WaitableEvent::Signal, base::Unretained(&event)));
+ event.Wait();
+ }
+
protected:
ScopedPlatformHandle handles[2];
@@ -189,7 +199,7 @@
WriteOnlyRawChannelDelegate delegate;
RawChannel* rc = RawChannel::Create(handles[0].Pass());
TestMessageReaderAndChecker checker(handles[1].get());
- test_io_thread()->PostTaskAndWait(
+ internal::g_io_thread_task_runner->PostTask(
FROM_HERE,
base::Bind(&InitOnIOThread, rc, base::Unretained(&delegate)));
@@ -205,7 +215,7 @@
for (uint32_t size = 1; size < 5 * 1000 * 1000; size += size / 2 + 1)
EXPECT_TRUE(checker.ReadAndCheckNextMessage(size)) << size;
- test_io_thread()->PostTaskAndWait(
+ internal::g_io_thread_task_runner->PostTask(
FROM_HERE, base::Bind(&RawChannel::Shutdown, base::Unretained(rc)));
}
@@ -274,7 +284,7 @@
TEST_F(RawChannelTest, OnReadMessage) {
ReadCheckerRawChannelDelegate delegate;
RawChannel* rc = RawChannel::Create(handles[0].Pass());
- test_io_thread()->PostTaskAndWait(
+ internal::g_io_thread_task_runner->PostTask(
FROM_HERE,
base::Bind(&InitOnIOThread, rc, base::Unretained(&delegate)));
@@ -297,7 +307,7 @@
EXPECT_TRUE(WriteTestMessageToHandle(handles[1].get(), size));
delegate.Wait();
- test_io_thread()->PostTaskAndWait(
+ internal::g_io_thread_task_runner->PostTask(
FROM_HERE, base::Bind(&RawChannel::Shutdown, base::Unretained(rc)));
}
@@ -371,7 +381,7 @@
WriteOnlyRawChannelDelegate writer_delegate;
RawChannel* writer_rc = RawChannel::Create(handles[0].Pass());
- test_io_thread()->PostTaskAndWait(
+ internal::g_io_thread_task_runner->PostTask(
FROM_HERE,
base::Bind(&InitOnIOThread, writer_rc,
base::Unretained(&writer_delegate)));
@@ -379,7 +389,7 @@
ReadCountdownRawChannelDelegate reader_delegate(kNumWriterThreads *
kNumWriteMessagesPerThread);
RawChannel* reader_rc = RawChannel::Create(handles[1].Pass());
- test_io_thread()->PostTaskAndWait(
+ internal::g_io_thread_task_runner->PostTask(
FROM_HERE,
base::Bind(&InitOnIOThread, reader_rc,
base::Unretained(&reader_delegate)));
@@ -401,11 +411,11 @@
// Wait for reading to finish.
reader_delegate.Wait();
- test_io_thread()->PostTaskAndWait(
+ internal::g_io_thread_task_runner->PostTask(
FROM_HERE,
base::Bind(&RawChannel::Shutdown, base::Unretained(reader_rc)));
- test_io_thread()->PostTaskAndWait(
+ internal::g_io_thread_task_runner->PostTask(
FROM_HERE,
base::Bind(&RawChannel::Shutdown, base::Unretained(writer_rc)));
}
@@ -470,9 +480,10 @@
TEST_F(RawChannelTest, OnError) {
ErrorRecordingRawChannelDelegate delegate(0, true, true);
RawChannel* rc = RawChannel::Create(handles[0].Pass());
- test_io_thread()->PostTaskAndWait(
+ internal::g_io_thread_task_runner->PostTask(
FROM_HERE,
base::Bind(&InitOnIOThread, rc, base::Unretained(&delegate)));
+ FlushIOThread();
// Close the handle of the other end, which should make writing fail.
handles[1].reset();
@@ -491,7 +502,7 @@
// notification. (If we actually get another one, |OnError()| crashes.)
test::Sleep(test::DeadlineFromMilliseconds(20));
- test_io_thread()->PostTaskAndWait(
+ internal::g_io_thread_task_runner->PostTask(
FROM_HERE, base::Bind(&RawChannel::Shutdown, base::Unretained(rc)));
}
@@ -513,9 +524,10 @@
// messages that were written.
ErrorRecordingRawChannelDelegate delegate(kMessageCount, true, true);
RawChannel* rc = RawChannel::Create(handles[0].Pass());
- test_io_thread()->PostTaskAndWait(
+ internal::g_io_thread_task_runner->PostTask(
FROM_HERE,
base::Bind(&InitOnIOThread, rc, base::Unretained(&delegate)));
+ FlushIOThread();
EXPECT_FALSE(rc->WriteMessage(MakeTestMessage(1)));
@@ -528,7 +540,7 @@
// And then we should get a read error.
delegate.WaitForReadError();
- test_io_thread()->PostTaskAndWait(
+ internal::g_io_thread_task_runner->PostTask(
FROM_HERE, base::Bind(&RawChannel::Shutdown, base::Unretained(rc)));
}
@@ -597,13 +609,13 @@
WriteOnlyRawChannelDelegate write_delegate;
RawChannel* rc_write = RawChannel::Create(handles[0].Pass());
- test_io_thread()->PostTaskAndWait(
+ internal::g_io_thread_task_runner->PostTask(
FROM_HERE,
base::Bind(&InitOnIOThread, rc_write, base::Unretained(&write_delegate)));
ReadPlatformHandlesCheckerRawChannelDelegate read_delegate;
RawChannel* rc_read = RawChannel::Create(handles[1].Pass());
- test_io_thread()->PostTaskAndWait(
+ internal::g_io_thread_task_runner->PostTask(
FROM_HERE,
base::Bind(&InitOnIOThread, rc_read, base::Unretained(&read_delegate)));
@@ -633,10 +645,10 @@
read_delegate.Wait();
- test_io_thread()->PostTaskAndWait(
+ internal::g_io_thread_task_runner->PostTask(
FROM_HERE,
base::Bind(&RawChannel::Shutdown, base::Unretained(rc_read)));
- test_io_thread()->PostTaskAndWait(
+ internal::g_io_thread_task_runner->PostTask(
FROM_HERE,
base::Bind(&RawChannel::Shutdown, base::Unretained(rc_write)));
}
diff --git a/mojo/edk/system/routed_raw_channel.cc b/mojo/edk/system/routed_raw_channel.cc
new file mode 100644
index 0000000..e2fe67e
--- /dev/null
+++ b/mojo/edk/system/routed_raw_channel.cc
@@ -0,0 +1,168 @@
+// Copyright 2015 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/edk/system/routed_raw_channel.h"
+
+#include "base/bind.h"
+#include "base/logging.h"
+#include "mojo/edk/embedder/embedder_internal.h"
+#include "mojo/edk/system/message_pipe_dispatcher.h"
+
+namespace mojo {
+namespace edk {
+
+namespace {
+const uint64_t kInternalRoutingId = 0;
+
+// These are messages sent over our internal routing id above, meant for the
+// other side's RoutedRawChannel to dispatch.
+enum InternalMessages {
+ ROUTE_CLOSED = 0,
+};
+}
+
+RoutedRawChannel::PendingMessage::PendingMessage() {
+}
+
+RoutedRawChannel::PendingMessage::~PendingMessage() {
+}
+
+RoutedRawChannel::RoutedRawChannel(
+ ScopedPlatformHandle handle,
+ const base::Callback<void(RoutedRawChannel*)>& destruct_callback)
+ : channel_(RawChannel::Create(handle.Pass())),
+ destruct_callback_(destruct_callback) {
+ internal::g_io_thread_task_runner->PostTask(
+ FROM_HERE,
+ base::Bind(&RawChannel::Init, base::Unretained(channel_), this));
+ internal::g_io_thread_task_runner->PostTask(
+ FROM_HERE,
+ base::Bind(&RawChannel::EnsureLazyInitialized,
+ base::Unretained(channel_)));
+}
+
+void RoutedRawChannel::AddRoute(uint64_t pipe_id, MessagePipeDispatcher* pipe) {
+ CHECK_NE(pipe_id, kInternalRoutingId) << kInternalRoutingId << " is reserved";
+ base::AutoLock auto_lock(lock_);
+ CHECK(routes_.find(pipe_id) == routes_.end());
+ routes_[pipe_id] = pipe;
+
+ for (size_t i = 0; i < pending_messages_.size();) {
+ MessageInTransit::View view(pending_messages_[i]->message.size(),
+ &pending_messages_[i]->message[0]);
+ if (view.route_id() == pipe_id) {
+ pipe->OnReadMessage(view, pending_messages_[i]->handles.Pass());
+ pending_messages_.erase(pending_messages_.begin() + i);
+ } else {
+ ++i;
+ }
+ }
+
+ if (close_routes_.find(pipe_id) != close_routes_.end())
+ pipe->OnError(ERROR_READ_SHUTDOWN);
+}
+
+void RoutedRawChannel::RemoveRoute(uint64_t pipe_id,
+ MessagePipeDispatcher* pipe) {
+ base::AutoLock auto_lock(lock_);
+ CHECK(routes_.find(pipe_id) != routes_.end());
+ CHECK_EQ(routes_[pipe_id], pipe);
+ routes_.erase(pipe_id);
+
+ // Only send a message to the other side to close the route if we hadn't
+ // received a close route message. Otherwise they would keep going back and
+ // forth.
+ if (close_routes_.find(pipe_id) != close_routes_.end()) {
+ close_routes_.erase(pipe_id);
+ } else if (channel_) {
+ // Default route id of 0 to reach the other side's RoutedRawChannel.
+ char message_data[sizeof(char) + sizeof(uint64_t)];
+ message_data[0] = ROUTE_CLOSED;
+ memcpy(&message_data[1], &pipe_id, sizeof(uint64_t));
+ scoped_ptr<MessageInTransit> message(new MessageInTransit(
+ MessageInTransit::Type::MESSAGE, arraysize(message_data),
+ message_data));
+ message->set_route_id(kInternalRoutingId);
+ channel_->WriteMessage(message.Pass());
+ }
+
+ if (!channel_ && routes_.empty()) {
+ // PostTask to avoid reentrancy since the broker might be calling us.
+ base::MessageLoop::current()->DeleteSoon(FROM_HERE, this);
+ }
+}
+
+RoutedRawChannel::~RoutedRawChannel() {
+ destruct_callback_.Run(this);
+}
+
+void RoutedRawChannel::OnReadMessage(
+ const MessageInTransit::View& message_view,
+ ScopedPlatformHandleVectorPtr platform_handles) {
+ DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
+ // Note: normally, when a message arrives here we should find a corresponding
+ // entry for the MessagePipeDispatcher with the given route_id. However it is
+ // possible that they just connected, and due to race conditions one side has
+ // connected and sent a message (and even closed) before the other side had a
+ // chance to register with this RoutedRawChannel. In that case, we must buffer
+ // all messages.
+ base::AutoLock auto_lock(lock_);
+ uint64_t route_id = message_view.route_id();
+ if (route_id == kInternalRoutingId) {
+ if (message_view.num_bytes() != sizeof(char) + sizeof(uint64_t)) {
+ NOTREACHED() << "Invalid internal message in RoutedRawChannel.";
+ return;
+ }
+ const char* bytes = static_cast<const char*>(message_view.bytes());
+ if (bytes[0] != ROUTE_CLOSED) {
+ NOTREACHED() << "Unknown internal message in RoutedRawChannel.";
+ return;
+ }
+ uint64_t closed_route = *reinterpret_cast<const uint64_t*>(&bytes[1]);
+ if (close_routes_.find(closed_route) != close_routes_.end()) {
+ NOTREACHED() << "Should only receive one ROUTE_CLOSED per route.";
+ return;
+ }
+ close_routes_.insert(closed_route);
+ if (routes_.find(closed_route) == routes_.end())
+ return; // This side hasn't connected yet.
+
+ routes_[closed_route]->OnError(ERROR_READ_SHUTDOWN);
+ return;
+ }
+
+ if (routes_.find(route_id) != routes_.end()) {
+ routes_[route_id]->OnReadMessage(message_view, platform_handles.Pass());
+ } else {
+ scoped_ptr<PendingMessage> msg(new PendingMessage);
+ msg->message.resize(message_view.total_size());
+ memcpy(&msg->message[0], message_view.main_buffer(),
+ message_view.total_size());
+ msg->handles = platform_handles.Pass();
+ pending_messages_.push_back(msg.Pass());
+ }
+}
+
+void RoutedRawChannel::OnError(Error error) {
+ DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
+ bool destruct = false;
+ {
+ base::AutoLock auto_lock(lock_);
+
+ channel_->Shutdown();
+ channel_ = nullptr;
+ if (routes_.empty()) {
+ destruct = true;
+ } else {
+ for (auto it = routes_.begin(); it != routes_.end(); ++it)
+ it->second->OnError(error);
+ }
+ }
+
+ if (destruct)
+ delete this;
+}
+
+} // namespace edk
+} // namespace mojo
diff --git a/mojo/edk/system/routed_raw_channel.h b/mojo/edk/system/routed_raw_channel.h
new file mode 100644
index 0000000..c27b86c
--- /dev/null
+++ b/mojo/edk/system/routed_raw_channel.h
@@ -0,0 +1,78 @@
+// Copyright 2015 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef MOJO_EDK_SYSTEM_ROUTED_RAW_CHANNEL_H_
+#define MOJO_EDK_SYSTEM_ROUTED_RAW_CHANNEL_H_
+
+#include "base/callback.h"
+#include "base/containers/hash_tables.h"
+#include "base/macros.h"
+#include "base/synchronization/lock.h"
+#include "mojo/edk/embedder/platform_handle_vector.h"
+#include "mojo/edk/embedder/scoped_platform_handle.h"
+#include "mojo/edk/system/broker.h"
+#include "mojo/edk/system/raw_channel.h"
+#include "mojo/edk/system/system_impl_export.h"
+
+namespace mojo {
+namespace edk {
+class RawChannel;
+
+// This class wraps a RawChannel and adds routing on top of it.
+// Non-transferable MessagePipeDispatchers call here, indirectly through the
+// Broker interface, to associate with their pipe_id.
+class RoutedRawChannel : public RawChannel::Delegate {
+ public:
+ RoutedRawChannel(
+ ScopedPlatformHandle handle,
+ const base::Callback<void(RoutedRawChannel*)>& destruct_callback);
+
+ // Connect the given |pipe| with the pipe_id route. Only non-transferable
+ // message pipes can call this, and they can only call it once.
+ void AddRoute(uint64_t pipe_id, MessagePipeDispatcher* pipe);
+
+ // Called when the MessagePipeDispatcher is going away.
+ void RemoveRoute(uint64_t pipe_id, MessagePipeDispatcher* pipe);
+
+ RawChannel* channel() { return channel_; }
+
+ private:
+ friend class base::DeleteHelper<RoutedRawChannel>;
+ ~RoutedRawChannel() override;
+
+ // RawChannel::Delegate implementation:
+ void OnReadMessage(
+ const MessageInTransit::View& message_view,
+ ScopedPlatformHandleVectorPtr platform_handles) override;
+ void OnError(Error error) override;
+
+ RawChannel* channel_;
+
+ base::Lock lock_; // Guards access to below.
+ base::hash_map<uint64_t, MessagePipeDispatcher*> routes_;
+
+ // If we got messages before the route was added (due to race conditions
+ // between different channels), this is used to buffer them.
+ struct PendingMessage {
+ PendingMessage();
+ ~PendingMessage();
+ std::vector<char> message;
+ ScopedPlatformHandleVectorPtr handles;
+ };
+ std::vector<scoped_ptr<PendingMessage>> pending_messages_;
+
+ // If we got a ROUTE_CLOSED message for a route before it registered with us,
+ // we need to hold on to this information so that we can tell it that the
+ // connetion is closed when it does connect.
+ base::hash_set<uint64_t> close_routes_;
+
+ base::Callback<void(RoutedRawChannel*)> destruct_callback_;
+
+ DISALLOW_COPY_AND_ASSIGN(RoutedRawChannel);
+};
+
+} // namespace edk
+} // namespace mojo
+
+#endif // MOJO_EDK_SYSTEM_ROUTED_RAW_CHANNEL_H_
diff --git a/mojo/edk/system/run_all_unittests.cc b/mojo/edk/system/run_all_unittests.cc
index b68fd83..4421aa6 100644
--- a/mojo/edk/system/run_all_unittests.cc
+++ b/mojo/edk/system/run_all_unittests.cc
@@ -5,9 +5,11 @@
#include "base/bind.h"
#include "base/command_line.h"
#include "base/test/launcher/unit_test_launcher.h"
+#include "base/test/test_io_thread.h"
#include "base/test/test_suite.h"
#include "mojo/edk/embedder/embedder.h"
#include "mojo/edk/test/multiprocess_test_helper.h"
+#include "mojo/edk/test/scoped_ipc_support.h"
#include "testing/gtest/include/gtest/gtest.h"
int main(int argc, char** argv) {
@@ -31,6 +33,12 @@
mojo::edk::Init();
+ base::TestIOThread test_io_thread(base::TestIOThread::kAutoStart);
+ // Leak this because its destructor calls mojo::edk::ShutdownIPCSupport which
+ // really does nothing in the new EDK but does depend on the current message
+ // loop, which is destructed inside base::LaunchUnitTests.
+ new mojo::edk::test::ScopedIPCSupport(test_io_thread.task_runner());
+
return base::LaunchUnitTests(
argc, argv,
base::Bind(&base::TestSuite::Run, base::Unretained(&test_suite)));
diff --git a/mojo/edk/system/test_utils.cc b/mojo/edk/system/test_utils.cc
index 49e346b..0549f86 100644
--- a/mojo/edk/system/test_utils.cc
+++ b/mojo/edk/system/test_utils.cc
@@ -69,15 +69,6 @@
return static_cast<MojoDeadline>(result);
}
-
-MojoSystemTest::MojoSystemTest()
- : test_io_thread_(base::TestIOThread::kAutoStart),
- ipc_support_(test_io_thread_.task_runner()) {
-}
-
-MojoSystemTest::~MojoSystemTest() {
-}
-
} // namespace test
} // namespace edk
} // namespace mojo
diff --git a/mojo/edk/system/test_utils.h b/mojo/edk/system/test_utils.h
index cf244b3..46398d9 100644
--- a/mojo/edk/system/test_utils.h
+++ b/mojo/edk/system/test_utils.h
@@ -5,9 +5,7 @@
#ifndef MOJO_EDK_SYSTEM_TEST_UTILS_H_
#define MOJO_EDK_SYSTEM_TEST_UTILS_H_
-#include "base/test/test_io_thread.h"
#include "base/time/time.h"
-#include "mojo/edk/test/scoped_ipc_support.h"
#include "mojo/public/c/system/types.h"
#include "mojo/public/cpp/system/macros.h"
#include "testing/gtest/include/gtest/gtest.h"
@@ -54,23 +52,6 @@
MOJO_DISALLOW_COPY_AND_ASSIGN(Stopwatch);
};
-// A base class which initializes and shuts down the necessary objects so that
-// Mojo system calls can be made.
-class MojoSystemTest : public testing::Test {
- public:
- MojoSystemTest();
- ~MojoSystemTest() override;
-
- base::TestIOThread* test_io_thread() { return &test_io_thread_; }
-
- private:
- base::MessageLoop message_loop_;
- base::TestIOThread test_io_thread_;
- test::ScopedIPCSupport ipc_support_;
-
- MOJO_DISALLOW_COPY_AND_ASSIGN(MojoSystemTest);
-};
-
} // namespace test
} // namespace edk
} // namespace mojo
diff --git a/mojo/edk/test/BUILD.gn b/mojo/edk/test/BUILD.gn
index 9a52c99..a77373b 100644
--- a/mojo/edk/test/BUILD.gn
+++ b/mojo/edk/test/BUILD.gn
@@ -54,6 +54,7 @@
":test_support_impl",
"//base",
"//base/test:test_support",
+ "//mojo/edk/test:test_support",
"//mojo/public/c/test_support",
# TODO(use_chrome_edk): temporary since the Mojo wrapper primitives are
diff --git a/mojo/edk/test/multiprocess_test_helper_unittest.cc b/mojo/edk/test/multiprocess_test_helper_unittest.cc
index b3ae11d..7e99fb27a 100644
--- a/mojo/edk/test/multiprocess_test_helper_unittest.cc
+++ b/mojo/edk/test/multiprocess_test_helper_unittest.cc
@@ -7,6 +7,7 @@
#include "base/logging.h"
#include "build/build_config.h"
#include "mojo/edk/embedder/scoped_platform_handle.h"
+#include "mojo/edk/system/test_utils.h"
#include "mojo/edk/test/test_utils.h"
#include "testing/gtest/include/gtest/gtest.h"
diff --git a/mojo/edk/test/run_all_perftests.cc b/mojo/edk/test/run_all_perftests.cc
index d700bc3..c277a38 100644
--- a/mojo/edk/test/run_all_perftests.cc
+++ b/mojo/edk/test/run_all_perftests.cc
@@ -4,17 +4,32 @@
#include "base/command_line.h"
#include "base/test/perf_test_suite.h"
+#include "base/test/test_io_thread.h"
#include "mojo/edk/embedder/embedder.h"
+#include "mojo/edk/test/multiprocess_test_helper.h"
+#include "mojo/edk/test/scoped_ipc_support.h"
#include "mojo/edk/test/test_support_impl.h"
#include "mojo/public/tests/test_support_private.h"
int main(int argc, char** argv) {
- mojo::edk::Init();
- mojo::test::TestSupport::Init(new mojo::edk::test::TestSupportImpl());
+ base::PerfTestSuite test(argc, argv);
+
+ // Must be run before mojo::edk::Init.
+ if (base::CommandLine::ForCurrentProcess()->HasSwitch(
+ mojo::edk::test::kBrokerHandleSwitch)) {
+ mojo::edk::PreInitializeChildProcess();
+ }
// TODO(use_chrome_edk): temporary to force new EDK.
- base::PerfTestSuite test(argc, argv);
base::CommandLine::ForCurrentProcess()->AppendSwitch("--use-new-edk");
+ mojo::edk::Init();
+ base::TestIOThread test_io_thread(base::TestIOThread::kAutoStart);
+ // Leak this because its destructor calls mojo::edk::ShutdownIPCSupport which
+ // really does nothing in the new EDK but does depend on the current message
+ // loop, which is destructed inside base::LaunchUnitTests.
+ new mojo::edk::test::ScopedIPCSupport(test_io_thread.task_runner());
+ mojo::test::TestSupport::Init(new mojo::edk::test::TestSupportImpl());
+
return test.Run();
}
diff --git a/mojo/mojo_edk.gyp b/mojo/mojo_edk.gyp
index 07edb34..a8cbc82 100644
--- a/mojo/mojo_edk.gyp
+++ b/mojo/mojo_edk.gyp
@@ -105,6 +105,8 @@
'edk/system/raw_channel.h',
'edk/system/raw_channel_posix.cc',
'edk/system/raw_channel_win.cc',
+ 'edk/system/routed_raw_channel.cc',
+ 'edk/system/routed_raw_channel.h',
'edk/system/shared_buffer_dispatcher.cc',
'edk/system/shared_buffer_dispatcher.h',
'edk/system/simple_dispatcher.cc',
diff --git a/mojo/public/c/system/message_pipe.h b/mojo/public/c/system/message_pipe.h
index d42c3fc..4f5d4c78 100644
--- a/mojo/public/c/system/message_pipe.h
+++ b/mojo/public/c/system/message_pipe.h
@@ -18,17 +18,26 @@
// |uint32_t struct_size|: Set to the size of the
// |MojoCreateMessagePipeOptions| struct. (Used to allow for future
// extensions.)
-// |MojoCreateMessagePipeOptionsFlags flags|: Reserved for future use.
+// |MojoCreateMessagePipeOptionsFlags flags|: Used to specify different modes
+// of operation.
// |MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE|: No flags; default mode.
+// |MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE|: The message pipe
+// can be sent over another pipe after it's been read, written or
+// waited. This mode makes message pipes use more resources (one OS
+// pipe each), so only specify if this functionality is required.
typedef uint32_t MojoCreateMessagePipeOptionsFlags;
#ifdef __cplusplus
const MojoCreateMessagePipeOptionsFlags
MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE = 0;
+const MojoCreateMessagePipeOptionsFlags
+ MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE = 1;
#else
#define MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE \
((MojoCreateMessagePipeOptionsFlags)0)
+#define MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE \
+ ((MojoCreateMessagePipeOptionsFlags)1)
#endif
MOJO_STATIC_ASSERT(MOJO_ALIGNOF(int64_t) == 8, "int64_t has weird alignment");
diff --git a/mojo/runner/host/child_process.cc b/mojo/runner/host/child_process.cc
index 0dd6d84..a9f07bb 100644
--- a/mojo/runner/host/child_process.cc
+++ b/mojo/runner/host/child_process.cc
@@ -124,7 +124,9 @@
// create a message pipe which requires this code to be run first.
embedder::InitIPCSupport(embedder::ProcessType::NONE, this, io_runner_,
embedder::ScopedPlatformHandle());
+ }
+ void StartControllerThread() {
// Create and start our controller thread.
base::Thread::Options controller_thread_options;
controller_thread_options.message_loop_type =
@@ -305,7 +307,6 @@
ScopedMessagePipeHandle host_message_pipe(embedder::CreateChannel(
platform_channel.Pass(), base::Bind(&DidCreateChannel), io_task_runner));
-#if defined(OS_WIN)
if (base::CommandLine::ForCurrentProcess()->HasSwitch("use-new-edk")) {
// When using the new Mojo EDK, each message pipe is backed by a platform
// handle. The one platform handle that comes on the command line is used
@@ -339,9 +340,6 @@
#endif
)));
}
-#else
- // TODO(jam): hook up on POSIX
-#endif
return host_message_pipe.Pass();
}
@@ -385,6 +383,7 @@
app_context.Init();
ScopedMessagePipeHandle host_message_pipe = InitializeHostMessagePipe(
platform_channel.Pass(), app_context.io_runner());
+ app_context.StartControllerThread();
Blocker blocker;
app_context.controller_runner()->PostTask(
FROM_HERE,
diff --git a/mojo/runner/host/child_process_host.cc b/mojo/runner/host/child_process_host.cc
index cb106933..770afb9 100644
--- a/mojo/runner/host/child_process_host.cc
+++ b/mojo/runner/host/child_process_host.cc
@@ -39,11 +39,8 @@
channel_info_(nullptr),
start_child_process_event_(false, false),
weak_factory_(this) {
-#if defined(OS_WIN)
- // TODO(jam): enable on POSIX
if (base::CommandLine::ForCurrentProcess()->HasSwitch("use-new-edk"))
serializer_platform_channel_pair_.reset(new edk::PlatformChannelPair(true));
-#endif
child_message_pipe_ = embedder::CreateChannel(
platform_channel_pair_.PassServerHandle(),
@@ -71,7 +68,6 @@
DCHECK(!child_process_.IsValid());
DCHECK(child_message_pipe_.is_valid());
-#if defined(OS_WIN)
if (base::CommandLine::ForCurrentProcess()->HasSwitch("use-new-edk")) {
std::string client_handle_as_string =
serializer_platform_channel_pair_
@@ -86,7 +82,6 @@
MOJO_WRITE_MESSAGE_FLAG_NONE);
DCHECK_EQ(rv, MOJO_RESULT_OK);
}
-#endif
controller_.Bind(
InterfacePtrInfo<ChildController>(child_message_pipe_.Pass(), 0u));
@@ -206,7 +201,6 @@
if (child_process_.IsValid()) {
platform_channel_pair_.ChildProcessLaunched();
-#if defined(OS_WIN)
if (serializer_platform_channel_pair_.get()) {
serializer_platform_channel_pair_->ChildProcessLaunched();
mojo::embedder::ChildProcessLaunched(
@@ -220,7 +214,6 @@
#endif
)));
}
-#endif
}
start_child_process_event_.Signal();
}
diff --git a/third_party/mojo/src/mojo/edk/embedder/embedder.cc b/third_party/mojo/src/mojo/edk/embedder/embedder.cc
index 692547ac..00c48ea 100644
--- a/third_party/mojo/src/mojo/edk/embedder/embedder.cc
+++ b/third_party/mojo/src/mojo/edk/embedder/embedder.cc
@@ -65,6 +65,7 @@
void OnShutdownComplete() {
passed_in_delegate_->OnShutdownComplete();
+ delete this;
}
private:
@@ -248,6 +249,8 @@
internal::g_ipc_support->ShutdownOnIOThread();
delete internal::g_ipc_support;
internal::g_ipc_support = nullptr;
+ delete g_wrapper_process_delegate;
+ g_wrapper_process_delegate = nullptr;
}
void ShutdownIPCSupport() {