ipcz: Broker message relay
Implements the ability for messages with driver objects to be split and
partially relayed through a broker node when driver objects cannot be
transmitted directly between two non-broker nodes.
This is ultimately required to support HANDLE transfer by Windows
drivers when sandboxed processes are involved, but it is generalized
here as part of ipcz proper in order to minimize the potential burden on
future driver implementations.
Bug: 1299283
Change-Id: I3ce59ed2d80033a4f5e09aca29c69ec24d5fe14d
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/3802771
Reviewed-by: Alex Gough <ajgo@chromium.org>
Commit-Queue: Ken Rockot <rockot@google.com>
Cr-Commit-Position: refs/heads/main@{#1032818}
NOKEYCHECK=True
GitOrigin-RevId: d3e3c665808942117b1d54153742e8553ec96ff9
diff --git a/src/box_test.cc b/src/box_test.cc
index ec814f1..f13203a 100644
--- a/src/box_test.cc
+++ b/src/box_test.cc
@@ -136,6 +136,7 @@
EXPECT_EQ(kMessage2, GetBlobContents(blob_handle));
}
+ WaitForDirectRemoteLink(q);
CloseAll({q, b});
}
@@ -160,6 +161,7 @@
EXPECT_EQ(IPCZ_RESULT_OK, Put(p, kMessage1, {&box, 1}));
}
+ WaitForDirectRemoteLink(p);
CloseAll({p, b});
}
diff --git a/src/ipcz/handle_type.h b/src/ipcz/handle_type.h
index a143c5e..df08a27 100644
--- a/src/ipcz/handle_type.h
+++ b/src/ipcz/handle_type.h
@@ -21,7 +21,10 @@
// DriverObject array and wraps it as a Box object.
kBox = 1,
- // TODO: Add an enumeration for relayed boxes.
+ // A placeholder for a box handle in a split parcel transmission. This occurs
+ // in the directly transmitted half of the parcel, and it signifies the
+ // existence of a corresponding DriverObject in the relayed half.
+ kRelayedBox = 2,
};
} // namespace ipcz
diff --git a/src/ipcz/message.cc b/src/ipcz/message.cc
index 5872d6b..1b686ed 100644
--- a/src/ipcz/message.cc
+++ b/src/ipcz/message.cc
@@ -236,37 +236,12 @@
bool Message::DeserializeUnknownType(const DriverTransport::RawMessage& message,
const DriverTransport& transport) {
- // Copy the data into a local message object to avoid any TOCTOU issues in
- // case `data` is in unsafe shared memory.
- data_.resize(message.data.size());
- memcpy(data_.data(), message.data.data(), message.data.size());
-
- // Validate the header. The message must at least be large enough to encode a
- // v0 MessageHeader, and the encoded header size and version must make sense
- // (e.g. version 0 size must be sizeof(MessageHeader))
- if (data_.size() < sizeof(internal::MessageHeaderV0)) {
- return false;
- }
-
- const auto& message_header =
- *reinterpret_cast<const internal::MessageHeaderV0*>(data_.data());
- if (message_header.version == 0) {
- if (message_header.size != sizeof(internal::MessageHeaderV0)) {
- return false;
- }
- } else {
- if (message_header.size < sizeof(internal::MessageHeaderV0)) {
- return false;
- }
- }
-
- if (message_header.size > data_.size()) {
+ if (!CopyDataAndValidateHeader(message.data)) {
return false;
}
// Validate and deserialize the DriverObject array.
- const uint32_t driver_object_array_offset =
- message_header.driver_object_data_array;
+ const uint32_t driver_object_array_offset = header().driver_object_data_array;
bool all_driver_objects_ok = true;
if (driver_object_array_offset > 0) {
if (!IsArrayValid(*this, driver_object_array_offset,
@@ -295,16 +270,44 @@
return all_driver_objects_ok;
}
-bool Message::DeserializeFromTransport(
- size_t params_size,
- uint32_t params_current_version,
- absl::Span<const internal::ParamMetadata> params_metadata,
- const DriverTransport::RawMessage& message,
- const DriverTransport& transport) {
- if (!DeserializeUnknownType(message, transport)) {
+bool Message::CopyDataAndValidateHeader(absl::Span<const uint8_t> data) {
+ // Copy the data into a local message object to avoid any TOCTOU issues in
+ // case `data` is in unsafe shared memory.
+ data_.resize(data.size());
+ memcpy(data_.data(), data.data(), data.size());
+
+ // The message must at least be large enough to encode a v0 MessageHeader.
+ if (data_.size() < sizeof(internal::MessageHeaderV0)) {
return false;
}
+ // Version 0 header must match MsesageHeaderV0 size exactly. Newer unknown
+ // versions must not be smaller than that.
+ const auto& header =
+ *reinterpret_cast<const internal::MessageHeaderV0*>(data_.data());
+ if (header.version == 0) {
+ if (header.size != sizeof(internal::MessageHeaderV0)) {
+ return false;
+ }
+ } else {
+ if (header.size < sizeof(internal::MessageHeaderV0)) {
+ return false;
+ }
+ }
+
+ // The header's stated size (and thus the start of the parameter payload)
+ // must not run over the edge of the message.
+ if (header.size > data_.size()) {
+ return false;
+ }
+
+ return true;
+}
+
+bool Message::ValidateParameters(
+ size_t params_size,
+ uint32_t params_current_version,
+ absl::Span<const internal::ParamMetadata> params_metadata) {
// Validate parameter data. There must be at least enough bytes following the
// header to encode a StructHeader and to account for all parameter data.
absl::Span<uint8_t> params_data = params_data_view();
@@ -383,4 +386,35 @@
return true;
}
+bool Message::DeserializeFromTransport(
+ size_t params_size,
+ uint32_t params_current_version,
+ absl::Span<const internal::ParamMetadata> params_metadata,
+ const DriverTransport::RawMessage& message,
+ const DriverTransport& transport) {
+ if (!DeserializeUnknownType(message, transport)) {
+ return false;
+ }
+
+ return ValidateParameters(params_size, params_current_version,
+ params_metadata);
+}
+
+bool Message::DeserializeFromRelay(
+ size_t params_size,
+ uint32_t params_current_version,
+ absl::Span<const internal::ParamMetadata> params_metadata,
+ absl::Span<const uint8_t> data,
+ absl::Span<DriverObject> objects) {
+ if (!CopyDataAndValidateHeader(data)) {
+ return false;
+ }
+
+ driver_objects_.resize(objects.size());
+ std::move(objects.begin(), objects.end(), driver_objects_.begin());
+
+ return ValidateParameters(params_size, params_current_version,
+ params_metadata);
+}
+
} // namespace ipcz
diff --git a/src/ipcz/message.h b/src/ipcz/message.h
index 7aa50d2..180f207 100644
--- a/src/ipcz/message.h
+++ b/src/ipcz/message.h
@@ -342,6 +342,19 @@
data_.data());
}
+ // Common helper for vaidation of an incoming message header and basic data
+ // payload size.
+ bool CopyDataAndValidateHeader(absl::Span<const uint8_t> data);
+
+ // Common helper to validate an encoded parameter structure against a specific
+ // message definition. Must only be called on a Message with `data_` already
+ // populated, the header already validated, and DriverObjects already
+ // deserialized into `driver_objects_`.
+ bool ValidateParameters(
+ size_t params_size,
+ uint32_t params_current_version,
+ absl::Span<const internal::ParamMetadata> params_metadata);
+
// Attempts to deserialize a message from raw `data` and `handles` into `this`
// message object, given the `params_size`, `params_current_version` and
// `params_metadata`, which are all generated from message macros at build
@@ -356,6 +369,19 @@
const DriverTransport::RawMessage& message,
const DriverTransport& transport);
+ // Attempts to deserialize a message from raw `data`, given a set of already
+ // deserialized DriverObjects in `objects`. The objects and data here have
+ // been extracted from a message relayed opaquely through the broker. While
+ // each DriverObject has already been validated and deserialized, the
+ // message-specific parameter data and object-field assignments must be
+ // validated here.
+ bool DeserializeFromRelay(
+ size_t params_size,
+ uint32_t params_current_version,
+ absl::Span<const internal::ParamMetadata> params_metadata,
+ absl::Span<const uint8_t> data,
+ absl::Span<DriverObject> objects);
+
// Raw serialized data for this message. This always begins with MessageHeader
// (or potentially some newer or older version thereof), whose actual size
// is determined by the header's `size` field. After that many bytes, a
diff --git a/src/ipcz/message_macros/message_declaration_macros.h b/src/ipcz/message_macros/message_declaration_macros.h
index 4594983..ecfc724 100644
--- a/src/ipcz/message_macros/message_declaration_macros.h
+++ b/src/ipcz/message_macros/message_declaration_macros.h
@@ -20,6 +20,8 @@
~name(); \
bool Deserialize(const DriverTransport::RawMessage& message, \
const DriverTransport& transport); \
+ bool DeserializeRelayed(absl::Span<const uint8_t> data, \
+ absl::Span<DriverObject> objects); \
\
static constexpr internal::ParamMetadata kMetadata[] = {
#define IPCZ_MSG_END() \
diff --git a/src/ipcz/message_macros/message_definition_macros.h b/src/ipcz/message_macros/message_definition_macros.h
index 2ac1232..a373ca6 100644
--- a/src/ipcz/message_macros/message_definition_macros.h
+++ b/src/ipcz/message_macros/message_definition_macros.h
@@ -10,15 +10,20 @@
#define IPCZ_MSG_ID(x)
#define IPCZ_MSG_VERSION(x)
-#define IPCZ_MSG_BEGIN(name, id_decl, version_decl) \
- name::name() = default; \
- name::~name() = default; \
- bool name::Deserialize(const DriverTransport::RawMessage& message, \
- const DriverTransport& transport) { \
- return DeserializeFromTransport(sizeof(ParamsType), kVersion, \
- absl::MakeSpan(kMetadata), message, \
- transport); \
- } \
+#define IPCZ_MSG_BEGIN(name, id_decl, version_decl) \
+ name::name() = default; \
+ name::~name() = default; \
+ bool name::Deserialize(const DriverTransport::RawMessage& message, \
+ const DriverTransport& transport) { \
+ return DeserializeFromTransport(sizeof(ParamsType), kVersion, \
+ absl::MakeSpan(kMetadata), message, \
+ transport); \
+ } \
+ bool name::DeserializeRelayed(absl::Span<const uint8_t> data, \
+ absl::Span<DriverObject> objects) { \
+ return DeserializeFromRelay(sizeof(ParamsType), kVersion, \
+ absl::MakeSpan(kMetadata), data, objects); \
+ } \
constexpr internal::ParamMetadata name::kMetadata[];
#define IPCZ_MSG_END()
diff --git a/src/ipcz/node.cc b/src/ipcz/node.cc
index 5c4cf84..258b5a5 100644
--- a/src/ipcz/node.cc
+++ b/src/ipcz/node.cc
@@ -291,6 +291,31 @@
return true;
}
+bool Node::RelayMessage(const NodeName& from_node, msg::RelayMessage& relay) {
+ ABSL_ASSERT(type_ == Type::kBroker);
+ auto link = GetLink(relay.params().destination);
+ if (!link) {
+ return true;
+ }
+
+ absl::Span<uint8_t> data = relay.GetArrayView<uint8_t>(relay.params().data);
+ msg::AcceptRelayedMessage accept;
+ accept.params().source = from_node;
+ accept.params().data = accept.AllocateArray<uint8_t>(data.size());
+ memcpy(accept.GetArrayData(accept.params().data), data.data(), data.size());
+ accept.params().driver_objects =
+ accept.AppendDriverObjects(relay.driver_objects());
+ link->Transmit(accept);
+ return true;
+}
+
+bool Node::AcceptRelayedMessage(msg::AcceptRelayedMessage& accept) {
+ if (auto link = GetLink(accept.params().source)) {
+ link->DispatchRelayedMessage(accept);
+ }
+ return true;
+}
+
void Node::DropLink(const NodeName& name) {
Ref<NodeLink> link;
bool lost_broker = false;
diff --git a/src/ipcz/node.h b/src/ipcz/node.h
index ded796d..5f36a9b 100644
--- a/src/ipcz/node.h
+++ b/src/ipcz/node.h
@@ -11,6 +11,7 @@
#include "ipcz/driver_memory.h"
#include "ipcz/ipcz.h"
#include "ipcz/link_side.h"
+#include "ipcz/node_messages.h"
#include "ipcz/node_name.h"
#include "third_party/abseil-cpp/absl/container/flat_hash_map.h"
#include "third_party/abseil-cpp/absl/container/flat_hash_set.h"
@@ -146,6 +147,13 @@
// the broker could not satisfy the request.
bool CancelIntroduction(const NodeName& name);
+ // Relays a message to its destination on behalf of `from_node`.
+ bool RelayMessage(const NodeName& from_node, msg::RelayMessage& relay);
+
+ // Attempts to dispatch a relayed message from the broker as if it came from
+ // the relay source directly.
+ bool AcceptRelayedMessage(msg::AcceptRelayedMessage& accept);
+
// Drops this node's link to the named node, if one exists.
void DropLink(const NodeName& name);
diff --git a/src/ipcz/node_link.cc b/src/ipcz/node_link.cc
index f84033d..1a09862 100644
--- a/src/ipcz/node_link.cc
+++ b/src/ipcz/node_link.cc
@@ -8,6 +8,7 @@
#include <atomic>
#include <cstddef>
#include <cstdint>
+#include <cstring>
#include <limits>
#include <utility>
@@ -20,6 +21,7 @@
#include "ipcz/node.h"
#include "ipcz/node_link_memory.h"
#include "ipcz/node_messages.h"
+#include "ipcz/parcel.h"
#include "ipcz/portal.h"
#include "ipcz/remote_router_link.h"
#include "ipcz/router.h"
@@ -239,6 +241,55 @@
Transmit(request);
}
+void NodeLink::RelayMessage(const NodeName& to_node, Message& message) {
+ ABSL_ASSERT(remote_node_type_ == Node::Type::kBroker);
+
+ msg::RelayMessage relay;
+ relay.params().destination = to_node;
+ relay.params().data =
+ relay.AllocateArray<uint8_t>(message.data_view().size());
+ memcpy(relay.GetArrayData(relay.params().data), message.data_view().data(),
+ message.data_view().size());
+ relay.params().driver_objects =
+ relay.AppendDriverObjects(message.driver_objects());
+ Transmit(relay);
+}
+
+bool NodeLink::DispatchRelayedMessage(msg::AcceptRelayedMessage& accept) {
+ // We only allow a limited subset of messages to be relayed through a broker.
+ // Namely, any message which might carry driver objects between two
+ // non-brokers needs to be relayable.
+ //
+ // If an unknown or unsupported message type is relayed it's silently
+ // discarded, rather than being rejected as a validation failure. This leaves
+ // open the possibility for newer versions of a message to introduce driver
+ // objects and support relaying.
+ absl::Span<uint8_t> data = accept.GetArrayView<uint8_t>(accept.params().data);
+ absl::Span<DriverObject> objects = accept.driver_objects();
+ if (data.size() < sizeof(internal::MessageHeaderV0)) {
+ return false;
+ }
+ const uint8_t message_id =
+ reinterpret_cast<internal::MessageHeaderV0*>(data.data())->message_id;
+ switch (message_id) {
+ case msg::AcceptParcelDriverObjects::kId: {
+ msg::AcceptParcelDriverObjects accept_parcel;
+ return accept_parcel.DeserializeRelayed(data, objects) &&
+ OnAcceptParcelDriverObjects(accept_parcel);
+ }
+
+ case msg::AddBlockBuffer::kId: {
+ msg::AddBlockBuffer add;
+ return add.DeserializeRelayed(data, objects) && OnAddBlockBuffer(add);
+ }
+
+ default:
+ DVLOG(4) << "Ignoring relayed message with ID "
+ << static_cast<int>(message_id);
+ return true;
+ }
+}
+
void NodeLink::Deactivate() {
{
absl::MutexLock lock(&mutex_);
@@ -257,9 +308,13 @@
if (!message.CanTransmitOn(*transport_)) {
// The driver has indicated that it can't transmit this message through our
// transport, so the message must instead be relayed through a broker.
- //
- // TODO: Broker relaying not yet implemented.
- ABSL_ASSERT(false);
+ auto broker = node_->GetBrokerLink();
+ if (!broker) {
+ DLOG(ERROR) << "Cannot relay message without a broker link";
+ return;
+ }
+
+ broker->RelayMessage(remote_node_name_, message);
return;
}
@@ -347,6 +402,7 @@
// until any deserialized objects are stored in a new Parcel object. This
// ensures that they're properly cleaned up before we return.
bool parcel_valid = true;
+ bool is_split_parcel = false;
std::vector<Ref<APIObject>> objects(handle_types.size());
for (size_t i = 0; i < handle_types.size(); ++i) {
switch (handle_types[i]) {
@@ -377,6 +433,11 @@
break;
}
+ case HandleType::kRelayedBox: {
+ is_split_parcel = true;
+ break;
+ }
+
default:
parcel_valid = false;
break;
@@ -399,25 +460,22 @@
parcel.SetInlinedData(
std::vector<uint8_t>(parcel_data.begin(), parcel_data.end()));
- const absl::optional<Sublink> sublink = GetSublink(for_sublink);
- if (!sublink) {
- DVLOG(4) << "Dropping " << parcel.Describe() << " at "
- << local_node_name_.ToString() << ", arriving from "
- << remote_node_name_.ToString() << " via unknown sublink "
- << for_sublink;
- return true;
+ if (is_split_parcel) {
+ return AcceptParcelWithoutDriverObjects(for_sublink, parcel);
}
- const LinkType link_type = sublink->router_link->GetType();
- if (link_type.is_outward()) {
- DVLOG(4) << "Accepting inbound " << parcel.Describe() << " at "
- << sublink->router_link->Describe();
- return sublink->receiver->AcceptInboundParcel(parcel);
- }
+ return AcceptCompleteParcel(for_sublink, parcel);
+}
- ABSL_ASSERT(link_type.is_peripheral_inward());
- DVLOG(4) << "Accepting outbound " << parcel.Describe() << " at "
- << sublink->router_link->Describe();
- return sublink->receiver->AcceptOutboundParcel(parcel);
+bool NodeLink::OnAcceptParcelDriverObjects(
+ msg::AcceptParcelDriverObjects& accept) {
+ Parcel parcel(accept.params().sequence_number);
+ std::vector<Ref<APIObject>> objects;
+ objects.reserve(accept.driver_objects().size());
+ for (auto& object : accept.driver_objects()) {
+ objects.push_back(MakeRefCounted<Box>(std::move(object)));
+ }
+ parcel.SetObjects(std::move(objects));
+ return AcceptParcelDriverObjects(accept.params().sublink, parcel);
}
bool NodeLink::OnRouteClosed(msg::RouteClosed& route_closed) {
@@ -578,6 +636,22 @@
return true;
}
+bool NodeLink::OnRelayMessage(msg::RelayMessage& relay) {
+ if (node_->type() != Node::Type::kBroker) {
+ return false;
+ }
+
+ return node_->RelayMessage(remote_node_name_, relay);
+}
+
+bool NodeLink::OnAcceptRelayedMessage(msg::AcceptRelayedMessage& accept) {
+ if (remote_node_type_ != Node::Type::kBroker) {
+ return false;
+ }
+
+ return node_->AcceptRelayedMessage(accept);
+}
+
void NodeLink::OnTransportError() {
SublinkMap sublinks;
{
@@ -596,6 +670,100 @@
node_->DropLink(remote_node_name_);
}
+bool NodeLink::AcceptParcelWithoutDriverObjects(SublinkId for_sublink,
+ Parcel& parcel) {
+ const auto key = std::make_tuple(for_sublink, parcel.sequence_number());
+ Parcel parcel_with_driver_objects;
+ {
+ absl::MutexLock lock(&mutex_);
+ auto [it, inserted] = partial_parcels_.try_emplace(key, std::move(parcel));
+ if (inserted) {
+ return true;
+ }
+
+ parcel_with_driver_objects = std::move(it->second);
+ partial_parcels_.erase(it);
+ }
+
+ return AcceptSplitParcel(for_sublink, parcel, parcel_with_driver_objects);
+}
+
+bool NodeLink::AcceptParcelDriverObjects(SublinkId for_sublink,
+ Parcel& parcel) {
+ const auto key = std::make_tuple(for_sublink, parcel.sequence_number());
+ Parcel parcel_without_driver_objects;
+ {
+ absl::MutexLock lock(&mutex_);
+ auto [it, inserted] = partial_parcels_.try_emplace(key, std::move(parcel));
+ if (inserted) {
+ return true;
+ }
+
+ parcel_without_driver_objects = std::move(it->second);
+ partial_parcels_.erase(it);
+ }
+
+ return AcceptSplitParcel(for_sublink, parcel_without_driver_objects, parcel);
+}
+
+bool NodeLink::AcceptSplitParcel(SublinkId for_sublink,
+ Parcel& parcel_without_driver_objects,
+ Parcel& parcel_with_driver_objects) {
+ // The parcel with no driver objects should still have an object attachemnt
+ // slot reserved for every relayed driver object.
+ if (parcel_without_driver_objects.num_objects() <
+ parcel_with_driver_objects.num_objects()) {
+ return false;
+ }
+
+ // Fill in all the object gaps in the data-only parcel with the boxed objects
+ // from the driver objects parcel.
+ Parcel& complete_parcel = parcel_without_driver_objects;
+ auto remaining_driver_objects = parcel_with_driver_objects.objects_view();
+ for (auto& object : complete_parcel.objects_view()) {
+ if (object) {
+ continue;
+ }
+
+ if (remaining_driver_objects.empty()) {
+ return false;
+ }
+
+ object = std::move(remaining_driver_objects[0]);
+ remaining_driver_objects.remove_prefix(1);
+ }
+
+ // At least one driver object was unclaimed by the data half of the parcel.
+ // That's not right.
+ if (!remaining_driver_objects.empty()) {
+ return false;
+ }
+
+ return AcceptCompleteParcel(for_sublink, complete_parcel);
+}
+
+bool NodeLink::AcceptCompleteParcel(SublinkId for_sublink, Parcel& parcel) {
+ const absl::optional<Sublink> sublink = GetSublink(for_sublink);
+ if (!sublink) {
+ DVLOG(4) << "Dropping " << parcel.Describe() << " at "
+ << local_node_name_.ToString() << ", arriving from "
+ << remote_node_name_.ToString() << " via unknown sublink "
+ << for_sublink;
+ return true;
+ }
+ const LinkType link_type = sublink->router_link->GetType();
+ if (link_type.is_outward()) {
+ DVLOG(4) << "Accepting inbound " << parcel.Describe() << " at "
+ << sublink->router_link->Describe();
+ return sublink->receiver->AcceptInboundParcel(parcel);
+ }
+
+ ABSL_ASSERT(link_type.is_peripheral_inward());
+ DVLOG(4) << "Accepting outbound " << parcel.Describe() << " at "
+ << sublink->router_link->Describe();
+ return sublink->receiver->AcceptOutboundParcel(parcel);
+}
+
NodeLink::Sublink::Sublink(Ref<RemoteRouterLink> router_link,
Ref<Router> receiver)
: router_link(std::move(router_link)), receiver(std::move(receiver)) {}
diff --git a/src/ipcz/node_link.h b/src/ipcz/node_link.h
index 76003be..0ba40fa 100644
--- a/src/ipcz/node_link.h
+++ b/src/ipcz/node_link.h
@@ -30,6 +30,7 @@
namespace ipcz {
class Message;
+class Parcel;
class RemoteRouterLink;
class Router;
@@ -166,6 +167,17 @@
using RequestMemoryCallback = std::function<void(DriverMemory)>;
void RequestMemory(size_t size, RequestMemoryCallback callback);
+ // Asks the remote node (which must be a broker) to relay `message` over to
+ // `to_node`. This is used to transmit driver objects between non-broker nodes
+ // whenever direct transmission is unsupported by the driver.
+ void RelayMessage(const NodeName& to_node, Message& message);
+
+ // Simulates receipt of a new message from the remote node on this link. This
+ // is called by the local Node with a message that was relayed to it by its
+ // broker. All relayed messages land on their destination node through this
+ // method.
+ bool DispatchRelayedMessage(msg::AcceptRelayedMessage& relay);
+
// Permanently deactivates this NodeLink. Once this call returns the NodeLink
// will no longer receive transport messages. It may still be used to transmit
// outgoing messages, but it cannot be reactivated. Transmissions over a
@@ -208,6 +220,8 @@
bool OnRejectIntroduction(msg::RejectIntroduction& reject) override;
bool OnAddBlockBuffer(msg::AddBlockBuffer& add) override;
bool OnAcceptParcel(msg::AcceptParcel& accept) override;
+ bool OnAcceptParcelDriverObjects(
+ msg::AcceptParcelDriverObjects& accept) override;
bool OnRouteClosed(msg::RouteClosed& route_closed) override;
bool OnRouteDisconnected(msg::RouteDisconnected& route_disconnected) override;
bool OnBypassPeer(msg::BypassPeer& bypass) override;
@@ -219,8 +233,17 @@
bool OnFlushRouter(msg::FlushRouter& flush) override;
bool OnRequestMemory(msg::RequestMemory& request) override;
bool OnProvideMemory(msg::ProvideMemory& provide) override;
+ bool OnRelayMessage(msg::RelayMessage& relay) override;
+ bool OnAcceptRelayedMessage(msg::AcceptRelayedMessage& accept) override;
void OnTransportError() override;
+ bool AcceptParcelWithoutDriverObjects(SublinkId for_sublink, Parcel& parcel);
+ bool AcceptParcelDriverObjects(SublinkId for_sublink, Parcel& parcel);
+ bool AcceptSplitParcel(SublinkId for_sublink,
+ Parcel& parcel_without_driver_objects,
+ Parcel& parcel_with_driver_objects);
+ bool AcceptCompleteParcel(SublinkId for_sublink, Parcel& parcel);
+
const Ref<Node> node_;
const LinkSide link_side_;
const NodeName local_node_name_;
@@ -249,6 +272,12 @@
using MemoryRequestMap =
absl::flat_hash_map<uint32_t, std::list<RequestMemoryCallback>>;
MemoryRequestMap pending_memory_requests_ ABSL_GUARDED_BY(mutex_);
+
+ // Tracks partially received contents of split parcels so they can be
+ // reconstructed for dispatch.
+ using PartialParcelKey = std::tuple<SublinkId, SequenceNumber>;
+ using PartialParcelMap = absl::flat_hash_map<PartialParcelKey, Parcel>;
+ PartialParcelMap partial_parcels_ ABSL_GUARDED_BY(mutex_);
};
} // namespace ipcz
diff --git a/src/ipcz/node_messages_generator.h b/src/ipcz/node_messages_generator.h
index afdaa77..36f6d6f 100644
--- a/src/ipcz/node_messages_generator.h
+++ b/src/ipcz/node_messages_generator.h
@@ -140,6 +140,25 @@
IPCZ_MSG_PARAM_DRIVER_OBJECT_ARRAY(driver_objects)
IPCZ_MSG_END()
+// Conveys partial parcel contents, namely just its attached driver objects.
+// When a parcel with driver objects cannot be transmitted directly to its
+// destination, this message is split off and relayed through the broker while
+// the rest of the parcel contents are sent directly, without the objects
+// attached. The receiving node can reconstitute the full parcel once both
+// messages are received.
+IPCZ_MSG_BEGIN(AcceptParcelDriverObjects, IPCZ_MSG_ID(21), IPCZ_MSG_VERSION(0))
+ // The SublinkId linking the source and destination Routers along the
+ // transmitting NodeLink.
+ IPCZ_MSG_PARAM(SublinkId, sublink)
+
+ // The SequenceNumber of this parcel within the transmitting portal's outbound
+ // parcel sequence (and the receiving portal's inbound parcel sequence.)
+ IPCZ_MSG_PARAM(SequenceNumber, sequence_number)
+
+ // The driver objects to be accepted.
+ IPCZ_MSG_PARAM_DRIVER_OBJECT_ARRAY(driver_objects)
+IPCZ_MSG_END()
+
// Notifies a node that the route has been closed on one side. This message
// always pertains to the side of the route opposite of the router receiving it,
// guaranteed by the fact that the closed side of the route only transmits this
@@ -368,4 +387,31 @@
IPCZ_MSG_PARAM_DRIVER_OBJECT(buffer)
IPCZ_MSG_END()
+// Sends a message payload to the broker to be relayed to another node. Used to
+// relay messages which carry driver objects through the broker when they cannot
+// be transmitted directly between their source and destination nodes.
+IPCZ_MSG_BEGIN(RelayMessage, IPCZ_MSG_ID(66), IPCZ_MSG_VERSION(0))
+ // The node to which this message's contents should ultimately be relayed.
+ IPCZ_MSG_PARAM(NodeName, destination)
+
+ // The actual serialized message to be relayed, including its own header.
+ IPCZ_MSG_PARAM_ARRAY(uint8_t, data)
+
+ // The set of driver objects to be relayed along with `data`.
+ IPCZ_MSG_PARAM_DRIVER_OBJECT_ARRAY(driver_objects)
+IPCZ_MSG_END()
+
+// Relays a message payload from an intermediate broker to its destination. This
+// is the continuation of RelayMessage above. Must only be accepted on a broker.
+IPCZ_MSG_BEGIN(AcceptRelayedMessage, IPCZ_MSG_ID(67), IPCZ_MSG_VERSION(0))
+ // The node which originally requested that the broker relay this message.
+ IPCZ_MSG_PARAM(NodeName, source)
+
+ // The full serialized data of the relayed message.
+ IPCZ_MSG_PARAM_ARRAY(uint8_t, data)
+
+ // The set of driver objects relayed along with `data`.
+ IPCZ_MSG_PARAM_DRIVER_OBJECT_ARRAY(driver_objects)
+IPCZ_MSG_END()
+
IPCZ_MSG_END_INTERFACE()
diff --git a/src/ipcz/remote_router_link.cc b/src/ipcz/remote_router_link.cc
index 4602db6..dd1ba21 100644
--- a/src/ipcz/remote_router_link.cc
+++ b/src/ipcz/remote_router_link.cc
@@ -113,6 +113,7 @@
size_t num_portals = 0;
absl::InlinedVector<DriverObject, 2> driver_objects;
+ bool must_relay_driver_objects = false;
for (Ref<APIObject>& object : objects) {
switch (object->object_type()) {
case APIObject::kPortal:
@@ -122,10 +123,9 @@
case APIObject::kBox: {
Box* box = Box::FromObject(object.get());
ABSL_ASSERT(box);
-
- // TODO: Support object relay when direct transmission is impossible.
- ABSL_ASSERT(box->object().CanTransmitOn(*node_link()->transport()));
-
+ if (!box->object().CanTransmitOn(*node_link()->transport())) {
+ must_relay_driver_objects = true;
+ }
driver_objects.push_back(std::move(box->object()));
break;
}
@@ -135,6 +135,20 @@
}
}
+ // If driver objects will require relaying through the broker, then the parcel
+ // must be split into two separate messages: one for the driver objects (which
+ // will be relayed), and one for the rest of the message (which will transmit
+ // directly).
+ //
+ // This ensures that many side effects of message receipt are well-ordered
+ // with other transmissions on the same link from the same thread. Namely,
+ // since a thread may send a message which introduces a new remote Router on a
+ // new sublink, followed immediately by a message which targets that Router,
+ // it is critical that both messages arrive in the order they were sent. If
+ // one of the messages is relayed while the other is not, ordering could not
+ // be guaranteed.
+ const bool must_split_parcel = must_relay_driver_objects;
+
// Allocate all the arrays in the message. Note that each allocation may
// relocate the parcel data in memory, so views into these arrays should not
// be acquired until all allocations are complete.
@@ -174,7 +188,8 @@
}
case APIObject::kBox:
- handle_types[i] = HandleType::kBox;
+ handle_types[i] =
+ must_split_parcel ? HandleType::kRelayedBox : HandleType::kBox;
break;
default:
@@ -183,8 +198,20 @@
}
}
- accept.params().driver_objects =
- accept.AppendDriverObjects(absl::MakeSpan(driver_objects));
+ if (must_split_parcel) {
+ msg::AcceptParcelDriverObjects accept_objects;
+ accept_objects.params().sublink = sublink_;
+ accept_objects.params().sequence_number = parcel.sequence_number();
+ accept_objects.params().driver_objects =
+ accept_objects.AppendDriverObjects(absl::MakeSpan(driver_objects));
+
+ DVLOG(4) << "Transmitting objects for " << parcel.Describe() << " over "
+ << Describe();
+ node_link()->Transmit(accept_objects);
+ } else {
+ accept.params().driver_objects =
+ accept.AppendDriverObjects(absl::MakeSpan(driver_objects));
+ }
DVLOG(4) << "Transmitting " << parcel.Describe() << " over " << Describe();
diff --git a/src/reference_drivers/async_reference_driver.cc b/src/reference_drivers/async_reference_driver.cc
index 647e423..3ebbe8e 100644
--- a/src/reference_drivers/async_reference_driver.cc
+++ b/src/reference_drivers/async_reference_driver.cc
@@ -30,10 +30,27 @@
// thread for each transport.
class AsyncTransport : public ObjectImpl<AsyncTransport, Object::kTransport> {
public:
+ enum class NodeType {
+ kBroker,
+ kNonBroker,
+ };
+
+ struct TransportType {
+ NodeType local;
+ NodeType remote;
+ };
+
+ explicit AsyncTransport(const TransportType& type) : type_(type) {}
+
+ NodeType local_type() const { return type_.local; }
+ NodeType remote_type() const { return type_.remote; }
+
using Pair = std::pair<Ref<AsyncTransport>, Ref<AsyncTransport>>;
- static Pair CreatePair() {
- Pair pair{MakeRefCounted<AsyncTransport>(),
- MakeRefCounted<AsyncTransport>()};
+ static Pair CreatePair(NodeType node0_type, NodeType node1_type) {
+ Pair pair{MakeRefCounted<AsyncTransport>(
+ TransportType{.local = node0_type, .remote = node1_type}),
+ MakeRefCounted<AsyncTransport>(
+ TransportType{.local = node1_type, .remote = node0_type})};
std::tie(pair.second->peer_, pair.first->peer_) = pair;
return pair;
}
@@ -164,6 +181,8 @@
}
}
+ const TransportType type_;
+
Ref<AsyncTransport> peer_;
IpczHandle transport_ = IPCZ_INVALID_HANDLE;
IpczTransportActivityHandler handler_;
@@ -176,15 +195,18 @@
std::make_unique<absl::Notification>();
};
-IpczResult IPCZ_API CreateTransports(IpczDriverHandle,
- IpczDriverHandle,
+IpczResult IPCZ_API CreateTransports(IpczDriverHandle transport0,
+ IpczDriverHandle transport1,
uint32_t,
const void*,
- IpczDriverHandle* transport0,
- IpczDriverHandle* transport1) {
- auto [first, second] = AsyncTransport::CreatePair();
- *transport0 = Object::ReleaseAsHandle(std::move(first));
- *transport1 = Object::ReleaseAsHandle(std::move(second));
+ IpczDriverHandle* new_transport0,
+ IpczDriverHandle* new_transport1) {
+ auto* target0 = AsyncTransport::FromHandle(transport0);
+ auto* target1 = AsyncTransport::FromHandle(transport1);
+ auto [first, second] = AsyncTransport::CreatePair(target0->remote_type(),
+ target1->remote_type());
+ *new_transport0 = Object::ReleaseAsHandle(std::move(first));
+ *new_transport1 = Object::ReleaseAsHandle(std::move(second));
return IPCZ_RESULT_OK;
}
@@ -216,6 +238,29 @@
return IPCZ_RESULT_OK;
}
+IpczResult IPCZ_API SerializeWithForcedBrokering(IpczDriverHandle handle,
+ IpczDriverHandle transport,
+ uint32_t flags,
+ const void* options,
+ void* data,
+ size_t* num_bytes,
+ IpczDriverHandle* handles,
+ size_t* num_handles) {
+ auto* target = AsyncTransport::FromHandle(transport);
+ if (!target) {
+ return IPCZ_RESULT_ABORTED;
+ }
+
+ if (target->local_type() == AsyncTransport::NodeType::kNonBroker &&
+ target->remote_type() == AsyncTransport::NodeType::kNonBroker) {
+ // Force ipcz to relay driver objects through a broker.
+ return IPCZ_RESULT_PERMISSION_DENIED;
+ }
+
+ return kSingleProcessReferenceDriverBase.Serialize(
+ handle, transport, flags, options, data, num_bytes, handles, num_handles);
+}
+
} // namespace
// Note that this driver inherits most of its implementation from the baseline
@@ -236,4 +281,29 @@
kSingleProcessReferenceDriverBase.GenerateRandomBytes,
};
+const IpczDriver kAsyncReferenceDriverWithForcedBrokering = {
+ sizeof(kAsyncReferenceDriverWithForcedBrokering),
+ kSingleProcessReferenceDriverBase.Close,
+ SerializeWithForcedBrokering,
+ kSingleProcessReferenceDriverBase.Deserialize,
+ CreateTransports,
+ ActivateTransport,
+ DeactivateTransport,
+ Transmit,
+ kSingleProcessReferenceDriverBase.AllocateSharedMemory,
+ kSingleProcessReferenceDriverBase.GetSharedMemoryInfo,
+ kSingleProcessReferenceDriverBase.DuplicateSharedMemory,
+ kSingleProcessReferenceDriverBase.MapSharedMemory,
+ kSingleProcessReferenceDriverBase.GenerateRandomBytes,
+};
+
+AsyncTransportPair CreateAsyncTransportPair() {
+ AsyncTransport::Pair transports = AsyncTransport::CreatePair(
+ AsyncTransport::NodeType::kBroker, AsyncTransport::NodeType::kNonBroker);
+ return {
+ .broker = Object::ReleaseAsHandle(std::move(transports.first)),
+ .non_broker = Object::ReleaseAsHandle(std::move(transports.second)),
+ };
+}
+
} // namespace ipcz::reference_drivers
diff --git a/src/reference_drivers/async_reference_driver.h b/src/reference_drivers/async_reference_driver.h
index e596e60..2371be1 100644
--- a/src/reference_drivers/async_reference_driver.h
+++ b/src/reference_drivers/async_reference_driver.h
@@ -15,6 +15,19 @@
// production driver, without the complexity of a multiprocess environment.
extern const IpczDriver kAsyncReferenceDriver;
+// Mostly the same as kAsyncReferenceDriver, but rejects direct transmission of
+// driver handles between non-broker nodes. This forces ipcz to relay such
+// messages through the broker.
+extern const IpczDriver kAsyncReferenceDriverWithForcedBrokering;
+
+// Creates a new pair of async transport endpoints, one for a broker and one
+// for a non-broker.
+struct AsyncTransportPair {
+ IpczDriverHandle broker;
+ IpczDriverHandle non_broker;
+};
+AsyncTransportPair CreateAsyncTransportPair();
+
} // namespace ipcz::reference_drivers
#endif // IPCZ_SRC_DRIVERS_ASYNC_REFERENCE_DRIVER_H_
diff --git a/src/test/multinode_test.cc b/src/test/multinode_test.cc
index 9455476..dd9345c 100644
--- a/src/test/multinode_test.cc
+++ b/src/test/multinode_test.cc
@@ -121,6 +121,12 @@
case DriverMode::kAsyncDelegatedAlloc:
return reference_drivers::kAsyncReferenceDriver;
+ case DriverMode::kAsyncObjectBrokering:
+ return reference_drivers::kAsyncReferenceDriverWithForcedBrokering;
+
+ case DriverMode::kAsyncObjectBrokeringAndDelegatedAlloc:
+ return reference_drivers::kAsyncReferenceDriverWithForcedBrokering;
+
#if BUILDFLAG(ENABLE_IPCZ_MULTIPROCESS_TESTS)
case DriverMode::kMultiprocess:
return reference_drivers::kMultiprocessReferenceDriver;
@@ -146,7 +152,8 @@
void TestNode::ConnectToBroker(absl::Span<IpczHandle> portals) {
uint32_t flags = IPCZ_CONNECT_NODE_TO_BROKER;
- if (driver_mode_ == DriverMode::kAsyncDelegatedAlloc) {
+ if (driver_mode_ == DriverMode::kAsyncDelegatedAlloc ||
+ driver_mode_ == DriverMode::kAsyncObjectBrokeringAndDelegatedAlloc) {
flags |= IPCZ_CONNECT_NODE_TO_ALLOCATION_DELEGATE;
}
IpczDriverHandle transport =
@@ -222,6 +229,18 @@
}
TestNode::TransportPair TestNode::CreateTransports() {
+ if (driver_mode_ == DriverMode::kAsync ||
+ driver_mode_ == DriverMode::kAsyncDelegatedAlloc ||
+ driver_mode_ == DriverMode::kAsyncObjectBrokering ||
+ driver_mode_ == DriverMode::kAsyncObjectBrokeringAndDelegatedAlloc) {
+ reference_drivers::AsyncTransportPair transports =
+ reference_drivers::CreateAsyncTransportPair();
+ return {
+ .ours = transports.broker,
+ .theirs = transports.non_broker,
+ };
+ }
+
TransportPair transports;
const IpczResult result = GetDriver().CreateTransports(
IPCZ_INVALID_DRIVER_HANDLE, IPCZ_INVALID_DRIVER_HANDLE, IPCZ_NO_FLAGS,
diff --git a/src/test/multinode_test.h b/src/test/multinode_test.h
index 6f08423..75e3f4e 100644
--- a/src/test/multinode_test.h
+++ b/src/test/multinode_test.h
@@ -303,12 +303,14 @@
#endif
// TODO: Add other DriverMode enumerators here as support is landed.
-#define INSTANTIATE_MULTINODE_TEST_SUITE_P(suite) \
- INSTANTIATE_TEST_SUITE_P( \
- , suite, \
- ::testing::Values(ipcz::test::DriverMode::kSync, \
- ipcz::test::DriverMode::kAsync, \
- ipcz::test::DriverMode::kAsyncDelegatedAlloc \
- IPCZ_EXTRA_DRIVER_MODES))
+#define INSTANTIATE_MULTINODE_TEST_SUITE_P(suite) \
+ INSTANTIATE_TEST_SUITE_P( \
+ , suite, \
+ ::testing::Values( \
+ ipcz::test::DriverMode::kSync, ipcz::test::DriverMode::kAsync, \
+ ipcz::test::DriverMode::kAsyncDelegatedAlloc, \
+ ipcz::test::DriverMode::kAsyncObjectBrokering, \
+ ipcz::test::DriverMode::kAsyncObjectBrokeringAndDelegatedAlloc \
+ IPCZ_EXTRA_DRIVER_MODES))
#endif // IPCZ_SRC_TEST_MULTINODE_TEST_H_