blob: 250cd25aa41de73bc0f5bb844c4c06c72a6c4dd0 [file] [log] [blame]
// Copyright 2022 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "ipcz/node_link.h"
#include <algorithm>
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <limits>
#include <optional>
#include <utility>
#include "ipcz/box.h"
#include "ipcz/fragment_ref.h"
#include "ipcz/ipcz.h"
#include "ipcz/link_side.h"
#include "ipcz/link_type.h"
#include "ipcz/message.h"
#include "ipcz/node.h"
#include "ipcz/node_connector.h"
#include "ipcz/node_link_memory.h"
#include "ipcz/node_messages.h"
#include "ipcz/parcel.h"
#include "ipcz/remote_router_link.h"
#include "ipcz/router.h"
#include "ipcz/router_link.h"
#include "ipcz/router_link_state.h"
#include "ipcz/sublink_id.h"
#include "third_party/abseil-cpp/absl/base/macros.h"
#include "util/log.h"
#include "util/ref_counted.h"
#include "util/safe_math.h"
namespace ipcz {
// static
Ref<NodeLink> NodeLink::CreateActive(Ref<Node> node,
LinkSide link_side,
const NodeName& local_node_name,
const NodeName& remote_node_name,
Node::Type remote_node_type,
uint32_t remote_protocol_version,
const Features& remote_features,
Ref<DriverTransport> transport,
Ref<NodeLinkMemory> memory) {
return AdoptRef(new NodeLink(
std::move(node), link_side, local_node_name, remote_node_name,
remote_node_type, remote_protocol_version, remote_features,
std::move(transport), std::move(memory), kActive));
}
// static
Ref<NodeLink> NodeLink::CreateInactive(Ref<Node> node,
LinkSide link_side,
const NodeName& local_node_name,
const NodeName& remote_node_name,
Node::Type remote_node_type,
uint32_t remote_protocol_version,
const Features& remote_features,
Ref<DriverTransport> transport,
Ref<NodeLinkMemory> memory) {
return AdoptRef(new NodeLink(
std::move(node), link_side, local_node_name, remote_node_name,
remote_node_type, remote_protocol_version, remote_features,
std::move(transport), std::move(memory), kNeverActivated));
}
NodeLink::NodeLink(Ref<Node> node,
LinkSide link_side,
const NodeName& local_node_name,
const NodeName& remote_node_name,
Node::Type remote_node_type,
uint32_t remote_protocol_version,
const Features& remote_features,
Ref<DriverTransport> transport,
Ref<NodeLinkMemory> memory,
ActivationState initial_activation_state)
: node_(std::move(node)),
link_side_(link_side),
local_node_name_(local_node_name),
remote_node_name_(remote_node_name),
remote_node_type_(remote_node_type),
remote_protocol_version_(remote_protocol_version),
remote_features_(remote_features),
available_features_(node_->features().Intersect(remote_features_)),
transport_(std::move(transport)),
memory_(std::move(memory)),
activation_state_(initial_activation_state) {
if (initial_activation_state == kActive) {
transport_->set_listener(WrapRefCounted(this));
memory_->SetNodeLink(WrapRefCounted(this));
}
}
NodeLink::~NodeLink() {
absl::MutexLock lock(&mutex_);
ABSL_HARDENING_ASSERT(activation_state_ != kActive);
}
void NodeLink::Activate() {
transport_->set_listener(WrapRefCounted(this));
memory_->SetNodeLink(WrapRefCounted(this));
{
absl::MutexLock lock(&mutex_);
ABSL_ASSERT(activation_state_ == kNeverActivated);
activation_state_ = kActive;
}
transport_->Activate();
}
Ref<RemoteRouterLink> NodeLink::AddRemoteRouterLink(
SublinkId sublink,
FragmentRef<RouterLinkState> link_state,
LinkType type,
LinkSide side,
Ref<Router> router) {
auto link = RemoteRouterLink::Create(WrapRefCounted(this), sublink,
std::move(link_state), type, side);
absl::MutexLock lock(&mutex_);
if (activation_state_ == kDeactivated) {
// We don't bind new RemoteRouterLinks once we've been deactivated, lest we
// incur leaky NodeLink references.
return nullptr;
}
auto [it, added] = sublinks_.try_emplace(
sublink, Sublink(std::move(link), std::move(router)));
if (!added) {
// The SublinkId provided here may have been received from another node and
// may already be in use if the node is misbehaving.
return nullptr;
}
return it->second.router_link;
}
void NodeLink::RemoveRemoteRouterLink(SublinkId sublink) {
absl::MutexLock lock(&mutex_);
sublinks_.erase(sublink);
}
std::optional<NodeLink::Sublink> NodeLink::GetSublink(SublinkId sublink) {
absl::MutexLock lock(&mutex_);
auto it = sublinks_.find(sublink);
if (it == sublinks_.end()) {
return std::nullopt;
}
return it->second;
}
Ref<Router> NodeLink::GetRouter(SublinkId sublink) {
absl::MutexLock lock(&mutex_);
auto it = sublinks_.find(sublink);
if (it == sublinks_.end()) {
return nullptr;
}
return it->second.receiver;
}
void NodeLink::AddBlockBuffer(BufferId id,
uint32_t block_size,
DriverMemory memory) {
msg::AddBlockBuffer add;
add.v0()->id = id;
add.v0()->block_size = block_size;
add.v0()->buffer = add.AppendDriverObject(memory.TakeDriverObject());
Transmit(add);
}
void NodeLink::RequestIntroduction(const NodeName& name) {
ABSL_ASSERT(remote_node_type_ == Node::Type::kBroker);
msg::RequestIntroduction request;
request.v0()->name = name;
Transmit(request);
}
void NodeLink::AcceptIntroduction(const NodeName& name,
LinkSide side,
Node::Type remote_node_type,
uint32_t remote_protocol_version,
const Features& remote_features,
Ref<DriverTransport> transport,
DriverMemory memory) {
ABSL_ASSERT(node_->type() == Node::Type::kBroker);
msg::AcceptIntroduction accept;
accept.v0()->name = name;
accept.v0()->link_side = side;
accept.v0()->remote_node_type = remote_node_type;
accept.v0()->padding = 0;
accept.v0()->remote_protocol_version = remote_protocol_version;
accept.v0()->transport =
accept.AppendDriverObject(transport->TakeDriverObject());
accept.v0()->memory = accept.AppendDriverObject(memory.TakeDriverObject());
accept.v1()->remote_features = remote_features.Serialize(accept);
Transmit(accept);
}
void NodeLink::RejectIntroduction(const NodeName& name) {
ABSL_ASSERT(node_->type() == Node::Type::kBroker);
msg::RejectIntroduction reject;
reject.v0()->name = name;
Transmit(reject);
}
void NodeLink::ReferNonBroker(Ref<DriverTransport> transport,
uint32_t num_initial_portals,
ReferralCallback callback) {
ABSL_ASSERT(node_->type() == Node::Type::kNormal &&
remote_node_type_ == Node::Type::kBroker);
uint64_t referral_id;
{
absl::MutexLock lock(&mutex_);
for (;;) {
referral_id = next_referral_id_++;
auto [it, inserted] =
pending_referrals_.try_emplace(referral_id, std::move(callback));
if (inserted) {
break;
}
}
}
msg::ReferNonBroker refer;
refer.v0()->referral_id = referral_id;
refer.v0()->num_initial_portals = num_initial_portals;
refer.v0()->transport =
refer.AppendDriverObject(transport->TakeDriverObject());
Transmit(refer);
}
void NodeLink::AcceptBypassLink(
const NodeName& current_peer_node,
SublinkId current_peer_sublink,
SequenceNumber inbound_sequence_length_from_bypassed_link,
SublinkId new_sublink,
FragmentRef<RouterLinkState> link_state) {
msg::AcceptBypassLink accept;
accept.v0()->current_peer_node = current_peer_node;
accept.v0()->current_peer_sublink = current_peer_sublink;
accept.v0()->inbound_sequence_length_from_bypassed_link =
inbound_sequence_length_from_bypassed_link;
accept.v0()->new_sublink = new_sublink;
accept.v0()->new_link_state_fragment = link_state.release().descriptor();
Transmit(accept);
}
void NodeLink::RequestMemory(size_t size, RequestMemoryCallback callback) {
const uint32_t size32 = checked_cast<uint32_t>(size);
{
absl::MutexLock lock(&mutex_);
pending_memory_requests_[size32].push_back(std::move(callback));
}
msg::RequestMemory request;
request.v0()->size = size32;
request.v0()->padding = 0;
Transmit(request);
}
void NodeLink::RelayMessage(const NodeName& to_node, Message& message) {
ABSL_ASSERT(remote_node_type_ == Node::Type::kBroker);
msg::RelayMessage relay;
relay.v0()->destination = to_node;
relay.v0()->data = relay.AllocateArray<uint8_t>(message.data_view().size());
relay.v0()->padding = 0;
memcpy(relay.GetArrayData(relay.v0()->data), message.data_view().data(),
message.data_view().size());
relay.v0()->driver_objects =
relay.AppendDriverObjects(message.driver_objects());
Transmit(relay);
}
bool NodeLink::DispatchRelayedMessage(msg::AcceptRelayedMessage& accept) {
// We only allow a limited subset of messages to be relayed through a broker.
// Namely, any message which might carry driver objects between two
// non-brokers needs to be relayable.
//
// If an unknown or unsupported message type is relayed it's silently
// discarded, rather than being rejected as a validation failure. This leaves
// open the possibility for newer versions of a message to introduce driver
// objects and support relaying.
absl::Span<uint8_t> data = accept.GetArrayView<uint8_t>(accept.v0()->data);
absl::Span<DriverObject> objects = accept.driver_objects();
if (data.size() < sizeof(internal::MessageHeaderV0)) {
return false;
}
const uint8_t message_id =
reinterpret_cast<internal::MessageHeaderV0*>(data.data())->message_id;
switch (message_id) {
case msg::AcceptParcelDriverObjects::kId: {
msg::AcceptParcelDriverObjects accept_parcel;
return accept_parcel.DeserializeRelayed(data, objects) &&
OnAcceptParcelDriverObjects(accept_parcel);
}
case msg::AddBlockBuffer::kId: {
msg::AddBlockBuffer add;
return add.DeserializeRelayed(data, objects) && OnAddBlockBuffer(add);
}
default:
DVLOG(4) << "Ignoring relayed message with ID "
<< static_cast<int>(message_id);
return true;
}
}
void NodeLink::Deactivate() {
{
absl::MutexLock lock(&mutex_);
if (activation_state_ != kActive) {
return;
}
activation_state_ = kDeactivated;
}
HandleTransportError();
transport_->Deactivate();
memory_->SetNodeLink(nullptr);
}
void NodeLink::Transmit(Message& message) {
if (!message.CanTransmitOn(*transport_)) {
// The driver has indicated that it can't transmit this message through our
// transport, so the message must instead be relayed through a broker.
auto broker = node_->GetBrokerLink();
if (!broker) {
DLOG(ERROR) << "Cannot relay message without a broker link";
return;
}
broker->RelayMessage(remote_node_name_, message);
return;
}
message.header().sequence_number = GenerateOutgoingSequenceNumber();
transport_->Transmit(message);
}
SequenceNumber NodeLink::GenerateOutgoingSequenceNumber() {
return SequenceNumber(next_outgoing_sequence_number_generator_.fetch_add(
1, std::memory_order_relaxed));
}
bool NodeLink::OnReferNonBroker(msg::ReferNonBroker& refer) {
if (remote_node_type_ != Node::Type::kNormal ||
node()->type() != Node::Type::kBroker) {
return false;
}
DriverObject transport = refer.TakeDriverObject(refer.v0()->transport);
if (!transport.is_valid()) {
return false;
}
DriverMemoryWithMapping link_memory =
NodeLinkMemory::AllocateMemory(node()->driver());
DriverMemoryWithMapping client_link_memory =
NodeLinkMemory::AllocateMemory(node()->driver());
if (!link_memory.mapping.is_valid() ||
!client_link_memory.mapping.is_valid()) {
// Not a validation failure, but we can't accept the referral because we
// can't allocate link memory for one side or the other.
msg::NonBrokerReferralRejected rejected;
rejected.v0()->referral_id = refer.v0()->referral_id;
Transmit(rejected);
return true;
}
return NodeConnector::HandleNonBrokerReferral(
node(), refer.v0()->referral_id, refer.v0()->num_initial_portals,
WrapRefCounted(this),
MakeRefCounted<DriverTransport>(std::move(transport)),
std::move(link_memory), std::move(client_link_memory));
}
bool NodeLink::OnNonBrokerReferralAccepted(
msg::NonBrokerReferralAccepted& accepted) {
if (remote_node_type_ != Node::Type::kBroker) {
return false;
}
ReferralCallback callback;
{
absl::MutexLock lock(&mutex_);
auto it = pending_referrals_.find(accepted.v0()->referral_id);
if (it == pending_referrals_.end()) {
return false;
}
callback = std::move(it->second);
pending_referrals_.erase(it);
}
const uint32_t protocol_version =
std::min(msg::kProtocolVersion, accepted.v0()->protocol_version);
auto transport = MakeRefCounted<DriverTransport>(
accepted.TakeDriverObject(accepted.v0()->transport));
DriverMemoryMapping mapping =
DriverMemory(accepted.TakeDriverObject(accepted.v0()->buffer)).Map();
if (!transport->driver_object().is_valid() || !mapping.is_valid()) {
// Not quite a validation failure if the broker simply failed to allocate
// resources for this link. Treat it like a connection failure.
callback(/*link=*/nullptr, /*num_initial_portals=*/0);
return true;
}
Features remote_features = {};
if (auto* v1 = accepted.v1()) {
remote_features = Features::Deserialize(accepted, v1->features);
}
Ref<NodeLink> link_to_referree = NodeLink::CreateInactive(
node_, LinkSide::kA, local_node_name_, accepted.v0()->name,
Node::Type::kNormal, protocol_version, remote_features,
std::move(transport),
NodeLinkMemory::Create(node_, LinkSide::kA, remote_features,
std::move(mapping)));
callback(link_to_referree, accepted.v0()->num_initial_portals);
link_to_referree->Activate();
return true;
}
bool NodeLink::OnNonBrokerReferralRejected(
msg::NonBrokerReferralRejected& rejected) {
if (remote_node_type_ != Node::Type::kBroker) {
return false;
}
ReferralCallback callback;
{
absl::MutexLock lock(&mutex_);
auto it = pending_referrals_.find(rejected.v0()->referral_id);
if (it == pending_referrals_.end()) {
return false;
}
callback = std::move(it->second);
pending_referrals_.erase(it);
}
callback(/*link=*/nullptr, /*num_initial_portals=*/0);
return true;
}
bool NodeLink::OnRequestIntroduction(msg::RequestIntroduction& request) {
if (node()->type() != Node::Type::kBroker) {
return false;
}
node()->HandleIntroductionRequest(*this, request.v0()->name);
return true;
}
bool NodeLink::OnAcceptIntroduction(msg::AcceptIntroduction& accept) {
if (remote_node_type_ != Node::Type::kBroker) {
return false;
}
DriverMemoryMapping mapping =
DriverMemory(accept.TakeDriverObject(accept.v0()->memory)).Map();
if (!mapping.is_valid()) {
return false;
}
Features remote_features = {};
if (auto* v1 = accept.v1()) {
remote_features = Features::Deserialize(accept, v1->remote_features);
}
DriverObject transport_object =
accept.TakeDriverObject(accept.v0()->transport);
if (!transport_object.is_valid()) {
return false;
}
auto transport = MakeRefCounted<DriverTransport>(std::move(transport_object));
node()->AcceptIntroduction(
*this, accept.v0()->name, accept.v0()->link_side,
accept.v0()->remote_node_type, accept.v0()->remote_protocol_version,
remote_features, std::move(transport),
NodeLinkMemory::Create(node(), accept.v0()->link_side, remote_features,
std::move(mapping)));
return true;
}
bool NodeLink::OnRejectIntroduction(msg::RejectIntroduction& reject) {
if (remote_node_type_ != Node::Type::kBroker) {
return false;
}
node()->NotifyIntroductionFailed(*this, reject.v0()->name);
return true;
}
bool NodeLink::OnRequestIndirectIntroduction(
msg::RequestIndirectIntroduction& request) {
// By convention only a broker on side B of a broker-to-broker link will send
// this message, and so only side-A broker-to-broker links can receive it.
if (remote_node_type_ != Node::Type::kBroker ||
node()->type() != Node::Type::kBroker || !link_side_.is_side_a()) {
return false;
}
return node()->HandleIndirectIntroductionRequest(
*this, request.v0()->target_node, request.v0()->source_node);
}
bool NodeLink::OnAddBlockBuffer(msg::AddBlockBuffer& add) {
DriverMemoryMapping mapping =
DriverMemory(add.TakeDriverObject(add.v0()->buffer)).Map();
if (!mapping.is_valid()) {
return false;
}
return memory().AddBlockBuffer(add.v0()->id, add.v0()->block_size,
std::move(mapping));
}
bool NodeLink::OnAcceptParcel(msg::AcceptParcel& accept) {
absl::Span<uint8_t> parcel_data =
accept.GetArrayView<uint8_t>(accept.v0()->parcel_data);
absl::Span<const HandleType> handle_types =
accept.GetArrayView<HandleType>(accept.v0()->handle_types);
absl::Span<const RouterDescriptor> new_routers =
accept.GetArrayView<RouterDescriptor>(accept.v0()->new_routers);
const SublinkId for_sublink = accept.v0()->sublink;
auto driver_objects = accept.driver_objects();
// Note that on any validation failure below, we defer rejection at least
// until any deserialized objects are stored in a new Parcel object. This
// ensures that they're properly cleaned up before we return.
bool parcel_valid = true;
bool is_split_parcel = false;
std::vector<Ref<APIObject>> objects(handle_types.size());
for (size_t i = 0; i < handle_types.size(); ++i) {
switch (handle_types[i]) {
case HandleType::kPortal: {
if (new_routers.empty()) {
parcel_valid = false;
continue;
}
Ref<Router> new_router =
Router::Deserialize(new_routers[0], *this, for_sublink);
if (!new_router) {
parcel_valid = false;
continue;
}
objects[i] = std::move(new_router);
new_routers.remove_prefix(1);
break;
}
case HandleType::kBoxedDriverObject: {
if (driver_objects.empty()) {
return false;
}
objects[i] = MakeRefCounted<Box>(std::move(driver_objects[0]));
driver_objects.remove_prefix(1);
break;
}
case HandleType::kRelayedBoxedDriverObject: {
is_split_parcel = true;
break;
}
case HandleType::kBoxedSubparcel:
// Store a placeholder object for each expected subparcel. These will
// be filled in by AcceptCompleteParcel() once the last complete
// subparcel is accepted.
objects[i] =
MakeRefCounted<Box>(MakeRefCounted<ParcelWrapper>(nullptr));
break;
default:
parcel_valid = false;
break;
}
}
if (!new_routers.empty() || !driver_objects.empty()) {
// There should be no unclaimed routers. If there are, it's a validation
// failure.
parcel_valid = false;
}
const uint32_t num_subparcels = accept.v0()->num_subparcels;
const uint32_t subparcel_index = accept.v0()->subparcel_index;
if (num_subparcels > Parcel::kMaxSubparcelsPerParcel ||
subparcel_index >= num_subparcels) {
return false;
}
auto parcel = std::make_unique<Parcel>(accept.v0()->sequence_number);
parcel->set_num_subparcels(num_subparcels);
parcel->set_subparcel_index(subparcel_index);
parcel->SetObjects(std::move(objects));
parcel->SetEnvelope(accept.TakeEnvelope());
if (!parcel_valid) {
return false;
}
const FragmentDescriptor descriptor = accept.v0()->parcel_fragment;
if (!descriptor.is_null()) {
// The parcel's data resides in a shared memory fragment.
const Fragment fragment = memory().GetFragment(descriptor);
if (fragment.is_pending()) {
// We don't have this buffer yet, but we expect to receive it ASAP. Defer
// acceptance until then.
WaitForParcelFragmentToResolve(for_sublink, std::move(parcel), descriptor,
is_split_parcel);
return true;
}
if (fragment.is_null() ||
!parcel->AdoptDataFragment(WrapRefCounted(&memory()), fragment)) {
return false;
}
} else {
// The parcel's data was inlined within the AcceptParcel message. Adopt the
// Message contents so our local Parcel doesn't need to copy any data.
parcel->SetDataFromMessage(std::move(accept).TakeReceivedData(),
parcel_data);
}
if (is_split_parcel) {
return AcceptParcelWithoutDriverObjects(for_sublink, std::move(parcel));
}
return AcceptCompleteParcel(for_sublink, std::move(parcel));
}
bool NodeLink::OnAcceptParcelDriverObjects(
msg::AcceptParcelDriverObjects& accept) {
auto parcel = std::make_unique<Parcel>(accept.v0()->sequence_number);
std::vector<Ref<APIObject>> objects;
objects.reserve(accept.driver_objects().size());
for (auto& object : accept.driver_objects()) {
objects.push_back(MakeRefCounted<Box>(std::move(object)));
}
parcel->SetObjects(std::move(objects));
return AcceptParcelDriverObjects(accept.v0()->sublink, std::move(parcel));
}
bool NodeLink::OnRouteClosed(msg::RouteClosed& route_closed) {
std::optional<Sublink> sublink = GetSublink(route_closed.v0()->sublink);
if (!sublink) {
// The sublink may have already been removed, for example if the application
// has already closed the associated router. It is therefore not considered
// an error to receive a RouteClosed message for an unknown sublink.
return true;
}
return sublink->receiver->AcceptRouteClosureFrom(
sublink->router_link->GetType(), route_closed.v0()->sequence_length);
}
bool NodeLink::OnRouteDisconnected(msg::RouteDisconnected& route_closed) {
std::optional<Sublink> sublink = GetSublink(route_closed.v0()->sublink);
if (!sublink) {
return true;
}
DVLOG(4) << "Accepting RouteDisconnected at "
<< sublink->router_link->Describe();
return sublink->receiver->AcceptRouteDisconnectedFrom(
sublink->router_link->GetType());
}
bool NodeLink::OnBypassPeer(msg::BypassPeer& bypass) {
std::optional<Sublink> sublink = GetSublink(bypass.v0()->sublink);
if (!sublink) {
return true;
}
// NOTE: This request is authenticated by the receiving Router, within
// BypassPeer().
return sublink->receiver->BypassPeer(*sublink->router_link,
bypass.v0()->bypass_target_node,
bypass.v0()->bypass_target_sublink);
}
bool NodeLink::OnAcceptBypassLink(msg::AcceptBypassLink& accept) {
Ref<NodeLink> node_link_to_peer =
node_->GetLink(accept.v0()->current_peer_node);
if (!node_link_to_peer) {
// If the link to the peer has been severed for whatever reason, the
// relevant route will be torn down anyway. It's safe to ignore this
// request in that case.
return true;
}
const Ref<Router> receiver =
node_link_to_peer->GetRouter(accept.v0()->current_peer_sublink);
if (!receiver) {
// Similar to above, if the targeted Router cannot be resolved from the
// given sublink, this implies that the route has already been at least
// partially torn down. It's safe to ignore this request.
return true;
}
auto link_state = memory().AdoptFragmentRefIfValid<RouterLinkState>(
accept.v0()->new_link_state_fragment);
if (link_state.is_null()) {
// Bypass links must always come with a valid fragment for their
// RouterLinkState. If one has not been provided, that's a validation
// failure.
return false;
}
return receiver->AcceptBypassLink(
*this, accept.v0()->new_sublink, std::move(link_state),
accept.v0()->inbound_sequence_length_from_bypassed_link);
}
bool NodeLink::OnStopProxying(msg::StopProxying& stop) {
Ref<Router> router = GetRouter(stop.v0()->sublink);
if (!router) {
return true;
}
return router->StopProxying(stop.v0()->inbound_sequence_length,
stop.v0()->outbound_sequence_length);
}
bool NodeLink::OnProxyWillStop(msg::ProxyWillStop& will_stop) {
Ref<Router> router = GetRouter(will_stop.v0()->sublink);
if (!router) {
return true;
}
return router->NotifyProxyWillStop(will_stop.v0()->inbound_sequence_length);
}
bool NodeLink::OnBypassPeerWithLink(msg::BypassPeerWithLink& bypass) {
Ref<Router> router = GetRouter(bypass.v0()->sublink);
if (!router) {
return true;
}
auto link_state = memory().AdoptFragmentRefIfValid<RouterLinkState>(
bypass.v0()->new_link_state_fragment);
if (link_state.is_null()) {
return false;
}
return router->AcceptBypassLink(*this, bypass.v0()->new_sublink,
std::move(link_state),
bypass.v0()->inbound_sequence_length);
}
bool NodeLink::OnStopProxyingToLocalPeer(msg::StopProxyingToLocalPeer& stop) {
Ref<Router> router = GetRouter(stop.v0()->sublink);
if (!router) {
return true;
}
return router->StopProxyingToLocalPeer(stop.v0()->outbound_sequence_length);
}
bool NodeLink::OnFlushRouter(msg::FlushRouter& flush) {
if (Ref<Router> router = GetRouter(flush.v0()->sublink)) {
router->Flush(Router::kForceProxyBypassAttempt);
}
return true;
}
bool NodeLink::OnRequestMemory(msg::RequestMemory& request) {
if (request.v0()->size == 0) {
return false;
}
DriverMemory memory(node_->driver(), request.v0()->size);
msg::ProvideMemory provide;
provide.v0()->size = request.v0()->size;
provide.v0()->buffer = provide.AppendDriverObject(memory.TakeDriverObject());
Transmit(provide);
return true;
}
bool NodeLink::OnProvideMemory(msg::ProvideMemory& provide) {
DriverMemory memory(provide.TakeDriverObject(provide.v0()->buffer));
RequestMemoryCallback callback;
{
absl::MutexLock lock(&mutex_);
auto it = pending_memory_requests_.find(provide.v0()->size);
if (it == pending_memory_requests_.end()) {
return false;
}
std::list<RequestMemoryCallback>& callbacks = it->second;
ABSL_ASSERT(!callbacks.empty());
callback = std::move(callbacks.front());
callbacks.pop_front();
if (callbacks.empty()) {
pending_memory_requests_.erase(it);
}
}
callback(std::move(memory));
return true;
}
bool NodeLink::OnRelayMessage(msg::RelayMessage& relay) {
if (node_->type() != Node::Type::kBroker) {
return false;
}
return node_->RelayMessage(remote_node_name_, relay);
}
bool NodeLink::OnAcceptRelayedMessage(msg::AcceptRelayedMessage& accept) {
if (remote_node_type_ != Node::Type::kBroker) {
return false;
}
return node_->AcceptRelayedMessage(accept);
}
void NodeLink::OnTransportError() {
HandleTransportError();
}
void NodeLink::HandleTransportError() {
SublinkMap sublinks;
SubparcelTrackerMap subparcel_trackers;
ReferralCallbackMap pending_referrals;
{
absl::MutexLock lock(&mutex_);
sublinks.swap(sublinks_);
subparcel_trackers.swap(subparcel_trackers_);
pending_referrals.swap(pending_referrals_);
}
for (auto& [id, sublink] : sublinks) {
DVLOG(4) << "NodeLink disconnection dropping "
<< sublink.router_link->Describe() << " which is bound to router "
<< sublink.receiver.get();
sublink.receiver->NotifyLinkDisconnected(*sublink.router_link);
}
for (auto& [id, callback] : pending_referrals) {
callback(/*link=*/nullptr, /*num_portals=*/0);
}
Ref<NodeLink> self = WrapRefCounted(this);
node_->DropConnection(*this);
memory_->NotifyLinkDisconnected();
}
void NodeLink::WaitForParcelFragmentToResolve(
SublinkId for_sublink,
std::unique_ptr<Parcel> parcel,
const FragmentDescriptor& descriptor,
bool is_split_parcel) {
auto wrapper = MakeRefCounted<ParcelWrapper>(std::move(parcel));
memory().WaitForBufferAsync(
descriptor.buffer_id(), [this_link = WrapRefCounted(this), for_sublink,
is_split_parcel, wrapper, descriptor]() {
Ref<NodeLinkMemory> memory = WrapRefCounted(&this_link->memory());
const Fragment fragment = memory->GetFragment(descriptor);
std::unique_ptr<Parcel> parcel = wrapper->TakeParcel();
if (!fragment.is_addressable() ||
!parcel->AdoptDataFragment(std::move(memory), fragment)) {
// The fragment is out of bounds or had an invalid header. Either way
// it doesn't look good for the remote node.
this_link->OnTransportError();
return;
}
if (is_split_parcel) {
this_link->AcceptParcelWithoutDriverObjects(for_sublink,
std::move(parcel));
} else {
this_link->AcceptCompleteParcel(for_sublink, std::move(parcel));
}
});
}
bool NodeLink::AcceptParcelWithoutDriverObjects(
SublinkId for_sublink,
std::unique_ptr<Parcel> parcel) {
const auto key = std::make_tuple(for_sublink, parcel->sequence_number());
std::unique_ptr<Parcel> parcel_with_driver_objects;
{
absl::MutexLock lock(&mutex_);
// Note that `parcel` is not actually moved here unless try_emplace
// succeeds.
auto [it, inserted] = partial_parcels_.try_emplace(key, std::move(parcel));
if (inserted) {
return true;
}
parcel_with_driver_objects = std::move(it->second);
partial_parcels_.erase(it);
}
return AcceptSplitParcel(for_sublink, std::move(parcel),
std::move(parcel_with_driver_objects));
}
bool NodeLink::AcceptParcelDriverObjects(SublinkId for_sublink,
std::unique_ptr<Parcel> parcel) {
const auto key = std::make_tuple(for_sublink, parcel->sequence_number());
std::unique_ptr<Parcel> parcel_without_driver_objects;
{
absl::MutexLock lock(&mutex_);
auto [it, inserted] = partial_parcels_.try_emplace(key, std::move(parcel));
if (inserted) {
return true;
}
parcel_without_driver_objects = std::move(it->second);
partial_parcels_.erase(it);
}
return AcceptSplitParcel(
for_sublink, std::move(parcel_without_driver_objects), std::move(parcel));
}
bool NodeLink::AcceptSplitParcel(
SublinkId for_sublink,
std::unique_ptr<Parcel> parcel_without_driver_objects,
std::unique_ptr<Parcel> parcel_with_driver_objects) {
// The parcel with no driver objects should still have an object attachemnt
// slot reserved for every relayed driver object.
if (parcel_without_driver_objects->num_objects() <
parcel_with_driver_objects->num_objects()) {
return false;
}
// Fill in all the object gaps in the data-only parcel with the boxed objects
// from the driver objects parcel.
auto& complete_parcel = parcel_without_driver_objects;
auto remaining_driver_objects = parcel_with_driver_objects->objects_view();
for (auto& object : complete_parcel->objects_view()) {
if (object) {
continue;
}
if (remaining_driver_objects.empty()) {
return false;
}
object = std::move(remaining_driver_objects[0]);
remaining_driver_objects.remove_prefix(1);
}
// At least one driver object was unclaimed by the data half of the parcel.
// That's not right.
if (!remaining_driver_objects.empty()) {
return false;
}
return AcceptCompleteParcel(for_sublink, std::move(complete_parcel));
}
bool NodeLink::AcceptCompleteParcel(SublinkId for_sublink,
std::unique_ptr<Parcel> parcel) {
const std::optional<Sublink> sublink = GetSublink(for_sublink);
if (!sublink) {
DVLOG(4) << "Dropping " << parcel->Describe() << " at "
<< local_node_name_.ToString() << ", arriving from "
<< remote_node_name_.ToString() << " via unknown sublink "
<< for_sublink;
return true;
}
// Note that the common case is a standalone complete parcel, where the number
// of subparcels is 1. In that case no additional tracking is necessary and we
// immediately accept the parcel below.
const size_t num_subparcels = parcel->num_subparcels();
if (num_subparcels > 1) {
auto key = std::make_tuple(for_sublink, parcel->sequence_number());
absl::MutexLock lock(&mutex_);
auto [it, inserted] =
subparcel_trackers_.try_emplace(key, SubparcelTracker{});
SubparcelTracker& tracker = it->second;
if (inserted) {
tracker.subparcels.resize(num_subparcels);
} else if (tracker.subparcels.size() != num_subparcels) {
// Inconsistent subparcel count expectations across subparcels. This is
// a validation failure.
return false;
}
// Note that `index` has already been validated against the expected number
// of subparcels in OnAcceptParcel().
const size_t index = parcel->subparcel_index();
ABSL_ASSERT(index < tracker.subparcels.size());
if (tracker.subparcels[index]) {
// Multiple subparcels claim the same index for this SequenceNumber. Bad.
return false;
}
tracker.subparcels[index] =
MakeRefCounted<ParcelWrapper>(std::move(parcel));
tracker.num_subparcels_received++;
if (tracker.num_subparcels_received < num_subparcels) {
// Still waiting for more subparcels.
return true;
}
// We have all subparcels for this SequenceNumber. Join them and proceed.
// We do this by iterating over the object attachments in subparcel 0,
// replacing any placeholder ParcelWrapper objects with our own real ones.
auto subparcels = std::move(tracker.subparcels);
subparcel_trackers_.erase(it);
parcel = subparcels[0]->TakeParcel();
absl::Span<Ref<ParcelWrapper>> remaining_subparcels =
absl::MakeSpan(subparcels).subspan(1);
for (auto& object : parcel->objects_view()) {
Box* box = Box::FromObject(object.get());
if (box && box->type() == Box::Type::kSubparcel) {
if (remaining_subparcels.empty()) {
// Too many placeholder objects in the main parcel. Bad.
return false;
}
ParcelWrapper* wrapper = box->subparcel().get();
wrapper->SetParcel(remaining_subparcels.front()->TakeParcel());
remaining_subparcels.remove_prefix(1);
}
}
if (!remaining_subparcels.empty()) {
// One or more subparcels unclaimed by the main parcel. Bad.
return false;
}
}
// At this point we've collected all expected subparcels and can pass the full
// parcel along to its receiver.
parcel->set_remote_source(WrapRefCounted(this));
const LinkType link_type = sublink->router_link->GetType();
if (link_type.is_outward()) {
DVLOG(4) << "Accepting inbound " << parcel->Describe() << " at "
<< sublink->router_link->Describe();
return sublink->receiver->AcceptInboundParcel(std::move(parcel));
}
ABSL_ASSERT(link_type.is_peripheral_inward());
DVLOG(4) << "Accepting outbound " << parcel->Describe() << " at "
<< sublink->router_link->Describe();
return sublink->receiver->AcceptOutboundParcel(std::move(parcel));
}
NodeLink::Sublink::Sublink(Ref<RemoteRouterLink> router_link,
Ref<Router> receiver)
: router_link(std::move(router_link)), receiver(std::move(receiver)) {}
NodeLink::Sublink::Sublink(Sublink&&) = default;
NodeLink::Sublink::Sublink(const Sublink&) = default;
NodeLink::Sublink& NodeLink::Sublink::operator=(Sublink&&) = default;
NodeLink::Sublink& NodeLink::Sublink::operator=(const Sublink&) = default;
NodeLink::Sublink::~Sublink() = default;
} // namespace ipcz