ipcz: Implement remote parcel queue accounting

For the common case of stable routes with a single central link, this
implements basic remote parcel queue accounting via RouterLink and
RouterLinkState.

This allows traps to monitor for consumption of parcels and parcel data
by a portal's remote peer, and it allows for applications to self-limit
outgoing transmissions in order to keep the remote queue within some
agreed-upon bounds.

Ihese features make it possible for Mojo data pipes to be reimplemented
on top of ipcz portals.

Bug: 1299283
Change-Id: I9e088f354d6bc9d616ba9b8c84946fb4c0dc8d88
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/3803785
Commit-Queue: Ken Rockot <rockot@google.com>
Reviewed-by: Daniel Cheng <dcheng@chromium.org>
Cr-Commit-Position: refs/heads/main@{#1033117}
NOKEYCHECK=True
GitOrigin-RevId: f03f83a73199622c1b6285c9291bbc31dffc6aa5
diff --git a/src/BUILD.gn b/src/BUILD.gn
index 8756a8d..f689d5b 100644
--- a/src/BUILD.gn
+++ b/src/BUILD.gn
@@ -381,6 +381,7 @@
     "ipcz/route_edge_test.cc",
     "ipcz/router_link_test.cc",
     "ipcz/sequenced_queue_test.cc",
+    "queueing_test.cc",
     "reference_drivers/sync_reference_driver_test.cc",
     "remote_portal_test.cc",
     "trap_test.cc",
diff --git a/src/ipcz/local_router_link.cc b/src/ipcz/local_router_link.cc
index 4922f81..9f01d8c 100644
--- a/src/ipcz/local_router_link.cc
+++ b/src/ipcz/local_router_link.cc
@@ -114,6 +114,27 @@
   }
 }
 
+size_t LocalRouterLink::GetParcelCapacityInBytes(const IpczPutLimits& limits) {
+  return state_->GetRouter(side_.opposite())->GetInboundCapacityInBytes(limits);
+}
+
+RouterLinkState::QueueState LocalRouterLink::GetPeerQueueState() {
+  return state_->link_state().GetQueueState(side_.opposite());
+}
+
+bool LocalRouterLink::UpdateInboundQueueState(size_t num_parcels,
+                                              size_t num_bytes) {
+  return state_->link_state().UpdateQueueState(side_, num_parcels, num_bytes);
+}
+
+void LocalRouterLink::NotifyDataConsumed() {
+  state_->GetRouter(side_.opposite())->NotifyPeerConsumedData();
+}
+
+bool LocalRouterLink::EnablePeerMonitoring(bool enable) {
+  return state_->link_state().SetSideIsMonitoringPeer(side_, enable);
+}
+
 void LocalRouterLink::AcceptRouteDisconnected() {
   if (Ref<Router> receiver = state_->GetRouter(side_.opposite())) {
     receiver->AcceptRouteDisconnectedFrom(state_->type());
diff --git a/src/ipcz/local_router_link.h b/src/ipcz/local_router_link.h
index 77291e8..64385af 100644
--- a/src/ipcz/local_router_link.h
+++ b/src/ipcz/local_router_link.h
@@ -40,6 +40,11 @@
   void AcceptParcel(Parcel& parcel) override;
   void AcceptRouteClosure(SequenceNumber sequence_length) override;
   void AcceptRouteDisconnected() override;
+  size_t GetParcelCapacityInBytes(const IpczPutLimits& limits) override;
+  RouterLinkState::QueueState GetPeerQueueState() override;
+  bool UpdateInboundQueueState(size_t num_parcels, size_t num_bytes) override;
+  void NotifyDataConsumed() override;
+  bool EnablePeerMonitoring(bool enable) override;
   void MarkSideStable() override;
   bool TryLockForBypass(const NodeName& bypass_request_source) override;
   bool TryLockForClosure() override;
diff --git a/src/ipcz/node_link.cc b/src/ipcz/node_link.cc
index 1bf9a92..22904e0 100644
--- a/src/ipcz/node_link.cc
+++ b/src/ipcz/node_link.cc
@@ -608,6 +608,13 @@
       sublink->router_link->GetType());
 }
 
+bool NodeLink::OnNotifyDataConsumed(msg::NotifyDataConsumed& notify) {
+  if (Ref<Router> router = GetRouter(notify.params().sublink)) {
+    router->NotifyPeerConsumedData();
+  }
+  return true;
+}
+
 bool NodeLink::OnBypassPeer(msg::BypassPeer& bypass) {
   absl::optional<Sublink> sublink = GetSublink(bypass.params().sublink);
   if (!sublink) {
diff --git a/src/ipcz/node_link.h b/src/ipcz/node_link.h
index f3b83cb..2671bdb 100644
--- a/src/ipcz/node_link.h
+++ b/src/ipcz/node_link.h
@@ -241,6 +241,7 @@
       msg::AcceptParcelDriverObjects& accept) override;
   bool OnRouteClosed(msg::RouteClosed& route_closed) override;
   bool OnRouteDisconnected(msg::RouteDisconnected& route_disconnected) override;
+  bool OnNotifyDataConsumed(msg::NotifyDataConsumed& notify) override;
   bool OnBypassPeer(msg::BypassPeer& bypass) override;
   bool OnAcceptBypassLink(msg::AcceptBypassLink& accept) override;
   bool OnStopProxying(msg::StopProxying& stop) override;
diff --git a/src/ipcz/node_messages_generator.h b/src/ipcz/node_messages_generator.h
index d3e8a94..ba36f9e 100644
--- a/src/ipcz/node_messages_generator.h
+++ b/src/ipcz/node_messages_generator.h
@@ -317,6 +317,13 @@
   IPCZ_MSG_PARAM(SublinkId, sublink)
 IPCZ_MSG_END()
 
+// Notifies a Router that the other side of its route has consumed some parcels
+// or parcel data from its inbound queue.
+IPCZ_MSG_BEGIN(NotifyDataConsumed, IPCZ_MSG_ID(24), IPCZ_MSG_VERSION(0))
+  // Identifies the router to receive this message.
+  IPCZ_MSG_PARAM(SublinkId, sublink)
+IPCZ_MSG_END()
+
 // Informs a router that its outward peer can be bypassed. Given routers X and Y
 // on the central link, and a router Z as Y's inward peer:
 //
diff --git a/src/ipcz/portal.cc b/src/ipcz/portal.cc
index 4de1d7c..66c5fd4 100644
--- a/src/ipcz/portal.cc
+++ b/src/ipcz/portal.cc
@@ -80,6 +80,10 @@
     return IPCZ_RESULT_NOT_FOUND;
   }
 
+  if (limits && router_->GetOutboundCapacityInBytes(*limits) < data.size()) {
+    return IPCZ_RESULT_RESOURCE_EXHAUSTED;
+  }
+
   Parcel parcel;
   parcel.SetInlinedData(std::vector<uint8_t>(data.begin(), data.end()));
   parcel.SetObjects(std::move(objects));
diff --git a/src/ipcz/remote_router_link.cc b/src/ipcz/remote_router_link.cc
index dd1ba21..0d6de61 100644
--- a/src/ipcz/remote_router_link.cc
+++ b/src/ipcz/remote_router_link.cc
@@ -239,6 +239,52 @@
   node_link()->Transmit(route_closed);
 }
 
+size_t RemoteRouterLink::GetParcelCapacityInBytes(const IpczPutLimits& limits) {
+  if (limits.max_queued_bytes == 0 || limits.max_queued_parcels == 0) {
+    return 0;
+  }
+
+  RouterLinkState* state = GetLinkState();
+  if (!state) {
+    // This is only a best-effort estimate. With no link state yet, err on the
+    // side of more data flow.
+    return limits.max_queued_bytes;
+  }
+
+  const RouterLinkState::QueueState peer_queue =
+      state->GetQueueState(side_.opposite());
+  if (peer_queue.num_parcels >= limits.max_queued_parcels ||
+      peer_queue.num_bytes >= limits.max_queued_bytes) {
+    return 0;
+  }
+
+  return limits.max_queued_bytes - peer_queue.num_bytes;
+}
+
+RouterLinkState::QueueState RemoteRouterLink::GetPeerQueueState() {
+  if (auto* state = GetLinkState()) {
+    return state->GetQueueState(side_.opposite());
+  }
+  return {.num_parcels = 0, .num_bytes = 0};
+}
+
+bool RemoteRouterLink::UpdateInboundQueueState(size_t num_parcels,
+                                               size_t num_bytes) {
+  RouterLinkState* state = GetLinkState();
+  return state && state->UpdateQueueState(side_, num_parcels, num_bytes);
+}
+
+void RemoteRouterLink::NotifyDataConsumed() {
+  msg::NotifyDataConsumed notify;
+  notify.params().sublink = sublink_;
+  node_link()->Transmit(notify);
+}
+
+bool RemoteRouterLink::EnablePeerMonitoring(bool enable) {
+  RouterLinkState* state = GetLinkState();
+  return state && state->SetSideIsMonitoringPeer(side_, enable);
+}
+
 void RemoteRouterLink::AcceptRouteDisconnected() {
   msg::RouteDisconnected route_disconnected;
   route_disconnected.params().sublink = sublink_;
diff --git a/src/ipcz/remote_router_link.h b/src/ipcz/remote_router_link.h
index 98950ab..0685e28 100644
--- a/src/ipcz/remote_router_link.h
+++ b/src/ipcz/remote_router_link.h
@@ -56,6 +56,11 @@
   void AcceptParcel(Parcel& parcel) override;
   void AcceptRouteClosure(SequenceNumber sequence_length) override;
   void AcceptRouteDisconnected() override;
+  size_t GetParcelCapacityInBytes(const IpczPutLimits& limits) override;
+  RouterLinkState::QueueState GetPeerQueueState() override;
+  bool UpdateInboundQueueState(size_t num_parcels, size_t num_bytes) override;
+  void NotifyDataConsumed() override;
+  bool EnablePeerMonitoring(bool enable) override;
   void MarkSideStable() override;
   bool TryLockForBypass(const NodeName& bypass_request_source) override;
   bool TryLockForClosure() override;
diff --git a/src/ipcz/router.cc b/src/ipcz/router.cc
index a76a357..c69534f 100644
--- a/src/ipcz/router.cc
+++ b/src/ipcz/router.cc
@@ -20,6 +20,7 @@
 #include "third_party/abseil-cpp/absl/synchronization/mutex.h"
 #include "util/log.h"
 #include "util/multi_mutex_lock.h"
+#include "util/safe_math.h"
 
 namespace ipcz {
 
@@ -183,6 +184,49 @@
   Flush(kForceProxyBypassAttempt);
 }
 
+size_t Router::GetOutboundCapacityInBytes(const IpczPutLimits& limits) {
+  if (limits.max_queued_bytes == 0 || limits.max_queued_parcels == 0) {
+    return 0;
+  }
+
+  size_t num_queued_bytes = 0;
+  Ref<RouterLink> link;
+  {
+    absl::MutexLock lock(&mutex_);
+    if (outbound_parcels_.GetNumAvailableElements() >=
+        limits.max_queued_parcels) {
+      return 0;
+    }
+    if (outbound_parcels_.GetTotalAvailableElementSize() >
+        limits.max_queued_bytes) {
+      return 0;
+    }
+
+    num_queued_bytes = outbound_parcels_.GetTotalAvailableElementSize();
+    link = outward_edge_.primary_link();
+  }
+
+  size_t link_capacity =
+      link ? link->GetParcelCapacityInBytes(limits) : limits.max_queued_bytes;
+  if (link_capacity <= num_queued_bytes) {
+    return 0;
+  }
+
+  return link_capacity - num_queued_bytes;
+}
+
+size_t Router::GetInboundCapacityInBytes(const IpczPutLimits& limits) {
+  absl::MutexLock lock(&mutex_);
+  const size_t num_queued_parcels = inbound_parcels_.GetNumAvailableElements();
+  const size_t num_queued_bytes =
+      inbound_parcels_.GetTotalAvailableElementSize();
+  if (num_queued_bytes >= limits.max_queued_bytes ||
+      num_queued_parcels >= limits.max_queued_parcels) {
+    return 0;
+  }
+  return limits.max_queued_bytes - num_queued_bytes;
+}
+
 bool Router::AcceptInboundParcel(Parcel& parcel) {
   TrapEventDispatcher dispatcher;
   {
@@ -200,6 +244,12 @@
       status_.num_local_bytes = inbound_parcels_.GetTotalAvailableElementSize();
       traps_.UpdatePortalStatus(status_, TrapSet::UpdateReason::kNewLocalParcel,
                                 dispatcher);
+
+      const Ref<RouterLink>& outward_link = outward_edge_.primary_link();
+      if (outward_link && outward_link->GetType().is_central()) {
+        outward_link->UpdateInboundQueueState(status_.num_local_parcels,
+                                              status_.num_local_bytes);
+      }
     }
   }
 
@@ -312,54 +362,91 @@
   return true;
 }
 
+void Router::NotifyPeerConsumedData() {
+  TrapEventDispatcher dispatcher;
+  {
+    absl::MutexLock lock(&mutex_);
+    const Ref<RouterLink>& outward_link = outward_edge_.primary_link();
+    if (!outward_link || !outward_link->GetType().is_central() ||
+        inward_edge_) {
+      return;
+    }
+
+    const RouterLinkState::QueueState peer_state =
+        outward_link->GetPeerQueueState();
+    status_.num_remote_parcels = peer_state.num_parcels;
+    status_.num_remote_bytes = peer_state.num_bytes;
+    traps_.UpdatePortalStatus(status_, TrapSet::UpdateReason::kRemoteActivity,
+                              dispatcher);
+
+    if (!traps_.need_remote_state()) {
+      outward_link->EnablePeerMonitoring(false);
+    }
+  }
+}
+
 IpczResult Router::GetNextInboundParcel(IpczGetFlags flags,
                                         void* data,
                                         size_t* num_bytes,
                                         IpczHandle* handles,
                                         size_t* num_handles) {
   TrapEventDispatcher dispatcher;
-  absl::MutexLock lock(&mutex_);
-  if (inbound_parcels_.IsSequenceFullyConsumed()) {
-    return IPCZ_RESULT_NOT_FOUND;
-  }
-  if (!inbound_parcels_.HasNextElement()) {
-    return IPCZ_RESULT_UNAVAILABLE;
+  Ref<RouterLink> link_to_notify;
+  {
+    absl::MutexLock lock(&mutex_);
+    if (inbound_parcels_.IsSequenceFullyConsumed()) {
+      return IPCZ_RESULT_NOT_FOUND;
+    }
+    if (!inbound_parcels_.HasNextElement()) {
+      return IPCZ_RESULT_UNAVAILABLE;
+    }
+
+    Parcel& p = inbound_parcels_.NextElement();
+    const bool allow_partial = (flags & IPCZ_GET_PARTIAL) != 0;
+    const size_t data_capacity = num_bytes ? *num_bytes : 0;
+    const size_t handles_capacity = num_handles ? *num_handles : 0;
+    const size_t data_size =
+        allow_partial ? std::min(p.data_size(), data_capacity) : p.data_size();
+    const size_t handles_size =
+        allow_partial ? std::min(p.num_objects(), handles_capacity)
+                      : p.num_objects();
+    if (num_bytes) {
+      *num_bytes = data_size;
+    }
+    if (num_handles) {
+      *num_handles = handles_size;
+    }
+
+    const bool consuming_whole_parcel =
+        data_capacity >= data_size && handles_capacity >= handles_size;
+    if (!consuming_whole_parcel && !allow_partial) {
+      return IPCZ_RESULT_RESOURCE_EXHAUSTED;
+    }
+
+    memcpy(data, p.data_view().data(), data_size);
+    const bool ok = inbound_parcels_.Consume(
+        data_size, absl::MakeSpan(handles, handles_size));
+    ABSL_ASSERT(ok);
+
+    status_.num_local_parcels = inbound_parcels_.GetNumAvailableElements();
+    status_.num_local_bytes = inbound_parcels_.GetTotalAvailableElementSize();
+    if (inbound_parcels_.IsSequenceFullyConsumed()) {
+      status_.flags |= IPCZ_PORTAL_STATUS_DEAD;
+    }
+    traps_.UpdatePortalStatus(
+        status_, TrapSet::UpdateReason::kLocalParcelConsumed, dispatcher);
+
+    const Ref<RouterLink>& outward_link = outward_edge_.primary_link();
+    if (outward_link && outward_link->GetType().is_central() &&
+        outward_link->UpdateInboundQueueState(status_.num_local_parcels,
+                                              status_.num_local_bytes)) {
+      link_to_notify = outward_link;
+    }
   }
 
-  Parcel& p = inbound_parcels_.NextElement();
-  const bool allow_partial = (flags & IPCZ_GET_PARTIAL) != 0;
-  const size_t data_capacity = num_bytes ? *num_bytes : 0;
-  const size_t handles_capacity = num_handles ? *num_handles : 0;
-  const size_t data_size =
-      allow_partial ? std::min(p.data_size(), data_capacity) : p.data_size();
-  const size_t handles_size = allow_partial
-                                  ? std::min(p.num_objects(), handles_capacity)
-                                  : p.num_objects();
-  if (num_bytes) {
-    *num_bytes = data_size;
+  if (link_to_notify) {
+    link_to_notify->NotifyDataConsumed();
   }
-  if (num_handles) {
-    *num_handles = handles_size;
-  }
-
-  const bool consuming_whole_parcel =
-      data_capacity >= data_size && handles_capacity >= handles_size;
-  if (!consuming_whole_parcel && !allow_partial) {
-    return IPCZ_RESULT_RESOURCE_EXHAUSTED;
-  }
-
-  memcpy(data, p.data_view().data(), data_size);
-  const bool ok = inbound_parcels_.Consume(
-      data_size, absl::MakeSpan(handles, handles_size));
-  ABSL_ASSERT(ok);
-
-  status_.num_local_parcels = inbound_parcels_.GetNumAvailableElements();
-  status_.num_local_bytes = inbound_parcels_.GetTotalAvailableElementSize();
-  if (inbound_parcels_.IsSequenceFullyConsumed()) {
-    status_.flags |= IPCZ_PORTAL_STATUS_DEAD;
-  }
-  traps_.UpdatePortalStatus(
-      status_, TrapSet::UpdateReason::kLocalParcelConsumed, dispatcher);
   return IPCZ_RESULT_OK;
 }
 
@@ -368,9 +455,46 @@
                         uint64_t context,
                         IpczTrapConditionFlags* satisfied_condition_flags,
                         IpczPortalStatus* status) {
-  absl::MutexLock lock(&mutex_);
-  return traps_.Add(conditions, handler, context, status_,
-                    satisfied_condition_flags, status);
+  const bool need_remote_state =
+      (conditions.flags & (IPCZ_TRAP_BELOW_MAX_REMOTE_PARCELS |
+                           IPCZ_TRAP_BELOW_MAX_REMOTE_BYTES)) != 0;
+  {
+    absl::MutexLock lock(&mutex_);
+    const Ref<RouterLink>& outward_link = outward_edge_.primary_link();
+    if (need_remote_state) {
+      status_.num_remote_parcels = outbound_parcels_.GetNumAvailableElements();
+      status_.num_remote_bytes =
+          outbound_parcels_.GetTotalAvailableElementSize();
+
+      if (outward_link && outward_link->GetType().is_central()) {
+        const RouterLinkState::QueueState peer_state =
+            outward_link->GetPeerQueueState();
+        status_.num_remote_parcels =
+            SaturatedAdd(status_.num_remote_parcels,
+                         static_cast<size_t>(peer_state.num_parcels));
+        status_.num_remote_bytes =
+            SaturatedAdd(status_.num_remote_bytes,
+                         static_cast<size_t>(peer_state.num_bytes));
+      }
+    }
+
+    const bool already_monitoring_remote_state = traps_.need_remote_state();
+    IpczResult result = traps_.Add(conditions, handler, context, status_,
+                                   satisfied_condition_flags, status);
+    if (result != IPCZ_RESULT_OK || !need_remote_state) {
+      return result;
+    }
+
+    if (!already_monitoring_remote_state) {
+      outward_link->EnablePeerMonitoring(true);
+    }
+  }
+
+  // Safeguard against races between remote state changes and the new trap being
+  // installed above. Only reached if the new trap monitors remote state.
+  ABSL_ASSERT(need_remote_state);
+  NotifyPeerConsumedData();
+  return IPCZ_RESULT_OK;
 }
 
 // static
diff --git a/src/ipcz/router.h b/src/ipcz/router.h
index 5ffd361..4e3c5c3 100644
--- a/src/ipcz/router.h
+++ b/src/ipcz/router.h
@@ -96,6 +96,16 @@
   // within this call.
   void SetOutwardLink(Ref<RouterLink> link);
 
+  // Returns a best-effort estimation of the maximum parcel size (in bytes) that
+  // can be sent outward from this router without the receiving portal exceeding
+  // any of the specified `limits`.
+  size_t GetOutboundCapacityInBytes(const IpczPutLimits& limits);
+
+  // Returns the maximum inbound parcel size (in bytes) that can be accepted by
+  // this router's inbound parcel queue without that queue exceeding any of the
+  // specified `limits`.
+  size_t GetInboundCapacityInBytes(const IpczPutLimits& limits);
+
   // Accepts an inbound parcel from the outward edge of this router, either to
   // queue it for retrieval or forward it further inward.
   bool AcceptInboundParcel(Parcel& parcel);
@@ -111,6 +121,10 @@
   bool AcceptRouteClosureFrom(LinkType link_type,
                               SequenceNumber sequence_length);
 
+  // Informs this router that its outward peer consumed some inbound parcels or
+  // parcel data.
+  void NotifyPeerConsumedData();
+
   // Accepts notification from a link bound to this Router that some node along
   // the route (in the direction of that link) has been disconnected, e.g. due
   // to a crash, and that the route is no longer functional as a result. This is
diff --git a/src/ipcz/router_link.h b/src/ipcz/router_link.h
index b6fa153..66cfd94 100644
--- a/src/ipcz/router_link.h
+++ b/src/ipcz/router_link.h
@@ -8,6 +8,7 @@
 #include "ipcz/fragment_ref.h"
 #include "ipcz/link_type.h"
 #include "ipcz/node_name.h"
+#include "ipcz/router_link_state.h"
 #include "ipcz/sequence_number.h"
 #include "ipcz/sublink_id.h"
 #include "util/ref_counted.h"
@@ -18,7 +19,6 @@
 class Parcel;
 class RemoteRouterLink;
 class Router;
-struct RouterLinkState;
 
 // A RouterLink represents one endpoint of a link between two Routers. All
 // subclasses must be thread-safe.
@@ -62,6 +62,31 @@
   // delivery of any further parcels.
   virtual void AcceptRouteDisconnected() = 0;
 
+  // Returns a best-effort estimation of how much new parcel data can be
+  // transmitted across the link before one or more limits described by `limits`
+  // would be exceeded on the receiving portal.
+  virtual size_t GetParcelCapacityInBytes(const IpczPutLimits& limits) = 0;
+
+  // Returns a best-effort snapshot of the last known state of the inbound
+  // parcel queue on the other side of this link. This is only meaningful for
+  // central links.
+  virtual RouterLinkState::QueueState GetPeerQueueState() = 0;
+
+  // Updates the QueueState for this side of the link, returning true if and
+  // only if the other side wants to be notified about the update.
+  virtual bool UpdateInboundQueueState(size_t num_parcels,
+                                       size_t num_bytes) = 0;
+
+  // Notifies the other side that this side has consumed some parcels or parcel
+  // data from its inbound queue. Should only be called on central links when
+  // the other side has expressed interest in such notifications.
+  virtual void NotifyDataConsumed() = 0;
+
+  // Controls whether the caller's side of the link is interested in being
+  // notified about data consumption on the opposite side of the link. Returns
+  // the previous value of this bit.
+  virtual bool EnablePeerMonitoring(bool enable) = 0;
+
   // Signals that this side of the link is in a stable state suitable for one
   // side or the other to lock the link, either for bypass or closure
   // propagation. Only once both sides are marked stable can either side lock
diff --git a/src/ipcz/router_link_state.cc b/src/ipcz/router_link_state.cc
index 774104b..6fdb059 100644
--- a/src/ipcz/router_link_state.cc
+++ b/src/ipcz/router_link_state.cc
@@ -10,13 +10,33 @@
 
 namespace ipcz {
 
+namespace {
+
+template <typename T, typename U>
+void StoreSaturated(std::atomic<T>& dest, U value) {
+  if (value < std::numeric_limits<T>::max()) {
+    dest.store(value, std::memory_order_relaxed);
+  } else {
+    dest.store(std::numeric_limits<T>::max(), std::memory_order_relaxed);
+  }
+}
+
+template <typename T>
+T& SelectBySide(LinkSide side, T& for_a, T& for_b) {
+  if (side.is_side_a()) {
+    return for_a;
+  }
+  return for_b;
+}
+
+}  // namespace
+
 RouterLinkState::RouterLinkState() = default;
 
 // static
 RouterLinkState& RouterLinkState::Initialize(void* where) {
   auto& state = *static_cast<RouterLinkState*>(where);
   new (&state) RouterLinkState();
-  memset(state.reserved, 0, sizeof(state.reserved));
   std::atomic_thread_fence(std::memory_order_release);
   return state;
 }
@@ -104,4 +124,38 @@
   return true;
 }
 
+RouterLinkState::QueueState RouterLinkState::GetQueueState(
+    LinkSide side) const {
+  return {
+      .num_parcels = SelectBySide(side, num_parcels_on_a, num_parcels_on_b)
+                         .load(std::memory_order_relaxed),
+      .num_bytes = SelectBySide(side, num_bytes_on_a, num_bytes_on_b)
+                       .load(std::memory_order_relaxed),
+  };
+}
+
+bool RouterLinkState::UpdateQueueState(LinkSide side,
+                                       size_t num_parcels,
+                                       size_t num_bytes) {
+  StoreSaturated(SelectBySide(side, num_parcels_on_a, num_parcels_on_b),
+                 num_parcels);
+  StoreSaturated(SelectBySide(side, num_bytes_on_a, num_bytes_on_b), num_bytes);
+  const uint32_t other_side_monitoring_this_side =
+      SelectBySide(side, kSideBMonitoringSideA, kSideAMonitoringSideB);
+  return (status.load(std::memory_order_relaxed) &
+          other_side_monitoring_this_side) != 0;
+}
+
+bool RouterLinkState::SetSideIsMonitoringPeer(LinkSide side,
+                                              bool is_monitoring) {
+  const uint32_t monitoring_bit =
+      SelectBySide(side, kSideAMonitoringSideB, kSideBMonitoringSideA);
+  uint32_t expected = kStable;
+  while (!status.compare_exchange_weak(expected, expected | monitoring_bit,
+                                       std::memory_order_relaxed,
+                                       std::memory_order_relaxed)) {
+  }
+  return (expected & monitoring_bit) != 0;
+}
+
 }  // namespace ipcz
diff --git a/src/ipcz/router_link_state.h b/src/ipcz/router_link_state.h
index 1ccefda..0d1ae54 100644
--- a/src/ipcz/router_link_state.h
+++ b/src/ipcz/router_link_state.h
@@ -18,10 +18,11 @@
 
 // Structure shared between both Routers connected by RouterLink. This is used
 // to synchronously query and reflect the state of each Router to the other,
-// and ultimately to facilitate orderly state changes across the route.
-//
-// This may live in shared memory, where it should be managed as a
+// and ultimately to facilitate orderly state changes across the route. This
+// may live in shared memory, where it should be managed as a
 // RefCountedFragment.
+//
+// Note that RouterLinkStates are effectively only used by central links.
 struct IPCZ_ALIGN(8) RouterLinkState : public RefCountedFragment {
   RouterLinkState();
 
@@ -29,7 +30,7 @@
   static RouterLinkState& Initialize(void* where);
 
   // Link status which both sides atomically update to coordinate orderly proxy
-  // bypass and route closure propagation. Used only for central links.
+  // bypass, route closure propagation, and other operations.
   using Status = uint32_t;
 
   // Status constants follow.
@@ -61,6 +62,13 @@
   static constexpr Status kLockedBySideA = 1 << 4;
   static constexpr Status kLockedBySideB = 1 << 5;
 
+  // Set if the link on either side A or B wishes to be notified when parcels
+  // or parcel data are consumed by the other side. In practice these are only
+  // set when a router has a trap installed to monitor such conditions, which
+  // applications may leverage to e.g. implement a back-pressure mechanism.
+  static constexpr Status kSideAMonitoringSideB = 1 << 6;
+  static constexpr Status kSideBMonitoringSideA = 1 << 7;
+
   std::atomic<Status> status{kUnstable};
 
   // In a situation with three routers A-B-C and a central link between A and
@@ -70,8 +78,16 @@
   // validate that C is an appropriate source of such a bypass request.
   NodeName allowed_bypass_request_source;
 
-  // Reserved slots.
-  uint32_t reserved[10];
+  // These fields approximate the number of parcels and data bytes received and
+  // queued for retrieval on each side of this link. Values here are saturated
+  // if the actual values would exceed the max uint32_t value.
+  std::atomic<uint32_t> num_parcels_on_a{0};
+  std::atomic<uint32_t> num_bytes_on_a{0};
+  std::atomic<uint32_t> num_parcels_on_b{0};
+  std::atomic<uint32_t> num_bytes_on_b{0};
+
+  // More reserved slots, padding out this structure to 64 bytes.
+  uint32_t reserved1[6] = {0};
 
   bool is_locked_by(LinkSide side) const {
     Status s = status.load(std::memory_order_relaxed);
@@ -107,6 +123,25 @@
   // attempt was made to TryLock() from that side, while the other side was
   // still unstable.
   bool ResetWaitingBit(LinkSide side);
+
+  // Returns a snapshot of the inbound parcel queue state on the given side of
+  // this link.
+  struct QueueState {
+    uint32_t num_parcels;
+    uint32_t num_bytes;
+  };
+  QueueState GetQueueState(LinkSide side) const;
+
+  // Updates the queue state for the given side of this link. Values which
+  // exceed 2**32-1 are clamped to that value. Returns true if and only if the
+  // opposite side of the link wants to be notified about this update.
+  bool UpdateQueueState(LinkSide side, size_t num_parcels, size_t num_bytes);
+
+  // Sets an appropriate bit to indicate whether the router on the given side of
+  // this link should notify the opposite side after consuming inbound parcels
+  // or parcel data. Returns the previous value of the relevant bit, which may
+  // be the same as the old value.
+  bool SetSideIsMonitoringPeer(LinkSide side, bool is_monitoring);
 };
 
 // The size of this structure is fixed at 64 bytes to ensure that it fits the
diff --git a/src/ipcz/trap_set.cc b/src/ipcz/trap_set.cc
index 2d0d43d..2aacb02 100644
--- a/src/ipcz/trap_set.cc
+++ b/src/ipcz/trap_set.cc
@@ -52,6 +52,11 @@
   return event_flags;
 }
 
+bool NeedRemoteState(IpczTrapConditionFlags flags) {
+  return (flags & (IPCZ_TRAP_BELOW_MAX_REMOTE_PARCELS |
+                   IPCZ_TRAP_BELOW_MAX_REMOTE_BYTES)) != 0;
+}
+
 }  // namespace
 
 TrapSet::TrapSet() = default;
@@ -86,6 +91,9 @@
   }
 
   traps_.emplace_back(conditions, handler, context);
+  if (NeedRemoteState(conditions.flags)) {
+    ++num_traps_monitoring_remote_state_;
+  }
   return IPCZ_RESULT_OK;
 }
 
@@ -104,6 +112,9 @@
 
     dispatcher.DeferEvent(trap.handler, trap.context, flags, status);
     it = traps_.erase(it);
+    if (NeedRemoteState(flags)) {
+      --num_traps_monitoring_remote_state_;
+    }
   }
 }
 
@@ -113,6 +124,7 @@
                           last_known_status_);
   }
   traps_.clear();
+  num_traps_monitoring_remote_state_ = 0;
 }
 
 TrapSet::Trap::Trap(IpczTrapConditions conditions,
diff --git a/src/ipcz/trap_set.h b/src/ipcz/trap_set.h
index fe45641..671f0cd 100644
--- a/src/ipcz/trap_set.h
+++ b/src/ipcz/trap_set.h
@@ -33,6 +33,11 @@
     // A previously queued inbound parcel has been fully or partially retrieved
     // by the application.
     kLocalParcelConsumed,
+
+    // A remote peer has changed state in a way that may be interesting to a
+    // trap in the set; for example, parcels may have been consumed from the
+    // remote queue.
+    kRemoteActivity,
   };
 
   TrapSet();
@@ -44,6 +49,12 @@
 
   bool empty() const { return traps_.empty(); }
 
+  // Indicates whether any installed traps in this set require monitoring of
+  // remote queue state.
+  bool need_remote_state() const {
+    return num_traps_monitoring_remote_state_ > 0;
+  }
+
   // Attempts to install a new trap in the set. This effectively implements
   // the ipcz Trap() API. If `conditions` are already met, returns
   // IPCZ_RESULT_FAILED_PRECONDITION and populates `satisfied_condition_flags`
@@ -81,6 +92,7 @@
 
   using TrapList = absl::InlinedVector<Trap, 4>;
   TrapList traps_;
+  size_t num_traps_monitoring_remote_state_ = 0;
   IpczPortalStatus last_known_status_ = {.size = sizeof(last_known_status_)};
 };
 
diff --git a/src/queueing_test.cc b/src/queueing_test.cc
new file mode 100644
index 0000000..454602d
--- /dev/null
+++ b/src/queueing_test.cc
@@ -0,0 +1,110 @@
+// Copyright 2022 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "ipcz/ipcz.h"
+#include "test/multinode_test.h"
+#include "testing/gtest/include/gtest/gtest.h"
+#include "third_party/abseil-cpp/absl/synchronization/notification.h"
+
+namespace ipcz {
+namespace {
+
+using QueueingTestNode = test::TestNode;
+using QueueingTest = test::MultinodeTest<QueueingTestNode>;
+
+MULTINODE_TEST_NODE(QueueingTestNode, RemoteQueueFeedbackClient) {
+  IpczHandle b = ConnectToBroker();
+
+  // Wait for the first parcel to arrive.
+  EXPECT_EQ(IPCZ_RESULT_OK,
+            WaitForConditions(b, {.flags = IPCZ_TRAP_ABOVE_MIN_LOCAL_PARCELS,
+                                  .min_local_parcels = 0}));
+
+  // Send and ack and wait for another parcel to arrive.
+  absl::Notification new_parcel_arrived;
+  EXPECT_EQ(IPCZ_RESULT_OK,
+            Trap(b, {.flags = IPCZ_TRAP_NEW_LOCAL_PARCEL},
+                 [&](const IpczTrapEvent&) { new_parcel_arrived.Notify(); }));
+  EXPECT_EQ(IPCZ_RESULT_OK, Put(b, "ok"));
+  new_parcel_arrived.WaitForNotification();
+
+  std::string data;
+  EXPECT_EQ(IPCZ_RESULT_OK, Get(b, &data));
+  EXPECT_EQ("1234", data);
+
+  std::string ack;
+  EXPECT_EQ(IPCZ_RESULT_OK, WaitToGet(b, &ack));
+  EXPECT_EQ("ok", ack);
+
+  EXPECT_EQ(IPCZ_RESULT_OK, WaitForConditionFlags(b, IPCZ_TRAP_PEER_CLOSED));
+  Close(b);
+}
+
+TEST_P(QueueingTest, RemoteQueueFeedback) {
+  // Exercises operations which rely on feedback from the remote peer regarding
+  // its inbound parcel queue state.
+  IpczHandle c = SpawnTestNode<RemoteQueueFeedbackClient>();
+
+  // This trap can only be set while the remote portal appears to be non-empty.
+  const IpczTrapConditions all_bytes_consumed = {
+      .size = sizeof(all_bytes_consumed),
+      .flags = IPCZ_TRAP_BELOW_MAX_REMOTE_BYTES,
+      .max_remote_bytes = 1,
+  };
+  EXPECT_EQ(IPCZ_RESULT_FAILED_PRECONDITION,
+            Trap(c, all_bytes_consumed, [&](const auto&) {}));
+
+  // Send 4 bytes and wait for acknowledgement that the parcel was received.
+  std::string ack;
+  EXPECT_EQ(IPCZ_RESULT_OK, Put(c, "1234"));
+  EXPECT_EQ(IPCZ_RESULT_OK, WaitToGet(c, &ack));
+  EXPECT_EQ("ok", ack);
+
+  // Now these operations should always fail due to the specified limits.
+  EXPECT_EQ(IPCZ_RESULT_RESOURCE_EXHAUSTED,
+            PutWithLimits(c, {.max_queued_parcels = 1}, "meh"));
+  EXPECT_EQ(IPCZ_RESULT_RESOURCE_EXHAUSTED,
+            PutWithLimits(c, {.max_queued_bytes = 4}, "?"));
+
+  // Now we should be able to install traps for both queued parcels and bytes on
+  // the remote side.
+  absl::Notification consumed_parcels;
+  const IpczTrapConditions all_parcels_consumed = {
+      .size = sizeof(all_parcels_consumed),
+      .flags = IPCZ_TRAP_BELOW_MAX_REMOTE_PARCELS,
+      .max_remote_parcels = 1,
+  };
+  EXPECT_EQ(
+      IPCZ_RESULT_OK,
+      Trap(c, all_parcels_consumed, [&](const IpczTrapEvent& event) {
+        EXPECT_TRUE(event.condition_flags & IPCZ_TRAP_BELOW_MAX_REMOTE_PARCELS);
+        consumed_parcels.Notify();
+      }));
+
+  absl::Notification consumed_bytes;
+  EXPECT_EQ(
+      IPCZ_RESULT_OK,
+      Trap(c, all_bytes_consumed, [&](const IpczTrapEvent& event) {
+        EXPECT_TRUE(event.condition_flags & IPCZ_TRAP_BELOW_MAX_REMOTE_BYTES);
+        consumed_bytes.Notify();
+      }));
+
+  // Ack back to the client so it will read its queue. Then we can wait for both
+  // traps to notify.
+  EXPECT_EQ(IPCZ_RESULT_OK, Put(c, "ok"));
+  consumed_parcels.WaitForNotification();
+  consumed_bytes.WaitForNotification();
+
+  // And now this Put operation should succeed.
+  EXPECT_EQ(IPCZ_RESULT_OK,
+            PutWithLimits(c, {.max_queued_parcels = 1, .max_queued_bytes = 4},
+                          "meh!"));
+
+  Close(c);
+}
+
+INSTANTIATE_MULTINODE_TEST_SUITE_P(QueueingTest);
+
+}  // namespace
+}  // namespace ipcz
diff --git a/src/test/test_base.cc b/src/test/test_base.cc
index de9e098..13c85cc 100644
--- a/src/test/test_base.cc
+++ b/src/test/test_base.cc
@@ -75,6 +75,20 @@
                     handles.size(), IPCZ_NO_FLAGS, nullptr);
 }
 
+IpczResult TestBase::PutWithLimits(IpczHandle portal,
+                                   const IpczPutLimits& limits,
+                                   std::string_view message,
+                                   absl::Span<IpczHandle> handles) {
+  IpczPutLimits sized_limits = limits;
+  sized_limits.size = sizeof(sized_limits);
+  const IpczPutOptions options = {
+      .size = sizeof(options),
+      .limits = &sized_limits,
+  };
+  return ipcz().Put(portal, message.data(), message.size(), handles.data(),
+                    handles.size(), IPCZ_NO_FLAGS, &options);
+}
+
 IpczResult TestBase::Get(IpczHandle portal,
                          std::string* message,
                          absl::Span<IpczHandle> handles) {
@@ -108,9 +122,14 @@
                           IpczPortalStatus* status) {
   auto handler = std::make_unique<TrapEventHandler>(std::move(fn));
   auto context = reinterpret_cast<uintptr_t>(handler.get());
+
+  // For convenience, set the `size` field correctly so callers don't have to.
+  IpczTrapConditions sized_conditions = conditions;
+  sized_conditions.size = sizeof(sized_conditions);
+
   const IpczResult result =
-      ipcz().Trap(portal, &conditions, &HandleEvent, context, IPCZ_NO_FLAGS,
-                  nullptr, flags, status);
+      ipcz().Trap(portal, &sized_conditions, &HandleEvent, context,
+                  IPCZ_NO_FLAGS, nullptr, flags, status);
   if (result == IPCZ_RESULT_OK) {
     std::ignore = handler.release();
   }
diff --git a/src/test/test_base.h b/src/test/test_base.h
index cdcad57..4786fbf 100644
--- a/src/test/test_base.h
+++ b/src/test/test_base.h
@@ -41,6 +41,10 @@
   IpczResult Put(IpczHandle portal,
                  std::string_view message,
                  absl::Span<IpczHandle> handles = {});
+  IpczResult PutWithLimits(IpczHandle portal,
+                           const IpczPutLimits& limits,
+                           std::string_view message,
+                           absl::Span<IpczHandle> handles = {});
 
   // Shorthand for ipcz Get() to retrieve the next available parcel from
   // `portal`.If no parcel is available, or any other condition prevents Get()
diff --git a/src/util/safe_math.h b/src/util/safe_math.h
index 15cbb8c..5affb98 100644
--- a/src/util/safe_math.h
+++ b/src/util/safe_math.h
@@ -38,6 +38,15 @@
   return result;
 }
 
+template <typename T>
+T SaturatedAdd(T a, T b) {
+  T result;
+  if (!__builtin_add_overflow(a, b, &result)) {
+    return result;
+  }
+  return std::numeric_limits<T>::max();
+}
+
 }  // namespace ipcz
 
 #endif  // IPCZ_SRC_UTIL_SAFE_MATH_