ipcz: Delegated shared memory allocation

Implements support for IPCZ_CONNECT_NODE_TO_ALLOCATION_DELEGATE, a flag
for ConnectNode() which indicates that all shared memory allocation by
the local node must be delegated to the remote node.

A new DriverMode is added for parameterized multinode tests which uses
the existing async driver but which also always specifies
IPCZ_CONNECT_NODE_TO_ALLOCATION_DELEGATE when connecting a non-broker
test node to the broker test node.

Bug: 1299283
Change-Id: I257ffe37d85b1d25ca343a0c8f9ce1afef3c5787
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/3785279
Reviewed-by: Alex Gough <ajgo@chromium.org>
Commit-Queue: Ken Rockot <rockot@google.com>
Cr-Commit-Position: refs/heads/main@{#1030722}
NOKEYCHECK=True
GitOrigin-RevId: 166bbe9cbe25a5e7ca64155572a7e864f64c46b7
diff --git a/src/ipcz/message.cc b/src/ipcz/message.cc
index f53727e..5872d6b 100644
--- a/src/ipcz/message.cc
+++ b/src/ipcz/message.cc
@@ -160,6 +160,10 @@
 }
 
 uint32_t Message::AppendDriverObject(DriverObject object) {
+  if (!object.is_valid()) {
+    return internal::kInvalidDriverObjectIndex;
+  }
+
   const uint32_t index = checked_cast<uint32_t>(driver_objects_.size());
   driver_objects_.push_back(std::move(object));
   return index;
@@ -173,12 +177,17 @@
   };
   driver_objects_.reserve(driver_objects_.size() + objects.size());
   for (auto& object : objects) {
+    ABSL_ASSERT(object.is_valid());
     driver_objects_.push_back(std::move(object));
   }
   return data;
 }
 
 DriverObject Message::TakeDriverObject(uint32_t index) {
+  if (index == internal::kInvalidDriverObjectIndex) {
+    return {};
+  }
+
   // Note that `index` has already been validated by now.
   ABSL_HARDENING_ASSERT(index < driver_objects_.size());
   return std::move(driver_objects_[index]);
@@ -344,10 +353,12 @@
     switch (param.type) {
       case internal::ParamType::kDriverObject: {
         const uint32_t index = GetParamValueAt<uint32_t>(param.offset);
-        if (is_object_claimed[index]) {
-          return false;
+        if (index != internal::kInvalidDriverObjectIndex) {
+          if (is_object_claimed[index]) {
+            return false;
+          }
+          is_object_claimed[index] = true;
         }
-        is_object_claimed[index] = true;
         break;
       }
 
diff --git a/src/ipcz/message.h b/src/ipcz/message.h
index a0b2d29..7aa50d2 100644
--- a/src/ipcz/message.h
+++ b/src/ipcz/message.h
@@ -115,6 +115,10 @@
   uint32_t num_objects;
 };
 
+// Encodes an invalid driver object index. Any driver object field encoded as
+// this value will deserialize to an invalid DriverObject.
+constexpr uint32_t kInvalidDriverObjectIndex = 0xffffffff;
+
 // End of wire structure definitions. Anything below this line is not meant to
 // be encoded into messages.
 #pragma pack(pop)
diff --git a/src/ipcz/node.cc b/src/ipcz/node.cc
index c21aa6f..5c4cf84 100644
--- a/src/ipcz/node.cc
+++ b/src/ipcz/node.cc
@@ -122,12 +122,25 @@
   return name;
 }
 
+void Node::SetAllocationDelegate(Ref<NodeLink> link) {
+  absl::MutexLock lock(&mutex_);
+  ABSL_ASSERT(!allocation_delegate_link_);
+  allocation_delegate_link_ = std::move(link);
+}
+
 void Node::AllocateSharedMemory(size_t size,
                                 AllocateSharedMemoryCallback callback) {
-  // TODO: Implement delegated allocation when this Node is connected to another
-  // with the IPCZ_CONNECT_NODE_TO_ALLOCATION_DELEGATE flag set. For now we
-  // assume all nodes can perform direct allocation.
-  callback(DriverMemory(driver_, size));
+  Ref<NodeLink> delegate;
+  {
+    absl::MutexLock lock(&mutex_);
+    delegate = allocation_delegate_link_;
+  }
+
+  if (delegate) {
+    delegate->RequestMemory(size, std::move(callback));
+  } else {
+    callback(DriverMemory(driver_, size));
+  }
 }
 
 void Node::EstablishLink(const NodeName& name, EstablishLinkCallback callback) {
@@ -227,7 +240,7 @@
 
   Ref<NodeLink> new_link = NodeLink::CreateInactive(
       WrapRefCounted(this), side, local_name, name, Type::kNormal,
-      remote_protocol_version, transport, std::move(memory));
+      remote_protocol_version, transport, memory);
   ABSL_ASSERT(new_link);
 
   std::vector<EstablishLinkCallback> callbacks;
@@ -290,14 +303,20 @@
     link = std::move(it->second);
     node_links_.erase(it);
 
-    DVLOG(4) << "Node " << link->local_node_name().ToString() << " dropping "
+    const NodeName& local_name = link->local_node_name();
+    DVLOG(4) << "Node " << local_name.ToString() << " dropping "
              << " link to " << link->remote_node_name().ToString();
     if (link == broker_link_) {
-      DVLOG(4) << "Node " << link->local_node_name().ToString()
-               << " has lost its broker link";
+      DVLOG(4) << "Node " << local_name.ToString() << " lost its broker link";
       broker_link_.reset();
       lost_broker = true;
     }
+
+    if (link == allocation_delegate_link_) {
+      DVLOG(4) << "Node " << local_name.ToString()
+               << " lost its allocation delegate";
+      allocation_delegate_link_.reset();
+    }
   }
 
   link->Deactivate();
@@ -313,6 +332,7 @@
     absl::MutexLock lock(&mutex_);
     std::swap(node_links_, node_links);
     broker_link_.reset();
+    allocation_delegate_link_.reset();
   }
 
   for (const auto& entry : node_links) {
diff --git a/src/ipcz/node.h b/src/ipcz/node.h
index 7c6f542..ded796d 100644
--- a/src/ipcz/node.h
+++ b/src/ipcz/node.h
@@ -98,6 +98,13 @@
   // randomness.
   NodeName GenerateRandomName() const;
 
+  // Sets a NodeLink to use for asynchronous shared memory allocation requests.
+  // This is configured when the ConnectNode() API is called with
+  // IPCZ_CONNECT_NODE_TO_ALLOCATION_DELEGATE. Typically this is combined with
+  // IPCZ_CONNECT_NODE_TO_BROKER when connecting from a sandboxed process which
+  // cannot allocate its own shared memory regions.
+  void SetAllocationDelegate(Ref<NodeLink> link);
+
   // Requests allocation of a new shared memory object of the given size.
   // `callback` is invoked with the new object when allocation is complete.
   // This operation is asynchronous if allocation is delegated to another node,
@@ -168,6 +175,11 @@
   // the node will lose all its other links too.
   Ref<NodeLink> broker_link_ ABSL_GUARDED_BY(mutex_);
 
+  // A link over which all internal shared memory allocation is delegated. If
+  // null, this Node will always attempt to allocate shared memory directly
+  // through its ipcz driver.
+  Ref<NodeLink> allocation_delegate_link_ ABSL_GUARDED_BY(mutex_);
+
   // Lookup table of broker-assigned node names and links to those nodes. All of
   // these links and their associated names are received by the `broker_link_`
   // if this is a non-broker node. If this is a broker node, these links are
diff --git a/src/ipcz/node_connector.cc b/src/ipcz/node_connector.cc
index 0d3fddd..d33b320 100644
--- a/src/ipcz/node_connector.cc
+++ b/src/ipcz/node_connector.cc
@@ -132,9 +132,9 @@
         NodeLinkMemory::Create(node_, buffer_memory.Map()));
     node_->SetAssignedName(connect.params().receiver_name);
     node_->SetBrokerLink(new_link);
-
-    // TODO: Support delegated allocation of shared memory.
-    ABSL_ASSERT((flags_ & IPCZ_CONNECT_NODE_TO_ALLOCATION_DELEGATE) == 0);
+    if ((flags_ & IPCZ_CONNECT_NODE_TO_ALLOCATION_DELEGATE) != 0) {
+      node_->SetAllocationDelegate(new_link);
+    }
 
     AcceptConnection(std::move(new_link), LinkSide::kB,
                      connect.params().num_initial_portals);
diff --git a/src/ipcz/node_link.cc b/src/ipcz/node_link.cc
index 51f05de..f84033d 100644
--- a/src/ipcz/node_link.cc
+++ b/src/ipcz/node_link.cc
@@ -8,6 +8,7 @@
 #include <atomic>
 #include <cstddef>
 #include <cstdint>
+#include <limits>
 #include <utility>
 
 #include "ipcz/box.h"
@@ -28,6 +29,7 @@
 #include "third_party/abseil-cpp/absl/base/macros.h"
 #include "util/log.h"
 #include "util/ref_counted.h"
+#include "util/safe_math.h"
 
 namespace ipcz {
 
@@ -225,6 +227,18 @@
   Transmit(accept);
 }
 
+void NodeLink::RequestMemory(size_t size, RequestMemoryCallback callback) {
+  const uint32_t size32 = checked_cast<uint32_t>(size);
+  {
+    absl::MutexLock lock(&mutex_);
+    pending_memory_requests_[size32].push_back(std::move(callback));
+  }
+
+  msg::RequestMemory request;
+  request.params().size = size32;
+  Transmit(request);
+}
+
 void NodeLink::Deactivate() {
   {
     absl::MutexLock lock(&mutex_);
@@ -531,6 +545,39 @@
   return true;
 }
 
+bool NodeLink::OnRequestMemory(msg::RequestMemory& request) {
+  DriverMemory memory(node_->driver(), request.params().size);
+  msg::ProvideMemory provide;
+  provide.params().size = request.params().size;
+  provide.params().buffer =
+      provide.AppendDriverObject(memory.TakeDriverObject());
+  Transmit(provide);
+  return true;
+}
+
+bool NodeLink::OnProvideMemory(msg::ProvideMemory& provide) {
+  DriverMemory memory(provide.TakeDriverObject(provide.params().buffer));
+  RequestMemoryCallback callback;
+  {
+    absl::MutexLock lock(&mutex_);
+    auto it = pending_memory_requests_.find(provide.params().size);
+    if (it == pending_memory_requests_.end()) {
+      return false;
+    }
+
+    std::list<RequestMemoryCallback>& callbacks = it->second;
+    ABSL_ASSERT(!callbacks.empty());
+    callback = std::move(callbacks.front());
+    callbacks.pop_front();
+    if (callbacks.empty()) {
+      pending_memory_requests_.erase(it);
+    }
+  }
+
+  callback(std::move(memory));
+  return true;
+}
+
 void NodeLink::OnTransportError() {
   SublinkMap sublinks;
   {
diff --git a/src/ipcz/node_link.h b/src/ipcz/node_link.h
index 11decaa..76003be 100644
--- a/src/ipcz/node_link.h
+++ b/src/ipcz/node_link.h
@@ -160,6 +160,12 @@
       SublinkId new_sublink,
       FragmentRef<RouterLinkState> new_link_state);
 
+  // Sends a request to allocate a new shared memory region and invokes
+  // `callback` once the request succeeds or fails. On failure, `callback` is
+  // invoke with an invalid DriverMemory object.
+  using RequestMemoryCallback = std::function<void(DriverMemory)>;
+  void RequestMemory(size_t size, RequestMemoryCallback callback);
+
   // Permanently deactivates this NodeLink. Once this call returns the NodeLink
   // will no longer receive transport messages. It may still be used to transmit
   // outgoing messages, but it cannot be reactivated. Transmissions over a
@@ -211,6 +217,8 @@
   bool OnBypassPeerWithLink(msg::BypassPeerWithLink& bypass) override;
   bool OnStopProxyingToLocalPeer(msg::StopProxyingToLocalPeer& stop) override;
   bool OnFlushRouter(msg::FlushRouter& flush) override;
+  bool OnRequestMemory(msg::RequestMemory& request) override;
+  bool OnProvideMemory(msg::ProvideMemory& provide) override;
   void OnTransportError() override;
 
   const Ref<Node> node_;
@@ -234,6 +242,13 @@
 
   using SublinkMap = absl::flat_hash_map<SublinkId, Sublink>;
   SublinkMap sublinks_ ABSL_GUARDED_BY(mutex_);
+
+  // Pending memory allocation request callbacks. Keyed by request size, when
+  // an incoming ProvideMemory message is received, the front of the list for
+  // that size is removed from the map and invoked with the new memory object.
+  using MemoryRequestMap =
+      absl::flat_hash_map<uint32_t, std::list<RequestMemoryCallback>>;
+  MemoryRequestMap pending_memory_requests_ ABSL_GUARDED_BY(mutex_);
 };
 
 }  // namespace ipcz
diff --git a/src/ipcz/node_messages_generator.h b/src/ipcz/node_messages_generator.h
index c0a3b97..afdaa77 100644
--- a/src/ipcz/node_messages_generator.h
+++ b/src/ipcz/node_messages_generator.h
@@ -351,4 +351,21 @@
   IPCZ_MSG_PARAM(SublinkId, sublink)
 IPCZ_MSG_END()
 
+// Requests allocation of a shared memory region of a given size. If the
+// recipient can comply, they will send back a corresponding ProvideMemory
+// message with a serialized memory region. This message is only sent to a
+// node's allocation delegate (usually the broker), which is established by
+// providing the IPCZ_CONNECT_NODE_TO_ALLOCATION_DELEGATE flag to ConnectNode().
+IPCZ_MSG_BEGIN(RequestMemory, IPCZ_MSG_ID(64), IPCZ_MSG_VERSION(0))
+  IPCZ_MSG_PARAM(uint32_t, size)
+IPCZ_MSG_END()
+
+// Provides a new shared buffer to the receiver, owned exclusively by the
+// receiver. The receiver is free to duplicate this buffer and share it with
+// other nodes.
+IPCZ_MSG_BEGIN(ProvideMemory, IPCZ_MSG_ID(65), IPCZ_MSG_VERSION(0))
+  IPCZ_MSG_PARAM(uint32_t, size)
+  IPCZ_MSG_PARAM_DRIVER_OBJECT(buffer)
+IPCZ_MSG_END()
+
 IPCZ_MSG_END_INTERFACE()
diff --git a/src/test/multinode_test.cc b/src/test/multinode_test.cc
index 56764a8..9455476 100644
--- a/src/test/multinode_test.cc
+++ b/src/test/multinode_test.cc
@@ -118,6 +118,9 @@
     case DriverMode::kAsync:
       return reference_drivers::kAsyncReferenceDriver;
 
+    case DriverMode::kAsyncDelegatedAlloc:
+      return reference_drivers::kAsyncReferenceDriver;
+
 #if BUILDFLAG(ENABLE_IPCZ_MULTIPROCESS_TESTS)
     case DriverMode::kMultiprocess:
       return reference_drivers::kMultiprocessReferenceDriver;
@@ -142,12 +145,15 @@
 }
 
 void TestNode::ConnectToBroker(absl::Span<IpczHandle> portals) {
+  uint32_t flags = IPCZ_CONNECT_NODE_TO_BROKER;
+  if (driver_mode_ == DriverMode::kAsyncDelegatedAlloc) {
+    flags |= IPCZ_CONNECT_NODE_TO_ALLOCATION_DELEGATE;
+  }
   IpczDriverHandle transport =
       std::exchange(transport_, IPCZ_INVALID_DRIVER_HANDLE);
   ABSL_ASSERT(transport != IPCZ_INVALID_DRIVER_HANDLE);
-  const IpczResult result =
-      ipcz().ConnectNode(node(), transport, portals.size(),
-                         IPCZ_CONNECT_NODE_TO_BROKER, nullptr, portals.data());
+  const IpczResult result = ipcz().ConnectNode(
+      node(), transport, portals.size(), flags, nullptr, portals.data());
   ASSERT_EQ(IPCZ_RESULT_OK, result);
 }
 
diff --git a/src/test/multinode_test.h b/src/test/multinode_test.h
index 7f0b98b..6f08423 100644
--- a/src/test/multinode_test.h
+++ b/src/test/multinode_test.h
@@ -303,11 +303,12 @@
 #endif
 
 // TODO: Add other DriverMode enumerators here as support is landed.
-#define INSTANTIATE_MULTINODE_TEST_SUITE_P(suite) \
-  INSTANTIATE_TEST_SUITE_P(                       \
-      , suite,                                    \
-      ::testing::Values(                          \
-          ipcz::test::DriverMode::kSync,          \
-          ipcz::test::DriverMode::kAsync IPCZ_EXTRA_DRIVER_MODES))
+#define INSTANTIATE_MULTINODE_TEST_SUITE_P(suite)                    \
+  INSTANTIATE_TEST_SUITE_P(                                          \
+      , suite,                                                       \
+      ::testing::Values(ipcz::test::DriverMode::kSync,               \
+                        ipcz::test::DriverMode::kAsync,              \
+                        ipcz::test::DriverMode::kAsyncDelegatedAlloc \
+                            IPCZ_EXTRA_DRIVER_MODES))
 
 #endif  // IPCZ_SRC_TEST_MULTINODE_TEST_H_