ipcz: Implement MergePortals API
This introduces the concept of a "bridge" link which can be used to
splice together two terminal routers from separate routes. Bridges
serve as proxying links, such that the other terminal routers from
both routes are effectively linked together by a continuous single
route.
Once a merged route is sufficiently reduced such that a bridge link is
neighbored on either side by two stable central links, those
links can be locked and, along with the bridge itself, bypassed with a
single new central link via a process which closely mirrors that
of normal proxy bypass.
This API will be used to support Mojo message pipe fusion, a
rarely used Mojo feature which Content and other systems have
come to rely upon.
Bug: 1299283
Change-Id: I7b66507b5fb1891c23b2b7f37d87abad34327824
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/3803590
Commit-Queue: Ken Rockot <rockot@google.com>
Reviewed-by: Robert Sesek <rsesek@chromium.org>
Cr-Commit-Position: refs/heads/main@{#1034325}
NOKEYCHECK=True
GitOrigin-RevId: 6fecc4bb86caf6f90815d8c95849d65a7cbf3514
diff --git a/src/BUILD.gn b/src/BUILD.gn
index f689d5b..4fa9f4f 100644
--- a/src/BUILD.gn
+++ b/src/BUILD.gn
@@ -381,6 +381,7 @@
"ipcz/route_edge_test.cc",
"ipcz/router_link_test.cc",
"ipcz/sequenced_queue_test.cc",
+ "merge_portals_test.cc",
"queueing_test.cc",
"reference_drivers/sync_reference_driver_test.cc",
"remote_portal_test.cc",
diff --git a/src/api.cc b/src/api.cc
index ff582ac..56ef2bf 100644
--- a/src/api.cc
+++ b/src/api.cc
@@ -109,7 +109,22 @@
IpczHandle portal1,
uint32_t flags,
const void* options) {
- return IPCZ_RESULT_UNIMPLEMENTED;
+ ipcz::Portal* first = ipcz::Portal::FromHandle(portal0);
+ ipcz::Portal* second = ipcz::Portal::FromHandle(portal1);
+ if (!first || !second) {
+ return IPCZ_RESULT_INVALID_ARGUMENT;
+ }
+
+ ipcz::Ref<ipcz::Portal> one(ipcz::RefCounted::kAdoptExistingRef, first);
+ ipcz::Ref<ipcz::Portal> two(ipcz::RefCounted::kAdoptExistingRef, second);
+ IpczResult result = one->Merge(*two);
+ if (result != IPCZ_RESULT_OK) {
+ one.release();
+ two.release();
+ return result;
+ }
+
+ return IPCZ_RESULT_OK;
}
IpczResult QueryPortalStatus(IpczHandle portal_handle,
diff --git a/src/api_test.cc b/src/api_test.cc
index ce83991..84955ae 100644
--- a/src/api_test.cc
+++ b/src/api_test.cc
@@ -18,10 +18,6 @@
TEST_F(APITest, Unimplemented) {
EXPECT_EQ(IPCZ_RESULT_UNIMPLEMENTED,
- ipcz().MergePortals(IPCZ_INVALID_HANDLE, IPCZ_INVALID_HANDLE,
- IPCZ_NO_FLAGS, nullptr));
-
- EXPECT_EQ(IPCZ_RESULT_UNIMPLEMENTED,
ipcz().BeginPut(IPCZ_INVALID_HANDLE, IPCZ_NO_FLAGS, nullptr,
nullptr, nullptr));
EXPECT_EQ(IPCZ_RESULT_UNIMPLEMENTED,
@@ -188,6 +184,62 @@
CloseAll({a, node});
}
+TEST_F(APITest, MergePortalsFailure) {
+ const IpczHandle node = CreateNode(kDefaultDriver);
+ auto [a, b] = OpenPortals(node);
+
+ // Invalid portal handles.
+ EXPECT_EQ(
+ IPCZ_RESULT_INVALID_ARGUMENT,
+ ipcz().MergePortals(a, IPCZ_INVALID_HANDLE, IPCZ_NO_FLAGS, nullptr));
+ EXPECT_EQ(
+ IPCZ_RESULT_INVALID_ARGUMENT,
+ ipcz().MergePortals(IPCZ_INVALID_HANDLE, a, IPCZ_NO_FLAGS, nullptr));
+ EXPECT_EQ(IPCZ_RESULT_INVALID_ARGUMENT,
+ ipcz().MergePortals(IPCZ_INVALID_HANDLE, IPCZ_INVALID_HANDLE,
+ IPCZ_NO_FLAGS, nullptr));
+
+ // Can't merge into own peer.
+ EXPECT_EQ(IPCZ_RESULT_INVALID_ARGUMENT,
+ ipcz().MergePortals(a, b, IPCZ_NO_FLAGS, nullptr));
+
+ // Can't merge into self.
+ EXPECT_EQ(IPCZ_RESULT_INVALID_ARGUMENT,
+ ipcz().MergePortals(a, a, IPCZ_NO_FLAGS, nullptr));
+
+ auto [c, d] = OpenPortals(node);
+
+ // Can't merge a portal that's had parcels put into it.
+ EXPECT_EQ(IPCZ_RESULT_OK, Put(c, "!"));
+ EXPECT_EQ(IPCZ_RESULT_FAILED_PRECONDITION,
+ ipcz().MergePortals(a, c, IPCZ_NO_FLAGS, nullptr));
+
+ // Can't merge a portal that's had parcels retrieved from it.
+ std::string message;
+ EXPECT_EQ(IPCZ_RESULT_OK, Get(d, &message));
+ EXPECT_EQ(IPCZ_RESULT_FAILED_PRECONDITION,
+ ipcz().MergePortals(a, d, IPCZ_NO_FLAGS, nullptr));
+
+ CloseAll({a, b, c, d, node});
+}
+
+TEST_F(APITest, MergePortals) {
+ const IpczHandle node = CreateNode(kDefaultDriver);
+ auto [a, b] = OpenPortals(node);
+ auto [c, d] = OpenPortals(node);
+
+ EXPECT_EQ(IPCZ_RESULT_OK, Put(a, "!"));
+ EXPECT_EQ(IPCZ_RESULT_OK, ipcz().MergePortals(b, c, IPCZ_NO_FLAGS, nullptr));
+
+ // The message from `a` should be routed to `d`, since `b` and `c` have been
+ // merged.
+ std::string message;
+ EXPECT_EQ(IPCZ_RESULT_OK, Get(d, &message));
+ EXPECT_EQ("!", message);
+
+ CloseAll({a, d, node});
+}
+
TEST_F(APITest, PutGet) {
const IpczHandle node = CreateNode(kDefaultDriver);
auto [a, b] = OpenPortals(node);
diff --git a/src/ipcz/local_router_link.cc b/src/ipcz/local_router_link.cc
index d5a794f..e675a57 100644
--- a/src/ipcz/local_router_link.cc
+++ b/src/ipcz/local_router_link.cc
@@ -110,7 +110,12 @@
void LocalRouterLink::AcceptParcel(Parcel& parcel) {
if (Ref<Router> receiver = state_->GetRouter(side_.opposite())) {
- receiver->AcceptInboundParcel(parcel);
+ if (state_->type() == LinkType::kCentral) {
+ receiver->AcceptInboundParcel(parcel);
+ } else {
+ ABSL_ASSERT(state_->type() == LinkType::kBridge);
+ receiver->AcceptOutboundParcel(parcel);
+ }
}
}
diff --git a/src/ipcz/portal.cc b/src/ipcz/portal.cc
index affabb3..cb494aa 100644
--- a/src/ipcz/portal.cc
+++ b/src/ipcz/portal.cc
@@ -68,6 +68,10 @@
return IPCZ_RESULT_OK;
}
+IpczResult Portal::Merge(Portal& other) {
+ return router_->MergeRoute(other.router());
+}
+
IpczResult Portal::Put(absl::Span<const uint8_t> data,
absl::Span<const IpczHandle> handles,
const IpczPutLimits* limits) {
diff --git a/src/ipcz/portal.h b/src/ipcz/portal.h
index 8ca9cfc..df1fad2 100644
--- a/src/ipcz/portal.h
+++ b/src/ipcz/portal.h
@@ -41,6 +41,7 @@
// ipcz portal API implementation:
IpczResult QueryStatus(IpczPortalStatus& status);
+ IpczResult Merge(Portal& other);
IpczResult Put(absl::Span<const uint8_t> data,
absl::Span<const IpczHandle> handles,
diff --git a/src/ipcz/route_edge.cc b/src/ipcz/route_edge.cc
index 5c702a5..5b6a371 100644
--- a/src/ipcz/route_edge.cc
+++ b/src/ipcz/route_edge.cc
@@ -42,6 +42,14 @@
return std::move(decaying_link_);
}
+Ref<Router> RouteEdge::GetLocalPeer() {
+ return primary_link_ ? primary_link_->GetLocalPeer() : nullptr;
+}
+
+Ref<Router> RouteEdge::GetDecayingLocalPeer() {
+ return decaying_link_ ? decaying_link_->GetLocalPeer() : nullptr;
+}
+
bool RouteEdge::BeginPrimaryLinkDecay() {
if (decaying_link_ || is_decay_deferred_) {
return false;
diff --git a/src/ipcz/route_edge.h b/src/ipcz/route_edge.h
index 01079af..372ad2c 100644
--- a/src/ipcz/route_edge.h
+++ b/src/ipcz/route_edge.h
@@ -86,6 +86,14 @@
// Releases this edge's decaying link and returns a reference to it.
Ref<RouterLink> ReleaseDecayingLink();
+ // If the primary link is present and is a LocalRouterLink, this returns the
+ // Router on the other side of the link. Otherwise it returns null.
+ Ref<Router> GetLocalPeer();
+
+ // If the decaying link is present and is a LocalRouterLink, this returns the
+ // Router on the other side of the link. Otherwise it returns null.
+ Ref<Router> GetDecayingLocalPeer();
+
// Sets the current primary link to begin decay; or if there is no primary
// link yet, marks this edge for deferred decay. In the latter case, the next
// primary link adopted by this edge will immediately begin to decay. This may
diff --git a/src/ipcz/router.cc b/src/ipcz/router.cc
index c1277a4..551dfee 100644
--- a/src/ipcz/router.cc
+++ b/src/ipcz/router.cc
@@ -103,8 +103,7 @@
bool Router::HasLocalPeer(Router& router) {
absl::MutexLock lock(&mutex_);
- return outward_edge_.primary_link() &&
- outward_edge_.primary_link()->GetLocalPeer() == &router;
+ return outward_edge_.GetLocalPeer() == &router;
}
IpczResult Router::AllocateOutboundParcel(size_t num_bytes,
@@ -312,7 +311,7 @@
*inbound_parcels_.final_sequence_length() <= sequence_length;
}
- if (!inward_edge_) {
+ if (!inward_edge_ && !bridge_) {
status_.flags |= IPCZ_PORTAL_STATUS_PEER_CLOSED;
if (inbound_parcels_.IsSequenceFullyConsumed()) {
status_.flags |= IPCZ_PORTAL_STATUS_DEAD;
@@ -327,6 +326,11 @@
return outbound_parcels_.final_sequence_length().has_value() &&
*outbound_parcels_.final_sequence_length() <= sequence_length;
}
+ } else if (link_type.is_bridge()) {
+ if (!outbound_parcels_.SetFinalSequenceLength(sequence_length)) {
+ return false;
+ }
+ bridge_.reset();
}
}
@@ -356,6 +360,9 @@
if (inward_edge_) {
forwarding_links.push_back(inward_edge_->ReleasePrimaryLink());
forwarding_links.push_back(inward_edge_->ReleaseDecayingLink());
+ } else if (bridge_) {
+ forwarding_links.push_back(bridge_->ReleasePrimaryLink());
+ forwarding_links.push_back(bridge_->ReleaseDecayingLink());
} else {
// Terminal routers may have trap events to fire.
status_.flags |= IPCZ_PORTAL_STATUS_PEER_CLOSED;
@@ -514,6 +521,41 @@
return IPCZ_RESULT_OK;
}
+IpczResult Router::MergeRoute(const Ref<Router>& other) {
+ if (HasLocalPeer(*other) || other == this) {
+ return IPCZ_RESULT_INVALID_ARGUMENT;
+ }
+
+ {
+ MultiMutexLock lock(&mutex_, &other->mutex_);
+ if (inward_edge_ || other->inward_edge_ || bridge_ || other->bridge_) {
+ // It's not legal to call this on non-terminal routers.
+ return IPCZ_RESULT_INVALID_ARGUMENT;
+ }
+
+ if (inbound_parcels_.current_sequence_number() > SequenceNumber(0) ||
+ outbound_parcels_.GetCurrentSequenceLength() > SequenceNumber(0) ||
+ other->inbound_parcels_.current_sequence_number() > SequenceNumber(0) ||
+ other->outbound_parcels_.GetCurrentSequenceLength() >
+ SequenceNumber(0)) {
+ // It's not legal to call this on a router which has transmitted outbound
+ // parcels to its peer or retrieved inbound parcels from its queue.
+ return IPCZ_RESULT_FAILED_PRECONDITION;
+ }
+
+ bridge_ = std::make_unique<RouteEdge>();
+ other->bridge_ = std::make_unique<RouteEdge>();
+
+ RouterLink::Pair links = LocalRouterLink::CreatePair(
+ LinkType::kBridge, Router::Pair(WrapRefCounted(this), other));
+ bridge_->SetPrimaryLink(std::move(links.first));
+ other->bridge_->SetPrimaryLink(std::move(links.second));
+ }
+
+ Flush();
+ return IPCZ_RESULT_OK;
+}
+
// static
Ref<Router> Router::Deserialize(const RouterDescriptor& descriptor,
NodeLink& from_node_link) {
@@ -672,10 +714,9 @@
return true;
}
- if (outward_link != &requestor ||
- !outward_link->GetType().is_peripheral_outward()) {
- DLOG(ERROR) << "Rejecting RequestProxyBypass received on "
- << requestor.Describe() << " with existing "
+ if (outward_link != &requestor) {
+ DLOG(ERROR) << "Rejecting BypassPeer received on " << requestor.Describe()
+ << " with existing "
<< outward_edge_.primary_link()->Describe();
return false;
}
@@ -798,10 +839,10 @@
bool Router::StopProxying(SequenceNumber inbound_sequence_length,
SequenceNumber outbound_sequence_length) {
+ Ref<Router> bridge_peer;
{
absl::MutexLock lock(&mutex_);
- if (outward_edge_.is_stable() || !inward_edge_ ||
- inward_edge_->is_stable()) {
+ if (outward_edge_.is_stable()) {
// Proxies begin decaying their links before requesting to be bypassed,
// and they don't adopt new links after that. So if either edge is stable
// then someone is doing something wrong.
@@ -809,13 +850,50 @@
return false;
}
- inward_edge_->set_length_to_decaying_link(inbound_sequence_length);
- inward_edge_->set_length_from_decaying_link(outbound_sequence_length);
+ if (bridge_) {
+ // If we have a bridge link, we also need to update the router on the
+ // other side of the bridge.
+ bridge_peer = bridge_->GetDecayingLocalPeer();
+ if (!bridge_peer) {
+ return false;
+ }
+ } else if (!inward_edge_ || inward_edge_->is_stable()) {
+ // Not a proxy, so this request is invalid.
+ return false;
+ } else {
+ inward_edge_->set_length_to_decaying_link(inbound_sequence_length);
+ inward_edge_->set_length_from_decaying_link(outbound_sequence_length);
+ outward_edge_.set_length_to_decaying_link(outbound_sequence_length);
+ outward_edge_.set_length_from_decaying_link(inbound_sequence_length);
+ }
+ }
+
+ if (bridge_peer) {
+ MultiMutexLock lock(&mutex_, &bridge_peer->mutex_);
+ if (!bridge_ || bridge_->is_stable() || !bridge_peer->bridge_ ||
+ bridge_peer->bridge_->is_stable()) {
+ // The bridge is being or has already been torn down, so there's nothing
+ // to do here.
+ return true;
+ }
+
+ bridge_->set_length_to_decaying_link(inbound_sequence_length);
+ bridge_->set_length_from_decaying_link(outbound_sequence_length);
outward_edge_.set_length_to_decaying_link(outbound_sequence_length);
outward_edge_.set_length_from_decaying_link(inbound_sequence_length);
+ bridge_peer->bridge_->set_length_to_decaying_link(outbound_sequence_length);
+ bridge_peer->bridge_->set_length_from_decaying_link(
+ inbound_sequence_length);
+ bridge_peer->outward_edge_.set_length_to_decaying_link(
+ inbound_sequence_length);
+ bridge_peer->outward_edge_.set_length_from_decaying_link(
+ outbound_sequence_length);
}
Flush();
+ if (bridge_peer) {
+ bridge_peer->Flush();
+ }
return true;
}
@@ -841,9 +919,12 @@
bool Router::StopProxyingToLocalPeer(SequenceNumber outbound_sequence_length) {
Ref<Router> local_peer;
+ Ref<Router> bridge_peer;
{
absl::MutexLock lock(&mutex_);
- if (outward_edge_.decaying_link()) {
+ if (bridge_) {
+ bridge_peer = bridge_->GetDecayingLocalPeer();
+ } else if (outward_edge_.decaying_link()) {
local_peer = outward_edge_.decaying_link()->GetLocalPeer();
} else {
// Ignore this request if we've been unexpectedly disconnected.
@@ -851,13 +932,8 @@
}
}
- if (!local_peer) {
- // It's invalid to send call this on a Router with a non-local outward peer.
- DLOG(ERROR) << "Rejecting StopProxyingToLocalPeer with no local peer";
- return false;
- }
-
- {
+ if (local_peer && !bridge_peer) {
+ // This is the common case, with no bridge link.
MultiMutexLock lock(&mutex_, &local_peer->mutex_);
const Ref<RouterLink>& our_link = outward_edge_.decaying_link();
const Ref<RouterLink>& peer_link =
@@ -884,10 +960,48 @@
outbound_sequence_length);
outward_edge_.set_length_to_decaying_link(outbound_sequence_length);
inward_edge_->set_length_from_decaying_link(outbound_sequence_length);
+ } else if (bridge_peer) {
+ // When a bridge peer is present we actually have three local routers
+ // involved: this router, its outward peer, and its bridge peer. Both this
+ // router and the bridge peer serve as "the" proxy being bypassed in this
+ // case, so we'll be bypassing both of them below.
+ {
+ absl::MutexLock lock(&bridge_peer->mutex_);
+ if (bridge_peer->outward_edge_.is_stable()) {
+ return false;
+ }
+ local_peer = bridge_peer->outward_edge_.GetDecayingLocalPeer();
+ if (!local_peer) {
+ return false;
+ }
+ }
+
+ MultiMutexLock lock(&mutex_, &local_peer->mutex_, &bridge_peer->mutex_);
+ if (outward_edge_.is_stable() || local_peer->outward_edge_.is_stable() ||
+ bridge_peer->outward_edge_.is_stable()) {
+ return false;
+ }
+
+ local_peer->outward_edge_.set_length_from_decaying_link(
+ outbound_sequence_length);
+ outward_edge_.set_length_from_decaying_link(outbound_sequence_length);
+ bridge_->set_length_to_decaying_link(outbound_sequence_length);
+ bridge_peer->outward_edge_.set_length_to_decaying_link(
+ outbound_sequence_length);
+ bridge_peer->bridge_->set_length_from_decaying_link(
+ outbound_sequence_length);
+ } else {
+ // It's invalid to send call this on a Router with a non-local outward peer
+ // or bridge link.
+ DLOG(ERROR) << "Rejecting StopProxyingToLocalPeer with no local peer";
+ return false;
}
Flush();
local_peer->Flush();
+ if (bridge_peer) {
+ bridge_peer->Flush();
+ }
return true;
}
@@ -919,10 +1033,12 @@
void Router::Flush(FlushBehavior behavior) {
Ref<RouterLink> outward_link;
Ref<RouterLink> inward_link;
+ Ref<RouterLink> bridge_link;
Ref<RouterLink> decaying_outward_link;
Ref<RouterLink> decaying_inward_link;
Ref<RouterLink> dead_inward_link;
Ref<RouterLink> dead_outward_link;
+ Ref<RouterLink> dead_bridge_link;
absl::optional<SequenceNumber> final_inward_sequence_length;
absl::optional<SequenceNumber> final_outward_sequence_length;
bool on_central_link = false;
@@ -941,6 +1057,11 @@
decaying_inward_link =
inward_edge_ ? inward_edge_->decaying_link() : nullptr;
on_central_link = outward_link && outward_link->GetType().is_central();
+ if (bridge_) {
+ // Bridges have either a primary link or decaying link, but never both.
+ bridge_link = bridge_->primary_link() ? bridge_->primary_link()
+ : bridge_->decaying_link();
+ }
// Collect any parcels which are safe to transmit now. Note that we do not
// transmit anything or generally call into any RouterLinks while `mutex_`
@@ -978,6 +1099,14 @@
<< " received";
inward_link_decayed = true;
}
+ } else if (bridge_link) {
+ CollectParcelsToFlush(inbound_parcels_, *bridge_, parcels_to_flush);
+ }
+
+ if (bridge_ && bridge_->MaybeFinishDecay(
+ inbound_parcels_.current_sequence_number(),
+ outbound_parcels_.current_sequence_number())) {
+ bridge_.reset();
}
// If we're dropping the last of our decaying links, our outward link may
@@ -1021,6 +1150,9 @@
final_inward_sequence_length = inbound_parcels_.final_sequence_length();
if (inward_edge_) {
dead_inward_link = inward_edge_->ReleasePrimaryLink();
+ } else {
+ dead_bridge_link = std::move(bridge_link);
+ bridge_.reset();
}
}
}
@@ -1037,6 +1169,11 @@
decaying_inward_link->Deactivate();
}
+ if (bridge_link && outward_link && !inward_link && !decaying_inward_link &&
+ !decaying_outward_link) {
+ MaybeStartBridgeBypass();
+ }
+
if (dead_outward_link) {
if (final_outward_sequence_length) {
dead_outward_link->AcceptRouteClosure(*final_outward_sequence_length);
@@ -1051,6 +1188,12 @@
dead_inward_link->Deactivate();
}
+ if (dead_bridge_link) {
+ if (final_inward_sequence_length) {
+ dead_bridge_link->AcceptRouteClosure(*final_inward_sequence_length);
+ }
+ }
+
if (dead_outward_link || !on_central_link) {
// If we're not on a central link, there's no more work to do.
return;
@@ -1208,6 +1351,241 @@
return true;
}
+void Router::MaybeStartBridgeBypass() {
+ Ref<Router> first_bridge = WrapRefCounted(this);
+ Ref<Router> second_bridge;
+ {
+ absl::MutexLock lock(&mutex_);
+ if (!bridge_ || !bridge_->is_stable()) {
+ return;
+ }
+
+ second_bridge = bridge_->GetLocalPeer();
+ if (!second_bridge) {
+ return;
+ }
+ }
+
+ Ref<Router> first_local_peer;
+ Ref<Router> second_local_peer;
+ Ref<RemoteRouterLink> first_remote_link;
+ Ref<RemoteRouterLink> second_remote_link;
+ {
+ MultiMutexLock lock(&mutex_, &second_bridge->mutex_);
+ const Ref<RouterLink>& link_to_first_peer = outward_edge_.primary_link();
+ const Ref<RouterLink>& link_to_second_peer =
+ second_bridge->outward_edge_.primary_link();
+ if (!link_to_first_peer || !link_to_second_peer) {
+ return;
+ }
+
+ NodeName first_peer_node_name;
+ first_local_peer = link_to_first_peer->GetLocalPeer();
+ first_remote_link =
+ WrapRefCounted(link_to_first_peer->AsRemoteRouterLink());
+ if (first_remote_link) {
+ first_peer_node_name = first_remote_link->node_link()->remote_node_name();
+ }
+
+ NodeName second_peer_node_name;
+ second_local_peer = link_to_second_peer->GetLocalPeer();
+ second_remote_link =
+ WrapRefCounted(link_to_second_peer->AsRemoteRouterLink());
+ if (second_remote_link) {
+ second_peer_node_name =
+ second_remote_link->node_link()->remote_node_name();
+ }
+
+ if (!link_to_first_peer->TryLockForBypass(second_peer_node_name)) {
+ return;
+ }
+ if (!link_to_second_peer->TryLockForBypass(first_peer_node_name)) {
+ // Cancel the decay on this bridge's side, because we couldn't decay the
+ // other side of the bridge yet.
+ link_to_first_peer->Unlock();
+ return;
+ }
+ }
+
+ // At this point, the outward links from each bridge router have been locked
+ // for bypass. There are now three distinct cases to handle, based around
+ // where the outward peer routers are located.
+
+ // Case 1: Neither bridge router's outward peer is local to this node. This is
+ // roughly equivalent to the normal proxy bypass case where the proxy belongs
+ // to a different node from its inward and outward peers. We send a message to
+ // our outward peer with sufficient information for it to bypass both bridge
+ // routers with a new central link directly to the other bridge router's
+ // outward peer.
+ if (!first_local_peer && !second_local_peer) {
+ {
+ MultiMutexLock lock(&mutex_, &second_bridge->mutex_);
+ outward_edge_.BeginPrimaryLinkDecay();
+ second_bridge->outward_edge_.BeginPrimaryLinkDecay();
+ bridge_->BeginPrimaryLinkDecay();
+ second_bridge->bridge_->BeginPrimaryLinkDecay();
+ }
+ second_remote_link->BypassPeer(
+ first_remote_link->node_link()->remote_node_name(),
+ first_remote_link->sublink());
+ return;
+ }
+
+ // Case 2: Only one of the bridge routers has a local outward peer. This is
+ // roughly equivalent to the normal proxy bypass case where the proxy and its
+ // outward peer belong to the same node. This case is handled separately since
+ // it's a bit more complex than the cases above and below.
+ if (!second_local_peer) {
+ StartBridgeBypassFromLocalPeer(
+ second_remote_link->node_link()->memory().TryAllocateRouterLinkState());
+ return;
+ } else if (!first_local_peer) {
+ second_bridge->StartBridgeBypassFromLocalPeer(
+ first_remote_link->node_link()->memory().TryAllocateRouterLinkState());
+ return;
+ }
+
+ // Case 3: Both bridge routers' outward peers are local to this node. This is
+ // a unique bypass case, as it's the only scenario where all involved routers
+ // are local to the same node and bypass can be orchestrated synchronously in
+ // a single step.
+ {
+ MultiMutexLock lock(&mutex_, &second_bridge->mutex_,
+ &first_local_peer->mutex_, &second_local_peer->mutex_);
+ const SequenceNumber length_from_first_peer =
+ first_local_peer->outbound_parcels_.current_sequence_number();
+ const SequenceNumber length_from_second_peer =
+ second_local_peer->outbound_parcels_.current_sequence_number();
+
+ RouteEdge& first_peer_edge = first_local_peer->outward_edge_;
+ first_peer_edge.BeginPrimaryLinkDecay();
+ first_peer_edge.set_length_to_decaying_link(length_from_first_peer);
+ first_peer_edge.set_length_from_decaying_link(length_from_second_peer);
+
+ RouteEdge& second_peer_edge = second_local_peer->outward_edge_;
+ second_peer_edge.BeginPrimaryLinkDecay();
+ second_peer_edge.set_length_to_decaying_link(length_from_second_peer);
+ second_peer_edge.set_length_from_decaying_link(length_from_first_peer);
+
+ outward_edge_.BeginPrimaryLinkDecay();
+ outward_edge_.set_length_to_decaying_link(length_from_second_peer);
+ outward_edge_.set_length_from_decaying_link(length_from_first_peer);
+
+ RouteEdge& peer_bridge_outward_edge = second_bridge->outward_edge_;
+ peer_bridge_outward_edge.BeginPrimaryLinkDecay();
+ peer_bridge_outward_edge.set_length_to_decaying_link(
+ length_from_first_peer);
+ peer_bridge_outward_edge.set_length_from_decaying_link(
+ length_from_second_peer);
+
+ bridge_->BeginPrimaryLinkDecay();
+ bridge_->set_length_to_decaying_link(length_from_first_peer);
+ bridge_->set_length_from_decaying_link(length_from_second_peer);
+
+ RouteEdge& peer_bridge = *second_bridge->bridge_;
+ peer_bridge.BeginPrimaryLinkDecay();
+ peer_bridge.set_length_to_decaying_link(length_from_second_peer);
+ peer_bridge.set_length_from_decaying_link(length_from_first_peer);
+
+ RouterLink::Pair links = LocalRouterLink::CreatePair(
+ LinkType::kCentral, Router::Pair(first_local_peer, second_local_peer));
+ first_local_peer->outward_edge_.SetPrimaryLink(std::move(links.first));
+ second_local_peer->outward_edge_.SetPrimaryLink(std::move(links.second));
+ }
+
+ first_bridge->Flush();
+ second_bridge->Flush();
+ first_local_peer->Flush();
+ second_local_peer->Flush();
+}
+
+void Router::StartBridgeBypassFromLocalPeer(
+ FragmentRef<RouterLinkState> link_state) {
+ Ref<Router> local_peer;
+ Ref<Router> other_bridge;
+ {
+ absl::MutexLock lock(&mutex_);
+ if (!bridge_ || !bridge_->is_stable()) {
+ return;
+ }
+
+ local_peer = outward_edge_.GetLocalPeer();
+ other_bridge = bridge_->GetLocalPeer();
+ if (!local_peer || !other_bridge) {
+ return;
+ }
+ }
+
+ Ref<RemoteRouterLink> remote_link;
+ {
+ absl::MutexLock lock(&other_bridge->mutex_);
+ if (!other_bridge->outward_edge_.primary_link()) {
+ return;
+ }
+
+ remote_link = WrapRefCounted(
+ other_bridge->outward_edge_.primary_link()->AsRemoteRouterLink());
+ if (!remote_link) {
+ return;
+ }
+ }
+
+ if (link_state.is_null()) {
+ // We need a new RouterLinkState on the remote link before we can complete
+ // this operation.
+ remote_link->node_link()->memory().AllocateRouterLinkState(
+ [router = WrapRefCounted(this)](FragmentRef<RouterLinkState> state) {
+ if (!state.is_null()) {
+ router->StartBridgeBypassFromLocalPeer(std::move(state));
+ }
+ });
+ return;
+ }
+
+ // At this point, we have a new RouterLinkState for a new link, we have
+ // references to all three local routers (this bridge router, its local peer,
+ // and the other bridge router), and we have a remote link to the other bridge
+ // router's outward peer. This is sufficient to initiate bypass.
+
+ const Ref<NodeLink>& node_link_to_peer = remote_link->node_link();
+ SequenceNumber length_from_local_peer;
+ const SublinkId bypass_sublink =
+ node_link_to_peer->memory().AllocateSublinkIds(1);
+ Ref<RemoteRouterLink> new_link = node_link_to_peer->AddRemoteRouterLink(
+ bypass_sublink, link_state, LinkType::kCentral, LinkSide::kA, local_peer);
+ {
+ MultiMutexLock lock(&mutex_, &other_bridge->mutex_, &local_peer->mutex_);
+
+ length_from_local_peer =
+ local_peer->outbound_parcels_.current_sequence_number();
+
+ RouteEdge& edge_from_local_peer = local_peer->outward_edge_;
+ edge_from_local_peer.BeginPrimaryLinkDecay();
+ edge_from_local_peer.set_length_to_decaying_link(length_from_local_peer);
+
+ RouteEdge& edge_to_other_peer = other_bridge->outward_edge_;
+ edge_to_other_peer.BeginPrimaryLinkDecay();
+ edge_to_other_peer.set_length_to_decaying_link(length_from_local_peer);
+
+ bridge_->BeginPrimaryLinkDecay();
+ bridge_->set_length_to_decaying_link(length_from_local_peer);
+
+ outward_edge_.BeginPrimaryLinkDecay();
+ outward_edge_.set_length_from_decaying_link(length_from_local_peer);
+
+ RouteEdge& other_bridge_edge = *other_bridge->bridge_;
+ other_bridge_edge.BeginPrimaryLinkDecay();
+ other_bridge_edge.set_length_from_decaying_link(length_from_local_peer);
+ }
+
+ remote_link->BypassPeerWithLink(bypass_sublink, std::move(link_state),
+ length_from_local_peer);
+ local_peer->SetOutwardLink(std::move(new_link));
+ Flush();
+ other_bridge->Flush();
+ local_peer->Flush();
+}
+
bool Router::BypassPeerWithNewRemoteLink(
RemoteRouterLink& requestor,
NodeLink& node_link,
@@ -1318,7 +1696,7 @@
// Otherwise immediately begin decay of both links to the proxy.
if (!outward_edge_.BeginPrimaryLinkDecay() ||
!new_local_peer->outward_edge_.BeginPrimaryLinkDecay()) {
- DLOG(ERROR) << "Rejecting RequestProxyBypass on failure to decay link";
+ DLOG(ERROR) << "Rejecting BypassPeer on failure to decay link";
return false;
}
outward_edge_.set_length_to_decaying_link(length_to_proxy_from_us);
diff --git a/src/ipcz/router.h b/src/ipcz/router.h
index 310b773..cb3eca9 100644
--- a/src/ipcz/router.h
+++ b/src/ipcz/router.h
@@ -160,6 +160,12 @@
IpczTrapConditionFlags* satisfied_condition_flags,
IpczPortalStatus* status);
+ // Attempts to merge this Router's route with the route terminated by `other`.
+ // Both `other` and this Router must be terminal routers on their own separate
+ // routes, and neither Router must have transmitted or retreived any parcels
+ // via Put or Get APIs.
+ IpczResult MergeRoute(const Ref<Router>& other);
+
// Deserializes a new Router from `descriptor` received over `from_node_link`.
static Ref<Router> Deserialize(const RouterDescriptor& descriptor,
NodeLink& from_node_link);
@@ -320,6 +326,21 @@
RemoteRouterLink& inward_link,
FragmentRef<RouterLinkState> new_link_state);
+ // Attempts to start bypass of this Router, which must be on a bridge link, as
+ // well bypassing the bridge link itself and the bridge peer router on its
+ // other side. This method will attempt to lock this Router's outward link as
+ // well as the outward link of this Router's bridge peer. If either fails,
+ // both are left unlocked and this operation cannot yet proceed.
+ void MaybeStartBridgeBypass();
+
+ // Starts bypass of this Router, which must be on a bridge link and must have
+ // a local outward peer link. The router on the other side of the bridge must
+ // have a remote outward peer, and `link_state` if non-null will be used to
+ // establish a new remote link to that peer to bypass the entire bridge. If
+ // `link_state` is null, the operation will be deferred until a fragment can
+ // be allocated.
+ void StartBridgeBypassFromLocalPeer(FragmentRef<RouterLinkState> link_state);
+
// Attempts to bypass the link identified by `requestor` in favor of a new
// link that runs over `node_link`. If `new_link_state` is non-null, it will
// be used for the RouterLinkState of the new RemoteRouterLink; otherwise one
@@ -358,6 +379,10 @@
// routers by definition can have no inward edge.
absl::optional<RouteEdge> inward_edge_ ABSL_GUARDED_BY(mutex_);
+ // A special inward edge which when present bridges this route with another
+ // route. This is used only to implement route merging.
+ std::unique_ptr<RouteEdge> bridge_ ABSL_GUARDED_BY(mutex_);
+
// Parcels received from the other end of the route. If this is a terminal
// router, these may be retrieved by the application via a controlling portal;
// otherwise they will be forwarded along `inward_edge_` as soon as possible.
diff --git a/src/merge_portals_test.cc b/src/merge_portals_test.cc
new file mode 100644
index 0000000..a25008a
--- /dev/null
+++ b/src/merge_portals_test.cc
@@ -0,0 +1,129 @@
+// Copyright 2022 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include <string>
+#include <string_view>
+
+#include "ipcz/ipcz.h"
+#include "test/multinode_test.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace ipcz {
+namespace {
+
+using MergePortalsTestNode = test::TestNode;
+using MergePortalsTest = test::MultinodeTest<MergePortalsTestNode>;
+
+constexpr std::string_view kMessage1 = "bork bork";
+constexpr std::string_view kMessage2 = "aw heck";
+
+MULTINODE_TEST_NODE(MergePortalsTestNode, MergeWithInitialPortalClient) {
+ IpczHandle b = ConnectToBroker();
+
+ std::string message;
+ EXPECT_EQ(IPCZ_RESULT_OK, WaitToGet(b, &message));
+ EXPECT_EQ(kMessage1, message);
+ EXPECT_EQ(IPCZ_RESULT_OK, Put(b, kMessage2));
+ WaitForDirectRemoteLink(b);
+ EXPECT_EQ(IPCZ_RESULT_OK, WaitForConditionFlags(b, IPCZ_TRAP_PEER_CLOSED));
+ Close(b);
+}
+
+TEST_P(MergePortalsTest, MergeWithInitialPortal) {
+ IpczHandle c = SpawnTestNode<MergeWithInitialPortalClient>();
+ auto [q, p] = OpenPortals();
+ EXPECT_EQ(IPCZ_RESULT_OK, Merge(c, p));
+ EXPECT_EQ(IPCZ_RESULT_OK, Put(q, kMessage1));
+
+ std::string message;
+ EXPECT_EQ(IPCZ_RESULT_OK, WaitToGet(q, &message));
+ EXPECT_EQ(kMessage2, message);
+ WaitForDirectRemoteLink(q);
+ Close(q);
+}
+
+TEST_P(MergePortalsTest, MergeWithClosedLocalPeer) {
+ auto [q, p] = OpenPortals();
+ auto [d, b] = OpenPortals();
+
+ Close(d);
+ EXPECT_EQ(IPCZ_RESULT_OK, Merge(b, p));
+ EXPECT_EQ(IPCZ_RESULT_OK, WaitForConditionFlags(q, IPCZ_TRAP_PEER_CLOSED));
+ Close(q);
+}
+
+MULTINODE_TEST_NODE(MergePortalsTestNode, MergeWithClosedRemotePeerClient) {
+ IpczHandle b = ConnectToBroker();
+ auto [q, p] = OpenPortals();
+ Close(q);
+ IpczHandle r;
+ EXPECT_EQ(IPCZ_RESULT_OK, WaitToGet(b, nullptr, {&r, 1}));
+ Merge(p, r);
+ EXPECT_EQ(IPCZ_RESULT_OK, WaitForConditionFlags(b, IPCZ_TRAP_PEER_CLOSED));
+ Close(b);
+}
+
+TEST_P(MergePortalsTest, MergeWithClosedRemotePeer) {
+ IpczHandle c = SpawnTestNode<MergeWithClosedRemotePeerClient>();
+ auto [r, s] = OpenPortals();
+ EXPECT_EQ(IPCZ_RESULT_OK, Put(c, "", {&r, 1}));
+ EXPECT_EQ(IPCZ_RESULT_OK, WaitForConditionFlags(s, IPCZ_TRAP_PEER_CLOSED));
+ CloseAll({c, s});
+}
+
+constexpr size_t kMergeComplexRoutesNumIterations = 1000;
+
+MULTINODE_TEST_NODE(MergePortalsTestNode, MergeComplexRoutesClient) {
+ IpczHandle b = ConnectToBroker();
+ IpczHandle other_client;
+ IpczHandle portal;
+ EXPECT_EQ(IPCZ_RESULT_OK, WaitToGet(b, nullptr, {&other_client, 1}));
+ EXPECT_EQ(IPCZ_RESULT_OK, WaitToGet(b, nullptr, {&portal, 1}));
+
+ for (size_t i = 0; i < kMergeComplexRoutesNumIterations; ++i) {
+ EXPECT_EQ(IPCZ_RESULT_OK, Put(other_client, "", {&portal, 1}));
+ EXPECT_EQ(IPCZ_RESULT_OK, WaitToGet(other_client, nullptr, {&portal, 1}));
+ EXPECT_EQ(IPCZ_RESULT_OK, Put(b, "", {&portal, 1}));
+ EXPECT_EQ(IPCZ_RESULT_OK, WaitToGet(b, nullptr, {&portal, 1}));
+ }
+
+ WaitForDirectRemoteLink(portal);
+ PingPong(portal);
+ EXPECT_EQ(IPCZ_RESULT_OK, Put(b, "done"));
+ EXPECT_EQ(IPCZ_RESULT_OK, WaitForConditionFlags(b, IPCZ_TRAP_PEER_CLOSED));
+ CloseAll({b, portal, other_client});
+}
+
+TEST_P(MergePortalsTest, MergeComplexRoutes) {
+ IpczHandle c1 = SpawnTestNode<MergeComplexRoutesClient>();
+ IpczHandle c2 = SpawnTestNode<MergeComplexRoutesClient>();
+
+ auto [q, p] = OpenPortals();
+ EXPECT_EQ(IPCZ_RESULT_OK, Put(c1, "", {&q, 1}));
+ EXPECT_EQ(IPCZ_RESULT_OK, Put(c2, "", {&p, 1}));
+
+ auto [s, t] = OpenPortals();
+ auto [u, v] = OpenPortals();
+ EXPECT_EQ(IPCZ_RESULT_OK, Put(c1, "", {&t, 1}));
+ EXPECT_EQ(IPCZ_RESULT_OK, Put(c2, "", {&v, 1}));
+
+ for (size_t i = 0; i < kMergeComplexRoutesNumIterations; ++i) {
+ EXPECT_EQ(IPCZ_RESULT_OK, WaitToGet(c1, nullptr, {&t, 1}));
+ EXPECT_EQ(IPCZ_RESULT_OK, Put(c1, "", {&t, 1}));
+ EXPECT_EQ(IPCZ_RESULT_OK, WaitToGet(c2, nullptr, {&v, 1}));
+ EXPECT_EQ(IPCZ_RESULT_OK, Put(c2, "", {&v, 1}));
+ }
+
+ Merge(s, u);
+
+ std::string message;
+ EXPECT_EQ(IPCZ_RESULT_OK, WaitToGet(c1, &message));
+ EXPECT_EQ(IPCZ_RESULT_OK, WaitToGet(c2, &message));
+ CloseAll({c1, c2});
+}
+
+INSTANTIATE_MULTINODE_TEST_SUITE_P(MergePortalsTest);
+
+} // namespace
+} // namespace ipcz
diff --git a/src/test/test_base.cc b/src/test/test_base.cc
index 13c85cc..087ed1a 100644
--- a/src/test/test_base.cc
+++ b/src/test/test_base.cc
@@ -55,6 +55,10 @@
}
}
+IpczResult TestBase::Merge(IpczHandle a, IpczHandle b) {
+ return ipcz().MergePortals(a, b, IPCZ_NO_FLAGS, nullptr);
+}
+
IpczHandle TestBase::CreateNode(const IpczDriver& driver,
IpczCreateNodeFlags flags) {
IpczHandle node;
diff --git a/src/test/test_base.h b/src/test/test_base.h
index 4786fbf..3b3df98 100644
--- a/src/test/test_base.h
+++ b/src/test/test_base.h
@@ -35,6 +35,7 @@
// Some trivial shorthand methods to access the ipcz API more conveniently.
void Close(IpczHandle handle);
void CloseAll(absl::Span<const IpczHandle> handles);
+ IpczResult Merge(IpczHandle a, IpczHandle b);
IpczHandle CreateNode(const IpczDriver& driver,
IpczCreateNodeFlags flags = IPCZ_NO_FLAGS);
std::pair<IpczHandle, IpczHandle> OpenPortals(IpczHandle node);
diff --git a/src/util/ref_counted.h b/src/util/ref_counted.h
index f57c7d3..d991b50 100644
--- a/src/util/ref_counted.h
+++ b/src/util/ref_counted.h
@@ -96,6 +96,8 @@
T* release() { return static_cast<T*>(ReleaseImpl()); }
+ void swap(Ref<T>& other) noexcept { std::swap(ptr_, other.ptr_); }
+
template <typename H>
friend H AbslHashValue(H h, const Ref<T>& ref) {
return H::combine(std::move(h), ref.get());