ipcz: Node introductions

Implements basic brokering of node introductions between non-broker
nodes. Supporting this required a minor refactoring of NodeLinkMemory
allocation and initialization.

Bug: 1299283
Change-Id: Ibb3f4725daad121e39ea43b8432aa81cd8f33ac5
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/3759376
Commit-Queue: Ken Rockot <rockot@google.com>
Reviewed-by: Daniel Cheng <dcheng@chromium.org>
Cr-Commit-Position: refs/heads/main@{#1027285}
NOKEYCHECK=True
GitOrigin-RevId: 121c7d53e4f177790f225c8af0e1f2310aefe25d
diff --git a/src/BUILD.gn b/src/BUILD.gn
index 90b97fa..24af984 100644
--- a/src/BUILD.gn
+++ b/src/BUILD.gn
@@ -336,6 +336,7 @@
     "ipcz/node_connector_test.cc",
     "ipcz/node_link_memory_test.cc",
     "ipcz/node_link_test.cc",
+    "ipcz/node_test.cc",
     "ipcz/parcel_queue_test.cc",
     "ipcz/ref_counted_fragment_test.cc",
     "ipcz/router_link_test.cc",
diff --git a/src/ipcz/driver_memory.cc b/src/ipcz/driver_memory.cc
index df404fb..bc28fcd 100644
--- a/src/ipcz/driver_memory.cc
+++ b/src/ipcz/driver_memory.cc
@@ -63,4 +63,18 @@
   return DriverMemoryMapping(*memory_.driver(), mapping_handle, address, size_);
 }
 
+DriverMemoryWithMapping::DriverMemoryWithMapping() = default;
+
+DriverMemoryWithMapping::DriverMemoryWithMapping(DriverMemory memory,
+                                                 DriverMemoryMapping mapping)
+    : memory(std::move(memory)), mapping(std::move(mapping)) {}
+
+DriverMemoryWithMapping::DriverMemoryWithMapping(DriverMemoryWithMapping&&) =
+    default;
+
+DriverMemoryWithMapping& DriverMemoryWithMapping::operator=(
+    DriverMemoryWithMapping&&) = default;
+
+DriverMemoryWithMapping::~DriverMemoryWithMapping() = default;
+
 }  // namespace ipcz
diff --git a/src/ipcz/driver_memory.h b/src/ipcz/driver_memory.h
index 639b806..3545418 100644
--- a/src/ipcz/driver_memory.h
+++ b/src/ipcz/driver_memory.h
@@ -52,6 +52,19 @@
   size_t size_ = 0;
 };
 
+// This pairs a DriverMemory object with a mapping of that same object, for
+// convenience.
+struct DriverMemoryWithMapping {
+  DriverMemoryWithMapping();
+  DriverMemoryWithMapping(DriverMemory memory, DriverMemoryMapping mapping);
+  DriverMemoryWithMapping(DriverMemoryWithMapping&&);
+  DriverMemoryWithMapping& operator=(DriverMemoryWithMapping&&);
+  ~DriverMemoryWithMapping();
+
+  DriverMemory memory;
+  DriverMemoryMapping mapping;
+};
+
 }  // namespace ipcz
 
 #endif  // IPCZ_SRC_IPCZ_DRIVER_MEMORY_H_
diff --git a/src/ipcz/node.cc b/src/ipcz/node.cc
index eb6f865..3ff412b 100644
--- a/src/ipcz/node.cc
+++ b/src/ipcz/node.cc
@@ -4,11 +4,15 @@
 
 #include "ipcz/node.h"
 
+#include <utility>
 #include <vector>
 
+#include "ipcz/driver_memory.h"
 #include "ipcz/ipcz.h"
+#include "ipcz/link_side.h"
 #include "ipcz/node_connector.h"
 #include "ipcz/node_link.h"
+#include "ipcz/node_link_memory.h"
 #include "ipcz/portal.h"
 #include "ipcz/router.h"
 #include "third_party/abseil-cpp/absl/base/macros.h"
@@ -101,6 +105,15 @@
   return false;
 }
 
+Ref<NodeLink> Node::GetLink(const NodeName& name) {
+  absl::MutexLock lock(&mutex_);
+  auto it = node_links_.find(name);
+  if (it == node_links_.end()) {
+    return nullptr;
+  }
+  return it->second;
+}
+
 NodeName Node::GenerateRandomName() const {
   NodeName name;
   IpczResult result =
@@ -117,6 +130,156 @@
   callback(DriverMemory(driver_, size));
 }
 
+void Node::EstablishLink(const NodeName& name, EstablishLinkCallback callback) {
+  Ref<NodeLink> broker;
+  Ref<NodeLink> link;
+  {
+    absl::MutexLock lock(&mutex_);
+    broker = broker_link_;
+    auto it = node_links_.find(name);
+    if (it != node_links_.end()) {
+      link = it->second;
+    } else if (type_ == Type::kNormal && broker) {
+      auto [pending_it, inserted] = pending_introductions_.insert({name, {}});
+      pending_it->second.push_back(std::move(callback));
+      if (!inserted) {
+        // There's already an introduction request out for this node, so there's
+        // nothing more we need to do.
+        return;
+      }
+    }
+  }
+
+  if (broker && !link) {
+    broker->RequestIntroduction(name);
+  } else {
+    callback(link.get());
+  }
+}
+
+void Node::HandleIntroductionRequest(NodeLink& from_node_link,
+                                     const NodeName& for_node) {
+  // NodeLink must never accept these requests on non-broker nodes.
+  ABSL_ASSERT(type_ == Type::kBroker);
+
+  const NodeName requestor = from_node_link.remote_node_name();
+
+  DVLOG(4) << "Broker " << from_node_link.local_node_name().ToString()
+           << " received introduction request for " << for_node.ToString()
+           << " from " << requestor.ToString();
+
+  // A key which uniquely identifies the pair of nodes being introduced
+  // regardless of who requested the introduction.
+  const auto key = (requestor < for_node)
+                       ? IntroductionKey(requestor, for_node)
+                       : IntroductionKey(for_node, requestor);
+
+  Ref<NodeLink> target_link;
+  {
+    absl::MutexLock lock(&mutex_);
+    auto it = node_links_.find(for_node);
+    if (it != node_links_.end()) {
+      target_link = it->second;
+
+      auto [intro_it, inserted] = in_progress_introductions_.insert(key);
+      if (!inserted) {
+        // We're already introducing the same two nodes, so drop this request.
+        return;
+      }
+    }
+  }
+
+  if (!target_link) {
+    from_node_link.RejectIntroduction(for_node);
+    return;
+  }
+
+  DriverMemoryWithMapping buffer = NodeLinkMemory::AllocateMemory(driver_);
+  auto [transport_for_target, transport_for_requestor] =
+      DriverTransport::CreatePair(driver_, target_link->transport().get(),
+                                  from_node_link.transport().get());
+  target_link->AcceptIntroduction(
+      requestor, LinkSide::kA, from_node_link.remote_protocol_version(),
+      std::move(transport_for_target), buffer.memory.Clone());
+  from_node_link.AcceptIntroduction(
+      for_node, LinkSide::kB, target_link->remote_protocol_version(),
+      std::move(transport_for_requestor), std::move(buffer.memory));
+
+  absl::MutexLock lock(&mutex_);
+  in_progress_introductions_.erase(key);
+}
+
+void Node::AcceptIntroduction(NodeLink& from_node_link,
+                              const NodeName& name,
+                              LinkSide side,
+                              uint32_t remote_protocol_version,
+                              Ref<DriverTransport> transport,
+                              Ref<NodeLinkMemory> memory) {
+  // NodeLink should never dispatch this method to a node if the introduction
+  // didn't come from a broker, so this assertion should always hold.
+  ABSL_ASSERT(from_node_link.remote_node_type() == Node::Type::kBroker);
+
+  const NodeName local_name = from_node_link.local_node_name();
+
+  DVLOG(4) << "Node " << local_name.ToString() << " received introduction to "
+           << name.ToString() << " from broker "
+           << from_node_link.remote_node_name().ToString();
+
+  Ref<NodeLink> new_link = NodeLink::Create(
+      WrapRefCounted(this), side, local_name, name, Type::kNormal,
+      remote_protocol_version, transport, std::move(memory));
+  ABSL_ASSERT(new_link);
+
+  std::vector<EstablishLinkCallback> callbacks;
+  {
+    absl::MutexLock lock(&mutex_);
+    auto [link_it, inserted] = node_links_.insert({name, new_link});
+    if (!inserted) {
+      // If both nodes race to request an introduction to each other, the
+      // broker may send redundant introductions. It does however take care to
+      // ensure that they're ordered consistently across both nodes, so
+      // redundant introductions can be safely ignored by convention.
+    }
+
+    // If this node requested this introduction, we may have callbacks to run.
+    // Note that it is not an error to receive an unrequested introduction,
+    // since it is only necessary for one of the introduced nodes to have
+    // requested it.
+    auto it = pending_introductions_.find(name);
+    if (it != pending_introductions_.end()) {
+      callbacks = std::move(it->second);
+      pending_introductions_.erase(it);
+    }
+  }
+
+  if (transport) {
+    transport->Activate();
+  }
+
+  for (auto& callback : callbacks) {
+    callback(new_link.get());
+  }
+}
+
+bool Node::CancelIntroduction(const NodeName& name) {
+  std::vector<EstablishLinkCallback> callbacks;
+  {
+    absl::MutexLock lock(&mutex_);
+    auto it = pending_introductions_.find(name);
+    if (it == pending_introductions_.end()) {
+      return false;
+    }
+    callbacks = std::move(it->second);
+    pending_introductions_.erase(it);
+  }
+
+  for (auto& callback : callbacks) {
+    callback(nullptr);
+  }
+
+  return true;
+}
+
 void Node::ShutDown() {
   NodeLinkMap node_links;
   {
diff --git a/src/ipcz/node.h b/src/ipcz/node.h
index d5fc67d..5d37738 100644
--- a/src/ipcz/node.h
+++ b/src/ipcz/node.h
@@ -10,14 +10,17 @@
 #include "ipcz/api_object.h"
 #include "ipcz/driver_memory.h"
 #include "ipcz/ipcz.h"
+#include "ipcz/link_side.h"
 #include "ipcz/node_name.h"
 #include "third_party/abseil-cpp/absl/container/flat_hash_map.h"
+#include "third_party/abseil-cpp/absl/container/flat_hash_set.h"
 #include "third_party/abseil-cpp/absl/synchronization/mutex.h"
 #include "third_party/abseil-cpp/absl/types/span.h"
 
 namespace ipcz {
 
 class NodeLink;
+class NodeLinkMemory;
 
 // A Node controls creation and interconnection of a collection of routers which
 // can establish links to and from other routers in other nodes. Every node is
@@ -86,6 +89,11 @@
   // Registers a new NodeLink for the given `remote_node_name`.
   bool AddLink(const NodeName& remote_node_name, Ref<NodeLink> link);
 
+  // Returns a reference to the NodeLink used by this Node to communicate with
+  // the remote node identified by `name`; or null if this node has no NodeLink
+  // connected to that node.
+  Ref<NodeLink> GetLink(const NodeName& name);
+
   // Generates a new random NodeName using this node's driver as a source of
   // randomness.
   NodeName GenerateRandomName() const;
@@ -98,6 +106,39 @@
   using AllocateSharedMemoryCallback = std::function<void(DriverMemory)>;
   void AllocateSharedMemory(size_t size, AllocateSharedMemoryCallback callback);
 
+  // Asynchronously attempts to establish a new NodeLink directly to the named
+  // node, invoking `callback` when complete. On success, this node will retain
+  // a new NodeLink to the named node, and `callback` will be invoked with a
+  // reference to that link. Otherwise `callback` will be invoked with a null
+  // reference.
+  //
+  // If the calling node already has a link to the named node, `callback` may
+  // be invoked synchronously with a link to that node before this method
+  // returns.
+  using EstablishLinkCallback = std::function<void(NodeLink*)>;
+  void EstablishLink(const NodeName& name, EstablishLinkCallback callback);
+
+  // Handles an incoming introduction request. Must only be called on a broker
+  // node. If this broker has a NodeLink to the node named by `for_node`, it
+  // will introduce that node and the remote node on `from_node_link`.
+  void HandleIntroductionRequest(NodeLink& from_node_link,
+                                 const NodeName& for_node);
+
+  // Accepts an introduction received from the broker. `transport` and `memory`
+  // can be used to establish a new NodeLink to the remote node, whose name is
+  // `name`. The NodeLink must assume a role as the given `side` of the link.
+  void AcceptIntroduction(NodeLink& from_node_link,
+                          const NodeName& name,
+                          LinkSide side,
+                          uint32_t remote_protocol_version,
+                          Ref<DriverTransport> transport,
+                          Ref<NodeLinkMemory> memory);
+
+  // Handles a rejected introduction from the broker. This is called on a
+  // non-broker node that previously requested an introduction to `name` if
+  // the broker could not satisfy the request.
+  bool CancelIntroduction(const NodeName& name);
+
  private:
   ~Node() override;
 
@@ -127,6 +168,42 @@
   // system.
   using NodeLinkMap = absl::flat_hash_map<NodeName, Ref<NodeLink>>;
   NodeLinkMap node_links_ ABSL_GUARDED_BY(mutex_);
+
+  // A map of other nodes to which this node is waiting for an introduction from
+  // `broker_link_`. Once such an introduction is received, all callbacks for
+  // that NodeName are executed.
+  absl::flat_hash_map<NodeName, std::vector<EstablishLinkCallback>>
+      pending_introductions_ ABSL_GUARDED_BY(mutex_);
+
+  // Nodes may race to request introductions to each other from the same broker.
+  // This can lead to redundant introductions being sent which the requesting
+  // nodes should be able to ignore. However, the following could occur on a
+  // broker which is processing a request from node A on Thread 1 while also
+  // processing a request from node B on thread 2:
+  //
+  //    Thread 1                       Thread 2                      Time
+  //    ---                            ---                             |
+  //    A requests intro to B          B requests intro to A           v
+  //    Send B intro X to A
+  //                                   Send A intro Y to B
+  //    Send A intro X to B
+  //                                   Send B intro Y to A
+  //
+  // Each unique intro shares either end of a transport with its recipients,
+  // so both A and B must accept the same introduction (either X or Y). In this
+  // scenario however, A will first receive and accept intro X, and will ignore
+  // intro Y as redundant. But B will receive intro Y first and ignore intro X
+  // as redundant. This is bad.
+  //
+  // The set of `in_progress_introductions_` allows this (broker) node to guard
+  // against such interleaved introductions. Immediately before sending an intro
+  // to both recipients, a key identifying them is placed into the set. This key
+  // is removed immediately after both introductions are sent. If another thread
+  // is asked to introduce the same two nodes while the key is still present, it
+  // will ignore the request and send nothing.
+  using IntroductionKey = std::pair<NodeName, NodeName>;
+  absl::flat_hash_set<IntroductionKey> in_progress_introductions_
+      ABSL_GUARDED_BY(mutex_);
 };
 
 }  // namespace ipcz
diff --git a/src/ipcz/node_connector.cc b/src/ipcz/node_connector.cc
index 7186359..73702ba 100644
--- a/src/ipcz/node_connector.cc
+++ b/src/ipcz/node_connector.cc
@@ -37,8 +37,9 @@
                       flags,
                       std::move(waiting_portals),
                       std::move(callback)),
-        link_memory_allocation_(NodeLinkMemory::Allocate(node_)) {
-    ABSL_ASSERT(link_memory_allocation_.node_link_memory);
+        link_memory_allocation_(
+            NodeLinkMemory::AllocateMemory(node_->driver())) {
+    ABSL_ASSERT(link_memory_allocation_.mapping.is_valid());
   }
 
   ~NodeConnectorForBrokerToNonBroker() override = default;
@@ -58,7 +59,7 @@
     connect.params().num_initial_portals =
         checked_cast<uint32_t>(num_portals());
     connect.params().buffer = connect.AppendDriverObject(
-        link_memory_allocation_.primary_buffer_memory.TakeDriverObject());
+        link_memory_allocation_.memory.TakeDriverObject());
     return IPCZ_RESULT_OK == transport_->Transmit(connect);
   }
 
@@ -70,10 +71,11 @@
              << new_remote_node_name_.ToString();
 
     AcceptConnection(
-        NodeLink::Create(node_, LinkSide::kA, broker_name_,
-                         new_remote_node_name_, Node::Type::kNormal,
-                         connect.params().protocol_version, transport_,
-                         std::move(link_memory_allocation_.node_link_memory)),
+        NodeLink::Create(
+            node_, LinkSide::kA, broker_name_, new_remote_node_name_,
+            Node::Type::kNormal, connect.params().protocol_version, transport_,
+            NodeLinkMemory::Create(node_,
+                                   std::move(link_memory_allocation_.mapping))),
         LinkSide::kA, connect.params().num_initial_portals);
     return true;
   }
@@ -81,7 +83,7 @@
  private:
   const NodeName broker_name_{node_->GetAssignedName()};
   const NodeName new_remote_node_name_{node_->GenerateRandomName()};
-  NodeLinkMemory::Allocation link_memory_allocation_;
+  DriverMemoryWithMapping link_memory_allocation_;
 };
 
 class NodeConnectorForNonBrokerToBroker : public NodeConnector {
@@ -122,11 +124,11 @@
       return false;
     }
 
-    auto new_link = NodeLink::Create(
-        node_, LinkSide::kB, connect.params().receiver_name,
-        connect.params().broker_name, Node::Type::kBroker,
-        connect.params().protocol_version, transport_,
-        NodeLinkMemory::Adopt(node_, std::move(buffer_memory)));
+    auto new_link =
+        NodeLink::Create(node_, LinkSide::kB, connect.params().receiver_name,
+                         connect.params().broker_name, Node::Type::kBroker,
+                         connect.params().protocol_version, transport_,
+                         NodeLinkMemory::Create(node_, buffer_memory.Map()));
     node_->SetAssignedName(connect.params().receiver_name);
     node_->SetBrokerLink(new_link);
 
diff --git a/src/ipcz/node_link.cc b/src/ipcz/node_link.cc
index 7e08c64..69df0e3 100644
--- a/src/ipcz/node_link.cc
+++ b/src/ipcz/node_link.cc
@@ -125,6 +125,39 @@
   Transmit(add);
 }
 
+void NodeLink::RequestIntroduction(const NodeName& name) {
+  ABSL_ASSERT(remote_node_type_ == Node::Type::kBroker);
+
+  msg::RequestIntroduction request;
+  request.params().name = name;
+  Transmit(request);
+}
+
+void NodeLink::AcceptIntroduction(const NodeName& name,
+                                  LinkSide side,
+                                  uint32_t remote_protocol_version,
+                                  Ref<DriverTransport> transport,
+                                  DriverMemory memory) {
+  ABSL_ASSERT(node_->type() == Node::Type::kBroker);
+
+  msg::AcceptIntroduction accept;
+  accept.params().name = name;
+  accept.params().link_side = side;
+  accept.params().remote_protocol_version = remote_protocol_version;
+  accept.params().transport =
+      accept.AppendDriverObject(transport->TakeDriverObject());
+  accept.params().memory = accept.AppendDriverObject(memory.TakeDriverObject());
+  Transmit(accept);
+}
+
+void NodeLink::RejectIntroduction(const NodeName& name) {
+  ABSL_ASSERT(node_->type() == Node::Type::kBroker);
+
+  msg::RejectIntroduction reject;
+  reject.params().name = name;
+  Transmit(reject);
+}
+
 void NodeLink::Deactivate() {
   {
     absl::MutexLock lock(&mutex_);
@@ -158,6 +191,59 @@
       1, std::memory_order_relaxed));
 }
 
+bool NodeLink::OnRequestIntroduction(msg::RequestIntroduction& request) {
+  // TODO: Support broker-to-broker introduction requests.
+  if (remote_node_type_ != Node::Type::kNormal ||
+      node()->type() != Node::Type::kBroker) {
+    return false;
+  }
+
+  node()->HandleIntroductionRequest(*this, request.params().name);
+  return true;
+}
+
+bool NodeLink::OnAcceptIntroduction(msg::AcceptIntroduction& accept) {
+  if (remote_node_type_ != Node::Type::kBroker) {
+    return false;
+  }
+
+  if (node()->type() != Node::Type::kNormal) {
+    // TODO: Support broker-to-broker introductions.
+    return false;
+  }
+
+  auto memory = DriverMemory(accept.TakeDriverObject(accept.params().memory));
+  if (!memory.is_valid()) {
+    return false;
+  }
+
+  auto mapping = memory.Map();
+  if (!mapping.is_valid()) {
+    return false;
+  }
+
+  auto transport = MakeRefCounted<DriverTransport>(
+      accept.TakeDriverObject(accept.params().transport));
+  node()->AcceptIntroduction(
+      *this, accept.params().name, accept.params().link_side,
+      accept.params().remote_protocol_version, std::move(transport),
+      NodeLinkMemory::Create(node(), std::move(mapping)));
+  return true;
+}
+
+bool NodeLink::OnRejectIntroduction(msg::RejectIntroduction& reject) {
+  if (remote_node_type_ != Node::Type::kBroker) {
+    return false;
+  }
+
+  if (node()->type() != Node::Type::kNormal) {
+    // TODO: Support broker-to-broker introductions.
+    return false;
+  }
+
+  return node()->CancelIntroduction(reject.params().name);
+}
+
 bool NodeLink::OnAddBlockBuffer(msg::AddBlockBuffer& add) {
   DriverMemory buffer(add.TakeDriverObject(add.params().buffer));
   if (!buffer.is_valid()) {
diff --git a/src/ipcz/node_link.h b/src/ipcz/node_link.h
index d7d2292..06e3c00 100644
--- a/src/ipcz/node_link.h
+++ b/src/ipcz/node_link.h
@@ -106,6 +106,24 @@
   // AllocateNewBufferId().
   void AddBlockBuffer(BufferId id, uint32_t block_size, DriverMemory memory);
 
+  // Asks the broker on the other end of this link to introduce the local node
+  // to the node identified by `name`. This will always elicit a response from
+  // the broker in the form of either an AcceptIntroduction or
+  // RejectIntroduction message.
+  void RequestIntroduction(const NodeName& name);
+
+  // Introduces the remote node to the node named `name`, with details needed to
+  // construct a new NodeLink to that node.
+  void AcceptIntroduction(const NodeName& name,
+                          LinkSide side,
+                          uint32_t remote_protocol_version,
+                          Ref<DriverTransport> transport,
+                          DriverMemory memory);
+
+  // Rejects an introduction request previously sent by the remote node for the
+  // node identified by `name`.
+  void RejectIntroduction(const NodeName& name);
+
   // Permanently deactivates this NodeLink. Once this call returns the NodeLink
   // will no longer receive transport messages. It may still be used to transmit
   // outgoing messages, but it cannot be reactivated. Transmissions over a
@@ -132,6 +150,9 @@
   SequenceNumber GenerateOutgoingSequenceNumber();
 
   // NodeMessageListener overrides:
+  bool OnRequestIntroduction(msg::RequestIntroduction& request) override;
+  bool OnAcceptIntroduction(msg::AcceptIntroduction& accept) override;
+  bool OnRejectIntroduction(msg::RejectIntroduction& reject) override;
   bool OnAddBlockBuffer(msg::AddBlockBuffer& add) override;
   bool OnAcceptParcel(msg::AcceptParcel& accept) override;
   bool OnRouteClosed(msg::RouteClosed& route_closed) override;
diff --git a/src/ipcz/node_link_memory.cc b/src/ipcz/node_link_memory.cc
index 6bf92d0..52f2617 100644
--- a/src/ipcz/node_link_memory.cc
+++ b/src/ipcz/node_link_memory.cc
@@ -150,6 +150,7 @@
   // Consistency check here, because PrimaryBuffer is private to NodeLinkMemory.
   static_assert(sizeof(PrimaryBuffer) <= kPrimaryBufferSize,
                 "PrimaryBuffer structure is too large.");
+  ABSL_HARDENING_ASSERT(primary_buffer_memory_.size() >= kPrimaryBufferSize);
 
   const BlockAllocator allocators[] = {
       primary_buffer_.block_allocator_64(),
@@ -171,16 +172,16 @@
 }
 
 // static
-NodeLinkMemory::Allocation NodeLinkMemory::Allocate(Ref<Node> node) {
-  DriverMemory primary_buffer_memory(node->driver(), sizeof(PrimaryBuffer));
-  if (!primary_buffer_memory.is_valid()) {
-    return {.node_link_memory = nullptr, .primary_buffer_memory = {}};
+DriverMemoryWithMapping NodeLinkMemory::AllocateMemory(
+    const IpczDriver& driver) {
+  DriverMemory memory(driver, kPrimaryBufferSize);
+  if (!memory.is_valid()) {
+    return {};
   }
 
-  auto memory = AdoptRef(
-      new NodeLinkMemory(std::move(node), primary_buffer_memory.Map()));
-
-  PrimaryBuffer& primary_buffer = memory->primary_buffer_;
+  DriverMemoryMapping mapping = memory.Map();
+  PrimaryBuffer& primary_buffer =
+      *reinterpret_cast<PrimaryBuffer*>(mapping.bytes().data());
 
   // The first allocable BufferId is 1, because the primary buffer uses 0.
   primary_buffer.header.next_buffer_id.store(1, std::memory_order_relaxed);
@@ -201,17 +202,13 @@
   primary_buffer.block_allocator_1024().InitializeRegion();
   primary_buffer.block_allocator_2048().InitializeRegion();
 
-  return {
-      .node_link_memory = std::move(memory),
-      .primary_buffer_memory = std::move(primary_buffer_memory),
-  };
+  return {std::move(memory), std::move(mapping)};
 }
 
 // static
-Ref<NodeLinkMemory> NodeLinkMemory::Adopt(Ref<Node> node,
-                                          DriverMemory primary_buffer_memory) {
-  return AdoptRef(
-      new NodeLinkMemory(std::move(node), primary_buffer_memory.Map()));
+Ref<NodeLinkMemory> NodeLinkMemory::Create(Ref<Node> node,
+                                           DriverMemoryMapping memory) {
+  return AdoptRef(new NodeLinkMemory(std::move(node), std::move(memory)));
 }
 
 BufferId NodeLinkMemory::AllocateNewBufferId() {
diff --git a/src/ipcz/node_link_memory.h b/src/ipcz/node_link_memory.h
index be6770b..d5e5095 100644
--- a/src/ipcz/node_link_memory.h
+++ b/src/ipcz/node_link_memory.h
@@ -41,24 +41,6 @@
   // reserved for use by initial portals.
   static constexpr size_t kMaxInitialPortals = 12;
 
-  NodeLinkMemory(NodeLinkMemory&&);
-
-  // Returned by Allocate().
-  struct Allocation {
-    // The NodeLinkMemory created by a succesful call to Allocate(), or null if
-    // memory could not be allocated. This memory is initialized with a
-    // primary buffer (BufferId 0) whose contents have also been appropriately
-    // initialized. This object is ready for immediate use by a new NodeLink on
-    // the `node` passed to Allocate().
-    Ref<NodeLinkMemory> node_link_memory;
-
-    // A handle to the region underlying the new NodeLinkMemory's primary
-    // buffer. This should be shared with the corresponding NodeLink's remote
-    // node, where it can be passed to Adopt() to establish a new NodeLinkMemory
-    // there.
-    DriverMemory primary_buffer_memory;
-  };
-
   // Sets a reference to the NodeLink using this NodeLinkMemory. This is called
   // by the NodeLink itself before any other methods can be called on the
   // NodeLinkMemory, and it's only reset to null once the NodeLink is
@@ -67,16 +49,16 @@
   // memory pool as this one.
   void SetNodeLink(Ref<NodeLink> link);
 
-  // Constructs a new NodeLinkMemory over a newly allocated DriverMemory object.
-  // The new DriverMemory is returned in `primary_buffer_memory`, while the
-  // returned NodeLinkMemory internally retains a mapping of that memory.
-  static Allocation Allocate(Ref<Node> node);
+  // Allocates a new DriverMemory object and initializes its contents to be
+  // suitable as the primary buffer of a new NodeLinkMemory. Returns the memory
+  // along with a mapping of it.
+  static DriverMemoryWithMapping AllocateMemory(const IpczDriver& driver);
 
   // Constructs a new NodeLinkMemory with BufferId 0 (the primary buffer) mapped
-  // from `primary_buffer_memory`. The buffer must have been created and
-  // initialized by a prior call to Allocate() above.
-  static Ref<NodeLinkMemory> Adopt(Ref<Node> node,
-                                   DriverMemory primary_buffer_memory);
+  // as `primary_buffer_memory`. The buffer must have been created and
+  // initialized by a prior call to AllocateMemory() above.
+  static Ref<NodeLinkMemory> Create(Ref<Node> node,
+                                    DriverMemoryMapping primary_buffer_memory);
 
   // Returns a new BufferId which should still be unused by any buffer in this
   // NodeLinkMemory's BufferPool, or that of its peer NodeLinkMemory. When
@@ -152,7 +134,10 @@
  private:
   struct PrimaryBuffer;
 
-  NodeLinkMemory(Ref<Node> node, DriverMemoryMapping primary_buffer);
+  // Constructs a new NodeLinkMemory over `mapping`, which must correspond to
+  // a DriverMemory whose contents have already been initialized as a
+  // NodeLinkMemory primary buffer.
+  NodeLinkMemory(Ref<Node> node, DriverMemoryMapping mapping);
   ~NodeLinkMemory() override;
 
   // Indicates whether the NodeLinkMemory should be allowed to expand its
diff --git a/src/ipcz/node_link_memory_test.cc b/src/ipcz/node_link_memory_test.cc
index f7c37ba..8e99fb9 100644
--- a/src/ipcz/node_link_memory_test.cc
+++ b/src/ipcz/node_link_memory_test.cc
@@ -7,6 +7,7 @@
 #include <utility>
 #include <vector>
 
+#include "ipcz/driver_memory.h"
 #include "ipcz/driver_transport.h"
 #include "ipcz/ipcz.h"
 #include "ipcz/link_side.h"
@@ -33,15 +34,16 @@
 
   void SetUp() override {
     auto transports = DriverTransport::CreatePair(kTestDriver);
-    auto alloc = NodeLinkMemory::Allocate(node_a_);
-    link_a_ =
-        NodeLink::Create(node_a_, LinkSide::kA, kTestBrokerName,
-                         kTestNonBrokerName, Node::Type::kNormal, 0,
-                         transports.first, std::move(alloc.node_link_memory));
+    DriverMemoryWithMapping buffer =
+        NodeLinkMemory::AllocateMemory(kTestDriver);
+    link_a_ = NodeLink::Create(
+        node_a_, LinkSide::kA, kTestBrokerName, kTestNonBrokerName,
+        Node::Type::kNormal, 0, transports.first,
+        NodeLinkMemory::Create(node_a_, std::move(buffer.mapping)));
     link_b_ = NodeLink::Create(
         node_b_, LinkSide::kB, kTestNonBrokerName, kTestBrokerName,
         Node::Type::kBroker, 0, transports.second,
-        NodeLinkMemory::Adopt(node_b_, std::move(alloc.primary_buffer_memory)));
+        NodeLinkMemory::Create(node_b_, buffer.memory.Map()));
     node_a_->AddLink(kTestNonBrokerName, link_a_);
     node_b_->AddLink(kTestBrokerName, link_b_);
     link_a_->transport()->Activate();
diff --git a/src/ipcz/node_link_test.cc b/src/ipcz/node_link_test.cc
index 0eff379..94e91c1 100644
--- a/src/ipcz/node_link_test.cc
+++ b/src/ipcz/node_link_test.cc
@@ -6,6 +6,7 @@
 
 #include <utility>
 
+#include "ipcz/driver_memory.h"
 #include "ipcz/link_side.h"
 #include "ipcz/link_type.h"
 #include "ipcz/node_link_memory.h"
@@ -35,19 +36,18 @@
   auto transport1 =
       MakeRefCounted<DriverTransport>(DriverObject(kDriver, handle1));
 
-  NodeLinkMemory::Allocation allocation = NodeLinkMemory::Allocate(broker);
-  ABSL_ASSERT(allocation.node_link_memory);
+  DriverMemoryWithMapping buffer = NodeLinkMemory::AllocateMemory(kDriver);
+  ABSL_ASSERT(buffer.mapping.is_valid());
 
   const NodeName non_broker_name = broker->GenerateRandomName();
-  auto link0 =
-      NodeLink::Create(broker, LinkSide::kA, broker->GetAssignedName(),
-                       non_broker_name, Node::Type::kNormal, 0, transport0,
-                       std::move(allocation.node_link_memory));
+  auto link0 = NodeLink::Create(
+      broker, LinkSide::kA, broker->GetAssignedName(), non_broker_name,
+      Node::Type::kNormal, 0, transport0,
+      NodeLinkMemory::Create(broker, std::move(buffer.mapping)));
   auto link1 = NodeLink::Create(
       non_broker, LinkSide::kB, non_broker_name, broker->GetAssignedName(),
       Node::Type::kNormal, 0, transport1,
-      NodeLinkMemory::Adopt(non_broker,
-                            std::move(allocation.primary_buffer_memory)));
+      NodeLinkMemory::Create(non_broker, buffer.memory.Map()));
 
   transport0->Activate();
   transport1->Activate();
diff --git a/src/ipcz/node_messages.h b/src/ipcz/node_messages.h
index 30867ff..ed92d7a 100644
--- a/src/ipcz/node_messages.h
+++ b/src/ipcz/node_messages.h
@@ -11,6 +11,7 @@
 #include "ipcz/driver_object.h"
 #include "ipcz/driver_transport.h"
 #include "ipcz/handle_type.h"
+#include "ipcz/link_side.h"
 #include "ipcz/message.h"
 #include "ipcz/node_name.h"
 #include "ipcz/router_descriptor.h"
diff --git a/src/ipcz/node_messages_generator.h b/src/ipcz/node_messages_generator.h
index 6cdbf72..8e98ef0 100644
--- a/src/ipcz/node_messages_generator.h
+++ b/src/ipcz/node_messages_generator.h
@@ -54,6 +54,50 @@
   IPCZ_MSG_PARAM(uint32_t, num_initial_portals)
 IPCZ_MSG_END()
 
+// Sent by a non-broker node to a broker node, asking the broker to introduce
+// the non-broker to the node identified by `name`. If the broker is willing and
+// able to comply with this request, it will send an AcceptIntroduction message
+// (see below) to both the sender of this message and the node identified by
+// `name`.
+//
+// If the broker does not know the node named `name`, it will send only a
+// RejectIntroduction message back to the sender to indicate failure.
+IPCZ_MSG_BEGIN(RequestIntroduction, IPCZ_MSG_ID(10), IPCZ_MSG_VERSION(0))
+  IPCZ_MSG_PARAM(NodeName, name)
+IPCZ_MSG_END()
+
+// Introduces one node to another. Sent only by broker nodes and must only be
+// accepted from broker nodes.
+IPCZ_MSG_BEGIN(AcceptIntroduction, IPCZ_MSG_ID(11), IPCZ_MSG_VERSION(0))
+  // The name of the node being introduced to the recipient of this message.
+  IPCZ_MSG_PARAM(NodeName, name)
+
+  // Indicates which nominal side of the link (A or B) the recipient must assume
+  // for the NodeLink it will establish over `transport`.
+  IPCZ_MSG_PARAM(LinkSide, link_side)
+
+  // Indicates the highest ipcz protocol version which the remote side of
+  // `transport` able and willing to use according to the broker.
+  IPCZ_MSG_PARAM(uint32_t, remote_protocol_version)
+
+  // The DriverTransport which should be used by the recipient to establish a
+  // new NodeLink to the named node. The transport's peer endpoint will be
+  // given by the broker to the node identified by `name`.
+  IPCZ_MSG_PARAM_DRIVER_OBJECT(transport)
+
+  // A DriverMemory object which should adopted for the NodeLinkMemory instance
+  // of the newly established NodeLink. This becomes the new NodeLinkMemory's
+  // primary buffer.
+  IPCZ_MSG_PARAM_DRIVER_OBJECT(memory)
+IPCZ_MSG_END()
+
+// Sent back to a non-broker if the broker did not recognzie the subject of an
+// introduction request.
+IPCZ_MSG_BEGIN(RejectIntroduction, IPCZ_MSG_ID(12), IPCZ_MSG_VERSION(0))
+  // The name of the node whose introduction cannot be fulfilled.
+  IPCZ_MSG_PARAM(NodeName, name)
+IPCZ_MSG_END()
+
 // Shares a new buffer to support allocation of blocks of `block_size` bytes.
 // The sender must initialize an appropriate BlockAllocator within the buffer's
 // memory before sending this message.
diff --git a/src/ipcz/node_test.cc b/src/ipcz/node_test.cc
new file mode 100644
index 0000000..8c4685c
--- /dev/null
+++ b/src/ipcz/node_test.cc
@@ -0,0 +1,195 @@
+// Copyright 2022 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "ipcz/node.h"
+
+#include <utility>
+
+#include "ipcz/driver_memory.h"
+#include "ipcz/driver_transport.h"
+#include "ipcz/ipcz.h"
+#include "ipcz/link_side.h"
+#include "ipcz/node_link.h"
+#include "ipcz/node_link_memory.h"
+#include "ipcz/node_name.h"
+#include "reference_drivers/sync_reference_driver.h"
+#include "testing/gtest/include/gtest/gtest.h"
+#include "util/ref_counted.h"
+
+namespace ipcz {
+namespace {
+
+const IpczDriver& kTestDriver = reference_drivers::kSyncReferenceDriver;
+
+constexpr NodeName kNodeAName(0, 1);
+constexpr NodeName kNodeBName(1, 2);
+constexpr NodeName kNodeCName(3, 5);
+
+class NodeTest : public testing::Test {
+ public:
+  void SetUp() override {
+    ConnectBrokerToNode(node_a_, kNodeAName);
+    ConnectBrokerToNode(node_b_, kNodeBName);
+  }
+
+  void TearDown() override {
+    node_b_->Close();
+    node_a_->Close();
+    broker_->Close();
+  }
+
+  Node& broker() { return *broker_; }
+  NodeName broker_name() { return broker_->GetAssignedName(); }
+  Node& node_a() { return *node_a_; }
+  Node& node_b() { return *node_b_; }
+
+ private:
+  void ConnectBrokerToNode(Ref<Node> node, const NodeName& name) {
+    auto transports = DriverTransport::CreatePair(kTestDriver);
+    DriverMemoryWithMapping buffer =
+        NodeLinkMemory::AllocateMemory(kTestDriver);
+    const NodeName broker_name = broker_->GetAssignedName();
+    auto broker_link = NodeLink::Create(
+        broker_, LinkSide::kA, broker_name, name, Node::Type::kNormal, 0,
+        transports.first,
+        NodeLinkMemory::Create(broker_, std::move(buffer.mapping)));
+    auto node_link = NodeLink::Create(
+        node, LinkSide::kB, name, broker_name, Node::Type::kBroker, 0,
+        transports.second, NodeLinkMemory::Create(node, buffer.memory.Map()));
+    node->SetAssignedName(name);
+    broker_->AddLink(name, broker_link);
+    node->AddLink(broker_name, node_link);
+    node->SetBrokerLink(node_link);
+    broker_link->transport()->Activate();
+    node_link->transport()->Activate();
+  }
+
+  const Ref<Node> broker_{MakeRefCounted<Node>(Node::Type::kBroker,
+                                               kTestDriver,
+                                               IPCZ_INVALID_DRIVER_HANDLE)};
+  const Ref<Node> node_a_{MakeRefCounted<Node>(Node::Type::kNormal,
+                                               kTestDriver,
+                                               IPCZ_INVALID_DRIVER_HANDLE)};
+  const Ref<Node> node_b_{MakeRefCounted<Node>(Node::Type::kNormal,
+                                               kTestDriver,
+                                               IPCZ_INVALID_DRIVER_HANDLE)};
+};
+
+TEST_F(NodeTest, EstablishExistingLinks) {
+  // When the requested node is already known, EstablishLink() responds
+  // immediately with the existing NodeLink.
+
+  bool called = false;
+  auto expect_link = [&called](Ref<NodeLink>& expected) {
+    return [expected = expected.get(), &called](NodeLink* link) {
+      EXPECT_EQ(expected, link);
+      called = true;
+    };
+  };
+
+  Ref<NodeLink> broker_to_a = broker().GetLink(kNodeAName);
+  EXPECT_TRUE(broker_to_a);
+  broker().EstablishLink(kNodeAName, expect_link(broker_to_a));
+  EXPECT_TRUE(called);
+  called = false;
+
+  Ref<NodeLink> broker_to_b = broker().GetLink(kNodeBName);
+  EXPECT_TRUE(broker_to_b);
+  broker().EstablishLink(kNodeBName, expect_link(broker_to_b));
+  EXPECT_TRUE(called);
+  called = false;
+
+  Ref<NodeLink> a_to_broker = node_a().GetLink(broker_name());
+  EXPECT_TRUE(a_to_broker);
+  node_a().EstablishLink(broker_name(), expect_link(a_to_broker));
+  EXPECT_TRUE(called);
+  called = false;
+
+  Ref<NodeLink> b_to_broker = node_b().GetLink(broker_name());
+  EXPECT_TRUE(b_to_broker);
+  node_b().EstablishLink(broker_name(), expect_link(b_to_broker));
+  EXPECT_TRUE(called);
+  called = false;
+}
+
+TEST_F(NodeTest, EstablishNewLinks) {
+  // When the requested node is not yet known to the caller but is known to
+  // their broker, EstablishLink() coordinates with the broker to get the
+  // two nodes introduced to each other.
+
+  NodeLink* established_link = nullptr;
+  EXPECT_FALSE(node_a().GetLink(kNodeBName));
+  EXPECT_FALSE(node_b().GetLink(kNodeAName));
+  node_a().EstablishLink(kNodeBName, [&established_link](NodeLink* link) {
+    established_link = link;
+  });
+  EXPECT_TRUE(established_link);
+
+  Ref<NodeLink> a_to_b = node_a().GetLink(kNodeBName);
+  Ref<NodeLink> b_to_a = node_b().GetLink(kNodeAName);
+  EXPECT_TRUE(a_to_b);
+  EXPECT_TRUE(b_to_a);
+  EXPECT_EQ(a_to_b.get(), established_link);
+
+  // A redundant EstablishLink() changes nothing, even from the other side.
+  node_a().EstablishLink(
+      kNodeBName, [&a_to_b](NodeLink* link) { EXPECT_EQ(a_to_b.get(), link); });
+  node_b().EstablishLink(
+      kNodeAName, [&b_to_a](NodeLink* link) { EXPECT_EQ(b_to_a.get(), link); });
+  EXPECT_EQ(a_to_b, node_a().GetLink(kNodeBName));
+  EXPECT_EQ(b_to_a, node_b().GetLink(kNodeAName));
+
+  // Verify that the new links are managing a common buffer pool.
+  constexpr uint64_t kMagic = 0x1123581321345589;
+  const Fragment a_fragment = a_to_b->memory().AllocateFragment(8);
+  EXPECT_TRUE(a_fragment.is_addressable());
+  *static_cast<uint64_t*>(a_fragment.address()) = kMagic;
+
+  const Fragment b_fragment =
+      b_to_a->memory().GetFragment(a_fragment.descriptor());
+  EXPECT_TRUE(b_fragment.is_addressable());
+  EXPECT_EQ(kMagic, *static_cast<uint64_t*>(b_fragment.address()));
+  *static_cast<uint64_t*>(b_fragment.address()) = 0;
+  EXPECT_EQ(0u, *static_cast<uint64_t*>(a_fragment.address()));
+}
+
+TEST_F(NodeTest, EstablishLinkFailureFromNonBroker) {
+  // If the named node is unknown to the broker, a link can't be established by
+  // a non-broker.
+  bool failed = false;
+  EXPECT_FALSE(broker().GetLink(kNodeCName));
+  EXPECT_FALSE(node_a().GetLink(kNodeCName));
+  node_a().EstablishLink(kNodeCName, [&](NodeLink* link) {
+    EXPECT_FALSE(link);
+    failed = true;
+  });
+  EXPECT_TRUE(failed);
+}
+
+TEST_F(NodeTest, EstablishLinkFailureFromBroker) {
+  // New links can't be automatically established by the broker.
+  bool failed = false;
+  EXPECT_FALSE(broker().GetLink(kNodeCName));
+  broker().EstablishLink(kNodeCName, [&](NodeLink* link) {
+    EXPECT_FALSE(link);
+    failed = true;
+  });
+  EXPECT_TRUE(failed);
+}
+
+TEST_F(NodeTest, EstablishLinkFailureWithoutBrokerLink) {
+  // A node with no broker link can't be introduced to anyone.
+  bool failed = false;
+  const Ref<Node> node_c = MakeRefCounted<Node>(
+      Node::Type::kNormal, kTestDriver, IPCZ_INVALID_DRIVER_HANDLE);
+  EXPECT_TRUE(broker().GetLink(kNodeAName));
+  node_c->EstablishLink(kNodeAName, [&](NodeLink* link) {
+    EXPECT_FALSE(link);
+    failed = true;
+  });
+  EXPECT_TRUE(failed);
+}
+
+}  // namespace
+}  // namespace ipcz
diff --git a/src/ipcz/ref_counted_fragment_test.cc b/src/ipcz/ref_counted_fragment_test.cc
index 3690f48..ed33e61 100644
--- a/src/ipcz/ref_counted_fragment_test.cc
+++ b/src/ipcz/ref_counted_fragment_test.cc
@@ -7,6 +7,7 @@
 #include <atomic>
 #include <tuple>
 
+#include "ipcz/driver_memory.h"
 #include "ipcz/fragment.h"
 #include "ipcz/fragment_ref.h"
 #include "ipcz/node.h"
@@ -160,7 +161,9 @@
 TEST_F(RefCountedFragmentTest, Free) {
   auto node = MakeRefCounted<Node>(Node::Type::kNormal, kTestDriver,
                                    IPCZ_INVALID_DRIVER_HANDLE);
-  auto memory = NodeLinkMemory::Allocate(std::move(node)).node_link_memory;
+  DriverMemoryWithMapping buffer = NodeLinkMemory::AllocateMemory(kTestDriver);
+  auto memory =
+      NodeLinkMemory::Create(std::move(node), std::move(buffer.mapping));
 
   // Allocate a ton of fragments and let them be released by FragmentRef on
   // destruction. If the fragments aren't freed properly, allocations will fail
diff --git a/src/ipcz/router_link_test.cc b/src/ipcz/router_link_test.cc
index 38057ba..f89f1d4 100644
--- a/src/ipcz/router_link_test.cc
+++ b/src/ipcz/router_link_test.cc
@@ -48,15 +48,16 @@
  public:
   TestNodePair() {
     auto transports = DriverTransport::CreatePair(kTestDriver);
-    auto alloc = NodeLinkMemory::Allocate(node_a_);
-    node_link_a_ =
-        NodeLink::Create(node_a_, LinkSide::kA, kTestBrokerName,
-                         kTestNonBrokerName, Node::Type::kNormal, 0,
-                         transports.first, std::move(alloc.node_link_memory));
+    DriverMemoryWithMapping buffer =
+        NodeLinkMemory::AllocateMemory(kTestDriver);
+    node_link_a_ = NodeLink::Create(
+        node_a_, LinkSide::kA, kTestBrokerName, kTestNonBrokerName,
+        Node::Type::kNormal, 0, transports.first,
+        NodeLinkMemory::Create(node_a_, std::move(buffer.mapping)));
     node_link_b_ = NodeLink::Create(
         node_b_, LinkSide::kB, kTestNonBrokerName, kTestBrokerName,
         Node::Type::kBroker, 0, transports.second,
-        NodeLinkMemory::Adopt(node_b_, std::move(alloc.primary_buffer_memory)));
+        NodeLinkMemory::Create(node_b_, buffer.memory.Map()));
     node_a_->AddLink(kTestNonBrokerName, node_link_a_);
     node_b_->AddLink(kTestBrokerName, node_link_b_);
   }