ipcz: Handle spontaneous route disconnection
Until now, spontaneous route disconnection (e.g. due to node crash)
has been treated as equivalent to route closure. This is only
an approximation of the correct behavior, since spontaneous
disconnection may unavoidably lose data.
This changes how we handle it, to more aggressively tear down any
links on the affected router, before propagating that teardown further
along the route.
Additionally, we cut off the route's parcel sequence at its current
length to ensure that routers do not anticipate further parcels
which may never arrive. To support this, SequencedQueue now allows its
sequence to be forcibly terminated at its current length, overriding
any final sequence length already set; and terminal routers must
tolerate parcels arriving beyond the end of their expected inbound
sequence length.
Bug: 1299283
Change-Id: I1515d31287a6344735b6cdc99abab5cfb78416ee
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/3764297
Reviewed-by: Alex Gough <ajgo@chromium.org>
Commit-Queue: Ken Rockot <rockot@google.com>
Cr-Commit-Position: refs/heads/main@{#1027313}
NOKEYCHECK=True
GitOrigin-RevId: 4b919bdacab101b8a548292a764a9f1e9d11c71e
diff --git a/src/BUILD.gn b/src/BUILD.gn
index 24af984..155fa29 100644
--- a/src/BUILD.gn
+++ b/src/BUILD.gn
@@ -320,6 +320,42 @@
include_dirs = [ "${ipcz_src_root}" ]
}
+ipcz_source_set("ipcz_test_support") {
+ testonly = true
+ public = [
+ "test/mock_driver.h",
+ "test/multinode_test.h",
+ "test/test.h",
+ "test/test_base.h",
+ "test/test_transport_listener.h",
+ ]
+
+ sources = [
+ "test/mock_driver.cc",
+ "test/multinode_test.cc",
+ "test/test_base.cc",
+ "test/test_transport_listener.cc",
+ ]
+
+ if (enable_multiprocess_tests) {
+ public += [ "test/test_child_launcher.h" ]
+ sources += [ "test/test_child_launcher.cc" ]
+ }
+
+ deps = [
+ "//testing/gmock",
+ "//testing/gtest",
+ "//third_party/abseil-cpp:absl",
+ ]
+ public_deps = [ ":test_buildflags" ]
+ ipcz_public_deps = [
+ ":impl",
+ ":ipcz",
+ ":reference_drivers",
+ ":util",
+ ]
+}
+
ipcz_source_set("ipcz_tests_sources") {
testonly = true
@@ -343,27 +379,16 @@
"ipcz/sequenced_queue_test.cc",
"reference_drivers/sync_reference_driver_test.cc",
"remote_portal_test.cc",
- "test/mock_driver.cc",
- "test/mock_driver.h",
- "test/multinode_test.cc",
- "test/multinode_test.h",
- "test/test.h",
- "test/test_base.cc",
- "test/test_base.h",
- "test/test_transport_listener.cc",
- "test/test_transport_listener.h",
"trap_test.cc",
"util/ref_counted_test.cc",
"util/stack_trace_test.cc",
]
if (enable_multiprocess_tests) {
- public = [ "test/test_child_launcher.h" ]
sources += [
"reference_drivers/memfd_memory_test.cc",
"reference_drivers/multiprocess_reference_driver_test.cc",
"reference_drivers/socket_transport_test.cc",
- "test/test_child_launcher.cc",
]
}
@@ -372,13 +397,15 @@
"//testing/gtest",
"//third_party/abseil-cpp:absl",
]
- public_deps = [ ":test_buildflags" ]
ipcz_deps = [
":impl",
":ipcz",
":util",
]
- ipcz_public_deps = [ ":reference_drivers" ]
+ ipcz_public_deps = [
+ ":ipcz_test_support",
+ ":reference_drivers",
+ ]
configs = [ ":ipcz_include_src_dir" ]
}
diff --git a/src/ipcz/local_router_link.cc b/src/ipcz/local_router_link.cc
index 5b8f590..43bb12c 100644
--- a/src/ipcz/local_router_link.cc
+++ b/src/ipcz/local_router_link.cc
@@ -91,11 +91,6 @@
return state_->GetRouter(side_.opposite()).get() == &router;
}
-bool LocalRouterLink::IsRemoteLinkTo(const NodeLink& node_link,
- SublinkId sublink) {
- return false;
-}
-
void LocalRouterLink::AcceptParcel(Parcel& parcel) {
if (Ref<Router> receiver = state_->GetRouter(side_.opposite())) {
receiver->AcceptInboundParcel(parcel);
@@ -108,6 +103,12 @@
}
}
+void LocalRouterLink::AcceptRouteDisconnected() {
+ if (Ref<Router> receiver = state_->GetRouter(side_.opposite())) {
+ receiver->AcceptRouteDisconnectedFrom(state_->type());
+ }
+}
+
void LocalRouterLink::MarkSideStable() {
state_->link_state().SetSideStable(side_);
}
diff --git a/src/ipcz/local_router_link.h b/src/ipcz/local_router_link.h
index bfb58d4..ceb2b3f 100644
--- a/src/ipcz/local_router_link.h
+++ b/src/ipcz/local_router_link.h
@@ -30,9 +30,9 @@
LinkType GetType() const override;
RouterLinkState* GetLinkState() const override;
bool HasLocalPeer(const Router& router) override;
- bool IsRemoteLinkTo(const NodeLink& node_link, SublinkId sublink) override;
void AcceptParcel(Parcel& parcel) override;
void AcceptRouteClosure(SequenceNumber sequence_length) override;
+ void AcceptRouteDisconnected() override;
void MarkSideStable() override;
bool TryLockForBypass(const NodeName& bypass_request_source) override;
bool TryLockForClosure() override;
diff --git a/src/ipcz/node.cc b/src/ipcz/node.cc
index 3ff412b..91d44f1 100644
--- a/src/ipcz/node.cc
+++ b/src/ipcz/node.cc
@@ -280,6 +280,35 @@
return true;
}
+void Node::DropLink(const NodeName& name) {
+ Ref<NodeLink> link;
+ bool lost_broker = false;
+ {
+ absl::MutexLock lock(&mutex_);
+ auto it = node_links_.find(name);
+ if (it == node_links_.end()) {
+ return;
+ }
+ link = std::move(it->second);
+ node_links_.erase(it);
+
+ DVLOG(4) << "Node " << link->local_node_name().ToString() << " dropping "
+ << " link to " << link->remote_node_name().ToString();
+ if (link == broker_link_) {
+ DVLOG(4) << "Node " << link->local_node_name().ToString()
+ << " has lost its broker link";
+ broker_link_.reset();
+ lost_broker = true;
+ }
+ }
+
+ link->Deactivate();
+
+ if (lost_broker) {
+ CancelAllIntroductions();
+ }
+}
+
void Node::ShutDown() {
NodeLinkMap node_links;
{
@@ -291,6 +320,22 @@
for (const auto& entry : node_links) {
entry.second->Deactivate();
}
+
+ CancelAllIntroductions();
+}
+
+void Node::CancelAllIntroductions() {
+ PendingIntroductionMap introductions;
+ {
+ absl::MutexLock lock(&mutex_);
+ introductions.swap(pending_introductions_);
+ }
+
+ for (auto& [name, callbacks] : introductions) {
+ for (auto& callback : callbacks) {
+ callback(nullptr);
+ }
+ }
}
} // namespace ipcz
diff --git a/src/ipcz/node.h b/src/ipcz/node.h
index 5d37738..7c6f542 100644
--- a/src/ipcz/node.h
+++ b/src/ipcz/node.h
@@ -139,6 +139,9 @@
// the broker could not satisfy the request.
bool CancelIntroduction(const NodeName& name);
+ // Drops this node's link to the named node, if one exists.
+ void DropLink(const NodeName& name);
+
private:
~Node() override;
@@ -146,6 +149,10 @@
// preparation for this node's imminent destruction.
void ShutDown();
+ // Resolves all pending introduction requests with a null link, implying
+ // failure.
+ void CancelAllIntroductions();
+
const Type type_;
const IpczDriver& driver_;
const IpczDriverHandle driver_node_;
@@ -172,8 +179,9 @@
// A map of other nodes to which this node is waiting for an introduction from
// `broker_link_`. Once such an introduction is received, all callbacks for
// that NodeName are executed.
- absl::flat_hash_map<NodeName, std::vector<EstablishLinkCallback>>
- pending_introductions_ ABSL_GUARDED_BY(mutex_);
+ using PendingIntroductionMap =
+ absl::flat_hash_map<NodeName, std::vector<EstablishLinkCallback>>;
+ PendingIntroductionMap pending_introductions_ ABSL_GUARDED_BY(mutex_);
// Nodes may race to request introductions to each other from the same broker.
// This can lead to redundant introductions being sent which the requesting
diff --git a/src/ipcz/node_connector.cc b/src/ipcz/node_connector.cc
index 73702ba..a0d834a 100644
--- a/src/ipcz/node_connector.cc
+++ b/src/ipcz/node_connector.cc
@@ -11,6 +11,7 @@
#include "ipcz/driver_transport.h"
#include "ipcz/ipcz.h"
#include "ipcz/link_side.h"
+#include "ipcz/link_type.h"
#include "ipcz/node_link.h"
#include "ipcz/node_link_memory.h"
#include "ipcz/portal.h"
@@ -264,9 +265,14 @@
std::min(max_valid_portals, waiting_portals_.size());
for (size_t i = 0; i < num_valid_portals; ++i) {
const Ref<Router> router = waiting_portals_[i]->router();
- router->SetOutwardLink(to_link->AddRemoteRouterLink(
+ Ref<RouterLink> link = to_link->AddRemoteRouterLink(
SublinkId(i), to_link->memory().GetInitialRouterLinkState(i),
- LinkType::kCentral, link_side, router));
+ LinkType::kCentral, link_side, router);
+ if (link) {
+ router->SetOutwardLink(std::move(link));
+ } else {
+ router->AcceptRouteDisconnectedFrom(LinkType::kCentral);
+ }
}
// Elicit immediate peer closure on any surplus portals that were established
diff --git a/src/ipcz/node_link.cc b/src/ipcz/node_link.cc
index 69df0e3..be8860f 100644
--- a/src/ipcz/node_link.cc
+++ b/src/ipcz/node_link.cc
@@ -352,6 +352,19 @@
sublink->router_link->GetType(), route_closed.params().sequence_length);
}
+bool NodeLink::OnRouteDisconnected(msg::RouteDisconnected& route_closed) {
+ absl::optional<Sublink> sublink = GetSublink(route_closed.params().sublink);
+ if (!sublink) {
+ return true;
+ }
+
+ DVLOG(4) << "Accepting RouteDisconnected at "
+ << sublink->router_link->Describe();
+
+ return sublink->receiver->AcceptRouteDisconnectedFrom(
+ sublink->router_link->GetType());
+}
+
bool NodeLink::OnSetRouterLinkState(msg::SetRouterLinkState& set) {
if (set.params().descriptor.is_null()) {
return false;
@@ -380,8 +393,14 @@
}
for (auto& [id, sublink] : sublinks) {
- sublink.receiver->NotifyLinkDisconnected(*this, id);
+ DVLOG(4) << "NodeLink disconnection dropping "
+ << sublink.router_link->Describe() << " which is bound to router "
+ << sublink.receiver.get();
+ sublink.receiver->NotifyLinkDisconnected(*sublink.router_link);
}
+
+ Ref<NodeLink> self = WrapRefCounted(this);
+ node_->DropLink(remote_node_name_);
}
NodeLink::Sublink::Sublink(Ref<RemoteRouterLink> router_link,
diff --git a/src/ipcz/node_link.h b/src/ipcz/node_link.h
index 06e3c00..a463fde 100644
--- a/src/ipcz/node_link.h
+++ b/src/ipcz/node_link.h
@@ -156,6 +156,7 @@
bool OnAddBlockBuffer(msg::AddBlockBuffer& add) override;
bool OnAcceptParcel(msg::AcceptParcel& accept) override;
bool OnRouteClosed(msg::RouteClosed& route_closed) override;
+ bool OnRouteDisconnected(msg::RouteDisconnected& route_disconnected) override;
bool OnSetRouterLinkState(msg::SetRouterLinkState& set) override;
bool OnFlushRouter(msg::FlushRouter& flush) override;
void OnTransportError() override;
diff --git a/src/ipcz/node_messages_generator.h b/src/ipcz/node_messages_generator.h
index 8e98ef0..7151143 100644
--- a/src/ipcz/node_messages_generator.h
+++ b/src/ipcz/node_messages_generator.h
@@ -161,10 +161,18 @@
IPCZ_MSG_PARAM(SequenceNumber, sequence_length)
IPCZ_MSG_END()
+// Notifies a specific router that its route from the direction of this link has
+// been unexpectedly disconnected (e.g. due to a node crashing). This is
+// essentially the same as route closure but without respect for complete parcel
+// sequence delivery.
+IPCZ_MSG_BEGIN(RouteDisconnected, IPCZ_MSG_ID(23), IPCZ_MSG_VERSION(0))
+ IPCZ_MSG_PARAM(SublinkId, sublink)
+IPCZ_MSG_END()
+
// Notifies a node that the Router it has bound to `sublink` (on the
// transmitting NodeLink) now has an allocated RouterLinkState in the fragment
// identified by `descriptor`.
-IPCZ_MSG_BEGIN(SetRouterLinkState, IPCZ_MSG_ID(23), IPCZ_MSG_VERSION(0))
+IPCZ_MSG_BEGIN(SetRouterLinkState, IPCZ_MSG_ID(24), IPCZ_MSG_VERSION(0))
IPCZ_MSG_PARAM(SublinkId, sublink)
IPCZ_MSG_PARAM(FragmentDescriptor, descriptor)
IPCZ_MSG_END()
diff --git a/src/ipcz/remote_router_link.cc b/src/ipcz/remote_router_link.cc
index 6fba386..98dab6b 100644
--- a/src/ipcz/remote_router_link.cc
+++ b/src/ipcz/remote_router_link.cc
@@ -112,11 +112,6 @@
return false;
}
-bool RemoteRouterLink::IsRemoteLinkTo(const NodeLink& node_link,
- SublinkId sublink) {
- return node_link_.get() == &node_link && sublink_ == sublink;
-}
-
void RemoteRouterLink::AcceptParcel(Parcel& parcel) {
const absl::Span<Ref<APIObject>> objects = parcel.objects_view();
@@ -225,6 +220,12 @@
node_link()->Transmit(route_closed);
}
+void RemoteRouterLink::AcceptRouteDisconnected() {
+ msg::RouteDisconnected route_disconnected;
+ route_disconnected.params().sublink = sublink_;
+ node_link()->Transmit(route_disconnected);
+}
+
void RemoteRouterLink::MarkSideStable() {
side_is_stable_.store(true, std::memory_order_release);
if (RouterLinkState* state = GetLinkState()) {
diff --git a/src/ipcz/remote_router_link.h b/src/ipcz/remote_router_link.h
index a5c3aeb..e84e092 100644
--- a/src/ipcz/remote_router_link.h
+++ b/src/ipcz/remote_router_link.h
@@ -69,9 +69,9 @@
LinkType GetType() const override;
RouterLinkState* GetLinkState() const override;
bool HasLocalPeer(const Router& router) override;
- bool IsRemoteLinkTo(const NodeLink& node_link, SublinkId sublink) override;
void AcceptParcel(Parcel& parcel) override;
void AcceptRouteClosure(SequenceNumber sequence_length) override;
+ void AcceptRouteDisconnected() override;
void MarkSideStable() override;
bool TryLockForBypass(const NodeName& bypass_request_source) override;
bool TryLockForClosure() override;
diff --git a/src/ipcz/router.cc b/src/ipcz/router.cc
index 279ffc5..afb2585 100644
--- a/src/ipcz/router.cc
+++ b/src/ipcz/router.cc
@@ -59,6 +59,11 @@
Ref<RouterLink> link;
{
absl::MutexLock lock(&mutex_);
+ if (inbound_parcels_.final_sequence_length()) {
+ // If the inbound sequence is finalized, the peer portal must be gone.
+ return IPCZ_RESULT_NOT_FOUND;
+ }
+
const SequenceNumber sequence_number =
outbound_parcels_.GetCurrentSequenceLength();
parcel.set_sequence_number(sequence_number);
@@ -90,10 +95,8 @@
Ref<RouterLink> link;
{
absl::MutexLock lock(&mutex_);
- bool ok = outbound_parcels_.SetFinalSequenceLength(
+ outbound_parcels_.SetFinalSequenceLength(
outbound_parcels_.GetCurrentSequenceLength());
- ABSL_ASSERT(ok);
-
traps_.RemoveAll(dispatcher);
}
@@ -101,10 +104,22 @@
}
void Router::SetOutwardLink(Ref<RouterLink> link) {
+ ABSL_ASSERT(link);
+
{
absl::MutexLock lock(&mutex_);
ABSL_ASSERT(!outward_link_);
- outward_link_ = std::move(link);
+
+ if (!is_disconnected_) {
+ outward_link_ = std::move(link);
+ }
+ }
+
+ if (link) {
+ // If the link wasn't adopted, this Router has already been disconnected.
+ link->AcceptRouteDisconnected();
+ link->Deactivate();
+ return;
}
Flush();
@@ -116,7 +131,9 @@
absl::MutexLock lock(&mutex_);
const SequenceNumber sequence_number = parcel.sequence_number();
if (!inbound_parcels_.Push(sequence_number, std::move(parcel))) {
- return false;
+ // Unexpected route disconnection can cut off inbound sequences, so don't
+ // treat an out-of-bounds parcel as a validation failure.
+ return true;
}
status_.num_local_parcels = inbound_parcels_.GetNumAvailableElements();
@@ -144,7 +161,9 @@
// that tracks complete sequences from potentially fragmented contributions.
const SequenceNumber sequence_number = parcel.sequence_number();
if (!outbound_parcels_.Push(sequence_number, std::move(parcel))) {
- return false;
+ // Unexpected route disconnection can cut off outbound sequences, so don't
+ // treat an out-of-bounds parcel as a validation failure.
+ return true;
}
}
@@ -152,16 +171,17 @@
return true;
}
-bool Router::AcceptRouteClosureFrom(
- LinkType link_type,
- absl::optional<SequenceNumber> sequence_length) {
+bool Router::AcceptRouteClosureFrom(LinkType link_type,
+ SequenceNumber sequence_length) {
TrapEventDispatcher dispatcher;
{
absl::MutexLock lock(&mutex_);
if (link_type.is_outward()) {
- if (!inbound_parcels_.SetFinalSequenceLength(sequence_length.value_or(
- inbound_parcels_.GetCurrentSequenceLength()))) {
- return false;
+ if (!inbound_parcels_.SetFinalSequenceLength(sequence_length)) {
+ // Ignore if and only if the sequence was terminated early.
+ DVLOG(4) << "Discarding inbound route closure notification";
+ return inbound_parcels_.final_sequence_length().has_value() &&
+ *inbound_parcels_.final_sequence_length() <= sequence_length;
}
if (!inward_link_) {
@@ -173,9 +193,11 @@
dispatcher);
}
} else if (link_type.is_peripheral_inward()) {
- if (!outbound_parcels_.SetFinalSequenceLength(sequence_length.value_or(
- outbound_parcels_.GetCurrentSequenceLength()))) {
- return false;
+ if (!outbound_parcels_.SetFinalSequenceLength(sequence_length)) {
+ // Ignore if and only if the sequence was terminated early.
+ DVLOG(4) << "Discarding outbound route closure notification";
+ return outbound_parcels_.final_sequence_length().has_value() &&
+ *outbound_parcels_.final_sequence_length() <= sequence_length;
}
}
}
@@ -184,6 +206,49 @@
return true;
}
+bool Router::AcceptRouteDisconnectedFrom(LinkType link_type) {
+ TrapEventDispatcher dispatcher;
+ absl::InlinedVector<Ref<RouterLink>, 4> forwarding_links;
+ {
+ absl::MutexLock lock(&mutex_);
+
+ DVLOG(4) << "Router " << this << " disconnected from "
+ << link_type.ToString() << "link";
+
+ is_disconnected_ = true;
+ if (link_type.is_peripheral_inward()) {
+ outbound_parcels_.ForceTerminateSequence();
+ } else {
+ inbound_parcels_.ForceTerminateSequence();
+ }
+
+ // Wipe out all remaining links and propagate the disconnection over them.
+ forwarding_links.push_back(std::move(outward_link_));
+ forwarding_links.push_back(std::move(inward_link_));
+
+ if (!inward_link_) {
+ // Terminal routers may have trap events to fire.
+ status_.flags |= IPCZ_PORTAL_STATUS_PEER_CLOSED;
+ if (inbound_parcels_.IsSequenceFullyConsumed()) {
+ status_.flags |= IPCZ_PORTAL_STATUS_DEAD;
+ }
+ traps_.UpdatePortalStatus(status_, TrapSet::UpdateReason::kPeerClosed,
+ dispatcher);
+ }
+ }
+
+ for (const Ref<RouterLink>& link : forwarding_links) {
+ if (link) {
+ DVLOG(4) << "Forwarding disconnection over " << link->Describe();
+ link->AcceptRouteDisconnected();
+ link->Deactivate();
+ }
+ }
+
+ Flush();
+ return true;
+}
+
IpczResult Router::GetNextInboundParcel(IpczGetFlags flags,
void* data,
size_t* num_bytes,
@@ -248,6 +313,7 @@
// static
Ref<Router> Router::Deserialize(const RouterDescriptor& descriptor,
NodeLink& from_node_link) {
+ bool disconnected = false;
auto router = MakeRefCounted<Router>();
{
absl::MutexLock lock(&router->mutex_);
@@ -280,11 +346,15 @@
// The new portal is DOA, either because the associated NodeLink is dead,
// or the sublink ID was already in use. The latter implies a bug or bad
// behavior, but it should be harmless to ignore beyond this point.
- router->AcceptRouteClosureFrom(LinkType::kPeripheralOutward,
- descriptor.next_incoming_sequence_number);
+ disconnected = true;
}
}
+ if (disconnected) {
+ DVLOG(4) << "Disconnected new Router immediately after deserialization";
+ router->AcceptRouteDisconnectedFrom(LinkType::kPeripheralOutward);
+ }
+
router->Flush();
return router;
}
@@ -336,39 +406,27 @@
// Acquire a reference to the RemoteRouterLink created by an earlier call to
// SerializeNewRouter(). If the NodeLink has already been disconnected, this
// may be null.
- Ref<RemoteRouterLink> new_router_link;
if (auto new_sublink = to_node_link.GetSublink(descriptor.new_sublink)) {
- new_router_link = new_sublink->router_link;
- }
+ Ref<RemoteRouterLink> new_router_link = new_sublink->router_link;
+ {
+ absl::MutexLock lock(&mutex_);
+ ABSL_ASSERT(!inward_link_);
- bool deactivate_link = false;
- if (new_router_link) {
- absl::MutexLock lock(&mutex_);
- ABSL_ASSERT(!inward_link_);
-
- // It's possible that the new router was already closed and we've already
- // received a notification about this and forwarded any parcels it may have
- // sent. In that case it would be pointless to establish an inward link, so
- // we'll just drop it instead.
- if (outbound_parcels_.IsSequenceFullyConsumed()) {
- deactivate_link = true;
- } else {
- // TODO: Initiate proxy removal ASAP now that we're proxying.
- inward_link_ = new_router_link;
+ // If the new router has already been closed or disconnected, we will
+ // discard the new link to it.
+ if (!outbound_parcels_.final_sequence_length() && !is_disconnected_) {
+ // TODO: Initiate proxy removal ASAP now that we're proxying.
+ inward_link_ = std::move(new_router_link);
+ }
}
- }
- if (deactivate_link) {
- new_router_link->Deactivate();
- return;
- }
-
- if (!to_node_link.GetSublink(descriptor.new_sublink)) {
- // If the NodeLink was disconnected since we entered this method but before
- // `inward_link_` was set above, disconnection will not have been propagated
- // inward. Remedy that.
- AcceptRouteClosureFrom(LinkType::kPeripheralInward);
- return;
+ if (new_router_link) {
+ // The link was not adopted, so deactivate and discard it.
+ DVLOG(4) << "Dropping link to new router " << new_router_link->Describe();
+ new_router_link->AcceptRouteDisconnected();
+ new_router_link->Deactivate();
+ return;
+ }
}
// We may have inbound parcels queued which need to be forwarded to the new
@@ -376,32 +434,20 @@
Flush();
}
-void Router::NotifyLinkDisconnected(const NodeLink& node_link,
- SublinkId sublink) {
- Ref<RouterLink> dead_outward_link;
- SequenceNumber inbound_sequence_length;
- Ref<RouterLink> dead_inward_link;
- SequenceNumber outbound_sequence_length;
+void Router::NotifyLinkDisconnected(RemoteRouterLink& link) {
{
absl::MutexLock lock(&mutex_);
- if (outward_link_ && outward_link_->IsRemoteLinkTo(node_link, sublink)) {
- dead_outward_link = std::move(outward_link_);
- inbound_sequence_length = inbound_parcels_.GetCurrentSequenceLength();
- } else if (inward_link_ &&
- inward_link_->IsRemoteLinkTo(node_link, sublink)) {
- dead_inward_link = std::move(inward_link_);
- outbound_sequence_length = outbound_parcels_.GetCurrentSequenceLength();
+ if (outward_link_ == &link) {
+ outward_link_.reset();
+ } else if (inward_link_ == &link) {
+ inward_link_.reset();
}
}
- if (dead_outward_link) {
- AcceptRouteClosureFrom(dead_outward_link->GetType(),
- inbound_sequence_length);
- }
-
- if (dead_inward_link) {
- AcceptRouteClosureFrom(dead_inward_link->GetType(),
- outbound_sequence_length);
+ if (link.GetType().is_outward()) {
+ AcceptRouteDisconnectedFrom(LinkType::kPeripheralOutward);
+ } else {
+ AcceptRouteDisconnectedFrom(LinkType::kPeripheralInward);
}
}
diff --git a/src/ipcz/router.h b/src/ipcz/router.h
index d132947..cd98e30 100644
--- a/src/ipcz/router.h
+++ b/src/ipcz/router.h
@@ -21,6 +21,7 @@
namespace ipcz {
class NodeLink;
+class RemoteRouterLink;
// The Router is the main primitive responsible for routing parcels between ipcz
// portals. This class is thread-safe.
@@ -92,13 +93,19 @@
bool AcceptOutboundParcel(Parcel& parcel);
// Accepts notification that the other end of the route has been closed and
- // that the close end transmitted a total of `sequence_length` parcels before
- // closing. If `sequence_length` is unknown and omitted (due to closure being
- // forced by disconnection), the current sequence length in the appropriate
- // direction is used.
- bool AcceptRouteClosureFrom(
- LinkType link_type,
- absl::optional<SequenceNumber> sequence_length = absl::nullopt);
+ // that the closed end transmitted a total of `sequence_length` parcels before
+ // closing.
+ bool AcceptRouteClosureFrom(LinkType link_type,
+ SequenceNumber sequence_length);
+
+ // Accepts notification from a link bound to this Router that some node along
+ // the route (in the direction of that link) has been disconnected, e.g. due
+ // to a crash, and that the route is no longer functional as a result. This is
+ // similar to route closure, except no effort can realistically be made to
+ // deliver the complete sequence of parcels transmitted from that end of the
+ // route. `link_type` specifies the type of link which is propagating the
+ // notification to this router.
+ bool AcceptRouteDisconnectedFrom(LinkType link_type);
// Retrieves the next available inbound parcel from this Router, if present.
IpczResult GetNextInboundParcel(IpczGetFlags flags,
@@ -131,10 +138,10 @@
void BeginProxyingToNewRouter(NodeLink& to_node_link,
const RouterDescriptor& descriptor);
- // Notifies this Router that one of its links has been disconnected from a
- // remote node. The link is identified by a combination of a specific NodeLink
- // and SublinkId.
- void NotifyLinkDisconnected(const NodeLink& node_link, SublinkId sublink);
+ // Notifies this router that the given RemoteRouterLink has been disconnected
+ // due to an underlying NodeLink disconnection. This is only called for
+ // RemoteRouterLinks which are associated with this Router.
+ void NotifyLinkDisconnected(RemoteRouterLink& link);
// Flushes any inbound or outbound parcels, as well as any route closure
// notifications. RouterLinks which are no longer needed for the operation of
@@ -184,6 +191,10 @@
// present when received, and they are forwarded along `outward_link_` as soon
// as possible.
ParcelQueue outbound_parcels_ ABSL_GUARDED_BY(mutex_);
+
+ // Tracks whether this router has been unexpectedly disconnected from its
+ // links. This may be used to prevent additional links from being established.
+ bool is_disconnected_ ABSL_GUARDED_BY(mutex_) = false;
};
} // namespace ipcz
diff --git a/src/ipcz/router_link.h b/src/ipcz/router_link.h
index a682e18..b08942f 100644
--- a/src/ipcz/router_link.h
+++ b/src/ipcz/router_link.h
@@ -40,10 +40,6 @@
// Returns true iff this is a LocalRouterLink whose peer router is `router`.
virtual bool HasLocalPeer(const Router& router) = 0;
- // Returns true iff this is a RemoteRouterLink routing over `node_link` via
- // `sublink`.
- virtual bool IsRemoteLinkTo(const NodeLink& node_link, SublinkId sublink) = 0;
-
// Passes a parcel to the Router on the other side of this link to be queued
// and/or router further.
virtual void AcceptParcel(Parcel& parcel) = 0;
@@ -53,6 +49,12 @@
// transmitted from the closed side before it was closed.
virtual void AcceptRouteClosure(SequenceNumber sequence_length) = 0;
+ // Notifies the Router on the other side of the link that the route has been
+ // unexpectedly disconnected from this side. Unlike clean route closure above,
+ // in this case we don't know the final sequence length and can't guarantee
+ // delivery of any further parcels.
+ virtual void AcceptRouteDisconnected() = 0;
+
// Signals that this side of the link is in a stable state suitable for one
// side or the other to lock the link, either for bypass or closure
// propagation. Only once both sides are marked stable can either side lock
diff --git a/src/ipcz/sequenced_queue.h b/src/ipcz/sequenced_queue.h
index ddf67ca..c2de7f7 100644
--- a/src/ipcz/sequenced_queue.h
+++ b/src/ipcz/sequenced_queue.h
@@ -161,6 +161,25 @@
return Reallocate(length);
}
+ // Forcibly sets the final length of this queue's sequence to its currently
+ // available length. This means that if there is a gap in the available
+ // elements, the queue is cut off just before the gap and all elements beyond
+ // the gap are destroyed. If the final sequence length had already been set on
+ // this queue, this overrides that.
+ void ForceTerminateSequence() {
+ final_sequence_length_ = GetCurrentSequenceLength();
+ num_entries_ = GetNumAvailableElements();
+ if (num_entries_ == 0) {
+ storage_.clear();
+ entries_ = {};
+ return;
+ }
+
+ const size_t entries_offset = entries_.data() - storage_.data();
+ storage_.resize(entries_offset + num_entries_);
+ entries_ = EntryView(storage_.data() + entries_offset, num_entries_);
+ }
+
// Indicates whether this queue is still expecting to have more elements
// pushed. This is always true if the final sequence length has not been set
// yet.
diff --git a/src/ipcz/sequenced_queue_test.cc b/src/ipcz/sequenced_queue_test.cc
index 8c9bad9..54ed16f 100644
--- a/src/ipcz/sequenced_queue_test.cc
+++ b/src/ipcz/sequenced_queue_test.cc
@@ -69,6 +69,33 @@
EXPECT_FALSE(q.HasNextElement());
}
+TEST(SequencedQueueTest, ForceTerminateSequence) {
+ TestQueue q;
+ q.SetFinalSequenceLength(SequenceNumber(3));
+ EXPECT_TRUE(q.ExpectsMoreElements());
+ EXPECT_FALSE(q.HasNextElement());
+
+ // Push elements 0 and 2, leaving the current sequence length at 1, due to the
+ // gap in element 1.
+ EXPECT_TRUE(q.Push(SequenceNumber(0), "woot."));
+ EXPECT_TRUE(q.Push(SequenceNumber(2), "woot!"));
+ EXPECT_TRUE(q.ExpectsMoreElements());
+ EXPECT_TRUE(q.HasNextElement());
+ EXPECT_EQ(SequenceNumber(1), q.GetCurrentSequenceLength());
+
+ // We can't normally change the final sequence length once set.
+ EXPECT_FALSE(q.SetFinalSequenceLength(SequenceNumber(4)));
+ EXPECT_FALSE(q.SetFinalSequenceLength(SequenceNumber(0)));
+ EXPECT_FALSE(q.SetFinalSequenceLength(SequenceNumber(1)));
+
+ // But we can still force it to terminate at its current length. Now the gap
+ // at element 1 is irrelevant, and element 0 alone is the complete sequence.
+ q.ForceTerminateSequence();
+ EXPECT_FALSE(q.ExpectsMoreElements());
+ EXPECT_TRUE(q.HasNextElement());
+ EXPECT_FALSE(q.Push(SequenceNumber(1), "woot?"));
+}
+
TEST(SequencedQueueTest, SequenceTooLow) {
TestQueue q;