blob: a84f055396e8093c3d5dc7862e2b6c9b57fd200e [file] [log] [blame]
// Copyright 2022 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "ipcz/router.h"
#include <algorithm>
#include <cstddef>
#include <cstring>
#include <optional>
#include <utility>
#include "ipcz/ipcz.h"
#include "ipcz/local_router_link.h"
#include "ipcz/node_link.h"
#include "ipcz/parcel_wrapper.h"
#include "ipcz/remote_router_link.h"
#include "ipcz/sequence_number.h"
#include "ipcz/trap_event_dispatcher.h"
#include "third_party/abseil-cpp/absl/base/macros.h"
#include "third_party/abseil-cpp/absl/container/inlined_vector.h"
#include "third_party/abseil-cpp/absl/synchronization/mutex.h"
#include "util/log.h"
#include "util/multi_mutex_lock.h"
#include "util/safe_math.h"
namespace ipcz {
namespace {
// Helper structure used to accumulate individual parcel flushing operations
// within Router::Flush(), via CollectParcelsToFlush() below. Each entry pairs
// one parcel popped from a queue with the link selected to carry it.
struct ParcelToFlush {
// The link over which to flush this parcel. This is a raw pointer; see
// CollectParcelsToFlush() for where it is taken from.
RouterLink* link;
// The parcel to be flushed. Filled in when the parcel is popped from its
// queue.
std::unique_ptr<Parcel> parcel;
};
// Accumulator for ParcelToFlush entries, with inline capacity for 8 of them.
using ParcelsToFlush = absl::InlinedVector<ParcelToFlush, 8>;
// Drains as many elements as possible from the front of `queue`, appending one
// ParcelToFlush per element to the end of `parcels`. For each element, the
// destination is chosen by asking `edge` whether the element's sequence number
// belongs on the decaying link or on the primary link. Collection stops as soon
// as `queue` has no next element, or as soon as the link required for the next
// element is not currently present on `edge`.
void CollectParcelsToFlush(ParcelQueue& queue,
                           const RouteEdge& edge,
                           ParcelsToFlush& parcels) {
  RouterLink* const decaying = edge.decaying_link();
  RouterLink* const primary = edge.primary_link().get();
  while (queue.HasNextElement()) {
    const SequenceNumber next = queue.current_sequence_number();

    RouterLink* target = nullptr;
    if (decaying && edge.ShouldTransmitOnDecayingLink(next)) {
      target = decaying;
    } else if (primary && !edge.ShouldTransmitOnDecayingLink(next)) {
      target = primary;
    }
    if (!target) {
      // The link this parcel needs is not available yet; leave it queued.
      return;
    }

    parcels.emplace_back(ParcelToFlush{.link = target});
    const bool popped = queue.Pop(parcels.back().parcel);
    ABSL_ASSERT(popped);
  }
}
// Resolves each handle in `handles` to its APIObject and stores a reference to
// it at the same index in `objects`, which is first resized to match
// `handles`. Returns false as soon as a handle does not resolve to an object or
// the object reports that it cannot be sent from `sender`; in that case
// `objects` is left partially populated.
bool ValidateAndAcquireObjectsForTransitFrom(
    Router& sender,
    absl::Span<const IpczHandle> handles,
    std::vector<Ref<APIObject>>& objects) {
  objects.resize(handles.size());

  size_t index = 0;
  for (const IpczHandle handle : handles) {
    auto* object = APIObject::FromHandle(handle);
    const bool sendable = object && object->CanSendFrom(sender);
    if (!sendable) {
      return false;
    }
    objects[index] = WrapRefCounted(object);
    ++index;
  }
  return true;
}
} // namespace
// Routers are default-constructed without any links; see CreatePair() and
// Deserialize() for how links are attached afterwards.
Router::Router() = default;
// Destroys the Router, asserting that no traps remain installed on it.
Router::~Router() {
// A Router MUST be serialized or closed before it can be destroyed. Both
// operations clear `traps_` and imply that no further traps should be added.
// `traps_` is inspected under `mutex_`, like every other access to it.
absl::MutexLock lock(&mutex_);
ABSL_ASSERT(traps_.empty());
}
// static
// Creates two new Routers joined to each other by a pair of local central
// links created in the stable state, and returns both routers.
Router::Pair Router::CreatePair() {
  Pair pair{MakeRefCounted<Router>(), MakeRefCounted<Router>()};
  const Ref<Router>& first = pair.first;
  const Ref<Router>& second = pair.second;
  DVLOG(5) << "Created new portal pair " << first.get() << " and "
           << second.get();

  auto link_pair = LocalRouterLink::CreatePair(LinkType::kCentral, pair,
                                               LocalRouterLink::kStable);
  first->SetOutwardLink(std::move(link_pair.first));
  second->SetOutwardLink(std::move(link_pair.second));
  return pair;
}
// Closes this router by closing its side of the route. Always returns
// IPCZ_RESULT_OK.
IpczResult Router::Close() {
CloseRoute();
return IPCZ_RESULT_OK;
}
// Reports whether this router may be transmitted in a parcel sent from
// `sender`. A router can be sent neither from itself nor from its own local
// peer.
bool Router::CanSendFrom(Router& sender) {
  if (&sender == this) {
    return false;
  }
  return !HasLocalPeer(sender);
}
// Returns true if IPCZ_PORTAL_STATUS_PEER_CLOSED is currently set in this
// router's status flags.
bool Router::IsPeerClosed() {
  absl::MutexLock lock(&mutex_);
  const auto peer_closed_bits = status_flags_ & IPCZ_PORTAL_STATUS_PEER_CLOSED;
  return peer_closed_bits != 0;
}
// Returns true if IPCZ_PORTAL_STATUS_DEAD is currently set in this router's
// status flags.
bool Router::IsRouteDead() {
  absl::MutexLock lock(&mutex_);
  const auto dead_bits = status_flags_ & IPCZ_PORTAL_STATUS_DEAD;
  return dead_bits != 0;
}
// Returns true if this router's outward edge is stable and its primary link
// exists, is a central link, and has no local peer. Must only be called on a
// terminal router, i.e. one with no inward edge.
bool Router::IsOnCentralRemoteLink() {
  absl::MutexLock lock(&mutex_);
  // This may only be called on terminal Routers.
  ABSL_ASSERT(!inward_edge_);

  const auto& link = outward_edge_.primary_link();
  if (!link || !outward_edge_.is_stable()) {
    return false;
  }
  return link->GetType().is_central() && !link->GetLocalPeer();
}
// Fills `status` with a snapshot of this router's state: the current status
// flags plus the count and total size of the available inbound parcels.
// `status.size` is clamped so that it never exceeds sizeof(IpczPortalStatus).
void Router::QueryStatus(IpczPortalStatus& status) {
  absl::MutexLock lock(&mutex_);
  const auto clamped_size = std::min(status.size, sizeof(IpczPortalStatus));
  status.size = clamped_size;
  status.flags = status_flags_;
  status.num_local_parcels = inbound_parcels_.GetNumAvailableElements();
  status.num_local_bytes = inbound_parcels_.GetTotalAvailableElementSize();
}
// Returns true if `router` is the local peer on this router's outward edge.
bool Router::HasLocalPeer(Router& router) {
  absl::MutexLock lock(&mutex_);
  const auto& local_peer = outward_edge_.GetLocalPeer();
  return local_peer == &router;
}
std::unique_ptr<Parcel> Router::AllocateOutboundParcel(size_t num_bytes,
bool allow_partial) {
Ref<RouterLink> outward_link;
{
absl::MutexLock lock(&mutex_);
outward_link = outward_edge_.primary_link();
}
auto parcel = std::make_unique<Parcel>();
if (outward_link) {
outward_link->AllocateParcelData(num_bytes, allow_partial, *parcel);
} else {
parcel->AllocateData(num_bytes, allow_partial, nullptr);
}
return parcel;
}
// Assigns `parcel` the next outbound sequence number and sends it toward the
// peer. Returns IPCZ_RESULT_NOT_FOUND if the inbound sequence already has a
// final length, and IPCZ_RESULT_OK otherwise.
IpczResult Router::SendOutboundParcel(std::unique_ptr<Parcel> parcel) {
  Ref<RouterLink> direct_link;
  {
    absl::MutexLock lock(&mutex_);
    if (inbound_parcels_.final_sequence_length()) {
      // If the inbound sequence is finalized, the peer portal must be gone.
      return IPCZ_RESULT_NOT_FOUND;
    }

    const SequenceNumber n = outbound_parcels_.GetCurrentSequenceLength();
    parcel->set_sequence_number(n);

    // If there are no unsent parcels ahead of this one in the outbound
    // sequence, and we have an active outward link, the parcel can be
    // transmitted immediately without any intermediate queueing step. That is
    // the most common case.
    const bool can_send_directly =
        outward_edge_.primary_link() && outbound_parcels_.SkipElement(n);
    if (can_send_directly) {
      direct_link = outward_edge_.primary_link();
    } else {
      // Otherwise the parcel is queued here and will be flushed out ASAP.
      DVLOG(4) << "Queuing outbound " << parcel->Describe();
      const bool push_ok = outbound_parcels_.Push(n, std::move(parcel));
      ABSL_ASSERT(push_ok);
    }
  }

  if (!direct_link) {
    Flush();
    return IPCZ_RESULT_OK;
  }

  // NOTE: This cannot be a use-after-move because `direct_link` is only set on
  // the path where `parcel` was not moved into the queue above.
  direct_link->AcceptParcel(std::move(parcel));
  return IPCZ_RESULT_OK;
}
// Closes this router's side of the route: the outbound sequence is finalized
// at its current length, every installed trap is removed, and the router is
// flushed.
void Router::CloseRoute() {
  // Declared ahead of the lock scope so that it is destroyed only after
  // `mutex_` has been released.
  TrapEventDispatcher dispatcher;
  {
    absl::MutexLock lock(&mutex_);
    outbound_parcels_.SetFinalSequenceLength(
        outbound_parcels_.GetCurrentSequenceLength());
    traps_.RemoveAll(dispatcher);
  }
  Flush();
}
// Installs `link` as the primary link of this router's outward edge and then
// flushes with a forced proxy bypass attempt. If the router has already been
// disconnected, the link is not installed; it is instead notified of the
// disconnection and deactivated. `link` must be non-null.
void Router::SetOutwardLink(Ref<RouterLink> link) {
ABSL_ASSERT(link);
{
absl::MutexLock lock(&mutex_);
// If we have a stable inward edge (or none at all), and the outward edge
// is stable too, our new link can be marked stable from our side.
if (link->GetType().is_central() && outward_edge_.is_stable() &&
(!inward_edge_ || inward_edge_->is_stable())) {
link->MarkSideStable();
}
if (!is_disconnected_) {
// Adoption hands `link` to the edge by move. The check after the lock scope
// depends on `link` being left null when this happens.
outward_edge_.SetPrimaryLink(std::move(link));
}
}
if (link) {
// If the link wasn't adopted, this Router has already been disconnected.
link->AcceptRouteDisconnected();
link->Deactivate();
return;
}
Flush(kForceProxyBypassAttempt);
}
bool Router::AcceptInboundParcel(std::unique_ptr<Parcel> parcel) {
TrapEventDispatcher dispatcher;
{
absl::MutexLock lock(&mutex_);
const SequenceNumber sequence_number = parcel->sequence_number();
if (!inbound_parcels_.Push(sequence_number, std::move(parcel))) {
// Unexpected route disconnection can cut off inbound sequences, so don't
// treat an out-of-bounds parcel as a validation failure.
return true;
}
if (!inward_edge_) {
// If this is a terminal router, we may have trap events to fire.
if (sequence_number < inbound_parcels_.GetCurrentSequenceLength()) {
// Only notify traps if the new parcel is actually available for
// reading, which may not be the case if some preceding parcels have yet
// to be received.
traps_.NotifyNewLocalParcel(status_flags_, inbound_parcels_,
dispatcher);
}
}
}
Flush();
return true;
}
bool Router::AcceptOutboundParcel(std::unique_ptr<Parcel> parcel) {
{
absl::MutexLock lock(&mutex_);
// Proxied outbound parcels are always queued in a ParcelQueue even if they
// will be forwarded immediately. This allows us to track the full sequence
// of forwarded parcels so we can know with certainty when we're done
// forwarding.
//
// TODO: Using a queue here may increase latency along the route, because it
// it unnecessarily forces in-order forwarding. We could use an unordered
// queue for forwarding, but we'd still need some lighter-weight abstraction
// that tracks complete sequences from potentially fragmented contributions.
const SequenceNumber sequence_number = parcel->sequence_number();
if (!outbound_parcels_.Push(sequence_number, std::move(parcel))) {
// Unexpected route disconnection can cut off outbound sequences, so don't
// treat an out-of-bounds parcel as a validation failure.
return true;
}
}
Flush();
return true;
}
// Handles notification that the route was closed on the far side of a link of
// type `link_type`, with `sequence_length` as the final length of the sequence
// travelling from that side. Closure from an outward link finalizes the
// inbound sequence; closure from a peripheral inward or bridge link finalizes
// the outbound sequence. Returns false if the notification is inconsistent
// with state already recorded here, and true otherwise.
bool Router::AcceptRouteClosureFrom(LinkType link_type,
                                    SequenceNumber sequence_length) {
  TrapEventDispatcher dispatcher;
  {
    absl::MutexLock lock(&mutex_);
    if (link_type.is_outward()) {
      const bool finalized =
          inbound_parcels_.SetFinalSequenceLength(sequence_length);
      if (!finalized) {
        // Ignore if and only if the sequence was terminated early.
        DVLOG(4) << "Discarding inbound route closure notification";
        const auto& final_length = inbound_parcels_.final_sequence_length();
        return final_length.has_value() && *final_length <= sequence_length;
      }

      const bool is_terminal = !inward_edge_ && !bridge_;
      if (is_terminal) {
        is_peer_closed_ = true;
        if (inbound_parcels_.IsSequenceFullyConsumed()) {
          status_flags_ |=
              IPCZ_PORTAL_STATUS_PEER_CLOSED | IPCZ_PORTAL_STATUS_DEAD;
        }
        traps_.NotifyPeerClosed(status_flags_, inbound_parcels_, dispatcher);
      }
    } else if (link_type.is_peripheral_inward()) {
      const bool finalized =
          outbound_parcels_.SetFinalSequenceLength(sequence_length);
      if (!finalized) {
        // Ignore if and only if the sequence was terminated early.
        DVLOG(4) << "Discarding outbound route closure notification";
        const auto& final_length = outbound_parcels_.final_sequence_length();
        return final_length.has_value() && *final_length <= sequence_length;
      }
    } else if (link_type.is_bridge()) {
      const bool finalized =
          outbound_parcels_.SetFinalSequenceLength(sequence_length);
      if (!finalized) {
        return false;
      }
      bridge_.reset();
    }
  }

  Flush();
  return true;
}
// Handles disconnection of the route as observed from a link of type
// `link_type`. The router is marked disconnected, one of its parcel sequences
// is forcibly terminated, every link it still holds is released and then told
// about the disconnection, and terminal routers notify their traps of peer
// closure. Always returns true.
bool Router::AcceptRouteDisconnectedFrom(LinkType link_type) {
TrapEventDispatcher dispatcher;
absl::InlinedVector<Ref<RouterLink>, 4> forwarding_links;
{
// If we have to drop any undeliverable parcels, make sure they're destroyed
// outside of `lock` in case they have any attached Routers.
std::vector<std::unique_ptr<Parcel>> dropped_parcels;
absl::MutexLock lock(&mutex_);
// NOTE(review): there is no space between the link type and "link" in this
// message; confirm whether LinkType::ToString() already ends with one.
DVLOG(4) << "Router " << this << " disconnected from "
<< link_type.ToString() << "link";
is_disconnected_ = true;
// Disconnection from a peripheral inward link cuts off the outbound
// sequence; disconnection from any other link cuts off the inbound one.
if (link_type.is_peripheral_inward()) {
dropped_parcels = outbound_parcels_.ForceTerminateSequence();
} else {
dropped_parcels = inbound_parcels_.ForceTerminateSequence();
}
// Wipe out all remaining links and propagate the disconnection over them.
// Released links may be null; nulls are skipped in the loop below.
forwarding_links.push_back(outward_edge_.ReleasePrimaryLink());
forwarding_links.push_back(outward_edge_.ReleaseDecayingLink());
if (inward_edge_) {
forwarding_links.push_back(inward_edge_->ReleasePrimaryLink());
forwarding_links.push_back(inward_edge_->ReleaseDecayingLink());
} else if (bridge_) {
forwarding_links.push_back(bridge_->ReleasePrimaryLink());
forwarding_links.push_back(bridge_->ReleaseDecayingLink());
} else {
// Terminal routers may have trap events to fire.
is_peer_closed_ = true;
if (inbound_parcels_.IsSequenceFullyConsumed()) {
status_flags_ |=
IPCZ_PORTAL_STATUS_PEER_CLOSED | IPCZ_PORTAL_STATUS_DEAD;
}
traps_.NotifyPeerClosed(status_flags_, inbound_parcels_, dispatcher);
}
}
// Links are notified only after `mutex_` has been released.
for (const Ref<RouterLink>& link : forwarding_links) {
if (link) {
DVLOG(4) << "Forwarding disconnection over " << link->Describe();
link->AcceptRouteDisconnected();
link->Deactivate();
}
}
Flush();
return true;
}
// Sends a parcel containing a copy of `data` and the objects named by
// `handles`.
//
// Returns IPCZ_RESULT_INVALID_ARGUMENT if any handle is invalid or cannot be
// sent from this router, IPCZ_RESULT_NOT_FOUND if the peer is closed, or
// otherwise the result of sending the parcel. On IPCZ_RESULT_OK the caller's
// handles are consumed.
IpczResult Router::Put(absl::Span<const uint8_t> data,
                       absl::Span<const IpczHandle> handles) {
  std::vector<Ref<APIObject>> attachments;
  const bool handles_ok =
      ValidateAndAcquireObjectsForTransitFrom(*this, handles, attachments);
  if (!handles_ok) {
    return IPCZ_RESULT_INVALID_ARGUMENT;
  }
  if (IsPeerClosed()) {
    return IPCZ_RESULT_NOT_FOUND;
  }

  const size_t num_bytes = data.size();
  std::unique_ptr<Parcel> parcel =
      AllocateOutboundParcel(num_bytes, /*allow_partial=*/false);
  if (!data.empty()) {
    memcpy(parcel->data_view().data(), data.data(), num_bytes);
  }
  parcel->CommitData(num_bytes);
  parcel->SetObjects(std::move(attachments));

  const IpczResult result = SendOutboundParcel(std::move(parcel));
  if (result != IPCZ_RESULT_OK) {
    return result;
  }

  // If the parcel was sent, the sender relinquishes handle ownership and
  // therefore implicitly releases its ref to each object.
  for (IpczHandle handle : handles) {
    std::ignore = APIObject::TakeFromHandle(handle);
  }
  return result;
}
// Begins a two-phase put by allocating an outbound parcel and registering it
// as a pending transaction.
//
// `num_bytes`, if non-null, supplies the requested capacity on input and
// receives the parcel's actual data size on output. `data`, if non-null,
// receives the address of the parcel's data. `transaction` receives the
// identifier to pass to EndPut() and must be non-null.
//
// Returns IPCZ_RESULT_INVALID_ARGUMENT if `transaction` is null,
// IPCZ_RESULT_NOT_FOUND if the peer is closed, and IPCZ_RESULT_OK otherwise.
IpczResult Router::BeginPut(IpczBeginPutFlags flags,
                            volatile void** data,
                            size_t* num_bytes,
                            IpczTransaction* transaction) {
  if (!transaction) {
    // Matches BeginGet(), which also rejects a null transaction output.
    return IPCZ_RESULT_INVALID_ARGUMENT;
  }

  const bool allow_partial = (flags & IPCZ_BEGIN_PUT_ALLOW_PARTIAL) != 0;
  if (IsPeerClosed()) {
    return IPCZ_RESULT_NOT_FOUND;
  }

  const size_t num_bytes_to_request = num_bytes ? *num_bytes : 0;
  std::unique_ptr<Parcel> parcel =
      AllocateOutboundParcel(num_bytes_to_request, allow_partial);
  if (num_bytes) {
    *num_bytes = parcel->data_size();
  }
  if (data) {
    *data = parcel->data_view().data();
  }

  // `pending_puts_` is created lazily and mutated here, so it must be accessed
  // under `mutex_` just like `pending_gets_` is. The lock is taken only now
  // because IsPeerClosed() and AllocateOutboundParcel() acquire it themselves.
  absl::MutexLock lock(&mutex_);
  if (!pending_puts_) {
    pending_puts_ = std::make_unique<PendingTransactionSet>();
  }
  *transaction = pending_puts_->Add(std::move(parcel));
  return IPCZ_RESULT_OK;
}
// Completes or aborts a two-phase put started by BeginPut().
//
// If `flags` contains IPCZ_END_PUT_ABORT, the pending parcel identified by
// `transaction` is discarded and nothing is sent. Otherwise the parcel is
// committed with `num_bytes_produced` bytes of data plus the objects named by
// `handles`, and sent.
//
// Returns IPCZ_RESULT_INVALID_ARGUMENT if any handle is invalid or cannot be
// sent from this router, or if `transaction` does not identify a pending put
// that can be finalized as requested. Otherwise returns IPCZ_RESULT_OK for an
// abort, or the result of sending the parcel. On a successful send the
// caller's handles are consumed.
IpczResult Router::EndPut(IpczTransaction transaction,
                          size_t num_bytes_produced,
                          absl::Span<const IpczHandle> handles,
                          IpczEndPutFlags flags) {
  const bool aborted = flags & IPCZ_END_PUT_ABORT;
  std::vector<Ref<APIObject>> objects;
  if (!aborted &&
      !ValidateAndAcquireObjectsForTransitFrom(*this, handles, objects)) {
    return IPCZ_RESULT_INVALID_ARGUMENT;
  }

  std::unique_ptr<Parcel> parcel;
  {
    // `pending_puts_` is shared mutable state and must only be touched under
    // `mutex_`. The lock is scoped tightly because SendOutboundParcel() below
    // acquires `mutex_` itself.
    absl::MutexLock lock(&mutex_);
    if (!pending_puts_) {
      return IPCZ_RESULT_INVALID_ARGUMENT;
    }
    if (aborted) {
      parcel = pending_puts_->FinalizeForPut(transaction, 0);
    } else {
      parcel = pending_puts_->FinalizeForPut(transaction, num_bytes_produced);
    }
  }

  if (!parcel) {
    return IPCZ_RESULT_INVALID_ARGUMENT;
  }
  if (aborted) {
    // The finalized parcel is simply dropped when it goes out of scope.
    return IPCZ_RESULT_OK;
  }

  parcel->CommitData(num_bytes_produced);
  parcel->SetObjects(std::move(objects));
  const IpczResult result = SendOutboundParcel(std::move(parcel));
  if (result == IPCZ_RESULT_OK) {
    // If the parcel was sent, the sender relinquishes handle ownership and
    // therefore implicitly releases its ref to each object.
    for (IpczHandle handle : handles) {
      std::ignore = APIObject::TakeFromHandle(handle);
    }
  }
  return result;
}
// Retrieves the next available inbound parcel in a single step.
//
// On input `*num_bytes` and `*num_handles` (each treated as 0 when the pointer
// is null) give the capacities of `data` and `handles`. On output they receive
// the number of bytes and handles involved, which with IPCZ_GET_PARTIAL is
// limited to the given capacities. If `parcel` is non-null it receives a
// handle wrapping the consumed parcel.
//
// Returns:
//   IPCZ_RESULT_NOT_FOUND if the inbound sequence is fully consumed.
//   IPCZ_RESULT_UNAVAILABLE if no parcel is currently available.
//   IPCZ_RESULT_INVALID_ARGUMENT if a non-zero capacity is given with a null
//       buffer.
//   IPCZ_RESULT_ALREADY_EXISTS if an exclusive two-phase get is in progress.
//   IPCZ_RESULT_RESOURCE_EXHAUSTED if the parcel does not fit and partial
//       retrieval was not requested; the required sizes are still reported.
//   IPCZ_RESULT_OK otherwise.
IpczResult Router::Get(IpczGetFlags flags,
                       void* data,
                       size_t* num_bytes,
                       IpczHandle* handles,
                       size_t* num_handles,
                       IpczHandle* parcel) {
  TrapEventDispatcher dispatcher;
  std::unique_ptr<Parcel> taken;
  {
    absl::MutexLock lock(&mutex_);
    if (inbound_parcels_.IsSequenceFullyConsumed()) {
      return IPCZ_RESULT_NOT_FOUND;
    }
    if (!inbound_parcels_.HasNextElement()) {
      return IPCZ_RESULT_UNAVAILABLE;
    }

    std::unique_ptr<Parcel>& next = inbound_parcels_.NextElement();
    const bool allow_partial = (flags & IPCZ_GET_PARTIAL) != 0;
    const size_t data_capacity = num_bytes ? *num_bytes : 0;
    const size_t handles_capacity = num_handles ? *num_handles : 0;

    const bool missing_data_buffer = data_capacity && !data;
    const bool missing_handle_buffer = handles_capacity && !handles;
    if (missing_data_buffer || missing_handle_buffer) {
      return IPCZ_RESULT_INVALID_ARGUMENT;
    }

    const bool exclusive_get_in_progress =
        pending_gets_ && !pending_gets_->empty() && is_pending_get_exclusive_;
    if (exclusive_get_in_progress) {
      return IPCZ_RESULT_ALREADY_EXISTS;
    }

    // Start from the parcel's full contents and clamp to what the caller can
    // hold only when partial retrieval is permitted.
    size_t data_size = next->data_size();
    size_t handles_size = next->num_objects();
    if (allow_partial) {
      data_size = std::min(data_size, data_capacity);
      handles_size = std::min(handles_size, handles_capacity);
    }

    if (num_bytes) {
      *num_bytes = data_size;
    }
    if (num_handles) {
      *num_handles = handles_size;
    }

    const bool fits =
        data_capacity >= data_size && handles_capacity >= handles_size;
    if (!fits && !allow_partial) {
      return IPCZ_RESULT_RESOURCE_EXHAUSTED;
    }

    if (data_size > 0) {
      memcpy(data, next->data_view().data(), data_size);
    }

    const bool popped = inbound_parcels_.Pop(taken);
    ABSL_ASSERT(popped);
    taken->ConsumeHandles(absl::MakeSpan(handles, handles_size));

    if (inbound_parcels_.IsSequenceFullyConsumed()) {
      status_flags_ |= IPCZ_PORTAL_STATUS_PEER_CLOSED | IPCZ_PORTAL_STATUS_DEAD;
    }
    traps_.NotifyLocalParcelConsumed(status_flags_, inbound_parcels_,
                                     dispatcher);
  }

  if (parcel) {
    *parcel = ParcelWrapper::ReleaseAsHandle(
        MakeRefCounted<ParcelWrapper>(std::move(taken)));
  }
  return IPCZ_RESULT_OK;
}
// Begins a two-phase get on the next available inbound parcel.
//
// `*num_handles` (treated as 0 when null) gives the capacity of `handles` on
// input and receives the number of handles involved on output. `data` and
// `num_bytes`, when non-null, receive the address and size of the parcel's
// data. `transaction` receives the identifier to pass to EndGet().
//
// With IPCZ_BEGIN_GET_OVERLAPPED the parcel is removed from the inbound queue
// immediately. Otherwise the get is exclusive: the parcel keeps its place at
// the head of the queue until EndGet().
//
// Returns:
//   IPCZ_RESULT_INVALID_ARGUMENT if `transaction` is null, if this is not a
//       terminal router, or if a non-zero handle capacity is given with a null
//       `handles`.
//   IPCZ_RESULT_ALREADY_EXISTS if this get conflicts with one in progress.
//   IPCZ_RESULT_NOT_FOUND if the inbound sequence is fully consumed.
//   IPCZ_RESULT_UNAVAILABLE if no parcel is currently available.
//   IPCZ_RESULT_RESOURCE_EXHAUSTED if the parcel carries more handles than fit
//       and IPCZ_BEGIN_GET_PARTIAL was not given; `*num_handles` then receives
//       the required count.
//   IPCZ_RESULT_OK otherwise.
IpczResult Router::BeginGet(IpczBeginGetFlags flags,
                            const volatile void** data,
                            size_t* num_bytes,
                            IpczHandle* handles,
                            size_t* num_handles,
                            IpczTransaction* transaction) {
  // Declared ahead of the lock so that it is destroyed after `mutex_` is
  // released.
  TrapEventDispatcher dispatcher;
  absl::MutexLock lock(&mutex_);
  if (!transaction || inward_edge_) {
    return IPCZ_RESULT_INVALID_ARGUMENT;
  }
  if (num_handles && *num_handles && !handles) {
    return IPCZ_RESULT_INVALID_ARGUMENT;
  }

  const bool overlapped = flags & IPCZ_BEGIN_GET_OVERLAPPED;
  const bool allow_partial = flags & IPCZ_BEGIN_GET_PARTIAL;
  if (!overlapped && pending_gets_ && !pending_gets_->empty()) {
    return IPCZ_RESULT_ALREADY_EXISTS;
  }
  if (overlapped && is_pending_get_exclusive_) {
    return IPCZ_RESULT_ALREADY_EXISTS;
  }

  // Mirror Get(): once the whole inbound sequence has been consumed no parcel
  // will ever arrive, so report that distinctly from a merely empty queue.
  if (inbound_parcels_.IsSequenceFullyConsumed()) {
    return IPCZ_RESULT_NOT_FOUND;
  }
  if (!inbound_parcels_.HasNextElement()) {
    return IPCZ_RESULT_UNAVAILABLE;
  }

  std::unique_ptr<Parcel>& p = inbound_parcels_.NextElement();
  const size_t num_objects = p->num_objects();
  const size_t handle_capacity = num_handles ? *num_handles : 0;
  const size_t num_handles_to_consume = std::min(handle_capacity, num_objects);
  if (num_handles_to_consume < num_objects && !allow_partial) {
    if (num_handles) {
      *num_handles = num_objects;
    }
    return IPCZ_RESULT_RESOURCE_EXHAUSTED;
  }
  if (num_handles) {
    *num_handles = num_handles_to_consume;
  }
  p->ConsumeHandles(absl::MakeSpan(handles, num_handles_to_consume));

  if (data) {
    *data = p->data_view().data();
  }
  if (num_bytes) {
    *num_bytes = p->data_size();
  }

  if (!pending_gets_) {
    pending_gets_ = std::make_unique<PendingTransactionSet>();
  }
  if (overlapped) {
    *transaction = pending_gets_->Add(TakeNextInboundParcel(dispatcher));
  } else {
    // The parcel is moved out of the head slot of the queue without popping
    // the slot itself; EndGet() moves it back.
    *transaction = pending_gets_->Add(std::move(p));
    is_pending_get_exclusive_ = true;
  }
  return IPCZ_RESULT_OK;
}
// Completes or aborts a two-phase get started by BeginGet().
//
// Returns IPCZ_RESULT_INVALID_ARGUMENT if `transaction` does not identify a
// pending get, and IPCZ_RESULT_OK otherwise. When the get is not aborted and
// `parcel_handle` is non-null, it receives a handle wrapping the parcel.
IpczResult Router::EndGet(IpczTransaction transaction,
IpczEndGetFlags flags,
IpczHandle* parcel_handle) {
// Declared ahead of the lock so that it is destroyed after `mutex_` is
// released.
TrapEventDispatcher dispatcher;
absl::MutexLock lock(&mutex_);
if (!pending_gets_) {
return IPCZ_RESULT_INVALID_ARGUMENT;
}
std::unique_ptr<Parcel> parcel = pending_gets_->FinalizeForGet(transaction);
if (!parcel) {
return IPCZ_RESULT_INVALID_ARGUMENT;
}
const bool aborted = flags & IPCZ_END_GET_ABORT;
if (is_pending_get_exclusive_) {
// An exclusive get left its parcel's slot at the head of the inbound queue.
// Put the parcel back into that slot first; it is then taken from the queue
// again only if the get was not aborted.
ABSL_HARDENING_ASSERT(inbound_parcels_.HasNextElement());
ABSL_HARDENING_ASSERT(inbound_parcels_.current_sequence_number() ==
parcel->sequence_number());
inbound_parcels_.NextElement() = std::move(parcel);
if (!aborted) {
parcel = TakeNextInboundParcel(dispatcher);
}
is_pending_get_exclusive_ = false;
}
if (!aborted && parcel_handle) {
*parcel_handle = APIObject::ReleaseAsHandle(
MakeRefCounted<ParcelWrapper>(std::move(parcel)));
}
return IPCZ_RESULT_OK;
}
// Attempts to install a trap on this router by forwarding all arguments, along
// with the current status flags and inbound parcel queue, to the trap set
// while holding `mutex_`. Returns whatever the trap set returns.
IpczResult Router::Trap(const IpczTrapConditions& conditions,
IpczTrapEventHandler handler,
uint64_t context,
IpczTrapConditionFlags* satisfied_condition_flags,
IpczPortalStatus* status) {
absl::MutexLock lock(&mutex_);
return traps_.Add(conditions, handler, context, status_flags_,
inbound_parcels_, satisfied_condition_flags, status);
}
// Merges this router's route with that of `other` by linking the two routers
// with a pair of local bridge links.
//
// Returns IPCZ_RESULT_INVALID_ARGUMENT if `other` is this router or its local
// peer, or if either router is not terminal or already has a bridge. Returns
// IPCZ_RESULT_FAILED_PRECONDITION if either router has already sent or
// retrieved any parcels. Returns IPCZ_RESULT_OK on success.
IpczResult Router::MergeRoute(const Ref<Router>& other) {
  const bool is_self_or_peer = HasLocalPeer(*other) || other == this;
  if (is_self_or_peer) {
    return IPCZ_RESULT_INVALID_ARGUMENT;
  }

  {
    MultiMutexLock lock(&mutex_, &other->mutex_);

    const bool any_non_terminal =
        inward_edge_ || other->inward_edge_ || bridge_ || other->bridge_;
    if (any_non_terminal) {
      // It's not legal to call this on non-terminal routers.
      return IPCZ_RESULT_INVALID_ARGUMENT;
    }

    const SequenceNumber start(0);
    const bool route_already_used =
        inbound_parcels_.current_sequence_number() > start ||
        outbound_parcels_.GetCurrentSequenceLength() > start ||
        other->inbound_parcels_.current_sequence_number() > start ||
        other->outbound_parcels_.GetCurrentSequenceLength() > start;
    if (route_already_used) {
      // It's not legal to call this on a router which has transmitted outbound
      // parcels to its peer or retrieved inbound parcels from its queue.
      return IPCZ_RESULT_FAILED_PRECONDITION;
    }

    bridge_ = std::make_unique<RouteEdge>();
    other->bridge_ = std::make_unique<RouteEdge>();

    RouterLink::Pair bridge_links = LocalRouterLink::CreatePair(
        LinkType::kBridge, Router::Pair(WrapRefCounted(this), other));
    bridge_->SetPrimaryLink(std::move(bridge_links.first));
    other->bridge_->SetPrimaryLink(std::move(bridge_links.second));
  }

  Flush();
  return IPCZ_RESULT_OK;
}
// static
// Constructs a new Router from `descriptor`, as received over
// `from_node_link`, and connects it to the route through new links registered
// on that NodeLink.
//
// Returns null if the descriptor is rejected: its sublink IDs collide with
// each other or with `receiving_sublink`, its closed-peer sequence length
// cannot be applied, a RouterLinkState fragment is missing where one is
// required or present where none is expected, or (when the proxy was already
// bypassed) a required link cannot be registered. If only the new peripheral
// link cannot be registered, a router is still returned but it is disconnected
// immediately.
Ref<Router> Router::Deserialize(const RouterDescriptor& descriptor,
NodeLink& from_node_link,
SublinkId receiving_sublink) {
// The decaying sublink is only meaningful when the descriptor says the proxy
// was already bypassed.
std::optional<SublinkId> new_decaying_sublink;
if (descriptor.proxy_already_bypassed) {
new_decaying_sublink = descriptor.new_decaying_sublink;
}
if (descriptor.new_sublink == receiving_sublink ||
descriptor.new_sublink == new_decaying_sublink ||
new_decaying_sublink == receiving_sublink) {
// New sublink IDs must be unique among each other, and must not identify
// the new Router as its own recipient.
return nullptr;
}
auto router = MakeRefCounted<Router>();
Ref<RemoteRouterLink> new_outward_link;
{
absl::MutexLock lock(&router->mutex_);
// Resume both parcel sequences where the serialized router left off.
router->outbound_parcels_.ResetSequence(
descriptor.next_outgoing_sequence_number);
router->inbound_parcels_.ResetSequence(
descriptor.next_incoming_sequence_number);
if (descriptor.peer_closed) {
router->is_peer_closed_ = true;
if (!router->inbound_parcels_.SetFinalSequenceLength(
descriptor.closed_peer_sequence_length)) {
return nullptr;
}
if (router->inbound_parcels_.IsSequenceFullyConsumed()) {
router->status_flags_ |=
IPCZ_PORTAL_STATUS_PEER_CLOSED | IPCZ_PORTAL_STATUS_DEAD;
}
}
if (new_decaying_sublink) {
// When split from a local peer, our remote counterpart (our remote peer's
// former local peer) will use this link to forward parcels it already
// received from our peer. This link decays like any other decaying link
// once its usefulness has expired.
//
// The sequence length toward this link is the current outbound sequence
// length, which is to say, we will not be sending any parcels that way.
// The sequence length from the link is whatever had already been sent
// to our counterpart back on the peer's node.
Ref<RemoteRouterLink> new_decaying_link =
from_node_link.AddRemoteRouterLink(*new_decaying_sublink, nullptr,
LinkType::kPeripheralOutward,
LinkSide::kB, router);
if (!new_decaying_link) {
return nullptr;
}
// The decaying link is installed as the primary link and immediately put
// into decay; the real primary link is installed further below.
router->outward_edge_.SetPrimaryLink(std::move(new_decaying_link));
router->outward_edge_.BeginPrimaryLinkDecay();
router->outward_edge_.set_length_to_decaying_link(
router->outbound_parcels_.current_sequence_number());
// Falls back to the next incoming sequence number when the descriptor
// carries a zero decaying incoming sequence length.
router->outward_edge_.set_length_from_decaying_link(
descriptor.decaying_incoming_sequence_length > SequenceNumber(0)
? descriptor.decaying_incoming_sequence_length
: descriptor.next_incoming_sequence_number);
auto link_state =
from_node_link.memory().AdoptFragmentRefIfValid<RouterLinkState>(
descriptor.new_link_state_fragment);
if (link_state.is_null()) {
// Central links require a valid link state fragment.
return nullptr;
}
new_outward_link = from_node_link.AddRemoteRouterLink(
descriptor.new_sublink, std::move(link_state), LinkType::kCentral,
LinkSide::kB, router);
if (!new_outward_link) {
return nullptr;
}
router->outward_edge_.SetPrimaryLink(new_outward_link);
DVLOG(4) << "Route extended from "
<< from_node_link.remote_node_name().ToString() << " to "
<< from_node_link.local_node_name().ToString() << " via sublink "
<< descriptor.new_sublink << " and decaying sublink "
<< *new_decaying_sublink;
} else {
if (!descriptor.new_link_state_fragment.is_null()) {
// No RouterLinkState fragment should be provided for this new
// peripheral link.
return nullptr;
}
new_outward_link = from_node_link.AddRemoteRouterLink(
descriptor.new_sublink, nullptr, LinkType::kPeripheralOutward,
LinkSide::kB, router);
// A failure to register the link is tolerated here and handled after the
// lock is released.
if (new_outward_link) {
router->outward_edge_.SetPrimaryLink(new_outward_link);
DVLOG(4) << "Route extended from "
<< from_node_link.remote_node_name().ToString() << " to "
<< from_node_link.local_node_name().ToString()
<< " via sublink " << descriptor.new_sublink;
}
}
}
if (!new_outward_link) {
// The new portal is DOA, either because the associated NodeLink is dead, or
// the sublink ID was already in use. The latter implies a bug or bad
// behavior, but it should be harmless to ignore beyond this point.
DVLOG(4) << "Disconnected new Router immediately after deserialization";
router->AcceptRouteDisconnectedFrom(LinkType::kPeripheralOutward);
} else if (descriptor.proxy_peer_node_name.is_valid()) {
// The source router rolled some peer bypass details into our descriptor to
// avoid some IPC overhead. We can begin bypassing the proxy now.
ABSL_ASSERT(new_outward_link);
router->BypassPeer(*new_outward_link, descriptor.proxy_peer_node_name,
descriptor.proxy_peer_sublink);
}
router->Flush(kForceProxyBypassAttempt);
return router;
}
// Serializes this router into `descriptor` for transmission over
// `to_node_link`. All traps are removed first. If the router has a local peer
// and its outward link could be locked for bypass, the router is split from
// that peer; if that is not possible, this router is configured to act as a
// proxy to the new remote router.
void Router::SerializeNewRouter(NodeLink& to_node_link,
                                RouterDescriptor& descriptor) {
  TrapEventDispatcher dispatcher;
  Ref<Router> peer;
  bool locked_for_bypass = false;
  {
    absl::MutexLock lock(&mutex_);
    traps_.RemoveAll(dispatcher);
    peer = outward_edge_.GetLocalPeer();

    const auto& outward_link = outward_edge_.primary_link();
    locked_for_bypass =
        outward_link &&
        outward_link->TryLockForBypass(to_node_link.remote_node_name());
  }

  const bool split_from_local_peer =
      peer && locked_for_bypass &&
      SerializeNewRouterWithLocalPeer(to_node_link, descriptor, peer);
  if (split_from_local_peer) {
    peer->Flush(kForceProxyBypassAttempt);
    return;
  }

  SerializeNewRouterAndConfigureProxy(to_node_link, descriptor,
                                      locked_for_bypass);
}
// Attempts to serialize this router into `descriptor` by splitting it from
// `local_peer`, so that `local_peer` can later be linked directly to the new
// remote router over `to_node_link`.
//
// Returns false, without modifying `descriptor`, if `local_peer` no longer has
// this router as its local peer or if no RouterLinkState could be allocated.
// Returns true once `descriptor` is filled in and this router has an inward
// edge set up to decay.
bool Router::SerializeNewRouterWithLocalPeer(NodeLink& to_node_link,
RouterDescriptor& descriptor,
Ref<Router> local_peer) {
MultiMutexLock lock(&mutex_, &local_peer->mutex_);
if (local_peer->outward_edge_.GetLocalPeer() != this) {
// If the peer was closed, its link to us may already be invalidated.
return false;
}
FragmentRef<RouterLinkState> new_link_state =
to_node_link.memory().TryAllocateRouterLinkState();
if (!new_link_state.is_addressable()) {
// If we couldn't allocate a RouterLinkState for a new central link, then
// we can't replace the central link yet. Fall back to the proxying case.
return false;
}
// Captured before the peer's link is released below; used as the decaying
// incoming sequence length in the descriptor and as the length toward our
// decaying inward link.
const SequenceNumber proxy_inbound_sequence_length =
local_peer->outbound_parcels_.current_sequence_number();
// The local peer no longer needs its link to us. We'll give it a new
// outward link in BeginProxyingToNewRouter() after this descriptor is
// transmitted.
local_peer->outward_edge_.ReleasePrimaryLink();
// The primary new sublink to the destination node will act as the route's
// new central link between our local peer and the new remote router.
//
// An additional sublink is allocated to act as a decaying inward link from
// this router to the new one, so we can forward any inbound parcels that have
// already been queued here.
const SublinkId new_sublink = to_node_link.memory().AllocateSublinkIds(2);
const SublinkId decaying_sublink = SublinkId(new_sublink.value() + 1);
// Register the new routes on the NodeLink. Note that we don't provide them to
// any routers yet since we don't want the routers using them until this
// descriptor is transmitted to its destination node. The links will be
// adopted after transmission in BeginProxyingToNewRouter().
// NOTE(review): neither AddRemoteRouterLink() result is checked or used here.
Ref<RouterLink> new_link = to_node_link.AddRemoteRouterLink(
new_sublink, new_link_state, LinkType::kCentral, LinkSide::kA,
local_peer);
to_node_link.AddRemoteRouterLink(decaying_sublink, nullptr,
LinkType::kPeripheralInward, LinkSide::kA,
WrapRefCounted(this));
descriptor.new_sublink = new_sublink;
descriptor.new_link_state_fragment = new_link_state.release().descriptor();
descriptor.new_decaying_sublink = decaying_sublink;
descriptor.proxy_already_bypassed = true;
descriptor.next_outgoing_sequence_number =
outbound_parcels_.GetCurrentSequenceLength();
descriptor.next_incoming_sequence_number =
inbound_parcels_.current_sequence_number();
descriptor.decaying_incoming_sequence_length = proxy_inbound_sequence_length;
DVLOG(4) << "Splitting local pair to move router with outbound sequence "
<< "length " << descriptor.next_outgoing_sequence_number
<< " and current inbound sequence number "
<< descriptor.next_incoming_sequence_number;
// Carry over peer closure if the inbound sequence already has a final length.
if (inbound_parcels_.final_sequence_length()) {
descriptor.peer_closed = true;
descriptor.closed_peer_sequence_length =
*inbound_parcels_.final_sequence_length();
}
// Initialize an inward edge that will immediately begin decaying once it has
// a link (established in BeginProxyingToNewRouter()).
inward_edge_ = std::make_unique<RouteEdge>();
inward_edge_->BeginPrimaryLinkDecay();
inward_edge_->set_length_to_decaying_link(proxy_inbound_sequence_length);
inward_edge_->set_length_from_decaying_link(
outbound_parcels_.GetCurrentSequenceLength());
return true;
}
// Serializes this router into `descriptor` for transmission over
// `to_node_link`, and prepares this router to act as a proxy to the new remote
// router by giving it an inward edge and registering a new peripheral inward
// link on `to_node_link`.
//
// `initiate_proxy_bypass` indicates that the caller locked the outward link
// for bypass. If that link is remote and the peer is not closed, the peer's
// node name and sublink are written into `descriptor` and both edges begin
// decaying. If the link turns out not to be remote, it is unlocked again.
void Router::SerializeNewRouterAndConfigureProxy(
    NodeLink& to_node_link,
    RouterDescriptor& descriptor,
    bool initiate_proxy_bypass) {
  const SublinkId new_sublink = to_node_link.memory().AllocateSublinkIds(1);

  absl::MutexLock lock(&mutex_);
  descriptor.new_sublink = new_sublink;
  descriptor.new_link_state_fragment = FragmentDescriptor();
  descriptor.proxy_already_bypassed = false;
  descriptor.next_outgoing_sequence_number =
      outbound_parcels_.GetCurrentSequenceLength();
  descriptor.next_incoming_sequence_number =
      inbound_parcels_.current_sequence_number();

  // Initialize an inward edge but with no link yet. This ensures that we
  // don't look like a terminal router while waiting for a link to be set,
  // which can only happen after `descriptor` is transmitted.
  inward_edge_ = std::make_unique<RouteEdge>();

  if (is_peer_closed_) {
    descriptor.peer_closed = true;
    descriptor.closed_peer_sequence_length =
        *inbound_parcels_.final_sequence_length();

    // Ensure that the new edge decays its link as soon as it has one, since
    // we know the link will not be used.
    inward_edge_->BeginPrimaryLinkDecay();
    inward_edge_->set_length_to_decaying_link(
        *inbound_parcels_.final_sequence_length());
    inward_edge_->set_length_from_decaying_link(
        outbound_parcels_.current_sequence_number());
  } else if (initiate_proxy_bypass && outward_edge_.primary_link()) {
    RemoteRouterLink* remote_link =
        outward_edge_.primary_link()->AsRemoteRouterLink();
    if (remote_link) {
      descriptor.proxy_peer_node_name =
          remote_link->node_link()->remote_node_name();
      descriptor.proxy_peer_sublink = remote_link->sublink();
      DVLOG(4) << "Will initiate proxy bypass immediately on deserialization "
               << "with peer at " << descriptor.proxy_peer_node_name.ToString()
               << " and peer route to proxy on sublink "
               << descriptor.proxy_peer_sublink;

      inward_edge_->BeginPrimaryLinkDecay();
      outward_edge_.BeginPrimaryLinkDecay();
    } else {
      // The link was locked in anticipation of initiating a proxy bypass, but
      // that's no longer going to happen.
      outward_edge_.primary_link()->Unlock();
    }
  }

  // Once `descriptor` is transmitted to the destination node and the new
  // Router is created there, it may immediately begin transmitting messages
  // back to this node regarding `new_sublink`. We establish a new
  // RemoteRouterLink now and register it to `new_sublink` on `to_node_link`,
  // so that any such incoming messages are routed to `this`.
  //
  // NOTE: We do not yet provide `this` itself with a reference to the new
  // RemoteRouterLink, because it's not yet safe for us to send messages to
  // the remote node regarding `new_sublink`. `descriptor` must be transmitted
  // first.
  Ref<RemoteRouterLink> new_link = to_node_link.AddRemoteRouterLink(
      new_sublink, nullptr, LinkType::kPeripheralInward, LinkSide::kA,
      WrapRefCounted(this));
  if (new_link) {
    // Registration can fail and yield a null link (Deserialize() handles that
    // case for the same call), so only describe the link when it exists.
    DVLOG(4) << "Router " << this << " extending route with tentative new "
             << new_link->Describe();
  }
}
// Adopts the link(s) that were registered on `to_node_link` for the sublinks
// named by `descriptor`, so that this router can start forwarding to the new
// router. Any link which ends up not being adopted by this router or by its
// local peer is told that the route is disconnected and is then deactivated.
void Router::BeginProxyingToNewRouter(NodeLink& to_node_link,
                                      const RouterDescriptor& descriptor) {
  // Declared at function scope so the released outward link outlives the
  // critical section below.
  Ref<RouterLink> released_outward_link;
  Ref<Router> local_peer;

  // Look up the sublink entries for the new primary and (optional) decaying
  // links. Either lookup may come back empty, e.g. if the NodeLink has already
  // been disconnected.
  auto primary_entry = to_node_link.GetSublink(descriptor.new_sublink);
  auto decaying_entry =
      to_node_link.GetSublink(descriptor.new_decaying_sublink);
  if (!primary_entry) {
    AcceptRouteDisconnectedFrom(LinkType::kPeripheralInward);
    Flush(kForceProxyBypassAttempt);
    return;
  }

  Ref<RemoteRouterLink> new_primary_link = primary_entry->router_link;
  Ref<RemoteRouterLink> new_decaying_link;
  {
    absl::MutexLock lock(&mutex_);
    ABSL_ASSERT(inward_edge_);

    if (descriptor.proxy_already_bypassed) {
      released_outward_link = outward_edge_.ReleasePrimaryLink();
      if (released_outward_link) {
        local_peer = released_outward_link->GetLocalPeer();
      }
      if (decaying_entry) {
        new_decaying_link = decaying_entry->router_link;
      }
    }

    // No link is adopted by this router once it has been disconnected.
    if (!is_disconnected_) {
      if (local_peer && new_decaying_link) {
        // This router was already bypassed. Its inward edge takes the new
        // decaying link so it can still forward parcels to the new router; the
        // new primary link is handed to the local peer after the lock is
        // released.
        inward_edge_->SetPrimaryLink(std::move(new_decaying_link));
      } else if (!outbound_parcels_.final_sequence_length() &&
                 !new_decaying_link) {
        DVLOG(4) << "Router " << this << " will proxy to new router over "
                 << new_primary_link->Describe();
        inward_edge_->SetPrimaryLink(std::move(new_primary_link));

        Ref<RouterLink> outward_link = outward_edge_.primary_link();
        if (outward_link && outward_edge_.is_stable() &&
            inward_edge_->is_stable()) {
          outward_link->MarkSideStable();
        }
      }
    }
  }

  if (local_peer && new_primary_link && !new_decaying_link) {
    // A local peer with no remaining decaying link means the decaying link was
    // adopted by our own inward edge above, so the primary link becomes the
    // local peer's new outward link straight to the new remote router.
    local_peer->SetOutwardLink(std::move(new_primary_link));
  }

  // Whatever is still held here was not adopted, which implies the new router
  // has already been closed or disconnected.
  const auto drop_unadopted_link = [](const Ref<RemoteRouterLink>& link) {
    if (!link) {
      return;
    }
    DVLOG(4) << "Dropping link to new router " << link->Describe();
    link->AcceptRouteDisconnected();
    link->Deactivate();
  };
  drop_unadopted_link(new_primary_link);
  drop_unadopted_link(new_decaying_link);

  // Queued inbound parcels may now be forwardable to the new router, so give
  // both this router and its local peer (if any) a chance to flush.
  Flush(kForceProxyBypassAttempt);
  if (local_peer) {
    local_peer->Flush(kForceProxyBypassAttempt);
  }
}
// Handles a request, received over `requestor`, to bypass the router on the
// other side of `requestor` by linking to the router identified by
// `bypass_target_sublink` on `bypass_target_node`. Returns false if the request
// is rejected, and true if it was handled or could safely be ignored.
bool Router::BypassPeer(RemoteRouterLink& requestor,
                        const NodeName& bypass_target_node,
                        SublinkId bypass_target_sublink) {
  NodeLink& from_node_link = *requestor.node_link();

  // The request is only honoured when it arrives over our current primary
  // outward link.
  {
    absl::MutexLock lock(&mutex_);
    const Ref<RouterLink>& current_outward = outward_edge_.primary_link();
    if (!current_outward) {
      // With no outward link at all, this router was probably disconnected by
      // some other failure along the route. The requestor is not at fault, so
      // the request is quietly dropped.
      return true;
    }

    if (current_outward != &requestor) {
      DLOG(ERROR) << "Rejecting BypassPeer received on " << requestor.Describe()
                  << " with existing "
                  << outward_edge_.primary_link()->Describe();
      return false;
    }
  }

  const bool target_is_remote =
      bypass_target_node != from_node_link.local_node_name();
  if (!target_is_remote) {
    // The bypass target lives on our own node.
    return BypassPeerWithNewLocalLink(requestor, bypass_target_sublink);
  }

  // The bypass target lives on some other node. Use an existing NodeLink to it
  // if we have one.
  Ref<NodeLink> existing_link =
      from_node_link.node()->GetLink(bypass_target_node);
  if (existing_link) {
    return BypassPeerWithNewRemoteLink(
        requestor, *existing_link, bypass_target_sublink,
        existing_link->memory().TryAllocateRouterLinkState());
  }

  // Otherwise a link to the target node has to be established first; the
  // bypass continues from the callback.
  from_node_link.node()->EstablishLink(
      bypass_target_node,
      [self = WrapRefCounted(this), requestor_ref = WrapRefCounted(&requestor),
       bypass_target_sublink](NodeLink* established_link) {
        if (established_link) {
          self->BypassPeerWithNewRemoteLink(
              *requestor_ref, *established_link, bypass_target_sublink,
              established_link->memory().TryAllocateRouterLinkState());
          return;
        }

        DLOG(ERROR) << "Disconnecting Router due to failed introduction";
        self->AcceptRouteDisconnectedFrom(LinkType::kPeripheralOutward);
      });
  return true;
}
// Accepts a new central link (`new_sublink` on `new_node_link`) which replaces
// this router's current outward link to a proxy. The current outward link
// begins decaying, and the proxy is told how many parcels to expect from us.
// Returns false if the request fails validation; returns true otherwise,
// including when the request is discarded because of disconnection.
bool Router::AcceptBypassLink(
    NodeLink& new_node_link,
    SublinkId new_sublink,
    FragmentRef<RouterLinkState> new_link_state,
    SequenceNumber inbound_sequence_length_from_bypassed_link) {
  SequenceNumber outbound_length_via_proxy;
  Ref<RemoteRouterLink> proxy_link;
  Ref<RemoteRouterLink> bypass_link;
  {
    absl::ReleasableMutexLock lock(&mutex_);
    if (is_disconnected_ || !outward_edge_.primary_link()) {
      // The route is already broken on our side of the proxy, so there is no
      // point in establishing new links.
      DVLOG(4) << "Discarding proxy bypass link due to peer disconnection";
      return true;
    }

    proxy_link =
        WrapRefCounted(outward_edge_.primary_link()->AsRemoteRouterLink());
    if (!proxy_link) {
      // A bypass link can only replace a remote outward link. Anything else
      // means the request is bogus.
      DVLOG(4) << "Rejecting unexpected bypass link";
      return false;
    }

    // The bypass link must either lead to the same node as the link it
    // replaces, or to a node which the old link was expecting a bypass from.
    const bool targets_same_node = proxy_link->node_link() == &new_node_link;
    if (!targets_same_node &&
        !proxy_link->CanNodeRequestBypass(new_node_link.remote_node_name())) {
      DLOG(ERROR) << "Rejecting unauthorized BypassProxy";
      return false;
    }

    outbound_length_via_proxy = outbound_parcels_.current_sequence_number();
    if (!outward_edge_.BeginPrimaryLinkDecay()) {
      DLOG(ERROR) << "Rejecting BypassProxy on failure to decay link";
      return false;
    }

    // The side which initiates a bypass takes side A of the new link by
    // convention, which leaves side B for us.
    bypass_link = new_node_link.AddRemoteRouterLink(
        new_sublink, std::move(new_link_state), LinkType::kCentral,
        LinkSide::kB, WrapRefCounted(this));
    if (bypass_link) {
      DVLOG(4) << "Bypassing proxy on other end of " << proxy_link->Describe()
               << " using a new " << bypass_link->Describe()
               << " with length to proxy " << outbound_length_via_proxy
               << " and length from proxy "
               << inbound_sequence_length_from_bypassed_link;

      outward_edge_.set_length_to_decaying_link(outbound_length_via_proxy);
      outward_edge_.set_length_from_decaying_link(
          inbound_sequence_length_from_bypassed_link);
      outward_edge_.SetPrimaryLink(bypass_link);
    }
  }

  if (!bypass_link) {
    // The new link could not be registered; treat the route as disconnected.
    AcceptRouteDisconnectedFrom(LinkType::kCentral);
    return true;
  }

  const bool proxy_peer_is_local_to_proxy =
      bypass_link->node_link() == proxy_link->node_link();
  if (proxy_peer_is_local_to_proxy) {
    // New and old links lead to the same node: the proxy has already
    // coordinated with its local outward peer, so it only needs to be told to
    // stop proxying.
    proxy_link->StopProxyingToLocalPeer(outbound_length_via_proxy);
  } else {
    // Different nodes: tell the proxy to stop, and tell our new outward peer
    // (the proxy's inward peer) that the proxy is about to stop.
    proxy_link->StopProxying(outbound_length_via_proxy,
                             inbound_sequence_length_from_bypassed_link);
    bypass_link->ProxyWillStop(outbound_length_via_proxy);
  }

  Flush();
  return true;
}
// Records the final sequence lengths that this proxying router must forward
// in each direction before its decaying links can be dropped. When this router
// is one half of a bridge, the router on the other side of the bridge is
// updated as well. Returns false if the request is not valid for this router.
bool Router::StopProxying(SequenceNumber inbound_sequence_length,
                          SequenceNumber outbound_sequence_length) {
  Ref<Router> other_bridge_router;
  {
    absl::MutexLock lock(&mutex_);
    if (outward_edge_.is_stable()) {
      // A proxy starts decaying its links before it asks to be bypassed and
      // adopts no new links afterwards, so a stable outward edge here means
      // the request is bogus.
      DLOG(ERROR) << "Rejecting StopProxying on invalid or non-proxying Router";
      return false;
    }

    if (!bridge_) {
      if (!inward_edge_ || inward_edge_->is_stable()) {
        // Without a decaying inward edge this router is not a proxy.
        return false;
      }

      RouteEdge& inward = *inward_edge_;
      inward.set_length_to_decaying_link(inbound_sequence_length);
      inward.set_length_from_decaying_link(outbound_sequence_length);
      outward_edge_.set_length_to_decaying_link(outbound_sequence_length);
      outward_edge_.set_length_from_decaying_link(inbound_sequence_length);
    } else {
      // Bridged routers are updated together with the router across the
      // bridge, further below, once both mutexes can be held.
      other_bridge_router = bridge_->GetDecayingLocalPeer();
      if (!other_bridge_router) {
        return false;
      }
    }
  }

  if (other_bridge_router) {
    MultiMutexLock lock(&mutex_, &other_bridge_router->mutex_);
    const bool bridge_still_decaying =
        bridge_ && !bridge_->is_stable() && other_bridge_router->bridge_ &&
        !other_bridge_router->bridge_->is_stable();
    if (!bridge_still_decaying) {
      // The bridge is being torn down or is already gone. Nothing left to do.
      return true;
    }

    RouteEdge& other_bridge = *other_bridge_router->bridge_;
    RouteEdge& other_outward = other_bridge_router->outward_edge_;

    bridge_->set_length_to_decaying_link(inbound_sequence_length);
    bridge_->set_length_from_decaying_link(outbound_sequence_length);
    outward_edge_.set_length_to_decaying_link(outbound_sequence_length);
    outward_edge_.set_length_from_decaying_link(inbound_sequence_length);
    other_bridge.set_length_to_decaying_link(outbound_sequence_length);
    other_bridge.set_length_from_decaying_link(inbound_sequence_length);
    other_outward.set_length_to_decaying_link(inbound_sequence_length);
    other_outward.set_length_from_decaying_link(outbound_sequence_length);
  }

  Flush();
  if (other_bridge_router) {
    other_bridge_router->Flush();
  }
  return true;
}
// Records `inbound_sequence_length` as the length of the inbound sequence which
// will arrive over this router's decaying outward link, then flushes. Returns
// false only when the outward edge is stable and this router has not been
// disconnected.
bool Router::NotifyProxyWillStop(SequenceNumber inbound_sequence_length) {
  {
    absl::MutexLock lock(&mutex_);
    const bool outward_edge_unstable = !outward_edge_.is_stable();
    if (outward_edge_unstable) {
      DVLOG(4) << "Bypassed proxy will stop forwarding inbound parcels after a "
               << "sequence length of " << inbound_sequence_length;
      outward_edge_.set_length_from_decaying_link(inbound_sequence_length);
    } else {
      // A stable outward edge means either that the request is invalid, or
      // that every link was already lost to disconnection. Only the latter is
      // tolerated.
      return is_disconnected_;
    }
  }

  Flush();
  return true;
}
// Handles a request for this proxying router to stop proxying toward an
// outward peer which lives on the same node. `outbound_sequence_length` is
// recorded on the decaying edges of this router, of its local outward peer,
// and (when this router is one half of a bridge) of the router across the
// bridge.
//
// Returns false when the request fails validation. Returns true when the
// request was applied, or when it was ignored because the links involved have
// already gone away.
bool Router::StopProxyingToLocalPeer(SequenceNumber outbound_sequence_length) {
  Ref<Router> local_peer;
  Ref<Router> bridge_peer;
  {
    absl::MutexLock lock(&mutex_);
    if (bridge_) {
      bridge_peer = bridge_->GetDecayingLocalPeer();
    } else if (outward_edge_.decaying_link()) {
      local_peer = outward_edge_.decaying_link()->GetLocalPeer();
    } else {
      // Ignore this request if we've been unexpectedly disconnected.
      return is_disconnected_;
    }
  }

  if (local_peer && !bridge_peer) {
    // This is the common case, with no bridge link.
    MultiMutexLock lock(&mutex_, &local_peer->mutex_);
    RouterLink* const our_link = outward_edge_.decaying_link();
    RouterLink* const peer_link = local_peer->outward_edge_.decaying_link();
    if (!our_link || !peer_link) {
      // Either Router may have been unexpectedly disconnected, in which case
      // we can ignore this request.
      return true;
    }

    if (!inward_edge_ || our_link->GetLocalPeer() != local_peer ||
        peer_link->GetLocalPeer() != this) {
      // Consistency check: this must be a proxying router, and both this router
      // and its local peer must link to each other.
      DLOG(ERROR) << "Rejecting StopProxyingToLocalPeer at invalid proxy";
      return false;
    }

    DVLOG(4) << "Stopping proxy with decaying "
             << inward_edge_->decaying_link()->Describe() << " and decaying "
             << our_link->Describe();

    local_peer->outward_edge_.set_length_from_decaying_link(
        outbound_sequence_length);
    outward_edge_.set_length_to_decaying_link(outbound_sequence_length);
    inward_edge_->set_length_from_decaying_link(outbound_sequence_length);
  } else if (bridge_peer) {
    // When a bridge peer is present we actually have three local routers
    // involved: this router, its outward peer, and its bridge peer. Both this
    // router and the bridge peer serve as "the" proxy being bypassed in this
    // case, so we'll be bypassing both of them below.
    {
      absl::MutexLock lock(&bridge_peer->mutex_);
      if (bridge_peer->outward_edge_.is_stable()) {
        return false;
      }
      local_peer = bridge_peer->outward_edge_.GetDecayingLocalPeer();
      if (!local_peer) {
        return false;
      }
    }

    MultiMutexLock lock(&mutex_, &local_peer->mutex_, &bridge_peer->mutex_);
    if (outward_edge_.is_stable() || local_peer->outward_edge_.is_stable() ||
        bridge_peer->outward_edge_.is_stable()) {
      return false;
    }

    // `mutex_` was released between the first inspection of `bridge_` above
    // and the acquisition of this lock, so either bridge edge may have been
    // reset in the meantime (e.g. by a concurrent Flush()). Re-check both
    // before dereferencing them, mirroring what StopProxying() does. A bridge
    // which has already been torn down leaves nothing to update.
    if (!bridge_ || !bridge_peer->bridge_) {
      return true;
    }

    local_peer->outward_edge_.set_length_from_decaying_link(
        outbound_sequence_length);
    outward_edge_.set_length_from_decaying_link(outbound_sequence_length);
    bridge_->set_length_to_decaying_link(outbound_sequence_length);
    bridge_peer->outward_edge_.set_length_to_decaying_link(
        outbound_sequence_length);
    bridge_peer->bridge_->set_length_from_decaying_link(
        outbound_sequence_length);
  } else {
    // It's invalid to call this on a Router with a non-local outward peer or
    // bridge link.
    DLOG(ERROR) << "Rejecting StopProxyingToLocalPeer with no local peer";
    return false;
  }

  // The new sequence lengths may allow decaying links to finish decaying, so
  // give every router involved a chance to flush.
  Flush();
  local_peer->Flush();
  if (bridge_peer) {
    bridge_peer->Flush();
  }
  return true;
}
// Called when `link` has been disconnected. Releases `link` from whichever edge
// slot currently holds it (if any), then treats the route as disconnected from
// the direction `link` was facing.
void Router::NotifyLinkDisconnected(RemoteRouterLink& link) {
  {
    absl::MutexLock lock(&mutex_);

    // Releases `link` from `edge` if it is that edge's primary or decaying
    // link. Returns true if `edge` held the link.
    const auto release_from_edge = [&link](RouteEdge& edge) {
      if (edge.primary_link() == &link) {
        DVLOG(4) << "Primary " << link.Describe() << " disconnected";
        edge.ReleasePrimaryLink();
        return true;
      }
      if (edge.decaying_link() == &link) {
        DVLOG(4) << "Decaying " << link.Describe() << " disconnected";
        edge.ReleaseDecayingLink();
        return true;
      }
      return false;
    };

    // The outward edge is searched first; the inward edge only if it exists
    // and the outward edge did not hold the link.
    if (!release_from_edge(outward_edge_) && inward_edge_) {
      release_from_edge(*inward_edge_);
    }
  }

  AcceptRouteDisconnectedFrom(link.GetType().is_outward()
                                  ? LinkType::kPeripheralOutward
                                  : LinkType::kPeripheralInward);
}
// Flushes any queued parcels which can now be transmitted over this router's
// outward, inward, or bridge links; finalizes any link decay which has
// completed; and propagates route closure over links that are no longer
// needed. All router state is sampled and updated while `mutex_` is held, but
// calls which transmit over or deactivate links are deferred until after the
// mutex is released.
//
// When the outward link is a central link, a self-bypass attempt is made at
// the end if this call dropped the last decaying link, or if `behavior` is
// kForceProxyBypassAttempt.
void Router::Flush(FlushBehavior behavior) {
// Stack references to the links observed under `mutex_` below. They keep the
// links alive for the work done after the mutex is released.
Ref<RouterLink> outward_link;
Ref<RouterLink> inward_link;
Ref<RouterLink> bridge_link;
Ref<RouterLink> decaying_outward_link;
Ref<RouterLink> decaying_inward_link;
// Links released from their edges during this call; they are notified of
// closure (where applicable) once the mutex is released.
Ref<RouterLink> dead_inward_link;
Ref<RouterLink> dead_outward_link;
Ref<RouterLink> dead_bridge_link;
// Final sequence lengths to propagate along with closure, when known.
std::optional<SequenceNumber> final_inward_sequence_length;
std::optional<SequenceNumber> final_outward_sequence_length;
bool on_central_link = false;
bool inward_link_decayed = false;
bool outward_link_decayed = false;
bool dropped_last_decaying_link = false;
ParcelsToFlush parcels_to_flush;
TrapEventDispatcher dispatcher;
{
absl::MutexLock lock(&mutex_);
// Acquire stack references to all links we might want to use, so it's safe
// to acquire additional (unmanaged) references per ParcelToFlush.
outward_link = outward_edge_.primary_link();
inward_link = inward_edge_ ? inward_edge_->primary_link() : nullptr;
decaying_outward_link = WrapRefCounted(outward_edge_.decaying_link());
decaying_inward_link =
WrapRefCounted(inward_edge_ ? inward_edge_->decaying_link() : nullptr);
on_central_link = outward_link && outward_link->GetType().is_central();
if (bridge_) {
// Bridges have either a primary link or decaying link, but never both.
bridge_link = bridge_->primary_link()
? bridge_->primary_link()
: WrapRefCounted(bridge_->decaying_link());
}
// Collect any parcels which are safe to transmit now. Note that we do not
// transmit anything or generally call into any RouterLinks while `mutex_`
// is held, because such calls may ultimately re-enter this Router
// (e.g. if a link is a LocalRouterLink, or even a RemoteRouterLink with a
// fully synchronous driver.) Instead we accumulate work within this block,
// and then perform any transmissions or link deactivations after the mutex
// is released further below.
CollectParcelsToFlush(outbound_parcels_, outward_edge_, parcels_to_flush);
const SequenceNumber outbound_sequence_length_sent =
outbound_parcels_.current_sequence_number();
const SequenceNumber inbound_sequence_length_received =
inbound_parcels_.GetCurrentSequenceLength();
// Check whether the outward edge's decaying link is finished, given what has
// been sent outward and received inward so far.
if (outward_edge_.MaybeFinishDecay(outbound_sequence_length_sent,
inbound_sequence_length_received)) {
DVLOG(4) << "Outward " << decaying_outward_link->Describe()
<< " fully decayed at " << outbound_sequence_length_sent
<< " sent and " << inbound_sequence_length_received
<< " recived";
outward_link_decayed = true;
}
if (inward_edge_) {
// With an inward edge, inbound parcels are forwarded inward, and the same
// decay check is applied to the inward edge with the directions swapped.
CollectParcelsToFlush(inbound_parcels_, *inward_edge_, parcels_to_flush);
const SequenceNumber inbound_sequence_length_sent =
inbound_parcels_.current_sequence_number();
const SequenceNumber outbound_sequence_length_received =
outbound_parcels_.GetCurrentSequenceLength();
if (inward_edge_->MaybeFinishDecay(inbound_sequence_length_sent,
outbound_sequence_length_received)) {
DVLOG(4) << "Inward " << decaying_inward_link->Describe()
<< " fully decayed at " << inbound_sequence_length_sent
<< " sent and " << outbound_sequence_length_received
<< " received";
inward_link_decayed = true;
}
} else if (bridge_link) {
// With no inward edge but a bridge link, inbound parcels are forwarded
// across the bridge instead.
CollectParcelsToFlush(inbound_parcels_, *bridge_, parcels_to_flush);
}
// Drop the bridge edge entirely once its decay has finished.
if (bridge_ && bridge_->MaybeFinishDecay(
inbound_parcels_.current_sequence_number(),
outbound_parcels_.current_sequence_number())) {
bridge_.reset();
}
if (is_peer_closed_ &&
(status_flags_ & IPCZ_PORTAL_STATUS_PEER_CLOSED) == 0 &&
!inbound_parcels_.ExpectsMoreElements()) {
// Set the PEER_CLOSED bit and trigger any relevant traps, if and only if
// the peer is actually closed and there are no more inbound parcels in
// flight towards us.
status_flags_ |= IPCZ_PORTAL_STATUS_PEER_CLOSED;
traps_.NotifyPeerClosed(status_flags_, inbound_parcels_, dispatcher);
}
// If we're dropping the last of our decaying links, our outward link may
// now be stable. This may unblock proxy bypass or other operations.
const bool inward_edge_stable =
!decaying_inward_link || inward_link_decayed;
const bool outward_edge_stable =
outward_link && (!decaying_outward_link || outward_link_decayed);
const bool both_edges_stable = inward_edge_stable && outward_edge_stable;
const bool either_link_decayed =
inward_link_decayed || outward_link_decayed;
if (on_central_link && either_link_decayed && both_edges_stable) {
DVLOG(4) << "Router with fully decayed links may be eligible for bypass "
<< " with outward " << outward_link->Describe();
outward_link->MarkSideStable();
dropped_last_decaying_link = true;
}
if (on_central_link && outbound_parcels_.IsSequenceFullyConsumed() &&
outward_link->TryLockForClosure()) {
// Notify the other end of the route that this end is closed. See the
// AcceptRouteClosure() invocation further below.
final_outward_sequence_length =
*outbound_parcels_.final_sequence_length();
// We also have no more use for either outward or inward links: trivially
// there are no more outbound parcels to send outward, and there no longer
// exists an ultimate destination for any forwarded inbound parcels. So we
// drop both links now.
// NOTE(review): only the outward primary link is released in this branch;
// the inward link is released separately below, and only once the inbound
// sequence is fully consumed. Confirm the comment above is still accurate.
dead_outward_link = outward_edge_.ReleasePrimaryLink();
} else if (!inbound_parcels_.ExpectsMoreElements()) {
// If the other end of the route is gone and we've received all its
// parcels, we can simply drop the outward link in that case.
dead_outward_link = outward_edge_.ReleasePrimaryLink();
}
if (inbound_parcels_.IsSequenceFullyConsumed()) {
// We won't be receiving anything new from our peer, and if we're a proxy
// then we've also forwarded everything already. We can propagate closure
// inward and drop the inward link, if applicable.
final_inward_sequence_length = inbound_parcels_.final_sequence_length();
if (inward_edge_) {
dead_inward_link = inward_edge_->ReleasePrimaryLink();
} else {
// NOTE(review): `bridge_link` is moved-from here and is tested again in
// the bridge bypass check after the mutex is released. Presumably a
// moved-from Ref is null, which would skip that check - confirm.
dead_bridge_link = std::move(bridge_link);
bridge_.reset();
}
}
}
// `mutex_` is no longer held from here on, so it is safe to call into links.
// Transmit everything that was collected above.
for (ParcelToFlush& parcel : parcels_to_flush) {
parcel.link->AcceptParcel(std::move(parcel.parcel));
}
// Deactivate any decaying link whose decay finished during this call.
if (outward_link_decayed) {
decaying_outward_link->Deactivate();
}
if (inward_link_decayed) {
decaying_inward_link->Deactivate();
}
// If we have an outward link, and we have no decaying outward link (or our
// decaying outward link has just finished decaying above), we consider the
// outward link to be stable.
const bool has_stable_outward_link =
outward_link && (!decaying_outward_link || outward_link_decayed);
// If we have no primary inward link, and we have no decaying inward link
// (or our decaying inward link has just finished decaying above), this
// router has no inward-facing links.
const bool has_no_inward_links =
!inward_link && (!decaying_inward_link || inward_link_decayed);
// Bridge bypass is only possible with no inward links and a stable outward
// link.
if (bridge_link && has_stable_outward_link && has_no_inward_links) {
MaybeStartBridgeBypass();
}
// Propagate closure over each released link. The outward and inward links
// are also deactivated; the bridge link is not deactivated here.
if (dead_outward_link) {
if (final_outward_sequence_length) {
dead_outward_link->AcceptRouteClosure(*final_outward_sequence_length);
}
dead_outward_link->Deactivate();
}
if (dead_inward_link) {
if (final_inward_sequence_length) {
dead_inward_link->AcceptRouteClosure(*final_inward_sequence_length);
}
dead_inward_link->Deactivate();
}
if (dead_bridge_link) {
if (final_inward_sequence_length) {
dead_bridge_link->AcceptRouteClosure(*final_inward_sequence_length);
}
}
if (dead_outward_link || !on_central_link) {
// If the outward link was just dropped, or we're not on a central link,
// there's no more work to do.
return;
}
if (!dropped_last_decaying_link && behavior != kForceProxyBypassAttempt) {
// No relevant state changes, so there are no new bypass opportunities.
return;
}
// Only a router with an inward link attempts to have itself bypassed.
if (inward_link && MaybeStartSelfBypass()) {
return;
}
// Self-bypass was not started; let the outward link flush its other side if
// that side is waiting.
if (outward_link) {
outward_link->FlushOtherSideIfWaiting();
}
}
bool Router::MaybeStartSelfBypass() {
Ref<RemoteRouterLink> remote_inward_link;
Ref<RemoteRouterLink> remote_outward_link;
Ref<Router> local_outward_peer;
{
absl::MutexLock lock(&mutex_);
if (!inward_edge_ || !inward_edge_->primary_link() ||
!inward_edge_->is_stable()) {
// Only a proxy with stable links can be bypassed.
return false;
}
const Ref<RouterLink>& outward_link = outward_edge_.primary_link();
RemoteRouterLink* inward_link =
inward_edge_->primary_link()->AsRemoteRouterLink();
if (!outward_link || !inward_link) {
return false;
}
const NodeName& inward_peer_name =
inward_link->node_link()->remote_node_name();
if (!outward_link->TryLockForBypass(inward_peer_name)) {
DVLOG(4) << "Proxy bypass blocked by busy " << outward_link->Describe();
return false;
}
remote_inward_link = WrapRefCounted(inward_link);
local_outward_peer = outward_link->GetLocalPeer();
if (!local_outward_peer) {
remote_outward_link = WrapRefCounted(outward_link->AsRemoteRouterLink());
}
}
if (remote_outward_link) {
// The simpler case here: our outward peer is on another node, so we begin
// decaying our inward and outward links and ask the inward peer to bypass
// us ASAP.
{
absl::MutexLock lock(&mutex_);
if (!inward_edge_ || !inward_edge_->primary_link() ||
!outward_edge_.primary_link()) {
// We've been disconnected since leaving the block above. Nothing to do.
return false;
}
outward_edge_.BeginPrimaryLinkDecay();
inward_edge_->BeginPrimaryLinkDecay();
}
DVLOG(4) << "Proxy sending bypass request to inward peer over "
<< remote_inward_link->Describe()
<< " targeting outward peer on other side of "
<< remote_outward_link->Describe();
remote_inward_link->BypassPeer(
remote_outward_link->node_link()->remote_node_name(),
remote_outward_link->sublink());
return true;
}
// When the bypass target is local to the same node as this router, we can
// establish the bypass link immediately and send it to the remote inward
// peer.
return StartSelfBypassToLocalPeer(
*local_outward_peer, *remote_inward_link,
remote_inward_link->node_link()->memory().TryAllocateRouterLinkState());
}
// Starts bypassing this proxying router when its outward peer,
// `local_outward_peer`, lives on the same node. A new central link is created
// between `local_outward_peer` and the remote router on the other side of
// `inward_link`, and that remote router is told to bypass us using it.
//
// If `new_link_state` is null, a RouterLinkState is requested asynchronously
// and this method is re-invoked once one is available; true is returned in
// that case. Otherwise returns whether the bypass was started.
bool Router::StartSelfBypassToLocalPeer(
    Router& local_outward_peer,
    RemoteRouterLink& inward_link,
    FragmentRef<RouterLinkState> new_link_state) {
  NodeLinkMemory& memory = inward_link.node_link()->memory();
  if (new_link_state.is_null()) {
    memory.AllocateRouterLinkState(
        [self = WrapRefCounted(this),
         peer = WrapRefCounted(&local_outward_peer),
         link = WrapRefCounted(&inward_link)](
            FragmentRef<RouterLinkState> allocated_state) {
          // A failed allocation is not retried: it is unlikely to succeed
          // later if it failed once.
          if (!allocated_state.is_null()) {
            self->StartSelfBypassToLocalPeer(*peer, *link,
                                             std::move(allocated_state));
          }
        });
    return true;
  }

  Ref<RemoteRouterLink> bypass_link;
  SequenceNumber length_from_outward_peer;
  const SublinkId new_sublink = memory.AllocateSublinkIds(1);
  {
    MultiMutexLock lock(&mutex_, &local_outward_peer.mutex_);

    const Ref<RouterLink>& our_outward = outward_edge_.primary_link();
    const Ref<RouterLink>& peer_outward =
        local_outward_peer.outward_edge_.primary_link();
    const bool routers_usable = our_outward && peer_outward &&
                                !is_disconnected_ &&
                                !local_outward_peer.is_disconnected_;
    if (!routers_usable) {
      DVLOG(4) << "Proxy bypass blocked due to peer closure or disconnection";
      return false;
    }

    DVLOG(4) << "Proxy requesting own bypass from inward peer on "
             << inward_link.node_link()->remote_node_name().ToString()
             << " to local outward peer";

    ABSL_ASSERT(our_outward->GetLocalPeer() == &local_outward_peer);
    ABSL_ASSERT(peer_outward->GetLocalPeer() == this);

    // Begin decay of the local peer's link to us and of both our own links.
    // The cut-over point on all three is the number of parcels the outward
    // peer has sent so far.
    length_from_outward_peer =
        local_outward_peer.outbound_parcels_.current_sequence_number();

    RouteEdge& peer_edge = local_outward_peer.outward_edge_;
    peer_edge.BeginPrimaryLinkDecay();
    peer_edge.set_length_to_decaying_link(length_from_outward_peer);

    outward_edge_.BeginPrimaryLinkDecay();
    outward_edge_.set_length_from_decaying_link(length_from_outward_peer);

    inward_edge_->BeginPrimaryLinkDecay();
    inward_edge_->set_length_to_decaying_link(length_from_outward_peer);

    bypass_link = inward_link.node_link()->AddRemoteRouterLink(
        new_sublink, new_link_state, LinkType::kCentral, LinkSide::kA,
        WrapRefCounted(&local_outward_peer));
  }

  if (!bypass_link) {
    AcceptRouteDisconnectedFrom(LinkType::kCentral);
    return false;
  }

  // Tell the remote inward peer that it can bypass us over the link just
  // created. Only after that message is sent may the local peer adopt the new
  // link as its outward link.
  inward_link.BypassPeerWithLink(new_sublink, std::move(new_link_state),
                                 length_from_outward_peer);
  local_outward_peer.SetOutwardLink(std::move(bypass_link));
  return true;
}
// Attempts to bypass a pair of bridge routers: this router and the router on
// the other side of `bridge_`. Nothing happens unless this router has a stable
// bridge edge with a local peer, both bridge routers have a primary outward
// link, and both of those links can be locked for bypass. How the bypass then
// proceeds depends on whether each bridge router's outward peer is local to
// this node; see the three cases below.
void Router::MaybeStartBridgeBypass() {
Ref<Router> first_bridge = WrapRefCounted(this);
Ref<Router> second_bridge;
{
absl::MutexLock lock(&mutex_);
if (!bridge_ || !bridge_->is_stable()) {
return;
}
second_bridge = bridge_->GetLocalPeer();
if (!second_bridge) {
return;
}
}
// For each bridge router's outward link, exactly one of the local peer or the
// remote link reference is expected to be set below.
// NOTE(review): this assumes a RouterLink is always either local or remote -
// confirm.
Ref<Router> first_local_peer;
Ref<Router> second_local_peer;
Ref<RemoteRouterLink> first_remote_link;
Ref<RemoteRouterLink> second_remote_link;
{
MultiMutexLock lock(&mutex_, &second_bridge->mutex_);
const Ref<RouterLink>& link_to_first_peer = outward_edge_.primary_link();
const Ref<RouterLink>& link_to_second_peer =
second_bridge->outward_edge_.primary_link();
if (!link_to_first_peer || !link_to_second_peer) {
return;
}
// The node names are left default-constructed when the corresponding peer
// is not reached over a remote link.
NodeName first_peer_node_name;
first_local_peer = link_to_first_peer->GetLocalPeer();
first_remote_link =
WrapRefCounted(link_to_first_peer->AsRemoteRouterLink());
if (first_remote_link) {
first_peer_node_name = first_remote_link->node_link()->remote_node_name();
}
NodeName second_peer_node_name;
second_local_peer = link_to_second_peer->GetLocalPeer();
second_remote_link =
WrapRefCounted(link_to_second_peer->AsRemoteRouterLink());
if (second_remote_link) {
second_peer_node_name =
second_remote_link->node_link()->remote_node_name();
}
// Each outward link is locked on behalf of the node hosting the peer on the
// opposite side of the bridge.
if (!link_to_first_peer->TryLockForBypass(second_peer_node_name)) {
return;
}
if (!link_to_second_peer->TryLockForBypass(first_peer_node_name)) {
// Cancel the decay on this bridge's side, because we couldn't decay the
// other side of the bridge yet.
link_to_first_peer->Unlock();
return;
}
}
// At this point, the outward links from each bridge router have been locked
// for bypass. There are now three distinct cases to handle, based around
// where the outward peer routers are located.
// Case 1: Neither bridge router's outward peer is local to this node. This is
// roughly equivalent to the normal proxy bypass case where the proxy belongs
// to a different node from its inward and outward peers. We send a message to
// our outward peer with sufficient information for it to bypass both bridge
// routers with a new central link directly to the other bridge router's
// outward peer.
if (!first_local_peer && !second_local_peer) {
{
MultiMutexLock lock(&mutex_, &second_bridge->mutex_);
if (!bridge_ || !second_bridge->bridge_) {
// If another thread raced to sever this link, we can give up
// immediately.
// NOTE(review): the outward links locked for bypass above are not
// unlocked on this path - confirm that is intended.
return;
}
outward_edge_.BeginPrimaryLinkDecay();
second_bridge->outward_edge_.BeginPrimaryLinkDecay();
bridge_->BeginPrimaryLinkDecay();
second_bridge->bridge_->BeginPrimaryLinkDecay();
}
// The request is sent over the second bridge router's outward link, naming
// the first bridge router's outward peer as the bypass target.
second_remote_link->BypassPeer(
first_remote_link->node_link()->remote_node_name(),
first_remote_link->sublink());
return;
}
// Case 2: Only one of the bridge routers has a local outward peer. This is
// roughly equivalent to the normal proxy bypass case where the proxy and its
// outward peer belong to the same node. This case is handled separately since
// it's a bit more complex than the cases above and below.
if (!second_local_peer) {
// This router's outward peer is the local one, so the bypass is driven from
// this router, using memory from the link to the second router's remote
// peer.
StartBridgeBypassFromLocalPeer(
second_remote_link->node_link()->memory().TryAllocateRouterLinkState());
return;
} else if (!first_local_peer) {
// The mirror image: the second bridge router's outward peer is the local
// one.
second_bridge->StartBridgeBypassFromLocalPeer(
first_remote_link->node_link()->memory().TryAllocateRouterLinkState());
return;
}
// Case 3: Both bridge routers' outward peers are local to this node. This is
// a unique bypass case, as it's the only scenario where all involved routers
// are local to the same node and bypass can be orchestrated synchronously in
// a single step.
{
MultiMutexLock lock(&mutex_, &second_bridge->mutex_,
&first_local_peer->mutex_, &second_local_peer->mutex_);
if (!bridge_ || !second_bridge->bridge_) {
// If another thread raced to sever this link, we can give up immediately.
// NOTE(review): as in case 1, the outward links locked above are not
// unlocked on this path - confirm that is intended.
return;
}
// The number of parcels each outward peer has sent so far marks where its
// traffic cuts over from the decaying links to the new direct link.
const SequenceNumber length_from_first_peer =
first_local_peer->outbound_parcels_.current_sequence_number();
const SequenceNumber length_from_second_peer =
second_local_peer->outbound_parcels_.current_sequence_number();
// Begin decay on all six edges involved (each peer's outward edge, each
// bridge router's outward edge, and each bridge edge), recording the
// lengths each will carry to and from its decaying link.
RouteEdge& first_peer_edge = first_local_peer->outward_edge_;
first_peer_edge.BeginPrimaryLinkDecay();
first_peer_edge.set_length_to_decaying_link(length_from_first_peer);
first_peer_edge.set_length_from_decaying_link(length_from_second_peer);
RouteEdge& second_peer_edge = second_local_peer->outward_edge_;
second_peer_edge.BeginPrimaryLinkDecay();
second_peer_edge.set_length_to_decaying_link(length_from_second_peer);
second_peer_edge.set_length_from_decaying_link(length_from_first_peer);
outward_edge_.BeginPrimaryLinkDecay();
outward_edge_.set_length_to_decaying_link(length_from_second_peer);
outward_edge_.set_length_from_decaying_link(length_from_first_peer);
RouteEdge& peer_bridge_outward_edge = second_bridge->outward_edge_;
peer_bridge_outward_edge.BeginPrimaryLinkDecay();
peer_bridge_outward_edge.set_length_to_decaying_link(
length_from_first_peer);
peer_bridge_outward_edge.set_length_from_decaying_link(
length_from_second_peer);
bridge_->BeginPrimaryLinkDecay();
bridge_->set_length_to_decaying_link(length_from_first_peer);
bridge_->set_length_from_decaying_link(length_from_second_peer);
RouteEdge& peer_bridge = *second_bridge->bridge_;
peer_bridge.BeginPrimaryLinkDecay();
peer_bridge.set_length_to_decaying_link(length_from_second_peer);
peer_bridge.set_length_from_decaying_link(length_from_first_peer);
// Connect the two outward peers directly to each other with a new local
// central link.
RouterLink::Pair links = LocalRouterLink::CreatePair(
LinkType::kCentral, Router::Pair(first_local_peer, second_local_peer));
first_local_peer->outward_edge_.SetPrimaryLink(std::move(links.first));
second_local_peer->outward_edge_.SetPrimaryLink(std::move(links.second));
}
// Flush all four routers, outside of any lock, so they can act on the new
// link and decay state.
first_bridge->Flush();
second_bridge->Flush();
first_local_peer->Flush();
second_local_peer->Flush();
}
// Initiates bypass of a pair of local bridge routers (this router and the
// router on the other side of `bridge_`) in the case where this router's
// outward peer is local and the other bridge router's outward link is a
// RemoteRouterLink. A new central link is created for the local peer on the
// NodeLink used by that remote link, the remote side is asked to bypass its
// peer using the new sublink, and every existing edge through the two bridge
// routers begins decaying.
//
// `link_state` is the RouterLinkState fragment to use for the new link. If it
// is null, a fragment is requested from the remote link's NodeLink memory and
// this method is re-invoked from the allocation callback once a non-null
// fragment is provided.
//
// The method gives up silently if the bridge is missing or unstable, if either
// required local router is unavailable, if the other bridge has no remote
// outward link, or if the new link cannot be created.
void Router::StartBridgeBypassFromLocalPeer(
    FragmentRef<RouterLinkState> link_state) {
  Ref<Router> local_peer;
  Ref<Router> other_bridge;
  {
    absl::MutexLock lock(&mutex_);
    if (!bridge_ || !bridge_->is_stable()) {
      return;
    }

    local_peer = outward_edge_.GetLocalPeer();
    other_bridge = bridge_->GetLocalPeer();
    if (!local_peer || !other_bridge) {
      return;
    }
  }

  Ref<RemoteRouterLink> remote_link;
  {
    absl::MutexLock lock(&other_bridge->mutex_);
    if (!other_bridge->outward_edge_.primary_link()) {
      return;
    }

    remote_link = WrapRefCounted(
        other_bridge->outward_edge_.primary_link()->AsRemoteRouterLink());
    if (!remote_link) {
      return;
    }
  }

  if (link_state.is_null()) {
    // We need a new RouterLinkState on the remote link before we can complete
    // this operation. A null result from the allocation is dropped rather than
    // retried.
    remote_link->node_link()->memory().AllocateRouterLinkState(
        [router = WrapRefCounted(this)](FragmentRef<RouterLinkState> state) {
          if (!state.is_null()) {
            router->StartBridgeBypassFromLocalPeer(std::move(state));
          }
        });
    return;
  }

  // At this point, we have a new RouterLinkState for a new link, we have
  // references to all three local routers (this bridge router, its local peer,
  // and the other bridge router), and we have a remote link to the other bridge
  // router's outward peer. This is sufficient to initiate bypass.
  const Ref<NodeLink>& node_link_to_peer = remote_link->node_link();
  SequenceNumber length_from_local_peer;
  const SublinkId bypass_sublink =
      node_link_to_peer->memory().AllocateSublinkIds(1);
  Ref<RemoteRouterLink> new_link = node_link_to_peer->AddRemoteRouterLink(
      bypass_sublink, link_state, LinkType::kCentral, LinkSide::kA, local_peer);
  if (!new_link) {
    // AddRemoteRouterLink() can fail; BypassPeerWithNewRemoteLink() handles the
    // same null result for a disconnected NodeLink. No edge has been touched
    // yet, so abandon the attempt here instead of decaying links, asking the
    // remote peer to use a sublink that has no local link behind it, and
    // handing a null link to SetOutwardLink().
    return;
  }

  {
    MultiMutexLock lock(&mutex_, &other_bridge->mutex_, &local_peer->mutex_);
    if (!bridge_ || !other_bridge->bridge_) {
      // If another thread raced to sever this link, we can give up immediately.
      // NOTE(review): `new_link` was already created on the NodeLink above;
      // confirm whether it needs explicit teardown on this path.
      return;
    }

    // Every affected edge decays at the same boundary: the number of parcels
    // the local peer has sent so far.
    // NOTE(review): unlike the BypassPeerWith*() paths, the results of
    // BeginPrimaryLinkDecay() are not checked here; confirm that is intended.
    length_from_local_peer =
        local_peer->outbound_parcels_.current_sequence_number();

    // These three edges record the boundary as the length *to* the decaying
    // link: local peer's outward edge, the other bridge's outward edge, and
    // this router's bridge edge.
    RouteEdge& edge_from_local_peer = local_peer->outward_edge_;
    edge_from_local_peer.BeginPrimaryLinkDecay();
    edge_from_local_peer.set_length_to_decaying_link(length_from_local_peer);

    RouteEdge& edge_to_other_peer = other_bridge->outward_edge_;
    edge_to_other_peer.BeginPrimaryLinkDecay();
    edge_to_other_peer.set_length_to_decaying_link(length_from_local_peer);

    bridge_->BeginPrimaryLinkDecay();
    bridge_->set_length_to_decaying_link(length_from_local_peer);

    // These two edges record the boundary as the length *from* the decaying
    // link: this router's outward edge and the other bridge's bridge edge.
    outward_edge_.BeginPrimaryLinkDecay();
    outward_edge_.set_length_from_decaying_link(length_from_local_peer);

    RouteEdge& other_bridge_edge = *other_bridge->bridge_;
    other_bridge_edge.BeginPrimaryLinkDecay();
    other_bridge_edge.set_length_from_decaying_link(length_from_local_peer);
  }

  // Tell the remote peer about the bypass before the local peer adopts the new
  // link, then flush all three local routers.
  remote_link->BypassPeerWithLink(bypass_sublink, std::move(link_state),
                                  length_from_local_peer);
  local_peer->SetOutwardLink(std::move(new_link));
  Flush();
  other_bridge->Flush();
  local_peer->Flush();
}
// Replaces this router's outward link to a proxy with a new central link on
// `node_link`, on behalf of `requestor`.
//
// `bypass_target_sublink` is forwarded to the remote node in the
// AcceptBypassLink message together with the name of the requestor's remote
// node. `new_link_state` is the RouterLinkState fragment for the new link; when
// it is null, one is requested from `node_link`'s memory and the operation is
// retried from the allocation callback.
//
// Returns false only when the outward link cannot begin decaying. Every other
// outcome, including deferral and disconnection, returns true.
bool Router::BypassPeerWithNewRemoteLink(
    RemoteRouterLink& requestor,
    NodeLink& node_link,
    SublinkId bypass_target_sublink,
    FragmentRef<RouterLinkState> new_link_state) {
  if (new_link_state.is_null()) {
    // No fragment yet: defer until one has been allocated. The callback holds
    // references to everything it needs to call back into this method.
    auto retry_with_state = [self = WrapRefCounted(this),
                             requestor_ref = WrapRefCounted(&requestor),
                             link_ref = WrapRefCounted(&node_link),
                             target_sublink = bypass_target_sublink](
                                FragmentRef<RouterLinkState> state) {
      // A failed allocation is not retried; it is unlikely to succeed later.
      if (!state.is_null()) {
        self->BypassPeerWithNewRemoteLink(*requestor_ref, *link_ref,
                                          target_sublink, std::move(state));
      }
    };
    node_link.memory().AllocateRouterLinkState(std::move(retry_with_state));
    return true;
  }

  // The sublink is reserved before taking the lock; the decay bookkeeping and
  // link creation happen under it.
  const SublinkId bypass_sublink = node_link.memory().AllocateSublinkIds(1);
  Ref<RouterLink> bypass_link;
  SequenceNumber decay_length;
  {
    absl::ReleasableMutexLock lock(&mutex_);
    if (is_disconnected_ || !outward_edge_.primary_link()) {
      // Disconnection happened in the meantime. The requestor did nothing
      // wrong, so this is reported as success and no bypass is requested.
      return true;
    }

    const bool decay_started = outward_edge_.BeginPrimaryLinkDecay();
    if (!decay_started) {
      DLOG(ERROR) << "Rejecting BypassPeer on failure to decay link";
      return false;
    }

    decay_length = outbound_parcels_.current_sequence_number();
    outward_edge_.set_length_to_decaying_link(decay_length);
    bypass_link = node_link.AddRemoteRouterLink(
        bypass_sublink, new_link_state, LinkType::kCentral, LinkSide::kA,
        WrapRefCounted(this));
  }

  if (!bypass_link) {
    // The NodeLink went away before a link could be created for this router.
    // Again not the requestor's fault: treat the route as disconnected and
    // report success.
    AcceptRouteDisconnectedFrom(LinkType::kCentral);
    return true;
  }

  const NodeName proxy_name = requestor.node_link()->remote_node_name();
  DVLOG(4) << "Sending AcceptBypassLink from "
           << node_link.local_node_name().ToString() << " to "
           << node_link.remote_node_name().ToString() << " with new sublink "
           << bypass_sublink << " to replace a link to proxy "
           << proxy_name.ToString() << " via sublink "
           << bypass_target_sublink;

  node_link.AcceptBypassLink(proxy_name, bypass_target_sublink, decay_length,
                             bypass_sublink, std::move(new_link_state));

  // Ordering matters: the new link is installed only once the message above
  // has been sent. Installing it first would let another thread transmit over
  // `bypass_sublink` before the remote node knows how to route that sublink.
  SetOutwardLink(std::move(bypass_link));
  return true;
}
// Bypasses a proxy whose own outward peer lives on this node, by linking this
// router directly to that peer with a new LocalRouterLink.
//
// `requestor` is the link on which the bypass request arrived.
// `bypass_target_sublink` identifies, on the requestor's NodeLink, the router
// that should become this router's new local peer.
//
// Returns false if the request must be rejected: either the target resolves to
// this router itself, or one of the two links to the proxy cannot begin
// decaying. Returns true otherwise, including when the target router no longer
// exists or either router has already been disconnected.
bool Router::BypassPeerWithNewLocalLink(RemoteRouterLink& requestor,
                                        SublinkId bypass_target_sublink) {
  NodeLink& from_node_link = *requestor.node_link();
  const Ref<Router> new_local_peer =
      from_node_link.GetRouter(bypass_target_sublink);
  if (!new_local_peer) {
    // The peer may have already been destroyed or disconnected from the proxy
    // by the time we get here.
    AcceptRouteDisconnectedFrom(LinkType::kPeripheralOutward);
    return true;
  }

  if (new_local_peer.get() == this) {
    // The target sublink is supplied by the requestor and must never resolve
    // back to this router: a router cannot be its own peer. Proceeding would
    // hand the same mutex to MultiMutexLock twice and pair `outward_edge_` with
    // itself, so reject the request before touching any state.
    DLOG(ERROR) << "Rejecting BypassPeer with self as the new local peer";
    return false;
  }

  Ref<RouterLink> link_from_new_local_peer_to_proxy;
  SequenceNumber length_to_proxy_from_us;
  SequenceNumber length_from_proxy_to_us;
  {
    MultiMutexLock lock(&mutex_, &new_local_peer->mutex_);

    // Each direction's final length through the proxy is the number of parcels
    // the sending router has produced so far.
    length_from_proxy_to_us =
        new_local_peer->outbound_parcels_.current_sequence_number();
    length_to_proxy_from_us = outbound_parcels_.current_sequence_number();

    DVLOG(4) << "Proxy bypass requested with new local peer on "
             << from_node_link.local_node_name().ToString() << " and proxy on "
             << from_node_link.remote_node_name().ToString() << " via sublinks "
             << bypass_target_sublink << " and " << requestor.sublink()
             << "; length to the proxy is " << length_to_proxy_from_us
             << " and length from the proxy " << length_from_proxy_to_us;

    link_from_new_local_peer_to_proxy =
        new_local_peer->outward_edge_.primary_link();
    if (!outward_edge_.primary_link() || !link_from_new_local_peer_to_proxy ||
        is_disconnected_ || new_local_peer->is_disconnected_) {
      // One side has lost its link or been disconnected; there is nothing left
      // to bypass and this is not treated as an error.
      return true;
    }

    // Otherwise immediately begin decay of both links to the proxy.
    if (!outward_edge_.BeginPrimaryLinkDecay() ||
        !new_local_peer->outward_edge_.BeginPrimaryLinkDecay()) {
      DLOG(ERROR) << "Rejecting BypassPeer on failure to decay link";
      return false;
    }

    // The two edges mirror each other: what one router sends to the proxy is
    // what the other receives from it.
    outward_edge_.set_length_to_decaying_link(length_to_proxy_from_us);
    outward_edge_.set_length_from_decaying_link(length_from_proxy_to_us);
    new_local_peer->outward_edge_.set_length_to_decaying_link(
        length_from_proxy_to_us);
    new_local_peer->outward_edge_.set_length_from_decaying_link(
        length_to_proxy_from_us);

    // Finally, link the two routers with a new LocalRouterLink. This link will
    // remain unstable until the decaying proxy links are gone.
    RouterLink::Pair links = LocalRouterLink::CreatePair(
        LinkType::kCentral, Router::Pair(WrapRefCounted(this), new_local_peer));
    outward_edge_.SetPrimaryLink(std::move(links.first));
    new_local_peer->outward_edge_.SetPrimaryLink(std::move(links.second));
  }

  // With both locks released, tell the proxy the final lengths in each
  // direction, then flush both routers.
  link_from_new_local_peer_to_proxy->StopProxying(length_from_proxy_to_us,
                                                  length_to_proxy_from_us);
  Flush();
  new_local_peer->Flush();
  return true;
}
// Pops the next parcel from `inbound_parcels_` and returns it. If the inbound
// sequence is fully consumed afterwards, the portal status gains the
// PEER_CLOSED and DEAD flags. Traps are always notified of the consumption
// through `dispatcher`, using the updated status.
std::unique_ptr<Parcel> Router::TakeNextInboundParcel(
    TrapEventDispatcher& dispatcher) {
  // The result of Pop() is not inspected; `next` stays null unless Pop() fills
  // it in.
  std::unique_ptr<Parcel> next;
  inbound_parcels_.Pop(next);

  const bool fully_consumed = inbound_parcels_.IsSequenceFullyConsumed();
  if (fully_consumed) {
    const auto terminal_flags =
        IPCZ_PORTAL_STATUS_PEER_CLOSED | IPCZ_PORTAL_STATUS_DEAD;
    status_flags_ |= terminal_flags;
  }

  traps_.NotifyLocalParcelConsumed(status_flags_, inbound_parcels_, dispatcher);
  return next;
}
} // namespace ipcz