ipcz: Reduce received message copies

Instead of always defaulting to inline storage within Message, this adds
a movable heap buffer (ReceivedDataBuffer) to Message as an alternative
backing store. ReceivedDataBuffer is used to receive incoming Message
data, so that a message's underlying buffer can be relocated once the
message has been validated.

Parcel now also supports its data being backed by a ReceivedDataBuffer.
Parcels accepted from a cross-node Message avoid an extra copy when
the parcel data itself does not reside in shared memory, because now
they can take direct ownership of the Message contents embedding the
parcel's data.

To facilitate the latter change and to reduce the memory footprint and
cost to move each Parcel object, Parcel is refactored here to use a
variant type for its backing data storage, and to move object attachment
state to the heap instead of inlining it. In practice this reduces
Parcel size from 150 bytes to 80 bytes on 64-bit platforms.

Bug: 1299283
Fixed: 1384209
Change-Id: I742f4477691e4801c0da76d48d612d8591e0ac4e
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/4023503
Reviewed-by: Daniel Cheng <dcheng@chromium.org>
Commit-Queue: Ken Rockot <rockot@google.com>
Cr-Commit-Position: refs/heads/main@{#1071957}
NOKEYCHECK=True
GitOrigin-RevId: 4ffe9346790c56659e7be367f266babaf127d430
diff --git a/src/ipcz/message.cc b/src/ipcz/message.cc
index b736842..cb0351e 100644
--- a/src/ipcz/message.cc
+++ b/src/ipcz/message.cc
@@ -6,6 +6,7 @@
 
 #include <cstddef>
 #include <cstdint>
+#include <cstdlib>
 #include <cstring>
 #include <utility>
 
@@ -134,10 +135,30 @@
 
 }  // namespace
 
+Message::ReceivedDataBuffer::ReceivedDataBuffer() = default;
+
+// NOTE: This malloc'd buffer is intentionally NOT zero-initialized, because we
+// will fully overwrite its contents.
+Message::ReceivedDataBuffer::ReceivedDataBuffer(size_t size)
+    : data_(static_cast<uint8_t*>(malloc(size))), size_(size) {}
+
+Message::ReceivedDataBuffer::ReceivedDataBuffer(ReceivedDataBuffer&& other)
+    : data_(std::move(other.data_)), size_(std::exchange(other.size_, 0)) {}
+
+Message::ReceivedDataBuffer& Message::ReceivedDataBuffer::operator=(
+    ReceivedDataBuffer&& other) {
+  data_ = std::move(other.data_);
+  size_ = std::exchange(other.size_, 0);
+  return *this;
+}
+
+Message::ReceivedDataBuffer::~ReceivedDataBuffer() = default;
+
 Message::Message() = default;
 
 Message::Message(uint8_t message_id, size_t params_size)
-    : data_(sizeof(internal::MessageHeader) + params_size) {
+    : inlined_data_(sizeof(internal::MessageHeader) + params_size),
+      data_(absl::MakeSpan(*inlined_data_)) {
   internal::MessageHeader& h = header();
   h.size = sizeof(h);
   h.version = 0;
@@ -155,7 +176,10 @@
   size_t offset = Align(data_.size());
   size_t num_bytes = Align(CheckAdd(sizeof(internal::ArrayHeader),
                                     CheckMul(element_size, num_elements)));
-  data_.resize(CheckAdd(offset, num_bytes));
+
+  ABSL_ASSERT(inlined_data_);
+  inlined_data_->resize(CheckAdd(offset, num_bytes));
+  data_ = absl::MakeSpan(*inlined_data_);
   auto& header = *reinterpret_cast<internal::ArrayHeader*>(&data_[offset]);
   header.num_bytes = checked_cast<uint32_t>(num_bytes);
   header.num_elements = checked_cast<uint32_t>(num_elements);
@@ -276,11 +300,20 @@
   return all_driver_objects_ok;
 }
 
+Message::ReceivedDataBuffer Message::TakeReceivedData() && {
+  ABSL_ASSERT(received_data_.has_value());
+  ReceivedDataBuffer buffer(std::move(*received_data_));
+  received_data_.reset();
+  data_ = {};
+  return buffer;
+}
+
 bool Message::CopyDataAndValidateHeader(absl::Span<const uint8_t> data) {
   // Copy the data into a local message object to avoid any TOCTOU issues in
   // case `data` is in unsafe shared memory.
-  data_.resize(data.size());
-  memcpy(data_.data(), data.data(), data.size());
+  received_data_.emplace(data.size());
+  memcpy(received_data_->data(), data.data(), data.size());
+  data_ = received_data_->bytes();
 
   // The message must at least be large enough to encode a v0 MessageHeader.
   if (data_.size() < sizeof(internal::MessageHeaderV0)) {
diff --git a/src/ipcz/message.h b/src/ipcz/message.h
index 7187528..9f8fb50 100644
--- a/src/ipcz/message.h
+++ b/src/ipcz/message.h
@@ -14,6 +14,7 @@
 #include "ipcz/sequence_number.h"
 #include "third_party/abseil-cpp/absl/base/macros.h"
 #include "third_party/abseil-cpp/absl/container/inlined_vector.h"
+#include "third_party/abseil-cpp/absl/types/optional.h"
 #include "third_party/abseil-cpp/absl/types/span.h"
 #include "util/safe_math.h"
 
@@ -169,6 +170,34 @@
 // Message helps build, serialize, and deserialize ipcz-internal messages.
 class IPCZ_ALIGN(8) Message {
  public:
+  enum { kIncoming };
+
+  // ReceivedDataBuffer is a fixed-size, heap-allocated data buffer which is
+  // allocated uninitialized and which can be moved out of the Message which
+  // allocated it. This is used strictly as storage for received message data.
+  struct FreeDeleter {
+    void operator()(void* ptr) { free(ptr); }
+  };
+  using ReceivedDataPtr = std::unique_ptr<uint8_t, FreeDeleter>;
+  class ReceivedDataBuffer {
+   public:
+    ReceivedDataBuffer();
+    explicit ReceivedDataBuffer(size_t size);
+    ReceivedDataBuffer(ReceivedDataBuffer&&);
+    ReceivedDataBuffer& operator=(ReceivedDataBuffer&&);
+    ~ReceivedDataBuffer();
+
+    uint8_t* data() const { return data_.get(); }
+    size_t size() const { return size_; }
+    bool empty() const { return size_ == 0; }
+
+    absl::Span<uint8_t> bytes() const { return absl::MakeSpan(data(), size()); }
+
+   private:
+    ReceivedDataPtr data_;
+    size_t size_ = 0;
+  };
+
   Message();
   Message(uint8_t message_id, size_t params_size);
   ~Message();
@@ -181,7 +210,7 @@
     return *reinterpret_cast<const internal::MessageHeader*>(data_.data());
   }
 
-  absl::Span<uint8_t> data_view() { return absl::MakeSpan(data_); }
+  absl::Span<uint8_t> data_view() { return data_; }
 
   absl::Span<uint8_t> params_data_view() {
     return absl::MakeSpan(&data_[header().size], data_.size() - header().size);
@@ -331,6 +360,11 @@
   bool DeserializeUnknownType(const DriverTransport::RawMessage& message,
                               const DriverTransport& transport);
 
+  // For a Message whose contents were received from another node, this takes
+  // ownership of the heap-allocated copy of those contents. Invalidates this
+  // Message.
+  ReceivedDataBuffer TakeReceivedData() &&;
+
  protected:
   // Returns `x` aligned above to the nearest 8-byte boundary.
   constexpr size_t Align(size_t x) { return (x + 7) & ~7; }
@@ -382,14 +416,25 @@
       absl::Span<const uint8_t> data,
       absl::Span<DriverObject> objects);
 
-  // Raw serialized data for this message. This always begins with MessageHeader
-  // (or potentially some newer or older version thereof), whose actual size
-  // is determined by the header's `size` field. After that many bytes, a
-  // parameters structure immediately follows, as generated by an invocation of
-  // IPCZ_MSG_BEGIN()/IPCZ_MSG_END(). After fixed parameters, any number of
-  // dynamicaly inlined allocations may follow (e.g. for array contents,
-  // driver objects, etc.)
-  absl::InlinedVector<uint8_t, 128> data_;
+  // Inlined storage for this message's data. Used when constructing outgoing
+  // messages, since most are small and can avoid additional heap allocation
+  // before hitting the wire.
+  absl::optional<absl::InlinedVector<uint8_t, 128>> inlined_data_;
+
+  // Heap storage for this message's data, as received from a transport.
+  absl::optional<ReceivedDataBuffer> received_data_;
+
+  // A view over *either* `received_data_` *or* `inlined_data_`, or empty if
+  // neither is present.
+  //
+  // This is the raw serialized data for this message. It always begins with a
+  // MessageHeader (or potentially some newer or older version thereof), whose
+  // actual size is determined by the header's `size` field. After that many
+  // bytes, a parameters structure immediately follows, as generated by an
+  // invocation of IPCZ_MSG_BEGIN()/IPCZ_MSG_END(). After fixed parameters, any
+  // number of dynamicaly inlined allocations may follow (e.g. for array
+  // contents, driver objects, etc.)
+  absl::Span<uint8_t> data_;
 
   // Collection of DriverObjects attached to this message. These are attached
   // while building a message (e.g. by calling AppendDriverObject), and they are
@@ -441,6 +486,10 @@
     p.header.version = ParamDataType::kVersion;
   }
 
+  // Special constructor which avoids initializing storage that won't be used
+  // anyway.
+  explicit MessageWithParams(decltype(Message::kIncoming)) : Message() {}
+
   ~MessageWithParams() = default;
 
   // Convenient accessors for the message's main parameters struct, whose
diff --git a/src/ipcz/message_macros/message_declaration_macros.h b/src/ipcz/message_macros/message_declaration_macros.h
index 89b5e68..9ec1875 100644
--- a/src/ipcz/message_macros/message_declaration_macros.h
+++ b/src/ipcz/message_macros/message_declaration_macros.h
@@ -17,6 +17,7 @@
     id_decl;                                                     \
     version_decl;                                                \
     name();                                                      \
+    explicit name(decltype(kIncoming));                          \
     ~name();                                                     \
     bool Deserialize(const DriverTransport::RawMessage& message, \
                      const DriverTransport& transport);          \
diff --git a/src/ipcz/message_macros/message_definition_macros.h b/src/ipcz/message_macros/message_definition_macros.h
index f4149ae..02b2352 100644
--- a/src/ipcz/message_macros/message_definition_macros.h
+++ b/src/ipcz/message_macros/message_definition_macros.h
@@ -12,6 +12,7 @@
 
 #define IPCZ_MSG_BEGIN(name, id_decl, version_decl)                        \
   name::name() = default;                                                  \
+  name::name(decltype(kIncoming)) : MessageWithParams(kIncoming) {}        \
   name::~name() = default;                                                 \
   bool name::Deserialize(const DriverTransport::RawMessage& message,       \
                          const DriverTransport& transport) {               \
diff --git a/src/ipcz/message_macros/message_listener_definition_macros.h b/src/ipcz/message_macros/message_listener_definition_macros.h
index 72b31e4..3502fbb 100644
--- a/src/ipcz/message_macros/message_listener_definition_macros.h
+++ b/src/ipcz/message_macros/message_listener_definition_macros.h
@@ -30,7 +30,7 @@
 
 #define IPCZ_MSG_BEGIN(name, id_decl, version_decl)     \
   case name::kId: {                                     \
-    name message;                                       \
+    name message(Message::kIncoming);                   \
     if (!message.Deserialize(raw_message, transport)) { \
       return false;                                     \
     }                                                   \
diff --git a/src/ipcz/node_link.cc b/src/ipcz/node_link.cc
index 0fb03fe..37d59a9 100644
--- a/src/ipcz/node_link.cc
+++ b/src/ipcz/node_link.cc
@@ -500,7 +500,7 @@
 }
 
 bool NodeLink::OnAcceptParcel(msg::AcceptParcel& accept) {
-  absl::Span<const uint8_t> parcel_data =
+  absl::Span<uint8_t> parcel_data =
       accept.GetArrayView<uint8_t>(accept.params().parcel_data);
   absl::Span<const HandleType> handle_types =
       accept.GetArrayView<HandleType>(accept.params().handle_types);
@@ -583,9 +583,10 @@
       return false;
     }
   } else {
-    // The parcel's data was inlined within the AcceptParcel message.
-    parcel.SetInlinedData(
-        std::vector<uint8_t>(parcel_data.begin(), parcel_data.end()));
+    // The parcel's data was inlined within the AcceptParcel message. Adopt the
+    // Message contents so our local Parcel doesn't need to copy any data.
+    parcel.SetDataFromMessage(std::move(accept).TakeReceivedData(),
+                              parcel_data);
   }
 
   if (is_split_parcel) {
diff --git a/src/ipcz/node_link_memory_test.cc b/src/ipcz/node_link_memory_test.cc
index d1bdbd2..d7b5941 100644
--- a/src/ipcz/node_link_memory_test.cc
+++ b/src/ipcz/node_link_memory_test.cc
@@ -290,7 +290,7 @@
     Parcel parcel;
     parcel.AllocateData(kParcelSize, /*allow_partial=*/false,
                         &links.second->memory());
-    if (parcel.data_fragment().is_null()) {
+    if (!parcel.has_data_fragment()) {
       break;
     }
 
diff --git a/src/ipcz/parcel.cc b/src/ipcz/parcel.cc
index af271de..e85578f 100644
--- a/src/ipcz/parcel.cc
+++ b/src/ipcz/parcel.cc
@@ -23,56 +23,37 @@
 Parcel::Parcel(SequenceNumber sequence_number)
     : sequence_number_(sequence_number) {}
 
-// Note: We do not use default move construction or assignment because we want
-// to explicitly clear the data and object views of the moved-from Parcel.
-Parcel::Parcel(Parcel&& other)
-    : sequence_number_(other.sequence_number_),
-      remote_source_(std::move(other.remote_source_)),
-      inlined_data_(std::move(other.inlined_data_)),
-      data_fragment_(std::exchange(other.data_fragment_, {})),
-      data_fragment_memory_(
-          std::exchange(other.data_fragment_memory_, nullptr)),
-      objects_(std::move(other.objects_)),
-      data_view_(std::exchange(other.data_view_, {})),
-      objects_view_(std::exchange(other.objects_view_, {})) {}
+Parcel::Parcel(Parcel&& other) = default;
 
-Parcel& Parcel::operator=(Parcel&& other) {
-  sequence_number_ = other.sequence_number_;
-  remote_source_ = std::move(other.remote_source_);
-  inlined_data_ = std::move(other.inlined_data_);
-  data_fragment_ = std::exchange(other.data_fragment_, {});
-  data_fragment_memory_ = std::move(other.data_fragment_memory_);
-  objects_ = std::move(other.objects_);
-  data_view_ = std::exchange(other.data_view_, {});
-  objects_view_ = std::exchange(other.objects_view_, {});
-  return *this;
-}
+Parcel& Parcel::operator=(Parcel&& other) = default;
 
 Parcel::~Parcel() {
-  for (Ref<APIObject>& object : objects_) {
-    if (object) {
-      object->Close();
+  if (objects_) {
+    for (Ref<APIObject>& object : objects_->storage) {
+      if (object) {
+        object->Close();
+      }
     }
   }
-
-  if (!data_fragment_.is_null()) {
-    ABSL_ASSERT(data_fragment_memory_);
-    data_fragment_memory_->FreeFragment(data_fragment_);
-  }
 }
 
 void Parcel::SetInlinedData(std::vector<uint8_t> data) {
-  inlined_data_ = std::move(data);
-  data_view_ = absl::MakeSpan(inlined_data_);
+  data_.view = absl::MakeSpan(data);
+  data_.storage = std::move(data);
+}
+
+void Parcel::SetDataFromMessage(Message::ReceivedDataBuffer buffer,
+                                absl::Span<uint8_t> data_view) {
+  ABSL_ASSERT(data_view.begin() >= buffer.bytes().begin());
+  ABSL_ASSERT(data_view.end() <= buffer.bytes().end());
+  data_.storage = std::move(buffer);
+  data_.view = data_view;
 }
 
 void Parcel::AllocateData(size_t num_bytes,
                           bool allow_partial,
                           NodeLinkMemory* memory) {
-  // This should never be called on a Parcel that already has data.
-  ABSL_ASSERT(inlined_data_.empty());
-  ABSL_ASSERT(data_fragment_.is_null());
-  ABSL_ASSERT(data_view_.empty());
+  ABSL_ASSERT(absl::holds_alternative<absl::monostate>(data_.storage));
 
   Fragment fragment;
   if (num_bytes > 0 && memory) {
@@ -85,8 +66,9 @@
   }
 
   if (fragment.is_null()) {
-    inlined_data_.resize(num_bytes);
-    data_view_ = absl::MakeSpan(inlined_data_);
+    std::vector<uint8_t> bytes(num_bytes);
+    data_.view = absl::MakeSpan(bytes);
+    data_.storage = std::move(bytes);
     return;
   }
 
@@ -98,19 +80,13 @@
   // is not written until CommitData().
   const size_t data_size =
       std::min(num_bytes, fragment.size() - sizeof(FragmentHeader));
-  data_fragment_ = fragment;
-  data_fragment_memory_ = WrapRefCounted(memory);
-  data_view_ =
+  data_.storage.emplace<DataFragment>(WrapRefCounted(memory), fragment);
+  data_.view =
       fragment.mutable_bytes().subspan(sizeof(FragmentHeader), data_size);
 }
 
 bool Parcel::AdoptDataFragment(Ref<NodeLinkMemory> memory,
                                const Fragment& fragment) {
-  // This should never be called on a Parcel that already has data.
-  ABSL_ASSERT(inlined_data_.empty());
-  ABSL_ASSERT(data_fragment_.is_null());
-  ABSL_ASSERT(data_view_.empty());
-
   if (!fragment.is_addressable() || fragment.size() <= sizeof(FragmentHeader)) {
     return false;
   }
@@ -125,28 +101,30 @@
     return false;
   }
 
-  data_fragment_ = fragment;
-  data_fragment_memory_ = std::move(memory);
-  data_view_ =
+  data_.storage.emplace<DataFragment>(std::move(memory), fragment);
+  data_.view =
       fragment.mutable_bytes().subspan(sizeof(FragmentHeader), data_size);
   return true;
 }
 
 void Parcel::SetObjects(std::vector<Ref<APIObject>> objects) {
-  objects_ = std::move(objects);
-  objects_view_ = absl::MakeSpan(objects_);
+  ABSL_ASSERT(!objects_);
+  objects_ = std::make_unique<ObjectStorageWithView>();
+  objects_->storage = std::move(objects);
+  objects_->view = absl::MakeSpan(objects_->storage);
 }
 
 void Parcel::CommitData(size_t num_bytes) {
-  data_view_ = data_view_.first(num_bytes);
-  if (data_fragment_.is_null()) {
+  data_.view = data_.view.first(num_bytes);
+  if (!has_data_fragment()) {
     return;
   }
 
-  ABSL_ASSERT(data_fragment_.is_addressable());
-  ABSL_ASSERT(num_bytes <= data_fragment_.size() + sizeof(FragmentHeader));
-  auto& header =
-      *reinterpret_cast<FragmentHeader*>(data_fragment_.mutable_bytes().data());
+  DataFragment& storage = absl::get<DataFragment>(data_.storage);
+  ABSL_ASSERT(storage.is_valid());
+  ABSL_ASSERT(num_bytes <= storage.fragment().size() + sizeof(FragmentHeader));
+  auto& header = *reinterpret_cast<FragmentHeader*>(
+      storage.fragment().mutable_bytes().data());
   header.reserved = 0;
 
   // This store-release is balanced by the load-acquire in AdoptDataFragment()
@@ -158,23 +136,29 @@
 }
 
 void Parcel::ReleaseDataFragment() {
-  ABSL_ASSERT(!data_fragment_.is_null());
-  data_fragment_ = {};
-  data_fragment_memory_.reset();
-  data_view_ = {};
+  ABSL_ASSERT(has_data_fragment());
+  std::ignore = absl::get<DataFragment>(data_.storage).release();
+  data_.storage.emplace<absl::monostate>();
+  data_.view = {};
 }
 
 void Parcel::Consume(size_t num_bytes, absl::Span<IpczHandle> out_handles) {
-  auto data = data_view();
-  auto objects = objects_view();
+  auto data = data_.view;
+  absl::Span<Ref<APIObject>> objects;
+  if (objects_) {
+    objects = objects_->view;
+  }
   ABSL_ASSERT(num_bytes <= data.size());
   ABSL_ASSERT(out_handles.size() <= objects.size());
+
   for (size_t i = 0; i < out_handles.size(); ++i) {
     out_handles[i] = APIObject::ReleaseAsHandle(std::move(objects[i]));
   }
 
-  data_view_.remove_prefix(num_bytes);
-  objects_view_.remove_prefix(out_handles.size());
+  data_.view.remove_prefix(num_bytes);
+  if (objects_) {
+    objects_->view.remove_prefix(out_handles.size());
+  }
 }
 
 std::string Parcel::Describe() const {
@@ -202,4 +186,45 @@
   return ss.str();
 }
 
+Parcel::DataFragment::DataFragment(DataFragment&& other)
+    : memory_(std::move(other.memory_)),
+      fragment_(std::exchange(other.fragment_, {})) {}
+
+Parcel::DataFragment& Parcel::DataFragment::operator=(DataFragment&& other) {
+  reset();
+  memory_ = std::move(other.memory_);
+  fragment_ = std::exchange(other.fragment_, {});
+  return *this;
+}
+
+Parcel::DataFragment::~DataFragment() {
+  reset();
+}
+
+Fragment Parcel::DataFragment::release() {
+  memory_.reset();
+  return std::exchange(fragment_, {});
+}
+
+void Parcel::DataFragment::reset() {
+  if (!is_valid()) {
+    return;
+  }
+
+  memory_->FreeFragment(fragment_);
+  memory_.reset();
+  fragment_ = {};
+}
+
+Parcel::DataStorageWithView::DataStorageWithView(DataStorageWithView&& other)
+    : storage(std::exchange(other.storage, absl::monostate{})),
+      view(std::exchange(other.view, {})) {}
+
+Parcel::DataStorageWithView& Parcel::DataStorageWithView::operator=(
+    DataStorageWithView&& other) {
+  storage = std::exchange(other.storage, absl::monostate{});
+  view = std::exchange(other.view, {});
+  return *this;
+}
+
 }  // namespace ipcz
diff --git a/src/ipcz/parcel.h b/src/ipcz/parcel.h
index 20fe1ec..b19f63c 100644
--- a/src/ipcz/parcel.h
+++ b/src/ipcz/parcel.h
@@ -8,15 +8,19 @@
 #include <atomic>
 #include <cstddef>
 #include <cstdint>
+#include <memory>
 #include <string>
 #include <vector>
 
 #include "ipcz/api_object.h"
 #include "ipcz/fragment.h"
 #include "ipcz/ipcz.h"
+#include "ipcz/message.h"
 #include "ipcz/sequence_number.h"
+#include "third_party/abseil-cpp/absl/base/macros.h"
 #include "third_party/abseil-cpp/absl/container/inlined_vector.h"
 #include "third_party/abseil-cpp/absl/types/span.h"
+#include "third_party/abseil-cpp/absl/types/variant.h"
 #include "util/ref_counted.h"
 
 namespace ipcz {
@@ -39,15 +43,25 @@
 
   // Indicates whether this Parcel is empty, meaning its data and objects have
   // been fully consumed.
-  bool empty() const { return data_view_.empty() && objects_view_.empty(); }
+  bool empty() const { return data_view().empty() && objects_view().empty(); }
 
+  // Sets this Parcel's data to the contents of `data`. Any prior data in the
+  // Parcel is discarded.
   void SetInlinedData(std::vector<uint8_t> data);
+
+  // Sets this Parcel's data to the contents of `data_view`, backed by a subset
+  // of the memory within `buffer`. Any prior data in the Parcel is discarded.
+  void SetDataFromMessage(Message::ReceivedDataBuffer buffer,
+                          absl::Span<uint8_t> data_view);
+
+  // Attaches the given set of `objects` to this Parcel.
   void SetObjects(std::vector<Ref<APIObject>> objects);
 
-  // Allocates `num_bytes` of storage for this parcel's data. If `memory` is
+  // Allocates `num_bytes` of storage for this Parcel's data. If `memory` is
   // non-null then its fragment pool is the preferred allocation source.
-  // Otherwise memory is allocated on the heap, and the data placed therein will
-  // be inlined within any message that transmits this parcel.
+  // Otherwise memory is allocated and zero-initialized on the heap, and the
+  // data placed therein will be inlined within any message that transmits this
+  // parcel.
   //
   // If `memory` is non-null and `allow_partial` is true, this may allocate less
   // memory than requested if some reasonable amount of space is still available
@@ -55,6 +69,9 @@
   //
   // Upon return, data_view() references the allocated memory wherever it
   // resides.
+  //
+  // If the Parcel had any data attached prior to this call, it is discarded and
+  // replaced with the newly allocated storage.
   void AllocateData(size_t num_bytes,
                     bool allow_partial,
                     NodeLinkMemory* memory);
@@ -70,20 +87,30 @@
   }
   const Ref<NodeLink>& remote_source() const { return remote_source_; }
 
-  absl::Span<uint8_t> data_view() { return data_view_; }
-  absl::Span<const uint8_t> data_view() const { return data_view_; }
+  absl::Span<uint8_t> data_view() { return data_.view; }
+  absl::Span<const uint8_t> data_view() const { return data_.view; }
 
   size_t data_size() const { return data_view().size(); }
 
-  const Fragment& data_fragment() const { return data_fragment_; }
+  bool has_data_fragment() const {
+    return absl::holds_alternative<DataFragment>(data_.storage);
+  }
+  const Fragment& data_fragment() const {
+    ABSL_ASSERT(has_data_fragment());
+    return absl::get<DataFragment>(data_.storage).fragment();
+  }
   const Ref<NodeLinkMemory>& data_fragment_memory() const {
-    return data_fragment_memory_;
+    ABSL_ASSERT(has_data_fragment());
+    return absl::get<DataFragment>(data_.storage).memory();
   }
 
-  absl::Span<Ref<APIObject>> objects_view() { return objects_view_; }
-  absl::Span<const Ref<APIObject>> objects_view() const {
-    return objects_view_;
+  absl::Span<Ref<APIObject>> objects_view() const {
+    if (!objects_) {
+      return {};
+    }
+    return objects_->view;
   }
+
   size_t num_objects() const { return objects_view().size(); }
 
   // Commits `num_bytes` of data to this Parcel's data fragment. This MUST be
@@ -127,26 +154,84 @@
     uint32_t reserved;
   };
 
+  // Holds a Fragment and a reference to its backing memory. Used when a
+  // Parcel's data lives in shared memory. This implements exclusive ownership
+  // of the underlying fragment, and upon destruction it releases the fragment
+  // back into the memory pool.
+  class DataFragment {
+   public:
+    DataFragment() = default;
+    DataFragment(Ref<NodeLinkMemory> memory, const Fragment& fragment)
+        : memory_(std::move(memory)), fragment_(fragment) {
+      // Parcels can only be given data fragments which are already addressable.
+      ABSL_ASSERT(is_valid());
+    }
+    DataFragment(DataFragment&& other);
+    DataFragment& operator=(DataFragment&& other);
+    ~DataFragment();
+
+    bool is_valid() const { return memory_ && fragment_.is_addressable(); }
+
+    const Fragment& fragment() const { return fragment_; }
+    const Ref<NodeLinkMemory>& memory() const { return memory_; }
+
+    [[nodiscard]] Fragment release();
+    void reset();
+
+   private:
+    Ref<NodeLinkMemory> memory_;
+    Fragment fragment_;
+  };
+
+  // A variant backing type for the parcel's data. Data may be in shared memory,
+  // heap-allocated and initialized from within the Parcel, or heap-allocated by
+  // a received Message and moved into the Parcel from there.
+  using DataStorage = absl::variant<absl::monostate,
+                                    DataFragment,
+                                    std::vector<uint8_t>,
+                                    Message::ReceivedDataBuffer>;
+
+  // Groups a DataStorage with a view into its data. This defines its own move
+  // construction and assignment operators to ensure that moved-from data is
+  // cleared. Note that the view may reference only a subset of the data within
+  // the DataStorage. This subset is considered the Parcel's data.
+  struct DataStorageWithView {
+    DataStorageWithView() = default;
+    DataStorageWithView(DataStorageWithView&& other);
+    DataStorageWithView& operator=(DataStorageWithView&& other);
+    ~DataStorageWithView() = default;
+
+    DataStorage storage;
+    absl::Span<uint8_t> view;
+  };
+
+  // Groups a vector of object attachments along with a view into that vector.
+  // Note that the view may reference only a subset of the elements within the
+  // vector if some objects have been removed or not-yet-attached.
+  struct ObjectStorageWithView {
+    ObjectStorageWithView() = default;
+    ObjectStorageWithView(const ObjectStorageWithView&) = delete;
+    ObjectStorageWithView& operator=(const ObjectStorageWithView&) = delete;
+    ~ObjectStorageWithView() = default;
+
+    std::vector<Ref<APIObject>> storage;
+    absl::Span<Ref<APIObject>> view;
+  };
+
   SequenceNumber sequence_number_{0};
 
   // If this Parcel was received from a remote node, this tracks the NodeLink
   // which received it.
   Ref<NodeLink> remote_source_;
 
-  // A copy of the parcel's data, owned by the Parcel itself. Used only if
-  // `data_fragment_` is null.
-  std::vector<uint8_t> inlined_data_;
+  // Concrete storage for the parcel's data, along with a view the data not yet
+  // consumed.
+  DataStorageWithView data_;
 
-  // If non-null, a shared memory fragment which contains this parcel's data.
-  Fragment data_fragment_;
-  Ref<NodeLinkMemory> data_fragment_memory_;
-
-  // The set of APIObjects attached to this parcel.
-  std::vector<Ref<APIObject>> objects_;
-
-  // Views into any unconsumed data and objects.
-  absl::Span<uint8_t> data_view_;
-  absl::Span<Ref<APIObject>> objects_view_;
+  // The set of APIObjects attached to this parcel, and a view of the objects
+  // not yet consumed from it. Heap-allocated to keep Parcels small in the
+  // common case of no object attachments.
+  std::unique_ptr<ObjectStorageWithView> objects_;
 };
 
 }  // namespace ipcz
diff --git a/src/ipcz/remote_router_link.cc b/src/ipcz/remote_router_link.cc
index a2dbc96..c4daf80 100644
--- a/src/ipcz/remote_router_link.cc
+++ b/src/ipcz/remote_router_link.cc
@@ -185,7 +185,7 @@
   // Allocate all the arrays in the message. Note that each allocation may
   // relocate the parcel data in memory, so views into these arrays should not
   // be acquired until all allocations are complete.
-  if (parcel.data_fragment().is_null() ||
+  if (!parcel.has_data_fragment() ||
       parcel.data_fragment_memory() != &node_link()->memory()) {
     // Only inline parcel data within the message when we don't have a separate
     // data fragment allocated already, or if the allocated fragment is on the