ipcz: Dynamic NodeLinkMemory capacity

This implements automatic expansion of NodeLinkMemory's memory capacity
in response to failed allocation requests, as a precursor to supporting
various uses of granular shared memory allocation within ipcz.

To support this, BufferPool is refactored and is no longer exposed by
NodeLinkMemory. Instead it serves as a passive data structure which
caches shared buffers and associates them with specific allocation
schemes.
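
As a rough sketch of the refactored flow (mirroring the updated
buffer_pool_test.cc below; `mapping` is assumed to be a valid
DriverMemoryMapping of driver-allocated memory):

  constexpr size_t kBlockSize = 64;
  BlockAllocator allocator(mapping.bytes(), kBlockSize);
  allocator.InitializeRegion();  // Must be initialized before registration.

  BufferPool pool;
  pool.AddBlockBuffer(BufferId(0), std::move(mapping), {&allocator, 1});

  Fragment block = pool.AllocateBlock(kBlockSize);  // Power-of-2 sizes only.
  pool.FreeBlock(block);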

NodeLinkMemory now exposes methods to allocate and free fragments,
which forward to the BufferPool. If NodeLinkMemory fails to allocate a
requested fragment and its capacity for the corresponding block size is
still below a fixed growth limit, it attempts to allocate a new buffer
to expand that capacity.
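
For example (a sketch based on the ExpandCapacity and NewBlockSizes
tests added below; `link` is assumed to be a NodeLink over the
synchronous reference driver):

  NodeLinkMemory& memory = link->memory();

  // No initial capacity exists for 16 kB blocks, but the failed request
  // triggers expansion of capacity for that block size.
  Fragment fragment = memory.AllocateFragment(16 * 1024);
  ABSL_ASSERT(fragment.is_null());

  // With a synchronous driver, the new buffer is already registered by
  // the time the failed allocation returns, so a retry succeeds.
  fragment = memory.AllocateFragment(16 * 1024);
  ABSL_ASSERT(fragment.is_addressable());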

Bug: 1299283
Change-Id: Id2db21a9142e9a65a814676aaa2076815ea424e4
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/3758840
Commit-Queue: Ken Rockot <rockot@google.com>
Reviewed-by: Alex Gough <ajgo@chromium.org>
Cr-Commit-Position: refs/heads/main@{#1023977}
NOKEYCHECK=True
GitOrigin-RevId: a6c67d833372bf1a14a61c81223b995672b41bca
diff --git a/src/BUILD.gn b/src/BUILD.gn
index 461339b..90b97fa 100644
--- a/src/BUILD.gn
+++ b/src/BUILD.gn
@@ -334,6 +334,7 @@
     "ipcz/driver_transport_test.cc",
     "ipcz/message_test.cc",
     "ipcz/node_connector_test.cc",
+    "ipcz/node_link_memory_test.cc",
     "ipcz/node_link_test.cc",
     "ipcz/parcel_queue_test.cc",
     "ipcz/ref_counted_fragment_test.cc",
diff --git a/src/ipcz/buffer_pool.cc b/src/ipcz/buffer_pool.cc
index b6373ca..945e33e 100644
--- a/src/ipcz/buffer_pool.cc
+++ b/src/ipcz/buffer_pool.cc
@@ -18,24 +18,6 @@
 
 BufferPool::~BufferPool() = default;
 
-bool BufferPool::AddBuffer(BufferId id, DriverMemoryMapping mapping) {
-  ABSL_ASSERT(mapping.is_valid());
-
-  absl::MutexLock lock(&mutex_);
-  auto [it, inserted] = mappings_.insert({id, std::move(mapping)});
-  return inserted;
-}
-
-absl::Span<uint8_t> BufferPool::GetBufferMemory(BufferId id) {
-  absl::MutexLock lock(&mutex_);
-  auto it = mappings_.find(id);
-  if (it == mappings_.end()) {
-    return {};
-  }
-
-  return it->second.bytes();
-}
-
 Fragment BufferPool::GetFragment(const FragmentDescriptor& descriptor) {
   if (descriptor.is_null()) {
     return {};
@@ -55,33 +37,74 @@
   return Fragment(descriptor, mapping.address_at(descriptor.offset()));
 }
 
-bool BufferPool::RegisterBlockAllocator(BufferId buffer_id,
-                                        const BlockAllocator& allocator) {
-  const size_t block_size = allocator.block_size();
+bool BufferPool::AddBlockBuffer(
+    BufferId id,
+    DriverMemoryMapping mapping,
+    absl::Span<const BlockAllocator> block_allocators) {
+  ABSL_ASSERT(mapping.is_valid());
 
-  absl::MutexLock lock(&mutex_);
-  auto mapping_it = mappings_.find(buffer_id);
-  if (mapping_it == mappings_.end()) {
-    return false;
+  // Basic consistency checks before we change any pool state: ensure that each
+  // given allocator actually lives within the memory mapped by `mapping`, and
+  // that each has a unique power-of-2 block size.
+  size_t block_sizes_present = 0;
+  for (const auto& allocator : block_allocators) {
+    if (&allocator.region().front() < &mapping.bytes().front() ||
+        &allocator.region().back() > &mapping.bytes().back()) {
+      // Not a valid allocator region for this mapping.
+      return false;
+    }
+
+    const size_t block_size = allocator.block_size();
+    ABSL_ASSERT(block_size >= 8 && block_size <= (1 << 30));
+    if (!absl::has_single_bit(block_size)) {
+      // Not a power of two.
+      return false;
+    }
+
+    if (block_sizes_present & block_size) {
+      // Duplicate allocator block size for this buffer.
+      return false;
+    }
+
+    block_sizes_present |= block_size;
   }
 
-  auto& mapping = mapping_it->second;
-  if (&allocator.region().front() < &mapping.bytes().front() ||
-      &allocator.region().back() > &mapping.bytes().back()) {
-    return false;
+  std::vector<WaitForBufferCallback> callbacks;
+  {
+    absl::MutexLock lock(&mutex_);
+    auto [it, inserted] = mappings_.insert({id, std::move(mapping)});
+    if (!inserted) {
+      ABSL_ASSERT(!buffer_callbacks_.contains(id));
+      return false;
+    }
+
+    auto callbacks_it = buffer_callbacks_.find(id);
+    if (callbacks_it != buffer_callbacks_.end()) {
+      callbacks = std::move(callbacks_it->second);
+      buffer_callbacks_.erase(callbacks_it);
+    }
+
+    auto& inserted_mapping = it->second;
+    for (const auto& allocator : block_allocators) {
+      const size_t block_size = allocator.block_size();
+      auto [pool_it, pool_inserted] =
+          block_allocator_pools_.insert({block_size, nullptr});
+      auto& pool = pool_it->second;
+      if (pool_inserted) {
+        pool = std::make_unique<BlockAllocatorPool>();
+      }
+      pool->Add(id, inserted_mapping.bytes(), allocator);
+    }
   }
 
-  auto [it, inserted] = block_allocator_pools_.insert({block_size, nullptr});
-  auto& pool = it->second;
-  if (inserted) {
-    pool = std::make_unique<BlockAllocatorPool>();
+  for (auto& callback : callbacks) {
+    callback();
   }
 
-  pool->Add(buffer_id, mapping.bytes(), allocator);
   return true;
 }
 
-size_t BufferPool::GetTotalBlockAllocatorCapacity(size_t block_size) {
+size_t BufferPool::GetTotalBlockCapacity(size_t block_size) {
   BlockAllocatorPool* pool;
   {
     absl::MutexLock lock(&mutex_);
@@ -96,8 +119,9 @@
   return pool->GetCapacity();
 }
 
-Fragment BufferPool::AllocateFragment(size_t num_bytes) {
-  const size_t block_size = absl::bit_ceil(num_bytes);
+Fragment BufferPool::AllocateBlock(size_t block_size) {
+  ABSL_ASSERT(absl::has_single_bit(block_size));
+
   BlockAllocatorPool* pool;
   {
     absl::MutexLock lock(&mutex_);
@@ -108,14 +132,16 @@
 
     // NOTE: BlockAllocatorPools live as long as this BufferPool once added, and
     // they are thread-safe objects; so retaining this pointer through the
-    // extent of AllocateFragment() is safe.
+    // extent of AllocateBlock() is safe.
     pool = it->second.get();
   }
 
   return pool->Allocate();
 }
 
-Fragment BufferPool::AllocatePartialFragment(size_t preferred_num_bytes) {
+Fragment BufferPool::AllocateBlockBestEffort(size_t preferred_block_size) {
+  ABSL_ASSERT(absl::has_single_bit(preferred_block_size));
+
   // Limit the number of attempts we make to scale down the requested size in
   // search of an available fragment. This value was chosen arbitrarily.
   constexpr size_t kMaxAttempts = 3;
@@ -128,7 +154,7 @@
       return {};
     }
 
-    pool_iter = block_allocator_pools_.lower_bound(preferred_num_bytes);
+    pool_iter = block_allocator_pools_.lower_bound(preferred_block_size);
     if (pool_iter == block_allocator_pools_.end()) {
       --pool_iter;
     }
@@ -154,7 +180,7 @@
   return {};
 }
 
-bool BufferPool::FreeFragment(const Fragment& fragment) {
+bool BufferPool::FreeBlock(const Fragment& fragment) {
   BlockAllocatorPool* pool;
   {
     absl::MutexLock lock(&mutex_);
@@ -169,4 +195,18 @@
   return pool->Free(fragment);
 }
 
+void BufferPool::WaitForBufferAsync(BufferId id,
+                                    WaitForBufferCallback callback) {
+  {
+    absl::MutexLock lock(&mutex_);
+    auto it = mappings_.find(id);
+    if (it == mappings_.end()) {
+      buffer_callbacks_[id].push_back(std::move(callback));
+      return;
+    }
+  }
+
+  callback();
+}
+
 }  // namespace ipcz
diff --git a/src/ipcz/buffer_pool.h b/src/ipcz/buffer_pool.h
index 1268e74..4244375 100644
--- a/src/ipcz/buffer_pool.h
+++ b/src/ipcz/buffer_pool.h
@@ -16,6 +16,7 @@
 #include "ipcz/fragment_descriptor.h"
 #include "third_party/abseil-cpp/absl/container/flat_hash_map.h"
 #include "third_party/abseil-cpp/absl/synchronization/mutex.h"
+#include "third_party/abseil-cpp/absl/types/span.h"
 
 namespace ipcz {
 
@@ -33,20 +34,6 @@
   BufferPool();
   ~BufferPool();
 
-  // Registers `mapping` under `id` within this pool.
-  //
-  // Returns true if the mapping was successfully added, or false if the pool
-  // already had a buffer registered under the given `id`.
-  bool AddBuffer(BufferId id, DriverMemoryMapping mapping);
-
-  // Returns the full span of memory mapped by the identified buffer, or an
-  // empty span if no such buffer is registered with this BufferPool.
-  //
-  // Note that because buffers remain mapped indefinitely by the BufferPool
-  // once added, this span is safe to retain as long as the BufferPool itself
-  // remains alive.
-  absl::Span<uint8_t> GetBufferMemory(BufferId id);
-
   // Resolves `descriptor` to a concrete Fragment. If the descriptor is null or
   // describes a region of memory which exceeds the bounds of the identified
   // buffer, this returns a null Fragment.
@@ -59,44 +46,47 @@
   // span of mapped memory.
   Fragment GetFragment(const FragmentDescriptor& descriptor);
 
-  // Registers a BlockAllocator with this pool to support subsequent
-  // AllocateFragment() calls. If successful, the allocator may be used to
-  // fulfill fragment allocation requests for any size up to and including
-  // `block_size`.
+  // Registers `mapping` under `id` within this pool, along with a collection of
+  // BlockAllocators that have already been initialized within the mapped
+  // memory, to support block allocation by the pool.
   //
-  // `buffer_id` must identify a buffer mapping which has already been
-  // registered to this pool via AddBuffer(), and `allocator` must be
-  // constructed over a span of memory which falls entirely within that mapping.
+  // Returns true if the mapping and BlockAllocators were successfully added to
+  // the pool, or false if the pool already had a buffer registered under the
+  // given `id` or if any allocator within `block_allocators` is not contained
+  // by `mapping` or is otherwise invalid.
   //
-  // Returns true on success and false on failure. Failure implies that either
-  // `buffer_id` was unknown or `allocator` does not manage memory within the
-  // identified buffer.
-  bool RegisterBlockAllocator(BufferId buffer_id,
-                              const BlockAllocator& allocator);
+  // Note that every allocator in `block_allocators` must have a unique
+  // power-of-2 block size, as each buffer only supports at most one allocator
+  // per block size.
+  bool AddBlockBuffer(BufferId id,
+                      DriverMemoryMapping mapping,
+                      absl::Span<const BlockAllocator> block_allocators);
 
   // Returns the total size in bytes of capacity available across all registered
   // BlockAllocators for the given `block_size`.
-  size_t GetTotalBlockAllocatorCapacity(size_t block_size);
+  size_t GetTotalBlockCapacity(size_t block_size);
 
-  // Attempts to allocate an unused fragment from the pool with a size of at
-  // least `num_bytes`. For most allocations, this prefers to use a
-  // BlockAllocator for the smallest available block size which still fits
-  // `num_bytes`.
-  //
-  // If the BufferPool cannot accommodate the allocation request, this returns
-  // a null Fragment.
-  Fragment AllocateFragment(size_t num_bytes);
+  // Attempts to allocate an unused block of at least `block_size` bytes from
+  // any available block allocation buffer in the pool, preferring smaller
+  // blocks over larger ones. `block_size` must be a power of two. Returns a
+  // null Fragment if the pool cannot accommodate the request.
+  Fragment AllocateBlock(size_t block_size);
 
-  // Similar to AllocateFragment(), but this may allocate less space than
+  // Similar to AllocateBlock(), but this may allocate less space than
   // requested if that's all that's available. May still return a null Fragment
   // if the BufferPool has trouble finding available memory.
-  Fragment AllocatePartialFragment(size_t preferred_num_bytes);
+  Fragment AllocateBlockBestEffort(size_t preferred_block_size);
 
-  // Frees a Fragment previously allocated from this pool via AllocateFragment()
-  // or AllocatePartialFragment(). Returns true if successful, or false if
-  // `fragment` does not identify a fragment allocated from a buffer managed by
-  // this pool.
-  bool FreeFragment(const Fragment& fragment);
+  // Frees a block previously allocated from this pool via AllocateBlock() or
+  // AllocateBlockBestEffort(). Returns true if successful, or false if
+  // `fragment` was not allocated from one of this pool's block buffers.
+  bool FreeBlock(const Fragment& fragment);
+
+  // Runs `callback` as soon as the identified buffer is added to this
+  // BufferPool. If the buffer is already present in the pool, `callback` is
+  // run immediately.
+  using WaitForBufferCallback = std::function<void()>;
+  void WaitForBufferAsync(BufferId id, WaitForBufferCallback callback);
 
  private:
   absl::Mutex mutex_;
@@ -109,6 +99,10 @@
   using BlockAllocatorPoolMap =
       std::map<size_t, std::unique_ptr<BlockAllocatorPool>>;
   BlockAllocatorPoolMap block_allocator_pools_ ABSL_GUARDED_BY(mutex_);
+
+  // Callbacks to be invoked when an identified buffer becomes available.
+  absl::flat_hash_map<BufferId, std::vector<WaitForBufferCallback>>
+      buffer_callbacks_ ABSL_GUARDED_BY(mutex_);
 };
 
 }  // namespace ipcz
diff --git a/src/ipcz/buffer_pool_test.cc b/src/ipcz/buffer_pool_test.cc
index a82dd34..428b61c 100644
--- a/src/ipcz/buffer_pool_test.cc
+++ b/src/ipcz/buffer_pool_test.cc
@@ -31,73 +31,118 @@
                            IPCZ_INVALID_DRIVER_HANDLE)};
 };
 
-TEST_F(BufferPoolTest, AddBuffer) {
-  constexpr size_t kSize = 4096;
-  DriverMemoryMapping mapping = AllocateDriverMemory(kSize);
-  absl::Span<uint8_t> bytes = mapping.bytes();
-
+TEST_F(BufferPoolTest, AddBlockBuffer) {
+  constexpr size_t kBufferSize = 4096;
+  constexpr size_t kBlockSize = 64;
+  DriverMemoryMapping mapping = AllocateDriverMemory(kBufferSize);
+  const absl::Span<uint8_t> bytes = mapping.bytes();
+  const BlockAllocator allocators[] = {{bytes, kBlockSize}};
+  constexpr BufferId id(0);
   BufferPool pool;
-  EXPECT_TRUE(pool.AddBuffer(BufferId{0}, std::move(mapping)));
+  EXPECT_TRUE(pool.AddBlockBuffer(id, std::move(mapping), allocators));
 
-  auto memory = pool.GetBufferMemory(BufferId{0});
-  EXPECT_EQ(bytes.data(), memory.data());
-  EXPECT_EQ(bytes.size(), memory.size());
+  Fragment fragment = pool.GetFragment({id, 0, kBufferSize});
+  EXPECT_TRUE(fragment.is_addressable());
+  EXPECT_EQ(bytes.data(), fragment.bytes().data());
+  EXPECT_EQ(bytes.size(), fragment.bytes().size());
+}
 
-  // No duplicates.
-  DriverMemoryMapping another_mapping = AllocateDriverMemory(kSize);
-  EXPECT_FALSE(pool.AddBuffer(BufferId{0}, std::move(another_mapping)));
+TEST_F(BufferPoolTest, AddBlockBufferNoDuplicateBufferId) {
+  constexpr size_t kBufferSize = 4096;
+  constexpr size_t kBlockSize = 64;
+  DriverMemoryMapping mapping = AllocateDriverMemory(kBufferSize);
+  const absl::Span<uint8_t> bytes = mapping.bytes();
+  const BlockAllocator allocators[] = {{bytes, kBlockSize}};
+  constexpr BufferId id(0);
+  BufferPool pool;
+  EXPECT_TRUE(pool.AddBlockBuffer(id, std::move(mapping), allocators));
 
-  // BufferId 0 is still the original buffer.
-  memory = pool.GetBufferMemory(BufferId{0});
-  EXPECT_EQ(bytes.data(), memory.data());
-  EXPECT_EQ(bytes.size(), memory.size());
+  // Adding another buffer with the same ID as above must fail.
+  DriverMemoryMapping another_mapping = AllocateDriverMemory(kBufferSize);
+  const BlockAllocator another_allocator(another_mapping.bytes(), kBlockSize);
+  EXPECT_FALSE(pool.AddBlockBuffer(id, std::move(another_mapping),
+                                   {&another_allocator, 1}));
 
-  DriverMemoryMapping yet_another_mapping = AllocateDriverMemory(kSize);
-  absl::Span<uint8_t> other_bytes = yet_another_mapping.bytes();
-  EXPECT_TRUE(pool.AddBuffer(BufferId{1}, std::move(yet_another_mapping)));
+  // Fragment resolution against buffer 0 should still map to the first buffer.
+  Fragment fragment = pool.GetFragment({id, 0, kBufferSize});
+  EXPECT_TRUE(fragment.is_addressable());
+  EXPECT_EQ(bytes.data(), fragment.bytes().data());
+  EXPECT_EQ(bytes.size(), fragment.bytes().size());
+}
 
-  // BufferId 0 is still the original buffer.
-  memory = pool.GetBufferMemory(BufferId{0});
-  EXPECT_EQ(bytes.data(), memory.data());
-  EXPECT_EQ(bytes.size(), memory.size());
+TEST_F(BufferPoolTest, AddBlockBufferNoDuplicateAllocatorBlockSizes) {
+  constexpr size_t kBufferSize = 4096;
+  constexpr size_t kBlockSize = 64;
+  DriverMemoryMapping mapping = AllocateDriverMemory(kBufferSize);
+  const absl::Span<uint8_t> bytes = mapping.bytes();
 
-  // BufferId 1 is available now too.
-  memory = pool.GetBufferMemory(BufferId{1});
-  EXPECT_EQ(other_bytes.data(), memory.data());
-  EXPECT_EQ(other_bytes.size(), memory.size());
+  // Carve up the buffer into two separate allocators for the same block size,
+  // and try to register them both. This is unsupported.
+  const BlockAllocator allocators[] = {
+      {bytes.subspan(0, kBlockSize * 2), kBlockSize},
+      {bytes.subspan(kBlockSize * 2), kBlockSize},
+  };
+
+  constexpr BufferId id(0);
+  BufferPool pool;
+  EXPECT_FALSE(pool.AddBlockBuffer(id, std::move(mapping), allocators));
+
+  // No buffer is registered to resolve this fragment.
+  Fragment fragment = pool.GetFragment({id, 0, 8});
+  EXPECT_TRUE(fragment.is_pending());
+}
+
+TEST_F(BufferPoolTest, AddBlockBufferRequireBlockSizePowerOfTwo) {
+  constexpr size_t kBufferSize = 4096;
+  constexpr size_t kBadBlockSize = 80;
+  DriverMemoryMapping mapping = AllocateDriverMemory(kBufferSize);
+  const BlockAllocator bad_allocator(mapping.bytes(), kBadBlockSize);
+
+  constexpr BufferId id(0);
+  BufferPool pool;
+  EXPECT_FALSE(
+      pool.AddBlockBuffer(id, std::move(mapping), {&bad_allocator, 1}));
+
+  // No buffer is registered to resolve this fragment.
+  Fragment fragment = pool.GetFragment({id, 0, 8});
+  EXPECT_TRUE(fragment.is_pending());
 }
 
 TEST_F(BufferPoolTest, GetFragment) {
-  constexpr size_t kSize1 = 4096;
-  constexpr size_t kSize2 = 2048;
-  DriverMemoryMapping mapping1 = AllocateDriverMemory(kSize1);
-  DriverMemoryMapping mapping2 = AllocateDriverMemory(kSize2);
+  constexpr size_t kBufferSize1 = 4096;
+  constexpr size_t kBufferSize2 = 2048;
+  constexpr size_t kBlockSize = 64;
+  DriverMemoryMapping mapping1 = AllocateDriverMemory(kBufferSize1);
+  DriverMemoryMapping mapping2 = AllocateDriverMemory(kBufferSize2);
   absl::Span<uint8_t> bytes1 = mapping1.bytes();
   absl::Span<uint8_t> bytes2 = mapping2.bytes();
+  BlockAllocator allocators1[] = {{bytes1, kBlockSize}};
+  BlockAllocator allocators2[] = {{bytes2, kBlockSize}};
 
   BufferPool pool;
-  EXPECT_TRUE(pool.AddBuffer(BufferId{1}, std::move(mapping1)));
-  EXPECT_TRUE(pool.AddBuffer(BufferId{2}, std::move(mapping2)));
+  constexpr BufferId id1(1);
+  constexpr BufferId id2(2);
+  EXPECT_TRUE(pool.AddBlockBuffer(id1, std::move(mapping1), allocators1));
+  EXPECT_TRUE(pool.AddBlockBuffer(id2, std::move(mapping2), allocators2));
 
   // We can resolve fragments covering entire buffers.
-  Fragment fragment =
-      pool.GetFragment(FragmentDescriptor{BufferId{1}, 0, kSize1});
+  Fragment fragment = pool.GetFragment({id1, /*offset=*/0, kBufferSize1});
   EXPECT_FALSE(fragment.is_null());
   EXPECT_TRUE(fragment.is_addressable());
   EXPECT_EQ(bytes1.data(), fragment.bytes().data());
-  EXPECT_EQ(kSize1, fragment.bytes().size());
+  EXPECT_EQ(kBufferSize1, fragment.bytes().size());
 
-  fragment = pool.GetFragment(FragmentDescriptor{BufferId{2}, 0, kSize2});
+  fragment = pool.GetFragment({id2, /*offset=*/0, kBufferSize2});
   EXPECT_FALSE(fragment.is_null());
   EXPECT_TRUE(fragment.is_addressable());
   EXPECT_EQ(bytes2.data(), fragment.bytes().data());
-  EXPECT_EQ(kSize2, fragment.bytes().size());
+  EXPECT_EQ(kBufferSize2, fragment.bytes().size());
 
   // We can resolve fragments covering a subspan of a buffer.
   constexpr size_t kPartialFragmentOffset = 4;
-  constexpr size_t kPartialFragmentSize = kSize2 / 2;
-  fragment = pool.GetFragment(FragmentDescriptor{
-      BufferId{2}, kPartialFragmentOffset, kPartialFragmentSize});
+  constexpr size_t kPartialFragmentSize = kBufferSize2 / 2;
+  fragment =
+      pool.GetFragment({id2, kPartialFragmentOffset, kPartialFragmentSize});
   EXPECT_FALSE(fragment.is_null());
   EXPECT_TRUE(fragment.is_addressable());
   EXPECT_EQ(bytes2.subspan(kPartialFragmentOffset).data(),
@@ -110,6 +155,7 @@
   EXPECT_FALSE(fragment.is_null());
   EXPECT_FALSE(fragment.is_addressable());
   EXPECT_TRUE(fragment.is_pending());
+  EXPECT_EQ(nullptr, fragment.address());
   EXPECT_EQ(descriptor.buffer_id(), fragment.buffer_id());
   EXPECT_EQ(descriptor.offset(), fragment.offset());
   EXPECT_EQ(descriptor.size(), fragment.size());
@@ -119,153 +165,147 @@
   EXPECT_TRUE(fragment.is_null());
 
   // Out-of-bounds descriptors resolve to null fragments too.
-  fragment = pool.GetFragment(FragmentDescriptor{BufferId{1}, 0, kSize1 + 1});
+  fragment = pool.GetFragment(FragmentDescriptor{id1, 0, kBufferSize1 + 1});
   EXPECT_TRUE(fragment.is_null());
 }
 
 TEST_F(BufferPoolTest, BasicBlockAllocation) {
-  BufferPool pool;
-  pool.AddBuffer(BufferId{0}, AllocateDriverMemory(4096));
-  pool.AddBuffer(BufferId{1}, AllocateDriverMemory(4096));
-
+  constexpr size_t kBufferSize = 4096;
   constexpr size_t kBlockSize = 64;
-  BlockAllocator allocator1(pool.GetBufferMemory(BufferId{0}), kBlockSize);
+
+  auto mapping0 = AllocateDriverMemory(kBufferSize);
+  auto mapping1 = AllocateDriverMemory(kBufferSize);
+  auto bytes0 = mapping0.bytes();
+  auto bytes1 = mapping1.bytes();
+
+  BlockAllocator allocator0(bytes0, kBlockSize);
+  allocator0.InitializeRegion();
+
+  BlockAllocator allocator1(bytes1, kBlockSize);
   allocator1.InitializeRegion();
 
-  BlockAllocator allocator2(pool.GetBufferMemory(BufferId{1}), kBlockSize);
-  allocator2.InitializeRegion();
+  BufferPool pool;
+  constexpr BufferId id0(0);
+  constexpr BufferId id1(1);
+  EXPECT_TRUE(pool.AddBlockBuffer(id0, std::move(mapping0), {&allocator0, 1}));
+  EXPECT_TRUE(pool.AddBlockBuffer(id1, std::move(mapping1), {&allocator1, 1}));
 
-  EXPECT_TRUE(pool.RegisterBlockAllocator(BufferId{0}, allocator1));
-
-  // No duplicates.
-  EXPECT_FALSE(pool.RegisterBlockAllocator(BufferId{0}, allocator2));
-
-  EXPECT_TRUE(pool.RegisterBlockAllocator(BufferId{1}, allocator2));
-
-  EXPECT_EQ(kBlockSize * (allocator1.capacity() + allocator2.capacity()),
-            pool.GetTotalBlockAllocatorCapacity(kBlockSize));
+  EXPECT_EQ(kBlockSize * (allocator0.capacity() + allocator1.capacity()),
+            pool.GetTotalBlockCapacity(kBlockSize));
 
   // We can't free something that isn't a valid allocation.
-  EXPECT_FALSE(pool.FreeFragment(Fragment{{}, nullptr}));
-  EXPECT_FALSE(pool.FreeFragment(Fragment{{BufferId{1000}, 0, 1}, nullptr}));
-  EXPECT_FALSE(pool.FreeFragment(
-      Fragment{{BufferId{0}, 0, 1}, pool.GetBufferMemory(BufferId{0}).data()}));
+  EXPECT_FALSE(pool.FreeBlock(Fragment{{}, nullptr}));
+  EXPECT_FALSE(pool.FreeBlock(Fragment{{BufferId{1000}, 0, 1}, nullptr}));
+  EXPECT_FALSE(pool.FreeBlock(Fragment{{BufferId{0}, 0, 1}, bytes0.data()}));
 
   // Allocate all available capacity.
   std::vector<Fragment> fragments;
   for (;;) {
-    Fragment fragment = pool.AllocateFragment(kBlockSize);
+    Fragment fragment = pool.AllocateBlock(kBlockSize);
     if (fragment.is_null()) {
       break;
     }
     fragments.push_back(fragment);
   }
 
-  EXPECT_EQ(allocator1.capacity() + allocator2.capacity(), fragments.size());
+  EXPECT_EQ(allocator0.capacity() + allocator1.capacity(), fragments.size());
   for (const Fragment& fragment : fragments) {
-    EXPECT_TRUE(pool.FreeFragment(fragment));
+    EXPECT_TRUE(pool.FreeBlock(fragment));
   }
 }
 
 TEST_F(BufferPoolTest, BlockAllocationSizing) {
-  BufferPool pool;
-  EXPECT_TRUE(pool.AddBuffer(BufferId{1}, AllocateDriverMemory(4096)));
-  EXPECT_TRUE(pool.AddBuffer(BufferId{2}, AllocateDriverMemory(4096)));
+  constexpr size_t kBufferSize = 4096;
+  DriverMemoryMapping mapping1 = AllocateDriverMemory(kBufferSize);
+  DriverMemoryMapping mapping2 = AllocateDriverMemory(kBufferSize);
 
   constexpr size_t kBuffer1BlockSize = 64;
-  BlockAllocator allocator1(pool.GetBufferMemory(BufferId{1}),
-                            kBuffer1BlockSize);
+  BlockAllocator allocator1(mapping1.bytes(), kBuffer1BlockSize);
   allocator1.InitializeRegion();
 
-  constexpr size_t kBuffer2BlockSize = 128;
-  BlockAllocator allocator2(pool.GetBufferMemory(BufferId{2}),
-                            kBuffer2BlockSize);
+  constexpr size_t kBuffer2BlockSize = kBuffer1BlockSize * 4;
+  BlockAllocator allocator2(mapping2.bytes(), kBuffer2BlockSize);
   allocator2.InitializeRegion();
 
-  EXPECT_TRUE(pool.RegisterBlockAllocator(BufferId{1}, allocator1));
-  EXPECT_TRUE(pool.RegisterBlockAllocator(BufferId{2}, allocator2));
+  BufferPool pool;
+  constexpr BufferId id1(1);
+  constexpr BufferId id2(2);
+  EXPECT_TRUE(pool.AddBlockBuffer(id1, std::move(mapping1), {&allocator1, 1}));
+  EXPECT_TRUE(pool.AddBlockBuffer(id2, std::move(mapping2), {&allocator2, 1}));
 
   // Allocations not larger than 64 bytes should be drawn from buffer 1.
 
-  Fragment fragment = pool.AllocateFragment(1);
+  Fragment fragment = pool.AllocateBlock(1);
   EXPECT_TRUE(fragment.is_addressable());
-  EXPECT_EQ(BufferId{1}, fragment.buffer_id());
+  EXPECT_EQ(id1, fragment.buffer_id());
   EXPECT_EQ(kBuffer1BlockSize, fragment.size());
 
-  fragment = pool.AllocateFragment(kBuffer1BlockSize / 2);
+  fragment = pool.AllocateBlock(kBuffer1BlockSize / 2);
   EXPECT_TRUE(fragment.is_addressable());
-  EXPECT_EQ(BufferId{1}, fragment.buffer_id());
+  EXPECT_EQ(id1, fragment.buffer_id());
   EXPECT_EQ(kBuffer1BlockSize, fragment.size());
 
-  fragment = pool.AllocateFragment(kBuffer1BlockSize);
+  fragment = pool.AllocateBlock(kBuffer1BlockSize);
   EXPECT_TRUE(fragment.is_addressable());
-  EXPECT_EQ(BufferId{1}, fragment.buffer_id());
+  EXPECT_EQ(id1, fragment.buffer_id());
   EXPECT_EQ(kBuffer1BlockSize, fragment.size());
 
-  // Larger allocations which are still no larger than 128 bytes should be drawn
-  // from buffer 2.
+  // Larger allocations which are still no larger than kBuffer2BlockSize bytes
+  // should be drawn from buffer 2.
 
-  fragment = pool.AllocateFragment(kBuffer1BlockSize + 1);
+  fragment = pool.AllocateBlock(kBuffer1BlockSize * 2);
   EXPECT_TRUE(fragment.is_addressable());
-  EXPECT_EQ(BufferId{2}, fragment.buffer_id());
+  EXPECT_EQ(id2, fragment.buffer_id());
   EXPECT_EQ(kBuffer2BlockSize, fragment.size());
 
-  fragment = pool.AllocateFragment(kBuffer2BlockSize);
+  fragment = pool.AllocateBlock(kBuffer2BlockSize);
   EXPECT_TRUE(fragment.is_addressable());
-  EXPECT_EQ(BufferId{2}, fragment.buffer_id());
+  EXPECT_EQ(id2, fragment.buffer_id());
   EXPECT_EQ(kBuffer2BlockSize, fragment.size());
 
   // Anything larger than kBuffer2BlockSize should fail to allocate.
 
-  fragment = pool.AllocateFragment(kBuffer2BlockSize + 1);
+  fragment = pool.AllocateBlock(kBuffer2BlockSize * 2);
   EXPECT_TRUE(fragment.is_null());
 }
 
-TEST_F(BufferPoolTest, PartialBlockAllocation) {
-  BufferPool pool;
-  EXPECT_TRUE(pool.AddBuffer(BufferId{1}, AllocateDriverMemory(4096)));
-  EXPECT_TRUE(pool.AddBuffer(BufferId{2}, AllocateDriverMemory(4096)));
+TEST_F(BufferPoolTest, BestEffortBlockAllocation) {
+  constexpr size_t kBufferSize = 4096;
+  auto mapping1 = AllocateDriverMemory(kBufferSize);
+  auto mapping2 = AllocateDriverMemory(kBufferSize);
 
   constexpr size_t kBuffer1BlockSize = 64;
-  BlockAllocator allocator1(pool.GetBufferMemory(BufferId{1}),
-                            kBuffer1BlockSize);
+  BlockAllocator allocator1(mapping1.bytes(), kBuffer1BlockSize);
   allocator1.InitializeRegion();
 
   constexpr size_t kBuffer2BlockSize = 128;
-  BlockAllocator allocator2(pool.GetBufferMemory(BufferId{2}),
-                            kBuffer2BlockSize);
+  BlockAllocator allocator2(mapping2.bytes(), kBuffer2BlockSize);
   allocator2.InitializeRegion();
 
-  EXPECT_TRUE(pool.RegisterBlockAllocator(BufferId{1}, allocator1));
-  EXPECT_TRUE(pool.RegisterBlockAllocator(BufferId{2}, allocator2));
+  BufferPool pool;
+  constexpr BufferId id1(1);
+  constexpr BufferId id2(2);
+  EXPECT_TRUE(pool.AddBlockBuffer(id1, std::move(mapping1), {&allocator1, 1}));
+  EXPECT_TRUE(pool.AddBlockBuffer(id2, std::move(mapping2), {&allocator2, 1}));
 
-  // Oversized partial allocations can succceed.
+  // Oversized best-effort allocations can succeed.
 
   Fragment partial_fragment =
-      pool.AllocatePartialFragment(kBuffer2BlockSize + 1);
+      pool.AllocateBlockBestEffort(kBuffer2BlockSize * 2);
   EXPECT_TRUE(partial_fragment.is_addressable());
-  EXPECT_EQ(BufferId{2}, partial_fragment.buffer_id());
+  EXPECT_EQ(id2, partial_fragment.buffer_id());
   EXPECT_EQ(kBuffer2BlockSize, partial_fragment.size());
 
   // If we exhaust a sufficient block size, we should fall back onto smaller
-  // block sizes.
-
-  // First allocate all available capacity for kBuffer2BlockSize.
-  std::vector<Fragment> fragments;
-  for (;;) {
-    Fragment fragment = pool.AllocateFragment(kBuffer2BlockSize);
-    if (fragment.is_null()) {
-      break;
-    }
-    fragments.push_back(fragment);
+  // block sizes. First allocate all available capacity for kBuffer2BlockSize,
+  // and then a best-effort allocation of kBuffer2BlockSize should succeed
+  // with a smaller size (kBuffer1BlockSize).
+  while (!pool.AllocateBlock(kBuffer2BlockSize).is_null()) {
   }
 
-  // A partial allocation of kBuffer2BlockSize should still succeed, albeit for
-  // a smaller size (kBuffer1BlockSize).
-
-  partial_fragment = pool.AllocatePartialFragment(kBuffer2BlockSize);
+  partial_fragment = pool.AllocateBlockBestEffort(kBuffer2BlockSize);
   EXPECT_TRUE(partial_fragment.is_addressable());
-  EXPECT_EQ(BufferId{1}, partial_fragment.buffer_id());
+  EXPECT_EQ(id1, partial_fragment.buffer_id());
   EXPECT_EQ(kBuffer1BlockSize, partial_fragment.size());
 }
 
diff --git a/src/ipcz/fragment_ref.cc b/src/ipcz/fragment_ref.cc
index e586924..b294e1c 100644
--- a/src/ipcz/fragment_ref.cc
+++ b/src/ipcz/fragment_ref.cc
@@ -41,7 +41,7 @@
     return;
   }
 
-  memory->buffer_pool().FreeFragment(fragment);
+  memory->FreeFragment(fragment);
 }
 
 Fragment GenericFragmentRef::release() {
diff --git a/src/ipcz/node.cc b/src/ipcz/node.cc
index bc5e1ec..eb6f865 100644
--- a/src/ipcz/node.cc
+++ b/src/ipcz/node.cc
@@ -109,6 +109,14 @@
   return name;
 }
 
+void Node::AllocateSharedMemory(size_t size,
+                                AllocateSharedMemoryCallback callback) {
+  // TODO: Implement delegated allocation when this Node is connected to another
+  // with the IPCZ_CONNECT_NODE_TO_ALLOCATION_DELEGATE flag set. For now we
+  // assume all nodes can perform direct allocation.
+  callback(DriverMemory(driver_, size));
+}
+
 void Node::ShutDown() {
   NodeLinkMap node_links;
   {
diff --git a/src/ipcz/node.h b/src/ipcz/node.h
index 1310a17..d5fc67d 100644
--- a/src/ipcz/node.h
+++ b/src/ipcz/node.h
@@ -5,7 +5,10 @@
 #ifndef IPCZ_SRC_IPCZ_NODE_H_
 #define IPCZ_SRC_IPCZ_NODE_H_
 
+#include <functional>
+
 #include "ipcz/api_object.h"
+#include "ipcz/driver_memory.h"
 #include "ipcz/ipcz.h"
 #include "ipcz/node_name.h"
 #include "third_party/abseil-cpp/absl/container/flat_hash_map.h"
@@ -87,6 +90,14 @@
   // randomness.
   NodeName GenerateRandomName() const;
 
+  // Requests allocation of a new shared memory object of the given size.
+  // `callback` is invoked with the new object when allocation is complete.
+  // This operation is asynchronous if allocation is delegated to another node,
+  // but if this node can allocate directly through the driver, `callback` is
+  // invoked with the result before this method returns.
+  using AllocateSharedMemoryCallback = std::function<void(DriverMemory)>;
+  void AllocateSharedMemory(size_t size, AllocateSharedMemoryCallback callback);
+
  private:
   ~Node() override;
 
diff --git a/src/ipcz/node_link.cc b/src/ipcz/node_link.cc
index 2edd013..fe89b0b 100644
--- a/src/ipcz/node_link.cc
+++ b/src/ipcz/node_link.cc
@@ -58,6 +58,7 @@
       transport_(std::move(transport)),
       memory_(std::move(memory)) {
   transport_->set_listener(WrapRefCounted(this));
+  memory_->SetNodeLink(WrapRefCounted(this));
 }
 
 NodeLink::~NodeLink() {
@@ -114,6 +115,16 @@
   return it->second.receiver;
 }
 
+void NodeLink::AddBlockBuffer(BufferId id,
+                              uint32_t block_size,
+                              DriverMemory memory) {
+  msg::AddBlockBuffer add;
+  add.params().id = id;
+  add.params().block_size = block_size;
+  add.params().buffer = add.AppendDriverObject(memory.TakeDriverObject());
+  Transmit(add);
+}
+
 void NodeLink::Deactivate() {
   {
     absl::MutexLock lock(&mutex_);
@@ -125,6 +136,7 @@
 
   OnTransportError();
   transport_->Deactivate();
+  memory_->SetNodeLink(nullptr);
 }
 
 void NodeLink::Transmit(Message& message) {
@@ -146,6 +158,15 @@
       1, std::memory_order_relaxed));
 }
 
+bool NodeLink::OnAddBlockBuffer(msg::AddBlockBuffer& add) {
+  DriverMemory buffer(add.TakeDriverObject(add.params().buffer));
+  if (!buffer.is_valid()) {
+    return false;
+  }
+  return memory().AddBlockBuffer(add.params().id, add.params().block_size,
+                                 buffer.Map());
+}
+
 bool NodeLink::OnAcceptParcel(msg::AcceptParcel& accept) {
   absl::Span<const uint8_t> parcel_data =
       accept.GetArrayView<uint8_t>(accept.params().parcel_data);
diff --git a/src/ipcz/node_link.h b/src/ipcz/node_link.h
index 2922add..54fad52 100644
--- a/src/ipcz/node_link.h
+++ b/src/ipcz/node_link.h
@@ -99,6 +99,13 @@
   // Retrieves only the Router currently bound to `sublink` on this NodeLink.
   Ref<Router> GetRouter(SublinkId sublink);
 
+  // Sends a new driver memory object to the remote endpoint, to be associated
+  // with `id` within the peer NodeLink's NodeLinkMemory and used to
+  // dynamically allocate blocks of `block_size` bytes. `id` must have already
+  // been reserved locally through this link's NodeLinkMemory via
+  // AllocateNewBufferId().
+  void AddBlockBuffer(BufferId id, uint32_t block_size, DriverMemory memory);
+
   // Permanently deactivates this NodeLink. Once this call returns the NodeLink
   // will no longer receive transport messages. It may still be used to transmit
   // outgoing messages, but it cannot be reactivated. Transmissions over a
@@ -125,6 +132,7 @@
   SequenceNumber GenerateOutgoingSequenceNumber();
 
   // NodeMessageListener overrides:
+  bool OnAddBlockBuffer(msg::AddBlockBuffer& add) override;
   bool OnAcceptParcel(msg::AcceptParcel& accept) override;
   bool OnRouteClosed(msg::RouteClosed& route_closed) override;
   bool OnFlushRouter(msg::FlushRouter& flush) override;
diff --git a/src/ipcz/node_link_memory.cc b/src/ipcz/node_link_memory.cc
index 1711739..74f7358 100644
--- a/src/ipcz/node_link_memory.cc
+++ b/src/ipcz/node_link_memory.cc
@@ -16,6 +16,9 @@
 #include "ipcz/node.h"
 #include "ipcz/node_link.h"
 #include "third_party/abseil-cpp/absl/base/macros.h"
+#include "third_party/abseil-cpp/absl/numeric/bits.h"
+#include "third_party/abseil-cpp/absl/synchronization/mutex.h"
+#include "util/log.h"
 #include "util/ref_counted.h"
 
 namespace ipcz {
@@ -25,12 +28,39 @@
 constexpr BufferId kPrimaryBufferId{0};
 
 // Fixed allocation size for each NodeLink's primary shared buffer.
-constexpr size_t kPrimaryBufferSize = 65536;
+constexpr size_t kPrimaryBufferSize = 64 * 1024;
 
 // The front of the primary buffer is reserved for special current and future
 // uses which require synchronous availability throughout a link's lifetime.
 constexpr size_t kPrimaryBufferReservedHeaderSize = 256;
 
+// NodeLinkMemory may expand its BufferPool's capacity for each fragment size
+// as needed. All newly allocated buffers for this purpose must be a multiple of
+// kBlockAllocatorPageSize. More specifically, a new buffer allocation for
+// fragment size `n` will be the smallest multiple of kBlockAllocatorPageSize
+// which can still fit at least kMinBlockAllocatorCapacity blocks of size `n`.
+constexpr size_t kBlockAllocatorPageSize = 64 * 1024;
+
+// The minimum number of blocks which new BlockAllocator buffers must support.
+// See comments on kBlockAllocatorPageSize above.
+constexpr size_t kMinBlockAllocatorCapacity = 8;
+
+// The maximum total BlockAllocator capacity to automatically reserve for any
+// given fragment size within the BufferPool. This is not a hard cap on capacity
+// per fragment size, but it sets a limit on how large the pool will grow
+// automatically in response to failed allocation requests.
+constexpr size_t kMaxBlockAllocatorCapacityPerFragmentSize = 256 * 1024;
+
+// The minimum fragment size (in bytes) to support with dedicated BufferPool
+// capacity. All fragment sizes are powers of two. Fragment allocations below
+// this size are rounded up to this size.
+constexpr size_t kMinFragmentSize = 64;
+
+// The maximum fragment size to support with dedicated BlockAllocator capacity
+// within the BufferPool. Allocations beyond this size must fail or fall back
+// onto a different allocation scheme which does not use a BlockAllocator.
+constexpr size_t kMaxFragmentSizeForBlockAllocation = 16 * 1024;
+
 // The number of fixed RouterLinkState locations in the primary buffer. This
 // limits the maximum number of initial portals supported by the ConnectNode()
 // API. Note that these states reside in a fixed location at the end of the
@@ -63,6 +93,10 @@
                                static_cast<uint8_t*>(base));
 }
 
+size_t GetBlockSizeForFragmentSize(size_t fragment_size) {
+  return std::max(kMinFragmentSize, absl::bit_ceil(fragment_size));
+}
+
 }  // namespace
 
 // This structure always sits at offset 0 in the primary buffer and has a fixed
@@ -117,21 +151,25 @@
   static_assert(sizeof(PrimaryBuffer) <= kPrimaryBufferSize,
                 "PrimaryBuffer structure is too large.");
 
-  buffer_pool_.AddBuffer(kPrimaryBufferId, std::move(primary_buffer_memory));
-  buffer_pool_.RegisterBlockAllocator(kPrimaryBufferId,
-                                      primary_buffer_.block_allocator_64());
-  buffer_pool_.RegisterBlockAllocator(kPrimaryBufferId,
-                                      primary_buffer_.block_allocator_256());
-  buffer_pool_.RegisterBlockAllocator(kPrimaryBufferId,
-                                      primary_buffer_.block_allocator_512());
-  buffer_pool_.RegisterBlockAllocator(kPrimaryBufferId,
-                                      primary_buffer_.block_allocator_1024());
-  buffer_pool_.RegisterBlockAllocator(kPrimaryBufferId,
-                                      primary_buffer_.block_allocator_2048());
+  const BlockAllocator allocators[] = {
+      primary_buffer_.block_allocator_64(),
+      primary_buffer_.block_allocator_256(),
+      primary_buffer_.block_allocator_512(),
+      primary_buffer_.block_allocator_1024(),
+      primary_buffer_.block_allocator_2048(),
+  };
+
+  buffer_pool_.AddBlockBuffer(kPrimaryBufferId,
+                              std::move(primary_buffer_memory), allocators);
 }
 
 NodeLinkMemory::~NodeLinkMemory() = default;
 
+void NodeLinkMemory::SetNodeLink(Ref<NodeLink> link) {
+  absl::MutexLock lock(&mutex_);
+  node_link_ = std::move(link);
+}
+
 // static
 NodeLinkMemory::Allocation NodeLinkMemory::Allocate(Ref<Node> node) {
   DriverMemory primary_buffer_memory(node->driver(), sizeof(PrimaryBuffer));
@@ -153,8 +191,10 @@
   // kMaxInitialPortals, so neither will be assuming initial ownership of any
   // SublinkIds at or above this value.
   primary_buffer.header.next_sublink_id.store(kMaxInitialPortals,
-                                              std::memory_order_release);
+                                              std::memory_order_relaxed);
 
+  // Note: Each InitializeRegion() performs an atomic release, so atomic stores
+  // before this section can be relaxed.
   primary_buffer.block_allocator_64().InitializeRegion();
   primary_buffer.block_allocator_256().InitializeRegion();
   primary_buffer.block_allocator_512().InitializeRegion();
@@ -197,4 +237,135 @@
                                       Fragment(descriptor, state));
 }
 
+Fragment NodeLinkMemory::GetFragment(const FragmentDescriptor& descriptor) {
+  return buffer_pool_.GetFragment(descriptor);
+}
+
+bool NodeLinkMemory::AddBlockBuffer(BufferId id,
+                                    size_t block_size,
+                                    DriverMemoryMapping mapping) {
+  const BlockAllocator allocator(mapping.bytes(), block_size);
+  return buffer_pool_.AddBlockBuffer(id, std::move(mapping), {&allocator, 1});
+}
+
+Fragment NodeLinkMemory::AllocateFragment(size_t size) {
+  if (size == 0 || size > kMaxFragmentSizeForBlockAllocation) {
+    // TODO: Support an alternative allocation scheme for large requests.
+    return {};
+  }
+
+  const size_t block_size = GetBlockSizeForFragmentSize(size);
+  Fragment fragment = buffer_pool_.AllocateBlock(block_size);
+  if (fragment.is_null()) {
+    // Use failure as a hint to possibly expand the pool's capacity. The
+    // caller's allocation will still fail, but maybe future allocations won't.
+    if (CanExpandBlockCapacity(block_size)) {
+      RequestBlockCapacity(block_size, [](bool success) {
+        if (!success) {
+          DLOG(ERROR) << "Failed to allocate new block capacity.";
+        }
+      });
+    }
+  }
+  return fragment;
+}
+
+bool NodeLinkMemory::FreeFragment(const Fragment& fragment) {
+  if (fragment.is_null() ||
+      fragment.size() > kMaxFragmentSizeForBlockAllocation) {
+    // TODO: Once we support larger non-block-based allocations, support freeing
+    // them from here as well.
+    return false;
+  }
+
+  ABSL_ASSERT(fragment.is_addressable());
+  return buffer_pool_.FreeBlock(fragment);
+}
+
+void NodeLinkMemory::WaitForBufferAsync(
+    BufferId id,
+    BufferPool::WaitForBufferCallback callback) {
+  buffer_pool_.WaitForBufferAsync(id, std::move(callback));
+}
+
+bool NodeLinkMemory::CanExpandBlockCapacity(size_t block_size) {
+  return buffer_pool_.GetTotalBlockCapacity(block_size) <
+         kMaxBlockAllocatorCapacityPerFragmentSize;
+}
+
+void NodeLinkMemory::RequestBlockCapacity(
+    size_t block_size,
+    RequestBlockCapacityCallback callback) {
+  ABSL_ASSERT(block_size >= kMinFragmentSize);
+
+  const size_t min_buffer_size = block_size * kMinBlockAllocatorCapacity;
+  const size_t num_pages =
+      (min_buffer_size + kBlockAllocatorPageSize - 1) / kBlockAllocatorPageSize;
+  const size_t buffer_size = num_pages * kBlockAllocatorPageSize;
+
+  Ref<NodeLink> link;
+  {
+    absl::MutexLock lock(&mutex_);
+    auto [it, need_new_request] =
+        capacity_callbacks_.emplace(block_size, CapacityCallbackList());
+    it->second.push_back(std::move(callback));
+    if (!need_new_request) {
+      // There was already a request pending for this block size. `callback`
+      // will be run when that request completes.
+      return;
+    }
+    link = node_link_;
+  }
+
+  node_->AllocateSharedMemory(
+      buffer_size, [self = WrapRefCounted(this), block_size,
+                    link = std::move(link)](DriverMemory memory) {
+        if (!memory.is_valid()) {
+          self->OnCapacityRequestComplete(block_size, false);
+          return;
+        }
+
+        DriverMemoryMapping mapping = memory.Map();
+        BlockAllocator allocator(mapping.bytes(), block_size);
+        allocator.InitializeRegion();
+
+        // SUBTLE: We first share the new buffer with the remote node, then
+        // register it locally. If we registered the buffer locally first, this
+        // could lead to a deadlock on the remote node: another thread on this
+        // node could race to send a message which uses a fragment from the new
+        // buffer before the message below is sent to share the new buffer with
+        // the remote node.
+        //
+        // The remote node would not be able to dispatch the first message until
+        // its pending fragment was resolved, and it wouldn't be able to resolve
+        // the pending fragment until it received the new buffer. But the
+        // message carrying the new buffer would have been queued after the
+        // first message and therefore could not be dispatched until after the
+        // first message. Hence, deadlock.
+        const BufferId id = self->AllocateNewBufferId();
+        link->AddBlockBuffer(id, block_size, std::move(memory));
+        self->AddBlockBuffer(id, block_size, std::move(mapping));
+        self->OnCapacityRequestComplete(block_size, true);
+      });
+}
+
+void NodeLinkMemory::OnCapacityRequestComplete(size_t block_size,
+                                               bool success) {
+  CapacityCallbackList callbacks;
+  {
+    absl::MutexLock lock(&mutex_);
+    auto it = capacity_callbacks_.find(block_size);
+    if (it == capacity_callbacks_.end()) {
+      return;
+    }
+
+    callbacks = std::move(it->second);
+    capacity_callbacks_.erase(it);
+  }
+
+  for (auto& callback : callbacks) {
+    callback(success);
+  }
+}
+
 }  // namespace ipcz
diff --git a/src/ipcz/node_link_memory.h b/src/ipcz/node_link_memory.h
index b335ec9..0390719 100644
--- a/src/ipcz/node_link_memory.h
+++ b/src/ipcz/node_link_memory.h
@@ -7,21 +7,27 @@
 
 #include <cstddef>
 #include <cstdint>
+#include <functional>
+#include <vector>
 
 #include "ipcz/buffer_id.h"
 #include "ipcz/buffer_pool.h"
 #include "ipcz/driver_memory.h"
 #include "ipcz/driver_memory_mapping.h"
+#include "ipcz/fragment_descriptor.h"
 #include "ipcz/fragment_ref.h"
 #include "ipcz/ipcz.h"
 #include "ipcz/router_link_state.h"
 #include "ipcz/sublink_id.h"
+#include "third_party/abseil-cpp/absl/container/flat_hash_map.h"
+#include "third_party/abseil-cpp/absl/synchronization/mutex.h"
 #include "third_party/abseil-cpp/absl/types/span.h"
 #include "util/ref_counted.h"
 
 namespace ipcz {
 
 class Node;
+class NodeLink;
 
 // NodeLinkMemory owns and manages all shared memory resource allocation on a
 // single NodeLink. Each end of a NodeLink has its own NodeLinkMemory instance
@@ -52,6 +58,14 @@
     DriverMemory primary_buffer_memory;
   };
 
+  // Sets a reference to the NodeLink using this NodeLinkMemory. This is called
+  // by the NodeLink itself before any other methods can be called on the
+  // NodeLinkMemory, and it's only reset to null once the NodeLink is
+  // deactivated. This link may be used to share information with the remote
+  // node, where another NodeLinkMemory is cooperatively managing the same
+  // memory pool as this one.
+  void SetNodeLink(Ref<NodeLink> link);
+
   // Constructs a new NodeLinkMemory over a newly allocated DriverMemory object.
   // The new DriverMemory is returned in `primary_buffer_memory`, while the
   // returned NodeLinkMemory internally retains a mapping of that memory.
@@ -63,11 +77,6 @@
   static Ref<NodeLinkMemory> Adopt(Ref<Node> node,
                                    DriverMemory primary_buffer_memory);
 
-  // Exposes the underlying BufferPool which owns all shared buffers for this
-  // NodeLinkMemory and which facilitates dynamic allocation of the fragments
-  // within.
-  BufferPool& buffer_pool() { return buffer_pool_; }
-
   // Returns a new BufferId which should still be unused by any buffer in this
   // NodeLinkMemory's BufferPool, or that of its peer NodeLinkMemory. When
   // allocating a new buffer to add to the BufferPool, its BufferId should be
@@ -85,12 +94,54 @@
   // FragmentRef is unmanaged and will never free its underlying fragment.
   FragmentRef<RouterLinkState> GetInitialRouterLinkState(size_t i);
 
+  // Resolves `descriptor` to a concrete Fragment. If the descriptor is null or
+  // describes a region of memory which exceeds the bounds of the identified
+  // buffer, this returns a null Fragment. If the descriptor's BufferId is not
+  // yet registered with this NodeLinkMemory, this returns a pending Fragment
+  // with the same BufferId and dimensions as `descriptor`.
+  Fragment GetFragment(const FragmentDescriptor& descriptor);
+
+  // Adds a new buffer to the underlying BufferPool to use as additional
+  // allocation capacity for blocks of size `block_size`. Note that the
+  // contents of the mapped region must already be initialized as a
+  // BlockAllocator.
+  bool AddBlockBuffer(BufferId id,
+                      size_t block_size,
+                      DriverMemoryMapping mapping);
+
+  // Allocates a Fragment of `size` bytes from the underlying BufferPool. May
+  // return a null Fragment if there was no readily available capacity.
+  Fragment AllocateFragment(size_t size);
+
+  // Frees a Fragment previously allocated through this NodeLinkMemory. Returns
+  // true on success. Returns false if `fragment` does not represent an
+  // allocated fragment within this NodeLinkMemory.
+  bool FreeFragment(const Fragment& fragment);
+
+  // Runs `callback` as soon as the identified buffer is added to the underlying
+  // BufferPool. If the buffer is already present here, `callback` is run
+  // immediately.
+  void WaitForBufferAsync(BufferId id,
+                          BufferPool::WaitForBufferCallback callback);
+
  private:
   struct PrimaryBuffer;
 
   NodeLinkMemory(Ref<Node> node, DriverMemoryMapping primary_buffer);
   ~NodeLinkMemory() override;
 
+  // Indicates whether the NodeLinkMemory should be allowed to expand its
+  // allocation capacity further for blocks of size `block_size`.
+  bool CanExpandBlockCapacity(size_t block_size);
+
+  // Attempts to expand the total block allocation capacity for blocks of
+  // `block_size` bytes. `callback` may be called synchronously or
+  // asynchronously with a result indicating whether the expansion succeeded.
+  using RequestBlockCapacityCallback = std::function<void(bool)>;
+  void RequestBlockCapacity(size_t block_size,
+                            RequestBlockCapacityCallback callback);
+  void OnCapacityRequestComplete(size_t block_size, bool success);
+
   const Ref<Node> node_;
 
   // The underlying BufferPool. Note that this object is itself thread-safe, so
@@ -100,6 +151,19 @@
   // Mapping for this link's fixed primary buffer.
   const absl::Span<uint8_t> primary_buffer_memory_;
   PrimaryBuffer& primary_buffer_;
+
+  absl::Mutex mutex_;
+
+  // The NodeLink which is using this NodeLinkMemory. Used to communicate with
+  // the NodeLinkMemory on the other side of the link.
+  Ref<NodeLink> node_link_ ABSL_GUARDED_BY(mutex_);
+
+  // Callbacks to invoke when a pending capacity request is fulfilled for a
+  // specific block size. Also used to prevent stacking of capacity requests for
+  // the same block size.
+  using CapacityCallbackList = std::vector<RequestBlockCapacityCallback>;
+  absl::flat_hash_map<uint32_t, CapacityCallbackList> capacity_callbacks_
+      ABSL_GUARDED_BY(mutex_);
 };
 
 }  // namespace ipcz
diff --git a/src/ipcz/node_link_memory_test.cc b/src/ipcz/node_link_memory_test.cc
new file mode 100644
index 0000000..f7c37ba
--- /dev/null
+++ b/src/ipcz/node_link_memory_test.cc
@@ -0,0 +1,225 @@
+// Copyright 2022 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "ipcz/node_link_memory.h"
+
+#include <utility>
+#include <vector>
+
+#include "ipcz/driver_transport.h"
+#include "ipcz/ipcz.h"
+#include "ipcz/link_side.h"
+#include "ipcz/node.h"
+#include "ipcz/node_link.h"
+#include "ipcz/node_link_memory.h"
+#include "ipcz/node_name.h"
+#include "reference_drivers/sync_reference_driver.h"
+#include "testing/gtest/include/gtest/gtest.h"
+#include "util/ref_counted.h"
+
+namespace ipcz {
+namespace {
+
+const IpczDriver& kTestDriver = reference_drivers::kSyncReferenceDriver;
+
+constexpr NodeName kTestBrokerName(1, 2);
+constexpr NodeName kTestNonBrokerName(2, 3);
+
+class NodeLinkMemoryTest : public testing::Test {
+ public:
+  NodeLinkMemory& memory_a() { return link_a_->memory(); }
+  NodeLinkMemory& memory_b() { return link_b_->memory(); }
+
+  void SetUp() override {
+    auto transports = DriverTransport::CreatePair(kTestDriver);
+    auto alloc = NodeLinkMemory::Allocate(node_a_);
+    link_a_ =
+        NodeLink::Create(node_a_, LinkSide::kA, kTestBrokerName,
+                         kTestNonBrokerName, Node::Type::kNormal, 0,
+                         transports.first, std::move(alloc.node_link_memory));
+    link_b_ = NodeLink::Create(
+        node_b_, LinkSide::kB, kTestNonBrokerName, kTestBrokerName,
+        Node::Type::kBroker, 0, transports.second,
+        NodeLinkMemory::Adopt(node_b_, std::move(alloc.primary_buffer_memory)));
+    node_a_->AddLink(kTestNonBrokerName, link_a_);
+    node_b_->AddLink(kTestBrokerName, link_b_);
+    link_a_->transport()->Activate();
+    link_b_->transport()->Activate();
+  }
+
+  void TearDown() override {
+    node_b_->Close();
+    node_a_->Close();
+  }
+
+ private:
+  const Ref<Node> node_a_{MakeRefCounted<Node>(Node::Type::kBroker,
+                                               kTestDriver,
+                                               IPCZ_INVALID_DRIVER_HANDLE)};
+  const Ref<Node> node_b_{MakeRefCounted<Node>(Node::Type::kNormal,
+                                               kTestDriver,
+                                               IPCZ_INVALID_DRIVER_HANDLE)};
+  Ref<NodeLink> link_a_;
+  Ref<NodeLink> link_b_;
+};
+
+TEST_F(NodeLinkMemoryTest, BasicAllocAndFree) {
+  Fragment fragment = memory_a().AllocateFragment(64);
+  EXPECT_TRUE(fragment.is_addressable());
+  EXPECT_TRUE(fragment.address());
+  EXPECT_EQ(fragment.size(), 64u);
+  EXPECT_TRUE(memory_a().FreeFragment(fragment));
+}
+
+TEST_F(NodeLinkMemoryTest, Zero) {
+  // Zero-sized fragments cannot be allocated.
+  EXPECT_TRUE(memory_a().AllocateFragment(0).is_null());
+}
+
+TEST_F(NodeLinkMemoryTest, MinimumSize) {
+  // Very small fragment sizes are rounded up to a minimum of 64 bytes.
+  Fragment fragments[] = {
+      memory_a().AllocateFragment(1),  memory_a().AllocateFragment(2),
+      memory_a().AllocateFragment(3),  memory_a().AllocateFragment(4),
+      memory_a().AllocateFragment(17), memory_a().AllocateFragment(63),
+  };
+
+  for (const auto& fragment : fragments) {
+    EXPECT_TRUE(fragment.is_addressable());
+    EXPECT_EQ(64u, fragment.size());
+  }
+}
+
+TEST_F(NodeLinkMemoryTest, RoundUpSize) {
+  // Fragment sizes are rounded up to the nearest power of 2.
+  Fragment fragment = memory_a().AllocateFragment(250);
+  EXPECT_TRUE(fragment.is_addressable());
+  EXPECT_EQ(256u, fragment.size());
+}
+
+TEST_F(NodeLinkMemoryTest, SharedPrimaryBuffer) {
+  // Test basic allocation from the primary buffer which both NodeLinkMemory
+  // instances share from the moment they're constructed. Each NodeLinkMemory
+  // should be able to resolve and free fragments allocated by the other.
+
+  Fragment fragment_from_a = memory_a().AllocateFragment(8);
+  EXPECT_TRUE(fragment_from_a.is_addressable());
+  EXPECT_EQ(BufferId(0), fragment_from_a.buffer_id());
+  EXPECT_GE(fragment_from_a.size(), 8u);
+
+  Fragment same_fragment = memory_b().GetFragment(fragment_from_a.descriptor());
+  EXPECT_TRUE(same_fragment.is_addressable());
+  EXPECT_EQ(fragment_from_a.buffer_id(), same_fragment.buffer_id());
+  EXPECT_EQ(fragment_from_a.offset(), same_fragment.offset());
+  EXPECT_EQ(fragment_from_a.size(), same_fragment.size());
+
+  Fragment fragment_from_b = memory_b().AllocateFragment(16);
+  EXPECT_TRUE(fragment_from_b.is_addressable());
+  EXPECT_EQ(BufferId(0), fragment_from_b.buffer_id());
+  EXPECT_GE(fragment_from_b.size(), 16u);
+
+  same_fragment = memory_a().GetFragment(fragment_from_b.descriptor());
+  EXPECT_TRUE(same_fragment.is_addressable());
+  EXPECT_EQ(fragment_from_b.buffer_id(), same_fragment.buffer_id());
+  EXPECT_EQ(fragment_from_b.offset(), same_fragment.offset());
+  EXPECT_EQ(fragment_from_b.size(), same_fragment.size());
+
+  EXPECT_TRUE(memory_a().FreeFragment(fragment_from_b));
+  EXPECT_TRUE(memory_b().FreeFragment(fragment_from_a));
+}
+
+TEST_F(NodeLinkMemoryTest, ExpandCapacity) {
+  // If we deplete a NodeLinkMemory's capacity to allocate fragments of a given
+  // size, it should automatically acquire new capacity for future allocations.
+
+  constexpr size_t kSize = 64;
+  bool has_new_capacity = false;
+  memory_a().WaitForBufferAsync(
+      BufferId(1), [&has_new_capacity] { has_new_capacity = true; });
+  while (!memory_a().AllocateFragment(kSize).is_null())
+    ;
+
+  // Since we're using a synchronous driver, this should have already been true
+  // by the time the most recent failed allocation returned.
+  EXPECT_TRUE(has_new_capacity);
+
+  // And a subsequent allocation request should now succeed with a fragment from
+  // the new buffer.
+  Fragment fragment = memory_a().AllocateFragment(kSize);
+  EXPECT_FALSE(fragment.is_null());
+  EXPECT_TRUE(fragment.is_addressable());
+  EXPECT_EQ(BufferId(1), fragment.buffer_id());
+
+  // The new buffer should have also been shared with the other NodeLinkMemory
+  // already.
+  EXPECT_TRUE(memory_b().FreeFragment(fragment));
+}
+
+TEST_F(NodeLinkMemoryTest, LimitedCapacityExpansion) {
+  // A NodeLinkMemory will eventually stop expanding its capacity for new
+  // fragments of a given size.
+  static constexpr size_t kSize = 64;
+  std::vector<Fragment> fragments;
+  auto try_alloc = [&fragments, this] {
+    Fragment fragment = memory_a().AllocateFragment(kSize);
+    if (!fragment.is_null()) {
+      fragments.push_back(fragment);
+    }
+    return !fragment.is_null();
+  };
+
+  do {
+    // Deplete the current capacity.
+    while (try_alloc()) {
+    }
+
+    // Because we're using a synchronous driver, if the NodeLinkMemory will
+    // expand its capacity at all, it will have already done so by the time
+    // the failed allocation returns above. So if allocation fails again here,
+    // then we've reached the capacity limit for this fragment size and we can
+    // end the test.
+  } while (try_alloc());
+
+  // Any additionally allocated buffers should already have been shared with the
+  // other NodeLinkMemory. Let it free all of the fragments and verify success
+  // in every case.
+  for (const auto& fragment : fragments) {
+    EXPECT_TRUE(memory_b().FreeFragment(fragment));
+  }
+}
+
+TEST_F(NodeLinkMemoryTest, OversizedAllocation) {
+  // Allocations which are too large for block-based allocation will fail for
+  // now. This may change as new allocation schemes are supported.
+  constexpr size_t kWayTooBig = 64 * 1024 * 1024;
+  Fragment fragment = memory_a().AllocateFragment(kWayTooBig);
+  EXPECT_TRUE(fragment.is_null());
+}
+
+TEST_F(NodeLinkMemoryTest, NewBlockSizes) {
+  // NodeLinkMemory begins life with a fixed set of block allocators available
+  // for certain common block sizes. These cap out at 2 kB blocks, but
+  // NodeLinkMemory still supports allocation of larger blocks -- at least up
+  // to 16 kB in size. Verify that we can trigger new capacity for such sizes
+  // by attempting to allocate them.
+
+  constexpr size_t kPrettyBig = 16 * 1024;
+  Fragment fragment = memory_a().AllocateFragment(kPrettyBig);
+
+  // No initial capacity for 16 kB fragments.
+  EXPECT_TRUE(fragment.is_null());
+
+  // But the failure above should have triggered expansion of capacity for that
+  // size. This request should succeed.
+  fragment = memory_a().AllocateFragment(kPrettyBig);
+  EXPECT_FALSE(fragment.is_null());
+  EXPECT_TRUE(fragment.is_addressable());
+  EXPECT_GE(fragment.size(), kPrettyBig);
+
+  // And as with other cases, the new capacity should have already been shared
+  // with the other NodeLinkMemory.
+  EXPECT_TRUE(memory_b().FreeFragment(fragment));
+}
+
+}  // namespace
+}  // namespace ipcz
diff --git a/src/ipcz/node_messages_generator.h b/src/ipcz/node_messages_generator.h
index d8f3c9f..5f4d2e0 100644
--- a/src/ipcz/node_messages_generator.h
+++ b/src/ipcz/node_messages_generator.h
@@ -54,6 +54,21 @@
   IPCZ_MSG_PARAM(uint32_t, num_initial_portals)
 IPCZ_MSG_END()
 
+// Shares a new buffer to support allocation of blocks of `block_size` bytes.
+// The sender must initialize an appropriate BlockAllocator within the buffer's
+// memory before sending this message.
+IPCZ_MSG_BEGIN(AddBlockBuffer, IPCZ_MSG_ID(14), IPCZ_MSG_VERSION(0))
+  // The ID of the new buffer as allocated by the NodeLinkMemory on the NodeLink
+  // transmitting this message.
+  IPCZ_MSG_PARAM(BufferId, id)
+
+  // The size of blocks which can be allocated from within this buffer.
+  IPCZ_MSG_PARAM(uint32_t, block_size)
+
+  // A handle to the driver-managed, read-write-mappable buffer.
+  IPCZ_MSG_PARAM_DRIVER_OBJECT(buffer)
+IPCZ_MSG_END()
+
 // Conveys the contents of a parcel.
 IPCZ_MSG_BEGIN(AcceptParcel, IPCZ_MSG_ID(20), IPCZ_MSG_VERSION(0))
   // The SublinkId linking the source and destination Routers along the
diff --git a/src/ipcz/ref_counted_fragment_test.cc b/src/ipcz/ref_counted_fragment_test.cc
index 5425304..3690f48 100644
--- a/src/ipcz/ref_counted_fragment_test.cc
+++ b/src/ipcz/ref_counted_fragment_test.cc
@@ -167,8 +167,7 @@
   // and so will the test.
   constexpr size_t kNumAllocations = 100000;
   for (size_t i = 0; i < kNumAllocations; ++i) {
-    Fragment fragment =
-        memory->buffer_pool().AllocateFragment(sizeof(TestObject));
+    Fragment fragment = memory->AllocateFragment(sizeof(TestObject));
     EXPECT_TRUE(fragment.is_addressable());
     FragmentRef<TestObject> ref(RefCountedFragment::kAdoptExistingRef, memory,
                                 fragment);
diff --git a/src/ipcz/router_link_test.cc b/src/ipcz/router_link_test.cc
index 42ab8a8..3790412 100644
--- a/src/ipcz/router_link_test.cc
+++ b/src/ipcz/router_link_test.cc
@@ -61,9 +61,8 @@
         broker_->AddLink(kTestNonBrokerName, broker_node_link_);
         non_broker_->AddLink(kTestBrokerName, non_broker_node_link_);
 
-        auto fragment =
-            broker_node_link_->memory().buffer_pool().AllocateFragment(
-                sizeof(RouterLinkState));
+        auto fragment = broker_node_link_->memory().AllocateFragment(
+            sizeof(RouterLinkState));
         auto link_state = FragmentRef<RouterLinkState>(
             RefCountedFragment::kAdoptExistingRef,
             WrapRefCounted(&broker_node_link_->memory()), fragment);