blob: 952b0ba2abc323a071567ab931bda6d63dd2ac5a [file] [log] [blame]
From 63f1d3210b0dd275118a0d3008c966119e4fe77b Mon Sep 17 00:00:00 2001
From: Ken Rockot <rockot@google.com>
Date: Fri, 27 Sep 2019 05:59:16 +0900
Subject: [PATCH 2/2] [mojo] Support clients rejoining a process network
This change allows Mojo client processes to connect to a new process
network if disconnected from a previous one. Prior to this change, a
process could only belong to one network for its entire lifetime.
This new behavior is useful in cases where a client process may outlive
its original broker process (for example, if the broker crashes) and
should be able to continue operating normally when a new broker process
is established in the system.
Bug: 813112
Change-Id: Ifb5f16f1b39127cf7b469ee798f2d3da159fa432
Tbr: jam@chromium.org
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/1821743
Reviewed-by: Wez <wez@chromium.org>
Reviewed-by: Oksana Zhuravlova <oksamyt@chromium.org>
Commit-Queue: Ken Rockot <rockot@google.com>
Cr-Commit-Position: refs/heads/master@{#700414}
CrOS-Libchrome-Original-Commit: 51da4f949e8d0fe08d50b1fff48d29cf5a2fe9f9
---
mojo/core/connection_params.h | 4 ++
mojo/core/core.cc | 10 ++--
mojo/core/core.h | 5 --
mojo/core/invitation_unittest.cc | 88 ++++++++++++++++++++++++++++
mojo/core/node_controller.cc | 69 ++++++++++++++++++++--
mojo/core/node_controller.h | 8 +++
mojo/core/scoped_process_handle.cc | 31 +++++++++-
mojo/core/scoped_process_handle.h | 6 ++
mojo/public/c/system/invitation.h | 11 ++++
mojo/public/cpp/system/invitation.cc | 9 ++-
mojo/public/cpp/system/invitation.h | 5 +-
11 files changed, 225 insertions(+), 21 deletions(-)
diff --git a/mojo/core/connection_params.h b/mojo/core/connection_params.h
index 357892e92..c1f41c031 100644
--- a/mojo/core/connection_params.h
+++ b/mojo/core/connection_params.h
@@ -39,8 +39,12 @@ class MOJO_SYSTEM_IMPL_EXPORT ConnectionParams {
void set_is_async(bool is_async) { is_async_ = is_async; }
bool is_async() const { return is_async_; }
+ void set_leak_endpoint(bool leak_endpoint) { leak_endpoint_ = leak_endpoint; }
+ bool leak_endpoint() const { return leak_endpoint_; }
+
private:
bool is_async_ = false;
+ bool leak_endpoint_ = false;
PlatformChannelEndpoint endpoint_;
PlatformChannelServerEndpoint server_endpoint_;
diff --git a/mojo/core/core.cc b/mojo/core/core.cc
index 14fe542fa..492b47620 100644
--- a/mojo/core/core.cc
+++ b/mojo/core/core.cc
@@ -199,12 +199,6 @@ void Core::SendBrokerClientInvitation(
process_error_callback);
}
-void Core::AcceptBrokerClientInvitation(ConnectionParams connection_params) {
- RequestContext request_context;
- GetNodeController()->AcceptBrokerClientInvitation(
- std::move(connection_params));
-}
-
void Core::ConnectIsolated(ConnectionParams connection_params,
const ports::PortRef& port,
base::StringPiece connection_name) {
@@ -1439,6 +1433,10 @@ MojoResult Core::AcceptInvitation(
connection_params =
ConnectionParams(PlatformChannelEndpoint(std::move(endpoint)));
}
+ if (options &&
+ options->flags & MOJO_ACCEPT_INVITATION_FLAG_LEAK_TRANSPORT_ENDPOINT) {
+ connection_params.set_leak_endpoint(true);
+ }
bool is_isolated =
options && (options->flags & MOJO_ACCEPT_INVITATION_FLAG_ISOLATED);
diff --git a/mojo/core/core.h b/mojo/core/core.h
index b3f557b27..8c561ab06 100644
--- a/mojo/core/core.h
+++ b/mojo/core/core.h
@@ -83,11 +83,6 @@ class MOJO_SYSTEM_IMPL_EXPORT Core {
const std::vector<std::pair<std::string, ports::PortRef>>& attached_ports,
const ProcessErrorCallback& process_error_callback);
- // Accepts an invitation via |connection_params|. The other end of the
- // connection medium in |connection_params| must have been used by some other
- // process to send an invitation.
- void AcceptBrokerClientInvitation(ConnectionParams connection_params);
-
// Extracts a named message pipe endpoint from the broker client invitation
// accepted by this process. Must only be called after
// AcceptBrokerClientInvitation.
diff --git a/mojo/core/invitation_unittest.cc b/mojo/core/invitation_unittest.cc
index 674a4fe80..f6ec61f21 100644
--- a/mojo/core/invitation_unittest.cc
+++ b/mojo/core/invitation_unittest.cc
@@ -14,16 +14,21 @@
#include "base/macros.h"
#include "base/optional.h"
#include "base/path_service.h"
+#include "base/process/process.h"
#include "base/run_loop.h"
#include "base/synchronization/lock.h"
+#include "base/test/bind_test_util.h"
#include "base/test/multiprocess_test.h"
#include "base/test/scoped_task_environment.h"
#include "base/threading/sequenced_task_runner_handle.h"
#include "build/build_config.h"
+#include "mojo/core/core.h"
+#include "mojo/core/node_controller.h"
#include "mojo/core/test/mojo_test_base.h"
#include "mojo/public/c/system/invitation.h"
#include "mojo/public/cpp/platform/named_platform_channel.h"
#include "mojo/public/cpp/platform/platform_channel.h"
+#include "mojo/public/cpp/system/invitation.h"
#include "mojo/public/cpp/system/platform_handle.h"
namespace mojo {
@@ -686,6 +691,89 @@ DEFINE_TEST_CLIENT(ProcessErrorsClient) {
EXPECT_EQ(kDisconnectMessage, ReadMessage(pipe));
}
+TEST_F(InvitationTest, Reinvitation) {
+ // The gist of this test is that a process should be able to accept an
+ // invitation, lose its connection to the process network, and then accept a
+ // new invitation to re-establish communication.
+
+ // We pass an extra PlatformChannel endpoint to the child process which it
+ // will use to accept a secondary invitation after we sever its first
+ // connection.
+ PlatformChannel secondary_channel;
+ auto command_line = base::GetMultiProcessTestChildBaseCommandLine();
+ base::LaunchOptions launch_options;
+ PrepareToPassRemoteEndpoint(&secondary_channel, &launch_options,
+ &command_line, kSecondaryChannelHandleSwitch);
+
+ MojoHandle pipe;
+ base::Process child_process = LaunchChildTestClient(
+ "ReinvitationClient", &pipe, 1, TransportType::kChannel,
+ MOJO_SEND_INVITATION_FLAG_NONE, nullptr, 0, &command_line,
+ &launch_options);
+ secondary_channel.RemoteProcessLaunchAttempted();
+
+ // Synchronize end-to-end communication first to ensure the process connection
+ // is fully established.
+ WriteMessage(pipe, kTestMessage1);
+ EXPECT_EQ(kTestMessage2, ReadMessage(pipe));
+
+ // Force-disconnect the child process.
+ Core::Get()->GetNodeController()->ForceDisconnectProcessForTesting(
+ child_process.Pid());
+
+ // The above disconnection should force pipe closure eventually.
+ WaitForSignals(pipe, MOJO_HANDLE_SIGNAL_PEER_CLOSED);
+ MojoClose(pipe);
+
+ // Now use our secondary channel to send a new invitation to the same process.
+ // It should be able to accept the new invitation and re-establish
+ // communication.
+ mojo::OutgoingInvitation new_invitation;
+ auto new_pipe = new_invitation.AttachMessagePipe(0);
+ mojo::OutgoingInvitation::Send(std::move(new_invitation),
+ child_process.Handle(),
+ secondary_channel.TakeLocalEndpoint());
+
+ WriteMessage(new_pipe.get().value(), kTestMessage3);
+ EXPECT_EQ(kTestMessage4, ReadMessage(new_pipe.get().value()));
+ WriteMessage(new_pipe.get().value(), kDisconnectMessage);
+
+ int wait_result = -1;
+ base::WaitForMultiprocessTestChildExit(
+ child_process, TestTimeouts::action_timeout(), &wait_result);
+ child_process.Close();
+ EXPECT_EQ(0, wait_result);
+}
+
+DEFINE_TEST_CLIENT(ReinvitationClient) {
+ MojoHandle pipe;
+ MojoHandle invitation = AcceptInvitation(MOJO_ACCEPT_INVITATION_FLAG_NONE);
+ const uint32_t pipe_name = 0;
+ ASSERT_EQ(MOJO_RESULT_OK, MojoExtractMessagePipeFromInvitation(
+ invitation, &pipe_name, 4, nullptr, &pipe));
+ ASSERT_EQ(MOJO_RESULT_OK, MojoClose(invitation));
+ EXPECT_EQ(kTestMessage1, ReadMessage(pipe));
+ WriteMessage(pipe, kTestMessage2);
+
+ // Wait for the pipe to break due to forced process disconnection.
+ WaitForSignals(pipe, MOJO_HANDLE_SIGNAL_PEER_CLOSED);
+ MojoClose(pipe);
+
+ // Now grab the secondary channel and accept a new invitation from it.
+ PlatformChannelEndpoint new_endpoint =
+ PlatformChannel::RecoverPassedEndpointFromString(
+ base::CommandLine::ForCurrentProcess()->GetSwitchValueASCII(
+ kSecondaryChannelHandleSwitch));
+ auto secondary_invitation =
+ mojo::IncomingInvitation::Accept(std::move(new_endpoint));
+ auto new_pipe = secondary_invitation.ExtractMessagePipe(0);
+
+ // Ensure that the new connection is working end-to-end.
+ EXPECT_EQ(kTestMessage3, ReadMessage(new_pipe.get().value()));
+ WriteMessage(new_pipe.get().value(), kTestMessage4);
+ EXPECT_EQ(kDisconnectMessage, ReadMessage(new_pipe.get().value()));
+}
+
TEST_F(InvitationTest, SendIsolatedInvitation) {
MojoHandle primordial_pipe;
base::Process child_process = LaunchChildTestClient(
diff --git a/mojo/core/node_controller.cc b/mojo/core/node_controller.cc
index 1ac546baf..4ecfbf653 100644
--- a/mojo/core/node_controller.cc
+++ b/mojo/core/node_controller.cc
@@ -214,7 +214,10 @@ void NodeController::AcceptBrokerClientInvitation(
CancelPendingPortMerges();
return;
}
+
+ const bool leak_endpoint = connection_params.leak_endpoint();
connection_params = ConnectionParams(std::move(endpoint));
+ connection_params.set_leak_endpoint(leak_endpoint);
} else {
// For async connections, we instead create a new channel for the broker and
// send a request for the inviting process to bind to it. This avoids doing
@@ -227,6 +230,10 @@ void NodeController::AcceptBrokerClientInvitation(
broker_host_handle = channel.TakeRemoteEndpoint().TakePlatformHandle();
}
#endif
+ // Re-enable port merge operations, which may have been disabled if this isn't
+ // the first invitation accepted by this process.
+ base::AutoLock lock(pending_port_merges_lock_);
+ reject_pending_merges_ = false;
io_task_runner_->PostTask(
FROM_HERE,
@@ -323,6 +330,15 @@ void NodeController::NotifyBadMessageFrom(const ports::NodeName& source_node,
peer->NotifyBadMessage(error);
}
+void NodeController::ForceDisconnectProcessForTesting(
+ base::ProcessId process_id) {
+ io_task_runner_->PostTask(
+ FROM_HERE,
+ base::BindOnce(
+ &NodeController::ForceDisconnectProcessForTestingOnIOThread,
+ base::Unretained(this), process_id));
+}
+
// static
void NodeController::DeserializeRawBytesAsEventForFuzzer(
base::span<const unsigned char> data) {
@@ -413,7 +429,17 @@ void NodeController::AcceptBrokerClientInvitationOnIOThread(
{
base::AutoLock lock(inviter_lock_);
- DCHECK(inviter_name_ == ports::kInvalidNodeName);
+ if (inviter_name_ != ports::kInvalidNodeName) {
+ // We've already accepted an invitation before and are already part of
+ // a different Mojo process network. In order to accept this new one and
+ // remain in a consistent state, we have to purge all peer connections and
+ // start from scratch.
+ {
+ base::AutoUnlock unlock(inviter_lock_);
+ DropAllPeers();
+ }
+ inviter_name_ = ports::kInvalidNodeName;
+ }
// At this point we don't know the inviter's name, so we can't yet insert it
// into our |peers_| map. That will happen as soon as we receive an
@@ -422,10 +448,18 @@ void NodeController::AcceptBrokerClientInvitationOnIOThread(
NodeChannel::Create(this, std::move(connection_params),
Channel::HandlePolicy::kAcceptHandles,
io_task_runner_, ProcessErrorCallback());
- // Prevent the inviter pipe handle from being closed on shutdown. Pipe
- // closure may be used by the inviter to detect the invitee process has
- // exited.
- bootstrap_inviter_channel_->LeakHandleOnShutdown();
+
+ if (connection_params.leak_endpoint()) {
+ // Prevent the inviter pipe handle from being closed on shutdown. Pipe
+ // closure may be used by the inviter to detect that the invited process
+ // has terminated. In such cases, the invited process must not be invited
+ // more than once in its lifetime; otherwise this leak matters.
+ //
+ // Note that this behavior is supported primarily to help adapt legacy
+ // Chrome IPC to Mojo, since channel disconnection is used there as a
+ // signal for normal child process termination.
+ bootstrap_inviter_channel_->LeakHandleOnShutdown();
+ }
}
bootstrap_inviter_channel_->Start();
if (broker_host_handle)
@@ -1260,6 +1294,31 @@ void NodeController::AttemptShutdownIfRequested() {
callback.Run();
}
+void NodeController::ForceDisconnectProcessForTestingOnIOThread(
+ base::ProcessId process_id) {
+#if defined(OS_NACL) || defined(OS_IOS)
+ NOTREACHED();
+#else
+ DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
+ RequestContext request_context;
+
+ // A channel may have multiple aliases since we generate one for any we
+ // invite and then only later refer to it by its own chosen name.
+ NodeMap peers_to_drop;
+ for (auto& peer : peers_) {
+ NodeChannel* channel = peer.second.get();
+ if (channel->HasRemoteProcessHandle()) {
+ base::Process process(channel->CloneRemoteProcessHandle().release());
+ if (process.Pid() == process_id)
+ peers_to_drop.emplace(peer.first, peer.second);
+ }
+ }
+
+ for (auto& peer : peers_to_drop)
+ DropPeer(peer.first, peer.second.get());
+#endif
+}
+
NodeController::IsolatedConnection::IsolatedConnection() = default;
NodeController::IsolatedConnection::IsolatedConnection(
diff --git a/mojo/core/node_controller.h b/mojo/core/node_controller.h
index 980f00fb5..cfd55e757 100644
--- a/mojo/core/node_controller.h
+++ b/mojo/core/node_controller.h
@@ -121,6 +121,11 @@ class MOJO_SYSTEM_IMPL_EXPORT NodeController : public ports::NodeDelegate,
void NotifyBadMessageFrom(const ports::NodeName& source_node,
const std::string& error);
+ // Force-closes the connection to another process to simulate connection
+ // failures for testing. |process_id| must correspond to a process to which
+ // this node has an active NodeChannel.
+ void ForceDisconnectProcessForTesting(base::ProcessId process_id);
+
static void DeserializeRawBytesAsEventForFuzzer(
base::span<const unsigned char> data);
static void DeserializeMessageAsEventForFuzzer(Channel::MessagePtr message);
@@ -240,6 +245,9 @@ class MOJO_SYSTEM_IMPL_EXPORT NodeController : public ports::NodeDelegate,
// possible. If so, shutdown is performed and the shutdown callback is run.
void AttemptShutdownIfRequested();
+ // See |ForceDisconnectProcessForTesting()|.
+ void ForceDisconnectProcessForTestingOnIOThread(base::ProcessId process_id);
+
// These are safe to access from any thread as long as the Node is alive.
Core* const core_;
const ports::NodeName name_;
diff --git a/mojo/core/scoped_process_handle.cc b/mojo/core/scoped_process_handle.cc
index 65dfcf78a..4509acdfc 100644
--- a/mojo/core/scoped_process_handle.cc
+++ b/mojo/core/scoped_process_handle.cc
@@ -10,6 +10,10 @@
#include <windows.h>
#endif
+#if defined(OS_FUCHSIA)
+#include <zircon/syscalls.h>
+#endif
+
namespace mojo {
namespace core {
@@ -30,7 +34,12 @@ base::ProcessHandle GetCurrentProcessHandle() {
ScopedProcessHandle::ScopedProcessHandle() = default;
ScopedProcessHandle::ScopedProcessHandle(base::ProcessHandle handle)
- : handle_(handle) {
+#if defined(OS_FUCHSIA)
+ : process_(handle)
+#else
+ : handle_(handle)
+#endif
+{
DCHECK_NE(handle, GetCurrentProcessHandle());
}
@@ -49,8 +58,20 @@ ScopedProcessHandle ScopedProcessHandle::CloneFrom(base::ProcessHandle handle) {
GetCurrentProcessHandle(), &handle, 0, FALSE,
DUPLICATE_SAME_ACCESS);
DCHECK(ok);
-#endif
return ScopedProcessHandle(handle);
+#elif defined(OS_FUCHSIA)
+ base::ProcessHandle new_handle;
+ zx_status_t status =
+ zx_handle_duplicate(handle, ZX_RIGHT_SAME_RIGHTS, &new_handle);
+ if (status != ZX_OK)
+ return ScopedProcessHandle();
+ return ScopedProcessHandle(new_handle);
+#elif defined(OS_POSIX)
+ return ScopedProcessHandle(handle);
+#else
+#error "Unsupported platform."
+ return ScopedProcessHandle();
+#endif
}
ScopedProcessHandle& ScopedProcessHandle::operator=(ScopedProcessHandle&&) =
@@ -59,6 +80,8 @@ ScopedProcessHandle& ScopedProcessHandle::operator=(ScopedProcessHandle&&) =
bool ScopedProcessHandle::is_valid() const {
#if defined(OS_WIN)
return handle_.IsValid();
+#elif defined(OS_FUCHSIA)
+ return process_.is_valid();
#else
return handle_ != base::kNullProcessHandle;
#endif
@@ -67,6 +90,8 @@ bool ScopedProcessHandle::is_valid() const {
base::ProcessHandle ScopedProcessHandle::get() const {
#if defined(OS_WIN)
return handle_.Get();
+#elif defined(OS_FUCHSIA)
+ return process_.get();
#else
return handle_;
#endif
@@ -75,6 +100,8 @@ base::ProcessHandle ScopedProcessHandle::get() const {
base::ProcessHandle ScopedProcessHandle::release() {
#if defined(OS_WIN)
return handle_.Take();
+#elif defined(OS_FUCHSIA)
+ return process_.release();
#else
return handle_;
#endif
diff --git a/mojo/core/scoped_process_handle.h b/mojo/core/scoped_process_handle.h
index 4677145d9..419db4aec 100644
--- a/mojo/core/scoped_process_handle.h
+++ b/mojo/core/scoped_process_handle.h
@@ -13,6 +13,10 @@
#include "base/win/scoped_handle.h"
#endif
+#if defined(OS_FUCHSIA)
+#include <lib/zx/process.h>
+#endif
+
namespace mojo {
namespace core {
@@ -52,6 +56,8 @@ class ScopedProcessHandle {
private:
#if defined(OS_WIN)
base::win::ScopedHandle handle_;
+#elif defined(OS_FUCHSIA)
+ zx::process process_;
#else
base::ProcessHandle handle_ = base::kNullProcessHandle;
#endif
diff --git a/mojo/public/c/system/invitation.h b/mojo/public/c/system/invitation.h
index 0c5582e39..e63001eb0 100644
--- a/mojo/public/c/system/invitation.h
+++ b/mojo/public/c/system/invitation.h
@@ -229,6 +229,17 @@ typedef uint32_t MojoAcceptInvitationFlags;
// |MOJO_SEND_INVITATION_FLAG_ISOLATED| for details.
#define MOJO_ACCEPT_INVITATION_FLAG_ISOLATED ((MojoAcceptInvitationFlags)1)
+// The transport endpoint used to accept this invitation should be leaked, i.e.
+// never closed until it's implicitly closed on process death. This exists to
+// support adaptation of legacy code to Mojo IPC so that, e.g., a broken pipe
+// can be used as a reliable indication of remote process death.
+//
+// This flag should generally not be used unless strictly necessary, and it is
+// unsafe to use in any situation where a process may accept multiple
+// invitations over the course of its lifetime.
+#define MOJO_ACCEPT_INVITATION_FLAG_LEAK_TRANSPORT_ENDPOINT \
+ ((MojoAcceptInvitationFlags)2)
+
// Options passed to |MojoAcceptInvitation()|.
struct MOJO_ALIGNAS(8) MojoAcceptInvitationOptions {
// The size of this structure, used for versioning.
diff --git a/mojo/public/cpp/system/invitation.cc b/mojo/public/cpp/system/invitation.cc
index 9c438eda4..84be661c8 100644
--- a/mojo/public/cpp/system/invitation.cc
+++ b/mojo/public/cpp/system/invitation.cc
@@ -225,7 +225,8 @@ IncomingInvitation& IncomingInvitation::operator=(IncomingInvitation&& other) =
// static
IncomingInvitation IncomingInvitation::Accept(
- PlatformChannelEndpoint channel_endpoint) {
+ PlatformChannelEndpoint channel_endpoint,
+ MojoAcceptInvitationFlags flags) {
MojoPlatformHandle endpoint_handle;
PlatformHandle::ToMojoPlatformHandle(channel_endpoint.TakePlatformHandle(),
&endpoint_handle);
@@ -237,9 +238,13 @@ IncomingInvitation IncomingInvitation::Accept(
transport_endpoint.num_platform_handles = 1;
transport_endpoint.platform_handles = &endpoint_handle;
+ MojoAcceptInvitationOptions options;
+ options.struct_size = sizeof(options);
+ options.flags = flags;
+
MojoHandle invitation_handle;
MojoResult result =
- MojoAcceptInvitation(&transport_endpoint, nullptr, &invitation_handle);
+ MojoAcceptInvitation(&transport_endpoint, &options, &invitation_handle);
if (result != MOJO_RESULT_OK)
return IncomingInvitation();
diff --git a/mojo/public/cpp/system/invitation.h b/mojo/public/cpp/system/invitation.h
index 6794d2ba7..6fb413d7b 100644
--- a/mojo/public/cpp/system/invitation.h
+++ b/mojo/public/cpp/system/invitation.h
@@ -13,6 +13,7 @@
#include "base/macros.h"
#include "base/process/process_handle.h"
#include "base/strings/string_piece.h"
+#include "mojo/public/c/system/invitation.h"
#include "mojo/public/cpp/platform/platform_channel_endpoint.h"
#include "mojo/public/cpp/platform/platform_channel_server_endpoint.h"
#include "mojo/public/cpp/system/handle.h"
@@ -169,7 +170,9 @@ class MOJO_CPP_SYSTEM_EXPORT IncomingInvitation {
// by |NamedPlatformChannel::ConnectToServer|.
//
// Note that this performs blocking I/O on the calling thread.
- static IncomingInvitation Accept(PlatformChannelEndpoint channel_endpoint);
+ static IncomingInvitation Accept(
+ PlatformChannelEndpoint channel_endpoint,
+ MojoAcceptInvitationFlags flags = MOJO_ACCEPT_INVITATION_FLAG_NONE);
// Like above, but does not perform any blocking I/O. Not all platforms and
// sandbox configurations are compatible with this API. In such cases, the
--
2.27.0.383.g050319c2ae-goog