ipcz: Implement proxy bypass, part 2 of 2
This is the actual implementation of proxy bypass operations, including
link stabilization, link locking, bypass request and authentication,
link decay, and ultimately proxy removal.
For test coverage, existing RemotePortalTest tests now block shutdown
until their potentially complex routes are reduced to a direct link
between two Routers.
Bug: 1299283
Change-Id: I1516de72e0369543660a84dccd5128b2bf93fcd8
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/3780861
Commit-Queue: Ken Rockot <rockot@google.com>
Reviewed-by: Robert Sesek <rsesek@chromium.org>
Cr-Commit-Position: refs/heads/main@{#1030300}
NOKEYCHECK=True
GitOrigin-RevId: 9bf8c693335e2eabbfb832e6b5262cab2a1e0629
diff --git a/src/ipcz/local_router_link.cc b/src/ipcz/local_router_link.cc
index 8e8ee3d..4922f81 100644
--- a/src/ipcz/local_router_link.cc
+++ b/src/ipcz/local_router_link.cc
@@ -18,10 +18,17 @@
class LocalRouterLink::SharedState : public RefCounted {
public:
- SharedState(LinkType type, Ref<Router> router_a, Ref<Router> router_b)
+ SharedState(LinkType type,
+ LocalRouterLink::InitialState initial_state,
+ Ref<Router> router_a,
+ Ref<Router> router_b)
: type_(type),
router_a_(std::move(router_a)),
- router_b_(std::move(router_b)) {}
+ router_b_(std::move(router_b)) {
+ if (initial_state == LocalRouterLink::kStable) {
+ link_state_.status = RouterLinkState::kStable;
+ }
+ }
LinkType type() const { return type_; }
@@ -63,14 +70,14 @@
};
// static
-RouterLink::Pair LocalRouterLink::ConnectRouters(LinkType type,
- const Router::Pair& routers) {
+RouterLink::Pair LocalRouterLink::CreatePair(LinkType type,
+ const Router::Pair& routers,
+ InitialState initial_state) {
ABSL_ASSERT(type == LinkType::kCentral || type == LinkType::kBridge);
- auto state = MakeRefCounted<SharedState>(type, routers.first, routers.second);
+ auto state = MakeRefCounted<SharedState>(type, initial_state, routers.first,
+ routers.second);
auto a = AdoptRef(new LocalRouterLink(LinkSide::kA, state));
auto b = AdoptRef(new LocalRouterLink(LinkSide::kB, state));
- routers.first->SetOutwardLink(a);
- routers.second->SetOutwardLink(b);
return {a, b};
}
@@ -87,8 +94,12 @@
return &state_->link_state();
}
-bool LocalRouterLink::HasLocalPeer(const Router& router) {
- return state_->GetRouter(side_.opposite()).get() == &router;
+Ref<Router> LocalRouterLink::GetLocalPeer() {
+ return state_->GetRouter(side_.opposite());
+}
+
+RemoteRouterLink* LocalRouterLink::AsRemoteRouterLink() {
+ return nullptr;
}
void LocalRouterLink::AcceptParcel(Parcel& parcel) {
@@ -136,7 +147,7 @@
bool LocalRouterLink::FlushOtherSideIfWaiting() {
const LinkSide other_side = side_.opposite();
if (state_->link_state().ResetWaitingBit(other_side)) {
- state_->GetRouter(other_side)->Flush();
+ state_->GetRouter(other_side)->Flush(Router::kForceProxyBypassAttempt);
return true;
}
return false;
diff --git a/src/ipcz/local_router_link.h b/src/ipcz/local_router_link.h
index e17d5a1..77291e8 100644
--- a/src/ipcz/local_router_link.h
+++ b/src/ipcz/local_router_link.h
@@ -21,15 +21,22 @@
class LocalRouterLink : public RouterLink {
public:
// Creates a new pair of LocalRouterLinks linking the given pair of Routers
- // together. The Routers must not currently have outward links. `type` must
- // be either kCentral or kBridge, as local links may never be peripheral.
- static RouterLink::Pair ConnectRouters(LinkType type,
- const Router::Pair& routers);
+ // together. `type` must be either kCentral or kBridge, as local links may
+ // never be peripheral. `initial_state` determines whether the new link starts
+ // in a stable state.
+ //
+ // It is the caller's responsibilty to give the returned links to their
+ // respective Routers.
+ enum InitialState { kUnstable, kStable };
+ static RouterLink::Pair CreatePair(LinkType type,
+ const Router::Pair& routers,
+ InitialState initial_state = kUnstable);
// RouterLink:
LinkType GetType() const override;
RouterLinkState* GetLinkState() const override;
- bool HasLocalPeer(const Router& router) override;
+ Ref<Router> GetLocalPeer() override;
+ RemoteRouterLink* AsRemoteRouterLink() override;
void AcceptParcel(Parcel& parcel) override;
void AcceptRouteClosure(SequenceNumber sequence_length) override;
void AcceptRouteDisconnected() override;
diff --git a/src/ipcz/node_link.cc b/src/ipcz/node_link.cc
index 68d7ba4..ed87b2c 100644
--- a/src/ipcz/node_link.cc
+++ b/src/ipcz/node_link.cc
@@ -442,7 +442,7 @@
}
return receiver->AcceptBypassLink(
- WrapRefCounted(this), accept.params().new_sublink, std::move(link_state),
+ *this, accept.params().new_sublink, std::move(link_state),
accept.params().inbound_sequence_length_from_bypassed_link);
}
@@ -477,9 +477,9 @@
if (link_state.is_null()) {
return false;
}
- return router->BypassPeerWithLink(*this, bypass.params().new_sublink,
- std::move(link_state),
- bypass.params().inbound_sequence_length);
+ return router->AcceptBypassLink(*this, bypass.params().new_sublink,
+ std::move(link_state),
+ bypass.params().inbound_sequence_length);
}
bool NodeLink::OnStopProxyingToLocalPeer(msg::StopProxyingToLocalPeer& stop) {
@@ -494,7 +494,7 @@
bool NodeLink::OnFlushRouter(msg::FlushRouter& flush) {
if (Ref<Router> router = GetRouter(flush.params().sublink)) {
- router->Flush();
+ router->Flush(Router::kForceProxyBypassAttempt);
}
return true;
}
diff --git a/src/ipcz/node_link_test.cc b/src/ipcz/node_link_test.cc
index 94e91c1..b504e29 100644
--- a/src/ipcz/node_link_test.cc
+++ b/src/ipcz/node_link_test.cc
@@ -65,12 +65,13 @@
auto [link0, link1] = LinkNodes(node0, node1);
auto router0 = MakeRefCounted<Router>();
auto router1 = MakeRefCounted<Router>();
+ FragmentRef<RouterLinkState> link_state =
+ link0->memory().GetInitialRouterLinkState(0);
router0->SetOutwardLink(link0->AddRemoteRouterLink(
- SublinkId(0), link0->memory().GetInitialRouterLinkState(0),
- LinkType::kCentral, LinkSide::kA, router0));
+ SublinkId(0), link_state, LinkType::kCentral, LinkSide::kA, router0));
router1->SetOutwardLink(link1->AddRemoteRouterLink(
- SublinkId(0), link0->memory().GetInitialRouterLinkState(0),
- LinkType::kCentral, LinkSide::kB, router1));
+ SublinkId(0), link_state, LinkType::kCentral, LinkSide::kB, router1));
+ link_state->status = RouterLinkState::kStable;
EXPECT_FALSE(router1->IsPeerClosed());
router0->CloseRoute();
diff --git a/src/ipcz/portal.cc b/src/ipcz/portal.cc
index 025258b..4de1d7c 100644
--- a/src/ipcz/portal.cc
+++ b/src/ipcz/portal.cc
@@ -46,7 +46,10 @@
DVLOG(5) << "Created new portal pair with routers " << routers.first.get()
<< " and " << routers.second.get();
- LocalRouterLink::ConnectRouters(LinkType::kCentral, routers);
+ auto links = LocalRouterLink::CreatePair(LinkType::kCentral, routers,
+ LocalRouterLink::kStable);
+ routers.first->SetOutwardLink(std::move(links.first));
+ routers.second->SetOutwardLink(std::move(links.second));
return {MakeRefCounted<Portal>(node, std::move(routers.first)),
MakeRefCounted<Portal>(node, std::move(routers.second))};
}
diff --git a/src/ipcz/remote_router_link.cc b/src/ipcz/remote_router_link.cc
index 0c2be7f..4602db6 100644
--- a/src/ipcz/remote_router_link.cc
+++ b/src/ipcz/remote_router_link.cc
@@ -84,7 +84,7 @@
MarkSideStable();
}
if (Ref<Router> router = node_link()->GetRouter(sublink_)) {
- router->Flush();
+ router->Flush(Router::kForceProxyBypassAttempt);
}
}
@@ -96,8 +96,12 @@
return link_state_.load(std::memory_order_acquire);
}
-bool RemoteRouterLink::HasLocalPeer(const Router& router) {
- return false;
+Ref<Router> RemoteRouterLink::GetLocalPeer() {
+ return nullptr;
+}
+
+RemoteRouterLink* RemoteRouterLink::AsRemoteRouterLink() {
+ return this;
}
void RemoteRouterLink::AcceptParcel(Parcel& parcel) {
diff --git a/src/ipcz/remote_router_link.h b/src/ipcz/remote_router_link.h
index 859dd54..98950ab 100644
--- a/src/ipcz/remote_router_link.h
+++ b/src/ipcz/remote_router_link.h
@@ -51,7 +51,8 @@
// RouterLink:
LinkType GetType() const override;
RouterLinkState* GetLinkState() const override;
- bool HasLocalPeer(const Router& router) override;
+ Ref<Router> GetLocalPeer() override;
+ RemoteRouterLink* AsRemoteRouterLink() override;
void AcceptParcel(Parcel& parcel) override;
void AcceptRouteClosure(SequenceNumber sequence_length) override;
void AcceptRouteDisconnected() override;
diff --git a/src/ipcz/route_edge_test.cc b/src/ipcz/route_edge_test.cc
index 53134e4..8d84a24 100644
--- a/src/ipcz/route_edge_test.cc
+++ b/src/ipcz/route_edge_test.cc
@@ -25,9 +25,7 @@
auto a = MakeRefCounted<Router>();
auto b = MakeRefCounted<Router>();
auto [a_link, b_blink] =
- LocalRouterLink::ConnectRouters(LinkType::kCentral, {a, b});
- a->CloseRoute();
- b->CloseRoute();
+ LocalRouterLink::CreatePair(LinkType::kCentral, {a, b});
return a_link;
}
};
diff --git a/src/ipcz/router.cc b/src/ipcz/router.cc
index 779e6fc..1badab2 100644
--- a/src/ipcz/router.cc
+++ b/src/ipcz/router.cc
@@ -10,6 +10,7 @@
#include <utility>
#include "ipcz/ipcz.h"
+#include "ipcz/local_router_link.h"
#include "ipcz/node_link.h"
#include "ipcz/remote_router_link.h"
#include "ipcz/sequence_number.h"
@@ -18,6 +19,7 @@
#include "third_party/abseil-cpp/absl/container/inlined_vector.h"
#include "third_party/abseil-cpp/absl/synchronization/mutex.h"
#include "util/log.h"
+#include "util/multi_mutex_lock.h"
namespace ipcz {
@@ -82,6 +84,15 @@
return (status_.flags & IPCZ_PORTAL_STATUS_DEAD) != 0;
}
+bool Router::IsOnCentralRemoteLink() {
+ absl::MutexLock lock(&mutex_);
+ // This may only be called on terminal Routers.
+ ABSL_ASSERT(!inward_edge_);
+ return outward_edge_.primary_link() && outward_edge_.is_stable() &&
+ outward_edge_.primary_link()->GetType().is_central() &&
+ !outward_edge_.primary_link()->GetLocalPeer();
+}
+
void Router::QueryStatus(IpczPortalStatus& status) {
absl::MutexLock lock(&mutex_);
const size_t size = std::min(status.size, status_.size);
@@ -92,7 +103,7 @@
bool Router::HasLocalPeer(Router& router) {
absl::MutexLock lock(&mutex_);
return outward_edge_.primary_link() &&
- outward_edge_.primary_link()->HasLocalPeer(router);
+ outward_edge_.primary_link()->GetLocalPeer() == &router;
}
IpczResult Router::SendOutboundParcel(Parcel& parcel) {
@@ -149,6 +160,14 @@
{
absl::MutexLock lock(&mutex_);
+
+ // If we have a stable inward edge (or none at all), and the outward edge
+ // is stable too, our new link can be marked stable from our side.
+ if (link->GetType().is_central() && outward_edge_.is_stable() &&
+ (!inward_edge_ || inward_edge_->is_stable())) {
+ link->MarkSideStable();
+ }
+
if (!is_disconnected_) {
outward_edge_.SetPrimaryLink(std::move(link));
}
@@ -161,7 +180,7 @@
return;
}
- Flush();
+ Flush(kForceProxyBypassAttempt);
}
bool Router::AcceptInboundParcel(Parcel& parcel) {
@@ -398,8 +417,7 @@
DVLOG(4) << "Disconnected new Router immediately after deserialization";
router->AcceptRouteDisconnectedFrom(LinkType::kPeripheralOutward);
}
-
- router->Flush();
+ router->Flush(kForceProxyBypassAttempt);
return router;
}
@@ -493,46 +511,242 @@
// We may have inbound parcels queued which need to be forwarded to the new
// Router, so give them a chance to be flushed out.
- Flush();
+ Flush(kForceProxyBypassAttempt);
}
bool Router::BypassPeer(RemoteRouterLink& requestor,
const NodeName& bypass_target_node,
SublinkId bypass_target_sublink) {
- // TODO: Implement this.
- return true;
+ NodeLink& from_node_link = *requestor.node_link();
+
+ // Validate that the source of this request is actually our peripheral outward
+ // peer, and that we are therefore its inward peer.
+ {
+ absl::MutexLock lock(&mutex_);
+ const Ref<RouterLink>& outward_link = outward_edge_.primary_link();
+ if (!outward_link) {
+ // This Router may have been disconnected already due to some other
+ // failure along the route. This is not the fault of the requestor, so we
+ // silently ignore the request.
+ return true;
+ }
+
+ if (outward_link != &requestor ||
+ !outward_link->GetType().is_peripheral_outward()) {
+ DLOG(ERROR) << "Rejecting RequestProxyBypass received on "
+ << requestor.Describe() << " with existing "
+ << outward_edge_.primary_link()->Describe();
+ return false;
+ }
+ }
+
+ // There are two distinct cases to handle. The first case here is when the
+ // proxy's outward peer lives on a different node from us.
+ if (bypass_target_node != from_node_link.local_node_name()) {
+ Ref<NodeLink> link_to_bypass_target =
+ from_node_link.node()->GetLink(bypass_target_node);
+ if (link_to_bypass_target) {
+ return BypassPeerWithNewRemoteLink(
+ requestor, *link_to_bypass_target, bypass_target_sublink,
+ link_to_bypass_target->memory().TryAllocateRouterLinkState());
+ }
+
+ // We need to establish a link to the target node before we can proceed.
+ from_node_link.node()->EstablishLink(
+ bypass_target_node,
+ [router = WrapRefCounted(this), requestor = WrapRefCounted(&requestor),
+ bypass_target_sublink](NodeLink* link_to_bypass_target) {
+ if (!link_to_bypass_target) {
+ DLOG(ERROR) << "Disconnecting Router due to failed introduction";
+ router->AcceptRouteDisconnectedFrom(LinkType::kPeripheralOutward);
+ return;
+ }
+
+ router->BypassPeerWithNewRemoteLink(
+ *requestor, *link_to_bypass_target, bypass_target_sublink,
+ link_to_bypass_target->memory().TryAllocateRouterLinkState());
+ });
+ return true;
+ }
+
+ // The second case is when the proxy's outward peer lives on our own node.
+ return BypassPeerWithNewLocalLink(requestor, bypass_target_sublink);
}
bool Router::AcceptBypassLink(
- Ref<NodeLink> new_node_link,
+ NodeLink& new_node_link,
SublinkId new_sublink,
FragmentRef<RouterLinkState> new_link_state,
SequenceNumber inbound_sequence_length_from_bypassed_link) {
- // TODO: Implement this.
+ SequenceNumber length_to_proxy_from_us;
+ Ref<RemoteRouterLink> old_link;
+ Ref<RemoteRouterLink> new_link;
+ {
+ absl::ReleasableMutexLock lock(&mutex_);
+ if (is_disconnected_ || !outward_edge_.primary_link()) {
+ // We've already been unexpectedly disconnected from the proxy, so the
+ // route is dysfunctional. Don't establish new links.
+ DVLOG(4) << "Discarding proxy bypass link due to peer disconnection";
+ return true;
+ }
+
+ old_link =
+ WrapRefCounted(outward_edge_.primary_link()->AsRemoteRouterLink());
+ if (!old_link) {
+ // It only makes sense to receive this at a router whose outward link is
+ // remote. If we have a non-remote outward link, something is wrong.
+ DVLOG(4) << "Rejecting unexpected bypass link";
+ return false;
+ }
+
+ if (old_link->node_link() != &new_node_link &&
+ !old_link->CanNodeRequestBypass(new_node_link.remote_node_name())) {
+ // The new link must either go to the same node as the old link, or the
+ // the old link must have been expecting a bypass link to the new node.
+ DLOG(ERROR) << "Rejecting unauthorized BypassProxy";
+ return false;
+ }
+
+ length_to_proxy_from_us = outbound_parcels_.current_sequence_number();
+ if (!outward_edge_.BeginPrimaryLinkDecay()) {
+ DLOG(ERROR) << "Rejecting BypassProxy on failure to decay link";
+ return false;
+ }
+
+ // By convention the initiator of a bypass assumes side A of the bypass
+ // link, so we assume side B.
+ new_link = new_node_link.AddRemoteRouterLink(
+ new_sublink, std::move(new_link_state), LinkType::kCentral,
+ LinkSide::kB, WrapRefCounted(this));
+
+ if (new_link) {
+ DVLOG(4) << "Bypassing proxy on other end of " << old_link->Describe()
+ << " using a new " << new_link->Describe()
+ << " with length to proxy " << length_to_proxy_from_us
+ << " and length from proxy "
+ << inbound_sequence_length_from_bypassed_link;
+
+ outward_edge_.set_length_to_decaying_link(length_to_proxy_from_us);
+ outward_edge_.set_length_from_decaying_link(
+ inbound_sequence_length_from_bypassed_link);
+ outward_edge_.SetPrimaryLink(new_link);
+ }
+ }
+
+ if (!new_link) {
+ AcceptRouteDisconnectedFrom(LinkType::kCentral);
+ return true;
+ }
+
+ if (new_link->node_link() == old_link->node_link()) {
+ // If the new link goes to the same place as the old link, we only need
+ // to tell the proxy there to stop proxying. It has already conspired with
+ // its local outward peer.
+ old_link->StopProxyingToLocalPeer(length_to_proxy_from_us);
+ } else {
+ // Otherwise, tell the proxy to stop proxying and let its inward peer (our
+ // new outward peer) know that the proxy will stop.
+ old_link->StopProxying(length_to_proxy_from_us,
+ inbound_sequence_length_from_bypassed_link);
+ new_link->ProxyWillStop(length_to_proxy_from_us);
+ }
+
+ Flush();
return true;
}
bool Router::StopProxying(SequenceNumber inbound_sequence_length,
SequenceNumber outbound_sequence_length) {
- // TODO: Implement this.
+ {
+ absl::MutexLock lock(&mutex_);
+ if (outward_edge_.is_stable() || !inward_edge_ ||
+ inward_edge_->is_stable()) {
+ // Proxies begin decaying their links before requesting to be bypassed,
+ // and they don't adopt new links after that. So if either edge is stable
+ // then someone is doing something wrong.
+ DLOG(ERROR) << "Rejecting StopProxying on invalid or non-proxying Router";
+ return false;
+ }
+
+ inward_edge_->set_length_to_decaying_link(inbound_sequence_length);
+ inward_edge_->set_length_from_decaying_link(outbound_sequence_length);
+ outward_edge_.set_length_to_decaying_link(outbound_sequence_length);
+ outward_edge_.set_length_from_decaying_link(inbound_sequence_length);
+ }
+
+ Flush();
return true;
}
bool Router::NotifyProxyWillStop(SequenceNumber inbound_sequence_length) {
- // TODO: Implement this.
- return true;
-}
+ {
+ absl::MutexLock lock(&mutex_);
+ if (outward_edge_.is_stable()) {
+ // If the outward edge is already stable, either this request is invalid,
+ // or we've lost all links due to disconnection. In the latter case we
+ // can silently ignore this, but the former case is a validation failure.
+ return is_disconnected_;
+ }
-bool Router::BypassPeerWithLink(NodeLink& from_node_link,
- SublinkId new_sublink,
- FragmentRef<RouterLinkState> new_link_state,
- SequenceNumber inbound_sequence_length) {
- // TODO: Implement this.
+ DVLOG(4) << "Bypassed proxy will stop forwarding inbound parcels after a "
+ << "sequence length of " << inbound_sequence_length;
+
+ outward_edge_.set_length_from_decaying_link(inbound_sequence_length);
+ }
+
+ Flush();
return true;
}
bool Router::StopProxyingToLocalPeer(SequenceNumber outbound_sequence_length) {
- // TODO: Implement this.
+ Ref<Router> local_peer;
+ {
+ absl::MutexLock lock(&mutex_);
+ if (outward_edge_.decaying_link()) {
+ local_peer = outward_edge_.decaying_link()->GetLocalPeer();
+ } else {
+ // Ignore this request if we've been unexpectedly disconnected.
+ return is_disconnected_;
+ }
+ }
+
+ if (!local_peer) {
+ // It's invalid to send call this on a Router with a non-local outward peer.
+ DLOG(ERROR) << "Rejecting StopProxyingToLocalPeer with no local peer";
+ return false;
+ }
+
+ {
+ MultiMutexLock lock(&mutex_, &local_peer->mutex_);
+ const Ref<RouterLink>& our_link = outward_edge_.decaying_link();
+ const Ref<RouterLink>& peer_link =
+ local_peer->outward_edge_.decaying_link();
+ if (!our_link || !peer_link) {
+ // Either Router may have been unexpectedly disconnected, in which case
+ // we can ignore this request.
+ return true;
+ }
+
+ if (!inward_edge_ || our_link->GetLocalPeer() != local_peer ||
+ peer_link->GetLocalPeer() != this) {
+ // Consistency check: this must be a proxying router, and both this router
+ // and its local peer must link to each other.
+ DLOG(ERROR) << "Rejecting StopProxyingToLocalPeer at invalid proxy";
+ return false;
+ }
+
+ DVLOG(4) << "Stopping proxy with decaying "
+ << inward_edge_->decaying_link()->Describe() << " and decaying "
+ << our_link->Describe();
+
+ local_peer->outward_edge_.set_length_from_decaying_link(
+ outbound_sequence_length);
+ outward_edge_.set_length_to_decaying_link(outbound_sequence_length);
+ inward_edge_->set_length_from_decaying_link(outbound_sequence_length);
+ }
+
+ Flush();
+ local_peer->Flush();
return true;
}
@@ -561,7 +775,7 @@
}
}
-void Router::Flush() {
+void Router::Flush(FlushBehavior behavior) {
Ref<RouterLink> outward_link;
Ref<RouterLink> inward_link;
Ref<RouterLink> decaying_outward_link;
@@ -570,6 +784,10 @@
Ref<RouterLink> dead_outward_link;
absl::optional<SequenceNumber> final_inward_sequence_length;
absl::optional<SequenceNumber> final_outward_sequence_length;
+ bool on_central_link = false;
+ bool inward_link_decayed = false;
+ bool outward_link_decayed = false;
+ bool dropped_last_decaying_link = false;
ParcelsToFlush parcels_to_flush;
{
absl::MutexLock lock(&mutex_);
@@ -581,6 +799,7 @@
decaying_outward_link = outward_edge_.decaying_link();
decaying_inward_link =
inward_edge_ ? inward_edge_->decaying_link() : nullptr;
+ on_central_link = outward_link && outward_link->GetType().is_central();
// Collect any parcels which are safe to transmit now. Note that we do not
// transmit anything or generally call into any RouterLinks while `mutex_`
@@ -591,11 +810,53 @@
// is released further below.
CollectParcelsToFlush(outbound_parcels_, outward_edge_, parcels_to_flush);
- if (inward_edge_) {
- CollectParcelsToFlush(inbound_parcels_, *inward_edge_, parcels_to_flush);
+ const SequenceNumber outbound_sequence_length_sent =
+ outbound_parcels_.current_sequence_number();
+ const SequenceNumber inbound_sequence_length_received =
+ inbound_parcels_.GetCurrentSequenceLength();
+ if (outward_edge_.MaybeFinishDecay(outbound_sequence_length_sent,
+ inbound_sequence_length_received)) {
+ DVLOG(4) << "Outward " << decaying_outward_link->Describe()
+ << " fully decayed at " << outbound_sequence_length_sent
+ << " sent and " << inbound_sequence_length_received
+ << " recived";
+ outward_link_decayed = true;
}
- if (outward_link && outbound_parcels_.IsSequenceFullyConsumed()) {
+ if (inward_edge_) {
+ CollectParcelsToFlush(inbound_parcels_, *inward_edge_, parcels_to_flush);
+ const SequenceNumber inbound_sequence_length_sent =
+ inbound_parcels_.current_sequence_number();
+ const SequenceNumber outbound_sequence_length_received =
+ outbound_parcels_.GetCurrentSequenceLength();
+ if (inward_edge_->MaybeFinishDecay(inbound_sequence_length_sent,
+ outbound_sequence_length_received)) {
+ DVLOG(4) << "Inward " << decaying_inward_link->Describe()
+ << " fully decayed at " << inbound_sequence_length_sent
+ << " sent and " << outbound_sequence_length_received
+ << " received";
+ inward_link_decayed = true;
+ }
+ }
+
+ // If we're dropping the last of our decaying links, our outward link may
+ // now be stable. This may unblock proxy bypass or other operations.
+ const bool inward_edge_stable =
+ !decaying_inward_link || inward_link_decayed;
+ const bool outward_edge_stable =
+ outward_link && (!decaying_outward_link || outward_link_decayed);
+ const bool both_edges_stable = inward_edge_stable && outward_edge_stable;
+ const bool either_link_decayed =
+ inward_link_decayed || outward_link_decayed;
+ if (on_central_link && either_link_decayed && both_edges_stable) {
+ DVLOG(4) << "Router with fully decayed links may be eligible for bypass "
+ << " with outward " << outward_link->Describe();
+ outward_link->MarkSideStable();
+ dropped_last_decaying_link = true;
+ }
+
+ if (on_central_link && outbound_parcels_.IsSequenceFullyConsumed() &&
+ outward_link->TryLockForClosure()) {
// Notify the other end of the route that this end is closed. See the
// AcceptRouteClosure() invocation further below.
final_outward_sequence_length =
@@ -627,6 +888,14 @@
parcel.link->AcceptParcel(parcel.parcel);
}
+ if (outward_link_decayed) {
+ decaying_outward_link->Deactivate();
+ }
+
+ if (inward_link_decayed) {
+ decaying_inward_link->Deactivate();
+ }
+
if (dead_outward_link) {
if (final_outward_sequence_length) {
dead_outward_link->AcceptRouteClosure(*final_outward_sequence_length);
@@ -640,6 +909,298 @@
}
dead_inward_link->Deactivate();
}
+
+ if (dead_outward_link || !on_central_link) {
+ // If we're not on a central link, there's no more work to do.
+ return;
+ }
+
+ if (!dropped_last_decaying_link && behavior != kForceProxyBypassAttempt) {
+ // No relevant state changes, so there are no new bypass opportunities.
+ return;
+ }
+
+ if (inward_link && MaybeStartSelfBypass()) {
+ return;
+ }
+
+ if (outward_link) {
+ outward_link->FlushOtherSideIfWaiting();
+ }
+}
+
+bool Router::MaybeStartSelfBypass() {
+ Ref<RemoteRouterLink> remote_inward_link;
+ Ref<RemoteRouterLink> remote_outward_link;
+ Ref<Router> local_outward_peer;
+ {
+ absl::MutexLock lock(&mutex_);
+ if (!inward_edge_ || !inward_edge_->primary_link() ||
+ !inward_edge_->is_stable()) {
+ // Only a proxy with stable links can be bypassed.
+ return false;
+ }
+
+ const Ref<RouterLink>& outward_link = outward_edge_.primary_link();
+ RemoteRouterLink* inward_link =
+ inward_edge_->primary_link()->AsRemoteRouterLink();
+ if (!outward_link || !inward_link) {
+ return false;
+ }
+
+ const NodeName& inward_peer_name =
+ inward_link->node_link()->remote_node_name();
+ if (!outward_link->TryLockForBypass(inward_peer_name)) {
+ DVLOG(4) << "Proxy bypass blocked by busy " << outward_link->Describe();
+ return false;
+ }
+
+ remote_inward_link = WrapRefCounted(inward_link);
+ local_outward_peer = outward_link->GetLocalPeer();
+ if (!local_outward_peer) {
+ remote_outward_link = WrapRefCounted(outward_link->AsRemoteRouterLink());
+ }
+ }
+
+ if (remote_outward_link) {
+ // The simpler case here: our outward peer is on another node, so we begin
+ // decaying our inward and outward links and ask the inward peer to bypass
+ // us ASAP.
+ {
+ absl::MutexLock lock(&mutex_);
+ if (!inward_edge_ || !inward_edge_->primary_link() ||
+ !outward_edge_.primary_link()) {
+ // We've been disconnected since leaving the block above. Nothing to do.
+ return false;
+ }
+
+ outward_edge_.BeginPrimaryLinkDecay();
+ inward_edge_->BeginPrimaryLinkDecay();
+ }
+
+ DVLOG(4) << "Proxy sending bypass request to inward peer over "
+ << remote_inward_link->Describe()
+ << " targeting outward peer on other side of "
+ << remote_outward_link->Describe();
+
+ remote_inward_link->BypassPeer(
+ remote_outward_link->node_link()->remote_node_name(),
+ remote_outward_link->sublink());
+ return true;
+ }
+
+ // When the bypass target is local to the same node as this router, we can
+ // establish the bypass link immediately and send it to the remote inward
+ // peer.
+ return StartSelfBypassToLocalPeer(
+ *local_outward_peer, *remote_inward_link,
+ remote_inward_link->node_link()->memory().TryAllocateRouterLinkState());
+}
+
+bool Router::StartSelfBypassToLocalPeer(
+ Router& local_outward_peer,
+ RemoteRouterLink& inward_link,
+ FragmentRef<RouterLinkState> new_link_state) {
+ if (new_link_state.is_null()) {
+ NodeLinkMemory& memory = inward_link.node_link()->memory();
+ memory.AllocateRouterLinkState(
+ [router = WrapRefCounted(this),
+ local_outward_peer = WrapRefCounted(&local_outward_peer),
+ inward_link = WrapRefCounted(&inward_link)](
+ FragmentRef<RouterLinkState> new_link_state) {
+ router->StartSelfBypassToLocalPeer(*local_outward_peer, *inward_link,
+ std::move(new_link_state));
+ });
+ return true;
+ }
+
+ Ref<RemoteRouterLink> new_link;
+ SequenceNumber length_from_outward_peer;
+ const SublinkId new_sublink =
+ inward_link.node_link()->memory().AllocateSublinkIds(1);
+ {
+ MultiMutexLock lock(&mutex_, &local_outward_peer.mutex_);
+
+ const Ref<RouterLink>& outward_link = outward_edge_.primary_link();
+ const Ref<RouterLink>& peer_outward_link =
+ local_outward_peer.outward_edge_.primary_link();
+ if (!outward_link || !peer_outward_link || is_disconnected_ ||
+ local_outward_peer.is_disconnected_) {
+ DVLOG(4) << "Proxy bypass blocked due to peer closure or disconnection";
+ return false;
+ }
+
+ DVLOG(4) << "Proxy requesting own bypass from inward peer on "
+ << inward_link.node_link()->remote_node_name().ToString()
+ << " to local outward peer";
+
+ ABSL_ASSERT(outward_link->GetLocalPeer() == &local_outward_peer);
+ ABSL_ASSERT(peer_outward_link->GetLocalPeer() == this);
+
+ // Decay both of our existing links, as well as the local peer's link to us.
+ length_from_outward_peer =
+ local_outward_peer.outbound_parcels_.current_sequence_number();
+ local_outward_peer.outward_edge_.BeginPrimaryLinkDecay();
+ local_outward_peer.outward_edge_.set_length_to_decaying_link(
+ length_from_outward_peer);
+ outward_edge_.BeginPrimaryLinkDecay();
+ outward_edge_.set_length_from_decaying_link(length_from_outward_peer);
+ inward_edge_->BeginPrimaryLinkDecay();
+ inward_edge_->set_length_to_decaying_link(length_from_outward_peer);
+
+ new_link = inward_link.node_link()->AddRemoteRouterLink(
+ new_sublink, new_link_state, LinkType::kCentral, LinkSide::kA,
+ WrapRefCounted(&local_outward_peer));
+ }
+
+ if (!new_link) {
+ AcceptRouteDisconnectedFrom(LinkType::kCentral);
+ return false;
+ }
+
+ // Inform our inward peer on another node that they can bypass us using the
+ // new link we just created to our own outward local peer. Once that message
+ // is sent, it's safe for that local peer to adopt the new link.
+ inward_link.BypassPeerWithLink(new_sublink, std::move(new_link_state),
+ length_from_outward_peer);
+ local_outward_peer.SetOutwardLink(std::move(new_link));
+ return true;
+}
+
+bool Router::BypassPeerWithNewRemoteLink(
+ RemoteRouterLink& requestor,
+ NodeLink& node_link,
+ SublinkId bypass_target_sublink,
+ FragmentRef<RouterLinkState> new_link_state) {
+ if (new_link_state.is_null()) {
+ // We can't proceed with bypass until we have a fragment allocated for a new
+ // RouterLinkState.
+ node_link.memory().AllocateRouterLinkState(
+ [router = WrapRefCounted(this), requestor = WrapRefCounted(&requestor),
+ node_link = WrapRefCounted(&node_link),
+ bypass_target_sublink](FragmentRef<RouterLinkState> new_link_state) {
+ router->BypassPeerWithNewRemoteLink(*requestor, *node_link,
+ bypass_target_sublink,
+ std::move(new_link_state));
+ });
+ return true;
+ }
+
+ // Begin decaying our outward link.
+ SequenceNumber length_to_decaying_link;
+ Ref<RouterLink> new_link;
+ const SublinkId new_sublink = node_link.memory().AllocateSublinkIds(1);
+ {
+ absl::ReleasableMutexLock lock(&mutex_);
+ if (!outward_edge_.primary_link() || is_disconnected_) {
+ // We've been disconnected since leaving the above block. Don't bother
+ // to request a bypass. This is not the requestor's fault, so it's not
+ // treated as an error.
+ return true;
+ }
+
+ if (!outward_edge_.BeginPrimaryLinkDecay()) {
+ DLOG(ERROR) << "Rejecting BypassPeer on failure to decay link";
+ return false;
+ }
+
+ length_to_decaying_link = outbound_parcels_.current_sequence_number();
+ outward_edge_.set_length_to_decaying_link(length_to_decaying_link);
+ new_link = node_link.AddRemoteRouterLink(new_sublink, new_link_state,
+ LinkType::kCentral, LinkSide::kA,
+ WrapRefCounted(this));
+ }
+
+ if (!new_link) {
+ // The NodeLink was disconnected before we could create a new link for
+ // this Router. This is not the requestor's fault, so it's not treated as
+ // an error.
+ AcceptRouteDisconnectedFrom(LinkType::kCentral);
+ return true;
+ }
+
+ const NodeName proxy_node_name = requestor.node_link()->remote_node_name();
+ DVLOG(4) << "Sending AcceptBypassLink from "
+ << node_link.local_node_name().ToString() << " to "
+ << node_link.remote_node_name().ToString() << " with new sublink "
+ << new_sublink << " to replace a link to proxy "
+ << proxy_node_name.ToString() << " via sublink "
+ << bypass_target_sublink;
+
+ node_link.AcceptBypassLink(proxy_node_name, bypass_target_sublink,
+ length_to_decaying_link, new_sublink,
+ std::move(new_link_state));
+
+ // NOTE: This link is intentionally set *after* transmitting the
+ // above message. Otherwise the router might race on another thread to send
+ // messages via `new_sublink`, and the remote node would have no idea where
+ // to route them.
+ SetOutwardLink(std::move(new_link));
+ return true;
+}
+
+bool Router::BypassPeerWithNewLocalLink(RemoteRouterLink& requestor,
+ SublinkId bypass_target_sublink) {
+ NodeLink& from_node_link = *requestor.node_link();
+ const Ref<Router> new_local_peer =
+ from_node_link.GetRouter(bypass_target_sublink);
+ if (!new_local_peer) {
+ // The peer may have already been destroyed or disconnected from the proxy
+ // by the time we get here.
+ AcceptRouteDisconnectedFrom(LinkType::kPeripheralOutward);
+ return true;
+ }
+
+ Ref<RouterLink> link_from_new_local_peer_to_proxy;
+ SequenceNumber length_to_proxy_from_us;
+ SequenceNumber length_from_proxy_to_us;
+ {
+ MultiMutexLock lock(&mutex_, &new_local_peer->mutex_);
+ length_from_proxy_to_us =
+ new_local_peer->outbound_parcels_.current_sequence_number();
+ length_to_proxy_from_us = outbound_parcels_.current_sequence_number();
+
+ DVLOG(4) << "Proxy bypass requested with new local peer on "
+ << from_node_link.local_node_name().ToString() << " and proxy on "
+ << from_node_link.remote_node_name().ToString() << " via sublinks "
+ << bypass_target_sublink << " and " << requestor.sublink()
+ << "; length to the proxy is " << length_to_proxy_from_us
+ << " and length from the proxy " << length_from_proxy_to_us;
+
+ link_from_new_local_peer_to_proxy =
+ new_local_peer->outward_edge_.primary_link();
+ if (!outward_edge_.primary_link() || !link_from_new_local_peer_to_proxy ||
+ is_disconnected_ || new_local_peer->is_disconnected_) {
+ return true;
+ }
+
+ // Otherwise immediately begin decay of both links to the proxy.
+ if (!outward_edge_.BeginPrimaryLinkDecay() ||
+ !new_local_peer->outward_edge_.BeginPrimaryLinkDecay()) {
+ DLOG(ERROR) << "Rejecting RequestProxyBypass on failure to decay link";
+ return false;
+ }
+ outward_edge_.set_length_to_decaying_link(length_to_proxy_from_us);
+ outward_edge_.set_length_from_decaying_link(length_from_proxy_to_us);
+ new_local_peer->outward_edge_.set_length_to_decaying_link(
+ length_from_proxy_to_us);
+ new_local_peer->outward_edge_.set_length_from_decaying_link(
+ length_to_proxy_from_us);
+
+ // Finally, link the two routers with a new LocalRouterLink. This link will
+ // remain unstable until the decaying proxy links are gone.
+ RouterLink::Pair links = LocalRouterLink::CreatePair(
+ LinkType::kCentral, Router::Pair(WrapRefCounted(this), new_local_peer));
+ outward_edge_.SetPrimaryLink(std::move(links.first));
+ new_local_peer->outward_edge_.SetPrimaryLink(std::move(links.second));
+ }
+
+ link_from_new_local_peer_to_proxy->StopProxying(length_from_proxy_to_us,
+ length_to_proxy_from_us);
+
+ Flush();
+ new_local_peer->Flush();
+ return true;
}
} // namespace ipcz
diff --git a/src/ipcz/router.h b/src/ipcz/router.h
index 847622a..5ffd361 100644
--- a/src/ipcz/router.h
+++ b/src/ipcz/router.h
@@ -32,12 +32,15 @@
//
// Before a Router can participate in any actual routing, it must have an
// outward link to another Router (see SetOutwardLink()). To establish a locally
-// connected pair of Routers, pass both to LocalRouterLink::ConnectRouters(),
-// which internally calls SetOutwardLink() on both:
+// connected pair of Routers, pass both to LocalRouterLink::Create() and pass
+// each returned link to the coresponding router:
//
// Router::Pair routers = {MakeRefCounted<Router>(),
// MakeRefCounted<Router>()};
-// LocalRouterLink::ConnectRouters(LinkType::kCentral, routers);
+// RouterLink::Pair links =
+// LocalRouterLink::CreatePair(LinkType::kCentral, routers);
+// routers.first->SetOutwardLink(std::move(links.first));
+// routers.second->SetOutwardLink(std::move(links.second));
//
// Each ipcz Portal directly controls a terminal Router along its route, and
// all routes stabilize to eventually consist of only two interconnected
@@ -59,11 +62,18 @@
// retrieved.
bool IsRouteDead();
+ // Indicates whether this Router is currently on a central link which is
+ // connected to a router on another node. Used by tests to verify route
+ // reduction behavior, and may only be called on terminal Routers, i.e.
+ // Routers controlled directly by a Portal.
+ bool IsOnCentralRemoteLink();
+
// Fills in an IpczPortalStatus corresponding to the current state of this
// Router.
void QueryStatus(IpczPortalStatus& status);
- // Returns true iff this is a LocalRouterLink whose peer router is `router`.
+ // Returns true iff this Router's outward link is a LocalRouterLink between
+ // `this` and `router`.
bool HasLocalPeer(Router& router);
// Attempts to send an outbound parcel originating from this Router. Called
@@ -162,19 +172,36 @@
// bypass is completed immediately by establishing a new LocalRotuerLink
// between the two routers. In this case a StopProxying message is sent back
// to the requestor in order to finalize the bypass.
+ //
+ // Returns true if the BypassPeer() request was valid, or false if it was
+ // invalid. Note that a return value of true does not necessarily imply that
+ // bypass was or will be successful (e.g. it may silently fail due to lost
+ // node connections).
bool BypassPeer(RemoteRouterLink& requestor,
const NodeName& bypass_target_node,
SublinkId bypass_target_sublink);
// Begins decaying this router's outward link and replaces it with a new link
- // over `new_node_link` via `new_sublink`, and using (optional)
- // `new_link_state` for its shared state.
+ // over `new_node_link` via `new_sublink`, and using `new_link_state` for its
+ // shared state.
//
// `inbound_sequence_length_from_bypassed_link` conveys the final length of
// sequence of inbound parcels to expect over the decaying link from the peer.
// See comments on the BypassPeer definition in node_messages_generator.h.
+ //
+ // Returns true if the request was valid, or false if it was invalid. An
+ // invalid request implies that a remote node tried to do something bad and
+ // should be disconnected ASAP. Note that a return value of true does not
+ // necessarily imply that the bypass link was accepted, as it may be
+ // silently discarded if other links have been disconnected already.
+ //
+ // If `new_node_link` links to a remote node which differs from that of this
+ // router's current outward link, the current outward link must have already
+ // been configured to accept replacement by the new remote node via its
+ // RouterLinkState's `allowed_bypass_request_source` field. This method
+ // authenticates the request accordingly.
bool AcceptBypassLink(
- Ref<NodeLink> new_node_link,
+ NodeLink& new_node_link,
SublinkId new_sublink,
FragmentRef<RouterLinkState> new_link_state,
SequenceNumber inbound_sequence_length_from_bypassed_link);
@@ -183,6 +210,9 @@
// decaying links. Once these lengths are set and sequences have progressed
// to the specified length in each direction, those decaying links -- and
// eventually the router itself -- are dropped.
+ //
+ // Returns true if and only if this router is a proxy with decaying inward and
+ // outward links. Otherwise returns false, indicating an invalid request.
bool StopProxying(SequenceNumber inbound_sequence_length,
SequenceNumber outbound_sequence_length);
@@ -190,23 +220,18 @@
// this router's decaying outward link. Once this length is set and the
// decaying link has forwarded the full sequence of parcels up to this limit,
// the decaying link can be dropped.
- bool NotifyProxyWillStop(SequenceNumber inbound_sequence_length);
-
- // Begins decaying this router's outward link and replaces it with a new link
- // using `new_sublink` over `from_node_link`, the node issuing this request.
- // `new_link_state` if non-null specifies the shared memory location of the
- // RouterLinkState for this link.
//
- // `inbound_sequence_length` conveys the final length of the sequence of
- // inbound parcels to expect over the decaying link.
- bool BypassPeerWithLink(NodeLink& from_node_link,
- SublinkId new_sublink,
- FragmentRef<RouterLinkState> new_link_state,
- SequenceNumber inbound_sequence_length);
+ // Returns true if this router has a decaying outward link -- implying that
+ // its outward peer is a proxy -- or the router has been disconnected.
+ // Otherwise the request is invalid and this returns false.
+ bool NotifyProxyWillStop(SequenceNumber inbound_sequence_length);
// Configures the final sequence length of outbound parcels to expect on this
// proxying Router's decaying inward link. Once this is set and the decaying
// link has received the full sequence of parcels, the link can be dropped.
+ //
+ // Returns true if the request is valid, meaning that this Router is a proxy
+ // whose outward peer is local to the same node. Otherwise this returns false.
bool StopProxyingToLocalPeer(SequenceNumber outbound_sequence_length);
// Notifies this Router that one of its links has been disconnected from a
@@ -234,7 +259,17 @@
//
// A safe way to ensure that is for RouterLink implementations to only call
// into Router using a reference held on the calling stack.
- void Flush();
+ //
+ // The specified FlushBehavior determines whether the Flush() operation will
+ // unconditionally attempt to initiate bypass of this Router or its outward
+ // peer after performing all other flushing operations. By default, bypass
+ // progress is only attempted if the flush iteslf resulted in an unstable
+ // central link becoming potentially stable. But various operations which
+ // invoke Flush() may also elicit state changes that can unblock a bypass
+ // operation. These operatoins may specify kForceProxyBypassAttempt in such
+ // cases.
+ enum FlushBehavior { kDefault, kForceProxyBypassAttempt };
+ void Flush(FlushBehavior behavior = kDefault);
private:
~Router() override;
@@ -248,6 +283,38 @@
// unblock our bypass.
bool MaybeStartSelfBypass();
+ // Starts bypass of this Router when its outward peer lives on the same node.
+ // This must only be called once the central link is already locked. If
+ // `new_link_state` is non-null, it will be used for the RouterLinkState of
+ // the new RemoteRouterLink between this Routers inward and outward peers.
+ // Otherwise one will be allocated asynchronously before proceeding.
+ //
+ // Returns true if and only if self-bypass has been initiated by reaching out
+ // to this router's inward peer with with a BypassPeer() or
+ // BypassPeerWithLink() request. Otherwise returns false.
+ bool StartSelfBypassToLocalPeer(Router& local_outward_peer,
+ RemoteRouterLink& inward_link,
+ FragmentRef<RouterLinkState> new_link_state);
+
+ // Attempts to bypass the link identified by `requestor` in favor of a new
+ // link that runs over `node_link`. If `new_link_state` is non-null, it will
+ // be used for the RouterLinkState of the new RemoteRouterLink; otherwise one
+ // will be allocated asynchronously before proceeding.
+ //
+ // Returns true if and only if this request was valid.
+ bool BypassPeerWithNewRemoteLink(RemoteRouterLink& requestor,
+ NodeLink& node_link,
+ SublinkId bypass_target_sublink,
+ FragmentRef<RouterLinkState> new_link_state);
+
+ // Attempts to bypass the link identified by `requestor` in favor of a new
+ // LocalRouterLink to a Router bound to `bypass_target_sublink` on the same
+ // NodeLink as `requestor`.
+ //
+ // Returns true if and only if this request was valid.
+ bool BypassPeerWithNewLocalLink(RemoteRouterLink& requestor,
+ SublinkId bypass_target_sublink);
+
absl::Mutex mutex_;
// The current computed portal status to be reflected by a portal controlling
diff --git a/src/ipcz/router_link.h b/src/ipcz/router_link.h
index 59e13ee..b6fa153 100644
--- a/src/ipcz/router_link.h
+++ b/src/ipcz/router_link.h
@@ -39,9 +39,13 @@
// returns null.
virtual RouterLinkState* GetLinkState() const = 0;
- // Returns true if this is a LocalRouterLink and the Router on the other side
- // of the link is `router`.
- virtual bool HasLocalPeer(const Router& router) = 0;
+ // Returns the Router on the other end of this link, if this is a
+ // LocalRouterLink. Otherwise returns null.
+ virtual Ref<Router> GetLocalPeer() = 0;
+
+ // If this is a RemoteRouterLink, returns a downcast reference to it.
+ // Otherwise returns null.
+ virtual RemoteRouterLink* AsRemoteRouterLink() = 0;
// Passes a parcel to the Router on the other side of this link to be queued
// and/or router further.
diff --git a/src/ipcz/router_link_test.cc b/src/ipcz/router_link_test.cc
index 77336c9..d4ee861 100644
--- a/src/ipcz/router_link_test.cc
+++ b/src/ipcz/router_link_test.cc
@@ -128,7 +128,9 @@
switch (GetParam()) {
case RouterLinkTestMode::kLocal:
std::tie(a_link_, b_link_) =
- LocalRouterLink::ConnectRouters(LinkType::kCentral, {a_, b_});
+ LocalRouterLink::CreatePair(LinkType::kCentral, {a_, b_});
+ a_->SetOutwardLink(a_link_);
+ b_->SetOutwardLink(b_link_);
break;
case RouterLinkTestMode::kRemote: {
@@ -169,7 +171,7 @@
};
TEST_P(RouterLinkTest, Locking) {
- EXPECT_EQ(RouterLinkState::kUnstable, link_status());
+ link_state().status = RouterLinkState::kUnstable;
// No locking can take place until both sides are marked stable.
EXPECT_FALSE(a_link().TryLockForBypass(kTestPeer1Name));
@@ -217,6 +219,8 @@
}
TEST_P(RouterLinkTest, FlushOtherSideIfWaiting) {
+ link_state().status = RouterLinkState::kUnstable;
+
// FlushOtherSideIfWaiting() does nothing if the other side is not, in fact,
// waiting for something.
EXPECT_FALSE(a_link().FlushOtherSideIfWaiting());
diff --git a/src/remote_portal_test.cc b/src/remote_portal_test.cc
index 2d41f4f..1611b9e 100644
--- a/src/remote_portal_test.cc
+++ b/src/remote_portal_test.cc
@@ -49,6 +49,7 @@
EXPECT_NE(IPCZ_INVALID_HANDLE, p);
VerifyEndToEnd(p);
+ WaitForDirectRemoteLink(p);
CloseAll({p, b});
}
@@ -59,6 +60,7 @@
EXPECT_EQ(IPCZ_RESULT_OK, Put(c, kTestMessage1, {&p, 1}));
VerifyEndToEnd(q);
+ WaitForDirectRemoteLink(q);
CloseAll({q, c});
}
@@ -80,6 +82,7 @@
EXPECT_EQ(kTestMessage1, message);
}
+ WaitForDirectRemoteLink(q);
PingPong(b);
CloseAll({q, b});
}
@@ -100,6 +103,7 @@
EXPECT_EQ(kTestMessage2, message);
}
+ WaitForDirectRemoteLink(p);
PingPong(b);
CloseAll({p, b});
}
@@ -117,7 +121,7 @@
CloseAll({c1, c2});
}
-constexpr size_t kTransferBackAndForthNumIterations = 1;
+constexpr size_t kTransferBackAndForthNumIterations = 100;
MULTINODE_TEST_NODE(RemotePortalTestNode, TransferBackAndForthClient) {
IpczHandle b = ConnectToBroker();
@@ -149,6 +153,7 @@
}
VerifyEndToEndLocal(q, p);
+ WaitForDirectLocalLink(q, p);
CloseAll({q, p, c});
}
diff --git a/src/test/test_base.cc b/src/test/test_base.cc
index f10e0ff..de9e098 100644
--- a/src/test/test_base.cc
+++ b/src/test/test_base.cc
@@ -4,10 +4,16 @@
#include "test/test_base.h"
+#include <chrono>
+#include <thread>
+
#include "api.h"
#include "ipcz/ipcz.h"
+#include "ipcz/portal.h"
+#include "ipcz/router.h"
#include "testing/gtest/include/gtest/gtest.h"
#include "third_party/abseil-cpp/absl/synchronization/notification.h"
+#include "util/ref_counted.h"
namespace ipcz::test::internal {
@@ -187,6 +193,31 @@
EXPECT_EQ(kMessage1, message);
}
+void TestBase::WaitForDirectRemoteLink(IpczHandle portal) {
+ const Ref<Router> router = Portal::FromHandle(portal)->router();
+ while (!router->IsOnCentralRemoteLink()) {
+ using namespace std::chrono_literals;
+ std::this_thread::sleep_for(8ms);
+ }
+
+ const std::string kMessage = "very direct wow";
+ EXPECT_EQ(IPCZ_RESULT_OK, Put(portal, kMessage));
+
+ std::string message;
+ EXPECT_EQ(IPCZ_RESULT_OK, WaitToGet(portal, &message));
+ EXPECT_EQ(kMessage, message);
+}
+
+void TestBase::WaitForDirectLocalLink(IpczHandle a, IpczHandle b) {
+ const Ref<Router> router_a = Portal::FromHandle(a)->router();
+ const Ref<Router> router_b = Portal::FromHandle(b)->router();
+ while (!router_a->HasLocalPeer(*router_b) &&
+ !router_b->HasLocalPeer(*router_a)) {
+ using namespace std::chrono_literals;
+ std::this_thread::sleep_for(8ms);
+ }
+}
+
void TestBase::HandleEvent(const IpczTrapEvent* event) {
auto handler =
absl::WrapUnique(reinterpret_cast<TrapEventHandler*>(event->context));
diff --git a/src/test/test_base.h b/src/test/test_base.h
index f9e4a08..cdcad57 100644
--- a/src/test/test_base.h
+++ b/src/test/test_base.h
@@ -101,6 +101,16 @@
// both `a` and `b`, and then this waits to read the same message from both.
void VerifyEndToEndLocal(IpczHandle a, IpczHandle b);
+ // Waits until `portal` is backed by a Router which is connected directly to
+ // its peer portal's Router on another node, with no proxies in between. Must
+ // be called on each portal of the portal pair in order to properly verify a
+ // direct route end-to-end.
+ void WaitForDirectRemoteLink(IpczHandle portal);
+
+ // Waits for portals `a` and `b` to become direct local peers, after any
+ // potential proxies in between are eliminated.
+ void WaitForDirectLocalLink(IpczHandle a, IpczHandle b);
+
private:
static void HandleEvent(const IpczTrapEvent* event);