blob: 779e6fc0fa1275dd0957ab01255cacb2ecd70121 [file] [log] [blame]
// Copyright 2022 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "ipcz/router.h"
#include <algorithm>
#include <cstddef>
#include <cstring>
#include <utility>
#include "ipcz/ipcz.h"
#include "ipcz/node_link.h"
#include "ipcz/remote_router_link.h"
#include "ipcz/sequence_number.h"
#include "ipcz/trap_event_dispatcher.h"
#include "third_party/abseil-cpp/absl/base/macros.h"
#include "third_party/abseil-cpp/absl/container/inlined_vector.h"
#include "third_party/abseil-cpp/absl/synchronization/mutex.h"
#include "util/log.h"
namespace ipcz {
namespace {
// Helper structure used to accumulate individual parcel flushing operations
// within Router::Flush(), via CollectParcelsToFlush() below.
struct ParcelToFlush {
// The link over which to flush this parcel.
RouterLink* link;
// The parcel to be flushed.
Parcel parcel;
};
using ParcelsToFlush = absl::InlinedVector<ParcelToFlush, 8>;
// Helper which attempts to pop elements from `queue` for transmission along
// `edge`. This terminates either when `queue` is exhausted, or the next parcel
// in `queue` is to be transmitted over a link that is not yet known to `edge`.
// Any successfully popped elements are accumulated at the end of `parcels`.
void CollectParcelsToFlush(ParcelQueue& queue,
const RouteEdge& edge,
ParcelsToFlush& parcels) {
RouterLink* decaying_link = edge.decaying_link().get();
RouterLink* primary_link = edge.primary_link().get();
while (queue.HasNextElement()) {
const SequenceNumber n = queue.current_sequence_number();
RouterLink* link = nullptr;
if (decaying_link && edge.ShouldTransmitOnDecayingLink(n)) {
link = decaying_link;
} else if (primary_link && !edge.ShouldTransmitOnDecayingLink(n)) {
link = primary_link;
} else {
return;
}
ParcelToFlush& parcel = parcels.emplace_back(ParcelToFlush{.link = link});
const bool popped = queue.Pop(parcel.parcel);
ABSL_ASSERT(popped);
}
}
} // namespace
Router::Router() = default;
Router::~Router() {
// A Router MUST be serialized or closed before it can be destroyed. Both
// operations clear `traps_` and imply that no further traps should be added.
absl::MutexLock lock(&mutex_);
ABSL_ASSERT(traps_.empty());
}
bool Router::IsPeerClosed() {
absl::MutexLock lock(&mutex_);
return (status_.flags & IPCZ_PORTAL_STATUS_PEER_CLOSED) != 0;
}
bool Router::IsRouteDead() {
absl::MutexLock lock(&mutex_);
return (status_.flags & IPCZ_PORTAL_STATUS_DEAD) != 0;
}
void Router::QueryStatus(IpczPortalStatus& status) {
absl::MutexLock lock(&mutex_);
const size_t size = std::min(status.size, status_.size);
memcpy(&status, &status_, size);
status.size = size;
}
bool Router::HasLocalPeer(Router& router) {
absl::MutexLock lock(&mutex_);
return outward_edge_.primary_link() &&
outward_edge_.primary_link()->HasLocalPeer(router);
}
IpczResult Router::SendOutboundParcel(Parcel& parcel) {
Ref<RouterLink> link;
{
absl::MutexLock lock(&mutex_);
if (inbound_parcels_.final_sequence_length()) {
// If the inbound sequence is finalized, the peer portal must be gone.
return IPCZ_RESULT_NOT_FOUND;
}
const SequenceNumber sequence_number =
outbound_parcels_.GetCurrentSequenceLength();
parcel.set_sequence_number(sequence_number);
if (outward_edge_.primary_link() &&
outbound_parcels_.MaybeSkipSequenceNumber(sequence_number)) {
link = outward_edge_.primary_link();
} else {
// If there are no unsent parcels ahead of this one in the outbound
// sequence, and we have an active outward link, we can immediately
// transmit the parcel without any intermediate queueing step. That is the
// most common case, but otherwise we have to queue the parcel here and it
// will be flushed out ASAP.
DVLOG(4) << "Queuing outbound " << parcel.Describe();
const bool push_ok =
outbound_parcels_.Push(sequence_number, std::move(parcel));
ABSL_ASSERT(push_ok);
}
}
if (link) {
link->AcceptParcel(parcel);
} else {
Flush();
}
return IPCZ_RESULT_OK;
}
void Router::CloseRoute() {
TrapEventDispatcher dispatcher;
Ref<RouterLink> link;
{
absl::MutexLock lock(&mutex_);
outbound_parcels_.SetFinalSequenceLength(
outbound_parcels_.GetCurrentSequenceLength());
traps_.RemoveAll(dispatcher);
}
Flush();
}
void Router::SetOutwardLink(Ref<RouterLink> link) {
ABSL_ASSERT(link);
{
absl::MutexLock lock(&mutex_);
if (!is_disconnected_) {
outward_edge_.SetPrimaryLink(std::move(link));
}
}
if (link) {
// If the link wasn't adopted, this Router has already been disconnected.
link->AcceptRouteDisconnected();
link->Deactivate();
return;
}
Flush();
}
bool Router::AcceptInboundParcel(Parcel& parcel) {
TrapEventDispatcher dispatcher;
{
absl::MutexLock lock(&mutex_);
const SequenceNumber sequence_number = parcel.sequence_number();
if (!inbound_parcels_.Push(sequence_number, std::move(parcel))) {
// Unexpected route disconnection can cut off inbound sequences, so don't
// treat an out-of-bounds parcel as a validation failure.
return true;
}
if (!inward_edge_) {
// If this is a terminal router, we may have trap events to fire.
status_.num_local_parcels = inbound_parcels_.GetNumAvailableElements();
status_.num_local_bytes = inbound_parcels_.GetTotalAvailableElementSize();
traps_.UpdatePortalStatus(status_, TrapSet::UpdateReason::kNewLocalParcel,
dispatcher);
}
}
Flush();
return true;
}
bool Router::AcceptOutboundParcel(Parcel& parcel) {
{
absl::MutexLock lock(&mutex_);
// Proxied outbound parcels are always queued in a ParcelQueue even if they
// will be forwarded immediately. This allows us to track the full sequence
// of forwarded parcels so we can know with certainty when we're done
// forwarding.
//
// TODO: Using a queue here may increase latency along the route, because it
// it unnecessarily forces in-order forwarding. We could use an unordered
// queue for forwarding, but we'd still need some lighter-weight abstraction
// that tracks complete sequences from potentially fragmented contributions.
const SequenceNumber sequence_number = parcel.sequence_number();
if (!outbound_parcels_.Push(sequence_number, std::move(parcel))) {
// Unexpected route disconnection can cut off outbound sequences, so don't
// treat an out-of-bounds parcel as a validation failure.
return true;
}
}
Flush();
return true;
}
bool Router::AcceptRouteClosureFrom(LinkType link_type,
SequenceNumber sequence_length) {
TrapEventDispatcher dispatcher;
{
absl::MutexLock lock(&mutex_);
if (link_type.is_outward()) {
if (!inbound_parcels_.SetFinalSequenceLength(sequence_length)) {
// Ignore if and only if the sequence was terminated early.
DVLOG(4) << "Discarding inbound route closure notification";
return inbound_parcels_.final_sequence_length().has_value() &&
*inbound_parcels_.final_sequence_length() <= sequence_length;
}
if (!inward_edge_) {
status_.flags |= IPCZ_PORTAL_STATUS_PEER_CLOSED;
if (inbound_parcels_.IsSequenceFullyConsumed()) {
status_.flags |= IPCZ_PORTAL_STATUS_DEAD;
}
traps_.UpdatePortalStatus(status_, TrapSet::UpdateReason::kPeerClosed,
dispatcher);
}
} else if (link_type.is_peripheral_inward()) {
if (!outbound_parcels_.SetFinalSequenceLength(sequence_length)) {
// Ignore if and only if the sequence was terminated early.
DVLOG(4) << "Discarding outbound route closure notification";
return outbound_parcels_.final_sequence_length().has_value() &&
*outbound_parcels_.final_sequence_length() <= sequence_length;
}
}
}
Flush();
return true;
}
bool Router::AcceptRouteDisconnectedFrom(LinkType link_type) {
TrapEventDispatcher dispatcher;
absl::InlinedVector<Ref<RouterLink>, 4> forwarding_links;
{
absl::MutexLock lock(&mutex_);
DVLOG(4) << "Router " << this << " disconnected from "
<< link_type.ToString() << "link";
is_disconnected_ = true;
if (link_type.is_peripheral_inward()) {
outbound_parcels_.ForceTerminateSequence();
} else {
inbound_parcels_.ForceTerminateSequence();
}
// Wipe out all remaining links and propagate the disconnection over them.
forwarding_links.push_back(outward_edge_.ReleasePrimaryLink());
forwarding_links.push_back(outward_edge_.ReleaseDecayingLink());
if (inward_edge_) {
forwarding_links.push_back(inward_edge_->ReleasePrimaryLink());
forwarding_links.push_back(inward_edge_->ReleaseDecayingLink());
} else {
// Terminal routers may have trap events to fire.
status_.flags |= IPCZ_PORTAL_STATUS_PEER_CLOSED;
if (inbound_parcels_.IsSequenceFullyConsumed()) {
status_.flags |= IPCZ_PORTAL_STATUS_DEAD;
}
traps_.UpdatePortalStatus(status_, TrapSet::UpdateReason::kPeerClosed,
dispatcher);
}
}
for (const Ref<RouterLink>& link : forwarding_links) {
if (link) {
DVLOG(4) << "Forwarding disconnection over " << link->Describe();
link->AcceptRouteDisconnected();
link->Deactivate();
}
}
Flush();
return true;
}
IpczResult Router::GetNextInboundParcel(IpczGetFlags flags,
void* data,
size_t* num_bytes,
IpczHandle* handles,
size_t* num_handles) {
TrapEventDispatcher dispatcher;
absl::MutexLock lock(&mutex_);
if (inbound_parcels_.IsSequenceFullyConsumed()) {
return IPCZ_RESULT_NOT_FOUND;
}
if (!inbound_parcels_.HasNextElement()) {
return IPCZ_RESULT_UNAVAILABLE;
}
Parcel& p = inbound_parcels_.NextElement();
const bool allow_partial = (flags & IPCZ_GET_PARTIAL) != 0;
const size_t data_capacity = num_bytes ? *num_bytes : 0;
const size_t handles_capacity = num_handles ? *num_handles : 0;
const size_t data_size =
allow_partial ? std::min(p.data_size(), data_capacity) : p.data_size();
const size_t handles_size = allow_partial
? std::min(p.num_objects(), handles_capacity)
: p.num_objects();
if (num_bytes) {
*num_bytes = data_size;
}
if (num_handles) {
*num_handles = handles_size;
}
const bool consuming_whole_parcel =
data_capacity >= data_size && handles_capacity >= handles_size;
if (!consuming_whole_parcel && !allow_partial) {
return IPCZ_RESULT_RESOURCE_EXHAUSTED;
}
memcpy(data, p.data_view().data(), data_size);
const bool ok = inbound_parcels_.Consume(
data_size, absl::MakeSpan(handles, handles_size));
ABSL_ASSERT(ok);
status_.num_local_parcels = inbound_parcels_.GetNumAvailableElements();
status_.num_local_bytes = inbound_parcels_.GetTotalAvailableElementSize();
if (inbound_parcels_.IsSequenceFullyConsumed()) {
status_.flags |= IPCZ_PORTAL_STATUS_DEAD;
}
traps_.UpdatePortalStatus(
status_, TrapSet::UpdateReason::kLocalParcelConsumed, dispatcher);
return IPCZ_RESULT_OK;
}
IpczResult Router::Trap(const IpczTrapConditions& conditions,
IpczTrapEventHandler handler,
uint64_t context,
IpczTrapConditionFlags* satisfied_condition_flags,
IpczPortalStatus* status) {
absl::MutexLock lock(&mutex_);
return traps_.Add(conditions, handler, context, status_,
satisfied_condition_flags, status);
}
// static
Ref<Router> Router::Deserialize(const RouterDescriptor& descriptor,
NodeLink& from_node_link) {
bool disconnected = false;
auto router = MakeRefCounted<Router>();
{
absl::MutexLock lock(&router->mutex_);
router->outbound_parcels_.ResetInitialSequenceNumber(
descriptor.next_outgoing_sequence_number);
router->inbound_parcels_.ResetInitialSequenceNumber(
descriptor.next_incoming_sequence_number);
if (descriptor.peer_closed) {
router->status_.flags |= IPCZ_PORTAL_STATUS_PEER_CLOSED;
if (!router->inbound_parcels_.SetFinalSequenceLength(
descriptor.closed_peer_sequence_length)) {
return nullptr;
}
if (router->inbound_parcels_.IsSequenceFullyConsumed()) {
router->status_.flags |= IPCZ_PORTAL_STATUS_DEAD;
}
}
Ref<RemoteRouterLink> new_link = from_node_link.AddRemoteRouterLink(
descriptor.new_sublink, nullptr, LinkType::kPeripheralOutward,
LinkSide::kB, router);
if (new_link) {
router->outward_edge_.SetPrimaryLink(std::move(new_link));
DVLOG(4) << "Route extended from "
<< from_node_link.remote_node_name().ToString() << " to "
<< from_node_link.local_node_name().ToString() << " via sublink "
<< descriptor.new_sublink;
} else if (!descriptor.peer_closed) {
// The new portal is DOA, either because the associated NodeLink is dead,
// or the sublink ID was already in use. The latter implies a bug or bad
// behavior, but it should be harmless to ignore beyond this point.
disconnected = true;
}
}
if (disconnected) {
DVLOG(4) << "Disconnected new Router immediately after deserialization";
router->AcceptRouteDisconnectedFrom(LinkType::kPeripheralOutward);
}
router->Flush();
return router;
}
void Router::SerializeNewRouter(NodeLink& to_node_link,
RouterDescriptor& descriptor) {
TrapEventDispatcher dispatcher;
const SublinkId new_sublink = to_node_link.memory().AllocateSublinkIds(1);
descriptor.new_sublink = new_sublink;
{
absl::MutexLock lock(&mutex_);
traps_.RemoveAll(dispatcher);
descriptor.next_outgoing_sequence_number =
outbound_parcels_.current_sequence_number();
descriptor.next_incoming_sequence_number =
inbound_parcels_.current_sequence_number();
// Initialize an inward edge but with no link yet. This ensures that we
// don't look like a terminal router while waiting for a link to be set,
// which can only happen after `descriptor` is transmitted.
inward_edge_.emplace();
if (status_.flags & IPCZ_PORTAL_STATUS_PEER_CLOSED) {
descriptor.peer_closed = true;
descriptor.closed_peer_sequence_length =
*inbound_parcels_.final_sequence_length();
// Ensure that the new edge decays its link as soon as it has one, since
// we know the link will not be used.
inward_edge_->BeginPrimaryLinkDecay();
inward_edge_->set_length_to_decaying_link(
*inbound_parcels_.final_sequence_length());
inward_edge_->set_length_from_decaying_link(
outbound_parcels_.current_sequence_number());
}
// Once `descriptor` is transmitted to the destination node and the new
// Router is created there, it may immediately begin transmitting messages
// back to this node regarding `new_sublink`. We establish a new
// RemoteRouterLink now and register it to `new_sublink` on `to_node_link`,
// so that any such incoming messages are routed to `this`.
//
// NOTE: We do not yet provide `this` itself with a reference to the new
// RemoteRouterLink, because it's not yet safe for us to send messages to
// the remote node regarding `new_sublink`. `descriptor` must be transmitted
// first.
Ref<RemoteRouterLink> new_link = to_node_link.AddRemoteRouterLink(
new_sublink, nullptr, LinkType::kPeripheralInward, LinkSide::kA,
WrapRefCounted(this));
DVLOG(4) << "Router " << this << " extending route with tentative new "
<< new_link->Describe();
}
}
void Router::BeginProxyingToNewRouter(NodeLink& to_node_link,
const RouterDescriptor& descriptor) {
// Acquire a reference to the RemoteRouterLink created by an earlier call to
// SerializeNewRouter(). If the NodeLink has already been disconnected, this
// may be null.
if (auto new_sublink = to_node_link.GetSublink(descriptor.new_sublink)) {
Ref<RemoteRouterLink> new_router_link = new_sublink->router_link;
{
absl::MutexLock lock(&mutex_);
ABSL_ASSERT(inward_edge_);
// If the new router has already been closed or disconnected, we will
// discard the new link to it.
if (!outbound_parcels_.final_sequence_length() && !is_disconnected_) {
DVLOG(4) << "Router " << this << " will proxy to new router over "
<< new_router_link->Describe();
inward_edge_->SetPrimaryLink(std::move(new_router_link));
Ref<RouterLink> outward_link = outward_edge_.primary_link();
if (outward_link && outward_edge_.is_stable() &&
inward_edge_->is_stable()) {
outward_link->MarkSideStable();
}
}
}
if (new_router_link) {
// The link was not adopted, so deactivate and discard it.
DVLOG(4) << "Dropping link to new router " << new_router_link->Describe();
new_router_link->AcceptRouteDisconnected();
new_router_link->Deactivate();
return;
}
}
// We may have inbound parcels queued which need to be forwarded to the new
// Router, so give them a chance to be flushed out.
Flush();
}
bool Router::BypassPeer(RemoteRouterLink& requestor,
const NodeName& bypass_target_node,
SublinkId bypass_target_sublink) {
// TODO: Implement this.
return true;
}
bool Router::AcceptBypassLink(
Ref<NodeLink> new_node_link,
SublinkId new_sublink,
FragmentRef<RouterLinkState> new_link_state,
SequenceNumber inbound_sequence_length_from_bypassed_link) {
// TODO: Implement this.
return true;
}
bool Router::StopProxying(SequenceNumber inbound_sequence_length,
SequenceNumber outbound_sequence_length) {
// TODO: Implement this.
return true;
}
bool Router::NotifyProxyWillStop(SequenceNumber inbound_sequence_length) {
// TODO: Implement this.
return true;
}
bool Router::BypassPeerWithLink(NodeLink& from_node_link,
SublinkId new_sublink,
FragmentRef<RouterLinkState> new_link_state,
SequenceNumber inbound_sequence_length) {
// TODO: Implement this.
return true;
}
bool Router::StopProxyingToLocalPeer(SequenceNumber outbound_sequence_length) {
// TODO: Implement this.
return true;
}
void Router::NotifyLinkDisconnected(RemoteRouterLink& link) {
{
absl::MutexLock lock(&mutex_);
if (outward_edge_.primary_link() == &link) {
DVLOG(4) << "Primary " << link.Describe() << " disconnected";
outward_edge_.ReleasePrimaryLink();
} else if (outward_edge_.decaying_link() == &link) {
DVLOG(4) << "Decaying " << link.Describe() << " disconnected";
outward_edge_.ReleaseDecayingLink();
} else if (inward_edge_ && inward_edge_->primary_link() == &link) {
DVLOG(4) << "Primary " << link.Describe() << " disconnected";
inward_edge_->ReleasePrimaryLink();
} else if (inward_edge_ && inward_edge_->decaying_link() == &link) {
DVLOG(4) << "Decaying " << link.Describe() << " disconnected";
inward_edge_->ReleaseDecayingLink();
}
}
if (link.GetType().is_outward()) {
AcceptRouteDisconnectedFrom(LinkType::kPeripheralOutward);
} else {
AcceptRouteDisconnectedFrom(LinkType::kPeripheralInward);
}
}
void Router::Flush() {
Ref<RouterLink> outward_link;
Ref<RouterLink> inward_link;
Ref<RouterLink> decaying_outward_link;
Ref<RouterLink> decaying_inward_link;
Ref<RouterLink> dead_inward_link;
Ref<RouterLink> dead_outward_link;
absl::optional<SequenceNumber> final_inward_sequence_length;
absl::optional<SequenceNumber> final_outward_sequence_length;
ParcelsToFlush parcels_to_flush;
{
absl::MutexLock lock(&mutex_);
// Acquire stack references to all links we might want to use, so it's safe
// to acquire additional (unmanaged) references per ParcelToFlush.
outward_link = outward_edge_.primary_link();
inward_link = inward_edge_ ? inward_edge_->primary_link() : nullptr;
decaying_outward_link = outward_edge_.decaying_link();
decaying_inward_link =
inward_edge_ ? inward_edge_->decaying_link() : nullptr;
// Collect any parcels which are safe to transmit now. Note that we do not
// transmit anything or generally call into any RouterLinks while `mutex_`
// is held, because such calls may ultimately re-enter this Router
// (e.g. if a link is a LocalRouterLink, or even a RemoteRouterLink with a
// fully synchronous driver.) Instead we accumulate work within this block,
// and then perform any transmissions or link deactivations after the mutex
// is released further below.
CollectParcelsToFlush(outbound_parcels_, outward_edge_, parcels_to_flush);
if (inward_edge_) {
CollectParcelsToFlush(inbound_parcels_, *inward_edge_, parcels_to_flush);
}
if (outward_link && outbound_parcels_.IsSequenceFullyConsumed()) {
// Notify the other end of the route that this end is closed. See the
// AcceptRouteClosure() invocation further below.
final_outward_sequence_length =
*outbound_parcels_.final_sequence_length();
// We also have no more use for either outward or inward links: trivially
// there are no more outbound parcels to send outward, and there no longer
// exists an ultimate destination for any forwarded inbound parcels. So we
// drop both links now.
dead_outward_link = outward_edge_.ReleasePrimaryLink();
} else if (!inbound_parcels_.ExpectsMoreElements()) {
// If the other end of the route is gone and we've received all its
// parcels, we can simply drop the outward link in that case.
dead_outward_link = outward_edge_.ReleasePrimaryLink();
}
if (inbound_parcels_.IsSequenceFullyConsumed()) {
// We won't be receiving anything new from our peer, and if we're a proxy
// then we've also forwarded everything already. We can propagate closure
// inward and drop the inward link, if applicable.
final_inward_sequence_length = inbound_parcels_.final_sequence_length();
if (inward_edge_) {
dead_inward_link = inward_edge_->ReleasePrimaryLink();
}
}
}
for (ParcelToFlush& parcel : parcels_to_flush) {
parcel.link->AcceptParcel(parcel.parcel);
}
if (dead_outward_link) {
if (final_outward_sequence_length) {
dead_outward_link->AcceptRouteClosure(*final_outward_sequence_length);
}
dead_outward_link->Deactivate();
}
if (dead_inward_link) {
if (final_inward_sequence_length) {
dead_inward_link->AcceptRouteClosure(*final_inward_sequence_length);
}
dead_inward_link->Deactivate();
}
}
} // namespace ipcz