blob: 97e1231f46ac9dae3639ae4602e0aa0424887971 [file] [log] [blame]
// Copyright 2022 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "ipcz/router.h"
#include <algorithm>
#include <cstddef>
#include <cstring>
#include <utility>
#include "ipcz/ipcz.h"
#include "ipcz/node_link.h"
#include "ipcz/remote_router_link.h"
#include "ipcz/sequence_number.h"
#include "ipcz/trap_event_dispatcher.h"
#include "third_party/abseil-cpp/absl/base/macros.h"
#include "third_party/abseil-cpp/absl/container/inlined_vector.h"
#include "third_party/abseil-cpp/absl/synchronization/mutex.h"
#include "util/log.h"
namespace ipcz {
Router::Router() = default;
Router::~Router() {
// A Router MUST be serialized or closed before it can be destroyed. Both
// operations clear `traps_` and imply that no further traps should be added.
absl::MutexLock lock(&mutex_);
ABSL_ASSERT(traps_.empty());
}
bool Router::IsPeerClosed() {
absl::MutexLock lock(&mutex_);
return (status_.flags & IPCZ_PORTAL_STATUS_PEER_CLOSED) != 0;
}
bool Router::IsRouteDead() {
absl::MutexLock lock(&mutex_);
return (status_.flags & IPCZ_PORTAL_STATUS_DEAD) != 0;
}
void Router::QueryStatus(IpczPortalStatus& status) {
absl::MutexLock lock(&mutex_);
const size_t size = std::min(status.size, status_.size);
memcpy(&status, &status_, size);
status.size = size;
}
bool Router::HasLocalPeer(Router& router) {
absl::MutexLock lock(&mutex_);
if (!outward_link_) {
return false;
}
return outward_link_->HasLocalPeer(router);
}
IpczResult Router::SendOutboundParcel(Parcel& parcel) {
Ref<RouterLink> link;
{
absl::MutexLock lock(&mutex_);
const SequenceNumber sequence_number =
outbound_parcels_.GetCurrentSequenceLength();
parcel.set_sequence_number(sequence_number);
if (outward_link_ &&
outbound_parcels_.MaybeSkipSequenceNumber(sequence_number)) {
// If there are no unsent parcels ahead of this one in the outbound
// sequence, and we have an active outward link, we can immediately
// transmit the parcel without any intermediate queueing step. This is the
// most common case.
link = outward_link_;
} else {
DVLOG(4) << "Queuing outbound " << parcel.Describe();
const bool push_ok =
outbound_parcels_.Push(sequence_number, std::move(parcel));
ABSL_ASSERT(push_ok);
}
}
if (link) {
link->AcceptParcel(parcel);
} else {
Flush();
}
return IPCZ_RESULT_OK;
}
void Router::CloseRoute() {
TrapEventDispatcher dispatcher;
Ref<RouterLink> link;
{
absl::MutexLock lock(&mutex_);
bool ok = outbound_parcels_.SetFinalSequenceLength(
outbound_parcels_.GetCurrentSequenceLength());
ABSL_ASSERT(ok);
traps_.RemoveAll(dispatcher);
}
Flush();
}
void Router::SetOutwardLink(Ref<RouterLink> link) {
{
absl::MutexLock lock(&mutex_);
ABSL_ASSERT(!outward_link_);
outward_link_ = std::move(link);
}
Flush();
}
bool Router::AcceptInboundParcel(Parcel& parcel) {
TrapEventDispatcher dispatcher;
{
absl::MutexLock lock(&mutex_);
const SequenceNumber sequence_number = parcel.sequence_number();
if (!inbound_parcels_.Push(sequence_number, std::move(parcel))) {
return false;
}
status_.num_local_parcels = inbound_parcels_.GetNumAvailableElements();
status_.num_local_bytes = inbound_parcels_.GetTotalAvailableElementSize();
traps_.UpdatePortalStatus(status_, TrapSet::UpdateReason::kNewLocalParcel,
dispatcher);
}
Flush();
return true;
}
bool Router::AcceptOutboundParcel(Parcel& parcel) {
{
absl::MutexLock lock(&mutex_);
// Proxied outbound parcels are always queued in a ParcelQueue even if they
// will be forwarded immediately. This allows us to track the full sequence
// of forwarded parcels so we can know with certainty when we're done
// forwarding.
//
// TODO: Using a queue here may increase latency along the route, because it
// it unnecessarily forces in-order forwarding. We could use an unordered
// queue for forwarding, but we'd still need some lighter-weight abstraction
// that tracks complete sequences from potentially fragmented contributions.
const SequenceNumber sequence_number = parcel.sequence_number();
if (!outbound_parcels_.Push(sequence_number, std::move(parcel))) {
return false;
}
}
Flush();
return true;
}
bool Router::AcceptRouteClosureFrom(LinkType link_type,
SequenceNumber sequence_length) {
TrapEventDispatcher dispatcher;
{
absl::MutexLock lock(&mutex_);
if (link_type.is_outward()) {
if (!inbound_parcels_.SetFinalSequenceLength(sequence_length)) {
return false;
}
if (inward_link_) {
inward_link_->AcceptRouteClosure(sequence_length);
} else {
status_.flags |= IPCZ_PORTAL_STATUS_PEER_CLOSED;
if (inbound_parcels_.IsSequenceFullyConsumed()) {
status_.flags |= IPCZ_PORTAL_STATUS_DEAD;
}
traps_.UpdatePortalStatus(status_, TrapSet::UpdateReason::kPeerClosed,
dispatcher);
}
} else if (link_type.is_peripheral_inward()) {
if (!outbound_parcels_.SetFinalSequenceLength(sequence_length)) {
return false;
}
}
}
Flush();
return true;
}
IpczResult Router::GetNextInboundParcel(IpczGetFlags flags,
void* data,
size_t* num_bytes,
IpczHandle* handles,
size_t* num_handles) {
TrapEventDispatcher dispatcher;
absl::MutexLock lock(&mutex_);
if (inbound_parcels_.IsSequenceFullyConsumed()) {
return IPCZ_RESULT_NOT_FOUND;
}
if (!inbound_parcels_.HasNextElement()) {
return IPCZ_RESULT_UNAVAILABLE;
}
Parcel& p = inbound_parcels_.NextElement();
const bool allow_partial = (flags & IPCZ_GET_PARTIAL) != 0;
const size_t data_capacity = num_bytes ? *num_bytes : 0;
const size_t handles_capacity = num_handles ? *num_handles : 0;
const size_t data_size =
allow_partial ? std::min(p.data_size(), data_capacity) : p.data_size();
const size_t handles_size = allow_partial
? std::min(p.num_objects(), handles_capacity)
: p.num_objects();
if (num_bytes) {
*num_bytes = data_size;
}
if (num_handles) {
*num_handles = handles_size;
}
const bool consuming_whole_parcel =
data_capacity >= data_size && handles_capacity >= handles_size;
if (!consuming_whole_parcel && !allow_partial) {
return IPCZ_RESULT_RESOURCE_EXHAUSTED;
}
memcpy(data, p.data_view().data(), data_size);
const bool ok = inbound_parcels_.Consume(
data_size, absl::MakeSpan(handles, handles_size));
ABSL_ASSERT(ok);
status_.num_local_parcels = inbound_parcels_.GetNumAvailableElements();
status_.num_local_bytes = inbound_parcels_.GetTotalAvailableElementSize();
if (inbound_parcels_.IsSequenceFullyConsumed()) {
status_.flags |= IPCZ_PORTAL_STATUS_DEAD;
}
traps_.UpdatePortalStatus(
status_, TrapSet::UpdateReason::kLocalParcelConsumed, dispatcher);
return IPCZ_RESULT_OK;
}
IpczResult Router::Trap(const IpczTrapConditions& conditions,
IpczTrapEventHandler handler,
uint64_t context,
IpczTrapConditionFlags* satisfied_condition_flags,
IpczPortalStatus* status) {
absl::MutexLock lock(&mutex_);
return traps_.Add(conditions, handler, context, status_,
satisfied_condition_flags, status);
}
// static
Ref<Router> Router::Deserialize(const RouterDescriptor& descriptor,
NodeLink& from_node_link) {
auto router = MakeRefCounted<Router>();
{
absl::MutexLock lock(&router->mutex_);
router->outbound_parcels_.ResetInitialSequenceNumber(
descriptor.next_outgoing_sequence_number);
router->inbound_parcels_.ResetInitialSequenceNumber(
descriptor.next_incoming_sequence_number);
if (descriptor.peer_closed) {
router->status_.flags |= IPCZ_PORTAL_STATUS_PEER_CLOSED;
if (!router->inbound_parcels_.SetFinalSequenceLength(
descriptor.closed_peer_sequence_length)) {
return nullptr;
}
if (router->inbound_parcels_.IsSequenceFullyConsumed()) {
router->status_.flags |= IPCZ_PORTAL_STATUS_DEAD;
}
}
Ref<RemoteRouterLink> new_link = from_node_link.AddRemoteRouterLink(
descriptor.new_sublink, LinkType::kPeripheralOutward, LinkSide::kB,
router);
if (!new_link) {
return nullptr;
}
router->outward_link_ = std::move(new_link);
DVLOG(4) << "Route extended from "
<< from_node_link.remote_node_name().ToString() << " to "
<< from_node_link.local_node_name().ToString() << " via sublink "
<< descriptor.new_sublink;
}
router->Flush();
return router;
}
void Router::SerializeNewRouter(NodeLink& to_node_link,
RouterDescriptor& descriptor) {
TrapEventDispatcher dispatcher;
{
absl::MutexLock lock(&mutex_);
traps_.RemoveAll(dispatcher);
descriptor.next_outgoing_sequence_number =
outbound_parcels_.current_sequence_number();
descriptor.next_incoming_sequence_number =
inbound_parcels_.current_sequence_number();
DVLOG(4) << "Extending route to new router with outbound sequence length "
<< descriptor.next_outgoing_sequence_number
<< " and current inbound sequence number "
<< descriptor.next_incoming_sequence_number;
if (status_.flags & IPCZ_PORTAL_STATUS_PEER_CLOSED) {
descriptor.peer_closed = true;
descriptor.closed_peer_sequence_length =
*inbound_parcels_.final_sequence_length();
}
}
const SublinkId new_sublink = to_node_link.memory().AllocateSublinkIds(1);
descriptor.new_sublink = new_sublink;
// Once `descriptor` is transmitted to the destination node and the new Router
// is created there, it may immediately begin transmitting messages back to
// this node regarding `new_sublink`. We establish a new RemoteRouterLink now
// and register it to `new_sublink` on `to_node_link`, so that any such
// incoming messages are routed to `this`.
//
// NOTE: We do not yet provide `this` itself with a reference to the new
// RemoteRouterLink, because it's not yet safe for us to send messages to the
// remote node regarding `new_sublink`. `descriptor` must be transmitted
// first.
to_node_link.AddRemoteRouterLink(new_sublink, LinkType::kPeripheralInward,
LinkSide::kA, WrapRefCounted(this));
}
void Router::BeginProxyingToNewRouter(NodeLink& to_node_link,
const RouterDescriptor& descriptor) {
// Acquire a reference to the sublink created by an earlier call to
// SerializeNewRouter().
const absl::optional<NodeLink::Sublink> new_sublink =
to_node_link.GetSublink(descriptor.new_sublink);
if (!new_sublink) {
// The sublink has been torn down, presumably because of node disconnection.
// Nowhere to proxy now, so we're done.
return;
}
bool deactivate_link = false;
{
absl::MutexLock lock(&mutex_);
ABSL_ASSERT(!inward_link_);
// It's possible that the new router was already closed and we've already
// received a notification about this and forwarded any parcels it may have
// sent. In that case it would be pointless to establish an inward link, so
// we'll just drop it instead.
if (outbound_parcels_.IsSequenceFullyConsumed()) {
deactivate_link = true;
} else {
// TODO: Initiate proxy removal ASAP now that we're proxying.
inward_link_ = new_sublink->router_link;
}
}
if (deactivate_link) {
new_sublink->router_link->Deactivate();
return;
}
// We may have inbound parcels queued which need to be forwarded to the new
// Router, so give them a chance to be flushed out.
Flush();
}
void Router::NotifyLinkDisconnected(const NodeLink& node_link,
SublinkId sublink) {
Ref<RouterLink> dead_outward_link;
SequenceNumber inbound_sequence_length;
Ref<RouterLink> dead_inward_link;
SequenceNumber outbound_sequence_length;
{
absl::MutexLock lock(&mutex_);
if (outward_link_ && outward_link_->IsRemoteLinkTo(node_link, sublink)) {
dead_outward_link = std::move(outward_link_);
inbound_sequence_length = inbound_parcels_.GetCurrentSequenceLength();
} else if (inward_link_ &&
inward_link_->IsRemoteLinkTo(node_link, sublink)) {
dead_inward_link = std::move(inward_link_);
outbound_sequence_length = outbound_parcels_.GetCurrentSequenceLength();
}
}
if (dead_outward_link) {
AcceptRouteClosureFrom(dead_outward_link->GetType(),
inbound_sequence_length);
}
if (dead_inward_link) {
AcceptRouteClosureFrom(dead_inward_link->GetType(),
outbound_sequence_length);
}
}
void Router::Flush() {
Ref<RouterLink> outward_link;
Ref<RouterLink> inward_link;
Ref<RouterLink> dead_inward_link;
Ref<RouterLink> dead_outward_link;
absl::InlinedVector<Parcel, 2> inbound_parcels;
absl::InlinedVector<Parcel, 2> outbound_parcels;
absl::optional<SequenceNumber> final_outward_sequence_length;
{
absl::MutexLock lock(&mutex_);
outward_link = outward_link_;
inward_link = inward_link_;
// Collect any outbound parcels which are safe to transmit now. Note that we
// do not transmit anything or generally call into any RouterLinks while
// `mutex_` is held, because such calls may ultimately re-enter this Router
// (e.g. if a link is a LocalRouterLink, or even a RemoteRouterLink with a
// fully synchronous driver.) Instead we accumulate work within this block,
// and then perform any transmissions or link deactivations after the mutex
// is released further below.
Parcel parcel;
while (outbound_parcels_.HasNextElement() && outward_link) {
bool ok = outbound_parcels_.Pop(parcel);
ABSL_ASSERT(ok);
outbound_parcels.push_back(std::move(parcel));
}
// If we have an inward link, then we're a proxy. Collect any queued inbound
// parcels to forward over that link.
while (inbound_parcels_.HasNextElement() && inward_link) {
bool ok = inbound_parcels_.Pop(parcel);
ABSL_ASSERT(ok);
inbound_parcels.push_back(std::move(parcel));
}
if (outward_link && outbound_parcels_.IsSequenceFullyConsumed()) {
// Notify the other end of the route that this end is closed. See the
// AcceptRouteClosure() invocation further below.
final_outward_sequence_length =
*outbound_parcels_.final_sequence_length();
// We also have no more use for either outward or inward links: trivially
// there are no more outbound parcels to send outward, and there no longer
// exists an ultimate destination for any forwarded inbound parcels. So we
// drop both links now.
dead_outward_link = std::move(outward_link_);
dead_inward_link = std::move(inward_link_);
}
}
for (Parcel& parcel : outbound_parcels) {
outward_link->AcceptParcel(parcel);
}
for (Parcel& parcel : inbound_parcels) {
inward_link->AcceptParcel(parcel);
}
if (final_outward_sequence_length) {
outward_link->AcceptRouteClosure(*final_outward_sequence_length);
}
if (dead_outward_link) {
dead_outward_link->Deactivate();
}
if (dead_inward_link) {
dead_inward_link->Deactivate();
}
}
} // namespace ipcz