blob: c69534f21f239ee19105edd73fd324fb3c73afa8 [file] [log] [blame]
// Copyright 2022 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "ipcz/router.h"
#include <algorithm>
#include <cstddef>
#include <cstring>
#include <utility>
#include "ipcz/ipcz.h"
#include "ipcz/local_router_link.h"
#include "ipcz/node_link.h"
#include "ipcz/remote_router_link.h"
#include "ipcz/sequence_number.h"
#include "ipcz/trap_event_dispatcher.h"
#include "third_party/abseil-cpp/absl/base/macros.h"
#include "third_party/abseil-cpp/absl/container/inlined_vector.h"
#include "third_party/abseil-cpp/absl/synchronization/mutex.h"
#include "util/log.h"
#include "util/multi_mutex_lock.h"
#include "util/safe_math.h"
namespace ipcz {
namespace {
// Helper structure used to accumulate individual parcel flushing operations
// within Router::Flush(), via CollectParcelsToFlush() below.
struct ParcelToFlush {
// The link over which to flush this parcel.
RouterLink* link;
// The parcel to be flushed.
Parcel parcel;
};
using ParcelsToFlush = absl::InlinedVector<ParcelToFlush, 8>;
// Helper which attempts to pop elements from `queue` for transmission along
// `edge`. This terminates either when `queue` is exhausted, or the next parcel
// in `queue` is to be transmitted over a link that is not yet known to `edge`.
// Any successfully popped elements are accumulated at the end of `parcels`.
void CollectParcelsToFlush(ParcelQueue& queue,
const RouteEdge& edge,
ParcelsToFlush& parcels) {
RouterLink* decaying_link = edge.decaying_link().get();
RouterLink* primary_link = edge.primary_link().get();
while (queue.HasNextElement()) {
const SequenceNumber n = queue.current_sequence_number();
RouterLink* link = nullptr;
if (decaying_link && edge.ShouldTransmitOnDecayingLink(n)) {
link = decaying_link;
} else if (primary_link && !edge.ShouldTransmitOnDecayingLink(n)) {
link = primary_link;
} else {
return;
}
ParcelToFlush& parcel = parcels.emplace_back(ParcelToFlush{.link = link});
const bool popped = queue.Pop(parcel.parcel);
ABSL_ASSERT(popped);
}
}
} // namespace
Router::Router() = default;
Router::~Router() {
// A Router MUST be serialized or closed before it can be destroyed. Both
// operations clear `traps_` and imply that no further traps should be added.
absl::MutexLock lock(&mutex_);
ABSL_ASSERT(traps_.empty());
}
bool Router::IsPeerClosed() {
absl::MutexLock lock(&mutex_);
return (status_.flags & IPCZ_PORTAL_STATUS_PEER_CLOSED) != 0;
}
bool Router::IsRouteDead() {
absl::MutexLock lock(&mutex_);
return (status_.flags & IPCZ_PORTAL_STATUS_DEAD) != 0;
}
bool Router::IsOnCentralRemoteLink() {
absl::MutexLock lock(&mutex_);
// This may only be called on terminal Routers.
ABSL_ASSERT(!inward_edge_);
return outward_edge_.primary_link() && outward_edge_.is_stable() &&
outward_edge_.primary_link()->GetType().is_central() &&
!outward_edge_.primary_link()->GetLocalPeer();
}
void Router::QueryStatus(IpczPortalStatus& status) {
absl::MutexLock lock(&mutex_);
const size_t size = std::min(status.size, status_.size);
memcpy(&status, &status_, size);
status.size = size;
}
bool Router::HasLocalPeer(Router& router) {
absl::MutexLock lock(&mutex_);
return outward_edge_.primary_link() &&
outward_edge_.primary_link()->GetLocalPeer() == &router;
}
IpczResult Router::SendOutboundParcel(Parcel& parcel) {
Ref<RouterLink> link;
{
absl::MutexLock lock(&mutex_);
if (inbound_parcels_.final_sequence_length()) {
// If the inbound sequence is finalized, the peer portal must be gone.
return IPCZ_RESULT_NOT_FOUND;
}
const SequenceNumber sequence_number =
outbound_parcels_.GetCurrentSequenceLength();
parcel.set_sequence_number(sequence_number);
if (outward_edge_.primary_link() &&
outbound_parcels_.MaybeSkipSequenceNumber(sequence_number)) {
link = outward_edge_.primary_link();
} else {
// If there are no unsent parcels ahead of this one in the outbound
// sequence, and we have an active outward link, we can immediately
// transmit the parcel without any intermediate queueing step. That is the
// most common case, but otherwise we have to queue the parcel here and it
// will be flushed out ASAP.
DVLOG(4) << "Queuing outbound " << parcel.Describe();
const bool push_ok =
outbound_parcels_.Push(sequence_number, std::move(parcel));
ABSL_ASSERT(push_ok);
}
}
if (link) {
link->AcceptParcel(parcel);
} else {
Flush();
}
return IPCZ_RESULT_OK;
}
void Router::CloseRoute() {
TrapEventDispatcher dispatcher;
Ref<RouterLink> link;
{
absl::MutexLock lock(&mutex_);
outbound_parcels_.SetFinalSequenceLength(
outbound_parcels_.GetCurrentSequenceLength());
traps_.RemoveAll(dispatcher);
}
Flush();
}
void Router::SetOutwardLink(Ref<RouterLink> link) {
ABSL_ASSERT(link);
{
absl::MutexLock lock(&mutex_);
// If we have a stable inward edge (or none at all), and the outward edge
// is stable too, our new link can be marked stable from our side.
if (link->GetType().is_central() && outward_edge_.is_stable() &&
(!inward_edge_ || inward_edge_->is_stable())) {
link->MarkSideStable();
}
if (!is_disconnected_) {
outward_edge_.SetPrimaryLink(std::move(link));
}
}
if (link) {
// If the link wasn't adopted, this Router has already been disconnected.
link->AcceptRouteDisconnected();
link->Deactivate();
return;
}
Flush(kForceProxyBypassAttempt);
}
size_t Router::GetOutboundCapacityInBytes(const IpczPutLimits& limits) {
if (limits.max_queued_bytes == 0 || limits.max_queued_parcels == 0) {
return 0;
}
size_t num_queued_bytes = 0;
Ref<RouterLink> link;
{
absl::MutexLock lock(&mutex_);
if (outbound_parcels_.GetNumAvailableElements() >=
limits.max_queued_parcels) {
return 0;
}
if (outbound_parcels_.GetTotalAvailableElementSize() >
limits.max_queued_bytes) {
return 0;
}
num_queued_bytes = outbound_parcels_.GetTotalAvailableElementSize();
link = outward_edge_.primary_link();
}
size_t link_capacity =
link ? link->GetParcelCapacityInBytes(limits) : limits.max_queued_bytes;
if (link_capacity <= num_queued_bytes) {
return 0;
}
return link_capacity - num_queued_bytes;
}
size_t Router::GetInboundCapacityInBytes(const IpczPutLimits& limits) {
absl::MutexLock lock(&mutex_);
const size_t num_queued_parcels = inbound_parcels_.GetNumAvailableElements();
const size_t num_queued_bytes =
inbound_parcels_.GetTotalAvailableElementSize();
if (num_queued_bytes >= limits.max_queued_bytes ||
num_queued_parcels >= limits.max_queued_parcels) {
return 0;
}
return limits.max_queued_bytes - num_queued_bytes;
}
bool Router::AcceptInboundParcel(Parcel& parcel) {
TrapEventDispatcher dispatcher;
{
absl::MutexLock lock(&mutex_);
const SequenceNumber sequence_number = parcel.sequence_number();
if (!inbound_parcels_.Push(sequence_number, std::move(parcel))) {
// Unexpected route disconnection can cut off inbound sequences, so don't
// treat an out-of-bounds parcel as a validation failure.
return true;
}
if (!inward_edge_) {
// If this is a terminal router, we may have trap events to fire.
status_.num_local_parcels = inbound_parcels_.GetNumAvailableElements();
status_.num_local_bytes = inbound_parcels_.GetTotalAvailableElementSize();
traps_.UpdatePortalStatus(status_, TrapSet::UpdateReason::kNewLocalParcel,
dispatcher);
const Ref<RouterLink>& outward_link = outward_edge_.primary_link();
if (outward_link && outward_link->GetType().is_central()) {
outward_link->UpdateInboundQueueState(status_.num_local_parcels,
status_.num_local_bytes);
}
}
}
Flush();
return true;
}
bool Router::AcceptOutboundParcel(Parcel& parcel) {
{
absl::MutexLock lock(&mutex_);
// Proxied outbound parcels are always queued in a ParcelQueue even if they
// will be forwarded immediately. This allows us to track the full sequence
// of forwarded parcels so we can know with certainty when we're done
// forwarding.
//
// TODO: Using a queue here may increase latency along the route, because it
// it unnecessarily forces in-order forwarding. We could use an unordered
// queue for forwarding, but we'd still need some lighter-weight abstraction
// that tracks complete sequences from potentially fragmented contributions.
const SequenceNumber sequence_number = parcel.sequence_number();
if (!outbound_parcels_.Push(sequence_number, std::move(parcel))) {
// Unexpected route disconnection can cut off outbound sequences, so don't
// treat an out-of-bounds parcel as a validation failure.
return true;
}
}
Flush();
return true;
}
bool Router::AcceptRouteClosureFrom(LinkType link_type,
SequenceNumber sequence_length) {
TrapEventDispatcher dispatcher;
{
absl::MutexLock lock(&mutex_);
if (link_type.is_outward()) {
if (!inbound_parcels_.SetFinalSequenceLength(sequence_length)) {
// Ignore if and only if the sequence was terminated early.
DVLOG(4) << "Discarding inbound route closure notification";
return inbound_parcels_.final_sequence_length().has_value() &&
*inbound_parcels_.final_sequence_length() <= sequence_length;
}
if (!inward_edge_) {
status_.flags |= IPCZ_PORTAL_STATUS_PEER_CLOSED;
if (inbound_parcels_.IsSequenceFullyConsumed()) {
status_.flags |= IPCZ_PORTAL_STATUS_DEAD;
}
traps_.UpdatePortalStatus(status_, TrapSet::UpdateReason::kPeerClosed,
dispatcher);
}
} else if (link_type.is_peripheral_inward()) {
if (!outbound_parcels_.SetFinalSequenceLength(sequence_length)) {
// Ignore if and only if the sequence was terminated early.
DVLOG(4) << "Discarding outbound route closure notification";
return outbound_parcels_.final_sequence_length().has_value() &&
*outbound_parcels_.final_sequence_length() <= sequence_length;
}
}
}
Flush();
return true;
}
bool Router::AcceptRouteDisconnectedFrom(LinkType link_type) {
TrapEventDispatcher dispatcher;
absl::InlinedVector<Ref<RouterLink>, 4> forwarding_links;
{
absl::MutexLock lock(&mutex_);
DVLOG(4) << "Router " << this << " disconnected from "
<< link_type.ToString() << "link";
is_disconnected_ = true;
if (link_type.is_peripheral_inward()) {
outbound_parcels_.ForceTerminateSequence();
} else {
inbound_parcels_.ForceTerminateSequence();
}
// Wipe out all remaining links and propagate the disconnection over them.
forwarding_links.push_back(outward_edge_.ReleasePrimaryLink());
forwarding_links.push_back(outward_edge_.ReleaseDecayingLink());
if (inward_edge_) {
forwarding_links.push_back(inward_edge_->ReleasePrimaryLink());
forwarding_links.push_back(inward_edge_->ReleaseDecayingLink());
} else {
// Terminal routers may have trap events to fire.
status_.flags |= IPCZ_PORTAL_STATUS_PEER_CLOSED;
if (inbound_parcels_.IsSequenceFullyConsumed()) {
status_.flags |= IPCZ_PORTAL_STATUS_DEAD;
}
traps_.UpdatePortalStatus(status_, TrapSet::UpdateReason::kPeerClosed,
dispatcher);
}
}
for (const Ref<RouterLink>& link : forwarding_links) {
if (link) {
DVLOG(4) << "Forwarding disconnection over " << link->Describe();
link->AcceptRouteDisconnected();
link->Deactivate();
}
}
Flush();
return true;
}
void Router::NotifyPeerConsumedData() {
TrapEventDispatcher dispatcher;
{
absl::MutexLock lock(&mutex_);
const Ref<RouterLink>& outward_link = outward_edge_.primary_link();
if (!outward_link || !outward_link->GetType().is_central() ||
inward_edge_) {
return;
}
const RouterLinkState::QueueState peer_state =
outward_link->GetPeerQueueState();
status_.num_remote_parcels = peer_state.num_parcels;
status_.num_remote_bytes = peer_state.num_bytes;
traps_.UpdatePortalStatus(status_, TrapSet::UpdateReason::kRemoteActivity,
dispatcher);
if (!traps_.need_remote_state()) {
outward_link->EnablePeerMonitoring(false);
}
}
}
IpczResult Router::GetNextInboundParcel(IpczGetFlags flags,
void* data,
size_t* num_bytes,
IpczHandle* handles,
size_t* num_handles) {
TrapEventDispatcher dispatcher;
Ref<RouterLink> link_to_notify;
{
absl::MutexLock lock(&mutex_);
if (inbound_parcels_.IsSequenceFullyConsumed()) {
return IPCZ_RESULT_NOT_FOUND;
}
if (!inbound_parcels_.HasNextElement()) {
return IPCZ_RESULT_UNAVAILABLE;
}
Parcel& p = inbound_parcels_.NextElement();
const bool allow_partial = (flags & IPCZ_GET_PARTIAL) != 0;
const size_t data_capacity = num_bytes ? *num_bytes : 0;
const size_t handles_capacity = num_handles ? *num_handles : 0;
const size_t data_size =
allow_partial ? std::min(p.data_size(), data_capacity) : p.data_size();
const size_t handles_size =
allow_partial ? std::min(p.num_objects(), handles_capacity)
: p.num_objects();
if (num_bytes) {
*num_bytes = data_size;
}
if (num_handles) {
*num_handles = handles_size;
}
const bool consuming_whole_parcel =
data_capacity >= data_size && handles_capacity >= handles_size;
if (!consuming_whole_parcel && !allow_partial) {
return IPCZ_RESULT_RESOURCE_EXHAUSTED;
}
memcpy(data, p.data_view().data(), data_size);
const bool ok = inbound_parcels_.Consume(
data_size, absl::MakeSpan(handles, handles_size));
ABSL_ASSERT(ok);
status_.num_local_parcels = inbound_parcels_.GetNumAvailableElements();
status_.num_local_bytes = inbound_parcels_.GetTotalAvailableElementSize();
if (inbound_parcels_.IsSequenceFullyConsumed()) {
status_.flags |= IPCZ_PORTAL_STATUS_DEAD;
}
traps_.UpdatePortalStatus(
status_, TrapSet::UpdateReason::kLocalParcelConsumed, dispatcher);
const Ref<RouterLink>& outward_link = outward_edge_.primary_link();
if (outward_link && outward_link->GetType().is_central() &&
outward_link->UpdateInboundQueueState(status_.num_local_parcels,
status_.num_local_bytes)) {
link_to_notify = outward_link;
}
}
if (link_to_notify) {
link_to_notify->NotifyDataConsumed();
}
return IPCZ_RESULT_OK;
}
IpczResult Router::Trap(const IpczTrapConditions& conditions,
IpczTrapEventHandler handler,
uint64_t context,
IpczTrapConditionFlags* satisfied_condition_flags,
IpczPortalStatus* status) {
const bool need_remote_state =
(conditions.flags & (IPCZ_TRAP_BELOW_MAX_REMOTE_PARCELS |
IPCZ_TRAP_BELOW_MAX_REMOTE_BYTES)) != 0;
{
absl::MutexLock lock(&mutex_);
const Ref<RouterLink>& outward_link = outward_edge_.primary_link();
if (need_remote_state) {
status_.num_remote_parcels = outbound_parcels_.GetNumAvailableElements();
status_.num_remote_bytes =
outbound_parcels_.GetTotalAvailableElementSize();
if (outward_link && outward_link->GetType().is_central()) {
const RouterLinkState::QueueState peer_state =
outward_link->GetPeerQueueState();
status_.num_remote_parcels =
SaturatedAdd(status_.num_remote_parcels,
static_cast<size_t>(peer_state.num_parcels));
status_.num_remote_bytes =
SaturatedAdd(status_.num_remote_bytes,
static_cast<size_t>(peer_state.num_bytes));
}
}
const bool already_monitoring_remote_state = traps_.need_remote_state();
IpczResult result = traps_.Add(conditions, handler, context, status_,
satisfied_condition_flags, status);
if (result != IPCZ_RESULT_OK || !need_remote_state) {
return result;
}
if (!already_monitoring_remote_state) {
outward_link->EnablePeerMonitoring(true);
}
}
// Safeguard against races between remote state changes and the new trap being
// installed above. Only reached if the new trap monitors remote state.
ABSL_ASSERT(need_remote_state);
NotifyPeerConsumedData();
return IPCZ_RESULT_OK;
}
// static
Ref<Router> Router::Deserialize(const RouterDescriptor& descriptor,
NodeLink& from_node_link) {
bool disconnected = false;
auto router = MakeRefCounted<Router>();
{
absl::MutexLock lock(&router->mutex_);
router->outbound_parcels_.ResetInitialSequenceNumber(
descriptor.next_outgoing_sequence_number);
router->inbound_parcels_.ResetInitialSequenceNumber(
descriptor.next_incoming_sequence_number);
if (descriptor.peer_closed) {
router->status_.flags |= IPCZ_PORTAL_STATUS_PEER_CLOSED;
if (!router->inbound_parcels_.SetFinalSequenceLength(
descriptor.closed_peer_sequence_length)) {
return nullptr;
}
if (router->inbound_parcels_.IsSequenceFullyConsumed()) {
router->status_.flags |= IPCZ_PORTAL_STATUS_DEAD;
}
}
Ref<RemoteRouterLink> new_link = from_node_link.AddRemoteRouterLink(
descriptor.new_sublink, nullptr, LinkType::kPeripheralOutward,
LinkSide::kB, router);
if (new_link) {
router->outward_edge_.SetPrimaryLink(std::move(new_link));
DVLOG(4) << "Route extended from "
<< from_node_link.remote_node_name().ToString() << " to "
<< from_node_link.local_node_name().ToString() << " via sublink "
<< descriptor.new_sublink;
} else if (!descriptor.peer_closed) {
// The new portal is DOA, either because the associated NodeLink is dead,
// or the sublink ID was already in use. The latter implies a bug or bad
// behavior, but it should be harmless to ignore beyond this point.
disconnected = true;
}
}
if (disconnected) {
DVLOG(4) << "Disconnected new Router immediately after deserialization";
router->AcceptRouteDisconnectedFrom(LinkType::kPeripheralOutward);
}
router->Flush(kForceProxyBypassAttempt);
return router;
}
void Router::SerializeNewRouter(NodeLink& to_node_link,
RouterDescriptor& descriptor) {
TrapEventDispatcher dispatcher;
const SublinkId new_sublink = to_node_link.memory().AllocateSublinkIds(1);
descriptor.new_sublink = new_sublink;
{
absl::MutexLock lock(&mutex_);
traps_.RemoveAll(dispatcher);
descriptor.next_outgoing_sequence_number =
outbound_parcels_.GetCurrentSequenceLength();
descriptor.next_incoming_sequence_number =
inbound_parcels_.current_sequence_number();
// Initialize an inward edge but with no link yet. This ensures that we
// don't look like a terminal router while waiting for a link to be set,
// which can only happen after `descriptor` is transmitted.
inward_edge_.emplace();
if (status_.flags & IPCZ_PORTAL_STATUS_PEER_CLOSED) {
descriptor.peer_closed = true;
descriptor.closed_peer_sequence_length =
*inbound_parcels_.final_sequence_length();
// Ensure that the new edge decays its link as soon as it has one, since
// we know the link will not be used.
inward_edge_->BeginPrimaryLinkDecay();
inward_edge_->set_length_to_decaying_link(
*inbound_parcels_.final_sequence_length());
inward_edge_->set_length_from_decaying_link(
outbound_parcels_.current_sequence_number());
}
// Once `descriptor` is transmitted to the destination node and the new
// Router is created there, it may immediately begin transmitting messages
// back to this node regarding `new_sublink`. We establish a new
// RemoteRouterLink now and register it to `new_sublink` on `to_node_link`,
// so that any such incoming messages are routed to `this`.
//
// NOTE: We do not yet provide `this` itself with a reference to the new
// RemoteRouterLink, because it's not yet safe for us to send messages to
// the remote node regarding `new_sublink`. `descriptor` must be transmitted
// first.
Ref<RemoteRouterLink> new_link = to_node_link.AddRemoteRouterLink(
new_sublink, nullptr, LinkType::kPeripheralInward, LinkSide::kA,
WrapRefCounted(this));
DVLOG(4) << "Router " << this << " extending route with tentative new "
<< new_link->Describe();
}
}
void Router::BeginProxyingToNewRouter(NodeLink& to_node_link,
const RouterDescriptor& descriptor) {
// Acquire a reference to the RemoteRouterLink created by an earlier call to
// SerializeNewRouter(). If the NodeLink has already been disconnected, this
// may be null.
if (auto new_sublink = to_node_link.GetSublink(descriptor.new_sublink)) {
Ref<RemoteRouterLink> new_router_link = new_sublink->router_link;
{
absl::MutexLock lock(&mutex_);
ABSL_ASSERT(inward_edge_);
// If the new router has already been closed or disconnected, we will
// discard the new link to it.
if (!outbound_parcels_.final_sequence_length() && !is_disconnected_) {
DVLOG(4) << "Router " << this << " will proxy to new router over "
<< new_router_link->Describe();
inward_edge_->SetPrimaryLink(std::move(new_router_link));
Ref<RouterLink> outward_link = outward_edge_.primary_link();
if (outward_link && outward_edge_.is_stable() &&
inward_edge_->is_stable()) {
outward_link->MarkSideStable();
}
}
}
if (new_router_link) {
// The link was not adopted, so deactivate and discard it.
DVLOG(4) << "Dropping link to new router " << new_router_link->Describe();
new_router_link->AcceptRouteDisconnected();
new_router_link->Deactivate();
return;
}
}
// We may have inbound parcels queued which need to be forwarded to the new
// Router, so give them a chance to be flushed out.
Flush(kForceProxyBypassAttempt);
}
bool Router::BypassPeer(RemoteRouterLink& requestor,
const NodeName& bypass_target_node,
SublinkId bypass_target_sublink) {
NodeLink& from_node_link = *requestor.node_link();
// Validate that the source of this request is actually our peripheral outward
// peer, and that we are therefore its inward peer.
{
absl::MutexLock lock(&mutex_);
const Ref<RouterLink>& outward_link = outward_edge_.primary_link();
if (!outward_link) {
// This Router may have been disconnected already due to some other
// failure along the route. This is not the fault of the requestor, so we
// silently ignore the request.
return true;
}
if (outward_link != &requestor ||
!outward_link->GetType().is_peripheral_outward()) {
DLOG(ERROR) << "Rejecting RequestProxyBypass received on "
<< requestor.Describe() << " with existing "
<< outward_edge_.primary_link()->Describe();
return false;
}
}
// There are two distinct cases to handle. The first case here is when the
// proxy's outward peer lives on a different node from us.
if (bypass_target_node != from_node_link.local_node_name()) {
Ref<NodeLink> link_to_bypass_target =
from_node_link.node()->GetLink(bypass_target_node);
if (link_to_bypass_target) {
return BypassPeerWithNewRemoteLink(
requestor, *link_to_bypass_target, bypass_target_sublink,
link_to_bypass_target->memory().TryAllocateRouterLinkState());
}
// We need to establish a link to the target node before we can proceed.
from_node_link.node()->EstablishLink(
bypass_target_node,
[router = WrapRefCounted(this), requestor = WrapRefCounted(&requestor),
bypass_target_sublink](NodeLink* link_to_bypass_target) {
if (!link_to_bypass_target) {
DLOG(ERROR) << "Disconnecting Router due to failed introduction";
router->AcceptRouteDisconnectedFrom(LinkType::kPeripheralOutward);
return;
}
router->BypassPeerWithNewRemoteLink(
*requestor, *link_to_bypass_target, bypass_target_sublink,
link_to_bypass_target->memory().TryAllocateRouterLinkState());
});
return true;
}
// The second case is when the proxy's outward peer lives on our own node.
return BypassPeerWithNewLocalLink(requestor, bypass_target_sublink);
}
bool Router::AcceptBypassLink(
NodeLink& new_node_link,
SublinkId new_sublink,
FragmentRef<RouterLinkState> new_link_state,
SequenceNumber inbound_sequence_length_from_bypassed_link) {
SequenceNumber length_to_proxy_from_us;
Ref<RemoteRouterLink> old_link;
Ref<RemoteRouterLink> new_link;
{
absl::ReleasableMutexLock lock(&mutex_);
if (is_disconnected_ || !outward_edge_.primary_link()) {
// We've already been unexpectedly disconnected from the proxy, so the
// route is dysfunctional. Don't establish new links.
DVLOG(4) << "Discarding proxy bypass link due to peer disconnection";
return true;
}
old_link =
WrapRefCounted(outward_edge_.primary_link()->AsRemoteRouterLink());
if (!old_link) {
// It only makes sense to receive this at a router whose outward link is
// remote. If we have a non-remote outward link, something is wrong.
DVLOG(4) << "Rejecting unexpected bypass link";
return false;
}
if (old_link->node_link() != &new_node_link &&
!old_link->CanNodeRequestBypass(new_node_link.remote_node_name())) {
// The new link must either go to the same node as the old link, or the
// the old link must have been expecting a bypass link to the new node.
DLOG(ERROR) << "Rejecting unauthorized BypassProxy";
return false;
}
length_to_proxy_from_us = outbound_parcels_.current_sequence_number();
if (!outward_edge_.BeginPrimaryLinkDecay()) {
DLOG(ERROR) << "Rejecting BypassProxy on failure to decay link";
return false;
}
// By convention the initiator of a bypass assumes side A of the bypass
// link, so we assume side B.
new_link = new_node_link.AddRemoteRouterLink(
new_sublink, std::move(new_link_state), LinkType::kCentral,
LinkSide::kB, WrapRefCounted(this));
if (new_link) {
DVLOG(4) << "Bypassing proxy on other end of " << old_link->Describe()
<< " using a new " << new_link->Describe()
<< " with length to proxy " << length_to_proxy_from_us
<< " and length from proxy "
<< inbound_sequence_length_from_bypassed_link;
outward_edge_.set_length_to_decaying_link(length_to_proxy_from_us);
outward_edge_.set_length_from_decaying_link(
inbound_sequence_length_from_bypassed_link);
outward_edge_.SetPrimaryLink(new_link);
}
}
if (!new_link) {
AcceptRouteDisconnectedFrom(LinkType::kCentral);
return true;
}
if (new_link->node_link() == old_link->node_link()) {
// If the new link goes to the same place as the old link, we only need
// to tell the proxy there to stop proxying. It has already conspired with
// its local outward peer.
old_link->StopProxyingToLocalPeer(length_to_proxy_from_us);
} else {
// Otherwise, tell the proxy to stop proxying and let its inward peer (our
// new outward peer) know that the proxy will stop.
old_link->StopProxying(length_to_proxy_from_us,
inbound_sequence_length_from_bypassed_link);
new_link->ProxyWillStop(length_to_proxy_from_us);
}
Flush();
return true;
}
bool Router::StopProxying(SequenceNumber inbound_sequence_length,
SequenceNumber outbound_sequence_length) {
{
absl::MutexLock lock(&mutex_);
if (outward_edge_.is_stable() || !inward_edge_ ||
inward_edge_->is_stable()) {
// Proxies begin decaying their links before requesting to be bypassed,
// and they don't adopt new links after that. So if either edge is stable
// then someone is doing something wrong.
DLOG(ERROR) << "Rejecting StopProxying on invalid or non-proxying Router";
return false;
}
inward_edge_->set_length_to_decaying_link(inbound_sequence_length);
inward_edge_->set_length_from_decaying_link(outbound_sequence_length);
outward_edge_.set_length_to_decaying_link(outbound_sequence_length);
outward_edge_.set_length_from_decaying_link(inbound_sequence_length);
}
Flush();
return true;
}
bool Router::NotifyProxyWillStop(SequenceNumber inbound_sequence_length) {
{
absl::MutexLock lock(&mutex_);
if (outward_edge_.is_stable()) {
// If the outward edge is already stable, either this request is invalid,
// or we've lost all links due to disconnection. In the latter case we
// can silently ignore this, but the former case is a validation failure.
return is_disconnected_;
}
DVLOG(4) << "Bypassed proxy will stop forwarding inbound parcels after a "
<< "sequence length of " << inbound_sequence_length;
outward_edge_.set_length_from_decaying_link(inbound_sequence_length);
}
Flush();
return true;
}
bool Router::StopProxyingToLocalPeer(SequenceNumber outbound_sequence_length) {
Ref<Router> local_peer;
{
absl::MutexLock lock(&mutex_);
if (outward_edge_.decaying_link()) {
local_peer = outward_edge_.decaying_link()->GetLocalPeer();
} else {
// Ignore this request if we've been unexpectedly disconnected.
return is_disconnected_;
}
}
if (!local_peer) {
// It's invalid to send call this on a Router with a non-local outward peer.
DLOG(ERROR) << "Rejecting StopProxyingToLocalPeer with no local peer";
return false;
}
{
MultiMutexLock lock(&mutex_, &local_peer->mutex_);
const Ref<RouterLink>& our_link = outward_edge_.decaying_link();
const Ref<RouterLink>& peer_link =
local_peer->outward_edge_.decaying_link();
if (!our_link || !peer_link) {
// Either Router may have been unexpectedly disconnected, in which case
// we can ignore this request.
return true;
}
if (!inward_edge_ || our_link->GetLocalPeer() != local_peer ||
peer_link->GetLocalPeer() != this) {
// Consistency check: this must be a proxying router, and both this router
// and its local peer must link to each other.
DLOG(ERROR) << "Rejecting StopProxyingToLocalPeer at invalid proxy";
return false;
}
DVLOG(4) << "Stopping proxy with decaying "
<< inward_edge_->decaying_link()->Describe() << " and decaying "
<< our_link->Describe();
local_peer->outward_edge_.set_length_from_decaying_link(
outbound_sequence_length);
outward_edge_.set_length_to_decaying_link(outbound_sequence_length);
inward_edge_->set_length_from_decaying_link(outbound_sequence_length);
}
Flush();
local_peer->Flush();
return true;
}
void Router::NotifyLinkDisconnected(RemoteRouterLink& link) {
{
absl::MutexLock lock(&mutex_);
if (outward_edge_.primary_link() == &link) {
DVLOG(4) << "Primary " << link.Describe() << " disconnected";
outward_edge_.ReleasePrimaryLink();
} else if (outward_edge_.decaying_link() == &link) {
DVLOG(4) << "Decaying " << link.Describe() << " disconnected";
outward_edge_.ReleaseDecayingLink();
} else if (inward_edge_ && inward_edge_->primary_link() == &link) {
DVLOG(4) << "Primary " << link.Describe() << " disconnected";
inward_edge_->ReleasePrimaryLink();
} else if (inward_edge_ && inward_edge_->decaying_link() == &link) {
DVLOG(4) << "Decaying " << link.Describe() << " disconnected";
inward_edge_->ReleaseDecayingLink();
}
}
if (link.GetType().is_outward()) {
AcceptRouteDisconnectedFrom(LinkType::kPeripheralOutward);
} else {
AcceptRouteDisconnectedFrom(LinkType::kPeripheralInward);
}
}
void Router::Flush(FlushBehavior behavior) {
Ref<RouterLink> outward_link;
Ref<RouterLink> inward_link;
Ref<RouterLink> decaying_outward_link;
Ref<RouterLink> decaying_inward_link;
Ref<RouterLink> dead_inward_link;
Ref<RouterLink> dead_outward_link;
absl::optional<SequenceNumber> final_inward_sequence_length;
absl::optional<SequenceNumber> final_outward_sequence_length;
bool on_central_link = false;
bool inward_link_decayed = false;
bool outward_link_decayed = false;
bool dropped_last_decaying_link = false;
ParcelsToFlush parcels_to_flush;
{
absl::MutexLock lock(&mutex_);
// Acquire stack references to all links we might want to use, so it's safe
// to acquire additional (unmanaged) references per ParcelToFlush.
outward_link = outward_edge_.primary_link();
inward_link = inward_edge_ ? inward_edge_->primary_link() : nullptr;
decaying_outward_link = outward_edge_.decaying_link();
decaying_inward_link =
inward_edge_ ? inward_edge_->decaying_link() : nullptr;
on_central_link = outward_link && outward_link->GetType().is_central();
// Collect any parcels which are safe to transmit now. Note that we do not
// transmit anything or generally call into any RouterLinks while `mutex_`
// is held, because such calls may ultimately re-enter this Router
// (e.g. if a link is a LocalRouterLink, or even a RemoteRouterLink with a
// fully synchronous driver.) Instead we accumulate work within this block,
// and then perform any transmissions or link deactivations after the mutex
// is released further below.
CollectParcelsToFlush(outbound_parcels_, outward_edge_, parcels_to_flush);
const SequenceNumber outbound_sequence_length_sent =
outbound_parcels_.current_sequence_number();
const SequenceNumber inbound_sequence_length_received =
inbound_parcels_.GetCurrentSequenceLength();
if (outward_edge_.MaybeFinishDecay(outbound_sequence_length_sent,
inbound_sequence_length_received)) {
DVLOG(4) << "Outward " << decaying_outward_link->Describe()
<< " fully decayed at " << outbound_sequence_length_sent
<< " sent and " << inbound_sequence_length_received
<< " recived";
outward_link_decayed = true;
}
if (inward_edge_) {
CollectParcelsToFlush(inbound_parcels_, *inward_edge_, parcels_to_flush);
const SequenceNumber inbound_sequence_length_sent =
inbound_parcels_.current_sequence_number();
const SequenceNumber outbound_sequence_length_received =
outbound_parcels_.GetCurrentSequenceLength();
if (inward_edge_->MaybeFinishDecay(inbound_sequence_length_sent,
outbound_sequence_length_received)) {
DVLOG(4) << "Inward " << decaying_inward_link->Describe()
<< " fully decayed at " << inbound_sequence_length_sent
<< " sent and " << outbound_sequence_length_received
<< " received";
inward_link_decayed = true;
}
}
// If we're dropping the last of our decaying links, our outward link may
// now be stable. This may unblock proxy bypass or other operations.
const bool inward_edge_stable =
!decaying_inward_link || inward_link_decayed;
const bool outward_edge_stable =
outward_link && (!decaying_outward_link || outward_link_decayed);
const bool both_edges_stable = inward_edge_stable && outward_edge_stable;
const bool either_link_decayed =
inward_link_decayed || outward_link_decayed;
if (on_central_link && either_link_decayed && both_edges_stable) {
DVLOG(4) << "Router with fully decayed links may be eligible for bypass "
<< " with outward " << outward_link->Describe();
outward_link->MarkSideStable();
dropped_last_decaying_link = true;
}
if (on_central_link && outbound_parcels_.IsSequenceFullyConsumed() &&
outward_link->TryLockForClosure()) {
// Notify the other end of the route that this end is closed. See the
// AcceptRouteClosure() invocation further below.
final_outward_sequence_length =
*outbound_parcels_.final_sequence_length();
// We also have no more use for either outward or inward links: trivially
// there are no more outbound parcels to send outward, and there no longer
// exists an ultimate destination for any forwarded inbound parcels. So we
// drop both links now.
dead_outward_link = outward_edge_.ReleasePrimaryLink();
} else if (!inbound_parcels_.ExpectsMoreElements()) {
// If the other end of the route is gone and we've received all its
// parcels, we can simply drop the outward link in that case.
dead_outward_link = outward_edge_.ReleasePrimaryLink();
}
if (inbound_parcels_.IsSequenceFullyConsumed()) {
// We won't be receiving anything new from our peer, and if we're a proxy
// then we've also forwarded everything already. We can propagate closure
// inward and drop the inward link, if applicable.
final_inward_sequence_length = inbound_parcels_.final_sequence_length();
if (inward_edge_) {
dead_inward_link = inward_edge_->ReleasePrimaryLink();
}
}
}
for (ParcelToFlush& parcel : parcels_to_flush) {
parcel.link->AcceptParcel(parcel.parcel);
}
if (outward_link_decayed) {
decaying_outward_link->Deactivate();
}
if (inward_link_decayed) {
decaying_inward_link->Deactivate();
}
if (dead_outward_link) {
if (final_outward_sequence_length) {
dead_outward_link->AcceptRouteClosure(*final_outward_sequence_length);
}
dead_outward_link->Deactivate();
}
if (dead_inward_link) {
if (final_inward_sequence_length) {
dead_inward_link->AcceptRouteClosure(*final_inward_sequence_length);
}
dead_inward_link->Deactivate();
}
if (dead_outward_link || !on_central_link) {
// If we're not on a central link, there's no more work to do.
return;
}
if (!dropped_last_decaying_link && behavior != kForceProxyBypassAttempt) {
// No relevant state changes, so there are no new bypass opportunities.
return;
}
if (inward_link && MaybeStartSelfBypass()) {
return;
}
if (outward_link) {
outward_link->FlushOtherSideIfWaiting();
}
}
bool Router::MaybeStartSelfBypass() {
Ref<RemoteRouterLink> remote_inward_link;
Ref<RemoteRouterLink> remote_outward_link;
Ref<Router> local_outward_peer;
{
absl::MutexLock lock(&mutex_);
if (!inward_edge_ || !inward_edge_->primary_link() ||
!inward_edge_->is_stable()) {
// Only a proxy with stable links can be bypassed.
return false;
}
const Ref<RouterLink>& outward_link = outward_edge_.primary_link();
RemoteRouterLink* inward_link =
inward_edge_->primary_link()->AsRemoteRouterLink();
if (!outward_link || !inward_link) {
return false;
}
const NodeName& inward_peer_name =
inward_link->node_link()->remote_node_name();
if (!outward_link->TryLockForBypass(inward_peer_name)) {
DVLOG(4) << "Proxy bypass blocked by busy " << outward_link->Describe();
return false;
}
remote_inward_link = WrapRefCounted(inward_link);
local_outward_peer = outward_link->GetLocalPeer();
if (!local_outward_peer) {
remote_outward_link = WrapRefCounted(outward_link->AsRemoteRouterLink());
}
}
if (remote_outward_link) {
// The simpler case here: our outward peer is on another node, so we begin
// decaying our inward and outward links and ask the inward peer to bypass
// us ASAP.
{
absl::MutexLock lock(&mutex_);
if (!inward_edge_ || !inward_edge_->primary_link() ||
!outward_edge_.primary_link()) {
// We've been disconnected since leaving the block above. Nothing to do.
return false;
}
outward_edge_.BeginPrimaryLinkDecay();
inward_edge_->BeginPrimaryLinkDecay();
}
DVLOG(4) << "Proxy sending bypass request to inward peer over "
<< remote_inward_link->Describe()
<< " targeting outward peer on other side of "
<< remote_outward_link->Describe();
remote_inward_link->BypassPeer(
remote_outward_link->node_link()->remote_node_name(),
remote_outward_link->sublink());
return true;
}
// When the bypass target is local to the same node as this router, we can
// establish the bypass link immediately and send it to the remote inward
// peer.
return StartSelfBypassToLocalPeer(
*local_outward_peer, *remote_inward_link,
remote_inward_link->node_link()->memory().TryAllocateRouterLinkState());
}
bool Router::StartSelfBypassToLocalPeer(
Router& local_outward_peer,
RemoteRouterLink& inward_link,
FragmentRef<RouterLinkState> new_link_state) {
if (new_link_state.is_null()) {
NodeLinkMemory& memory = inward_link.node_link()->memory();
memory.AllocateRouterLinkState(
[router = WrapRefCounted(this),
local_outward_peer = WrapRefCounted(&local_outward_peer),
inward_link = WrapRefCounted(&inward_link)](
FragmentRef<RouterLinkState> new_link_state) {
router->StartSelfBypassToLocalPeer(*local_outward_peer, *inward_link,
std::move(new_link_state));
});
return true;
}
Ref<RemoteRouterLink> new_link;
SequenceNumber length_from_outward_peer;
const SublinkId new_sublink =
inward_link.node_link()->memory().AllocateSublinkIds(1);
{
MultiMutexLock lock(&mutex_, &local_outward_peer.mutex_);
const Ref<RouterLink>& outward_link = outward_edge_.primary_link();
const Ref<RouterLink>& peer_outward_link =
local_outward_peer.outward_edge_.primary_link();
if (!outward_link || !peer_outward_link || is_disconnected_ ||
local_outward_peer.is_disconnected_) {
DVLOG(4) << "Proxy bypass blocked due to peer closure or disconnection";
return false;
}
DVLOG(4) << "Proxy requesting own bypass from inward peer on "
<< inward_link.node_link()->remote_node_name().ToString()
<< " to local outward peer";
ABSL_ASSERT(outward_link->GetLocalPeer() == &local_outward_peer);
ABSL_ASSERT(peer_outward_link->GetLocalPeer() == this);
// Decay both of our existing links, as well as the local peer's link to us.
length_from_outward_peer =
local_outward_peer.outbound_parcels_.current_sequence_number();
local_outward_peer.outward_edge_.BeginPrimaryLinkDecay();
local_outward_peer.outward_edge_.set_length_to_decaying_link(
length_from_outward_peer);
outward_edge_.BeginPrimaryLinkDecay();
outward_edge_.set_length_from_decaying_link(length_from_outward_peer);
inward_edge_->BeginPrimaryLinkDecay();
inward_edge_->set_length_to_decaying_link(length_from_outward_peer);
new_link = inward_link.node_link()->AddRemoteRouterLink(
new_sublink, new_link_state, LinkType::kCentral, LinkSide::kA,
WrapRefCounted(&local_outward_peer));
}
if (!new_link) {
AcceptRouteDisconnectedFrom(LinkType::kCentral);
return false;
}
// Inform our inward peer on another node that they can bypass us using the
// new link we just created to our own outward local peer. Once that message
// is sent, it's safe for that local peer to adopt the new link.
inward_link.BypassPeerWithLink(new_sublink, std::move(new_link_state),
length_from_outward_peer);
local_outward_peer.SetOutwardLink(std::move(new_link));
return true;
}
bool Router::BypassPeerWithNewRemoteLink(
RemoteRouterLink& requestor,
NodeLink& node_link,
SublinkId bypass_target_sublink,
FragmentRef<RouterLinkState> new_link_state) {
if (new_link_state.is_null()) {
// We can't proceed with bypass until we have a fragment allocated for a new
// RouterLinkState.
node_link.memory().AllocateRouterLinkState(
[router = WrapRefCounted(this), requestor = WrapRefCounted(&requestor),
node_link = WrapRefCounted(&node_link),
bypass_target_sublink](FragmentRef<RouterLinkState> new_link_state) {
router->BypassPeerWithNewRemoteLink(*requestor, *node_link,
bypass_target_sublink,
std::move(new_link_state));
});
return true;
}
// Begin decaying our outward link.
SequenceNumber length_to_decaying_link;
Ref<RouterLink> new_link;
const SublinkId new_sublink = node_link.memory().AllocateSublinkIds(1);
{
absl::ReleasableMutexLock lock(&mutex_);
if (!outward_edge_.primary_link() || is_disconnected_) {
// We've been disconnected since leaving the above block. Don't bother
// to request a bypass. This is not the requestor's fault, so it's not
// treated as an error.
return true;
}
if (!outward_edge_.BeginPrimaryLinkDecay()) {
DLOG(ERROR) << "Rejecting BypassPeer on failure to decay link";
return false;
}
length_to_decaying_link = outbound_parcels_.current_sequence_number();
outward_edge_.set_length_to_decaying_link(length_to_decaying_link);
new_link = node_link.AddRemoteRouterLink(new_sublink, new_link_state,
LinkType::kCentral, LinkSide::kA,
WrapRefCounted(this));
}
if (!new_link) {
// The NodeLink was disconnected before we could create a new link for
// this Router. This is not the requestor's fault, so it's not treated as
// an error.
AcceptRouteDisconnectedFrom(LinkType::kCentral);
return true;
}
const NodeName proxy_node_name = requestor.node_link()->remote_node_name();
DVLOG(4) << "Sending AcceptBypassLink from "
<< node_link.local_node_name().ToString() << " to "
<< node_link.remote_node_name().ToString() << " with new sublink "
<< new_sublink << " to replace a link to proxy "
<< proxy_node_name.ToString() << " via sublink "
<< bypass_target_sublink;
node_link.AcceptBypassLink(proxy_node_name, bypass_target_sublink,
length_to_decaying_link, new_sublink,
std::move(new_link_state));
// NOTE: This link is intentionally set *after* transmitting the
// above message. Otherwise the router might race on another thread to send
// messages via `new_sublink`, and the remote node would have no idea where
// to route them.
SetOutwardLink(std::move(new_link));
return true;
}
bool Router::BypassPeerWithNewLocalLink(RemoteRouterLink& requestor,
SublinkId bypass_target_sublink) {
NodeLink& from_node_link = *requestor.node_link();
const Ref<Router> new_local_peer =
from_node_link.GetRouter(bypass_target_sublink);
if (!new_local_peer) {
// The peer may have already been destroyed or disconnected from the proxy
// by the time we get here.
AcceptRouteDisconnectedFrom(LinkType::kPeripheralOutward);
return true;
}
Ref<RouterLink> link_from_new_local_peer_to_proxy;
SequenceNumber length_to_proxy_from_us;
SequenceNumber length_from_proxy_to_us;
{
MultiMutexLock lock(&mutex_, &new_local_peer->mutex_);
length_from_proxy_to_us =
new_local_peer->outbound_parcels_.current_sequence_number();
length_to_proxy_from_us = outbound_parcels_.current_sequence_number();
DVLOG(4) << "Proxy bypass requested with new local peer on "
<< from_node_link.local_node_name().ToString() << " and proxy on "
<< from_node_link.remote_node_name().ToString() << " via sublinks "
<< bypass_target_sublink << " and " << requestor.sublink()
<< "; length to the proxy is " << length_to_proxy_from_us
<< " and length from the proxy " << length_from_proxy_to_us;
link_from_new_local_peer_to_proxy =
new_local_peer->outward_edge_.primary_link();
if (!outward_edge_.primary_link() || !link_from_new_local_peer_to_proxy ||
is_disconnected_ || new_local_peer->is_disconnected_) {
return true;
}
// Otherwise immediately begin decay of both links to the proxy.
if (!outward_edge_.BeginPrimaryLinkDecay() ||
!new_local_peer->outward_edge_.BeginPrimaryLinkDecay()) {
DLOG(ERROR) << "Rejecting RequestProxyBypass on failure to decay link";
return false;
}
outward_edge_.set_length_to_decaying_link(length_to_proxy_from_us);
outward_edge_.set_length_from_decaying_link(length_from_proxy_to_us);
new_local_peer->outward_edge_.set_length_to_decaying_link(
length_from_proxy_to_us);
new_local_peer->outward_edge_.set_length_from_decaying_link(
length_to_proxy_from_us);
// Finally, link the two routers with a new LocalRouterLink. This link will
// remain unstable until the decaying proxy links are gone.
RouterLink::Pair links = LocalRouterLink::CreatePair(
LinkType::kCentral, Router::Pair(WrapRefCounted(this), new_local_peer));
outward_edge_.SetPrimaryLink(std::move(links.first));
new_local_peer->outward_edge_.SetPrimaryLink(std::move(links.second));
}
link_from_new_local_peer_to_proxy->StopProxying(length_from_proxy_to_us,
length_to_proxy_from_us);
Flush();
new_local_peer->Flush();
return true;
}
} // namespace ipcz