blob: 0d6de61889d6bc81e35616d52ba2a6262a61745b [file] [log] [blame]
// Copyright 2022 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "ipcz/remote_router_link.h"
#include <sstream>
#include <utility>
#include "ipcz/box.h"
#include "ipcz/node_link.h"
#include "ipcz/node_link_memory.h"
#include "ipcz/node_messages.h"
#include "ipcz/portal.h"
#include "ipcz/router.h"
#include "util/log.h"
namespace ipcz {
RemoteRouterLink::RemoteRouterLink(Ref<NodeLink> node_link,
SublinkId sublink,
FragmentRef<RouterLinkState> link_state,
LinkType type,
LinkSide side)
: node_link_(std::move(node_link)),
sublink_(sublink),
type_(type),
side_(side) {
// Central links must be constructed with a valid RouterLinkState fragment.
// Other links must not.
ABSL_ASSERT(type.is_central() == !link_state.is_null());
if (type.is_central()) {
SetLinkState(std::move(link_state));
}
}
RemoteRouterLink::~RemoteRouterLink() = default;
// static
Ref<RemoteRouterLink> RemoteRouterLink::Create(
Ref<NodeLink> node_link,
SublinkId sublink,
FragmentRef<RouterLinkState> link_state,
LinkType type,
LinkSide side) {
return AdoptRef(new RemoteRouterLink(std::move(node_link), sublink,
std::move(link_state), type, side));
}
void RemoteRouterLink::SetLinkState(FragmentRef<RouterLinkState> state) {
ABSL_ASSERT(type_.is_central());
ABSL_ASSERT(!state.is_null());
if (state.is_pending()) {
// Wait for the fragment's buffer to be mapped locally.
Ref<NodeLinkMemory> memory = WrapRefCounted(&node_link()->memory());
FragmentDescriptor descriptor = state.fragment().descriptor();
memory->WaitForBufferAsync(
descriptor.buffer_id(),
[self = WrapRefCounted(this), memory, descriptor] {
self->SetLinkState(memory->AdoptFragmentRef<RouterLinkState>(
memory->GetFragment(descriptor)));
});
return;
}
ABSL_ASSERT(state.is_addressable());
// SetLinkState() must be called with an addressable fragment only once.
ABSL_ASSERT(link_state_.load(std::memory_order_acquire) == nullptr);
// The release when storing `link_state_` is balanced by an acquire in
// GetLinkState().
link_state_fragment_ = std::move(state);
link_state_.store(link_state_fragment_.get(), std::memory_order_release);
// If this side of the link was already marked stable before the
// RouterLinkState was available, `side_is_stable_` will be true. In that
// case, set the stable bit in RouterLinkState immediately. This may unblock
// some routing work. The acquire here is balanced by a release in
// MarkSideStable().
if (side_is_stable_.load(std::memory_order_acquire)) {
MarkSideStable();
}
if (Ref<Router> router = node_link()->GetRouter(sublink_)) {
router->Flush(Router::kForceProxyBypassAttempt);
}
}
LinkType RemoteRouterLink::GetType() const {
return type_;
}
RouterLinkState* RemoteRouterLink::GetLinkState() const {
return link_state_.load(std::memory_order_acquire);
}
Ref<Router> RemoteRouterLink::GetLocalPeer() {
return nullptr;
}
RemoteRouterLink* RemoteRouterLink::AsRemoteRouterLink() {
return this;
}
void RemoteRouterLink::AcceptParcel(Parcel& parcel) {
const absl::Span<Ref<APIObject>> objects = parcel.objects_view();
msg::AcceptParcel accept;
accept.params().sublink = sublink_;
accept.params().sequence_number = parcel.sequence_number();
size_t num_portals = 0;
absl::InlinedVector<DriverObject, 2> driver_objects;
bool must_relay_driver_objects = false;
for (Ref<APIObject>& object : objects) {
switch (object->object_type()) {
case APIObject::kPortal:
++num_portals;
break;
case APIObject::kBox: {
Box* box = Box::FromObject(object.get());
ABSL_ASSERT(box);
if (!box->object().CanTransmitOn(*node_link()->transport())) {
must_relay_driver_objects = true;
}
driver_objects.push_back(std::move(box->object()));
break;
}
default:
break;
}
}
// If driver objects will require relaying through the broker, then the parcel
// must be split into two separate messages: one for the driver objects (which
// will be relayed), and one for the rest of the message (which will transmit
// directly).
//
// This ensures that many side effects of message receipt are well-ordered
// with other transmissions on the same link from the same thread. Namely,
// since a thread may send a message which introduces a new remote Router on a
// new sublink, followed immediately by a message which targets that Router,
// it is critical that both messages arrive in the order they were sent. If
// one of the messages is relayed while the other is not, ordering could not
// be guaranteed.
const bool must_split_parcel = must_relay_driver_objects;
// Allocate all the arrays in the message. Note that each allocation may
// relocate the parcel data in memory, so views into these arrays should not
// be acquired until all allocations are complete.
accept.params().parcel_data =
accept.AllocateArray<uint8_t>(parcel.data_view().size());
accept.params().handle_types =
accept.AllocateArray<HandleType>(objects.size());
accept.params().new_routers =
accept.AllocateArray<RouterDescriptor>(num_portals);
const absl::Span<uint8_t> parcel_data =
accept.GetArrayView<uint8_t>(accept.params().parcel_data);
const absl::Span<HandleType> handle_types =
accept.GetArrayView<HandleType>(accept.params().handle_types);
const absl::Span<RouterDescriptor> new_routers =
accept.GetArrayView<RouterDescriptor>(accept.params().new_routers);
if (!parcel_data.empty()) {
memcpy(parcel_data.data(), parcel.data_view().data(), parcel.data_size());
}
// Serialize attached objects. We accumulate the Routers of all attached
// portals, because we need to reference them again after transmission, with
// a 1:1 correspondence to the serialized RouterDescriptors.
absl::InlinedVector<Ref<Router>, 4> routers_to_proxy;
for (size_t i = 0; i < objects.size(); ++i) {
APIObject& object = *objects[i];
switch (object.object_type()) {
case APIObject::kPortal: {
handle_types[i] = HandleType::kPortal;
Ref<Router> router = Portal::FromObject(&object)->router();
router->SerializeNewRouter(*node_link(), new_routers[i]);
routers_to_proxy.push_back(std::move(router));
break;
}
case APIObject::kBox:
handle_types[i] =
must_split_parcel ? HandleType::kRelayedBox : HandleType::kBox;
break;
default:
DLOG(FATAL) << "Attempted to transmit an invalid object.";
break;
}
}
if (must_split_parcel) {
msg::AcceptParcelDriverObjects accept_objects;
accept_objects.params().sublink = sublink_;
accept_objects.params().sequence_number = parcel.sequence_number();
accept_objects.params().driver_objects =
accept_objects.AppendDriverObjects(absl::MakeSpan(driver_objects));
DVLOG(4) << "Transmitting objects for " << parcel.Describe() << " over "
<< Describe();
node_link()->Transmit(accept_objects);
} else {
accept.params().driver_objects =
accept.AppendDriverObjects(absl::MakeSpan(driver_objects));
}
DVLOG(4) << "Transmitting " << parcel.Describe() << " over " << Describe();
node_link()->Transmit(accept);
// Now that the parcel has been transmitted, it's safe to start proxying from
// any routers whose routes have just been extended to the destination.
ABSL_ASSERT(routers_to_proxy.size() == new_routers.size());
for (size_t i = 0; i < routers_to_proxy.size(); ++i) {
routers_to_proxy[i]->BeginProxyingToNewRouter(*node_link(), new_routers[i]);
}
// Finally, a Parcel will normally close all attached objects when destroyed.
// Since we've successfully transmitted this parcel and all its objects, we
// prevent that behavior by taking away all its object references.
for (Ref<APIObject>& object : objects) {
Ref<APIObject> released_object = std::move(object);
}
}
void RemoteRouterLink::AcceptRouteClosure(SequenceNumber sequence_length) {
msg::RouteClosed route_closed;
route_closed.params().sublink = sublink_;
route_closed.params().sequence_length = sequence_length;
node_link()->Transmit(route_closed);
}
size_t RemoteRouterLink::GetParcelCapacityInBytes(const IpczPutLimits& limits) {
if (limits.max_queued_bytes == 0 || limits.max_queued_parcels == 0) {
return 0;
}
RouterLinkState* state = GetLinkState();
if (!state) {
// This is only a best-effort estimate. With no link state yet, err on the
// side of more data flow.
return limits.max_queued_bytes;
}
const RouterLinkState::QueueState peer_queue =
state->GetQueueState(side_.opposite());
if (peer_queue.num_parcels >= limits.max_queued_parcels ||
peer_queue.num_bytes >= limits.max_queued_bytes) {
return 0;
}
return limits.max_queued_bytes - peer_queue.num_bytes;
}
RouterLinkState::QueueState RemoteRouterLink::GetPeerQueueState() {
if (auto* state = GetLinkState()) {
return state->GetQueueState(side_.opposite());
}
return {.num_parcels = 0, .num_bytes = 0};
}
bool RemoteRouterLink::UpdateInboundQueueState(size_t num_parcels,
size_t num_bytes) {
RouterLinkState* state = GetLinkState();
return state && state->UpdateQueueState(side_, num_parcels, num_bytes);
}
void RemoteRouterLink::NotifyDataConsumed() {
msg::NotifyDataConsumed notify;
notify.params().sublink = sublink_;
node_link()->Transmit(notify);
}
bool RemoteRouterLink::EnablePeerMonitoring(bool enable) {
RouterLinkState* state = GetLinkState();
return state && state->SetSideIsMonitoringPeer(side_, enable);
}
void RemoteRouterLink::AcceptRouteDisconnected() {
msg::RouteDisconnected route_disconnected;
route_disconnected.params().sublink = sublink_;
node_link()->Transmit(route_disconnected);
}
void RemoteRouterLink::MarkSideStable() {
side_is_stable_.store(true, std::memory_order_release);
if (RouterLinkState* state = GetLinkState()) {
state->SetSideStable(side_);
}
}
bool RemoteRouterLink::TryLockForBypass(const NodeName& bypass_request_source) {
RouterLinkState* state = GetLinkState();
if (!state || !state->TryLock(side_)) {
return false;
}
state->allowed_bypass_request_source = bypass_request_source;
// Balanced by an acquire in CanNodeRequestBypass().
std::atomic_thread_fence(std::memory_order_release);
return true;
}
bool RemoteRouterLink::TryLockForClosure() {
RouterLinkState* state = GetLinkState();
return state && state->TryLock(side_);
}
void RemoteRouterLink::Unlock() {
if (RouterLinkState* state = GetLinkState()) {
state->Unlock(side_);
}
}
bool RemoteRouterLink::FlushOtherSideIfWaiting() {
RouterLinkState* state = GetLinkState();
if (!state || !state->ResetWaitingBit(side_.opposite())) {
return false;
}
msg::FlushRouter flush;
flush.params().sublink = sublink_;
node_link()->Transmit(flush);
return true;
}
bool RemoteRouterLink::CanNodeRequestBypass(
const NodeName& bypass_request_source) {
RouterLinkState* state = GetLinkState();
// Balanced by a release in TryLockForBypass().
std::atomic_thread_fence(std::memory_order_acquire);
return state && state->is_locked_by(side_.opposite()) &&
state->allowed_bypass_request_source == bypass_request_source;
}
void RemoteRouterLink::Deactivate() {
node_link()->RemoveRemoteRouterLink(sublink_);
}
void RemoteRouterLink::BypassPeer(const NodeName& bypass_target_node,
SublinkId bypass_target_sublink) {
msg::BypassPeer bypass;
bypass.params().sublink = sublink_;
bypass.params().reserved0 = 0;
bypass.params().bypass_target_node = bypass_target_node;
bypass.params().bypass_target_sublink = bypass_target_sublink;
node_link()->Transmit(bypass);
}
void RemoteRouterLink::StopProxying(SequenceNumber inbound_sequence_length,
SequenceNumber outbound_sequence_length) {
msg::StopProxying stop;
stop.params().sublink = sublink_;
stop.params().inbound_sequence_length = inbound_sequence_length;
stop.params().outbound_sequence_length = outbound_sequence_length;
node_link()->Transmit(stop);
}
void RemoteRouterLink::ProxyWillStop(SequenceNumber inbound_sequence_length) {
msg::ProxyWillStop will_stop;
will_stop.params().sublink = sublink_;
will_stop.params().inbound_sequence_length = inbound_sequence_length;
node_link()->Transmit(will_stop);
}
void RemoteRouterLink::BypassPeerWithLink(
SublinkId new_sublink,
FragmentRef<RouterLinkState> new_link_state,
SequenceNumber inbound_sequence_length) {
msg::BypassPeerWithLink bypass;
bypass.params().sublink = sublink_;
bypass.params().new_sublink = new_sublink;
bypass.params().new_link_state_fragment =
new_link_state.release().descriptor();
bypass.params().inbound_sequence_length = inbound_sequence_length;
node_link()->Transmit(bypass);
}
void RemoteRouterLink::StopProxyingToLocalPeer(
SequenceNumber outbound_sequence_length) {
msg::StopProxyingToLocalPeer stop;
stop.params().sublink = sublink_;
stop.params().outbound_sequence_length = outbound_sequence_length;
node_link()->Transmit(stop);
}
std::string RemoteRouterLink::Describe() const {
std::stringstream ss;
ss << type_.ToString() << " link from "
<< node_link_->local_node_name().ToString() << " to "
<< node_link_->remote_node_name().ToString() << " via sublink "
<< sublink_;
return ss.str();
}
} // namespace ipcz