ipcz: Preparation for multiple brokers

ipcz will support multiple interconnected networks, each with their own
broker process. This is a precursor to that, doing some necessary
plumbing and refactoring ahead of the impending logic changes. This CL
should not change behavior in any meaningful way.

Bug: 1299283
Change-Id: Ide8a6146f7c086a9d6266a57eb294d235de9f63d
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/3928396
Reviewed-by: Alex Gough <ajgo@chromium.org>
Commit-Queue: Ken Rockot <rockot@google.com>
Cr-Commit-Position: refs/heads/main@{#1053658}
NOKEYCHECK=True
GitOrigin-RevId: e98594abe0fae089990ce02381baed21f5bb97d5
diff --git a/src/BUILD.gn b/src/BUILD.gn
index 1d699b0..1884f9a 100644
--- a/src/BUILD.gn
+++ b/src/BUILD.gn
@@ -228,6 +228,7 @@
     "ipcz/node_link_memory.h",
     "ipcz/node_messages.h",
     "ipcz/node_name.h",
+    "ipcz/node_type.h",
     "ipcz/parcel.h",
     "ipcz/parcel_queue.h",
     "ipcz/portal.h",
diff --git a/src/ipcz/node.cc b/src/ipcz/node.cc
index 7f39ef4..b9dd93d 100644
--- a/src/ipcz/node.cc
+++ b/src/ipcz/node.cc
@@ -80,46 +80,56 @@
   return broker_link_;
 }
 
-void Node::SetBrokerLink(Ref<NodeLink> link) {
-  std::vector<BrokerLinkCallback> callbacks;
-  {
-    absl::MutexLock lock(&mutex_);
-    ABSL_ASSERT(!broker_link_);
-    broker_link_ = link;
-    broker_link_callbacks_.swap(callbacks);
-  }
-
-  for (auto& callback : callbacks) {
-    callback(link);
-  }
-}
-
 void Node::SetAssignedName(const NodeName& name) {
   absl::MutexLock lock(&mutex_);
   ABSL_ASSERT(!assigned_name_.is_valid());
   assigned_name_ = name;
 }
 
-bool Node::AddLink(const NodeName& remote_node_name, Ref<NodeLink> link) {
+bool Node::AddConnection(const NodeName& remote_node_name,
+                         Connection connection) {
+  std::vector<BrokerLinkCallback> callbacks;
   {
-    absl::MutexLock lock(&mutex_);
-    auto [it, inserted] = node_links_.insert({remote_node_name, link});
-    if (inserted) {
-      return true;
+    absl::ReleasableMutexLock lock(&mutex_);
+    auto [it, inserted] = connections_.insert({remote_node_name, connection});
+    if (!inserted) {
+      lock.Release();
+      connection.link->Deactivate();
+      return false;
+    }
+
+    if (connection.link->remote_node_type() == Type::kBroker) {
+      // The first connection accepted by a non-broker must be a connection to
+      // its own broker.
+      ABSL_ASSERT(connections_.size() == 1);
+      ABSL_ASSERT(!broker_link_);
+      broker_link_ = connection.link;
+      broker_link_callbacks_.swap(callbacks);
     }
   }
 
-  link->Deactivate();
-  return false;
+  for (auto& callback : callbacks) {
+    callback(connection.link);
+  }
+  return true;
+}
+
+absl::optional<Node::Connection> Node::GetConnection(const NodeName& name) {
+  absl::MutexLock lock(&mutex_);
+  auto it = connections_.find(name);
+  if (it == connections_.end()) {
+    return absl::nullopt;
+  }
+  return it->second;
 }
 
 Ref<NodeLink> Node::GetLink(const NodeName& name) {
   absl::MutexLock lock(&mutex_);
-  auto it = node_links_.find(name);
-  if (it == node_links_.end()) {
+  auto it = connections_.find(name);
+  if (it == connections_.end()) {
     return nullptr;
   }
-  return it->second;
+  return it->second.link;
 }
 
 NodeName Node::GenerateRandomName() const {
@@ -152,15 +162,16 @@
 }
 
 void Node::EstablishLink(const NodeName& name, EstablishLinkCallback callback) {
-  Ref<NodeLink> broker;
-  Ref<NodeLink> link;
+  Ref<NodeLink> existing_link;
+  Ref<NodeLink> our_broker;
   {
     absl::MutexLock lock(&mutex_);
-    broker = broker_link_;
-    auto it = node_links_.find(name);
-    if (it != node_links_.end()) {
-      link = it->second;
-    } else if (type_ == Type::kNormal && broker) {
+    auto it = connections_.find(name);
+    if (it != connections_.end()) {
+      existing_link = it->second.link;
+    } else if (type_ == Type::kNormal && broker_link_) {
+      our_broker = broker_link_;
+
       auto [pending_it, inserted] = pending_introductions_.insert({name, {}});
       pending_it->second.push_back(std::move(callback));
       if (!inserted) {
@@ -171,11 +182,13 @@
     }
   }
 
-  if (broker && !link) {
-    broker->RequestIntroduction(name);
-  } else {
-    callback(link.get());
+  if (our_broker) {
+    our_broker->RequestIntroduction(name);
+    return;
   }
+
+  // NOTE: `existing_link` may be null here, implying that we have failed.
+  callback(existing_link.get());
 }
 
 void Node::HandleIntroductionRequest(NodeLink& from_node_link,
@@ -189,50 +202,19 @@
            << " received introduction request for " << for_node.ToString()
            << " from " << requestor.ToString();
 
-  // A key which uniquely identifies the pair of nodes being introduced
-  // regardless of who requested the introduction.
-  const auto key = (requestor < for_node)
-                       ? IntroductionKey(requestor, for_node)
-                       : IntroductionKey(for_node, requestor);
-
-  Ref<NodeLink> target_link;
-  {
-    absl::MutexLock lock(&mutex_);
-    auto it = node_links_.find(for_node);
-    if (it != node_links_.end()) {
-      target_link = it->second;
-
-      auto [intro_it, inserted] = in_progress_introductions_.insert(key);
-      if (!inserted) {
-        // We're already introducing the same two nodes, so drop this request.
-        return;
-      }
-    }
-  }
-
-  if (!target_link) {
+  const absl::optional<Connection> target_connection = GetConnection(for_node);
+  if (!target_connection) {
     from_node_link.RejectIntroduction(for_node);
     return;
   }
 
-  DriverMemoryWithMapping buffer = NodeLinkMemory::AllocateMemory(driver_);
-  auto [transport_for_target, transport_for_requestor] =
-      DriverTransport::CreatePair(driver_, target_link->transport().get(),
-                                  from_node_link.transport().get());
-  target_link->AcceptIntroduction(
-      requestor, LinkSide::kA, from_node_link.remote_protocol_version(),
-      std::move(transport_for_target), buffer.memory.Clone());
-  from_node_link.AcceptIntroduction(
-      for_node, LinkSide::kB, target_link->remote_protocol_version(),
-      std::move(transport_for_requestor), std::move(buffer.memory));
-
-  absl::MutexLock lock(&mutex_);
-  in_progress_introductions_.erase(key);
+  IntroduceRemoteNodes(from_node_link, *target_connection->link);
 }
 
 void Node::AcceptIntroduction(NodeLink& from_node_link,
                               const NodeName& name,
                               LinkSide side,
+                              Node::Type remote_node_type,
                               uint32_t remote_protocol_version,
                               Ref<DriverTransport> transport,
                               Ref<NodeLinkMemory> memory) {
@@ -247,14 +229,19 @@
            << from_node_link.remote_node_name().ToString();
 
   Ref<NodeLink> new_link = NodeLink::CreateInactive(
-      WrapRefCounted(this), side, local_name, name, Type::kNormal,
+      WrapRefCounted(this), side, local_name, name, remote_node_type,
       remote_protocol_version, transport, memory);
   ABSL_ASSERT(new_link);
 
   std::vector<EstablishLinkCallback> callbacks;
   {
     absl::MutexLock lock(&mutex_);
-    auto [link_it, inserted] = node_links_.insert({name, new_link});
+    auto [connection_it, inserted] =
+        connections_.insert({name,
+                             {
+                                 .link = new_link,
+                                 .broker = WrapRefCounted(&from_node_link),
+                             }});
     if (!inserted) {
       // If both nodes race to request an introduction to each other, the
       // broker may send redundant introductions. It does however take care to
@@ -324,21 +311,21 @@
   return true;
 }
 
-void Node::DropLink(const NodeName& name) {
+void Node::DropConnection(const NodeName& name) {
   Ref<NodeLink> link;
   bool lost_broker = false;
   {
     absl::MutexLock lock(&mutex_);
-    auto it = node_links_.find(name);
-    if (it == node_links_.end()) {
+    auto it = connections_.find(name);
+    if (it == connections_.end()) {
       return;
     }
-    link = std::move(it->second);
-    node_links_.erase(it);
+    link = std::move(it->second.link);
+    connections_.erase(it);
 
     const NodeName& local_name = link->local_node_name();
     DVLOG(4) << "Node " << local_name.ToString() << " dropping "
-             << " link to " << link->remote_node_name().ToString();
+             << "link to " << link->remote_node_name().ToString();
     if (link == broker_link_) {
       DVLOG(4) << "Node " << local_name.ToString() << " lost its broker link";
       broker_link_.reset();
@@ -375,16 +362,16 @@
 }
 
 void Node::ShutDown() {
-  NodeLinkMap node_links;
+  ConnectionMap connections;
   {
     absl::MutexLock lock(&mutex_);
-    std::swap(node_links_, node_links);
+    connections_.swap(connections);
     broker_link_.reset();
     allocation_delegate_link_.reset();
   }
 
-  for (const auto& entry : node_links) {
-    entry.second->Deactivate();
+  for (const auto& entry : connections) {
+    entry.second.link->Deactivate();
   }
 
   CancelAllIntroductions();
@@ -404,4 +391,36 @@
   }
 }
 
+void Node::IntroduceRemoteNodes(NodeLink& first, NodeLink& second) {
+  // Ensure that no other thread does the same introduction concurrently.
+  const NodeName& first_name = first.remote_node_name();
+  const NodeName& second_name = second.remote_node_name();
+  const auto key = (first_name < second_name)
+                       ? IntroductionKey(first_name, second_name)
+                       : IntroductionKey(second_name, first_name);
+  {
+    absl::MutexLock lock(&mutex_);
+    auto [it, inserted] = in_progress_introductions_.insert(key);
+    if (!inserted) {
+      return;
+    }
+  }
+
+  DriverMemoryWithMapping buffer = NodeLinkMemory::AllocateMemory(driver_);
+  auto [transport_for_first_node, transport_for_second_node] =
+      DriverTransport::CreatePair(driver_, first.transport().get(),
+                                  second.transport().get());
+  first.AcceptIntroduction(second_name, LinkSide::kA, second.remote_node_type(),
+                           second.remote_protocol_version(),
+                           std::move(transport_for_first_node),
+                           buffer.memory.Clone());
+  second.AcceptIntroduction(first_name, LinkSide::kB, first.remote_node_type(),
+                            first.remote_protocol_version(),
+                            std::move(transport_for_second_node),
+                            std::move(buffer.memory));
+
+  absl::MutexLock lock(&mutex_);
+  in_progress_introductions_.erase(key);
+}
+
 }  // namespace ipcz
diff --git a/src/ipcz/node.h b/src/ipcz/node.h
index e79704f..c9e3e62 100644
--- a/src/ipcz/node.h
+++ b/src/ipcz/node.h
@@ -6,6 +6,8 @@
 #define IPCZ_SRC_IPCZ_NODE_H_
 
 #include <functional>
+#include <utility>
+#include <vector>
 
 #include "ipcz/api_object.h"
 #include "ipcz/driver_memory.h"
@@ -13,9 +15,11 @@
 #include "ipcz/link_side.h"
 #include "ipcz/node_messages.h"
 #include "ipcz/node_name.h"
+#include "ipcz/node_type.h"
 #include "third_party/abseil-cpp/absl/container/flat_hash_map.h"
 #include "third_party/abseil-cpp/absl/container/flat_hash_set.h"
 #include "third_party/abseil-cpp/absl/synchronization/mutex.h"
+#include "third_party/abseil-cpp/absl/types/optional.h"
 #include "third_party/abseil-cpp/absl/types/span.h"
 
 namespace ipcz {
@@ -29,17 +33,19 @@
 // introduced to each other exclusively through such brokers.
 class Node : public APIObjectImpl<Node, APIObject::kNode> {
  public:
-  enum class Type {
-    // A broker node assigns its own name and is able to assign names to other
-    // nodes upon connection. Brokers are trusted to introduce nodes to each
-    // other upon request, and brokers may connect to other brokers in order to
-    // share information and effectively bridge two node networks together.
-    kBroker,
+  using Type = NodeType;
 
-    // A "normal" (i.e. non-broker) node is assigned a permanent name by the
-    // first broker node it connects to, and it can only make contact with other
-    // nodes by requesting an introduction from that broker.
-    kNormal,
+  // State regarding a connection to a single remote node.
+  struct Connection {
+    // The NodeLink used to communicate with the remote node.
+    Ref<NodeLink> link;
+
+    // The NodeLink used to communicate with the broker of the remote node's
+    // network. If the remote node belongs to the same network as the local
+    // node, then this is the same link the local node's `broker_link_`. If the
+    // local node *is* the broker for the remote node on `link`, then this link
+    // is null.
+    Ref<NodeLink> broker;
   };
 
   // Constructs a new node of the given `type`, using `driver` to support IPC.
@@ -69,30 +75,23 @@
   // Gets a reference to the node's broker link, if it has one.
   Ref<NodeLink> GetBrokerLink();
 
-  // Sets this node's broker link, which is used e.g. to make introduction
-  // requests.
-  //
-  // This is called by a NodeConnector implementation after accepting a valid
-  // handshake message from a broker node, and `link` will be used as this
-  // node's permanent broker.
-  //
-  // Note that like any other NodeLink used by this Node, the same `link` must
-  // also be registered via AddLink() to associate it with its remote Node's
-  // name. This is also done by NodeConnector.
-  void SetBrokerLink(Ref<NodeLink> link);
-
   // Sets this node's assigned name as given by a broker. NodeConnector is
   // responsible for calling on non-broker Nodes this after receiving the
   // expected handshake from a broker. Must not be called on broker nodes, as
   // they assign their own name at construction time.
   void SetAssignedName(const NodeName& name);
 
-  // Registers a new NodeLink for the given `remote_node_name`.
-  bool AddLink(const NodeName& remote_node_name, Ref<NodeLink> link);
+  // Registers a new connection for the given `remote_node_name`.
+  bool AddConnection(const NodeName& remote_node_name, Connection connection);
+
+  // Returns a copy of the Connection to the remote node named by `name`, or
+  // null if this node has no connection to that node.
+  absl::optional<Node::Connection> GetConnection(const NodeName& name);
 
   // Returns a reference to the NodeLink used by this Node to communicate with
   // the remote node identified by `name`; or null if this node has no NodeLink
-  // connected to that node.
+  // connected to that node. This is shorthand for GetConnection() in the common
+  // case where the caller only wants the underlying NodeLink.
   Ref<NodeLink> GetLink(const NodeName& name);
 
   // Generates a new random NodeName using this node's driver as a source of
@@ -138,6 +137,7 @@
   void AcceptIntroduction(NodeLink& from_node_link,
                           const NodeName& name,
                           LinkSide side,
+                          Node::Type remote_node_type,
                           uint32_t remote_protocol_version,
                           Ref<DriverTransport> transport,
                           Ref<NodeLinkMemory> memory);
@@ -154,8 +154,8 @@
   // the relay source directly.
   bool AcceptRelayedMessage(msg::AcceptRelayedMessage& accept);
 
-  // Drops this node's link to the named node, if one exists.
-  void DropLink(const NodeName& name);
+  // Drops this node's connection to the named node, if one exists.
+  void DropConnection(const NodeName& name);
 
   // Asynchronously waits for this Node to acquire a broker link and then
   // invokes `callback` with it. If this node already has a broker link then the
@@ -174,6 +174,10 @@
   // failure.
   void CancelAllIntroductions();
 
+  // Creates a new transport and link memory and sends introduction messages to
+  // introduce the remote node on `first` to the remote node on `second`.
+  void IntroduceRemoteNodes(NodeLink& first, NodeLink& second);
+
   const Type type_;
   const IpczDriver& driver_;
   const IpczDriverHandle driver_node_;
@@ -199,8 +203,8 @@
   // if this is a non-broker node. If this is a broker node, these links are
   // either assigned by this node itself, or received from other brokers in the
   // system.
-  using NodeLinkMap = absl::flat_hash_map<NodeName, Ref<NodeLink>>;
-  NodeLinkMap node_links_ ABSL_GUARDED_BY(mutex_);
+  using ConnectionMap = absl::flat_hash_map<NodeName, Connection>;
+  ConnectionMap connections_ ABSL_GUARDED_BY(mutex_);
 
   // A map of other nodes to which this node is waiting for an introduction from
   // `broker_link_`. Once such an introduction is received, all callbacks for
diff --git a/src/ipcz/node_connector.cc b/src/ipcz/node_connector.cc
index a9f4bb3..3b12a7a 100644
--- a/src/ipcz/node_connector.cc
+++ b/src/ipcz/node_connector.cc
@@ -71,13 +71,12 @@
              << broker_name_.ToString() << " from new node "
              << new_remote_node_name_.ToString();
 
-    AcceptConnection(
-        NodeLink::CreateActive(
-            node_, LinkSide::kA, broker_name_, new_remote_node_name_,
-            Node::Type::kNormal, connect.params().protocol_version, transport_,
-            NodeLinkMemory::Create(node_,
-                                   std::move(link_memory_allocation_.mapping))),
-        LinkSide::kA, connect.params().num_initial_portals);
+    Ref<NodeLink> link = NodeLink::CreateActive(
+        node_, LinkSide::kA, broker_name_, new_remote_node_name_,
+        Node::Type::kNormal, connect.params().protocol_version, transport_,
+        NodeLinkMemory::Create(node_,
+                               std::move(link_memory_allocation_.mapping)));
+    AcceptConnection({.link = link}, connect.params().num_initial_portals);
     return true;
   }
 
@@ -131,12 +130,11 @@
         connect.params().protocol_version, transport_,
         NodeLinkMemory::Create(node_, buffer_memory.Map()));
     node_->SetAssignedName(connect.params().receiver_name);
-    node_->SetBrokerLink(new_link);
     if ((flags_ & IPCZ_CONNECT_NODE_TO_ALLOCATION_DELEGATE) != 0) {
       node_->SetAllocationDelegate(new_link);
     }
 
-    AcceptConnection(std::move(new_link), LinkSide::kB,
+    AcceptConnection({.link = new_link, .broker = new_link},
                      connect.params().num_initial_portals);
     return true;
   }
@@ -179,13 +177,13 @@
 
     broker_link_->ReferNonBroker(
         std::move(transport_for_broker_), checked_cast<uint32_t>(num_portals()),
-        [connector = WrapRefCounted(this)](
+        [connector = WrapRefCounted(this), broker = broker_link_](
             Ref<NodeLink> link_to_referred_node,
             uint32_t remote_num_initial_portals) {
           if (link_to_referred_node) {
-            connector->AcceptConnection(std::move(link_to_referred_node),
-                                        LinkSide::kA,
-                                        remote_num_initial_portals);
+            connector->AcceptConnection(
+                {.link = link_to_referred_node, .broker = broker},
+                remote_num_initial_portals);
           } else {
             connector->RejectConnection();
           }
@@ -255,11 +253,14 @@
         broker_protocol_version, transport_,
         NodeLinkMemory::Create(node_, broker_buffer.Map()));
     node_->SetAssignedName(connect.params().name);
-    node_->SetBrokerLink(broker_link);
     if ((flags_ & IPCZ_CONNECT_NODE_TO_ALLOCATION_DELEGATE) != 0) {
       node_->SetAllocationDelegate(broker_link);
     }
-    node_->AddLink(connect.params().broker_name, std::move(broker_link));
+    node_->AddConnection(connect.params().broker_name,
+                         {
+                             .link = broker_link,
+                             .broker = broker_link,
+                         });
 
     const uint32_t referrer_protocol_version = std::min(
         connect.params().referrer_protocol_version, msg::kProtocolVersion);
@@ -269,7 +270,7 @@
         referrer_protocol_version, std::move(referrer_transport),
         NodeLinkMemory::Create(node_, referrer_buffer.Map()));
 
-    AcceptConnection(referrer_link, LinkSide::kB,
+    AcceptConnection({.link = referrer_link, .broker = broker_link},
                      connect.params().num_initial_portals);
     referrer_link->Activate();
     return true;
@@ -324,7 +325,7 @@
         node_, LinkSide::kA, broker_name_, referred_node_name_,
         Node::Type::kNormal, protocol_version, transport_,
         NodeLinkMemory::Create(node_, std::move(link_memory_.mapping)));
-    AcceptConnection(link_to_referree, LinkSide::kA, /*num_remote_portals=*/0);
+    AcceptConnection({.link = link_to_referree}, /*num_remote_portals=*/0);
 
     // Now we can create a new link to introduce both clients -- the referrer
     // and the referree -- to each other.
@@ -511,21 +512,20 @@
 
 NodeConnector::~NodeConnector() = default;
 
-void NodeConnector::AcceptConnection(Ref<NodeLink> new_link,
-                                     LinkSide link_side,
+void NodeConnector::AcceptConnection(Node::Connection connection,
                                      uint32_t num_remote_portals) {
-  node_->AddLink(new_link->remote_node_name(), new_link);
+  node_->AddConnection(connection.link->remote_node_name(), connection);
   if (callback_) {
-    callback_(new_link);
+    callback_(connection.link);
   }
-  EstablishWaitingPortals(std::move(new_link), link_side, num_remote_portals);
+  EstablishWaitingPortals(std::move(connection.link), num_remote_portals);
 }
 
 void NodeConnector::RejectConnection() {
   if (callback_) {
     callback_(nullptr);
   }
-  EstablishWaitingPortals(nullptr, LinkSide::kA, 0);
+  EstablishWaitingPortals(nullptr, 0);
   if (transport_) {
     transport_->Deactivate();
   }
@@ -541,7 +541,6 @@
 }
 
 void NodeConnector::EstablishWaitingPortals(Ref<NodeLink> to_link,
-                                            LinkSide link_side,
                                             size_t max_valid_portals) {
   ABSL_ASSERT(to_link != nullptr || max_valid_portals == 0);
   const size_t num_valid_portals =
@@ -550,7 +549,7 @@
     const Ref<Router> router = waiting_portals_[i]->router();
     Ref<RouterLink> link = to_link->AddRemoteRouterLink(
         SublinkId(i), to_link->memory().GetInitialRouterLinkState(i),
-        LinkType::kCentral, link_side, router);
+        LinkType::kCentral, to_link->link_side(), router);
     if (link) {
       router->SetOutwardLink(std::move(link));
     } else {
diff --git a/src/ipcz/node_connector.h b/src/ipcz/node_connector.h
index 260ee30..96d3dc2 100644
--- a/src/ipcz/node_connector.h
+++ b/src/ipcz/node_connector.h
@@ -12,13 +12,13 @@
 #include "ipcz/driver_transport.h"
 #include "ipcz/ipcz.h"
 #include "ipcz/link_side.h"
+#include "ipcz/node.h"
 #include "ipcz/node_messages.h"
 #include "third_party/abseil-cpp/absl/types/span.h"
 #include "util/ref_counted.h"
 
 namespace ipcz {
 
-class Node;
 class NodeLink;
 class Portal;
 
@@ -73,11 +73,9 @@
 
   size_t num_portals() const { return waiting_portals_.size(); }
 
-  // Invoked once by the implementation when it has completed the handshake.
-  // `new_link` has already assumed ownership of the underlying transport and
-  // is listening for incoming messages on it. Destroys `this`.
-  void AcceptConnection(Ref<NodeLink> new_link,
-                        LinkSide link_side,
+  // Invoked once by the implementation when it has completed its handshake.
+  // Destroys `this`.
+  void AcceptConnection(Node::Connection connection,
                         uint32_t num_remote_portals);
 
   // Invoked if the transport observes an error before receiving the expected
@@ -95,9 +93,7 @@
 
  private:
   bool ActivateTransport();
-  void EstablishWaitingPortals(Ref<NodeLink> to_link,
-                               LinkSide link_side,
-                               size_t max_valid_portals);
+  void EstablishWaitingPortals(Ref<NodeLink> to_link, size_t max_valid_portals);
 
   const ConnectCallback callback_;
 };
diff --git a/src/ipcz/node_link.cc b/src/ipcz/node_link.cc
index 4a46839..7f384a7 100644
--- a/src/ipcz/node_link.cc
+++ b/src/ipcz/node_link.cc
@@ -191,6 +191,7 @@
 
 void NodeLink::AcceptIntroduction(const NodeName& name,
                                   LinkSide side,
+                                  Node::Type remote_node_type,
                                   uint32_t remote_protocol_version,
                                   Ref<DriverTransport> transport,
                                   DriverMemory memory) {
@@ -199,6 +200,7 @@
   msg::AcceptIntroduction accept;
   accept.params().name = name;
   accept.params().link_side = side;
+  accept.params().remote_node_type = remote_node_type;
   accept.params().remote_protocol_version = remote_protocol_version;
   accept.params().transport =
       accept.AppendDriverObject(transport->TakeDriverObject());
@@ -466,8 +468,8 @@
       accept.TakeDriverObject(accept.params().transport));
   node()->AcceptIntroduction(
       *this, accept.params().name, accept.params().link_side,
-      accept.params().remote_protocol_version, std::move(transport),
-      NodeLinkMemory::Create(node(), std::move(mapping)));
+      accept.params().remote_node_type, accept.params().remote_protocol_version,
+      std::move(transport), NodeLinkMemory::Create(node(), std::move(mapping)));
   return true;
 }
 
@@ -796,7 +798,7 @@
   }
 
   Ref<NodeLink> self = WrapRefCounted(this);
-  node_->DropLink(remote_node_name_);
+  node_->DropConnection(remote_node_name_);
 }
 
 void NodeLink::WaitForParcelFragmentToResolve(
diff --git a/src/ipcz/node_link.h b/src/ipcz/node_link.h
index 3bb7ee5..1b358f1 100644
--- a/src/ipcz/node_link.h
+++ b/src/ipcz/node_link.h
@@ -137,6 +137,7 @@
   // construct a new NodeLink to that node.
   void AcceptIntroduction(const NodeName& name,
                           LinkSide side,
+                          Node::Type remote_node_type,
                           uint32_t remote_protocol_version,
                           Ref<DriverTransport> transport,
                           DriverMemory memory);
diff --git a/src/ipcz/node_link_memory_test.cc b/src/ipcz/node_link_memory_test.cc
index c21a479..44ba853 100644
--- a/src/ipcz/node_link_memory_test.cc
+++ b/src/ipcz/node_link_memory_test.cc
@@ -44,8 +44,9 @@
         node_b_, LinkSide::kB, kTestNonBrokerName, kTestBrokerName,
         Node::Type::kBroker, 0, transports.second,
         NodeLinkMemory::Create(node_b_, buffer.memory.Map()));
-    node_a_->AddLink(kTestNonBrokerName, link_a_);
-    node_b_->AddLink(kTestBrokerName, link_b_);
+    node_a_->AddConnection(kTestNonBrokerName, {.link = link_a_});
+    node_b_->AddConnection(kTestBrokerName,
+                           {.link = link_b_, .broker = link_a_});
     link_a_->Activate();
     link_b_->Activate();
   }
diff --git a/src/ipcz/node_messages.h b/src/ipcz/node_messages.h
index d27cff4..790502e 100644
--- a/src/ipcz/node_messages.h
+++ b/src/ipcz/node_messages.h
@@ -14,6 +14,7 @@
 #include "ipcz/link_side.h"
 #include "ipcz/message.h"
 #include "ipcz/node_name.h"
+#include "ipcz/node_type.h"
 #include "ipcz/router_descriptor.h"
 #include "ipcz/sequence_number.h"
 #include "ipcz/sublink_id.h"
diff --git a/src/ipcz/node_messages_generator.h b/src/ipcz/node_messages_generator.h
index d35d404..8f0cc88 100644
--- a/src/ipcz/node_messages_generator.h
+++ b/src/ipcz/node_messages_generator.h
@@ -205,6 +205,9 @@
   // for the NodeLink it will establish over `transport`.
   IPCZ_MSG_PARAM(LinkSide, link_side)
 
+  // Indicates the type of the remote node being introduced.
+  IPCZ_MSG_PARAM(NodeType, remote_node_type)
+
   // Indicates the highest ipcz protocol version which the remote side of
   // `transport` able and willing to use according to the broker.
   IPCZ_MSG_PARAM(uint32_t, remote_protocol_version)
diff --git a/src/ipcz/node_test.cc b/src/ipcz/node_test.cc
index 08ec83d..20ebb84 100644
--- a/src/ipcz/node_test.cc
+++ b/src/ipcz/node_test.cc
@@ -58,9 +58,8 @@
         node, LinkSide::kB, name, broker_name, Node::Type::kBroker, 0,
         transports.second, NodeLinkMemory::Create(node, buffer.memory.Map()));
     node->SetAssignedName(name);
-    broker_->AddLink(name, broker_link);
-    node->AddLink(broker_name, node_link);
-    node->SetBrokerLink(node_link);
+    broker_->AddConnection(name, {.link = broker_link});
+    node->AddConnection(broker_name, {.link = node_link, .broker = node_link});
     broker_link->Activate();
     node_link->Activate();
   }
diff --git a/src/ipcz/node_type.h b/src/ipcz/node_type.h
new file mode 100644
index 0000000..8d245b8
--- /dev/null
+++ b/src/ipcz/node_type.h
@@ -0,0 +1,30 @@
+// Copyright 2022 The Chromium Authors
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef IPCZ_SRC_IPCZ_NODE_TYPE_H_
+#define IPCZ_SRC_IPCZ_NODE_TYPE_H_
+
+#include <cstdint>
+
+namespace ipcz {
+
+// Enumeration indicating the role a Node plays in its network of nodes. Note
+// that this is used by internal wire messages, so values must not be changed or
+// removed.
+enum class NodeType : uint8_t {
+  // A broker node assigns its own name and is able to assign names to other
+  // nodes upon connection. Brokers are trusted to introduce nodes to each
+  // other upon request, and brokers may connect to other brokers in order to
+  // share information and effectively bridge two node networks together.
+  kBroker,
+
+  // A "normal" (i.e. non-broker) node is assigned a permanent name by the
+  // first broker node it connects to, and it can only make contact with other
+  // nodes by requesting an introduction from that broker.
+  kNormal,
+};
+
+}  // namespace ipcz
+
+#endif  // IPCZ_SRC_IPCZ_NODE_TYPE_H_
diff --git a/src/ipcz/router_link_test.cc b/src/ipcz/router_link_test.cc
index 4e78789..d153c2f 100644
--- a/src/ipcz/router_link_test.cc
+++ b/src/ipcz/router_link_test.cc
@@ -58,8 +58,9 @@
         node_b_, LinkSide::kB, kTestNonBrokerName, kTestBrokerName,
         Node::Type::kBroker, 0, transports.second,
         NodeLinkMemory::Create(node_b_, buffer.memory.Map()));
-    node_a_->AddLink(kTestNonBrokerName, node_link_a_);
-    node_b_->AddLink(kTestBrokerName, node_link_b_);
+    node_a_->AddConnection(kTestNonBrokerName, {.link = node_link_a_});
+    node_b_->AddConnection(kTestBrokerName,
+                           {.link = node_link_b_, .broker = node_link_b_});
   }
 
   ~TestNodePair() {