ipcz: Node introductions
Implements basic brokering of node introductions between non-broker
nodes. Supporting this required a minor refactoring of NodeLinkMemory
allocation and initialization.
Bug: 1299283
Change-Id: Ibb3f4725daad121e39ea43b8432aa81cd8f33ac5
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/3759376
Commit-Queue: Ken Rockot <rockot@google.com>
Reviewed-by: Daniel Cheng <dcheng@chromium.org>
Cr-Commit-Position: refs/heads/main@{#1027285}
NOKEYCHECK=True
GitOrigin-RevId: 121c7d53e4f177790f225c8af0e1f2310aefe25d
diff --git a/src/BUILD.gn b/src/BUILD.gn
index 90b97fa..24af984 100644
--- a/src/BUILD.gn
+++ b/src/BUILD.gn
@@ -336,6 +336,7 @@
"ipcz/node_connector_test.cc",
"ipcz/node_link_memory_test.cc",
"ipcz/node_link_test.cc",
+ "ipcz/node_test.cc",
"ipcz/parcel_queue_test.cc",
"ipcz/ref_counted_fragment_test.cc",
"ipcz/router_link_test.cc",
diff --git a/src/ipcz/driver_memory.cc b/src/ipcz/driver_memory.cc
index df404fb..bc28fcd 100644
--- a/src/ipcz/driver_memory.cc
+++ b/src/ipcz/driver_memory.cc
@@ -63,4 +63,18 @@
return DriverMemoryMapping(*memory_.driver(), mapping_handle, address, size_);
}
+DriverMemoryWithMapping::DriverMemoryWithMapping() = default;
+
+DriverMemoryWithMapping::DriverMemoryWithMapping(DriverMemory memory,
+ DriverMemoryMapping mapping)
+ : memory(std::move(memory)), mapping(std::move(mapping)) {}
+
+DriverMemoryWithMapping::DriverMemoryWithMapping(DriverMemoryWithMapping&&) =
+ default;
+
+DriverMemoryWithMapping& DriverMemoryWithMapping::operator=(
+ DriverMemoryWithMapping&&) = default;
+
+DriverMemoryWithMapping::~DriverMemoryWithMapping() = default;
+
} // namespace ipcz
diff --git a/src/ipcz/driver_memory.h b/src/ipcz/driver_memory.h
index 639b806..3545418 100644
--- a/src/ipcz/driver_memory.h
+++ b/src/ipcz/driver_memory.h
@@ -52,6 +52,19 @@
size_t size_ = 0;
};
+// This pairs a DriverMemory object with a mapping of that same object, for
+// convenience.
+struct DriverMemoryWithMapping {
+ DriverMemoryWithMapping();
+ DriverMemoryWithMapping(DriverMemory memory, DriverMemoryMapping mapping);
+ DriverMemoryWithMapping(DriverMemoryWithMapping&&);
+ DriverMemoryWithMapping& operator=(DriverMemoryWithMapping&&);
+ ~DriverMemoryWithMapping();
+
+ DriverMemory memory;
+ DriverMemoryMapping mapping;
+};
+
} // namespace ipcz
#endif // IPCZ_SRC_IPCZ_DRIVER_MEMORY_H_
diff --git a/src/ipcz/node.cc b/src/ipcz/node.cc
index eb6f865..3ff412b 100644
--- a/src/ipcz/node.cc
+++ b/src/ipcz/node.cc
@@ -4,11 +4,15 @@
#include "ipcz/node.h"
+#include <utility>
#include <vector>
+#include "ipcz/driver_memory.h"
#include "ipcz/ipcz.h"
+#include "ipcz/link_side.h"
#include "ipcz/node_connector.h"
#include "ipcz/node_link.h"
+#include "ipcz/node_link_memory.h"
#include "ipcz/portal.h"
#include "ipcz/router.h"
#include "third_party/abseil-cpp/absl/base/macros.h"
@@ -101,6 +105,15 @@
return false;
}
+Ref<NodeLink> Node::GetLink(const NodeName& name) {
+ absl::MutexLock lock(&mutex_);
+ auto it = node_links_.find(name);
+ if (it == node_links_.end()) {
+ return nullptr;
+ }
+ return it->second;
+}
+
NodeName Node::GenerateRandomName() const {
NodeName name;
IpczResult result =
@@ -117,6 +130,156 @@
callback(DriverMemory(driver_, size));
}
+void Node::EstablishLink(const NodeName& name, EstablishLinkCallback callback) {
+ Ref<NodeLink> broker;
+ Ref<NodeLink> link;
+ {
+ absl::MutexLock lock(&mutex_);
+ broker = broker_link_;
+ auto it = node_links_.find(name);
+ if (it != node_links_.end()) {
+ link = it->second;
+ } else if (type_ == Type::kNormal && broker) {
+ auto [pending_it, inserted] = pending_introductions_.insert({name, {}});
+ pending_it->second.push_back(std::move(callback));
+ if (!inserted) {
+ // There's already an introduction request out for this node, so there's
+ // nothing more we need to do.
+ return;
+ }
+ }
+ }
+
+ if (broker && !link) {
+ broker->RequestIntroduction(name);
+ } else {
+ callback(link.get());
+ }
+}
+
+void Node::HandleIntroductionRequest(NodeLink& from_node_link,
+ const NodeName& for_node) {
+ // NodeLink must never accept these requests on non-broker nodes.
+ ABSL_ASSERT(type_ == Type::kBroker);
+
+ const NodeName requestor = from_node_link.remote_node_name();
+
+ DVLOG(4) << "Broker " << from_node_link.local_node_name().ToString()
+ << " received introduction request for " << for_node.ToString()
+ << " from " << requestor.ToString();
+
+ // A key which uniquely identifies the pair of nodes being introduced
+ // regardless of who requested the introduction.
+ const auto key = (requestor < for_node)
+ ? IntroductionKey(requestor, for_node)
+ : IntroductionKey(for_node, requestor);
+
+ Ref<NodeLink> target_link;
+ {
+ absl::MutexLock lock(&mutex_);
+ auto it = node_links_.find(for_node);
+ if (it != node_links_.end()) {
+ target_link = it->second;
+
+ auto [intro_it, inserted] = in_progress_introductions_.insert(key);
+ if (!inserted) {
+ // We're already introducing the same two nodes, so drop this request.
+ return;
+ }
+ }
+ }
+
+ if (!target_link) {
+ from_node_link.RejectIntroduction(for_node);
+ return;
+ }
+
+ DriverMemoryWithMapping buffer = NodeLinkMemory::AllocateMemory(driver_);
+ auto [transport_for_target, transport_for_requestor] =
+ DriverTransport::CreatePair(driver_, target_link->transport().get(),
+ from_node_link.transport().get());
+ target_link->AcceptIntroduction(
+ requestor, LinkSide::kA, from_node_link.remote_protocol_version(),
+ std::move(transport_for_target), buffer.memory.Clone());
+ from_node_link.AcceptIntroduction(
+ for_node, LinkSide::kB, target_link->remote_protocol_version(),
+ std::move(transport_for_requestor), std::move(buffer.memory));
+
+ absl::MutexLock lock(&mutex_);
+ in_progress_introductions_.erase(key);
+}
+
+void Node::AcceptIntroduction(NodeLink& from_node_link,
+ const NodeName& name,
+ LinkSide side,
+ uint32_t remote_protocol_version,
+ Ref<DriverTransport> transport,
+ Ref<NodeLinkMemory> memory) {
+ // NodeLink should never dispatch this method to a node if the introduction
+ // didn't come from a broker, so this assertion should always hold.
+ ABSL_ASSERT(from_node_link.remote_node_type() == Node::Type::kBroker);
+
+ const NodeName local_name = from_node_link.local_node_name();
+
+ DVLOG(4) << "Node " << local_name.ToString() << " received introduction to "
+ << name.ToString() << " from broker "
+ << from_node_link.remote_node_name().ToString();
+
+ Ref<NodeLink> new_link = NodeLink::Create(
+ WrapRefCounted(this), side, local_name, name, Type::kNormal,
+ remote_protocol_version, transport, std::move(memory));
+ ABSL_ASSERT(new_link);
+
+ std::vector<EstablishLinkCallback> callbacks;
+ {
+ absl::MutexLock lock(&mutex_);
+ auto [link_it, inserted] = node_links_.insert({name, new_link});
+ if (!inserted) {
+ // If both nodes race to request an introduction to each other, the
+ // broker may send redundant introductions. It does however take care to
+ // ensure that they're ordered consistently across both nodes, so
+ // redundant introductions can be safely ignored by convention.
+ }
+
+ // If this node requested this introduction, we may have callbacks to run.
+ // Note that it is not an error to receive an unrequested introduction,
+ // since it is only necessary for one of the introduced nodes to have
+ // requested it.
+ auto it = pending_introductions_.find(name);
+ if (it != pending_introductions_.end()) {
+ callbacks = std::move(it->second);
+ pending_introductions_.erase(it);
+ }
+ }
+
+ if (transport) {
+ transport->Activate();
+ }
+
+ for (auto& callback : callbacks) {
+ callback(new_link.get());
+ }
+}
+
+bool Node::CancelIntroduction(const NodeName& name) {
+ std::vector<EstablishLinkCallback> callbacks;
+ {
+ absl::MutexLock lock(&mutex_);
+ auto it = pending_introductions_.find(name);
+ if (it == pending_introductions_.end()) {
+ return false;
+ }
+ callbacks = std::move(it->second);
+ pending_introductions_.erase(it);
+ }
+
+ for (auto& callback : callbacks) {
+ callback(nullptr);
+ }
+
+ return true;
+}
+
void Node::ShutDown() {
NodeLinkMap node_links;
{
diff --git a/src/ipcz/node.h b/src/ipcz/node.h
index d5fc67d..5d37738 100644
--- a/src/ipcz/node.h
+++ b/src/ipcz/node.h
@@ -10,14 +10,17 @@
#include "ipcz/api_object.h"
#include "ipcz/driver_memory.h"
#include "ipcz/ipcz.h"
+#include "ipcz/link_side.h"
#include "ipcz/node_name.h"
#include "third_party/abseil-cpp/absl/container/flat_hash_map.h"
+#include "third_party/abseil-cpp/absl/container/flat_hash_set.h"
#include "third_party/abseil-cpp/absl/synchronization/mutex.h"
#include "third_party/abseil-cpp/absl/types/span.h"
namespace ipcz {
class NodeLink;
+class NodeLinkMemory;
// A Node controls creation and interconnection of a collection of routers which
// can establish links to and from other routers in other nodes. Every node is
@@ -86,6 +89,11 @@
// Registers a new NodeLink for the given `remote_node_name`.
bool AddLink(const NodeName& remote_node_name, Ref<NodeLink> link);
+ // Returns a reference to the NodeLink used by this Node to communicate with
+ // the remote node identified by `name`; or null if this node has no NodeLink
+ // connected to that node.
+ Ref<NodeLink> GetLink(const NodeName& name);
+
// Generates a new random NodeName using this node's driver as a source of
// randomness.
NodeName GenerateRandomName() const;
@@ -98,6 +106,39 @@
using AllocateSharedMemoryCallback = std::function<void(DriverMemory)>;
void AllocateSharedMemory(size_t size, AllocateSharedMemoryCallback callback);
+ // Asynchronously attempts to establish a new NodeLink directly to the named
+ // node, invoking `callback` when complete. On success, this node will retain
+ // a new NodeLink to the named node, and `callback` will be invoked with a
+ // reference to that link. Otherwise `callback` will be invoked with a null
+ // reference.
+ //
+ // If the calling node already has a link to the named node, `callback` may
+ // be invoked synchronously with a link to that node before this method
+ // returns.
+ using EstablishLinkCallback = std::function<void(NodeLink*)>;
+ void EstablishLink(const NodeName& name, EstablishLinkCallback callback);
+
+ // Handles an incoming introduction request. Must only be called on a broker
+ // node. If this broker has a NodeLink to the node named by `for_node`, it
+ // will introduce that node and the remote node on `from_node_link`.
+ void HandleIntroductionRequest(NodeLink& from_node_link,
+ const NodeName& for_node);
+
+ // Accepts an introduction received from the broker. `transport` and `memory`
+ // can be used to establish a new NodeLink to the remote node, whose name is
+ // `name`. The NodeLink must assume a role as the given `side` of the link.
+ void AcceptIntroduction(NodeLink& from_node_link,
+ const NodeName& name,
+ LinkSide side,
+ uint32_t remote_protocol_version,
+ Ref<DriverTransport> transport,
+ Ref<NodeLinkMemory> memory);
+
+ // Handles a rejected introduction from the broker. This is called on a
+ // non-broker node that previously requested an introduction to `name` if
+ // the broker could not satisfy the request.
+ bool CancelIntroduction(const NodeName& name);
+
private:
~Node() override;
@@ -127,6 +168,42 @@
// system.
using NodeLinkMap = absl::flat_hash_map<NodeName, Ref<NodeLink>>;
NodeLinkMap node_links_ ABSL_GUARDED_BY(mutex_);
+
+ // A map of other nodes to which this node is waiting for an introduction from
+ // `broker_link_`. Once such an introduction is received, all callbacks for
+ // that NodeName are executed.
+ absl::flat_hash_map<NodeName, std::vector<EstablishLinkCallback>>
+ pending_introductions_ ABSL_GUARDED_BY(mutex_);
+
+ // Nodes may race to request introductions to each other from the same broker.
+ // This can lead to redundant introductions being sent which the requesting
+ // nodes should be able to ignore. However, the following could occur on a
+ // broker which is processing a request from node A on Thread 1 while also
+ // processing a request from node B on thread 2:
+ //
+ // Thread 1 Thread 2 Time
+ // --- --- |
+ // A requests intro to B B requests intro to A v
+ // Send B intro X to A
+ // Send A intro Y to B
+ // Send A intro X to B
+ // Send B intro Y to A
+ //
+ // Each unique intro shares either end of a transport with its recipients,
+ // so both A and B must accept the same introduction (either X or Y). In this
+ // scenario however, A will first receive and accept intro X, and will ignore
+ // intro Y as redundant. But B will receive intro Y first and ignore intro X
+ // as redundant. This is bad.
+ //
+ // The set of `in_progress_introductions_` allows this (broker) node to guard
+ // against such interleaved introductions. Immediately before sending an intro
+ // to both recipients, a key identifying them is placed into the set. This key
+ // is removed immediately after both introductions are sent. If another thread
+ // is asked to introduce the same two nodes while the key is still present, it
+ // will ignore the request and send nothing.
+ using IntroductionKey = std::pair<NodeName, NodeName>;
+ absl::flat_hash_set<IntroductionKey> in_progress_introductions_
+ ABSL_GUARDED_BY(mutex_);
};
} // namespace ipcz
diff --git a/src/ipcz/node_connector.cc b/src/ipcz/node_connector.cc
index 7186359..73702ba 100644
--- a/src/ipcz/node_connector.cc
+++ b/src/ipcz/node_connector.cc
@@ -37,8 +37,9 @@
flags,
std::move(waiting_portals),
std::move(callback)),
- link_memory_allocation_(NodeLinkMemory::Allocate(node_)) {
- ABSL_ASSERT(link_memory_allocation_.node_link_memory);
+ link_memory_allocation_(
+ NodeLinkMemory::AllocateMemory(node_->driver())) {
+ ABSL_ASSERT(link_memory_allocation_.mapping.is_valid());
}
~NodeConnectorForBrokerToNonBroker() override = default;
@@ -58,7 +59,7 @@
connect.params().num_initial_portals =
checked_cast<uint32_t>(num_portals());
connect.params().buffer = connect.AppendDriverObject(
- link_memory_allocation_.primary_buffer_memory.TakeDriverObject());
+ link_memory_allocation_.memory.TakeDriverObject());
return IPCZ_RESULT_OK == transport_->Transmit(connect);
}
@@ -70,10 +71,11 @@
<< new_remote_node_name_.ToString();
AcceptConnection(
- NodeLink::Create(node_, LinkSide::kA, broker_name_,
- new_remote_node_name_, Node::Type::kNormal,
- connect.params().protocol_version, transport_,
- std::move(link_memory_allocation_.node_link_memory)),
+ NodeLink::Create(
+ node_, LinkSide::kA, broker_name_, new_remote_node_name_,
+ Node::Type::kNormal, connect.params().protocol_version, transport_,
+ NodeLinkMemory::Create(node_,
+ std::move(link_memory_allocation_.mapping))),
LinkSide::kA, connect.params().num_initial_portals);
return true;
}
@@ -81,7 +83,7 @@
private:
const NodeName broker_name_{node_->GetAssignedName()};
const NodeName new_remote_node_name_{node_->GenerateRandomName()};
- NodeLinkMemory::Allocation link_memory_allocation_;
+ DriverMemoryWithMapping link_memory_allocation_;
};
class NodeConnectorForNonBrokerToBroker : public NodeConnector {
@@ -122,11 +124,11 @@
return false;
}
- auto new_link = NodeLink::Create(
- node_, LinkSide::kB, connect.params().receiver_name,
- connect.params().broker_name, Node::Type::kBroker,
- connect.params().protocol_version, transport_,
- NodeLinkMemory::Adopt(node_, std::move(buffer_memory)));
+ auto new_link =
+ NodeLink::Create(node_, LinkSide::kB, connect.params().receiver_name,
+ connect.params().broker_name, Node::Type::kBroker,
+ connect.params().protocol_version, transport_,
+ NodeLinkMemory::Create(node_, buffer_memory.Map()));
node_->SetAssignedName(connect.params().receiver_name);
node_->SetBrokerLink(new_link);
diff --git a/src/ipcz/node_link.cc b/src/ipcz/node_link.cc
index 7e08c64..69df0e3 100644
--- a/src/ipcz/node_link.cc
+++ b/src/ipcz/node_link.cc
@@ -125,6 +125,39 @@
Transmit(add);
}
+void NodeLink::RequestIntroduction(const NodeName& name) {
+ ABSL_ASSERT(remote_node_type_ == Node::Type::kBroker);
+
+ msg::RequestIntroduction request;
+ request.params().name = name;
+ Transmit(request);
+}
+
+void NodeLink::AcceptIntroduction(const NodeName& name,
+ LinkSide side,
+ uint32_t remote_protocol_version,
+ Ref<DriverTransport> transport,
+ DriverMemory memory) {
+ ABSL_ASSERT(node_->type() == Node::Type::kBroker);
+
+ msg::AcceptIntroduction accept;
+ accept.params().name = name;
+ accept.params().link_side = side;
+ accept.params().remote_protocol_version = remote_protocol_version;
+ accept.params().transport =
+ accept.AppendDriverObject(transport->TakeDriverObject());
+ accept.params().memory = accept.AppendDriverObject(memory.TakeDriverObject());
+ Transmit(accept);
+}
+
+void NodeLink::RejectIntroduction(const NodeName& name) {
+ ABSL_ASSERT(node_->type() == Node::Type::kBroker);
+
+ msg::RejectIntroduction reject;
+ reject.params().name = name;
+ Transmit(reject);
+}
+
void NodeLink::Deactivate() {
{
absl::MutexLock lock(&mutex_);
@@ -158,6 +191,59 @@
1, std::memory_order_relaxed));
}
+bool NodeLink::OnRequestIntroduction(msg::RequestIntroduction& request) {
+ // TODO: Support broker-to-broker introduction requests.
+ if (remote_node_type_ != Node::Type::kNormal ||
+ node()->type() != Node::Type::kBroker) {
+ return false;
+ }
+
+ node()->HandleIntroductionRequest(*this, request.params().name);
+ return true;
+}
+
+bool NodeLink::OnAcceptIntroduction(msg::AcceptIntroduction& accept) {
+ if (remote_node_type_ != Node::Type::kBroker) {
+ return false;
+ }
+
+ if (node()->type() != Node::Type::kNormal) {
+ // TODO: Support broker-to-broker introductions.
+ return false;
+ }
+
+ auto memory = DriverMemory(accept.TakeDriverObject(accept.params().memory));
+ if (!memory.is_valid()) {
+ return false;
+ }
+
+ auto mapping = memory.Map();
+ if (!mapping.is_valid()) {
+ return false;
+ }
+
+ auto transport = MakeRefCounted<DriverTransport>(
+ accept.TakeDriverObject(accept.params().transport));
+ node()->AcceptIntroduction(
+ *this, accept.params().name, accept.params().link_side,
+ accept.params().remote_protocol_version, std::move(transport),
+ NodeLinkMemory::Create(node(), std::move(mapping)));
+ return true;
+}
+
+bool NodeLink::OnRejectIntroduction(msg::RejectIntroduction& reject) {
+ if (remote_node_type_ != Node::Type::kBroker) {
+ return false;
+ }
+
+ if (node()->type() != Node::Type::kNormal) {
+ // TODO: Support broker-to-broker introductions.
+ return false;
+ }
+
+ return node()->CancelIntroduction(reject.params().name);
+}
+
bool NodeLink::OnAddBlockBuffer(msg::AddBlockBuffer& add) {
DriverMemory buffer(add.TakeDriverObject(add.params().buffer));
if (!buffer.is_valid()) {
diff --git a/src/ipcz/node_link.h b/src/ipcz/node_link.h
index d7d2292..06e3c00 100644
--- a/src/ipcz/node_link.h
+++ b/src/ipcz/node_link.h
@@ -106,6 +106,24 @@
// AllocateNewBufferId().
void AddBlockBuffer(BufferId id, uint32_t block_size, DriverMemory memory);
+ // Asks the broker on the other end of this link to introduce the local node
+ // to the node identified by `name`. This will always elicit a response from
+ // the broker in the form of either an AcceptIntroduction or
+ // RejectIntroduction message.
+ void RequestIntroduction(const NodeName& name);
+
+ // Introduces the remote node to the node named `name`, with details needed to
+ // construct a new NodeLink to that node.
+ void AcceptIntroduction(const NodeName& name,
+ LinkSide side,
+ uint32_t remote_protocol_version,
+ Ref<DriverTransport> transport,
+ DriverMemory memory);
+
+ // Rejects an introduction request previously sent by the remote node for the
+ // node identified by `name`.
+ void RejectIntroduction(const NodeName& name);
+
// Permanently deactivates this NodeLink. Once this call returns the NodeLink
// will no longer receive transport messages. It may still be used to transmit
// outgoing messages, but it cannot be reactivated. Transmissions over a
@@ -132,6 +150,9 @@
SequenceNumber GenerateOutgoingSequenceNumber();
// NodeMessageListener overrides:
+ bool OnRequestIntroduction(msg::RequestIntroduction& request) override;
+ bool OnAcceptIntroduction(msg::AcceptIntroduction& accept) override;
+ bool OnRejectIntroduction(msg::RejectIntroduction& reject) override;
bool OnAddBlockBuffer(msg::AddBlockBuffer& add) override;
bool OnAcceptParcel(msg::AcceptParcel& accept) override;
bool OnRouteClosed(msg::RouteClosed& route_closed) override;
diff --git a/src/ipcz/node_link_memory.cc b/src/ipcz/node_link_memory.cc
index 6bf92d0..52f2617 100644
--- a/src/ipcz/node_link_memory.cc
+++ b/src/ipcz/node_link_memory.cc
@@ -150,6 +150,7 @@
// Consistency check here, because PrimaryBuffer is private to NodeLinkMemory.
static_assert(sizeof(PrimaryBuffer) <= kPrimaryBufferSize,
"PrimaryBuffer structure is too large.");
+ ABSL_HARDENING_ASSERT(primary_buffer_memory_.size() >= kPrimaryBufferSize);
const BlockAllocator allocators[] = {
primary_buffer_.block_allocator_64(),
@@ -171,16 +172,16 @@
}
// static
-NodeLinkMemory::Allocation NodeLinkMemory::Allocate(Ref<Node> node) {
- DriverMemory primary_buffer_memory(node->driver(), sizeof(PrimaryBuffer));
- if (!primary_buffer_memory.is_valid()) {
- return {.node_link_memory = nullptr, .primary_buffer_memory = {}};
+DriverMemoryWithMapping NodeLinkMemory::AllocateMemory(
+ const IpczDriver& driver) {
+ DriverMemory memory(driver, kPrimaryBufferSize);
+ if (!memory.is_valid()) {
+ return {};
}
- auto memory = AdoptRef(
- new NodeLinkMemory(std::move(node), primary_buffer_memory.Map()));
-
- PrimaryBuffer& primary_buffer = memory->primary_buffer_;
+ DriverMemoryMapping mapping = memory.Map();
+ PrimaryBuffer& primary_buffer =
+ *reinterpret_cast<PrimaryBuffer*>(mapping.bytes().data());
// The first allocable BufferId is 1, because the primary buffer uses 0.
primary_buffer.header.next_buffer_id.store(1, std::memory_order_relaxed);
@@ -201,17 +202,13 @@
primary_buffer.block_allocator_1024().InitializeRegion();
primary_buffer.block_allocator_2048().InitializeRegion();
- return {
- .node_link_memory = std::move(memory),
- .primary_buffer_memory = std::move(primary_buffer_memory),
- };
+ return {std::move(memory), std::move(mapping)};
}
// static
-Ref<NodeLinkMemory> NodeLinkMemory::Adopt(Ref<Node> node,
- DriverMemory primary_buffer_memory) {
- return AdoptRef(
- new NodeLinkMemory(std::move(node), primary_buffer_memory.Map()));
+Ref<NodeLinkMemory> NodeLinkMemory::Create(Ref<Node> node,
+ DriverMemoryMapping memory) {
+ return AdoptRef(new NodeLinkMemory(std::move(node), std::move(memory)));
}
BufferId NodeLinkMemory::AllocateNewBufferId() {
diff --git a/src/ipcz/node_link_memory.h b/src/ipcz/node_link_memory.h
index be6770b..d5e5095 100644
--- a/src/ipcz/node_link_memory.h
+++ b/src/ipcz/node_link_memory.h
@@ -41,24 +41,6 @@
// reserved for use by initial portals.
static constexpr size_t kMaxInitialPortals = 12;
- NodeLinkMemory(NodeLinkMemory&&);
-
- // Returned by Allocate().
- struct Allocation {
- // The NodeLinkMemory created by a succesful call to Allocate(), or null if
- // memory could not be allocated. This memory is initialized with a
- // primary buffer (BufferId 0) whose contents have also been appropriately
- // initialized. This object is ready for immediate use by a new NodeLink on
- // the `node` passed to Allocate().
- Ref<NodeLinkMemory> node_link_memory;
-
- // A handle to the region underlying the new NodeLinkMemory's primary
- // buffer. This should be shared with the corresponding NodeLink's remote
- // node, where it can be passed to Adopt() to establish a new NodeLinkMemory
- // there.
- DriverMemory primary_buffer_memory;
- };
-
// Sets a reference to the NodeLink using this NodeLinkMemory. This is called
// by the NodeLink itself before any other methods can be called on the
// NodeLinkMemory, and it's only reset to null once the NodeLink is
@@ -67,16 +49,16 @@
// memory pool as this one.
void SetNodeLink(Ref<NodeLink> link);
- // Constructs a new NodeLinkMemory over a newly allocated DriverMemory object.
- // The new DriverMemory is returned in `primary_buffer_memory`, while the
- // returned NodeLinkMemory internally retains a mapping of that memory.
- static Allocation Allocate(Ref<Node> node);
+ // Allocates a new DriverMemory object and initializes its contents to be
+ // suitable as the primary buffer of a new NodeLinkMemory. Returns the memory
+ // along with a mapping of it.
+ static DriverMemoryWithMapping AllocateMemory(const IpczDriver& driver);
// Constructs a new NodeLinkMemory with BufferId 0 (the primary buffer) mapped
- // from `primary_buffer_memory`. The buffer must have been created and
- // initialized by a prior call to Allocate() above.
- static Ref<NodeLinkMemory> Adopt(Ref<Node> node,
- DriverMemory primary_buffer_memory);
+ // as `primary_buffer_memory`. The buffer must have been created and
+ // initialized by a prior call to AllocateMemory() above.
+ static Ref<NodeLinkMemory> Create(Ref<Node> node,
+ DriverMemoryMapping primary_buffer_memory);
// Returns a new BufferId which should still be unused by any buffer in this
// NodeLinkMemory's BufferPool, or that of its peer NodeLinkMemory. When
@@ -152,7 +134,10 @@
private:
struct PrimaryBuffer;
- NodeLinkMemory(Ref<Node> node, DriverMemoryMapping primary_buffer);
+ // Constructs a new NodeLinkMemory over `mapping`, which must correspond to
+ // a DriverMemory whose contents have already been initialized as a
+ // NodeLinkMemory primary buffer.
+ NodeLinkMemory(Ref<Node> node, DriverMemoryMapping mapping);
~NodeLinkMemory() override;
// Indicates whether the NodeLinkMemory should be allowed to expand its
diff --git a/src/ipcz/node_link_memory_test.cc b/src/ipcz/node_link_memory_test.cc
index f7c37ba..8e99fb9 100644
--- a/src/ipcz/node_link_memory_test.cc
+++ b/src/ipcz/node_link_memory_test.cc
@@ -7,6 +7,7 @@
#include <utility>
#include <vector>
+#include "ipcz/driver_memory.h"
#include "ipcz/driver_transport.h"
#include "ipcz/ipcz.h"
#include "ipcz/link_side.h"
@@ -33,15 +34,16 @@
void SetUp() override {
auto transports = DriverTransport::CreatePair(kTestDriver);
- auto alloc = NodeLinkMemory::Allocate(node_a_);
- link_a_ =
- NodeLink::Create(node_a_, LinkSide::kA, kTestBrokerName,
- kTestNonBrokerName, Node::Type::kNormal, 0,
- transports.first, std::move(alloc.node_link_memory));
+ DriverMemoryWithMapping buffer =
+ NodeLinkMemory::AllocateMemory(kTestDriver);
+ link_a_ = NodeLink::Create(
+ node_a_, LinkSide::kA, kTestBrokerName, kTestNonBrokerName,
+ Node::Type::kNormal, 0, transports.first,
+ NodeLinkMemory::Create(node_a_, std::move(buffer.mapping)));
link_b_ = NodeLink::Create(
node_b_, LinkSide::kB, kTestNonBrokerName, kTestBrokerName,
Node::Type::kBroker, 0, transports.second,
- NodeLinkMemory::Adopt(node_b_, std::move(alloc.primary_buffer_memory)));
+ NodeLinkMemory::Create(node_b_, buffer.memory.Map()));
node_a_->AddLink(kTestNonBrokerName, link_a_);
node_b_->AddLink(kTestBrokerName, link_b_);
link_a_->transport()->Activate();
diff --git a/src/ipcz/node_link_test.cc b/src/ipcz/node_link_test.cc
index 0eff379..94e91c1 100644
--- a/src/ipcz/node_link_test.cc
+++ b/src/ipcz/node_link_test.cc
@@ -6,6 +6,7 @@
#include <utility>
+#include "ipcz/driver_memory.h"
#include "ipcz/link_side.h"
#include "ipcz/link_type.h"
#include "ipcz/node_link_memory.h"
@@ -35,19 +36,18 @@
auto transport1 =
MakeRefCounted<DriverTransport>(DriverObject(kDriver, handle1));
- NodeLinkMemory::Allocation allocation = NodeLinkMemory::Allocate(broker);
- ABSL_ASSERT(allocation.node_link_memory);
+ DriverMemoryWithMapping buffer = NodeLinkMemory::AllocateMemory(kDriver);
+ ABSL_ASSERT(buffer.mapping.is_valid());
const NodeName non_broker_name = broker->GenerateRandomName();
- auto link0 =
- NodeLink::Create(broker, LinkSide::kA, broker->GetAssignedName(),
- non_broker_name, Node::Type::kNormal, 0, transport0,
- std::move(allocation.node_link_memory));
+ auto link0 = NodeLink::Create(
+ broker, LinkSide::kA, broker->GetAssignedName(), non_broker_name,
+ Node::Type::kNormal, 0, transport0,
+ NodeLinkMemory::Create(broker, std::move(buffer.mapping)));
auto link1 = NodeLink::Create(
non_broker, LinkSide::kB, non_broker_name, broker->GetAssignedName(),
Node::Type::kNormal, 0, transport1,
- NodeLinkMemory::Adopt(non_broker,
- std::move(allocation.primary_buffer_memory)));
+ NodeLinkMemory::Create(non_broker, buffer.memory.Map()));
transport0->Activate();
transport1->Activate();
diff --git a/src/ipcz/node_messages.h b/src/ipcz/node_messages.h
index 30867ff..ed92d7a 100644
--- a/src/ipcz/node_messages.h
+++ b/src/ipcz/node_messages.h
@@ -11,6 +11,7 @@
#include "ipcz/driver_object.h"
#include "ipcz/driver_transport.h"
#include "ipcz/handle_type.h"
+#include "ipcz/link_side.h"
#include "ipcz/message.h"
#include "ipcz/node_name.h"
#include "ipcz/router_descriptor.h"
diff --git a/src/ipcz/node_messages_generator.h b/src/ipcz/node_messages_generator.h
index 6cdbf72..8e98ef0 100644
--- a/src/ipcz/node_messages_generator.h
+++ b/src/ipcz/node_messages_generator.h
@@ -54,6 +54,50 @@
IPCZ_MSG_PARAM(uint32_t, num_initial_portals)
IPCZ_MSG_END()
+// Sent by a non-broker node to a broker node, asking the broker to introduce
+// the non-broker to the node identified by `name`. If the broker is willing and
+// able to comply with this request, it will send an AcceptIntroduction message
+// (see below) to both the sender of this message and the node identified by
+// `name`.
+//
+// If the broker does not know the node named `name`, it will send only a
+// RejectIntroduction message back to the sender to indicate failure.
+IPCZ_MSG_BEGIN(RequestIntroduction, IPCZ_MSG_ID(10), IPCZ_MSG_VERSION(0))
+ IPCZ_MSG_PARAM(NodeName, name)
+IPCZ_MSG_END()
+
+// Introduces one node to another. Sent only by broker nodes and must only be
+// accepted from broker nodes.
+IPCZ_MSG_BEGIN(AcceptIntroduction, IPCZ_MSG_ID(11), IPCZ_MSG_VERSION(0))
+ // The name of the node being introduced to the recipient of this message.
+ IPCZ_MSG_PARAM(NodeName, name)
+
+ // Indicates which nominal side of the link (A or B) the recipient must assume
+ // for the NodeLink it will establish over `transport`.
+ IPCZ_MSG_PARAM(LinkSide, link_side)
+
+ // Indicates the highest ipcz protocol version which the remote side of
+ // `transport` able and willing to use according to the broker.
+ IPCZ_MSG_PARAM(uint32_t, remote_protocol_version)
+
+ // The DriverTransport which should be used by the recipient to establish a
+ // new NodeLink to the named node. The transport's peer endpoint will be
+ // given by the broker to the node identified by `name`.
+ IPCZ_MSG_PARAM_DRIVER_OBJECT(transport)
+
+ // A DriverMemory object which should adopted for the NodeLinkMemory instance
+ // of the newly established NodeLink. This becomes the new NodeLinkMemory's
+ // primary buffer.
+ IPCZ_MSG_PARAM_DRIVER_OBJECT(memory)
+IPCZ_MSG_END()
+
+// Sent back to a non-broker if the broker did not recognzie the subject of an
+// introduction request.
+IPCZ_MSG_BEGIN(RejectIntroduction, IPCZ_MSG_ID(12), IPCZ_MSG_VERSION(0))
+ // The name of the node whose introduction cannot be fulfilled.
+ IPCZ_MSG_PARAM(NodeName, name)
+IPCZ_MSG_END()
+
// Shares a new buffer to support allocation of blocks of `block_size` bytes.
// The sender must initialize an appropriate BlockAllocator within the buffer's
// memory before sending this message.
diff --git a/src/ipcz/node_test.cc b/src/ipcz/node_test.cc
new file mode 100644
index 0000000..8c4685c
--- /dev/null
+++ b/src/ipcz/node_test.cc
@@ -0,0 +1,195 @@
+// Copyright 2022 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "ipcz/node.h"
+
+#include <utility>
+
+#include "ipcz/driver_memory.h"
+#include "ipcz/driver_transport.h"
+#include "ipcz/ipcz.h"
+#include "ipcz/link_side.h"
+#include "ipcz/node_link.h"
+#include "ipcz/node_link_memory.h"
+#include "ipcz/node_name.h"
+#include "reference_drivers/sync_reference_driver.h"
+#include "testing/gtest/include/gtest/gtest.h"
+#include "util/ref_counted.h"
+
+namespace ipcz {
+namespace {
+
+const IpczDriver& kTestDriver = reference_drivers::kSyncReferenceDriver;
+
+constexpr NodeName kNodeAName(0, 1);
+constexpr NodeName kNodeBName(1, 2);
+constexpr NodeName kNodeCName(3, 5);
+
+class NodeTest : public testing::Test {
+ public:
+ void SetUp() override {
+ ConnectBrokerToNode(node_a_, kNodeAName);
+ ConnectBrokerToNode(node_b_, kNodeBName);
+ }
+
+ void TearDown() override {
+ node_b_->Close();
+ node_a_->Close();
+ broker_->Close();
+ }
+
+ Node& broker() { return *broker_; }
+ NodeName broker_name() { return broker_->GetAssignedName(); }
+ Node& node_a() { return *node_a_; }
+ Node& node_b() { return *node_b_; }
+
+ private:
+ void ConnectBrokerToNode(Ref<Node> node, const NodeName& name) {
+ auto transports = DriverTransport::CreatePair(kTestDriver);
+ DriverMemoryWithMapping buffer =
+ NodeLinkMemory::AllocateMemory(kTestDriver);
+ const NodeName broker_name = broker_->GetAssignedName();
+ auto broker_link = NodeLink::Create(
+ broker_, LinkSide::kA, broker_name, name, Node::Type::kNormal, 0,
+ transports.first,
+ NodeLinkMemory::Create(broker_, std::move(buffer.mapping)));
+ auto node_link = NodeLink::Create(
+ node, LinkSide::kB, name, broker_name, Node::Type::kBroker, 0,
+ transports.second, NodeLinkMemory::Create(node, buffer.memory.Map()));
+ node->SetAssignedName(name);
+ broker_->AddLink(name, broker_link);
+ node->AddLink(broker_name, node_link);
+ node->SetBrokerLink(node_link);
+ broker_link->transport()->Activate();
+ node_link->transport()->Activate();
+ }
+
+ const Ref<Node> broker_{MakeRefCounted<Node>(Node::Type::kBroker,
+ kTestDriver,
+ IPCZ_INVALID_DRIVER_HANDLE)};
+ const Ref<Node> node_a_{MakeRefCounted<Node>(Node::Type::kNormal,
+ kTestDriver,
+ IPCZ_INVALID_DRIVER_HANDLE)};
+ const Ref<Node> node_b_{MakeRefCounted<Node>(Node::Type::kNormal,
+ kTestDriver,
+ IPCZ_INVALID_DRIVER_HANDLE)};
+};
+
+TEST_F(NodeTest, EstablishExistingLinks) {
+ // When the requested node is already known, EstablishLink() responds
+ // immediately with the existing NodeLink.
+
+ bool called = false;
+ auto expect_link = [&called](Ref<NodeLink>& expected) {
+ return [expected = expected.get(), &called](NodeLink* link) {
+ EXPECT_EQ(expected, link);
+ called = true;
+ };
+ };
+
+ Ref<NodeLink> broker_to_a = broker().GetLink(kNodeAName);
+ EXPECT_TRUE(broker_to_a);
+ broker().EstablishLink(kNodeAName, expect_link(broker_to_a));
+ EXPECT_TRUE(called);
+ called = false;
+
+ Ref<NodeLink> broker_to_b = broker().GetLink(kNodeBName);
+ EXPECT_TRUE(broker_to_b);
+ broker().EstablishLink(kNodeBName, expect_link(broker_to_b));
+ EXPECT_TRUE(called);
+ called = false;
+
+ Ref<NodeLink> a_to_broker = node_a().GetLink(broker_name());
+ EXPECT_TRUE(a_to_broker);
+ node_a().EstablishLink(broker_name(), expect_link(a_to_broker));
+ EXPECT_TRUE(called);
+ called = false;
+
+ Ref<NodeLink> b_to_broker = node_b().GetLink(broker_name());
+ EXPECT_TRUE(b_to_broker);
+ node_b().EstablishLink(broker_name(), expect_link(b_to_broker));
+ EXPECT_TRUE(called);
+ called = false;
+}
+
+TEST_F(NodeTest, EstablishNewLinks) {
+ // When the requested node is not yet known to the caller but is known to
+ // their broker, EstablishLink() coordinates with the broker to get the
+ // two nodes introduced to each other.
+
+ NodeLink* established_link = nullptr;
+ EXPECT_FALSE(node_a().GetLink(kNodeBName));
+ EXPECT_FALSE(node_b().GetLink(kNodeAName));
+ node_a().EstablishLink(kNodeBName, [&established_link](NodeLink* link) {
+ established_link = link;
+ });
+ EXPECT_TRUE(established_link);
+
+ Ref<NodeLink> a_to_b = node_a().GetLink(kNodeBName);
+ Ref<NodeLink> b_to_a = node_b().GetLink(kNodeAName);
+ EXPECT_TRUE(a_to_b);
+ EXPECT_TRUE(b_to_a);
+ EXPECT_EQ(a_to_b.get(), established_link);
+
+ // A redundant EstablishLink() changes nothing, even from the other side.
+ node_a().EstablishLink(
+ kNodeBName, [&a_to_b](NodeLink* link) { EXPECT_EQ(a_to_b.get(), link); });
+ node_b().EstablishLink(
+ kNodeAName, [&b_to_a](NodeLink* link) { EXPECT_EQ(b_to_a.get(), link); });
+ EXPECT_EQ(a_to_b, node_a().GetLink(kNodeBName));
+ EXPECT_EQ(b_to_a, node_b().GetLink(kNodeAName));
+
+ // Verify that the new links are managing a common buffer pool.
+ constexpr uint64_t kMagic = 0x1123581321345589;
+ const Fragment a_fragment = a_to_b->memory().AllocateFragment(8);
+ EXPECT_TRUE(a_fragment.is_addressable());
+ *static_cast<uint64_t*>(a_fragment.address()) = kMagic;
+
+ const Fragment b_fragment =
+ b_to_a->memory().GetFragment(a_fragment.descriptor());
+ EXPECT_TRUE(b_fragment.is_addressable());
+ EXPECT_EQ(kMagic, *static_cast<uint64_t*>(b_fragment.address()));
+ *static_cast<uint64_t*>(b_fragment.address()) = 0;
+ EXPECT_EQ(0u, *static_cast<uint64_t*>(a_fragment.address()));
+}
+
+TEST_F(NodeTest, EstablishLinkFailureFromNonBroker) {
+ // If the named node is unknown to the broker, a link can't be established by
+ // a non-broker.
+ bool failed = false;
+ EXPECT_FALSE(broker().GetLink(kNodeCName));
+ EXPECT_FALSE(node_a().GetLink(kNodeCName));
+ node_a().EstablishLink(kNodeCName, [&](NodeLink* link) {
+ EXPECT_FALSE(link);
+ failed = true;
+ });
+ EXPECT_TRUE(failed);
+}
+
+TEST_F(NodeTest, EstablishLinkFailureFromBroker) {
+ // New links can't be automatically established by the broker.
+ bool failed = false;
+ EXPECT_FALSE(broker().GetLink(kNodeCName));
+ broker().EstablishLink(kNodeCName, [&](NodeLink* link) {
+ EXPECT_FALSE(link);
+ failed = true;
+ });
+ EXPECT_TRUE(failed);
+}
+
+TEST_F(NodeTest, EstablishLinkFailureWithoutBrokerLink) {
+ // A node with no broker link can't be introduced to anyone.
+ bool failed = false;
+ const Ref<Node> node_c = MakeRefCounted<Node>(
+ Node::Type::kNormal, kTestDriver, IPCZ_INVALID_DRIVER_HANDLE);
+ EXPECT_TRUE(broker().GetLink(kNodeAName));
+ node_c->EstablishLink(kNodeAName, [&](NodeLink* link) {
+ EXPECT_FALSE(link);
+ failed = true;
+ });
+ EXPECT_TRUE(failed);
+}
+
+} // namespace
+} // namespace ipcz
diff --git a/src/ipcz/ref_counted_fragment_test.cc b/src/ipcz/ref_counted_fragment_test.cc
index 3690f48..ed33e61 100644
--- a/src/ipcz/ref_counted_fragment_test.cc
+++ b/src/ipcz/ref_counted_fragment_test.cc
@@ -7,6 +7,7 @@
#include <atomic>
#include <tuple>
+#include "ipcz/driver_memory.h"
#include "ipcz/fragment.h"
#include "ipcz/fragment_ref.h"
#include "ipcz/node.h"
@@ -160,7 +161,9 @@
TEST_F(RefCountedFragmentTest, Free) {
auto node = MakeRefCounted<Node>(Node::Type::kNormal, kTestDriver,
IPCZ_INVALID_DRIVER_HANDLE);
- auto memory = NodeLinkMemory::Allocate(std::move(node)).node_link_memory;
+ DriverMemoryWithMapping buffer = NodeLinkMemory::AllocateMemory(kTestDriver);
+ auto memory =
+ NodeLinkMemory::Create(std::move(node), std::move(buffer.mapping));
// Allocate a ton of fragments and let them be released by FragmentRef on
// destruction. If the fragments aren't freed properly, allocations will fail
diff --git a/src/ipcz/router_link_test.cc b/src/ipcz/router_link_test.cc
index 38057ba..f89f1d4 100644
--- a/src/ipcz/router_link_test.cc
+++ b/src/ipcz/router_link_test.cc
@@ -48,15 +48,16 @@
public:
TestNodePair() {
auto transports = DriverTransport::CreatePair(kTestDriver);
- auto alloc = NodeLinkMemory::Allocate(node_a_);
- node_link_a_ =
- NodeLink::Create(node_a_, LinkSide::kA, kTestBrokerName,
- kTestNonBrokerName, Node::Type::kNormal, 0,
- transports.first, std::move(alloc.node_link_memory));
+ DriverMemoryWithMapping buffer =
+ NodeLinkMemory::AllocateMemory(kTestDriver);
+ node_link_a_ = NodeLink::Create(
+ node_a_, LinkSide::kA, kTestBrokerName, kTestNonBrokerName,
+ Node::Type::kNormal, 0, transports.first,
+ NodeLinkMemory::Create(node_a_, std::move(buffer.mapping)));
node_link_b_ = NodeLink::Create(
node_b_, LinkSide::kB, kTestNonBrokerName, kTestBrokerName,
Node::Type::kBroker, 0, transports.second,
- NodeLinkMemory::Adopt(node_b_, std::move(alloc.primary_buffer_memory)));
+ NodeLinkMemory::Create(node_b_, buffer.memory.Map()));
node_a_->AddLink(kTestNonBrokerName, node_link_a_);
node_b_->AddLink(kTestBrokerName, node_link_b_);
}