ipcz: Implement remote parcel queue accounting
For the common case of stable routes with a single central link, this
implements basic remote parcel queue accounting via RouterLink and
RouterLinkState.
This allows traps to monitor for consumption of parcels and parcel data
by a portal's remote peer, and it allows for applications to self-limit
outgoing transmissions in order to keep the remote queue within some
agreed-upon bounds.
Ihese features make it possible for Mojo data pipes to be reimplemented
on top of ipcz portals.
Bug: 1299283
Change-Id: I9e088f354d6bc9d616ba9b8c84946fb4c0dc8d88
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/3803785
Commit-Queue: Ken Rockot <rockot@google.com>
Reviewed-by: Daniel Cheng <dcheng@chromium.org>
Cr-Commit-Position: refs/heads/main@{#1033117}
NOKEYCHECK=True
GitOrigin-RevId: f03f83a73199622c1b6285c9291bbc31dffc6aa5
diff --git a/src/BUILD.gn b/src/BUILD.gn
index 8756a8d..f689d5b 100644
--- a/src/BUILD.gn
+++ b/src/BUILD.gn
@@ -381,6 +381,7 @@
"ipcz/route_edge_test.cc",
"ipcz/router_link_test.cc",
"ipcz/sequenced_queue_test.cc",
+ "queueing_test.cc",
"reference_drivers/sync_reference_driver_test.cc",
"remote_portal_test.cc",
"trap_test.cc",
diff --git a/src/ipcz/local_router_link.cc b/src/ipcz/local_router_link.cc
index 4922f81..9f01d8c 100644
--- a/src/ipcz/local_router_link.cc
+++ b/src/ipcz/local_router_link.cc
@@ -114,6 +114,27 @@
}
}
+size_t LocalRouterLink::GetParcelCapacityInBytes(const IpczPutLimits& limits) {
+ return state_->GetRouter(side_.opposite())->GetInboundCapacityInBytes(limits);
+}
+
+RouterLinkState::QueueState LocalRouterLink::GetPeerQueueState() {
+ return state_->link_state().GetQueueState(side_.opposite());
+}
+
+bool LocalRouterLink::UpdateInboundQueueState(size_t num_parcels,
+ size_t num_bytes) {
+ return state_->link_state().UpdateQueueState(side_, num_parcels, num_bytes);
+}
+
+void LocalRouterLink::NotifyDataConsumed() {
+ state_->GetRouter(side_.opposite())->NotifyPeerConsumedData();
+}
+
+bool LocalRouterLink::EnablePeerMonitoring(bool enable) {
+ return state_->link_state().SetSideIsMonitoringPeer(side_, enable);
+}
+
void LocalRouterLink::AcceptRouteDisconnected() {
if (Ref<Router> receiver = state_->GetRouter(side_.opposite())) {
receiver->AcceptRouteDisconnectedFrom(state_->type());
diff --git a/src/ipcz/local_router_link.h b/src/ipcz/local_router_link.h
index 77291e8..64385af 100644
--- a/src/ipcz/local_router_link.h
+++ b/src/ipcz/local_router_link.h
@@ -40,6 +40,11 @@
void AcceptParcel(Parcel& parcel) override;
void AcceptRouteClosure(SequenceNumber sequence_length) override;
void AcceptRouteDisconnected() override;
+ size_t GetParcelCapacityInBytes(const IpczPutLimits& limits) override;
+ RouterLinkState::QueueState GetPeerQueueState() override;
+ bool UpdateInboundQueueState(size_t num_parcels, size_t num_bytes) override;
+ void NotifyDataConsumed() override;
+ bool EnablePeerMonitoring(bool enable) override;
void MarkSideStable() override;
bool TryLockForBypass(const NodeName& bypass_request_source) override;
bool TryLockForClosure() override;
diff --git a/src/ipcz/node_link.cc b/src/ipcz/node_link.cc
index 1bf9a92..22904e0 100644
--- a/src/ipcz/node_link.cc
+++ b/src/ipcz/node_link.cc
@@ -608,6 +608,13 @@
sublink->router_link->GetType());
}
+bool NodeLink::OnNotifyDataConsumed(msg::NotifyDataConsumed& notify) {
+ if (Ref<Router> router = GetRouter(notify.params().sublink)) {
+ router->NotifyPeerConsumedData();
+ }
+ return true;
+}
+
bool NodeLink::OnBypassPeer(msg::BypassPeer& bypass) {
absl::optional<Sublink> sublink = GetSublink(bypass.params().sublink);
if (!sublink) {
diff --git a/src/ipcz/node_link.h b/src/ipcz/node_link.h
index f3b83cb..2671bdb 100644
--- a/src/ipcz/node_link.h
+++ b/src/ipcz/node_link.h
@@ -241,6 +241,7 @@
msg::AcceptParcelDriverObjects& accept) override;
bool OnRouteClosed(msg::RouteClosed& route_closed) override;
bool OnRouteDisconnected(msg::RouteDisconnected& route_disconnected) override;
+ bool OnNotifyDataConsumed(msg::NotifyDataConsumed& notify) override;
bool OnBypassPeer(msg::BypassPeer& bypass) override;
bool OnAcceptBypassLink(msg::AcceptBypassLink& accept) override;
bool OnStopProxying(msg::StopProxying& stop) override;
diff --git a/src/ipcz/node_messages_generator.h b/src/ipcz/node_messages_generator.h
index d3e8a94..ba36f9e 100644
--- a/src/ipcz/node_messages_generator.h
+++ b/src/ipcz/node_messages_generator.h
@@ -317,6 +317,13 @@
IPCZ_MSG_PARAM(SublinkId, sublink)
IPCZ_MSG_END()
+// Notifies a Router that the other side of its route has consumed some parcels
+// or parcel data from its inbound queue.
+IPCZ_MSG_BEGIN(NotifyDataConsumed, IPCZ_MSG_ID(24), IPCZ_MSG_VERSION(0))
+ // Identifies the router to receive this message.
+ IPCZ_MSG_PARAM(SublinkId, sublink)
+IPCZ_MSG_END()
+
// Informs a router that its outward peer can be bypassed. Given routers X and Y
// on the central link, and a router Z as Y's inward peer:
//
diff --git a/src/ipcz/portal.cc b/src/ipcz/portal.cc
index 4de1d7c..66c5fd4 100644
--- a/src/ipcz/portal.cc
+++ b/src/ipcz/portal.cc
@@ -80,6 +80,10 @@
return IPCZ_RESULT_NOT_FOUND;
}
+ if (limits && router_->GetOutboundCapacityInBytes(*limits) < data.size()) {
+ return IPCZ_RESULT_RESOURCE_EXHAUSTED;
+ }
+
Parcel parcel;
parcel.SetInlinedData(std::vector<uint8_t>(data.begin(), data.end()));
parcel.SetObjects(std::move(objects));
diff --git a/src/ipcz/remote_router_link.cc b/src/ipcz/remote_router_link.cc
index dd1ba21..0d6de61 100644
--- a/src/ipcz/remote_router_link.cc
+++ b/src/ipcz/remote_router_link.cc
@@ -239,6 +239,52 @@
node_link()->Transmit(route_closed);
}
+size_t RemoteRouterLink::GetParcelCapacityInBytes(const IpczPutLimits& limits) {
+ if (limits.max_queued_bytes == 0 || limits.max_queued_parcels == 0) {
+ return 0;
+ }
+
+ RouterLinkState* state = GetLinkState();
+ if (!state) {
+ // This is only a best-effort estimate. With no link state yet, err on the
+ // side of more data flow.
+ return limits.max_queued_bytes;
+ }
+
+ const RouterLinkState::QueueState peer_queue =
+ state->GetQueueState(side_.opposite());
+ if (peer_queue.num_parcels >= limits.max_queued_parcels ||
+ peer_queue.num_bytes >= limits.max_queued_bytes) {
+ return 0;
+ }
+
+ return limits.max_queued_bytes - peer_queue.num_bytes;
+}
+
+RouterLinkState::QueueState RemoteRouterLink::GetPeerQueueState() {
+ if (auto* state = GetLinkState()) {
+ return state->GetQueueState(side_.opposite());
+ }
+ return {.num_parcels = 0, .num_bytes = 0};
+}
+
+bool RemoteRouterLink::UpdateInboundQueueState(size_t num_parcels,
+ size_t num_bytes) {
+ RouterLinkState* state = GetLinkState();
+ return state && state->UpdateQueueState(side_, num_parcels, num_bytes);
+}
+
+void RemoteRouterLink::NotifyDataConsumed() {
+ msg::NotifyDataConsumed notify;
+ notify.params().sublink = sublink_;
+ node_link()->Transmit(notify);
+}
+
+bool RemoteRouterLink::EnablePeerMonitoring(bool enable) {
+ RouterLinkState* state = GetLinkState();
+ return state && state->SetSideIsMonitoringPeer(side_, enable);
+}
+
void RemoteRouterLink::AcceptRouteDisconnected() {
msg::RouteDisconnected route_disconnected;
route_disconnected.params().sublink = sublink_;
diff --git a/src/ipcz/remote_router_link.h b/src/ipcz/remote_router_link.h
index 98950ab..0685e28 100644
--- a/src/ipcz/remote_router_link.h
+++ b/src/ipcz/remote_router_link.h
@@ -56,6 +56,11 @@
void AcceptParcel(Parcel& parcel) override;
void AcceptRouteClosure(SequenceNumber sequence_length) override;
void AcceptRouteDisconnected() override;
+ size_t GetParcelCapacityInBytes(const IpczPutLimits& limits) override;
+ RouterLinkState::QueueState GetPeerQueueState() override;
+ bool UpdateInboundQueueState(size_t num_parcels, size_t num_bytes) override;
+ void NotifyDataConsumed() override;
+ bool EnablePeerMonitoring(bool enable) override;
void MarkSideStable() override;
bool TryLockForBypass(const NodeName& bypass_request_source) override;
bool TryLockForClosure() override;
diff --git a/src/ipcz/router.cc b/src/ipcz/router.cc
index a76a357..c69534f 100644
--- a/src/ipcz/router.cc
+++ b/src/ipcz/router.cc
@@ -20,6 +20,7 @@
#include "third_party/abseil-cpp/absl/synchronization/mutex.h"
#include "util/log.h"
#include "util/multi_mutex_lock.h"
+#include "util/safe_math.h"
namespace ipcz {
@@ -183,6 +184,49 @@
Flush(kForceProxyBypassAttempt);
}
+size_t Router::GetOutboundCapacityInBytes(const IpczPutLimits& limits) {
+ if (limits.max_queued_bytes == 0 || limits.max_queued_parcels == 0) {
+ return 0;
+ }
+
+ size_t num_queued_bytes = 0;
+ Ref<RouterLink> link;
+ {
+ absl::MutexLock lock(&mutex_);
+ if (outbound_parcels_.GetNumAvailableElements() >=
+ limits.max_queued_parcels) {
+ return 0;
+ }
+ if (outbound_parcels_.GetTotalAvailableElementSize() >
+ limits.max_queued_bytes) {
+ return 0;
+ }
+
+ num_queued_bytes = outbound_parcels_.GetTotalAvailableElementSize();
+ link = outward_edge_.primary_link();
+ }
+
+ size_t link_capacity =
+ link ? link->GetParcelCapacityInBytes(limits) : limits.max_queued_bytes;
+ if (link_capacity <= num_queued_bytes) {
+ return 0;
+ }
+
+ return link_capacity - num_queued_bytes;
+}
+
+size_t Router::GetInboundCapacityInBytes(const IpczPutLimits& limits) {
+ absl::MutexLock lock(&mutex_);
+ const size_t num_queued_parcels = inbound_parcels_.GetNumAvailableElements();
+ const size_t num_queued_bytes =
+ inbound_parcels_.GetTotalAvailableElementSize();
+ if (num_queued_bytes >= limits.max_queued_bytes ||
+ num_queued_parcels >= limits.max_queued_parcels) {
+ return 0;
+ }
+ return limits.max_queued_bytes - num_queued_bytes;
+}
+
bool Router::AcceptInboundParcel(Parcel& parcel) {
TrapEventDispatcher dispatcher;
{
@@ -200,6 +244,12 @@
status_.num_local_bytes = inbound_parcels_.GetTotalAvailableElementSize();
traps_.UpdatePortalStatus(status_, TrapSet::UpdateReason::kNewLocalParcel,
dispatcher);
+
+ const Ref<RouterLink>& outward_link = outward_edge_.primary_link();
+ if (outward_link && outward_link->GetType().is_central()) {
+ outward_link->UpdateInboundQueueState(status_.num_local_parcels,
+ status_.num_local_bytes);
+ }
}
}
@@ -312,54 +362,91 @@
return true;
}
+void Router::NotifyPeerConsumedData() {
+ TrapEventDispatcher dispatcher;
+ {
+ absl::MutexLock lock(&mutex_);
+ const Ref<RouterLink>& outward_link = outward_edge_.primary_link();
+ if (!outward_link || !outward_link->GetType().is_central() ||
+ inward_edge_) {
+ return;
+ }
+
+ const RouterLinkState::QueueState peer_state =
+ outward_link->GetPeerQueueState();
+ status_.num_remote_parcels = peer_state.num_parcels;
+ status_.num_remote_bytes = peer_state.num_bytes;
+ traps_.UpdatePortalStatus(status_, TrapSet::UpdateReason::kRemoteActivity,
+ dispatcher);
+
+ if (!traps_.need_remote_state()) {
+ outward_link->EnablePeerMonitoring(false);
+ }
+ }
+}
+
IpczResult Router::GetNextInboundParcel(IpczGetFlags flags,
void* data,
size_t* num_bytes,
IpczHandle* handles,
size_t* num_handles) {
TrapEventDispatcher dispatcher;
- absl::MutexLock lock(&mutex_);
- if (inbound_parcels_.IsSequenceFullyConsumed()) {
- return IPCZ_RESULT_NOT_FOUND;
- }
- if (!inbound_parcels_.HasNextElement()) {
- return IPCZ_RESULT_UNAVAILABLE;
+ Ref<RouterLink> link_to_notify;
+ {
+ absl::MutexLock lock(&mutex_);
+ if (inbound_parcels_.IsSequenceFullyConsumed()) {
+ return IPCZ_RESULT_NOT_FOUND;
+ }
+ if (!inbound_parcels_.HasNextElement()) {
+ return IPCZ_RESULT_UNAVAILABLE;
+ }
+
+ Parcel& p = inbound_parcels_.NextElement();
+ const bool allow_partial = (flags & IPCZ_GET_PARTIAL) != 0;
+ const size_t data_capacity = num_bytes ? *num_bytes : 0;
+ const size_t handles_capacity = num_handles ? *num_handles : 0;
+ const size_t data_size =
+ allow_partial ? std::min(p.data_size(), data_capacity) : p.data_size();
+ const size_t handles_size =
+ allow_partial ? std::min(p.num_objects(), handles_capacity)
+ : p.num_objects();
+ if (num_bytes) {
+ *num_bytes = data_size;
+ }
+ if (num_handles) {
+ *num_handles = handles_size;
+ }
+
+ const bool consuming_whole_parcel =
+ data_capacity >= data_size && handles_capacity >= handles_size;
+ if (!consuming_whole_parcel && !allow_partial) {
+ return IPCZ_RESULT_RESOURCE_EXHAUSTED;
+ }
+
+ memcpy(data, p.data_view().data(), data_size);
+ const bool ok = inbound_parcels_.Consume(
+ data_size, absl::MakeSpan(handles, handles_size));
+ ABSL_ASSERT(ok);
+
+ status_.num_local_parcels = inbound_parcels_.GetNumAvailableElements();
+ status_.num_local_bytes = inbound_parcels_.GetTotalAvailableElementSize();
+ if (inbound_parcels_.IsSequenceFullyConsumed()) {
+ status_.flags |= IPCZ_PORTAL_STATUS_DEAD;
+ }
+ traps_.UpdatePortalStatus(
+ status_, TrapSet::UpdateReason::kLocalParcelConsumed, dispatcher);
+
+ const Ref<RouterLink>& outward_link = outward_edge_.primary_link();
+ if (outward_link && outward_link->GetType().is_central() &&
+ outward_link->UpdateInboundQueueState(status_.num_local_parcels,
+ status_.num_local_bytes)) {
+ link_to_notify = outward_link;
+ }
}
- Parcel& p = inbound_parcels_.NextElement();
- const bool allow_partial = (flags & IPCZ_GET_PARTIAL) != 0;
- const size_t data_capacity = num_bytes ? *num_bytes : 0;
- const size_t handles_capacity = num_handles ? *num_handles : 0;
- const size_t data_size =
- allow_partial ? std::min(p.data_size(), data_capacity) : p.data_size();
- const size_t handles_size = allow_partial
- ? std::min(p.num_objects(), handles_capacity)
- : p.num_objects();
- if (num_bytes) {
- *num_bytes = data_size;
+ if (link_to_notify) {
+ link_to_notify->NotifyDataConsumed();
}
- if (num_handles) {
- *num_handles = handles_size;
- }
-
- const bool consuming_whole_parcel =
- data_capacity >= data_size && handles_capacity >= handles_size;
- if (!consuming_whole_parcel && !allow_partial) {
- return IPCZ_RESULT_RESOURCE_EXHAUSTED;
- }
-
- memcpy(data, p.data_view().data(), data_size);
- const bool ok = inbound_parcels_.Consume(
- data_size, absl::MakeSpan(handles, handles_size));
- ABSL_ASSERT(ok);
-
- status_.num_local_parcels = inbound_parcels_.GetNumAvailableElements();
- status_.num_local_bytes = inbound_parcels_.GetTotalAvailableElementSize();
- if (inbound_parcels_.IsSequenceFullyConsumed()) {
- status_.flags |= IPCZ_PORTAL_STATUS_DEAD;
- }
- traps_.UpdatePortalStatus(
- status_, TrapSet::UpdateReason::kLocalParcelConsumed, dispatcher);
return IPCZ_RESULT_OK;
}
@@ -368,9 +455,46 @@
uint64_t context,
IpczTrapConditionFlags* satisfied_condition_flags,
IpczPortalStatus* status) {
- absl::MutexLock lock(&mutex_);
- return traps_.Add(conditions, handler, context, status_,
- satisfied_condition_flags, status);
+ const bool need_remote_state =
+ (conditions.flags & (IPCZ_TRAP_BELOW_MAX_REMOTE_PARCELS |
+ IPCZ_TRAP_BELOW_MAX_REMOTE_BYTES)) != 0;
+ {
+ absl::MutexLock lock(&mutex_);
+ const Ref<RouterLink>& outward_link = outward_edge_.primary_link();
+ if (need_remote_state) {
+ status_.num_remote_parcels = outbound_parcels_.GetNumAvailableElements();
+ status_.num_remote_bytes =
+ outbound_parcels_.GetTotalAvailableElementSize();
+
+ if (outward_link && outward_link->GetType().is_central()) {
+ const RouterLinkState::QueueState peer_state =
+ outward_link->GetPeerQueueState();
+ status_.num_remote_parcels =
+ SaturatedAdd(status_.num_remote_parcels,
+ static_cast<size_t>(peer_state.num_parcels));
+ status_.num_remote_bytes =
+ SaturatedAdd(status_.num_remote_bytes,
+ static_cast<size_t>(peer_state.num_bytes));
+ }
+ }
+
+ const bool already_monitoring_remote_state = traps_.need_remote_state();
+ IpczResult result = traps_.Add(conditions, handler, context, status_,
+ satisfied_condition_flags, status);
+ if (result != IPCZ_RESULT_OK || !need_remote_state) {
+ return result;
+ }
+
+ if (!already_monitoring_remote_state) {
+ outward_link->EnablePeerMonitoring(true);
+ }
+ }
+
+ // Safeguard against races between remote state changes and the new trap being
+ // installed above. Only reached if the new trap monitors remote state.
+ ABSL_ASSERT(need_remote_state);
+ NotifyPeerConsumedData();
+ return IPCZ_RESULT_OK;
}
// static
diff --git a/src/ipcz/router.h b/src/ipcz/router.h
index 5ffd361..4e3c5c3 100644
--- a/src/ipcz/router.h
+++ b/src/ipcz/router.h
@@ -96,6 +96,16 @@
// within this call.
void SetOutwardLink(Ref<RouterLink> link);
+ // Returns a best-effort estimation of the maximum parcel size (in bytes) that
+ // can be sent outward from this router without the receiving portal exceeding
+ // any of the specified `limits`.
+ size_t GetOutboundCapacityInBytes(const IpczPutLimits& limits);
+
+ // Returns the maximum inbound parcel size (in bytes) that can be accepted by
+ // this router's inbound parcel queue without that queue exceeding any of the
+ // specified `limits`.
+ size_t GetInboundCapacityInBytes(const IpczPutLimits& limits);
+
// Accepts an inbound parcel from the outward edge of this router, either to
// queue it for retrieval or forward it further inward.
bool AcceptInboundParcel(Parcel& parcel);
@@ -111,6 +121,10 @@
bool AcceptRouteClosureFrom(LinkType link_type,
SequenceNumber sequence_length);
+ // Informs this router that its outward peer consumed some inbound parcels or
+ // parcel data.
+ void NotifyPeerConsumedData();
+
// Accepts notification from a link bound to this Router that some node along
// the route (in the direction of that link) has been disconnected, e.g. due
// to a crash, and that the route is no longer functional as a result. This is
diff --git a/src/ipcz/router_link.h b/src/ipcz/router_link.h
index b6fa153..66cfd94 100644
--- a/src/ipcz/router_link.h
+++ b/src/ipcz/router_link.h
@@ -8,6 +8,7 @@
#include "ipcz/fragment_ref.h"
#include "ipcz/link_type.h"
#include "ipcz/node_name.h"
+#include "ipcz/router_link_state.h"
#include "ipcz/sequence_number.h"
#include "ipcz/sublink_id.h"
#include "util/ref_counted.h"
@@ -18,7 +19,6 @@
class Parcel;
class RemoteRouterLink;
class Router;
-struct RouterLinkState;
// A RouterLink represents one endpoint of a link between two Routers. All
// subclasses must be thread-safe.
@@ -62,6 +62,31 @@
// delivery of any further parcels.
virtual void AcceptRouteDisconnected() = 0;
+ // Returns a best-effort estimation of how much new parcel data can be
+ // transmitted across the link before one or more limits described by `limits`
+ // would be exceeded on the receiving portal.
+ virtual size_t GetParcelCapacityInBytes(const IpczPutLimits& limits) = 0;
+
+ // Returns a best-effort snapshot of the last known state of the inbound
+ // parcel queue on the other side of this link. This is only meaningful for
+ // central links.
+ virtual RouterLinkState::QueueState GetPeerQueueState() = 0;
+
+ // Updates the QueueState for this side of the link, returning true if and
+ // only if the other side wants to be notified about the update.
+ virtual bool UpdateInboundQueueState(size_t num_parcels,
+ size_t num_bytes) = 0;
+
+ // Notifies the other side that this side has consumed some parcels or parcel
+ // data from its inbound queue. Should only be called on central links when
+ // the other side has expressed interest in such notifications.
+ virtual void NotifyDataConsumed() = 0;
+
+ // Controls whether the caller's side of the link is interested in being
+ // notified about data consumption on the opposite side of the link. Returns
+ // the previous value of this bit.
+ virtual bool EnablePeerMonitoring(bool enable) = 0;
+
// Signals that this side of the link is in a stable state suitable for one
// side or the other to lock the link, either for bypass or closure
// propagation. Only once both sides are marked stable can either side lock
diff --git a/src/ipcz/router_link_state.cc b/src/ipcz/router_link_state.cc
index 774104b..6fdb059 100644
--- a/src/ipcz/router_link_state.cc
+++ b/src/ipcz/router_link_state.cc
@@ -10,13 +10,33 @@
namespace ipcz {
+namespace {
+
+template <typename T, typename U>
+void StoreSaturated(std::atomic<T>& dest, U value) {
+ if (value < std::numeric_limits<T>::max()) {
+ dest.store(value, std::memory_order_relaxed);
+ } else {
+ dest.store(std::numeric_limits<T>::max(), std::memory_order_relaxed);
+ }
+}
+
+template <typename T>
+T& SelectBySide(LinkSide side, T& for_a, T& for_b) {
+ if (side.is_side_a()) {
+ return for_a;
+ }
+ return for_b;
+}
+
+} // namespace
+
RouterLinkState::RouterLinkState() = default;
// static
RouterLinkState& RouterLinkState::Initialize(void* where) {
auto& state = *static_cast<RouterLinkState*>(where);
new (&state) RouterLinkState();
- memset(state.reserved, 0, sizeof(state.reserved));
std::atomic_thread_fence(std::memory_order_release);
return state;
}
@@ -104,4 +124,38 @@
return true;
}
+RouterLinkState::QueueState RouterLinkState::GetQueueState(
+ LinkSide side) const {
+ return {
+ .num_parcels = SelectBySide(side, num_parcels_on_a, num_parcels_on_b)
+ .load(std::memory_order_relaxed),
+ .num_bytes = SelectBySide(side, num_bytes_on_a, num_bytes_on_b)
+ .load(std::memory_order_relaxed),
+ };
+}
+
+bool RouterLinkState::UpdateQueueState(LinkSide side,
+ size_t num_parcels,
+ size_t num_bytes) {
+ StoreSaturated(SelectBySide(side, num_parcels_on_a, num_parcels_on_b),
+ num_parcels);
+ StoreSaturated(SelectBySide(side, num_bytes_on_a, num_bytes_on_b), num_bytes);
+ const uint32_t other_side_monitoring_this_side =
+ SelectBySide(side, kSideBMonitoringSideA, kSideAMonitoringSideB);
+ return (status.load(std::memory_order_relaxed) &
+ other_side_monitoring_this_side) != 0;
+}
+
+bool RouterLinkState::SetSideIsMonitoringPeer(LinkSide side,
+ bool is_monitoring) {
+ const uint32_t monitoring_bit =
+ SelectBySide(side, kSideAMonitoringSideB, kSideBMonitoringSideA);
+ uint32_t expected = kStable;
+ while (!status.compare_exchange_weak(expected, expected | monitoring_bit,
+ std::memory_order_relaxed,
+ std::memory_order_relaxed)) {
+ }
+ return (expected & monitoring_bit) != 0;
+}
+
} // namespace ipcz
diff --git a/src/ipcz/router_link_state.h b/src/ipcz/router_link_state.h
index 1ccefda..0d1ae54 100644
--- a/src/ipcz/router_link_state.h
+++ b/src/ipcz/router_link_state.h
@@ -18,10 +18,11 @@
// Structure shared between both Routers connected by RouterLink. This is used
// to synchronously query and reflect the state of each Router to the other,
-// and ultimately to facilitate orderly state changes across the route.
-//
-// This may live in shared memory, where it should be managed as a
+// and ultimately to facilitate orderly state changes across the route. This
+// may live in shared memory, where it should be managed as a
// RefCountedFragment.
+//
+// Note that RouterLinkStates are effectively only used by central links.
struct IPCZ_ALIGN(8) RouterLinkState : public RefCountedFragment {
RouterLinkState();
@@ -29,7 +30,7 @@
static RouterLinkState& Initialize(void* where);
// Link status which both sides atomically update to coordinate orderly proxy
- // bypass and route closure propagation. Used only for central links.
+ // bypass, route closure propagation, and other operations.
using Status = uint32_t;
// Status constants follow.
@@ -61,6 +62,13 @@
static constexpr Status kLockedBySideA = 1 << 4;
static constexpr Status kLockedBySideB = 1 << 5;
+ // Set if the link on either side A or B wishes to be notified when parcels
+ // or parcel data are consumed by the other side. In practice these are only
+ // set when a router has a trap installed to monitor such conditions, which
+ // applications may leverage to e.g. implement a back-pressure mechanism.
+ static constexpr Status kSideAMonitoringSideB = 1 << 6;
+ static constexpr Status kSideBMonitoringSideA = 1 << 7;
+
std::atomic<Status> status{kUnstable};
// In a situation with three routers A-B-C and a central link between A and
@@ -70,8 +78,16 @@
// validate that C is an appropriate source of such a bypass request.
NodeName allowed_bypass_request_source;
- // Reserved slots.
- uint32_t reserved[10];
+ // These fields approximate the number of parcels and data bytes received and
+ // queued for retrieval on each side of this link. Values here are saturated
+ // if the actual values would exceed the max uint32_t value.
+ std::atomic<uint32_t> num_parcels_on_a{0};
+ std::atomic<uint32_t> num_bytes_on_a{0};
+ std::atomic<uint32_t> num_parcels_on_b{0};
+ std::atomic<uint32_t> num_bytes_on_b{0};
+
+ // More reserved slots, padding out this structure to 64 bytes.
+ uint32_t reserved1[6] = {0};
bool is_locked_by(LinkSide side) const {
Status s = status.load(std::memory_order_relaxed);
@@ -107,6 +123,25 @@
// attempt was made to TryLock() from that side, while the other side was
// still unstable.
bool ResetWaitingBit(LinkSide side);
+
+ // Returns a snapshot of the inbound parcel queue state on the given side of
+ // this link.
+ struct QueueState {
+ uint32_t num_parcels;
+ uint32_t num_bytes;
+ };
+ QueueState GetQueueState(LinkSide side) const;
+
+ // Updates the queue state for the given side of this link. Values which
+ // exceed 2**32-1 are clamped to that value. Returns true if and only if the
+ // opposite side of the link wants to be notified about this update.
+ bool UpdateQueueState(LinkSide side, size_t num_parcels, size_t num_bytes);
+
+ // Sets an appropriate bit to indicate whether the router on the given side of
+ // this link should notify the opposite side after consuming inbound parcels
+ // or parcel data. Returns the previous value of the relevant bit, which may
+ // be the same as the old value.
+ bool SetSideIsMonitoringPeer(LinkSide side, bool is_monitoring);
};
// The size of this structure is fixed at 64 bytes to ensure that it fits the
diff --git a/src/ipcz/trap_set.cc b/src/ipcz/trap_set.cc
index 2d0d43d..2aacb02 100644
--- a/src/ipcz/trap_set.cc
+++ b/src/ipcz/trap_set.cc
@@ -52,6 +52,11 @@
return event_flags;
}
+bool NeedRemoteState(IpczTrapConditionFlags flags) {
+ return (flags & (IPCZ_TRAP_BELOW_MAX_REMOTE_PARCELS |
+ IPCZ_TRAP_BELOW_MAX_REMOTE_BYTES)) != 0;
+}
+
} // namespace
TrapSet::TrapSet() = default;
@@ -86,6 +91,9 @@
}
traps_.emplace_back(conditions, handler, context);
+ if (NeedRemoteState(conditions.flags)) {
+ ++num_traps_monitoring_remote_state_;
+ }
return IPCZ_RESULT_OK;
}
@@ -104,6 +112,9 @@
dispatcher.DeferEvent(trap.handler, trap.context, flags, status);
it = traps_.erase(it);
+ if (NeedRemoteState(flags)) {
+ --num_traps_monitoring_remote_state_;
+ }
}
}
@@ -113,6 +124,7 @@
last_known_status_);
}
traps_.clear();
+ num_traps_monitoring_remote_state_ = 0;
}
TrapSet::Trap::Trap(IpczTrapConditions conditions,
diff --git a/src/ipcz/trap_set.h b/src/ipcz/trap_set.h
index fe45641..671f0cd 100644
--- a/src/ipcz/trap_set.h
+++ b/src/ipcz/trap_set.h
@@ -33,6 +33,11 @@
// A previously queued inbound parcel has been fully or partially retrieved
// by the application.
kLocalParcelConsumed,
+
+ // A remote peer has changed state in a way that may be interesting to a
+ // trap in the set; for example, parcels may have been consumed from the
+ // remote queue.
+ kRemoteActivity,
};
TrapSet();
@@ -44,6 +49,12 @@
bool empty() const { return traps_.empty(); }
+ // Indicates whether any installed traps in this set require monitoring of
+ // remote queue state.
+ bool need_remote_state() const {
+ return num_traps_monitoring_remote_state_ > 0;
+ }
+
// Attempts to install a new trap in the set. This effectively implements
// the ipcz Trap() API. If `conditions` are already met, returns
// IPCZ_RESULT_FAILED_PRECONDITION and populates `satisfied_condition_flags`
@@ -81,6 +92,7 @@
using TrapList = absl::InlinedVector<Trap, 4>;
TrapList traps_;
+ size_t num_traps_monitoring_remote_state_ = 0;
IpczPortalStatus last_known_status_ = {.size = sizeof(last_known_status_)};
};
diff --git a/src/queueing_test.cc b/src/queueing_test.cc
new file mode 100644
index 0000000..454602d
--- /dev/null
+++ b/src/queueing_test.cc
@@ -0,0 +1,110 @@
+// Copyright 2022 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "ipcz/ipcz.h"
+#include "test/multinode_test.h"
+#include "testing/gtest/include/gtest/gtest.h"
+#include "third_party/abseil-cpp/absl/synchronization/notification.h"
+
+namespace ipcz {
+namespace {
+
+using QueueingTestNode = test::TestNode;
+using QueueingTest = test::MultinodeTest<QueueingTestNode>;
+
+MULTINODE_TEST_NODE(QueueingTestNode, RemoteQueueFeedbackClient) {
+ IpczHandle b = ConnectToBroker();
+
+ // Wait for the first parcel to arrive.
+ EXPECT_EQ(IPCZ_RESULT_OK,
+ WaitForConditions(b, {.flags = IPCZ_TRAP_ABOVE_MIN_LOCAL_PARCELS,
+ .min_local_parcels = 0}));
+
+ // Send and ack and wait for another parcel to arrive.
+ absl::Notification new_parcel_arrived;
+ EXPECT_EQ(IPCZ_RESULT_OK,
+ Trap(b, {.flags = IPCZ_TRAP_NEW_LOCAL_PARCEL},
+ [&](const IpczTrapEvent&) { new_parcel_arrived.Notify(); }));
+ EXPECT_EQ(IPCZ_RESULT_OK, Put(b, "ok"));
+ new_parcel_arrived.WaitForNotification();
+
+ std::string data;
+ EXPECT_EQ(IPCZ_RESULT_OK, Get(b, &data));
+ EXPECT_EQ("1234", data);
+
+ std::string ack;
+ EXPECT_EQ(IPCZ_RESULT_OK, WaitToGet(b, &ack));
+ EXPECT_EQ("ok", ack);
+
+ EXPECT_EQ(IPCZ_RESULT_OK, WaitForConditionFlags(b, IPCZ_TRAP_PEER_CLOSED));
+ Close(b);
+}
+
+TEST_P(QueueingTest, RemoteQueueFeedback) {
+ // Exercises operations which rely on feedback from the remote peer regarding
+ // its inbound parcel queue state.
+ IpczHandle c = SpawnTestNode<RemoteQueueFeedbackClient>();
+
+ // This trap can only be set while the remote portal appears to be non-empty.
+ const IpczTrapConditions all_bytes_consumed = {
+ .size = sizeof(all_bytes_consumed),
+ .flags = IPCZ_TRAP_BELOW_MAX_REMOTE_BYTES,
+ .max_remote_bytes = 1,
+ };
+ EXPECT_EQ(IPCZ_RESULT_FAILED_PRECONDITION,
+ Trap(c, all_bytes_consumed, [&](const auto&) {}));
+
+ // Send 4 bytes and wait for acknowledgement that the parcel was received.
+ std::string ack;
+ EXPECT_EQ(IPCZ_RESULT_OK, Put(c, "1234"));
+ EXPECT_EQ(IPCZ_RESULT_OK, WaitToGet(c, &ack));
+ EXPECT_EQ("ok", ack);
+
+ // Now these operations should always fail due to the specified limits.
+ EXPECT_EQ(IPCZ_RESULT_RESOURCE_EXHAUSTED,
+ PutWithLimits(c, {.max_queued_parcels = 1}, "meh"));
+ EXPECT_EQ(IPCZ_RESULT_RESOURCE_EXHAUSTED,
+ PutWithLimits(c, {.max_queued_bytes = 4}, "?"));
+
+ // Now we should be able to install traps for both queued parcels and bytes on
+ // the remote side.
+ absl::Notification consumed_parcels;
+ const IpczTrapConditions all_parcels_consumed = {
+ .size = sizeof(all_parcels_consumed),
+ .flags = IPCZ_TRAP_BELOW_MAX_REMOTE_PARCELS,
+ .max_remote_parcels = 1,
+ };
+ EXPECT_EQ(
+ IPCZ_RESULT_OK,
+ Trap(c, all_parcels_consumed, [&](const IpczTrapEvent& event) {
+ EXPECT_TRUE(event.condition_flags & IPCZ_TRAP_BELOW_MAX_REMOTE_PARCELS);
+ consumed_parcels.Notify();
+ }));
+
+ absl::Notification consumed_bytes;
+ EXPECT_EQ(
+ IPCZ_RESULT_OK,
+ Trap(c, all_bytes_consumed, [&](const IpczTrapEvent& event) {
+ EXPECT_TRUE(event.condition_flags & IPCZ_TRAP_BELOW_MAX_REMOTE_BYTES);
+ consumed_bytes.Notify();
+ }));
+
+ // Ack back to the client so it will read its queue. Then we can wait for both
+ // traps to notify.
+ EXPECT_EQ(IPCZ_RESULT_OK, Put(c, "ok"));
+ consumed_parcels.WaitForNotification();
+ consumed_bytes.WaitForNotification();
+
+ // And now this Put operation should succeed.
+ EXPECT_EQ(IPCZ_RESULT_OK,
+ PutWithLimits(c, {.max_queued_parcels = 1, .max_queued_bytes = 4},
+ "meh!"));
+
+ Close(c);
+}
+
+INSTANTIATE_MULTINODE_TEST_SUITE_P(QueueingTest);
+
+} // namespace
+} // namespace ipcz
diff --git a/src/test/test_base.cc b/src/test/test_base.cc
index de9e098..13c85cc 100644
--- a/src/test/test_base.cc
+++ b/src/test/test_base.cc
@@ -75,6 +75,20 @@
handles.size(), IPCZ_NO_FLAGS, nullptr);
}
+IpczResult TestBase::PutWithLimits(IpczHandle portal,
+ const IpczPutLimits& limits,
+ std::string_view message,
+ absl::Span<IpczHandle> handles) {
+ IpczPutLimits sized_limits = limits;
+ sized_limits.size = sizeof(sized_limits);
+ const IpczPutOptions options = {
+ .size = sizeof(options),
+ .limits = &sized_limits,
+ };
+ return ipcz().Put(portal, message.data(), message.size(), handles.data(),
+ handles.size(), IPCZ_NO_FLAGS, &options);
+}
+
IpczResult TestBase::Get(IpczHandle portal,
std::string* message,
absl::Span<IpczHandle> handles) {
@@ -108,9 +122,14 @@
IpczPortalStatus* status) {
auto handler = std::make_unique<TrapEventHandler>(std::move(fn));
auto context = reinterpret_cast<uintptr_t>(handler.get());
+
+ // For convenience, set the `size` field correctly so callers don't have to.
+ IpczTrapConditions sized_conditions = conditions;
+ sized_conditions.size = sizeof(sized_conditions);
+
const IpczResult result =
- ipcz().Trap(portal, &conditions, &HandleEvent, context, IPCZ_NO_FLAGS,
- nullptr, flags, status);
+ ipcz().Trap(portal, &sized_conditions, &HandleEvent, context,
+ IPCZ_NO_FLAGS, nullptr, flags, status);
if (result == IPCZ_RESULT_OK) {
std::ignore = handler.release();
}
diff --git a/src/test/test_base.h b/src/test/test_base.h
index cdcad57..4786fbf 100644
--- a/src/test/test_base.h
+++ b/src/test/test_base.h
@@ -41,6 +41,10 @@
IpczResult Put(IpczHandle portal,
std::string_view message,
absl::Span<IpczHandle> handles = {});
+ IpczResult PutWithLimits(IpczHandle portal,
+ const IpczPutLimits& limits,
+ std::string_view message,
+ absl::Span<IpczHandle> handles = {});
// Shorthand for ipcz Get() to retrieve the next available parcel from
// `portal`.If no parcel is available, or any other condition prevents Get()
diff --git a/src/util/safe_math.h b/src/util/safe_math.h
index 15cbb8c..5affb98 100644
--- a/src/util/safe_math.h
+++ b/src/util/safe_math.h
@@ -38,6 +38,15 @@
return result;
}
+template <typename T>
+T SaturatedAdd(T a, T b) {
+ T result;
+ if (!__builtin_add_overflow(a, b, &result)) {
+ return result;
+ }
+ return std::numeric_limits<T>::max();
+}
+
} // namespace ipcz
#endif // IPCZ_SRC_UTIL_SAFE_MATH_