ipcz: Support overlapping two-phase Puts

This will enable in-place serialization of IPCs within shared portal
memory, without requiring exclusive access to the portal.

This breaks the ABI by adding a new argument to EndPut() to identify
a specific transaction; and by modifying the behavior of BeginPut()
to require a non-null output `data` argument.

Bug: 1394557
Change-Id: Ie0a52d8444e52ec1e1811eb74d4218205b3cf369
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/4552597
Reviewed-by: Alex Gough <ajgo@chromium.org>
Commit-Queue: Ken Rockot <rockot@google.com>
Cr-Commit-Position: refs/heads/main@{#1147372}
NOKEYCHECK=True
GitOrigin-RevId: fb97c6ba91631982786b2ffbcb8aca4b0d36eb98
diff --git a/include/ipcz/ipcz.h b/include/ipcz/ipcz.h
index f21a773..b6621ac 100644
--- a/include/ipcz/ipcz.h
+++ b/include/ipcz/ipcz.h
@@ -1300,16 +1300,15 @@
   // BeginPut()
   // ==========
   //
-  // Begins a two-phase put operation on `portal`. While a two-phase put
-  // operation is in progress on a portal, any other BeginPut() call on the same
-  // portal will fail with IPCZ_RESULT_ALREADY_EXISTS.
-  //
-  // Unlike a plain Put() call, two-phase put operations allow the application
-  // to write directly into portal memory, potentially reducing memory access
-  // costs by eliminating redundant copying and caching.
+  // Begins a two-phase put operation on `portal`, returning a pointer in
+  // `*data` which points to writable portal memory. The application can write
+  // parcel data directly into this location and complete the transaction by
+  // calling EndPut().
   //
   // The input value of `*num_bytes` tells ipcz how much data the caller would
-  // like to place into the portal.
+  // like to place into the portal. If the call is successful, the output value
+  // of `*num_bytes` conveys the actual capacity available for writing at
+  // `data`.
   //
   // Limits provided to BeginPut() elicit similar behavior to Put(), with the
   // exception that `flags` may specify IPCZ_BEGIN_PUT_ALLOW_PARTIAL to allow
@@ -1319,22 +1318,21 @@
   // to reflect the remaining capacity of the portal, allowing the caller to
   // commit at least some portion of their data with EndPut().
   //
-  // Handles for two-phase puts are only provided when finalizing the operation
-  // with EndPut().
+  // Handles to transmit within a two-phase put are provided when committing the
+  // operation with EndPut().
   //
   // Returns:
   //
   //    IPCZ_RESULT_OK if the two-phase put operation has been successfully
-  //        initiated. This operation must be completed with EndPut() before any
-  //        further Put() or BeginPut() calls are allowed on `portal`. `*data`
-  //        is set to the address of a portal buffer into which the application
-  //        may copy its data, and `*num_bytes` is updated to reflect the
-  //        capacity of that buffer, which may be greater than (or less than, if
-  //        and only if IPCZ_BEGIN_PUT_ALLOW_PARTIAL was set in `flags`) the
-  //        capacity requested by the input value of `*num_bytes`.
+  //        initiated. `*data` is set to the address of a portal buffer into
+  //        which the application may write its data, and `*num_bytes` is
+  //        updated to reflect the capacity of that buffer, which may be greater
+  //        than (or less than, if and only if IPCZ_BEGIN_PUT_ALLOW_PARTIAL was
+  //        set in `flags`) the capacity requested by the input value of
+  //        `*num_bytes`.
   //
-  //    IPCZ_RESULT_INVALID_ARGUMENT if `portal` is invalid, `*num_bytes` is
-  //        non-zero but `data` is null, or options is non-null and invalid.
+  //    IPCZ_RESULT_INVALID_ARGUMENT if `portal` is invalid, `data` is null, or
+  //        options is non-null but invalid.
   //
   //    IPCZ_RESULT_RESOURCE_EXHAUSTED if completing the put with the number of
   //        bytes specified by `*num_bytes` would cause the portal to exceed the
@@ -1342,26 +1340,25 @@
   //        specified in `flags`) data byte limit specified by
   //        `options->limits`.
   //
-  //    IPCZ_RESULT_ALREADY_EXISTS if there is already a two-phase put operation
-  //        in progress on `portal`.
-  //
   //    IPCZ_RESULT_NOT_FOUND if it is known that the opposite portal has
   //        already been closed and anything put into this portal would be lost.
   IpczResult(IPCZ_API* BeginPut)(
       IpczHandle portal,                          // in
       IpczBeginPutFlags flags,                    // in
       const struct IpczBeginPutOptions* options,  // in
-      size_t* num_bytes,                          // out
+      size_t* num_bytes,                          // in/out
       void** data);                               // out
 
   // EndPut()
   // ========
   //
-  // Ends the two-phase put operation started by the most recent successful call
-  // to BeginPut() on `portal`.
+  // Ends the two-phase put transaction identified by `data` on `portal`.
   //
-  // `num_bytes_produced` specifies the number of bytes actually written into
-  // the buffer that was returned from the original BeginPut() call.
+  // `data` is the address of the transaction's data, as previously returned by
+  // a call to BeginPut() on the same `portal`.
+  //
+  // `num_bytes_produced` specifies the number of bytes actually written at
+  // `data` by the application prior to calling EndPut().
   //
   // Usage of `handles` and `num_handles` is identical to Put().
   //
@@ -1369,15 +1366,9 @@
   // provided handles remain property of the caller. If it succeeds, their
   // ownership is assumed by ipcz.
   //
-  // If IPCZ_END_PUT_ABORT is given in `flags` and there is a two-phase put
-  // operation in progress on `portal`, all other arguments are ignored and the
-  // pending two-phase put operation is cancelled without committing a new
-  // parcel to the portal.
-  //
-  // If EndPut() fails for any reason other than
-  // IPCZ_RESULT_FAILED_PRECONDITION, the two-phase put operation remains in
-  // progress, and EndPut() must be called again to abort the operation or
-  // attempt completion with different arguments.
+  // If IPCZ_END_PUT_ABORT is given in `flags` and `data` is valid for `portal`,
+  // all other arguments are ignored and the corresponding transaction is
+  // aborted with its parcel data discarded.
   //
   // `options` is unused and must be null.
   //
@@ -1387,17 +1378,20 @@
   //        aborted. If not aborted all data and handles were committed to a new
   //        parcel enqueued for retrieval by the opposite portal.
   //
-  //    IPCZ_RESULT_INVALID_ARGUMENT if `portal` is invalid, `num_handles` is
+  //    IPCZ_RESULT_INVALID_ARGUMENT if `portal` is invalid, `data` does not
+  //        identify a valid put transaction on `portal`, `num_handles` is
   //        non-zero but `handles` is null, `num_bytes_produced` is larger than
   //        the capacity of the buffer originally returned by BeginPut(), or any
-  //        handle in `handles` is invalid or not serializable.
-  //
-  //    IPCZ_RESULT_FAILED_PRECONDITION if there was no two-phase put operation
-  //        in progress on `portal`.
+  //        handle in `handles` is invalid or not serializable. If `portal` and
+  //        `data` referenced a valid transaction, the transaction is still
+  //        in progress if EndPut() returns this error.
   //
   //    IPCZ_RESULT_NOT_FOUND if it is known that the opposite portal has
   //        already been closed and anything put into this portal would be lost.
+  //        The transaction referenced by the caller is implicitly aborted and
+  //        ownership of all passed handles is retained by the caller.
   IpczResult(IPCZ_API* EndPut)(IpczHandle portal,          // in
+                               const void* data,           // in
                                size_t num_bytes_produced,  // in
                                const IpczHandle* handles,  // in
                                size_t num_handles,         // in
diff --git a/src/api.cc b/src/api.cc
index 20e28ae..97f8239 100644
--- a/src/api.cc
+++ b/src/api.cc
@@ -172,10 +172,7 @@
                     size_t* num_bytes,
                     void** data) {
   ipcz::Portal* portal = ipcz::Portal::FromHandle(portal_handle);
-  if (!portal) {
-    return IPCZ_RESULT_INVALID_ARGUMENT;
-  }
-  if (num_bytes && *num_bytes > 0 && !data) {
+  if (!portal || !data) {
     return IPCZ_RESULT_INVALID_ARGUMENT;
   }
   if (options && options->size < sizeof(IpczBeginPutOptions)) {
@@ -191,17 +188,18 @@
   if (!num_bytes) {
     num_bytes = &dummy_num_bytes;
   }
-  return portal->BeginPut(flags, limits, *num_bytes, data);
+  return portal->BeginPut(flags, limits, *num_bytes, *data);
 }
 
 IpczResult EndPut(IpczHandle portal_handle,
+                  const void* data,
                   size_t num_bytes_produced,
                   const IpczHandle* handles,
                   size_t num_handles,
                   IpczEndPutFlags flags,
                   const void* options) {
   ipcz::Portal* portal = ipcz::Portal::FromHandle(portal_handle);
-  if (!portal) {
+  if (!portal || !data) {
     return IPCZ_RESULT_INVALID_ARGUMENT;
   }
   if (num_handles > 0 && !handles) {
@@ -209,10 +207,10 @@
   }
 
   if (flags & IPCZ_END_PUT_ABORT) {
-    return portal->AbortPut();
+    return portal->AbortPut(data);
   }
 
-  return portal->CommitPut(num_bytes_produced,
+  return portal->CommitPut(data, num_bytes_produced,
                            absl::MakeSpan(handles, num_handles));
 }
 
diff --git a/src/api_test.cc b/src/api_test.cc
index 02e0863..1eb5b3b 100644
--- a/src/api_test.cc
+++ b/src/api_test.cc
@@ -336,7 +336,7 @@
             ipcz().BeginPut(IPCZ_INVALID_HANDLE, IPCZ_NO_FLAGS, nullptr,
                             &num_bytes, &data));
 
-  // Non-zero size but null data.
+  // Null data.
   EXPECT_EQ(IPCZ_RESULT_INVALID_ARGUMENT,
             ipcz().BeginPut(a, IPCZ_NO_FLAGS, nullptr, &num_bytes, nullptr));
 
@@ -345,35 +345,37 @@
   EXPECT_EQ(IPCZ_RESULT_INVALID_ARGUMENT,
             ipcz().BeginPut(a, IPCZ_NO_FLAGS, &options, &num_bytes, &data));
 
-  // Duplicate two-phase Put.
+  // Start a put transaction to test EndPut().
   EXPECT_EQ(IPCZ_RESULT_OK,
-            ipcz().BeginPut(a, IPCZ_NO_FLAGS, nullptr, nullptr, nullptr));
-  EXPECT_EQ(IPCZ_RESULT_ALREADY_EXISTS,
-            ipcz().BeginPut(a, IPCZ_NO_FLAGS, nullptr, nullptr, nullptr));
+            ipcz().BeginPut(a, IPCZ_NO_FLAGS, nullptr, &num_bytes, &data));
 
   // Invalid portal.
   EXPECT_EQ(IPCZ_RESULT_INVALID_ARGUMENT,
-            ipcz().EndPut(IPCZ_INVALID_HANDLE, 0, nullptr, 0, IPCZ_NO_FLAGS,
-                          nullptr));
+            ipcz().EndPut(IPCZ_INVALID_HANDLE, data, 0, nullptr, 0,
+                          IPCZ_NO_FLAGS, nullptr));
+
+  // Invalid data address.
+  EXPECT_EQ(IPCZ_RESULT_INVALID_ARGUMENT,
+            ipcz().EndPut(a, nullptr, 0, nullptr, 0, IPCZ_NO_FLAGS, nullptr));
 
   // Non-zero number of handles, but null handle buffer.
   EXPECT_EQ(IPCZ_RESULT_INVALID_ARGUMENT,
-            ipcz().EndPut(a, 0, nullptr, 1, IPCZ_NO_FLAGS, nullptr));
+            ipcz().EndPut(a, data, 0, nullptr, 1, IPCZ_NO_FLAGS, nullptr));
 
   // Oversized data.
-  EXPECT_EQ(IPCZ_RESULT_INVALID_ARGUMENT,
-            ipcz().EndPut(a, kPutSize * 2, nullptr, 0, IPCZ_NO_FLAGS, nullptr));
+  EXPECT_EQ(
+      IPCZ_RESULT_INVALID_ARGUMENT,
+      ipcz().EndPut(a, data, kPutSize * 2, nullptr, 0, IPCZ_NO_FLAGS, nullptr));
 
   // Invalid handle attachment.
   IpczHandle invalid_handle = IPCZ_INVALID_HANDLE;
-  EXPECT_EQ(IPCZ_RESULT_INVALID_ARGUMENT,
-            ipcz().EndPut(a, 0, &invalid_handle, 1, IPCZ_NO_FLAGS, nullptr));
+  EXPECT_EQ(
+      IPCZ_RESULT_INVALID_ARGUMENT,
+      ipcz().EndPut(a, data, 0, &invalid_handle, 1, IPCZ_NO_FLAGS, nullptr));
 
-  // Two-phase Put not in progress.
+  // Commit it.
   EXPECT_EQ(IPCZ_RESULT_OK,
-            ipcz().EndPut(a, 0, nullptr, 0, IPCZ_NO_FLAGS, nullptr));
-  EXPECT_EQ(IPCZ_RESULT_FAILED_PRECONDITION,
-            ipcz().EndPut(a, 0, nullptr, 0, IPCZ_NO_FLAGS, nullptr));
+            ipcz().EndPut(a, data, 0, nullptr, 0, IPCZ_NO_FLAGS, nullptr));
 
   CloseAll({a, b, node});
 }
@@ -446,8 +448,8 @@
             ipcz().BeginPut(a, IPCZ_NO_FLAGS, nullptr, &num_bytes, &out_data));
   EXPECT_EQ(kMessage.size(), num_bytes);
   memcpy(out_data, kMessage.data(), kMessage.size());
-  EXPECT_EQ(IPCZ_RESULT_OK,
-            ipcz().EndPut(a, num_bytes, nullptr, 0, IPCZ_NO_FLAGS, nullptr));
+  EXPECT_EQ(IPCZ_RESULT_OK, ipcz().EndPut(a, out_data, num_bytes, nullptr, 0,
+                                          IPCZ_NO_FLAGS, nullptr));
 
   const void* in_data;
   EXPECT_EQ(IPCZ_RESULT_OK, ipcz().BeginGet(b, IPCZ_NO_FLAGS, nullptr, &in_data,
@@ -472,6 +474,71 @@
   CloseAll({a, b, node});
 }
 
+TEST_F(APITest, OverlappedTwoPhasePuts) {
+  const IpczHandle node = CreateNode(kDefaultDriver);
+  auto [a, b] = OpenPortals(node);
+
+  constexpr std::string_view kMessage1 = "Hello.";
+  constexpr std::string_view kMessage2 = "World?";
+  constexpr std::string_view kMessage3 = "OK!";
+
+  // Set up three concurrent transactions.
+
+  size_t num_bytes1 = kMessage1.size();
+  void* out_data1;
+  EXPECT_EQ(IPCZ_RESULT_OK, ipcz().BeginPut(a, IPCZ_NO_FLAGS, nullptr,
+                                            &num_bytes1, &out_data1));
+  EXPECT_EQ(kMessage1.size(), num_bytes1);
+  memcpy(out_data1, kMessage1.data(), kMessage1.size());
+
+  size_t num_bytes2 = kMessage2.size();
+  void* out_data2;
+  EXPECT_EQ(IPCZ_RESULT_OK, ipcz().BeginPut(a, IPCZ_NO_FLAGS, nullptr,
+                                            &num_bytes2, &out_data2));
+  EXPECT_EQ(kMessage2.size(), num_bytes2);
+  memcpy(out_data2, kMessage2.data(), kMessage2.size());
+
+  size_t num_bytes3 = kMessage3.size();
+  void* out_data3;
+  EXPECT_EQ(IPCZ_RESULT_OK, ipcz().BeginPut(a, IPCZ_NO_FLAGS, nullptr,
+                                            &num_bytes3, &out_data3));
+  EXPECT_EQ(kMessage3.size(), num_bytes3);
+  memcpy(out_data3, kMessage3.data(), kMessage3.size());
+
+  // Complete them out-of-order. They should arrive in the order in which they
+  // were completed rather than the order in which they were started.
+
+  EXPECT_EQ(IPCZ_RESULT_OK, ipcz().EndPut(a, out_data3, num_bytes3, nullptr, 0,
+                                          IPCZ_NO_FLAGS, nullptr));
+  EXPECT_EQ(IPCZ_RESULT_OK, ipcz().EndPut(a, out_data1, num_bytes1, nullptr, 0,
+                                          IPCZ_NO_FLAGS, nullptr));
+  EXPECT_EQ(IPCZ_RESULT_OK, ipcz().EndPut(a, out_data2, num_bytes2, nullptr, 0,
+                                          IPCZ_NO_FLAGS, nullptr));
+
+  // Also for good measure attempt to terminate a transaction twice.
+  EXPECT_EQ(IPCZ_RESULT_INVALID_ARGUMENT,
+            ipcz().EndPut(a, out_data1, num_bytes1, nullptr, 0, IPCZ_NO_FLAGS,
+                          nullptr));
+
+  char message[16] = {};
+  size_t num_bytes = std::size(message);
+  EXPECT_EQ(IPCZ_RESULT_OK, ipcz().Get(b, IPCZ_NO_FLAGS, nullptr, message,
+                                       &num_bytes, nullptr, nullptr, nullptr));
+  EXPECT_EQ(kMessage3, std::string_view(message, num_bytes));
+
+  num_bytes = std::size(message);
+  EXPECT_EQ(IPCZ_RESULT_OK, ipcz().Get(b, IPCZ_NO_FLAGS, nullptr, message,
+                                       &num_bytes, nullptr, nullptr, nullptr));
+  EXPECT_EQ(kMessage1, std::string_view(message, num_bytes));
+
+  num_bytes = std::size(message);
+  EXPECT_EQ(IPCZ_RESULT_OK, ipcz().Get(b, IPCZ_NO_FLAGS, nullptr, message,
+                                       &num_bytes, nullptr, nullptr, nullptr));
+  EXPECT_EQ(kMessage2, std::string_view(message, num_bytes));
+
+  CloseAll({a, b, node});
+}
+
 TEST_F(APITest, TrapInvalid) {
   const IpczHandle node = CreateNode(kDefaultDriver);
   auto [a, b] = OpenPortals(node);
diff --git a/src/ipcz/portal.cc b/src/ipcz/portal.cc
index b669803..c4b5015 100644
--- a/src/ipcz/portal.cc
+++ b/src/ipcz/portal.cc
@@ -15,6 +15,7 @@
 #include "ipcz/trap_event_dispatcher.h"
 #include "third_party/abseil-cpp/absl/types/span.h"
 #include "util/log.h"
+#include "util/overloaded.h"
 #include "util/ref_counted.h"
 
 namespace ipcz {
@@ -119,7 +120,7 @@
 IpczResult Portal::BeginPut(IpczBeginPutFlags flags,
                             const IpczPutLimits* limits,
                             size_t& num_data_bytes,
-                            void** data) {
+                            void*& data) {
   const bool allow_partial = (flags & IPCZ_BEGIN_PUT_ALLOW_PARTIAL) != 0;
   if (limits) {
     size_t max_num_data_bytes = router_->GetOutboundCapacityInBytes(*limits);
@@ -135,28 +136,40 @@
     return IPCZ_RESULT_NOT_FOUND;
   }
 
+  // Always request a non-zero size for two-phase puts so that we always have
+  // a non-null data address upon which to key the operation in EndPut().
+  const size_t num_bytes_to_request = num_data_bytes ? num_data_bytes : 1;
   Parcel parcel;
-  const IpczResult allocation_result =
-      router_->AllocateOutboundParcel(num_data_bytes, allow_partial, parcel);
+  const IpczResult allocation_result = router_->AllocateOutboundParcel(
+      num_bytes_to_request, allow_partial, parcel);
   absl::MutexLock lock(&mutex_);
-  if (in_two_phase_put_) {
-    return IPCZ_RESULT_ALREADY_EXISTS;
-  }
   if (allocation_result != IPCZ_RESULT_OK) {
     return allocation_result;
   }
 
-  in_two_phase_put_ = true;
-  pending_parcel_ = std::move(parcel);
-
-  num_data_bytes = pending_parcel_->data_view().size();
-  if (data) {
-    *data = num_data_bytes ? pending_parcel_->data_view().data() : nullptr;
-  }
+  num_data_bytes = parcel.data_view().size();
+  data = parcel.data_view().data();
+  absl::visit(Overloaded{[&](absl::monostate) {
+                           pending_parcels_.emplace<Parcel>(std::move(parcel));
+                         },
+                         [&](Parcel& first_parcel) {
+                           const void* first_key =
+                               first_parcel.data_view().data();
+                           PendingParcelMap parcels;
+                           parcels[first_key] = std::move(first_parcel);
+                           parcels[data] = std::move(parcel);
+                           pending_parcels_.emplace<PendingParcelMap>(
+                               std::move(parcels));
+                         },
+                         [&](PendingParcelMap& parcels) {
+                           parcels[data] = std::move(parcel);
+                         }},
+              pending_parcels_);
   return IPCZ_RESULT_OK;
 }
 
-IpczResult Portal::CommitPut(size_t num_data_bytes_produced,
+IpczResult Portal::CommitPut(const void* data,
+                             size_t num_data_bytes_produced,
                              absl::Span<const IpczHandle> handles) {
   std::vector<Ref<APIObject>> objects;
   if (!ValidateAndAcquireObjectsForTransitFrom(*this, handles, objects)) {
@@ -166,15 +179,34 @@
   Parcel parcel;
   {
     absl::MutexLock lock(&mutex_);
-    if (!in_two_phase_put_ || !pending_parcel_) {
-      return IPCZ_RESULT_FAILED_PRECONDITION;
-    }
+    const bool is_request_valid = absl::visit(
+        Overloaded{
+            [&](absl::monostate) { return false; },
+            [&](Parcel& first_parcel) {
+              if (first_parcel.data_view().data() != data ||
+                  num_data_bytes_produced > first_parcel.data_view().size()) {
+                return false;
+              }
 
-    if (num_data_bytes_produced > pending_parcel_->data_view().size()) {
+              parcel = std::move(first_parcel);
+              pending_parcels_ = absl::monostate{};
+              return true;
+            },
+            [&](PendingParcelMap& parcels) {
+              auto it = parcels.find(data);
+              if (it == parcels.end() ||
+                  num_data_bytes_produced > it->second.data_view().size()) {
+                return false;
+              }
+
+              parcel = std::move(it->second);
+              parcels.erase(it);
+              return true;
+            }},
+        pending_parcels_);
+    if (!is_request_valid) {
       return IPCZ_RESULT_INVALID_ARGUMENT;
     }
-
-    parcel = *std::exchange(pending_parcel_, absl::nullopt);
   }
 
   parcel.CommitData(num_data_bytes_produced);
@@ -186,25 +218,37 @@
     for (IpczHandle handle : handles) {
       APIObject::TakeFromHandle(handle);
     }
-
-    absl::MutexLock lock(&mutex_);
-    in_two_phase_put_ = false;
-  } else {
-    absl::MutexLock lock(&mutex_);
-    pending_parcel_ = std::move(parcel);
   }
 
   return result;
 }
 
-IpczResult Portal::AbortPut() {
+IpczResult Portal::AbortPut(const void* data) {
   absl::MutexLock lock(&mutex_);
-  if (!in_two_phase_put_) {
-    return IPCZ_RESULT_FAILED_PRECONDITION;
+  const bool is_request_valid =
+      absl::visit(Overloaded{[&](absl::monostate) { return false; },
+                             [&](Parcel& first_parcel) {
+                               if (first_parcel.data_view().data() != data) {
+                                 return false;
+                               }
+
+                               pending_parcels_ = absl::monostate{};
+                               return true;
+                             },
+                             [&](PendingParcelMap& parcels) {
+                               auto it = parcels.find(data);
+                               if (it == parcels.end()) {
+                                 return false;
+                               }
+
+                               parcels.erase(it);
+                               return true;
+                             }},
+                  pending_parcels_);
+  if (!is_request_valid) {
+    return IPCZ_RESULT_INVALID_ARGUMENT;
   }
 
-  in_two_phase_put_ = false;
-  pending_parcel_.reset();
   return IPCZ_RESULT_OK;
 }
 
diff --git a/src/ipcz/portal.h b/src/ipcz/portal.h
index 7c09682..174c640 100644
--- a/src/ipcz/portal.h
+++ b/src/ipcz/portal.h
@@ -11,9 +11,11 @@
 #include "ipcz/api_object.h"
 #include "ipcz/ipcz.h"
 #include "ipcz/parcel.h"
+#include "third_party/abseil-cpp/absl/container/flat_hash_map.h"
 #include "third_party/abseil-cpp/absl/synchronization/mutex.h"
 #include "third_party/abseil-cpp/absl/types/optional.h"
 #include "third_party/abseil-cpp/absl/types/span.h"
+#include "third_party/abseil-cpp/absl/types/variant.h"
 #include "util/ref_counted.h"
 
 namespace ipcz {
@@ -52,10 +54,11 @@
   IpczResult BeginPut(IpczBeginPutFlags flags,
                       const IpczPutLimits* limits,
                       size_t& num_data_bytes,
-                      void** data);
-  IpczResult CommitPut(size_t num_data_bytes_produced,
+                      void*& data);
+  IpczResult CommitPut(const void* data,
+                       size_t num_data_bytes_produced,
                        absl::Span<const IpczHandle> handles);
-  IpczResult AbortPut();
+  IpczResult AbortPut(const void* data);
 
   IpczResult Get(IpczGetFlags flags,
                  void* data,
@@ -78,12 +81,13 @@
 
   absl::Mutex mutex_;
 
-  // The parcel being built by the in-progress two-phase Put operation, if one
-  // is in progress.
-  absl::optional<Parcel> pending_parcel_ ABSL_GUARDED_BY(mutex_);
-
-  bool in_two_phase_put_ ABSL_GUARDED_BY(mutex_) = false;
   bool in_two_phase_get_ ABSL_GUARDED_BY(mutex_) = false;
+
+  // Tracks parcels being built for two-phase put operations. The most common
+  // case is a single concurrent put, so this case is optimized to store an
+  // inlined Parcel object with no hash table.
+  using PendingParcelMap = absl::flat_hash_map<const void*, Parcel>;
+  absl::variant<absl::monostate, Parcel, PendingParcelMap> pending_parcels_;
 };
 
 }  // namespace ipcz
diff --git a/src/queueing_test.cc b/src/queueing_test.cc
index 6498a27..089e9df 100644
--- a/src/queueing_test.cc
+++ b/src/queueing_test.cc
@@ -157,8 +157,8 @@
   // There should not be enough space for all 4 bytes.
   EXPECT_EQ(3u, num_bytes);
   memcpy(data, "ipc", 3);
-  EXPECT_EQ(IPCZ_RESULT_OK,
-            ipcz().EndPut(c, num_bytes, nullptr, 0, IPCZ_NO_FLAGS, nullptr));
+  EXPECT_EQ(IPCZ_RESULT_OK, ipcz().EndPut(c, data, num_bytes, nullptr, 0,
+                                          IPCZ_NO_FLAGS, nullptr));
 
   EXPECT_EQ(IPCZ_RESULT_OK, WaitForConditionFlags(c, IPCZ_TRAP_PEER_CLOSED));
   Close(c);
@@ -255,7 +255,7 @@
       size_t num_bytes = std::min(bytes_remaining, capacity);
       bytes_remaining -= num_bytes;
       memset(data, '!', num_bytes);
-      EXPECT_EQ(IPCZ_RESULT_OK, ipcz().EndPut(c, num_bytes, nullptr, 0,
+      EXPECT_EQ(IPCZ_RESULT_OK, ipcz().EndPut(c, data, num_bytes, nullptr, 0,
                                               IPCZ_NO_FLAGS, nullptr));
       continue;
     }