ipcz: Dynamic RouterLinkState allocation

With this change, RemoteRouterLink may now attempt to dynamically
allocate its own shared RouterLinkState when one is not provided at
construction time.

RemoteRouterLinks also support construction over a pending
RouterLinkState fragment which can be resolved asynchronously
once the underlying buffer becomes available.

The net result of these changes is that every central RouterLink
will eventually be assigned a valid RouterLinkState which it can
use to coordinate communication and some route operations
(e.g. closure and proxy bypass.)

Bug: 1299283
Change-Id: I4cb76d4c8ad17d91d70a901f9416b440fc0d8d8f
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/3756354
Reviewed-by: Robert Sesek <rsesek@chromium.org>
Commit-Queue: Ken Rockot <rockot@google.com>
Cr-Commit-Position: refs/heads/main@{#1024402}
NOKEYCHECK=True
GitOrigin-RevId: f0a2302e16ecab71e7bd0ce6bf9de79f8e3ad897
diff --git a/src/ipcz/node_link.cc b/src/ipcz/node_link.cc
index fe89b0b..7e08c64 100644
--- a/src/ipcz/node_link.cc
+++ b/src/ipcz/node_link.cc
@@ -266,6 +266,19 @@
       sublink->router_link->GetType(), route_closed.params().sequence_length);
 }
 
+bool NodeLink::OnSetRouterLinkState(msg::SetRouterLinkState& set) {
+  if (set.params().descriptor.is_null()) {
+    return false;
+  }
+
+  if (absl::optional<Sublink> sublink = GetSublink(set.params().sublink)) {
+    auto fragment = memory().GetFragment(set.params().descriptor);
+    sublink->router_link->SetLinkState(
+        memory().AdoptFragmentRef<RouterLinkState>(fragment));
+  }
+  return true;
+}
+
 bool NodeLink::OnFlushRouter(msg::FlushRouter& flush) {
   if (Ref<Router> router = GetRouter(flush.params().sublink)) {
     router->Flush();
diff --git a/src/ipcz/node_link.h b/src/ipcz/node_link.h
index 54fad52..d7d2292 100644
--- a/src/ipcz/node_link.h
+++ b/src/ipcz/node_link.h
@@ -135,6 +135,7 @@
   bool OnAddBlockBuffer(msg::AddBlockBuffer& add) override;
   bool OnAcceptParcel(msg::AcceptParcel& accept) override;
   bool OnRouteClosed(msg::RouteClosed& route_closed) override;
+  bool OnSetRouterLinkState(msg::SetRouterLinkState& set) override;
   bool OnFlushRouter(msg::FlushRouter& flush) override;
   void OnTransportError() override;
 
diff --git a/src/ipcz/node_link_memory.cc b/src/ipcz/node_link_memory.cc
index 74f7358..6bf92d0 100644
--- a/src/ipcz/node_link_memory.cc
+++ b/src/ipcz/node_link_memory.cc
@@ -282,6 +282,39 @@
   return buffer_pool_.FreeBlock(fragment);
 }
 
+FragmentRef<RouterLinkState> NodeLinkMemory::TryAllocateRouterLinkState() {
+  Fragment fragment = buffer_pool_.AllocateBlock(sizeof(RouterLinkState));
+  if (!fragment.is_null()) {
+    return InitializeRouterLinkStateFragment(fragment);
+  }
+
+  // Unlike with the more generic AllocateFragment(), we unconditionally lobby
+  // for additional capacity when RouterLinkState allocation fails.
+  RequestBlockCapacity(sizeof(RouterLinkState), [](bool ok) {});
+  return {};
+}
+
+void NodeLinkMemory::AllocateRouterLinkState(RouterLinkStateCallback callback) {
+  Fragment fragment = buffer_pool_.AllocateBlock(sizeof(RouterLinkState));
+  if (!fragment.is_null()) {
+    callback(InitializeRouterLinkStateFragment(fragment));
+    return;
+  }
+
+  RequestBlockCapacity(sizeof(RouterLinkState),
+                       [memory = WrapRefCounted(this),
+                        callback = std::move(callback)](bool ok) mutable {
+                         if (!ok) {
+                           DLOG(ERROR)
+                               << "Could not allocate a new RouterLinkState";
+                           callback({});
+                           return;
+                         }
+
+                         memory->AllocateRouterLinkState(std::move(callback));
+                       });
+}
+
 void NodeLinkMemory::WaitForBufferAsync(
     BufferId id,
     BufferPool::WaitForBufferCallback callback) {
@@ -368,4 +401,13 @@
   }
 }
 
+FragmentRef<RouterLinkState> NodeLinkMemory::InitializeRouterLinkStateFragment(
+    const Fragment& fragment) {
+  ABSL_ASSERT(!fragment.is_null());
+  FragmentRef<RouterLinkState> ref =
+      AdoptFragmentRef<RouterLinkState>(fragment);
+  RouterLinkState::Initialize(ref.get());
+  return ref;
+}
+
 }  // namespace ipcz
diff --git a/src/ipcz/node_link_memory.h b/src/ipcz/node_link_memory.h
index 0390719..be6770b 100644
--- a/src/ipcz/node_link_memory.h
+++ b/src/ipcz/node_link_memory.h
@@ -17,6 +17,7 @@
 #include "ipcz/fragment_descriptor.h"
 #include "ipcz/fragment_ref.h"
 #include "ipcz/ipcz.h"
+#include "ipcz/ref_counted_fragment.h"
 #include "ipcz/router_link_state.h"
 #include "ipcz/sublink_id.h"
 #include "third_party/abseil-cpp/absl/container/flat_hash_map.h"
@@ -101,6 +102,15 @@
   // with the same BufferId and dimensions as `descriptor`.
   Fragment GetFragment(const FragmentDescriptor& descriptor);
 
+  // Adopts an existing reference to a RefCountedFragment within `fragment`.
+  // This does NOT increment the ref count of the RefCountedFragment.
+  template <typename T>
+  FragmentRef<T> AdoptFragmentRef(const Fragment& fragment) {
+    ABSL_ASSERT(sizeof(T) <= fragment.size());
+    return FragmentRef<T>(RefCountedFragment::kAdoptExistingRef,
+                          WrapRefCounted(this), fragment);
+  }
+
   // Adds a new buffer to the underlying BufferPool to use as additional
   // allocation capacity for blocks of size `block_size`. Note that the
   // contents of the mapped region must already be initialized as a
@@ -118,6 +128,21 @@
   // allocated fragment within this NodeLinkMemory.
   bool FreeFragment(const Fragment& fragment);
 
+  // Allocates a fragment to store a new RouterLinkState and initializes a new
+  // RouterLinkState instance there. If no capacity is currently available to
+  // allocate an appropriate fragment, this may return null.
+  FragmentRef<RouterLinkState> TryAllocateRouterLinkState();
+
+  // Allocates a fragment to store a new RouterLinkState and initializes a new
+  // RouterLinkState instance there. Calls `callback` with a reference to the
+  // new fragment once allocated. Unlike TryAllocateRouterLinkState(), this
+  // allocation always succeeds eventually unless driver memory allocation
+  // itself begins to fail unrecoverably. If the allocation can succeed
+  // synchronously, `callback` may be called before this method returns.
+  using RouterLinkStateCallback =
+      std::function<void(FragmentRef<RouterLinkState>)>;
+  void AllocateRouterLinkState(RouterLinkStateCallback callback);
+
   // Runs `callback` as soon as the identified buffer is added to the underlying
   // BufferPool. If the buffer is already present here, `callback` is run
   // immediately.
@@ -142,6 +167,10 @@
                             RequestBlockCapacityCallback callback);
   void OnCapacityRequestComplete(size_t block_size, bool success);
 
+  // Initializes `fragment` as a new RouterLinkState and returns a ref to it.
+  FragmentRef<RouterLinkState> InitializeRouterLinkStateFragment(
+      const Fragment& fragment);
+
   const Ref<Node> node_;
 
   // The underlying BufferPool. Note that this object is itself thread-safe, so
diff --git a/src/ipcz/node_messages_generator.h b/src/ipcz/node_messages_generator.h
index 5f4d2e0..6cdbf72 100644
--- a/src/ipcz/node_messages_generator.h
+++ b/src/ipcz/node_messages_generator.h
@@ -117,6 +117,14 @@
   IPCZ_MSG_PARAM(SequenceNumber, sequence_length)
 IPCZ_MSG_END()
 
+// Notifies a node that the Router it has bound to `sublink` (on the
+// transmitting NodeLink) now has an allocated RouterLinkState in the fragment
+// identified by `descriptor`.
+IPCZ_MSG_BEGIN(SetRouterLinkState, IPCZ_MSG_ID(23), IPCZ_MSG_VERSION(0))
+  IPCZ_MSG_PARAM(SublinkId, sublink)
+  IPCZ_MSG_PARAM(FragmentDescriptor, descriptor)
+IPCZ_MSG_END()
+
 // Hints to the target router that it should flush its state. Generally sent to
 // catalyze route reduction or elicit some other state change which was blocked
 // on some other work being done first by the sender of this message.
diff --git a/src/ipcz/remote_router_link.cc b/src/ipcz/remote_router_link.cc
index ec47513..6fba386 100644
--- a/src/ipcz/remote_router_link.cc
+++ b/src/ipcz/remote_router_link.cc
@@ -9,6 +9,7 @@
 
 #include "ipcz/box.h"
 #include "ipcz/node_link.h"
+#include "ipcz/node_link_memory.h"
 #include "ipcz/node_messages.h"
 #include "ipcz/portal.h"
 #include "ipcz/router.h"
@@ -24,9 +25,11 @@
     : node_link_(std::move(node_link)),
       sublink_(sublink),
       type_(type),
-      side_(side),
-      link_state_(std::move(link_state)) {
-  ABSL_ASSERT(link_state_.is_null() || link_state_.is_addressable());
+      side_(side) {
+  ABSL_ASSERT(type.is_central() || link_state.is_null());
+  if (type.is_central()) {
+    SetLinkState(std::move(link_state));
+  }
 }
 
 RemoteRouterLink::~RemoteRouterLink() = default;
@@ -42,12 +45,67 @@
                                        std::move(link_state), type, side));
 }
 
+void RemoteRouterLink::SetLinkState(FragmentRef<RouterLinkState> state) {
+  ABSL_ASSERT(type_.is_central());
+  if (state.is_null()) {
+    // By convention, if a central link has no RouterLinkState at construction
+    // time, side A is responsible for allocating a new one and sharing it with
+    // side B eventually. Side B lives with a null RouterLinkState until then.
+    if (side_.is_side_a()) {
+      AllocateAndShareLinkState();
+    }
+    return;
+  }
+
+  if (state.is_pending()) {
+    // By convention, side A should never be given a pending RouterLinkState
+    // fragment.
+    ABSL_ASSERT(side_.is_side_b());
+
+    // Side B on the other hand may obtain a RouterLinkState fragment which it
+    // can't address yet, and in this case, we wait for the fragment's buffer to
+    // be mapped locally.
+    Ref<NodeLinkMemory> memory = WrapRefCounted(&node_link()->memory());
+    FragmentDescriptor descriptor = state.fragment().descriptor();
+    memory->WaitForBufferAsync(
+        descriptor.buffer_id(),
+        [self = WrapRefCounted(this), memory, descriptor] {
+          auto fragment = memory->GetFragment(descriptor);
+          self->SetLinkState(
+              memory->AdoptFragmentRef<RouterLinkState>(fragment));
+        });
+    return;
+  }
+
+  ABSL_ASSERT(state.is_addressable());
+
+  // SetLinkState() must be called with an addressable fragment only once.
+  ABSL_ASSERT(link_state_.load(std::memory_order_acquire) == nullptr);
+
+  // The release when storing `link_state_` is balanced by an acquire in
+  // GetLinkState().
+  link_state_fragment_ = std::move(state);
+  link_state_.store(link_state_fragment_.get(), std::memory_order_release);
+
+  // If this side of the link was already marked stable before the
+  // RouterLinkState was available, `side_is_stable_` will be true. In that
+  // case, set the stable bit in RouterLinkState immediately. This may unblock
+  // some routing work. The acquire here is balanced by a release in
+  // MarkSideStable().
+  if (side_is_stable_.load(std::memory_order_acquire)) {
+    MarkSideStable();
+  }
+  if (Ref<Router> router = node_link()->GetRouter(sublink_)) {
+    router->Flush();
+  }
+}
+
 LinkType RemoteRouterLink::GetType() const {
   return type_;
 }
 
 RouterLinkState* RemoteRouterLink::GetLinkState() const {
-  return link_state_.get();
+  return link_state_.load(std::memory_order_acquire);
 }
 
 bool RemoteRouterLink::HasLocalPeer(const Router& router) {
@@ -233,4 +291,21 @@
   return ss.str();
 }
 
+void RemoteRouterLink::AllocateAndShareLinkState() {
+  node_link()->memory().AllocateRouterLinkState(
+      [self = WrapRefCounted(this)](FragmentRef<RouterLinkState> state) {
+        if (state.is_null()) {
+          DLOG(ERROR) << "Unable to allocate RouterLinkState.";
+          return;
+        }
+        ABSL_ASSERT(state.is_addressable());
+        self->SetLinkState(state);
+
+        msg::SetRouterLinkState set;
+        set.params().sublink = self->sublink();
+        set.params().descriptor = state.release().descriptor();
+        self->node_link()->Transmit(set);
+      });
+}
+
 }  // namespace ipcz
diff --git a/src/ipcz/remote_router_link.h b/src/ipcz/remote_router_link.h
index 9967fa5..a5c3aeb 100644
--- a/src/ipcz/remote_router_link.h
+++ b/src/ipcz/remote_router_link.h
@@ -48,6 +48,23 @@
   const Ref<NodeLink>& node_link() const { return node_link_; }
   SublinkId sublink() const { return sublink_; }
 
+  // Sets this link's RouterLinkState.
+  //
+  // If `state` is null and this link is on side B, this call is a no-op. If
+  // `state` is null and this link is on side A, this call will kick off an
+  // asynchronous allocation of a new RouterLinkState. When that completes, the
+  // new state will be adopted by side A and shared with side B.
+  //
+  // If `state` references a pending fragment and this link is on side A, the
+  // call is a no-op. If `state` references a pending fragment and this link
+  // is on side B, this operation will be automatically deferred until the
+  // NodeLink acquires a mapping of the buffer referenced by `state` and the
+  // fragment can be resolved to an addressable one.
+  //
+  // Finally, if `state` references a valid, addressable fragment, it is
+  // adopted as-is.
+  void SetLinkState(FragmentRef<RouterLinkState> state);
+
   // RouterLink:
   LinkType GetType() const override;
   RouterLinkState* GetLinkState() const override;
@@ -73,6 +90,8 @@
 
   ~RemoteRouterLink() override;
 
+  void AllocateAndShareLinkState();
+
   const Ref<NodeLink> node_link_;
   const SublinkId sublink_;
   const LinkType type_;
@@ -87,7 +106,19 @@
   // shared by both ends of this RouterLink. Always null for non-central links,
   // and may be null for a central links if its RouterLinkState has not yet been
   // allocated or shared.
-  FragmentRef<RouterLinkState> link_state_;
+  //
+  // Must be set at most once and is only retained by this object to keep the
+  // fragment allocated. Access is unguarded and is restricted to
+  // SetLinkState(), and only allowed while `link_state_` below is still null.
+  // Any other access is unsafe. Use GetLinkState() to get a usable reference to
+  // the RouterLinkState instance.
+  FragmentRef<RouterLinkState> link_state_fragment_;
+
+  // Cached address of the shared RouterLinkState referenced by
+  // `link_state_fragment_`. Once this is set to a non-null value it retains
+  // that value indefinitely, so any non-null value loaded from this field is
+  // safe to dereference for the duration of the RemoteRouterLink's lifetime.
+  std::atomic<RouterLinkState*> link_state_{nullptr};
 };
 
 }  // namespace ipcz
diff --git a/src/ipcz/router_link_test.cc b/src/ipcz/router_link_test.cc
index 3790412..38057ba 100644
--- a/src/ipcz/router_link_test.cc
+++ b/src/ipcz/router_link_test.cc
@@ -5,7 +5,10 @@
 #include "ipcz/router_link.h"
 
 #include <tuple>
+#include <utility>
+#include <vector>
 
+#include "ipcz/driver_memory.h"
 #include "ipcz/driver_transport.h"
 #include "ipcz/ipcz.h"
 #include "ipcz/link_side.h"
@@ -36,6 +39,87 @@
 constexpr NodeName kTestPeer1Name(3, 4);
 constexpr NodeName kTestPeer2Name(4, 5);
 
+// A helper for the tests in this module, TestNodePair creates one broker node
+// and one non-broker node and interconnects them using the synchronous
+// reference driver. This class exposes the NodeLinkMemory on either end of the
+// connection and provides some additional facilities which tests can use to
+// poke at node and router state.
+class TestNodePair {
+ public:
+  TestNodePair() {
+    auto transports = DriverTransport::CreatePair(kTestDriver);
+    auto alloc = NodeLinkMemory::Allocate(node_a_);
+    node_link_a_ =
+        NodeLink::Create(node_a_, LinkSide::kA, kTestBrokerName,
+                         kTestNonBrokerName, Node::Type::kNormal, 0,
+                         transports.first, std::move(alloc.node_link_memory));
+    node_link_b_ = NodeLink::Create(
+        node_b_, LinkSide::kB, kTestNonBrokerName, kTestBrokerName,
+        Node::Type::kBroker, 0, transports.second,
+        NodeLinkMemory::Adopt(node_b_, std::move(alloc.primary_buffer_memory)));
+    node_a_->AddLink(kTestNonBrokerName, node_link_a_);
+    node_b_->AddLink(kTestBrokerName, node_link_b_);
+  }
+
+  ~TestNodePair() {
+    node_b_->Close();
+    node_a_->Close();
+  }
+
+  NodeLinkMemory& memory_a() const { return node_link_a_->memory(); }
+  NodeLinkMemory& memory_b() const { return node_link_b_->memory(); }
+
+  // Activates both of the test nodes' NodeLink transports. Tests can defer this
+  // activation as a means of deferring NodeLink communications in general.
+  void ActivateTransports() {
+    node_link_a_->transport()->Activate();
+    node_link_b_->transport()->Activate();
+  }
+
+  // Establishes new RemoteRouterLinks between `a` and `b`. Different initial
+  // RouterLinkState references may be provided for the link on either side in
+  // order to mimic various production scenarios.
+  RouterLink::Pair LinkRemoteRouters(Ref<Router> a,
+                                     FragmentRef<RouterLinkState> a_state,
+                                     Ref<Router> b,
+                                     FragmentRef<RouterLinkState> b_state) {
+    const SublinkId sublink = node_link_a_->memory().AllocateSublinkIds(1);
+    Ref<RemoteRouterLink> a_link = node_link_a_->AddRemoteRouterLink(
+        sublink, std::move(a_state), LinkType::kCentral, LinkSide::kA, a);
+    Ref<RemoteRouterLink> b_link = node_link_b_->AddRemoteRouterLink(
+        sublink, std::move(b_state), LinkType::kCentral, LinkSide::kB, b);
+    a->SetOutwardLink(a_link);
+    b->SetOutwardLink(b_link);
+    return {a_link, b_link};
+  }
+
+  // Depletes the available supply of RouterLinkState fragments and returns
+  // references to all of them. Note that one side effect of this call is that
+  // memory_a() will expand its RouterLinkState fragment capacity, so subsequent
+  // allocation requests will still succeed.
+  std::vector<FragmentRef<RouterLinkState>> AllocateAllRouterLinkStates() {
+    std::vector<FragmentRef<RouterLinkState>> fragments;
+    for (;;) {
+      FragmentRef<RouterLinkState> fragment =
+          memory_a().TryAllocateRouterLinkState();
+      if (fragment.is_null()) {
+        return fragments;
+      }
+      fragments.push_back(std::move(fragment));
+    }
+  }
+
+ private:
+  const Ref<Node> node_a_{MakeRefCounted<Node>(Node::Type::kBroker,
+                                               kTestDriver,
+                                               IPCZ_INVALID_DRIVER_HANDLE)};
+  const Ref<Node> node_b_{MakeRefCounted<Node>(Node::Type::kNormal,
+                                               kTestDriver,
+                                               IPCZ_INVALID_DRIVER_HANDLE)};
+  Ref<NodeLink> node_link_a_;
+  Ref<NodeLink> node_link_b_;
+};
+
 class RouterLinkTest : public testing::Test,
                        public testing::WithParamInterface<RouterLinkTestMode> {
  public:
@@ -47,48 +131,24 @@
         break;
 
       case RouterLinkTestMode::kRemote: {
-        auto transports = DriverTransport::CreatePair(kTestDriver);
-        auto alloc = NodeLinkMemory::Allocate(broker_);
-        broker_node_link_ = NodeLink::Create(
-            broker_, LinkSide::kA, kTestBrokerName, kTestNonBrokerName,
-            Node::Type::kNormal, 0, transports.first,
-            std::move(alloc.node_link_memory));
-        non_broker_node_link_ = NodeLink::Create(
-            non_broker_, LinkSide::kB, kTestNonBrokerName, kTestBrokerName,
-            Node::Type::kBroker, 0, transports.second,
-            NodeLinkMemory::Adopt(non_broker_,
-                                  std::move(alloc.primary_buffer_memory)));
-        broker_->AddLink(kTestNonBrokerName, broker_node_link_);
-        non_broker_->AddLink(kTestBrokerName, non_broker_node_link_);
-
-        auto fragment = broker_node_link_->memory().AllocateFragment(
-            sizeof(RouterLinkState));
-        auto link_state = FragmentRef<RouterLinkState>(
-            RefCountedFragment::kAdoptExistingRef,
-            WrapRefCounted(&broker_node_link_->memory()), fragment);
-        RouterLinkState::Initialize(link_state.get());
-        a_link_ = broker_node_link_->AddRemoteRouterLink(
-            SublinkId{0}, link_state, LinkType::kCentral, LinkSide::kA, a_);
-        b_link_ = non_broker_node_link_->AddRemoteRouterLink(
-            SublinkId{0}, link_state, LinkType::kCentral, LinkSide::kB, b_);
-        a_->SetOutwardLink(a_link_);
-        b_->SetOutwardLink(b_link_);
-
-        broker_node_link_->transport()->Activate();
-        non_broker_node_link_->transport()->Activate();
+        auto link_state = nodes_.memory_a().TryAllocateRouterLinkState();
+        ABSL_ASSERT(link_state.is_addressable());
+        link_state_ = link_state.get();
+        std::tie(a_link_, b_link_) =
+            nodes_.LinkRemoteRouters(a_, link_state, b_, link_state);
         break;
       }
     }
 
     ASSERT_EQ(a_link_->GetLinkState(), b_link_->GetLinkState());
     link_state_ = a_link_->GetLinkState();
+
+    nodes_.ActivateTransports();
   }
 
   void TearDown() override {
     a_->CloseRoute();
     b_->CloseRoute();
-    broker_->Close();
-    non_broker_->Close();
   }
 
   Router& a() { return *a_; }
@@ -99,15 +159,7 @@
   RouterLinkState::Status link_status() { return link_state_->status; }
 
  private:
-  const Ref<Node> broker_{MakeRefCounted<Node>(Node::Type::kBroker,
-                                               kTestDriver,
-                                               IPCZ_INVALID_DRIVER_HANDLE)};
-  const Ref<Node> non_broker_{MakeRefCounted<Node>(Node::Type::kNormal,
-                                                   kTestDriver,
-                                                   IPCZ_INVALID_DRIVER_HANDLE)};
-  Ref<NodeLink> broker_node_link_;
-  Ref<NodeLink> non_broker_node_link_;
-
+  TestNodePair nodes_;
   const Ref<Router> a_{MakeRefCounted<Router>()};
   const Ref<Router> b_{MakeRefCounted<Router>()};
   Ref<RouterLink> a_link_;
@@ -186,6 +238,149 @@
   EXPECT_EQ(RouterLinkState::kStable, link_status());
 }
 
+class RemoteRouterLinkTest : public testing::Test {
+ public:
+  TestNodePair& nodes() { return nodes_; }
+
+  std::vector<Router::Pair> CreateTestRouterPairs(size_t n) {
+    std::vector<Router::Pair> pairs;
+    pairs.reserve(n);
+    for (size_t i = 0; i < n; ++i) {
+      pairs.emplace_back(MakeRefCounted<Router>(), MakeRefCounted<Router>());
+    }
+    return pairs;
+  }
+
+  void CloseRoutes(const std::vector<Router::Pair>& routers) {
+    for (const auto& pair : routers) {
+      pair.first->CloseRoute();
+      pair.second->CloseRoute();
+    }
+  }
+
+  BufferId GenerateBufferId() {
+    return nodes().memory_a().AllocateNewBufferId();
+  }
+
+ private:
+  TestNodePair nodes_;
+};
+
+TEST_F(RemoteRouterLinkTest, NewLinkWithAddressableState) {
+  nodes().ActivateTransports();
+
+  std::vector<FragmentRef<RouterLinkState>> fragments =
+      nodes().AllocateAllRouterLinkStates();
+  std::vector<Router::Pair> router_pairs =
+      CreateTestRouterPairs(fragments.size());
+  std::vector<RouterLink::Pair> links;
+  for (size_t i = 0; i < fragments.size(); ++i) {
+    auto [a, b] = router_pairs[i];
+    auto [a_link, b_link] =
+        nodes().LinkRemoteRouters(a, fragments[i], b, fragments[i]);
+    a_link->MarkSideStable();
+    b_link->MarkSideStable();
+    links.emplace_back(std::move(a_link), std::move(b_link));
+  }
+
+  // We should be able to lock all links from either side, implying that both
+  // sides have a valid reference to the same RouterLinkState.
+  for (const auto& [a_link, b_link] : links) {
+    EXPECT_TRUE(a_link->TryLockForClosure());
+    a_link->Unlock();
+    EXPECT_TRUE(b_link->TryLockForClosure());
+  }
+
+  CloseRoutes(router_pairs);
+}
+
+TEST_F(RemoteRouterLinkTest, NewLinkWithPendingState) {
+  // Occupy all fragments in the primary buffer so they aren't usable.
+  std::vector<FragmentRef<RouterLinkState>> unused_fragments =
+      nodes().AllocateAllRouterLinkStates();
+
+  // Now allocate another batch of fragments which must be in a newly allocated
+  // buffer on node A. Because the nodes' transports are not active yet, there
+  // is no way for node B to have had this buffer shared with it yet. Hence all
+  // of these fragments will be seen as pending on node B.
+  std::vector<FragmentRef<RouterLinkState>> fragments =
+      nodes().AllocateAllRouterLinkStates();
+
+  std::vector<Router::Pair> router_pairs =
+      CreateTestRouterPairs(fragments.size());
+  std::vector<RouterLink::Pair> links;
+  for (size_t i = 0; i < fragments.size(); ++i) {
+    auto [a, b] = router_pairs[i];
+    auto a_state = fragments[i];
+    auto b_fragment =
+        nodes().memory_b().GetFragment(fragments[i].fragment().descriptor());
+    auto b_state =
+        nodes().memory_b().AdoptFragmentRef<RouterLinkState>(b_fragment);
+    ASSERT_TRUE(a_state.is_addressable());
+    ASSERT_TRUE(b_state.is_pending());
+    auto [a_link, b_link] =
+        nodes().LinkRemoteRouters(a, std::move(a_state), b, std::move(b_state));
+    a_link->MarkSideStable();
+    b_link->MarkSideStable();
+    links.emplace_back(std::move(a_link), std::move(b_link));
+  }
+
+  // Because side B of these links still cannot resolve its RouterLinkState,
+  // the link still cannot be stabilized or locked yet.
+  for (const auto& [a_link, b_link] : links) {
+    EXPECT_FALSE(a_link->TryLockForClosure());
+    EXPECT_FALSE(b_link->TryLockForClosure());
+  }
+
+  // We're using the synchronous driver, so as soon as we activate our
+  // transports, all pending NodeLink communications will complete before this
+  // call returns. This also means side B of each link will resolve its
+  // RouterLinkState.
+  nodes().ActivateTransports();
+
+  // Now all links should be lockable from either side, implying that both
+  // sides have a valid reference to the same RouterLinkState.
+  for (const auto& [a_link, b_link] : links) {
+    EXPECT_TRUE(a_link->TryLockForClosure());
+    a_link->Unlock();
+    EXPECT_TRUE(b_link->TryLockForClosure());
+  }
+
+  CloseRoutes(router_pairs);
+}
+
+TEST_F(RemoteRouterLinkTest, NewLinkWithNullState) {
+  // Occupy all fragments in the primary buffer so they aren't usable.
+  std::vector<FragmentRef<RouterLinkState>> unused_fragments =
+      nodes().AllocateAllRouterLinkStates();
+
+  constexpr size_t kNumIterations = 15000;
+  std::vector<Router::Pair> router_pairs =
+      CreateTestRouterPairs(kNumIterations);
+  std::vector<RouterLink::Pair> links;
+  for (size_t i = 0; i < kNumIterations; ++i) {
+    auto [a, b] = router_pairs[i];
+    auto [a_link, b_link] = nodes().LinkRemoteRouters(a, nullptr, b, nullptr);
+    a_link->MarkSideStable();
+    b_link->MarkSideStable();
+    EXPECT_FALSE(a_link->TryLockForClosure());
+    EXPECT_FALSE(b_link->TryLockForClosure());
+    links.emplace_back(std::move(a_link), std::move(b_link));
+  }
+
+  // Since we're using the synchronous driver, by the time transport activation
+  // completes, side A of each link must have already dynamically allocated a
+  // RouterLinkState and shared it with side B.
+  nodes().ActivateTransports();
+  for (const auto& [a_link, b_link] : links) {
+    EXPECT_TRUE(a_link->TryLockForClosure());
+    a_link->Unlock();
+    EXPECT_TRUE(b_link->TryLockForClosure());
+  }
+
+  CloseRoutes(router_pairs);
+}
+
 INSTANTIATE_TEST_SUITE_P(,
                          RouterLinkTest,
                          ::testing::Values(RouterLinkTestMode::kLocal,