ipcz: Dynamic RouterLinkState allocation
With this change, RemoteRouterLink may now attempt to dynamically
allocate its own shared RouterLinkState when one is not provided at
construction time.
RemoteRouterLinks also support construction over a pending
RouterLinkState fragment which can be resolved asynchronously
once the underlying buffer becomes available.
The net result of these changes is that every central RouterLink
will eventually be assigned a valid RouterLinkState which it can
use to coordinate communication and some route operations
(e.g. closure and proxy bypass.)
Bug: 1299283
Change-Id: I4cb76d4c8ad17d91d70a901f9416b440fc0d8d8f
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/3756354
Reviewed-by: Robert Sesek <rsesek@chromium.org>
Commit-Queue: Ken Rockot <rockot@google.com>
Cr-Commit-Position: refs/heads/main@{#1024402}
NOKEYCHECK=True
GitOrigin-RevId: f0a2302e16ecab71e7bd0ce6bf9de79f8e3ad897
diff --git a/src/ipcz/node_link.cc b/src/ipcz/node_link.cc
index fe89b0b..7e08c64 100644
--- a/src/ipcz/node_link.cc
+++ b/src/ipcz/node_link.cc
@@ -266,6 +266,19 @@
sublink->router_link->GetType(), route_closed.params().sequence_length);
}
+bool NodeLink::OnSetRouterLinkState(msg::SetRouterLinkState& set) {
+ if (set.params().descriptor.is_null()) {
+ return false;
+ }
+
+ if (absl::optional<Sublink> sublink = GetSublink(set.params().sublink)) {
+ auto fragment = memory().GetFragment(set.params().descriptor);
+ sublink->router_link->SetLinkState(
+ memory().AdoptFragmentRef<RouterLinkState>(fragment));
+ }
+ return true;
+}
+
bool NodeLink::OnFlushRouter(msg::FlushRouter& flush) {
if (Ref<Router> router = GetRouter(flush.params().sublink)) {
router->Flush();
diff --git a/src/ipcz/node_link.h b/src/ipcz/node_link.h
index 54fad52..d7d2292 100644
--- a/src/ipcz/node_link.h
+++ b/src/ipcz/node_link.h
@@ -135,6 +135,7 @@
bool OnAddBlockBuffer(msg::AddBlockBuffer& add) override;
bool OnAcceptParcel(msg::AcceptParcel& accept) override;
bool OnRouteClosed(msg::RouteClosed& route_closed) override;
+ bool OnSetRouterLinkState(msg::SetRouterLinkState& set) override;
bool OnFlushRouter(msg::FlushRouter& flush) override;
void OnTransportError() override;
diff --git a/src/ipcz/node_link_memory.cc b/src/ipcz/node_link_memory.cc
index 74f7358..6bf92d0 100644
--- a/src/ipcz/node_link_memory.cc
+++ b/src/ipcz/node_link_memory.cc
@@ -282,6 +282,39 @@
return buffer_pool_.FreeBlock(fragment);
}
+FragmentRef<RouterLinkState> NodeLinkMemory::TryAllocateRouterLinkState() {
+ Fragment fragment = buffer_pool_.AllocateBlock(sizeof(RouterLinkState));
+ if (!fragment.is_null()) {
+ return InitializeRouterLinkStateFragment(fragment);
+ }
+
+ // Unlike with the more generic AllocateFragment(), we unconditionally lobby
+ // for additional capacity when RouterLinkState allocation fails.
+ RequestBlockCapacity(sizeof(RouterLinkState), [](bool ok) {});
+ return {};
+}
+
+void NodeLinkMemory::AllocateRouterLinkState(RouterLinkStateCallback callback) {
+ Fragment fragment = buffer_pool_.AllocateBlock(sizeof(RouterLinkState));
+ if (!fragment.is_null()) {
+ callback(InitializeRouterLinkStateFragment(fragment));
+ return;
+ }
+
+ RequestBlockCapacity(sizeof(RouterLinkState),
+ [memory = WrapRefCounted(this),
+ callback = std::move(callback)](bool ok) mutable {
+ if (!ok) {
+ DLOG(ERROR)
+ << "Could not allocate a new RouterLinkState";
+ callback({});
+ return;
+ }
+
+ memory->AllocateRouterLinkState(std::move(callback));
+ });
+}
+
void NodeLinkMemory::WaitForBufferAsync(
BufferId id,
BufferPool::WaitForBufferCallback callback) {
@@ -368,4 +401,13 @@
}
}
+FragmentRef<RouterLinkState> NodeLinkMemory::InitializeRouterLinkStateFragment(
+ const Fragment& fragment) {
+ ABSL_ASSERT(!fragment.is_null());
+ FragmentRef<RouterLinkState> ref =
+ AdoptFragmentRef<RouterLinkState>(fragment);
+ RouterLinkState::Initialize(ref.get());
+ return ref;
+}
+
} // namespace ipcz
diff --git a/src/ipcz/node_link_memory.h b/src/ipcz/node_link_memory.h
index 0390719..be6770b 100644
--- a/src/ipcz/node_link_memory.h
+++ b/src/ipcz/node_link_memory.h
@@ -17,6 +17,7 @@
#include "ipcz/fragment_descriptor.h"
#include "ipcz/fragment_ref.h"
#include "ipcz/ipcz.h"
+#include "ipcz/ref_counted_fragment.h"
#include "ipcz/router_link_state.h"
#include "ipcz/sublink_id.h"
#include "third_party/abseil-cpp/absl/container/flat_hash_map.h"
@@ -101,6 +102,15 @@
// with the same BufferId and dimensions as `descriptor`.
Fragment GetFragment(const FragmentDescriptor& descriptor);
+ // Adopts an existing reference to a RefCountedFragment within `fragment`.
+ // This does NOT increment the ref count of the RefCountedFragment.
+ template <typename T>
+ FragmentRef<T> AdoptFragmentRef(const Fragment& fragment) {
+ ABSL_ASSERT(sizeof(T) <= fragment.size());
+ return FragmentRef<T>(RefCountedFragment::kAdoptExistingRef,
+ WrapRefCounted(this), fragment);
+ }
+
// Adds a new buffer to the underlying BufferPool to use as additional
// allocation capacity for blocks of size `block_size`. Note that the
// contents of the mapped region must already be initialized as a
@@ -118,6 +128,21 @@
// allocated fragment within this NodeLinkMemory.
bool FreeFragment(const Fragment& fragment);
+ // Allocates a fragment to store a new RouterLinkState and initializes a new
+ // RouterLinkState instance there. If no capacity is currently available to
+ // allocate an appropriate fragment, this may return null.
+ FragmentRef<RouterLinkState> TryAllocateRouterLinkState();
+
+ // Allocates a fragment to store a new RouterLinkState and initializes a new
+ // RouterLinkState instance there. Calls `callback` with a reference to the
+ // new fragment once allocated. Unlike TryAllocateRouterLinkState(), this
+ // allocation always succeeds eventually unless driver memory allocation
+ // itself begins to fail unrecoverably. If the allocation can succeed
+ // synchronously, `callback` may be called before this method returns.
+ using RouterLinkStateCallback =
+ std::function<void(FragmentRef<RouterLinkState>)>;
+ void AllocateRouterLinkState(RouterLinkStateCallback callback);
+
// Runs `callback` as soon as the identified buffer is added to the underlying
// BufferPool. If the buffer is already present here, `callback` is run
// immediately.
@@ -142,6 +167,10 @@
RequestBlockCapacityCallback callback);
void OnCapacityRequestComplete(size_t block_size, bool success);
+ // Initializes `fragment` as a new RouterLinkState and returns a ref to it.
+ FragmentRef<RouterLinkState> InitializeRouterLinkStateFragment(
+ const Fragment& fragment);
+
const Ref<Node> node_;
// The underlying BufferPool. Note that this object is itself thread-safe, so
diff --git a/src/ipcz/node_messages_generator.h b/src/ipcz/node_messages_generator.h
index 5f4d2e0..6cdbf72 100644
--- a/src/ipcz/node_messages_generator.h
+++ b/src/ipcz/node_messages_generator.h
@@ -117,6 +117,14 @@
IPCZ_MSG_PARAM(SequenceNumber, sequence_length)
IPCZ_MSG_END()
+// Notifies a node that the Router it has bound to `sublink` (on the
+// transmitting NodeLink) now has an allocated RouterLinkState in the fragment
+// identified by `descriptor`.
+IPCZ_MSG_BEGIN(SetRouterLinkState, IPCZ_MSG_ID(23), IPCZ_MSG_VERSION(0))
+ IPCZ_MSG_PARAM(SublinkId, sublink)
+ IPCZ_MSG_PARAM(FragmentDescriptor, descriptor)
+IPCZ_MSG_END()
+
// Hints to the target router that it should flush its state. Generally sent to
// catalyze route reduction or elicit some other state change which was blocked
// on some other work being done first by the sender of this message.
diff --git a/src/ipcz/remote_router_link.cc b/src/ipcz/remote_router_link.cc
index ec47513..6fba386 100644
--- a/src/ipcz/remote_router_link.cc
+++ b/src/ipcz/remote_router_link.cc
@@ -9,6 +9,7 @@
#include "ipcz/box.h"
#include "ipcz/node_link.h"
+#include "ipcz/node_link_memory.h"
#include "ipcz/node_messages.h"
#include "ipcz/portal.h"
#include "ipcz/router.h"
@@ -24,9 +25,11 @@
: node_link_(std::move(node_link)),
sublink_(sublink),
type_(type),
- side_(side),
- link_state_(std::move(link_state)) {
- ABSL_ASSERT(link_state_.is_null() || link_state_.is_addressable());
+ side_(side) {
+ ABSL_ASSERT(type.is_central() || link_state.is_null());
+ if (type.is_central()) {
+ SetLinkState(std::move(link_state));
+ }
}
RemoteRouterLink::~RemoteRouterLink() = default;
@@ -42,12 +45,67 @@
std::move(link_state), type, side));
}
+void RemoteRouterLink::SetLinkState(FragmentRef<RouterLinkState> state) {
+ ABSL_ASSERT(type_.is_central());
+ if (state.is_null()) {
+ // By convention, if a central link has no RouterLinkState at construction
+ // time, side A is responsible for allocating a new one and sharing it with
+ // side B eventually. Side B lives with a null RouterLinkState until then.
+ if (side_.is_side_a()) {
+ AllocateAndShareLinkState();
+ }
+ return;
+ }
+
+ if (state.is_pending()) {
+ // By convention, side A should never be given a pending RouterLinkState
+ // fragment.
+ ABSL_ASSERT(side_.is_side_b());
+
+ // Side B on the other hand may obtain a RouterLinkState fragment which it
+ // can't address yet, and in this case, we wait for the fragment's buffer to
+ // be mapped locally.
+ Ref<NodeLinkMemory> memory = WrapRefCounted(&node_link()->memory());
+ FragmentDescriptor descriptor = state.fragment().descriptor();
+ memory->WaitForBufferAsync(
+ descriptor.buffer_id(),
+ [self = WrapRefCounted(this), memory, descriptor] {
+ auto fragment = memory->GetFragment(descriptor);
+ self->SetLinkState(
+ memory->AdoptFragmentRef<RouterLinkState>(fragment));
+ });
+ return;
+ }
+
+ ABSL_ASSERT(state.is_addressable());
+
+ // SetLinkState() must be called with an addressable fragment only once.
+ ABSL_ASSERT(link_state_.load(std::memory_order_acquire) == nullptr);
+
+ // The release when storing `link_state_` is balanced by an acquire in
+ // GetLinkState().
+ link_state_fragment_ = std::move(state);
+ link_state_.store(link_state_fragment_.get(), std::memory_order_release);
+
+ // If this side of the link was already marked stable before the
+ // RouterLinkState was available, `side_is_stable_` will be true. In that
+ // case, set the stable bit in RouterLinkState immediately. This may unblock
+ // some routing work. The acquire here is balanced by a release in
+ // MarkSideStable().
+ if (side_is_stable_.load(std::memory_order_acquire)) {
+ MarkSideStable();
+ }
+ if (Ref<Router> router = node_link()->GetRouter(sublink_)) {
+ router->Flush();
+ }
+}
+
LinkType RemoteRouterLink::GetType() const {
return type_;
}
RouterLinkState* RemoteRouterLink::GetLinkState() const {
- return link_state_.get();
+ return link_state_.load(std::memory_order_acquire);
}
bool RemoteRouterLink::HasLocalPeer(const Router& router) {
@@ -233,4 +291,21 @@
return ss.str();
}
+void RemoteRouterLink::AllocateAndShareLinkState() {
+ node_link()->memory().AllocateRouterLinkState(
+ [self = WrapRefCounted(this)](FragmentRef<RouterLinkState> state) {
+ if (state.is_null()) {
+ DLOG(ERROR) << "Unable to allocate RouterLinkState.";
+ return;
+ }
+ ABSL_ASSERT(state.is_addressable());
+ self->SetLinkState(state);
+
+ msg::SetRouterLinkState set;
+ set.params().sublink = self->sublink();
+ set.params().descriptor = state.release().descriptor();
+ self->node_link()->Transmit(set);
+ });
+}
+
} // namespace ipcz
diff --git a/src/ipcz/remote_router_link.h b/src/ipcz/remote_router_link.h
index 9967fa5..a5c3aeb 100644
--- a/src/ipcz/remote_router_link.h
+++ b/src/ipcz/remote_router_link.h
@@ -48,6 +48,23 @@
const Ref<NodeLink>& node_link() const { return node_link_; }
SublinkId sublink() const { return sublink_; }
+ // Sets this link's RouterLinkState.
+ //
+ // If `state` is null and this link is on side B, this call is a no-op. If
+ // `state` is null and this link is on side A, this call will kick off an
+ // asynchronous allocation of a new RouterLinkState. When that completes, the
+ // new state will be adopted by side A and shared with side B.
+ //
+ // If `state` references a pending fragment and this link is on side A, the
+ // call is a no-op. If `state` references a pending fragment and this link
+ // is on side B, this operation will be automatically deferred until the
+ // NodeLink acquires a mapping of the buffer referenced by `state` and the
+ // fragment can be resolved to an addressable one.
+ //
+ // Finally, if `state` references a valid, addressable fragment, it is
+ // adopted as-is.
+ void SetLinkState(FragmentRef<RouterLinkState> state);
+
// RouterLink:
LinkType GetType() const override;
RouterLinkState* GetLinkState() const override;
@@ -73,6 +90,8 @@
~RemoteRouterLink() override;
+ void AllocateAndShareLinkState();
+
const Ref<NodeLink> node_link_;
const SublinkId sublink_;
const LinkType type_;
@@ -87,7 +106,19 @@
// shared by both ends of this RouterLink. Always null for non-central links,
// and may be null for a central links if its RouterLinkState has not yet been
// allocated or shared.
- FragmentRef<RouterLinkState> link_state_;
+ //
+ // Must be set at most once and is only retained by this object to keep the
+ // fragment allocated. Access is unguarded and is restricted to
+ // SetLinkState(), and only allowed while `link_state_` below is still null.
+ // Any other access is unsafe. Use GetLinkState() to get a usable reference to
+ // the RouterLinkState instance.
+ FragmentRef<RouterLinkState> link_state_fragment_;
+
+ // Cached address of the shared RouterLinkState referenced by
+ // `link_state_fragment_`. Once this is set to a non-null value it retains
+ // that value indefinitely, so any non-null value loaded from this field is
+ // safe to dereference for the duration of the RemoteRouterLink's lifetime.
+ std::atomic<RouterLinkState*> link_state_{nullptr};
};
} // namespace ipcz
diff --git a/src/ipcz/router_link_test.cc b/src/ipcz/router_link_test.cc
index 3790412..38057ba 100644
--- a/src/ipcz/router_link_test.cc
+++ b/src/ipcz/router_link_test.cc
@@ -5,7 +5,10 @@
#include "ipcz/router_link.h"
#include <tuple>
+#include <utility>
+#include <vector>
+#include "ipcz/driver_memory.h"
#include "ipcz/driver_transport.h"
#include "ipcz/ipcz.h"
#include "ipcz/link_side.h"
@@ -36,6 +39,87 @@
constexpr NodeName kTestPeer1Name(3, 4);
constexpr NodeName kTestPeer2Name(4, 5);
+// A helper for the tests in this module, TestNodePair creates one broker node
+// and one non-broker node and interconnects them using the synchronous
+// reference driver. This class exposes the NodeLinkMemory on either end of the
+// connection and provides some additional facilities which tests can use to
+// poke at node and router state.
+class TestNodePair {
+ public:
+ TestNodePair() {
+ auto transports = DriverTransport::CreatePair(kTestDriver);
+ auto alloc = NodeLinkMemory::Allocate(node_a_);
+ node_link_a_ =
+ NodeLink::Create(node_a_, LinkSide::kA, kTestBrokerName,
+ kTestNonBrokerName, Node::Type::kNormal, 0,
+ transports.first, std::move(alloc.node_link_memory));
+ node_link_b_ = NodeLink::Create(
+ node_b_, LinkSide::kB, kTestNonBrokerName, kTestBrokerName,
+ Node::Type::kBroker, 0, transports.second,
+ NodeLinkMemory::Adopt(node_b_, std::move(alloc.primary_buffer_memory)));
+ node_a_->AddLink(kTestNonBrokerName, node_link_a_);
+ node_b_->AddLink(kTestBrokerName, node_link_b_);
+ }
+
+ ~TestNodePair() {
+ node_b_->Close();
+ node_a_->Close();
+ }
+
+ NodeLinkMemory& memory_a() const { return node_link_a_->memory(); }
+ NodeLinkMemory& memory_b() const { return node_link_b_->memory(); }
+
+ // Activates both of the test nodes' NodeLink transports. Tests can defer this
+ // activation as a means of deferring NodeLink communications in general.
+ void ActivateTransports() {
+ node_link_a_->transport()->Activate();
+ node_link_b_->transport()->Activate();
+ }
+
+ // Establishes new RemoteRouterLinks between `a` and `b`. Different initial
+ // RouterLinkState references may be provided for the link on either side in
+ // order to mimic various production scenarios.
+ RouterLink::Pair LinkRemoteRouters(Ref<Router> a,
+ FragmentRef<RouterLinkState> a_state,
+ Ref<Router> b,
+ FragmentRef<RouterLinkState> b_state) {
+ const SublinkId sublink = node_link_a_->memory().AllocateSublinkIds(1);
+ Ref<RemoteRouterLink> a_link = node_link_a_->AddRemoteRouterLink(
+ sublink, std::move(a_state), LinkType::kCentral, LinkSide::kA, a);
+ Ref<RemoteRouterLink> b_link = node_link_b_->AddRemoteRouterLink(
+ sublink, std::move(b_state), LinkType::kCentral, LinkSide::kB, b);
+ a->SetOutwardLink(a_link);
+ b->SetOutwardLink(b_link);
+ return {a_link, b_link};
+ }
+
+ // Depletes the available supply of RouterLinkState fragments and returns
+ // references to all of them. Note that one side effect of this call is that
+ // memory_a() will expand its RouterLinkState fragment capacity, so subsequent
+ // allocation requests will still succeed.
+ std::vector<FragmentRef<RouterLinkState>> AllocateAllRouterLinkStates() {
+ std::vector<FragmentRef<RouterLinkState>> fragments;
+ for (;;) {
+ FragmentRef<RouterLinkState> fragment =
+ memory_a().TryAllocateRouterLinkState();
+ if (fragment.is_null()) {
+ return fragments;
+ }
+ fragments.push_back(std::move(fragment));
+ }
+ }
+
+ private:
+ const Ref<Node> node_a_{MakeRefCounted<Node>(Node::Type::kBroker,
+ kTestDriver,
+ IPCZ_INVALID_DRIVER_HANDLE)};
+ const Ref<Node> node_b_{MakeRefCounted<Node>(Node::Type::kNormal,
+ kTestDriver,
+ IPCZ_INVALID_DRIVER_HANDLE)};
+ Ref<NodeLink> node_link_a_;
+ Ref<NodeLink> node_link_b_;
+};
+
class RouterLinkTest : public testing::Test,
public testing::WithParamInterface<RouterLinkTestMode> {
public:
@@ -47,48 +131,24 @@
break;
case RouterLinkTestMode::kRemote: {
- auto transports = DriverTransport::CreatePair(kTestDriver);
- auto alloc = NodeLinkMemory::Allocate(broker_);
- broker_node_link_ = NodeLink::Create(
- broker_, LinkSide::kA, kTestBrokerName, kTestNonBrokerName,
- Node::Type::kNormal, 0, transports.first,
- std::move(alloc.node_link_memory));
- non_broker_node_link_ = NodeLink::Create(
- non_broker_, LinkSide::kB, kTestNonBrokerName, kTestBrokerName,
- Node::Type::kBroker, 0, transports.second,
- NodeLinkMemory::Adopt(non_broker_,
- std::move(alloc.primary_buffer_memory)));
- broker_->AddLink(kTestNonBrokerName, broker_node_link_);
- non_broker_->AddLink(kTestBrokerName, non_broker_node_link_);
-
- auto fragment = broker_node_link_->memory().AllocateFragment(
- sizeof(RouterLinkState));
- auto link_state = FragmentRef<RouterLinkState>(
- RefCountedFragment::kAdoptExistingRef,
- WrapRefCounted(&broker_node_link_->memory()), fragment);
- RouterLinkState::Initialize(link_state.get());
- a_link_ = broker_node_link_->AddRemoteRouterLink(
- SublinkId{0}, link_state, LinkType::kCentral, LinkSide::kA, a_);
- b_link_ = non_broker_node_link_->AddRemoteRouterLink(
- SublinkId{0}, link_state, LinkType::kCentral, LinkSide::kB, b_);
- a_->SetOutwardLink(a_link_);
- b_->SetOutwardLink(b_link_);
-
- broker_node_link_->transport()->Activate();
- non_broker_node_link_->transport()->Activate();
+ auto link_state = nodes_.memory_a().TryAllocateRouterLinkState();
+ ABSL_ASSERT(link_state.is_addressable());
+ link_state_ = link_state.get();
+ std::tie(a_link_, b_link_) =
+ nodes_.LinkRemoteRouters(a_, link_state, b_, link_state);
break;
}
}
ASSERT_EQ(a_link_->GetLinkState(), b_link_->GetLinkState());
link_state_ = a_link_->GetLinkState();
+
+ nodes_.ActivateTransports();
}
void TearDown() override {
a_->CloseRoute();
b_->CloseRoute();
- broker_->Close();
- non_broker_->Close();
}
Router& a() { return *a_; }
@@ -99,15 +159,7 @@
RouterLinkState::Status link_status() { return link_state_->status; }
private:
- const Ref<Node> broker_{MakeRefCounted<Node>(Node::Type::kBroker,
- kTestDriver,
- IPCZ_INVALID_DRIVER_HANDLE)};
- const Ref<Node> non_broker_{MakeRefCounted<Node>(Node::Type::kNormal,
- kTestDriver,
- IPCZ_INVALID_DRIVER_HANDLE)};
- Ref<NodeLink> broker_node_link_;
- Ref<NodeLink> non_broker_node_link_;
-
+ TestNodePair nodes_;
const Ref<Router> a_{MakeRefCounted<Router>()};
const Ref<Router> b_{MakeRefCounted<Router>()};
Ref<RouterLink> a_link_;
@@ -186,6 +238,149 @@
EXPECT_EQ(RouterLinkState::kStable, link_status());
}
+class RemoteRouterLinkTest : public testing::Test {
+ public:
+ TestNodePair& nodes() { return nodes_; }
+
+ std::vector<Router::Pair> CreateTestRouterPairs(size_t n) {
+ std::vector<Router::Pair> pairs;
+ pairs.reserve(n);
+ for (size_t i = 0; i < n; ++i) {
+ pairs.emplace_back(MakeRefCounted<Router>(), MakeRefCounted<Router>());
+ }
+ return pairs;
+ }
+
+ void CloseRoutes(const std::vector<Router::Pair>& routers) {
+ for (const auto& pair : routers) {
+ pair.first->CloseRoute();
+ pair.second->CloseRoute();
+ }
+ }
+
+ BufferId GenerateBufferId() {
+ return nodes().memory_a().AllocateNewBufferId();
+ }
+
+ private:
+ TestNodePair nodes_;
+};
+
+TEST_F(RemoteRouterLinkTest, NewLinkWithAddressableState) {
+ nodes().ActivateTransports();
+
+ std::vector<FragmentRef<RouterLinkState>> fragments =
+ nodes().AllocateAllRouterLinkStates();
+ std::vector<Router::Pair> router_pairs =
+ CreateTestRouterPairs(fragments.size());
+ std::vector<RouterLink::Pair> links;
+ for (size_t i = 0; i < fragments.size(); ++i) {
+ auto [a, b] = router_pairs[i];
+ auto [a_link, b_link] =
+ nodes().LinkRemoteRouters(a, fragments[i], b, fragments[i]);
+ a_link->MarkSideStable();
+ b_link->MarkSideStable();
+ links.emplace_back(std::move(a_link), std::move(b_link));
+ }
+
+ // We should be able to lock all links from either side, implying that both
+ // sides have a valid reference to the same RouterLinkState.
+ for (const auto& [a_link, b_link] : links) {
+ EXPECT_TRUE(a_link->TryLockForClosure());
+ a_link->Unlock();
+ EXPECT_TRUE(b_link->TryLockForClosure());
+ }
+
+ CloseRoutes(router_pairs);
+}
+
+TEST_F(RemoteRouterLinkTest, NewLinkWithPendingState) {
+ // Occupy all fragments in the primary buffer so they aren't usable.
+ std::vector<FragmentRef<RouterLinkState>> unused_fragments =
+ nodes().AllocateAllRouterLinkStates();
+
+ // Now allocate another batch of fragments which must be in a newly allocated
+ // buffer on node A. Because the nodes' transports are not active yet, there
+ // is no way for node B to have had this buffer shared with it yet. Hence all
+ // of these fragments will be seen as pending on node B.
+ std::vector<FragmentRef<RouterLinkState>> fragments =
+ nodes().AllocateAllRouterLinkStates();
+
+ std::vector<Router::Pair> router_pairs =
+ CreateTestRouterPairs(fragments.size());
+ std::vector<RouterLink::Pair> links;
+ for (size_t i = 0; i < fragments.size(); ++i) {
+ auto [a, b] = router_pairs[i];
+ auto a_state = fragments[i];
+ auto b_fragment =
+ nodes().memory_b().GetFragment(fragments[i].fragment().descriptor());
+ auto b_state =
+ nodes().memory_b().AdoptFragmentRef<RouterLinkState>(b_fragment);
+ ASSERT_TRUE(a_state.is_addressable());
+ ASSERT_TRUE(b_state.is_pending());
+ auto [a_link, b_link] =
+ nodes().LinkRemoteRouters(a, std::move(a_state), b, std::move(b_state));
+ a_link->MarkSideStable();
+ b_link->MarkSideStable();
+ links.emplace_back(std::move(a_link), std::move(b_link));
+ }
+
+ // Because side B of these links still cannot resolve its RouterLinkState,
+ // the link still cannot be stabilized or locked yet.
+ for (const auto& [a_link, b_link] : links) {
+ EXPECT_FALSE(a_link->TryLockForClosure());
+ EXPECT_FALSE(b_link->TryLockForClosure());
+ }
+
+ // We're using the synchronous driver, so as soon as we activate our
+ // transports, all pending NodeLink communications will complete before this
+ // call returns. This also means side B of each link will resolve its
+ // RouterLinkState.
+ nodes().ActivateTransports();
+
+ // Now all links should be lockable from either side, implying that both
+ // sides have a valid reference to the same RouterLinkState.
+ for (const auto& [a_link, b_link] : links) {
+ EXPECT_TRUE(a_link->TryLockForClosure());
+ a_link->Unlock();
+ EXPECT_TRUE(b_link->TryLockForClosure());
+ }
+
+ CloseRoutes(router_pairs);
+}
+
+TEST_F(RemoteRouterLinkTest, NewLinkWithNullState) {
+ // Occupy all fragments in the primary buffer so they aren't usable.
+ std::vector<FragmentRef<RouterLinkState>> unused_fragments =
+ nodes().AllocateAllRouterLinkStates();
+
+ constexpr size_t kNumIterations = 15000;
+ std::vector<Router::Pair> router_pairs =
+ CreateTestRouterPairs(kNumIterations);
+ std::vector<RouterLink::Pair> links;
+ for (size_t i = 0; i < kNumIterations; ++i) {
+ auto [a, b] = router_pairs[i];
+ auto [a_link, b_link] = nodes().LinkRemoteRouters(a, nullptr, b, nullptr);
+ a_link->MarkSideStable();
+ b_link->MarkSideStable();
+ EXPECT_FALSE(a_link->TryLockForClosure());
+ EXPECT_FALSE(b_link->TryLockForClosure());
+ links.emplace_back(std::move(a_link), std::move(b_link));
+ }
+
+ // Since we're using the synchronous driver, by the time transport activation
+ // completes, side A of each link must have already dynamically allocated a
+ // RouterLinkState and shared it with side B.
+ nodes().ActivateTransports();
+ for (const auto& [a_link, b_link] : links) {
+ EXPECT_TRUE(a_link->TryLockForClosure());
+ a_link->Unlock();
+ EXPECT_TRUE(b_link->TryLockForClosure());
+ }
+
+ CloseRoutes(router_pairs);
+}
+
INSTANTIATE_TEST_SUITE_P(,
RouterLinkTest,
::testing::Values(RouterLinkTestMode::kLocal,