ipcz: Remove remote queue state monitoring
This feature was required by the original MojoIpcz data pipe
implementation, but it's no longer required. Since it impacts
baseline ipcz performance, remove it for now.
Hypothetical API features (IPCZ_TRAP_*REMOTE* condition flags)
are left in place since we can still provide an implementation
after ipcz launch.
Bug: 1299283
Fixed: 1383754
Change-Id: Ifc08130f8ffd8b3fd02a8fcc2a5c31da90034629
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/4022090
Reviewed-by: Robert Sesek <rsesek@chromium.org>
Commit-Queue: Ken Rockot <rockot@google.com>
Cr-Commit-Position: refs/heads/main@{#1071995}
NOKEYCHECK=True
GitOrigin-RevId: 60a93ab7a4e966cd9ccbb3e17b67718cd5b8e61f
diff --git a/src/BUILD.gn b/src/BUILD.gn
index a8fe1da..7763017 100644
--- a/src/BUILD.gn
+++ b/src/BUILD.gn
@@ -247,8 +247,6 @@
]
sources = [
"ipcz/api_object.cc",
- "ipcz/atomic_queue_state.cc",
- "ipcz/atomic_queue_state.h",
"ipcz/block_allocator.cc",
"ipcz/block_allocator_pool.cc",
"ipcz/block_allocator_pool.h",
@@ -275,7 +273,6 @@
"ipcz/message_macros/message_params_declaration_macros.h",
"ipcz/message_macros/message_params_declaration_macros.h",
"ipcz/message_macros/undef_message_macros.h",
- "ipcz/monitored_atomic.h",
"ipcz/node.cc",
"ipcz/node_connector.cc",
"ipcz/node_link.cc",
diff --git a/src/ipcz/atomic_queue_state.cc b/src/ipcz/atomic_queue_state.cc
deleted file mode 100644
index d2f2dc9..0000000
--- a/src/ipcz/atomic_queue_state.cc
+++ /dev/null
@@ -1,38 +0,0 @@
-// Copyright 2022 The Chromium Authors
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#include "ipcz/atomic_queue_state.h"
-
-#include <cstdint>
-
-#include "ipcz/monitored_atomic.h"
-#include "third_party/abseil-cpp/absl/base/macros.h"
-
-namespace ipcz {
-
-AtomicQueueState::AtomicQueueState() noexcept = default;
-
-AtomicQueueState::QueryResult AtomicQueueState::Query(
- const MonitorSelection& monitors) {
- return {
- .num_parcels_consumed =
- num_parcels_consumed_.Query({.monitor = monitors.monitor_parcels}),
- .num_bytes_consumed =
- num_bytes_consumed_.Query({.monitor = monitors.monitor_bytes}),
- };
-}
-
-bool AtomicQueueState::Update(const UpdateValue& value) {
- ABSL_ASSERT(value.num_parcels_consumed <=
- MonitoredAtomic<uint64_t>::kMaxValue);
- ABSL_ASSERT(value.num_bytes_consumed <= MonitoredAtomic<uint64_t>::kMaxValue);
- const bool parcels_were_monitored =
- num_parcels_consumed_.UpdateValueAndResetMonitor(
- value.num_parcels_consumed);
- const bool bytes_were_monitored =
- num_bytes_consumed_.UpdateValueAndResetMonitor(value.num_bytes_consumed);
- return parcels_were_monitored || bytes_were_monitored;
-}
-
-} // namespace ipcz
diff --git a/src/ipcz/atomic_queue_state.h b/src/ipcz/atomic_queue_state.h
deleted file mode 100644
index bb7401f..0000000
--- a/src/ipcz/atomic_queue_state.h
+++ /dev/null
@@ -1,75 +0,0 @@
-// Copyright 2022 The Chromium Authors
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#ifndef IPCZ_SRC_IPCZ_ATOMIC_QUEUE_STATE_
-#define IPCZ_SRC_IPCZ_ATOMIC_QUEUE_STATE_
-
-#include <cstdint>
-#include <type_traits>
-
-#include "ipcz/monitored_atomic.h"
-
-namespace ipcz {
-
-// AtomicQueueState holds some trivial data about how much of a router's inbound
-// parcel sequence has been consumed so far.
-//
-// Note that the fields herein are not strictly synchronized. If a queue
-// accumulates a 4k parcel and an 8k parcel which are both then consumed by the
-// application, the remote sender may observe `num_parcels_consumed` at 0, then
-// 1, then 2; and they may observe `num_bytes_consumed` at 0, then 4k, and then
-// 12k; the ordering of those individual progressions is guaranteed, but there's
-// no guarantee that an observer will see `num_parcels_consumed` as 1 at the
-// same time they see `num_bytes_consumed` as 4k.
-class alignas(8) AtomicQueueState {
- public:
- AtomicQueueState() noexcept;
-
- // Performs a best-effort query of the most recently visible value on both
- // fields and returns them as a QueryResult. `monitors` determines whether
- // each field will be atomically marked for monitoring at the same time its
- // value is retrieved.
- struct QueryResult {
- MonitoredAtomic<uint64_t>::State num_parcels_consumed;
- MonitoredAtomic<uint64_t>::State num_bytes_consumed;
- };
- struct MonitorSelection {
- bool monitor_parcels;
- bool monitor_bytes;
- };
- QueryResult Query(const MonitorSelection& monitors);
-
- // Updates both fields with new values, resetting any monitor bit that may
- // have been set on either one. If either field had a monitor bit set prior to
- // this update, this returns true. Otherwise it returns false.
- struct UpdateValue {
- uint64_t num_parcels_consumed;
- uint64_t num_bytes_consumed;
- };
- bool Update(const UpdateValue& value);
-
- private:
- // The number of parcels consumed from the router's inbound parcel queue,
- // either by the application reading from its portal, or by ipcz proxying them
- // onward to another router.
- MonitoredAtomic<uint64_t> num_parcels_consumed_{0};
-
- // The total number of bytes of data consumed from the router's inbound parcel
- // queue. This is the sum of the data size of all parcels covered by
- // `consumed_sequence_length`, plus any bytes already consumed from the
- // next parcel in sequence if it's been partially consumed..
- MonitoredAtomic<uint64_t> num_bytes_consumed_{0};
-};
-
-// This must remain stable at 16 bytes in size, as it's part of shared memory
-// layouts. Trivial copyability is also required as a proxy condition to prevent
-// changes which might break that usage (e.g. introduction of a non-trivial
-// destructor.)
-static_assert(sizeof(AtomicQueueState) == 16, "Invalid AtomicQueueState size");
-static_assert(std::is_trivially_copyable_v<AtomicQueueState>,
- "AtomicQueueState must be trivially copyable");
-
-} // namespace ipcz
-
-#endif // IPCZ_SRC_IPCZ_ATOMIC_QUEUE_STATE_
diff --git a/src/ipcz/local_router_link.cc b/src/ipcz/local_router_link.cc
index 6091424..c6df993 100644
--- a/src/ipcz/local_router_link.cc
+++ b/src/ipcz/local_router_link.cc
@@ -131,20 +131,6 @@
}
}
-AtomicQueueState* LocalRouterLink::GetPeerQueueState() {
- return &state_->link_state().GetQueueState(side_.opposite());
-}
-
-AtomicQueueState* LocalRouterLink::GetLocalQueueState() {
- return &state_->link_state().GetQueueState(side_);
-}
-
-void LocalRouterLink::SnapshotPeerQueueState(const OperationContext& context) {
- if (Ref<Router> receiver = state_->GetRouter(side_.opposite())) {
- receiver->SnapshotPeerQueueState(context);
- }
-}
-
void LocalRouterLink::AcceptRouteDisconnected(const OperationContext& context) {
if (Ref<Router> receiver = state_->GetRouter(side_.opposite())) {
receiver->AcceptRouteDisconnectedFrom(context, state_->type());
diff --git a/src/ipcz/local_router_link.h b/src/ipcz/local_router_link.h
index ccf33f2..03e66b2 100644
--- a/src/ipcz/local_router_link.h
+++ b/src/ipcz/local_router_link.h
@@ -45,9 +45,6 @@
void AcceptRouteClosure(const OperationContext& context,
SequenceNumber sequence_length) override;
void AcceptRouteDisconnected(const OperationContext& context) override;
- AtomicQueueState* GetPeerQueueState() override;
- AtomicQueueState* GetLocalQueueState() override;
- void SnapshotPeerQueueState(const OperationContext& context) override;
void MarkSideStable() override;
bool TryLockForBypass(const NodeName& bypass_request_source) override;
bool TryLockForClosure() override;
diff --git a/src/ipcz/monitored_atomic.h b/src/ipcz/monitored_atomic.h
deleted file mode 100644
index b8ec5c4..0000000
--- a/src/ipcz/monitored_atomic.h
+++ /dev/null
@@ -1,77 +0,0 @@
-// Copyright 2022 The Chromium Authors
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#ifndef IPCZ_SRC_IPCZ_MONITORED_VALUE_H_
-#define IPCZ_SRC_IPCZ_MONITORED_VALUE_H_
-
-#include <atomic>
-#include <limits>
-#include <type_traits>
-
-namespace ipcz {
-
-// MonitoredAtomic is a trivial wrapper around around an atomic unsigned
-// integral value, with the high bit reserved for primitive communication
-// between one producer and any number of concurrent consumers of the value.
-//
-// Consumers can atomically query the value while simultaneously signaling that
-// they want to be notified about the next time the value changes. Producers can
-// atomically update the value while simulataneously querying (and resetting)
-// the consumer's interest in being notified about the change.
-template <typename T>
-class MonitoredAtomic {
- static_assert(std::is_integral_v<T> && std::is_unsigned_v<T>,
- "MonitoredAtomic requires an unsigned integral type");
-
- public:
- struct State {
- T value;
- bool monitored;
- };
-
- static constexpr T kMaxValue = std::numeric_limits<T>::max() >> 1;
- static constexpr T kMonitorBit = kMaxValue + 1;
-
- MonitoredAtomic() noexcept = default;
- explicit MonitoredAtomic(T value) noexcept : value_(value) {}
-
- // Returns a best-effort snapshot of the most recent underlying value. If
- // `monitor` is true in `options`, then the stored value is also atomically
- // flagged for monitoring.
- struct QueryOptions {
- bool monitor;
- };
- State Query(const QueryOptions& options) {
- T value = value_.load(std::memory_order_relaxed);
- while (options.monitor && !IsMonitored(value) &&
- !value_.compare_exchange_weak(value, Monitored(value),
- std::memory_order_release,
- std::memory_order_relaxed)) {
- }
- return {.value = Unmonitored(value), .monitored = IsMonitored(value)};
- }
-
- // Stores a new underlying value, resetting the monitor bit if it was set.
- // Returns a boolean indicating whether the monitor bit was set.
- [[nodiscard]] bool UpdateValueAndResetMonitor(T value) {
- T old_value = value_.load(std::memory_order_relaxed);
- while (value != old_value &&
- !value_.compare_exchange_weak(old_value, value,
- std::memory_order_release,
- std::memory_order_relaxed)) {
- }
- return IsMonitored(old_value);
- }
-
- private:
- static bool IsMonitored(T value) { return (value & kMonitorBit) != 0; }
- static T Monitored(T value) { return value | kMonitorBit; }
- static T Unmonitored(T value) { return value & kMaxValue; }
-
- std::atomic<T> value_{0};
-};
-
-} // namespace ipcz
-
-#endif // IPCZ_SRC_IPCZ_MONITORED_VALUE_H_
diff --git a/src/ipcz/node_link.cc b/src/ipcz/node_link.cc
index 37d59a9..121bdd1 100644
--- a/src/ipcz/node_link.cc
+++ b/src/ipcz/node_link.cc
@@ -636,14 +636,6 @@
context, sublink->router_link->GetType());
}
-bool NodeLink::OnSnapshotPeerQueueState(msg::SnapshotPeerQueueState& snapshot) {
- const OperationContext context{OperationContext::kTransportNotification};
- if (Ref<Router> router = GetRouter(snapshot.params().sublink)) {
- router->SnapshotPeerQueueState(context);
- }
- return true;
-}
-
bool NodeLink::OnBypassPeer(msg::BypassPeer& bypass) {
absl::optional<Sublink> sublink = GetSublink(bypass.params().sublink);
if (!sublink) {
diff --git a/src/ipcz/node_link.h b/src/ipcz/node_link.h
index fcd412a..1ac415c 100644
--- a/src/ipcz/node_link.h
+++ b/src/ipcz/node_link.h
@@ -245,7 +245,6 @@
msg::AcceptParcelDriverObjects& accept) override;
bool OnRouteClosed(msg::RouteClosed& route_closed) override;
bool OnRouteDisconnected(msg::RouteDisconnected& route_disconnected) override;
- bool OnSnapshotPeerQueueState(msg::SnapshotPeerQueueState& snapshot) override;
bool OnBypassPeer(msg::BypassPeer& bypass) override;
bool OnAcceptBypassLink(msg::AcceptBypassLink& accept) override;
bool OnStopProxying(msg::StopProxying& stop) override;
diff --git a/src/ipcz/node_messages_generator.h b/src/ipcz/node_messages_generator.h
index 3d553ba..d709529 100644
--- a/src/ipcz/node_messages_generator.h
+++ b/src/ipcz/node_messages_generator.h
@@ -366,13 +366,6 @@
IPCZ_MSG_PARAM(SublinkId, sublink)
IPCZ_MSG_END()
-// Notifies a router that it may be interested in a recent change to its outward
-// peer's visible queue state.
-IPCZ_MSG_BEGIN(SnapshotPeerQueueState, IPCZ_MSG_ID(24), IPCZ_MSG_VERSION(0))
- // Identifies the router to receive this message.
- IPCZ_MSG_PARAM(SublinkId, sublink)
-IPCZ_MSG_END()
-
// Informs a router that its outward peer can be bypassed. Given routers X and Y
// on the central link, and a router Z as Y's inward peer:
//
diff --git a/src/ipcz/remote_router_link.cc b/src/ipcz/remote_router_link.cc
index c4daf80..0fdf825 100644
--- a/src/ipcz/remote_router_link.cc
+++ b/src/ipcz/remote_router_link.cc
@@ -303,26 +303,6 @@
node_link()->Transmit(route_closed);
}
-AtomicQueueState* RemoteRouterLink::GetPeerQueueState() {
- if (auto* state = GetLinkState()) {
- return &state->GetQueueState(side_.opposite());
- }
- return nullptr;
-}
-
-AtomicQueueState* RemoteRouterLink::GetLocalQueueState() {
- if (auto* state = GetLinkState()) {
- return &state->GetQueueState(side_);
- }
- return nullptr;
-}
-
-void RemoteRouterLink::SnapshotPeerQueueState(const OperationContext& context) {
- msg::SnapshotPeerQueueState snapshot;
- snapshot.params().sublink = sublink_;
- node_link()->Transmit(snapshot);
-}
-
void RemoteRouterLink::AcceptRouteDisconnected(
const OperationContext& context) {
msg::RouteDisconnected route_disconnected;
diff --git a/src/ipcz/remote_router_link.h b/src/ipcz/remote_router_link.h
index 970bf55..76b878a 100644
--- a/src/ipcz/remote_router_link.h
+++ b/src/ipcz/remote_router_link.h
@@ -65,9 +65,6 @@
void AcceptRouteClosure(const OperationContext& context,
SequenceNumber sequence_length) override;
void AcceptRouteDisconnected(const OperationContext& context) override;
- AtomicQueueState* GetPeerQueueState() override;
- AtomicQueueState* GetLocalQueueState() override;
- void SnapshotPeerQueueState(const OperationContext& context) override;
void MarkSideStable() override;
bool TryLockForBypass(const NodeName& bypass_request_source) override;
bool TryLockForClosure() override;
diff --git a/src/ipcz/router.cc b/src/ipcz/router.cc
index 280f4f2..63b0bb6 100644
--- a/src/ipcz/router.cc
+++ b/src/ipcz/router.cc
@@ -9,7 +9,6 @@
#include <cstring>
#include <utility>
-#include "ipcz/atomic_queue_state.h"
#include "ipcz/ipcz.h"
#include "ipcz/local_router_link.h"
#include "ipcz/node_link.h"
@@ -99,12 +98,6 @@
void Router::QueryStatus(IpczPortalStatus& status) {
absl::MutexLock lock(&mutex_);
- AtomicQueueState::QueryResult result;
- if (auto* state = GetPeerQueueState()) {
- result = state->Query({.monitor_parcels = false, .monitor_bytes = false});
- }
-
- UpdateStatusForPeerQueueState(result);
const size_t size = std::min(status.size, status_.size);
memcpy(&status, &status_, size);
status.size = size;
@@ -218,8 +211,6 @@
}
const OperationContext context{OperationContext::kAPICall};
- SnapshotPeerQueueState(context);
-
absl::MutexLock lock(&mutex_);
if (status_.num_remote_parcels >= limits.max_queued_parcels ||
status_.num_remote_bytes >= limits.max_queued_bytes) {
@@ -392,53 +383,6 @@
return true;
}
-void Router::SnapshotPeerQueueState(const OperationContext& context) {
- TrapEventDispatcher dispatcher;
- absl::ReleasableMutexLock lock(&mutex_);
- Ref<RouterLink> outward_link = outward_edge_.primary_link();
- if (!outward_link || !outward_link->GetType().is_central() || inward_edge_) {
- return;
- }
-
- AtomicQueueState* peer_state = outward_link->GetPeerQueueState();
- if (!peer_state) {
- lock.Release();
- // Try again after we have RouterLinkState access.
- outward_link->WaitForLinkStateAsync([self = WrapRefCounted(this), context] {
- self->SnapshotPeerQueueState(context);
- });
- return;
- }
-
- // Start with a cheaper snapshot, which may be good enough.
- const AtomicQueueState::QueryResult state =
- peer_state->Query({.monitor_parcels = false, .monitor_bytes = false});
- UpdateStatusForPeerQueueState(state);
- traps_.UpdatePortalStatus(context, status_,
- TrapSet::UpdateReason::kRemoteActivity, dispatcher);
- if (!traps_.need_remote_state()) {
- return;
- }
-
- const bool monitor_sequence_length =
- traps_.need_remote_parcels() && !state.num_parcels_consumed.monitored;
- const bool monitor_num_bytes =
- traps_.need_remote_bytes() && !state.num_bytes_consumed.monitored;
- if (!monitor_sequence_length && !monitor_num_bytes) {
- return;
- }
-
- // We have at least one trap interested in remote queue state, the caller
- // requested monitoring, and the state isn't currently being monitored. Take
- // another snapshot, this time flipping any appropriate monitor bits.
- UpdateStatusForPeerQueueState(peer_state->Query({
- .monitor_parcels = traps_.need_remote_parcels(),
- .monitor_bytes = traps_.need_remote_bytes(),
- }));
- traps_.UpdatePortalStatus(context, status_,
- TrapSet::UpdateReason::kRemoteActivity, dispatcher);
-}
-
IpczResult Router::GetNextInboundParcel(IpczGetFlags flags,
void* data,
size_t* num_bytes,
@@ -447,7 +391,6 @@
IpczHandle* parcel) {
const OperationContext context{OperationContext::kAPICall};
TrapEventDispatcher dispatcher;
- Ref<RouterLink> link_to_notify;
Parcel consumed_parcel;
{
absl::MutexLock lock(&mutex_);
@@ -511,13 +454,6 @@
traps_.UpdatePortalStatus(context, status_,
TrapSet::UpdateReason::kLocalParcelConsumed,
dispatcher);
- if (RefreshLocalQueueState()) {
- link_to_notify = outward_edge_.primary_link();
- }
- }
-
- if (link_to_notify) {
- link_to_notify->SnapshotPeerQueueState(context);
}
if (parcel) {
@@ -561,7 +497,6 @@
IpczResult Router::CommitGetNextIncomingParcel(size_t num_data_bytes_consumed,
absl::Span<IpczHandle> handles) {
const OperationContext context{OperationContext::kAPICall};
- Ref<RouterLink> link_to_notify;
TrapEventDispatcher dispatcher;
{
absl::MutexLock lock(&mutex_);
@@ -589,15 +524,7 @@
traps_.UpdatePortalStatus(context, status_,
TrapSet::UpdateReason::kLocalParcelConsumed,
dispatcher);
- if (RefreshLocalQueueState()) {
- link_to_notify = outward_edge_.primary_link();
- }
}
-
- if (link_to_notify) {
- link_to_notify->SnapshotPeerQueueState(context);
- }
-
return IPCZ_RESULT_OK;
}
@@ -607,39 +534,6 @@
IpczTrapConditionFlags* satisfied_condition_flags,
IpczPortalStatus* status) {
absl::MutexLock lock(&mutex_);
-
- const bool need_remote_parcels =
- (conditions.flags & IPCZ_TRAP_BELOW_MAX_REMOTE_PARCELS) != 0;
- const bool need_remote_bytes =
- (conditions.flags & IPCZ_TRAP_BELOW_MAX_REMOTE_BYTES) != 0;
- if (need_remote_parcels || need_remote_bytes) {
- if (AtomicQueueState* peer_state = GetPeerQueueState()) {
- const AtomicQueueState::QueryResult state =
- peer_state->Query({.monitor_parcels = false, .monitor_bytes = false});
- UpdateStatusForPeerQueueState(state);
-
- // If the status already meets some conditions and would block trap
- // installation, OR if it's already being monitored for changes, we can
- // just go ahead and install the trap. Otherwise we have to re-query and
- // set any monitoring bits ourselves.
- const bool monitor_parcels =
- need_remote_parcels && !state.num_parcels_consumed.monitored;
- const bool monitor_bytes =
- need_remote_bytes && !state.num_bytes_consumed.monitored;
- if (!TrapSet::GetSatisfiedConditions(conditions, status_) &&
- (monitor_parcels || monitor_bytes)) {
- UpdateStatusForPeerQueueState(
- peer_state->Query({.monitor_parcels = need_remote_parcels,
- .monitor_bytes = need_remote_bytes}));
- }
- } else {
- status_.num_remote_parcels =
- outbound_parcels_.GetCurrentSequenceLength().value();
- status_.num_remote_bytes = saturated_cast<size_t>(
- outbound_parcels_.GetTotalElementSizeQueuedSoFar());
- }
- }
-
return traps_.Add(conditions, handler, context, status_,
satisfied_condition_flags, status);
}
@@ -1411,8 +1305,6 @@
bool inward_link_decayed = false;
bool outward_link_decayed = false;
bool dropped_last_decaying_link = false;
- bool snapshot_peer_queue_state = false;
- bool peer_needs_local_state_update = false;
ParcelsToFlush parcels_to_flush;
{
absl::MutexLock lock(&mutex_);
@@ -1425,8 +1317,6 @@
decaying_inward_link =
inward_edge_ ? inward_edge_->decaying_link() : nullptr;
on_central_link = outward_link && outward_link->GetType().is_central();
- snapshot_peer_queue_state = on_central_link && traps_.need_remote_state();
- peer_needs_local_state_update = on_central_link && RefreshLocalQueueState();
if (bridge_) {
// Bridges have either a primary link or decaying link, but never both.
bridge_link = bridge_->primary_link() ? bridge_->primary_link()
@@ -1572,14 +1462,6 @@
return;
}
- if (snapshot_peer_queue_state) {
- SnapshotPeerQueueState(context);
- }
-
- if (peer_needs_local_state_update) {
- outward_link->SnapshotPeerQueueState(context);
- }
-
if (!dropped_last_decaying_link && behavior != kForceProxyBypassAttempt) {
// No relevant state changes, so there are no new bypass opportunities.
return;
@@ -1594,72 +1476,6 @@
}
}
-AtomicQueueState* Router::GetPeerQueueState() {
- if (!outward_edge_.primary_link()) {
- return nullptr;
- }
-
- if (!outward_edge_.primary_link()->GetType().is_central()) {
- return nullptr;
- }
-
- return outward_edge_.primary_link()->GetPeerQueueState();
-}
-
-bool Router::RefreshLocalQueueState() {
- const Ref<RouterLink>& outward_link = outward_edge_.primary_link();
- if (!outward_link) {
- return false;
- }
-
- auto* state = outward_link->GetLocalQueueState();
- if (!state) {
- return false;
- }
-
- const uint64_t num_parcels_consumed =
- inbound_parcels_.current_sequence_number().value();
- const uint64_t num_bytes_consumed =
- inbound_parcels_.total_consumed_element_size();
- if (last_queue_update_ &&
- last_queue_update_->num_parcels_consumed == num_parcels_consumed &&
- last_queue_update_->num_bytes_consumed == num_bytes_consumed) {
- // If our current status doesn't differ in some way from the last time we
- // updated the local AtomicQueueState, there's nothing to do.
- return false;
- }
-
- last_queue_update_ = AtomicQueueState::UpdateValue{
- .num_parcels_consumed = num_parcels_consumed,
- .num_bytes_consumed = num_bytes_consumed,
- };
- return state->Update(*last_queue_update_);
-}
-
-void Router::UpdateStatusForPeerQueueState(
- const AtomicQueueState::QueryResult& state) {
- // The consumed amounts should never exceed produced amounts. If they do,
- // treat them as zero.
- const uint64_t num_parcels_produced =
- outbound_parcels_.GetCurrentSequenceLength().value();
- uint64_t num_parcels_consumed = 0;
- if (state.num_parcels_consumed.value <= num_parcels_produced) {
- num_parcels_consumed = state.num_parcels_consumed.value;
- }
-
- const uint64_t num_bytes_produced =
- outbound_parcels_.GetTotalElementSizeQueuedSoFar();
- uint64_t num_bytes_consumed = 0;
- if (state.num_bytes_consumed.value <= num_bytes_produced) {
- num_bytes_consumed = state.num_bytes_consumed.value;
- }
-
- status_.num_remote_parcels =
- saturated_cast<size_t>(num_parcels_produced - num_parcels_consumed);
- status_.num_remote_bytes =
- saturated_cast<size_t>(num_bytes_produced - num_bytes_consumed);
-}
-
bool Router::MaybeStartSelfBypass(const OperationContext& context) {
Ref<RemoteRouterLink> remote_inward_link;
Ref<RemoteRouterLink> remote_outward_link;
diff --git a/src/ipcz/router.h b/src/ipcz/router.h
index 2663a42..9da6073 100644
--- a/src/ipcz/router.h
+++ b/src/ipcz/router.h
@@ -24,7 +24,6 @@
namespace ipcz {
-class AtomicQueueState;
class NodeLink;
class RemoteRouterLink;
struct RouterLinkState;
@@ -138,10 +137,6 @@
LinkType link_type,
SequenceNumber sequence_length);
- // Queries the remote peer's queue state and performs any local state upates
- // needed to reflect it. `source` indicates why the snapshot is being taken.
- void SnapshotPeerQueueState(const OperationContext& context);
-
// Accepts notification from a link bound to this Router that some node along
// the route (in the direction of that link) has been disconnected, e.g. due
// to a crash, and that the route is no longer functional as a result. This is
@@ -336,23 +331,6 @@
private:
~Router() override;
- // Returns a handle to the outward peer's queue state, if available. Otherwise
- // returns null.
- AtomicQueueState* GetPeerQueueState() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
-
- // Updates the AtomicQueueState shared with this Router's outward peer, based
- // on the current portal status. Any monitor bit set by the remote peer is
- // reset, and this returns the value of that bit prior to the reset. If this
- // returns true, the caller is responsible for notifying the remote peer about
- // a state change.
- [[nodiscard]] bool RefreshLocalQueueState()
- ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
-
- // Updates this Router's status to reflect how many parcels and total bytes of
- // parcel data remain on the remote peer's inbound queue.
- void UpdateStatusForPeerQueueState(const AtomicQueueState::QueryResult& state)
- ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
-
// Attempts to initiate bypass of this router by its peers, and ultimately to
// remove this router from its route.
//
@@ -440,11 +418,6 @@
// this router, iff this is a terminal router.
IpczPortalStatus status_ ABSL_GUARDED_BY(mutex_) = {sizeof(status_)};
- // A local cache of the most recently stored value for our own local
- // AtomicQueueState.
- absl::optional<AtomicQueueState::UpdateValue> last_queue_update_
- ABSL_GUARDED_BY(mutex_);
-
// A set of traps installed via a controlling portal where applicable. These
// traps are notified about any interesting state changes within the router.
TrapSet traps_ ABSL_GUARDED_BY(mutex_);
diff --git a/src/ipcz/router_link.h b/src/ipcz/router_link.h
index a8031b3..8a0dc22 100644
--- a/src/ipcz/router_link.h
+++ b/src/ipcz/router_link.h
@@ -10,7 +10,6 @@
#include <string>
#include <utility>
-#include "ipcz/atomic_queue_state.h"
#include "ipcz/fragment_ref.h"
#include "ipcz/link_type.h"
#include "ipcz/node_name.h"
@@ -85,19 +84,6 @@
// delivery of any further parcels.
virtual void AcceptRouteDisconnected(const OperationContext& context) = 0;
- // Returns the AtomicQueueState for the other side of this link if available.
- // Otherwise returns null.
- virtual AtomicQueueState* GetPeerQueueState() = 0;
-
- // Returns the AtomicQueueState for this side of the link if available.
- // Otherwise returns null.
- virtual AtomicQueueState* GetLocalQueueState() = 0;
-
- // Notifies the other side that this side has updated its visible queue state
- // in some way which may be interesting to them. This should be called
- // sparingly to avoid redundant IPC traffic and redundant idle wakes.
- virtual void SnapshotPeerQueueState(const OperationContext& context) = 0;
-
// Signals that this side of the link is in a stable state suitable for one
// side or the other to lock the link, either for bypass or closure
// propagation. Only once both sides are marked stable can either side lock
diff --git a/src/ipcz/router_link_state.cc b/src/ipcz/router_link_state.cc
index 441b6d4..95b335b 100644
--- a/src/ipcz/router_link_state.cc
+++ b/src/ipcz/router_link_state.cc
@@ -125,8 +125,4 @@
return true;
}
-AtomicQueueState& RouterLinkState::GetQueueState(LinkSide side) {
- return SelectBySide(side, side_a_queue_state, side_b_queue_state);
-}
-
} // namespace ipcz
diff --git a/src/ipcz/router_link_state.h b/src/ipcz/router_link_state.h
index b633688..faabae2 100644
--- a/src/ipcz/router_link_state.h
+++ b/src/ipcz/router_link_state.h
@@ -9,7 +9,6 @@
#include <cstdint>
#include <type_traits>
-#include "ipcz/atomic_queue_state.h"
#include "ipcz/ipcz.h"
#include "ipcz/link_side.h"
#include "ipcz/node_name.h"
@@ -72,14 +71,8 @@
// validate that C is an appropriate source of such a bypass request.
NodeName allowed_bypass_request_source;
- // An approximation of the queue state on each side of the link. These are
- // used both for best-effort querying of remote conditions as well as for
- // reliable synchronization against remote activity.
- AtomicQueueState side_a_queue_state;
- AtomicQueueState side_b_queue_state;
-
// More reserved slots, padding out this structure to 64 bytes.
- uint32_t reserved1[2] = {0};
+ uint32_t reserved1[10] = {0};
bool is_locked_by(LinkSide side) const {
Status s = status.load(std::memory_order_relaxed);
@@ -115,10 +108,6 @@
// attempt was made to TryLock() from that side, while the other side was
// still unstable.
bool ResetWaitingBit(LinkSide side);
-
- // Returns a view of the inbound parcel queue state for the given `side` of
- // this link.
- AtomicQueueState& GetQueueState(LinkSide side);
};
// The size of this structure is fixed at 64 bytes to ensure that it fits the
diff --git a/src/queueing_test.cc b/src/queueing_test.cc
index 635ccec..6498a27 100644
--- a/src/queueing_test.cc
+++ b/src/queueing_test.cc
@@ -44,7 +44,9 @@
Close(b);
}
-MULTINODE_TEST(QueueingTest, RemoteQueueFeedback) {
+// Disabled because remote queue state monitoring has been temporarily dropped
+// from ipcz to improve performance. See https://crbug.com/1383754.
+MULTINODE_TEST(QueueingTest, DISABLED_RemoteQueueFeedback) {
// Exercises operations which rely on feedback from the remote peer regarding
// its inbound parcel queue state.
IpczHandle c = SpawnTestNode<RemoteQueueFeedbackClient>();
@@ -129,7 +131,9 @@
Close(b);
}
-MULTINODE_TEST(QueueingTest, TwoPhaseQueueing) {
+// Disabled because remote queue state monitoring has been temporarily dropped
+// from ipcz to improve performance. See https://crbug.com/1383754.
+MULTINODE_TEST(QueueingTest, DISABLED_TwoPhaseQueueing) {
IpczHandle c = SpawnTestNode<TwoPhaseQueueingClient>();
WaitForDirectRemoteLink(c);
@@ -227,7 +231,9 @@
Close(b);
}
-MULTINODE_TEST(QueueingTest, RemoteQueueFeedbackStressTest) {
+// Disabled because remote queue state monitoring has been temporarily dropped
+// from ipcz to improve performance. See https://crbug.com/1383754.
+MULTINODE_TEST(QueueingTest, DISABLED_RemoteQueueFeedbackStressTest) {
IpczHandle c = SpawnTestNode<RemoteQueueFeedbackStressTestClient>();
size_t bytes_remaining = kStressTestPayloadSize;