ipcz: Introduce RouteEdge
This primitive replaces individual RouterLinks on each Router and lays
the groundwork for incremental RouterLink replacement, which is needed
for proxy bypass operations.
No decaying links are actually established yet, so net behavior should
be unchanged.
Some other light refactoring is done here in preparation for the
imminent introduction of decaying links.
Bug: 1299283
Change-Id: I1baab954ad21d4ddbd4f830721af87cf1a02c596
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/3767382
Commit-Queue: Ken Rockot <rockot@google.com>
Reviewed-by: Alex Gough <ajgo@chromium.org>
Cr-Commit-Position: refs/heads/main@{#1027343}
NOKEYCHECK=True
GitOrigin-RevId: 360c6dfb28059f987cd274086904e4b1185d78f3
diff --git a/src/BUILD.gn b/src/BUILD.gn
index 155fa29..f434cab 100644
--- a/src/BUILD.gn
+++ b/src/BUILD.gn
@@ -234,6 +234,7 @@
"ipcz/portal.h",
"ipcz/ref_counted_fragment.h",
"ipcz/remote_router_link.h",
+ "ipcz/route_edge.h",
"ipcz/router.h",
"ipcz/router_link.h",
"ipcz/router_link_state.h",
@@ -282,6 +283,7 @@
"ipcz/portal.cc",
"ipcz/ref_counted_fragment.cc",
"ipcz/remote_router_link.cc",
+ "ipcz/route_edge.cc",
"ipcz/router.cc",
"ipcz/router_descriptor.cc",
"ipcz/router_descriptor.h",
@@ -375,6 +377,7 @@
"ipcz/node_test.cc",
"ipcz/parcel_queue_test.cc",
"ipcz/ref_counted_fragment_test.cc",
+ "ipcz/route_edge_test.cc",
"ipcz/router_link_test.cc",
"ipcz/sequenced_queue_test.cc",
"reference_drivers/sync_reference_driver_test.cc",
diff --git a/src/ipcz/node_name.cc b/src/ipcz/node_name.cc
index 6dd7ac1..892ea90 100644
--- a/src/ipcz/node_name.cc
+++ b/src/ipcz/node_name.cc
@@ -4,24 +4,19 @@
#include "ipcz/node_name.h"
-#include <cinttypes>
-#include <cstdint>
-#include <cstdio>
#include <string>
#include <type_traits>
#include "third_party/abseil-cpp/absl/base/macros.h"
+#include "third_party/abseil-cpp/absl/strings/str_cat.h"
namespace ipcz {
static_assert(std::is_standard_layout<NodeName>::value, "Invalid NodeName");
std::string NodeName::ToString() const {
- std::string name(33, 0);
- int length = snprintf(name.data(), name.size(), "%016" PRIx64 "%016" PRIx64,
- high_, low_);
- ABSL_ASSERT(length == 32);
- return name;
+ return absl::StrCat(absl::Hex(high_, absl::kZeroPad16),
+ absl::Hex(low_, absl::kZeroPad16));
}
} // namespace ipcz
diff --git a/src/ipcz/route_edge.cc b/src/ipcz/route_edge.cc
new file mode 100644
index 0000000..5c702a5
--- /dev/null
+++ b/src/ipcz/route_edge.cc
@@ -0,0 +1,99 @@
+// Copyright 2022 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "ipcz/route_edge.h"
+
+#include <utility>
+
+#include "ipcz/router.h"
+#include "ipcz/router_link.h"
+#include "util/log.h"
+
+namespace ipcz {
+
+RouteEdge::RouteEdge() = default;
+
+RouteEdge::~RouteEdge() = default;
+
+void RouteEdge::SetPrimaryLink(Ref<RouterLink> link) {
+ ABSL_ASSERT(!primary_link_);
+ if (is_decay_deferred_) {
+ // This edge was set to decay its primary link before it had a primary link,
+ // so this primary link must be immediately set to decay.
+ is_decay_deferred_ = false;
+ decaying_link_ = std::move(link);
+ if (decaying_link_) {
+ DVLOG(4) << "Edge adopted decaying " << decaying_link_->Describe();
+ }
+ } else {
+ primary_link_ = std::move(link);
+ if (primary_link_) {
+ DVLOG(4) << "Edge adopted " << primary_link_->Describe();
+ }
+ }
+}
+
+Ref<RouterLink> RouteEdge::ReleasePrimaryLink() {
+ return std::move(primary_link_);
+}
+
+Ref<RouterLink> RouteEdge::ReleaseDecayingLink() {
+ return std::move(decaying_link_);
+}
+
+bool RouteEdge::BeginPrimaryLinkDecay() {
+ if (decaying_link_ || is_decay_deferred_) {
+ return false;
+ }
+
+ decaying_link_ = std::move(primary_link_);
+ is_decay_deferred_ = !decaying_link_;
+ return true;
+}
+
+bool RouteEdge::ShouldTransmitOnDecayingLink(SequenceNumber n) const {
+ return (decaying_link_ || is_decay_deferred_) &&
+ (!length_to_decaying_link_ || n < *length_to_decaying_link_);
+}
+
+bool RouteEdge::MaybeFinishDecay(SequenceNumber length_sent,
+ SequenceNumber length_received) {
+ if (!decaying_link_) {
+ return false;
+ }
+
+ if (!length_to_decaying_link_) {
+ DVLOG(4) << "Cannot decay yet with no known sequence length to "
+ << decaying_link_->Describe();
+ return false;
+ }
+
+ if (!length_from_decaying_link_) {
+ DVLOG(4) << "Cannot decay yet with no known sequence length to "
+ << decaying_link_->Describe();
+ return false;
+ }
+
+ if (length_sent < *length_to_decaying_link_) {
+ DVLOG(4) << "Cannot decay yet without sending full sequence up to "
+ << *length_to_decaying_link_ << " on "
+ << decaying_link_->Describe();
+ return false;
+ }
+
+ if (length_received < *length_from_decaying_link_) {
+ DVLOG(4) << "Cannot decay yet without receiving full sequence up to "
+ << *length_from_decaying_link_ << " on "
+ << decaying_link_->Describe();
+ return false;
+ }
+
+ ABSL_ASSERT(!is_decay_deferred_);
+ decaying_link_.reset();
+ length_to_decaying_link_.reset();
+ length_from_decaying_link_.reset();
+ return true;
+}
+
+} // namespace ipcz
diff --git a/src/ipcz/route_edge.h b/src/ipcz/route_edge.h
new file mode 100644
index 0000000..01079af
--- /dev/null
+++ b/src/ipcz/route_edge.h
@@ -0,0 +1,150 @@
+// Copyright 2022 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef IPCZ_SRC_IPCZ_ROUTE_EDGE_H_
+#define IPCZ_SRC_IPCZ_ROUTE_EDGE_H_
+
+#include "ipcz/router_link.h"
+#include "ipcz/sequence_number.h"
+#include "ipcz/sublink_id.h"
+#include "third_party/abseil-cpp/absl/base/macros.h"
+#include "third_party/abseil-cpp/absl/types/optional.h"
+#include "util/ref_counted.h"
+
+namespace ipcz {
+
+class NodeLink;
+class Router;
+class RouterLink;
+
+// A RouteEdge is responsible for message ingress and egress on one
+// (inward-facing or outward-facing) side of a router. Every functioning router
+// has one outward edge and (if proxying) one inward edge.
+//
+// Over the course of its lifetime a RouteEdge may utilize many different
+// RouterLinks, but at any moment it has at most two: one "primary" link and one
+// "decaying" link.
+//
+// The decaying link's usage is restricted to transmission and receipt of a
+// limited range of parcels based on SequenceNumber, and once all expected
+// parcels are sent and received, the link is dropped from the edge.
+//
+// When a RouteEdge has no decaying link, it may be able to transition its
+// primary link to a decaying link, while adopting a new primary link to take
+// its place. This process of incremental link replacement is the basis for ipcz
+// route reduction.
+//
+// This object is not thread-safe.
+class RouteEdge {
+ public:
+ RouteEdge();
+ RouteEdge(const RouteEdge&) = delete;
+ RouteEdge& operator=(const RouteEdge&) = delete;
+ ~RouteEdge();
+
+ const Ref<RouterLink>& primary_link() const { return primary_link_; }
+ const Ref<RouterLink>& decaying_link() const { return decaying_link_; }
+
+ // Indicates whether this edge is stable, meaning it has no decaying link and
+ // it is not set to decay the next primary link it adopts.
+ bool is_stable() const { return !decaying_link_ && !is_decay_deferred_; }
+
+ // These accessors set the limits on the current (or deferred) decaying link.
+ // Once both of these values are set, the decaying link will be dropped from
+ // this edge as soon as the link transmits all messages up to (but not
+ // including) the SequenceNumber in `length_to_decaying_link_` AND the link
+ // receives all messages up to (but not including) the SequenceNumber in
+ // `length_from_decaying_link_`.
+ void set_length_to_decaying_link(SequenceNumber length) {
+ ABSL_ASSERT(!is_stable());
+ ABSL_ASSERT(!length_to_decaying_link_);
+ length_to_decaying_link_ = length;
+ }
+
+ void set_length_from_decaying_link(SequenceNumber length) {
+ ABSL_ASSERT(!is_stable());
+ ABSL_ASSERT(!length_from_decaying_link_);
+ length_from_decaying_link_ = length;
+ }
+
+ absl::optional<SequenceNumber> length_to_decaying_link() const {
+ return length_to_decaying_link_;
+ }
+
+ absl::optional<SequenceNumber> length_from_decaying_link() const {
+ return length_from_decaying_link_;
+ }
+
+ // Sets the primary link for this edge. Only valid to call if the edge does
+ // not currently have a primary link.
+ void SetPrimaryLink(Ref<RouterLink> link);
+
+ // Releases this edge's primary link and returns a reference to it.
+ Ref<RouterLink> ReleasePrimaryLink();
+
+ // Releases this edge's decaying link and returns a reference to it.
+ Ref<RouterLink> ReleaseDecayingLink();
+
+ // Sets the current primary link to begin decay; or if there is no primary
+ // link yet, marks this edge for deferred decay. In the latter case, the next
+ // primary link adopted by this edge will immediately begin to decay. This may
+ // only be called while the edge has no decaying link.
+ bool BeginPrimaryLinkDecay();
+
+ // Indicates whether a parcel with the given SequenceNumber should be
+ // transmitted over this edge's decaying link. If not, the parcel should be
+ // transmitted over this edge's primary link.
+ bool ShouldTransmitOnDecayingLink(SequenceNumber sequence_number) const;
+
+ // Attempts to drop this edge's decaying link, given that it has already
+ // transmitted a parcel sequence up to `length_sent` and received a parcel
+ // sequence up to `length_received`. Returns true if the decaying link was
+ // dropped, and false otherwise.
+ bool MaybeFinishDecay(SequenceNumber length_sent,
+ SequenceNumber length_received);
+
+ private:
+ // The primary link over which this edge transmits and accepts parcels and
+ // other messages. If a decaying link is also present, then the decaying link
+ // is preferred for transmission of all parcels with a SequenceNumber up to
+ // (but not including) `length_to_decaying_link_`. If that value is not set,
+ // the decaying link is always preferred when set.
+ Ref<RouterLink> primary_link_;
+
+ // If true, this edge was marked to decay its primary link before it actually
+ // acquired a primary link. In that case the next primary link adopted by
+ // this edge will be demoted immediately to a decaying link.
+ bool is_decay_deferred_ = false;
+
+ // If non-null, this is a link which used to be the edge's primary link but
+ // which is being phased out. The decaying link may continue to receive
+ // parcels, but once `length_from_decaying_link_` is set, it will only expect
+ // to receive parcels with a SequenceNumber up to (but not including) that
+ // value. Similarly, the decaying link will be preferred for message
+ // transmission as long as `length_to_decaying_link_` remains unknown, but as
+ // soon as that value is set, only parcels with a SequenceNumber up to
+ // (but not including) that value will be transmitted over this link. Once
+ // both sequence lengths are known and surpassed, the edge will drop this
+ // link.
+ Ref<RouterLink> decaying_link_;
+
+ // If present, the length of the parcel sequence after which this edge must
+ // stop using `decaying_link_` to transmit parcels. If this is 5, then the
+ // decaying link must be used to transmit any new parcels with a
+ // SequenceNumber in the range [0, 4] inclusive. Beyond that point the primary
+ // link must be used.
+ absl::optional<SequenceNumber> length_to_decaying_link_;
+
+ // If present, the length of the parcel sequence after which this edge can
+ // stop expecting to receive parcels over `decaying_link_`. If this is 7, then
+ // the Router using this edge should still expect to receive parcels from the
+ // decaying link as long as it is missing any parcel in the range [0, 6]
+ // inclusive. Beyond that point parcels should only be expected from the
+ // primary link.
+ absl::optional<SequenceNumber> length_from_decaying_link_;
+};
+
+} // namespace ipcz
+
+#endif // IPCZ_SRC_IPCZ_ROUTE_EDGE_H_
diff --git a/src/ipcz/route_edge_test.cc b/src/ipcz/route_edge_test.cc
new file mode 100644
index 0000000..53134e4
--- /dev/null
+++ b/src/ipcz/route_edge_test.cc
@@ -0,0 +1,129 @@
+// Copyright 2022 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "ipcz/route_edge.h"
+
+#include <tuple>
+
+#include "ipcz/ipcz.h"
+#include "ipcz/link_type.h"
+#include "ipcz/local_router_link.h"
+#include "ipcz/router.h"
+#include "ipcz/sequence_number.h"
+#include "testing/gtest/include/gtest/gtest.h"
+#include "util/ref_counted.h"
+
+namespace ipcz {
+namespace {
+
+class RouteEdgeTest : public testing::Test {
+ public:
+ Ref<RouterLink> CreateLink() {
+ // This link does not need to be usable, it just needs to be unique. Hence
+ // we create and link two throwaway routers. and return one of their links.
+ auto a = MakeRefCounted<Router>();
+ auto b = MakeRefCounted<Router>();
+ auto [a_link, b_blink] =
+ LocalRouterLink::ConnectRouters(LinkType::kCentral, {a, b});
+ a->CloseRoute();
+ b->CloseRoute();
+ return a_link;
+ }
+};
+
+TEST_F(RouteEdgeTest, Stability) {
+ RouteEdge edge;
+
+ // Fresh edges are stable.
+ EXPECT_TRUE(edge.is_stable());
+
+ // Edges with only a primary link are stable.
+ auto link = CreateLink();
+ edge.SetPrimaryLink(link);
+ EXPECT_TRUE(edge.is_stable());
+ edge.ReleasePrimaryLink();
+
+ // Edges with a deferred decaying link are not stable.
+ edge.BeginPrimaryLinkDecay();
+ EXPECT_FALSE(edge.is_stable());
+
+ // Edges with only a decaying link are not stable. This link will be set to
+ // decay immediately due to the deferred decay from above.
+ edge.SetPrimaryLink(link);
+ EXPECT_FALSE(edge.is_stable());
+
+ // Edges with both a primary and decaying link are still not stable.
+ auto new_link = CreateLink();
+ edge.SetPrimaryLink(new_link);
+ EXPECT_FALSE(edge.is_stable());
+
+ // But once the decaying link is dropped, the edge is stable again.
+ edge.ReleaseDecayingLink();
+ EXPECT_TRUE(edge.is_stable());
+}
+
+TEST_F(RouteEdgeTest, LinkSelection) {
+ RouteEdge edge;
+ auto first_link = CreateLink();
+ auto second_link = CreateLink();
+
+ // With no primary or decaying link, the primary link is the default choice.
+ EXPECT_FALSE(edge.ShouldTransmitOnDecayingLink(SequenceNumber(0)));
+
+ // Now with only a primary link, that link is always selected.
+ edge.SetPrimaryLink(first_link);
+ EXPECT_FALSE(edge.ShouldTransmitOnDecayingLink(SequenceNumber(0)));
+ EXPECT_FALSE(edge.ShouldTransmitOnDecayingLink(SequenceNumber(5)));
+ EXPECT_FALSE(edge.ShouldTransmitOnDecayingLink(SequenceNumber(10)));
+
+ // With a decaying link but no outgoing sequence length limit, the decaying
+ // link is always selected.
+ edge.BeginPrimaryLinkDecay();
+ edge.SetPrimaryLink(second_link);
+ EXPECT_EQ(second_link, edge.primary_link());
+ EXPECT_EQ(first_link, edge.decaying_link());
+ EXPECT_TRUE(edge.ShouldTransmitOnDecayingLink(SequenceNumber(0)));
+ EXPECT_TRUE(edge.ShouldTransmitOnDecayingLink(SequenceNumber(5)));
+ EXPECT_TRUE(edge.ShouldTransmitOnDecayingLink(SequenceNumber(10)));
+
+ // Finally, with a limit on the decaying link's sequence length, selection now
+ // depends on the specific SequenceNumber being transmitted.
+ edge.set_length_to_decaying_link(SequenceNumber(5));
+ EXPECT_TRUE(edge.ShouldTransmitOnDecayingLink(SequenceNumber(0)));
+ EXPECT_TRUE(edge.ShouldTransmitOnDecayingLink(SequenceNumber(4)));
+ EXPECT_FALSE(edge.ShouldTransmitOnDecayingLink(SequenceNumber(5)));
+ EXPECT_FALSE(edge.ShouldTransmitOnDecayingLink(SequenceNumber(10)));
+}
+
+TEST_F(RouteEdgeTest, FinishDecay) {
+ RouteEdge edge;
+ auto link = CreateLink();
+ edge.SetPrimaryLink(link);
+ edge.BeginPrimaryLinkDecay();
+
+ // Decay cannot finish until inbound and outbound sequence length limits are
+ // set.
+ EXPECT_FALSE(edge.MaybeFinishDecay(SequenceNumber(0), SequenceNumber(0)));
+ edge.set_length_to_decaying_link(SequenceNumber(0));
+ EXPECT_FALSE(edge.MaybeFinishDecay(SequenceNumber(0), SequenceNumber(0)));
+ edge.set_length_from_decaying_link(SequenceNumber(0));
+ EXPECT_TRUE(edge.decaying_link());
+ EXPECT_TRUE(edge.MaybeFinishDecay(SequenceNumber(0), SequenceNumber(0)));
+ EXPECT_FALSE(edge.decaying_link());
+
+ // Decay also cannot finish while the sequence length limits have not yet
+ // been met by messages transmitted and received over the decaying link.
+ edge.SetPrimaryLink(link);
+ edge.BeginPrimaryLinkDecay();
+ edge.set_length_to_decaying_link(SequenceNumber(2));
+ edge.set_length_from_decaying_link(SequenceNumber(4));
+ EXPECT_FALSE(edge.MaybeFinishDecay(SequenceNumber(1), SequenceNumber(3)));
+ EXPECT_FALSE(edge.MaybeFinishDecay(SequenceNumber(1), SequenceNumber(4)));
+ EXPECT_TRUE(edge.decaying_link());
+ EXPECT_TRUE(edge.MaybeFinishDecay(SequenceNumber(2), SequenceNumber(4)));
+ EXPECT_FALSE(edge.decaying_link());
+}
+
+} // namespace
+} // namespace ipcz
diff --git a/src/ipcz/router.cc b/src/ipcz/router.cc
index afb2585..62e5c49 100644
--- a/src/ipcz/router.cc
+++ b/src/ipcz/router.cc
@@ -21,6 +21,48 @@
namespace ipcz {
+namespace {
+
+// Helper structure used to accumulate individual parcel flushing operations
+// within Router::Flush(), via CollectParcelsToFlush() below.
+struct ParcelToFlush {
+ // The link over which to flush this parcel.
+ RouterLink* link;
+
+ // The parcel to be flushed.
+ Parcel parcel;
+};
+
+using ParcelsToFlush = absl::InlinedVector<ParcelToFlush, 8>;
+
+// Helper which attempts to pop elements from `queue` for transmission along
+// `edge`. This terminates either when `queue` is exhausted, or the next parcel
+// in `queue` is to be transmitted over a link that is not yet known to `edge`.
+// Any successfully popped elements are accumulated at the end of `parcels`.
+void CollectParcelsToFlush(ParcelQueue& queue,
+ const RouteEdge& edge,
+ ParcelsToFlush& parcels) {
+ RouterLink* decaying_link = edge.decaying_link().get();
+ RouterLink* primary_link = edge.primary_link().get();
+ while (queue.HasNextElement()) {
+ const SequenceNumber n = queue.current_sequence_number();
+ RouterLink* link = nullptr;
+ if (decaying_link && edge.ShouldTransmitOnDecayingLink(n)) {
+ link = decaying_link;
+ } else if (primary_link && !edge.ShouldTransmitOnDecayingLink(n)) {
+ link = primary_link;
+ } else {
+ return;
+ }
+
+ ParcelToFlush& parcel = parcels.emplace_back(ParcelToFlush{.link = link});
+ const bool popped = queue.Pop(parcel.parcel);
+ ABSL_ASSERT(popped);
+ }
+}
+
+} // namespace
+
Router::Router() = default;
Router::~Router() {
@@ -49,10 +91,8 @@
bool Router::HasLocalPeer(Router& router) {
absl::MutexLock lock(&mutex_);
- if (!outward_link_) {
- return false;
- }
- return outward_link_->HasLocalPeer(router);
+ return outward_edge_.primary_link() &&
+ outward_edge_.primary_link()->HasLocalPeer(router);
}
IpczResult Router::SendOutboundParcel(Parcel& parcel) {
@@ -67,14 +107,15 @@
const SequenceNumber sequence_number =
outbound_parcels_.GetCurrentSequenceLength();
parcel.set_sequence_number(sequence_number);
- if (outward_link_ &&
+ if (outward_edge_.primary_link() &&
outbound_parcels_.MaybeSkipSequenceNumber(sequence_number)) {
+ link = outward_edge_.primary_link();
+ } else {
// If there are no unsent parcels ahead of this one in the outbound
// sequence, and we have an active outward link, we can immediately
- // transmit the parcel without any intermediate queueing step. This is the
- // most common case.
- link = outward_link_;
- } else {
+ // transmit the parcel without any intermediate queueing step. That is the
+ // most common case, but otherwise we have to queue the parcel here and it
+ // will be flushed out ASAP.
DVLOG(4) << "Queuing outbound " << parcel.Describe();
const bool push_ok =
outbound_parcels_.Push(sequence_number, std::move(parcel));
@@ -108,10 +149,8 @@
{
absl::MutexLock lock(&mutex_);
- ABSL_ASSERT(!outward_link_);
-
if (!is_disconnected_) {
- outward_link_ = std::move(link);
+ outward_edge_.SetPrimaryLink(std::move(link));
}
}
@@ -136,10 +175,13 @@
return true;
}
- status_.num_local_parcels = inbound_parcels_.GetNumAvailableElements();
- status_.num_local_bytes = inbound_parcels_.GetTotalAvailableElementSize();
- traps_.UpdatePortalStatus(status_, TrapSet::UpdateReason::kNewLocalParcel,
- dispatcher);
+ if (!inward_edge_) {
+ // If this is a terminal router, we may have trap events to fire.
+ status_.num_local_parcels = inbound_parcels_.GetNumAvailableElements();
+ status_.num_local_bytes = inbound_parcels_.GetTotalAvailableElementSize();
+ traps_.UpdatePortalStatus(status_, TrapSet::UpdateReason::kNewLocalParcel,
+ dispatcher);
+ }
}
Flush();
@@ -184,7 +226,7 @@
*inbound_parcels_.final_sequence_length() <= sequence_length;
}
- if (!inward_link_) {
+ if (!inward_edge_) {
status_.flags |= IPCZ_PORTAL_STATUS_PEER_CLOSED;
if (inbound_parcels_.IsSequenceFullyConsumed()) {
status_.flags |= IPCZ_PORTAL_STATUS_DEAD;
@@ -223,10 +265,12 @@
}
// Wipe out all remaining links and propagate the disconnection over them.
- forwarding_links.push_back(std::move(outward_link_));
- forwarding_links.push_back(std::move(inward_link_));
-
- if (!inward_link_) {
+ forwarding_links.push_back(outward_edge_.ReleasePrimaryLink());
+ forwarding_links.push_back(outward_edge_.ReleaseDecayingLink());
+ if (inward_edge_) {
+ forwarding_links.push_back(inward_edge_->ReleasePrimaryLink());
+ forwarding_links.push_back(inward_edge_->ReleaseDecayingLink());
+ } else {
// Terminal routers may have trap events to fire.
status_.flags |= IPCZ_PORTAL_STATUS_PEER_CLOSED;
if (inbound_parcels_.IsSequenceFullyConsumed()) {
@@ -336,7 +380,7 @@
descriptor.new_sublink, nullptr, LinkType::kPeripheralOutward,
LinkSide::kB, router);
if (new_link) {
- router->outward_link_ = std::move(new_link);
+ router->outward_edge_.SetPrimaryLink(std::move(new_link));
DVLOG(4) << "Route extended from "
<< from_node_link.remote_node_name().ToString() << " to "
@@ -362,6 +406,8 @@
void Router::SerializeNewRouter(NodeLink& to_node_link,
RouterDescriptor& descriptor) {
TrapEventDispatcher dispatcher;
+ const SublinkId new_sublink = to_node_link.memory().AllocateSublinkIds(1);
+ descriptor.new_sublink = new_sublink;
{
absl::MutexLock lock(&mutex_);
traps_.RemoveAll(dispatcher);
@@ -371,34 +417,42 @@
descriptor.next_incoming_sequence_number =
inbound_parcels_.current_sequence_number();
- DVLOG(4) << "Extending route to new router with outbound sequence length "
- << descriptor.next_outgoing_sequence_number
- << " and current inbound sequence number "
- << descriptor.next_incoming_sequence_number;
+ // Initialize an inward edge but with no link yet. This ensures that we
+ // don't look like a terminal router while waiting for a link to be set,
+ // which can only happen after `descriptor` is transmitted.
+ inward_edge_.emplace();
if (status_.flags & IPCZ_PORTAL_STATUS_PEER_CLOSED) {
descriptor.peer_closed = true;
descriptor.closed_peer_sequence_length =
*inbound_parcels_.final_sequence_length();
+
+ // Ensure that the new edge decays its link as soon as it has one, since
+ // we know the link will not be used.
+ inward_edge_->BeginPrimaryLinkDecay();
+ inward_edge_->set_length_to_decaying_link(
+ *inbound_parcels_.final_sequence_length());
+ inward_edge_->set_length_from_decaying_link(
+ outbound_parcels_.current_sequence_number());
}
+
+ // Once `descriptor` is transmitted to the destination node and the new
+ // Router is created there, it may immediately begin transmitting messages
+ // back to this node regarding `new_sublink`. We establish a new
+ // RemoteRouterLink now and register it to `new_sublink` on `to_node_link`,
+ // so that any such incoming messages are routed to `this`.
+ //
+ // NOTE: We do not yet provide `this` itself with a reference to the new
+ // RemoteRouterLink, because it's not yet safe for us to send messages to
+ // the remote node regarding `new_sublink`. `descriptor` must be transmitted
+ // first.
+ Ref<RemoteRouterLink> new_link = to_node_link.AddRemoteRouterLink(
+ new_sublink, nullptr, LinkType::kPeripheralInward, LinkSide::kA,
+ WrapRefCounted(this));
+
+ DVLOG(4) << "Router " << this << " extending route with tentative new "
+ << new_link->Describe();
}
-
- const SublinkId new_sublink = to_node_link.memory().AllocateSublinkIds(1);
- descriptor.new_sublink = new_sublink;
-
- // Once `descriptor` is transmitted to the destination node and the new Router
- // is created there, it may immediately begin transmitting messages back to
- // this node regarding `new_sublink`. We establish a new RemoteRouterLink now
- // and register it to `new_sublink` on `to_node_link`, so that any such
- // incoming messages are routed to `this`.
- //
- // NOTE: We do not yet provide `this` itself with a reference to the new
- // RemoteRouterLink, because it's not yet safe for us to send messages to the
- // remote node regarding `new_sublink`. `descriptor` must be transmitted
- // first.
- to_node_link.AddRemoteRouterLink(new_sublink, nullptr,
- LinkType::kPeripheralInward, LinkSide::kA,
- WrapRefCounted(this));
}
void Router::BeginProxyingToNewRouter(NodeLink& to_node_link,
@@ -410,13 +464,21 @@
Ref<RemoteRouterLink> new_router_link = new_sublink->router_link;
{
absl::MutexLock lock(&mutex_);
- ABSL_ASSERT(!inward_link_);
+ ABSL_ASSERT(inward_edge_);
// If the new router has already been closed or disconnected, we will
// discard the new link to it.
if (!outbound_parcels_.final_sequence_length() && !is_disconnected_) {
- // TODO: Initiate proxy removal ASAP now that we're proxying.
- inward_link_ = std::move(new_router_link);
+ DVLOG(4) << "Router " << this << " will proxy to new router over "
+ << new_router_link->Describe();
+
+ inward_edge_->SetPrimaryLink(std::move(new_router_link));
+
+ Ref<RouterLink> outward_link = outward_edge_.primary_link();
+ if (outward_link && outward_edge_.is_stable() &&
+ inward_edge_->is_stable()) {
+ outward_link->MarkSideStable();
+ }
}
}
@@ -437,10 +499,18 @@
void Router::NotifyLinkDisconnected(RemoteRouterLink& link) {
{
absl::MutexLock lock(&mutex_);
- if (outward_link_ == &link) {
- outward_link_.reset();
- } else if (inward_link_ == &link) {
- inward_link_.reset();
+ if (outward_edge_.primary_link() == &link) {
+ DVLOG(4) << "Primary " << link.Describe() << " disconnected";
+ outward_edge_.ReleasePrimaryLink();
+ } else if (outward_edge_.decaying_link() == &link) {
+ DVLOG(4) << "Decaying " << link.Describe() << " disconnected";
+ outward_edge_.ReleaseDecayingLink();
+ } else if (inward_edge_ && inward_edge_->primary_link() == &link) {
+ DVLOG(4) << "Primary " << link.Describe() << " disconnected";
+ inward_edge_->ReleasePrimaryLink();
+ } else if (inward_edge_ && inward_edge_->decaying_link() == &link) {
+ DVLOG(4) << "Decaying " << link.Describe() << " disconnected";
+ inward_edge_->ReleaseDecayingLink();
}
}
@@ -454,37 +524,35 @@
void Router::Flush() {
Ref<RouterLink> outward_link;
Ref<RouterLink> inward_link;
+ Ref<RouterLink> decaying_outward_link;
+ Ref<RouterLink> decaying_inward_link;
Ref<RouterLink> dead_inward_link;
Ref<RouterLink> dead_outward_link;
- absl::InlinedVector<Parcel, 2> inbound_parcels;
- absl::InlinedVector<Parcel, 2> outbound_parcels;
absl::optional<SequenceNumber> final_inward_sequence_length;
absl::optional<SequenceNumber> final_outward_sequence_length;
+ ParcelsToFlush parcels_to_flush;
{
absl::MutexLock lock(&mutex_);
- outward_link = outward_link_;
- inward_link = inward_link_;
- // Collect any outbound parcels which are safe to transmit now. Note that we
- // do not transmit anything or generally call into any RouterLinks while
- // `mutex_` is held, because such calls may ultimately re-enter this Router
+ // Acquire stack references to all links we might want to use, so it's safe
+ // to acquire additional (unmanaged) references per ParcelToFlush.
+ outward_link = outward_edge_.primary_link();
+ inward_link = inward_edge_ ? inward_edge_->primary_link() : nullptr;
+ decaying_outward_link = outward_edge_.decaying_link();
+ decaying_inward_link =
+ inward_edge_ ? inward_edge_->decaying_link() : nullptr;
+
+ // Collect any parcels which are safe to transmit now. Note that we do not
+ // transmit anything or generally call into any RouterLinks while `mutex_`
+ // is held, because such calls may ultimately re-enter this Router
// (e.g. if a link is a LocalRouterLink, or even a RemoteRouterLink with a
// fully synchronous driver.) Instead we accumulate work within this block,
// and then perform any transmissions or link deactivations after the mutex
// is released further below.
- Parcel parcel;
- while (outbound_parcels_.HasNextElement() && outward_link) {
- bool ok = outbound_parcels_.Pop(parcel);
- ABSL_ASSERT(ok);
- outbound_parcels.push_back(std::move(parcel));
- }
- // If we have an inward link, then we're a proxy. Collect any queued inbound
- // parcels to forward over that link.
- while (inbound_parcels_.HasNextElement() && inward_link) {
- bool ok = inbound_parcels_.Pop(parcel);
- ABSL_ASSERT(ok);
- inbound_parcels.push_back(std::move(parcel));
+ CollectParcelsToFlush(outbound_parcels_, outward_edge_, parcels_to_flush);
+ if (inward_edge_) {
+ CollectParcelsToFlush(inbound_parcels_, *inward_edge_, parcels_to_flush);
}
if (outward_link && outbound_parcels_.IsSequenceFullyConsumed()) {
@@ -497,11 +565,11 @@
// there are no more outbound parcels to send outward, and there no longer
// exists an ultimate destination for any forwarded inbound parcels. So we
// drop both links now.
- dead_outward_link = std::move(outward_link_);
+ dead_outward_link = outward_edge_.ReleasePrimaryLink();
} else if (!inbound_parcels_.ExpectsMoreElements()) {
// If the other end of the route is gone and we've received all its
// parcels, we can simply drop the outward link in that case.
- dead_outward_link = std::move(outward_link_);
+ dead_outward_link = outward_edge_.ReleasePrimaryLink();
}
if (inbound_parcels_.IsSequenceFullyConsumed()) {
@@ -509,18 +577,14 @@
// then we've also forwarded everything already. We can propagate closure
// inward and drop the inward link, if applicable.
final_inward_sequence_length = inbound_parcels_.final_sequence_length();
- if (inward_link_) {
- dead_inward_link = std::move(inward_link_);
+ if (inward_edge_) {
+ dead_inward_link = inward_edge_->ReleasePrimaryLink();
}
}
}
- for (Parcel& parcel : outbound_parcels) {
- outward_link->AcceptParcel(parcel);
- }
-
- for (Parcel& parcel : inbound_parcels) {
- inward_link->AcceptParcel(parcel);
+ for (ParcelToFlush& parcel : parcels_to_flush) {
+ parcel.link->AcceptParcel(parcel.parcel);
}
if (dead_outward_link) {
diff --git a/src/ipcz/router.h b/src/ipcz/router.h
index cd98e30..54b9037 100644
--- a/src/ipcz/router.h
+++ b/src/ipcz/router.h
@@ -10,6 +10,7 @@
#include "ipcz/ipcz.h"
#include "ipcz/parcel_queue.h"
+#include "ipcz/route_edge.h"
#include "ipcz/router_descriptor.h"
#include "ipcz/router_link.h"
#include "ipcz/sequence_number.h"
@@ -169,27 +170,25 @@
// traps are notified about any interesting state changes within the router.
TrapSet traps_ ABSL_GUARDED_BY(mutex_);
+ // The edge connecting this router outward to another, toward the portal on
+ // the other side of the route.
+ RouteEdge outward_edge_ ABSL_GUARDED_BY(mutex_);
+
+ // The edge connecting this router inward to another, closer to the portal on
+ // our own side of the route. Only present for proxying routers: terminal
+ // routers by definition can have no inward edge.
+ absl::optional<RouteEdge> inward_edge_ ABSL_GUARDED_BY(mutex_);
+
// Parcels received from the other end of the route. If this is a terminal
- // router, these may be retrieved by the application via a controlling portal.
+ // router, these may be retrieved by the application via a controlling portal;
+ // otherwise they will be forwarded along `inward_edge_` as soon as possible.
ParcelQueue inbound_parcels_ ABSL_GUARDED_BY(mutex_);
- // A link to this router's outward peer.
- //
- // TODO(rockot): Replace this with a dynamic link that can be incrementally
- // decayed and replaced.
- Ref<RouterLink> outward_link_ ABSL_GUARDED_BY(mutex_);
-
- // A link to this router's inward peer. Present only for proxying Routers.
- //
- // TODO(rockot): Replace this with a dynamic link that can be incrementally
- // decayed and replaced.
- Ref<RouterLink> inward_link_ ABSL_GUARDED_BY(mutex_);
-
// Parcels transmitted directly from this router (if sent by a controlling
// portal) or received from an inward peer which sent them outward toward this
// Router. These parcels generally only accumulate if there is no outward link
- // present when received, and they are forwarded along `outward_link_` as soon
- // as possible.
+ // present when attempting to transmit them, and they are forwarded along
+ // `outward_edge_` as soon as possible.
ParcelQueue outbound_parcels_ ABSL_GUARDED_BY(mutex_);
// Tracks whether this router has been unexpectedly disconnected from its