| // Copyright 2022 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "ipcz/node_link.h" |
| |
| #include <algorithm> |
| #include <atomic> |
| #include <cstddef> |
| #include <cstdint> |
| #include <limits> |
| #include <utility> |
| |
| #include "ipcz/box.h" |
| #include "ipcz/fragment_ref.h" |
| #include "ipcz/ipcz.h" |
| #include "ipcz/link_side.h" |
| #include "ipcz/link_type.h" |
| #include "ipcz/message.h" |
| #include "ipcz/node.h" |
| #include "ipcz/node_link_memory.h" |
| #include "ipcz/node_messages.h" |
| #include "ipcz/portal.h" |
| #include "ipcz/remote_router_link.h" |
| #include "ipcz/router.h" |
| #include "ipcz/router_link.h" |
| #include "ipcz/router_link_state.h" |
| #include "ipcz/sublink_id.h" |
| #include "third_party/abseil-cpp/absl/base/macros.h" |
| #include "util/log.h" |
| #include "util/ref_counted.h" |
| #include "util/safe_math.h" |
| |
| namespace ipcz { |
| |
| namespace { |
| |
| template <typename T> |
| FragmentRef<T> MaybeAdoptFragmentRef(NodeLinkMemory& memory, |
| const FragmentDescriptor& descriptor) { |
| if (descriptor.is_null() || descriptor.size() < sizeof(T)) { |
| return {}; |
| } |
| |
| return memory.AdoptFragmentRef<T>(memory.GetFragment(descriptor)); |
| } |
| |
| } // namespace |
| |
| // static |
| Ref<NodeLink> NodeLink::CreateActive(Ref<Node> node, |
| LinkSide link_side, |
| const NodeName& local_node_name, |
| const NodeName& remote_node_name, |
| Node::Type remote_node_type, |
| uint32_t remote_protocol_version, |
| Ref<DriverTransport> transport, |
| Ref<NodeLinkMemory> memory) { |
| return AdoptRef(new NodeLink(std::move(node), link_side, local_node_name, |
| remote_node_name, remote_node_type, |
| remote_protocol_version, std::move(transport), |
| std::move(memory), kActive)); |
| } |
| |
| // static |
| Ref<NodeLink> NodeLink::CreateInactive(Ref<Node> node, |
| LinkSide link_side, |
| const NodeName& local_node_name, |
| const NodeName& remote_node_name, |
| Node::Type remote_node_type, |
| uint32_t remote_protocol_version, |
| Ref<DriverTransport> transport, |
| Ref<NodeLinkMemory> memory) { |
| return AdoptRef(new NodeLink(std::move(node), link_side, local_node_name, |
| remote_node_name, remote_node_type, |
| remote_protocol_version, std::move(transport), |
| std::move(memory), kNeverActivated)); |
| } |
| |
| NodeLink::NodeLink(Ref<Node> node, |
| LinkSide link_side, |
| const NodeName& local_node_name, |
| const NodeName& remote_node_name, |
| Node::Type remote_node_type, |
| uint32_t remote_protocol_version, |
| Ref<DriverTransport> transport, |
| Ref<NodeLinkMemory> memory, |
| ActivationState initial_activation_state) |
| : node_(std::move(node)), |
| link_side_(link_side), |
| local_node_name_(local_node_name), |
| remote_node_name_(remote_node_name), |
| remote_node_type_(remote_node_type), |
| remote_protocol_version_(remote_protocol_version), |
| transport_(std::move(transport)), |
| memory_(std::move(memory)), |
| activation_state_(initial_activation_state) { |
| if (initial_activation_state == kActive) { |
| transport_->set_listener(WrapRefCounted(this)); |
| memory_->SetNodeLink(WrapRefCounted(this)); |
| } |
| } |
| |
| NodeLink::~NodeLink() { |
| absl::MutexLock lock(&mutex_); |
| ABSL_HARDENING_ASSERT(activation_state_ != kActive); |
| } |
| |
| void NodeLink::Activate() { |
| transport_->set_listener(WrapRefCounted(this)); |
| memory_->SetNodeLink(WrapRefCounted(this)); |
| |
| { |
| absl::MutexLock lock(&mutex_); |
| ABSL_ASSERT(activation_state_ == kNeverActivated); |
| activation_state_ = kActive; |
| } |
| |
| transport_->Activate(); |
| } |
| |
| Ref<RemoteRouterLink> NodeLink::AddRemoteRouterLink( |
| SublinkId sublink, |
| FragmentRef<RouterLinkState> link_state, |
| LinkType type, |
| LinkSide side, |
| Ref<Router> router) { |
| auto link = RemoteRouterLink::Create(WrapRefCounted(this), sublink, |
| std::move(link_state), type, side); |
| |
| absl::MutexLock lock(&mutex_); |
| if (activation_state_ == kDeactivated) { |
| // We don't bind new RemoteRouterLinks once we've been deactivated, lest we |
| // incur leaky NodeLink references. |
| return nullptr; |
| } |
| |
| auto [it, added] = sublinks_.try_emplace( |
| sublink, Sublink(std::move(link), std::move(router))); |
| if (!added) { |
| // The SublinkId provided here may have been received from another node and |
| // may already be in use if the node is misbehaving. |
| return nullptr; |
| } |
| return it->second.router_link; |
| } |
| |
| void NodeLink::RemoveRemoteRouterLink(SublinkId sublink) { |
| absl::MutexLock lock(&mutex_); |
| sublinks_.erase(sublink); |
| } |
| |
| absl::optional<NodeLink::Sublink> NodeLink::GetSublink(SublinkId sublink) { |
| absl::MutexLock lock(&mutex_); |
| auto it = sublinks_.find(sublink); |
| if (it == sublinks_.end()) { |
| return absl::nullopt; |
| } |
| return it->second; |
| } |
| |
| Ref<Router> NodeLink::GetRouter(SublinkId sublink) { |
| absl::MutexLock lock(&mutex_); |
| auto it = sublinks_.find(sublink); |
| if (it == sublinks_.end()) { |
| return nullptr; |
| } |
| return it->second.receiver; |
| } |
| |
| void NodeLink::AddBlockBuffer(BufferId id, |
| uint32_t block_size, |
| DriverMemory memory) { |
| msg::AddBlockBuffer add; |
| add.params().id = id; |
| add.params().block_size = block_size; |
| add.params().buffer = add.AppendDriverObject(memory.TakeDriverObject()); |
| Transmit(add); |
| } |
| |
| void NodeLink::RequestIntroduction(const NodeName& name) { |
| ABSL_ASSERT(remote_node_type_ == Node::Type::kBroker); |
| |
| msg::RequestIntroduction request; |
| request.params().name = name; |
| Transmit(request); |
| } |
| |
| void NodeLink::AcceptIntroduction(const NodeName& name, |
| LinkSide side, |
| uint32_t remote_protocol_version, |
| Ref<DriverTransport> transport, |
| DriverMemory memory) { |
| ABSL_ASSERT(node_->type() == Node::Type::kBroker); |
| |
| msg::AcceptIntroduction accept; |
| accept.params().name = name; |
| accept.params().link_side = side; |
| accept.params().remote_protocol_version = remote_protocol_version; |
| accept.params().transport = |
| accept.AppendDriverObject(transport->TakeDriverObject()); |
| accept.params().memory = accept.AppendDriverObject(memory.TakeDriverObject()); |
| Transmit(accept); |
| } |
| |
| void NodeLink::RejectIntroduction(const NodeName& name) { |
| ABSL_ASSERT(node_->type() == Node::Type::kBroker); |
| |
| msg::RejectIntroduction reject; |
| reject.params().name = name; |
| Transmit(reject); |
| } |
| |
| void NodeLink::AcceptBypassLink( |
| const NodeName& current_peer_node, |
| SublinkId current_peer_sublink, |
| SequenceNumber inbound_sequence_length_from_bypassed_link, |
| SublinkId new_sublink, |
| FragmentRef<RouterLinkState> link_state) { |
| msg::AcceptBypassLink accept; |
| accept.params().current_peer_node = current_peer_node; |
| accept.params().current_peer_sublink = current_peer_sublink; |
| accept.params().inbound_sequence_length_from_bypassed_link = |
| inbound_sequence_length_from_bypassed_link; |
| accept.params().new_sublink = new_sublink; |
| accept.params().new_link_state_fragment = link_state.release().descriptor(); |
| Transmit(accept); |
| } |
| |
| void NodeLink::RequestMemory(size_t size, RequestMemoryCallback callback) { |
| const uint32_t size32 = checked_cast<uint32_t>(size); |
| { |
| absl::MutexLock lock(&mutex_); |
| pending_memory_requests_[size32].push_back(std::move(callback)); |
| } |
| |
| msg::RequestMemory request; |
| request.params().size = size32; |
| Transmit(request); |
| } |
| |
| void NodeLink::Deactivate() { |
| { |
| absl::MutexLock lock(&mutex_); |
| if (activation_state_ != kActive) { |
| return; |
| } |
| activation_state_ = kDeactivated; |
| } |
| |
| OnTransportError(); |
| transport_->Deactivate(); |
| memory_->SetNodeLink(nullptr); |
| } |
| |
| void NodeLink::Transmit(Message& message) { |
| if (!message.CanTransmitOn(*transport_)) { |
| // The driver has indicated that it can't transmit this message through our |
| // transport, so the message must instead be relayed through a broker. |
| // |
| // TODO: Broker relaying not yet implemented. |
| ABSL_ASSERT(false); |
| return; |
| } |
| |
| message.header().sequence_number = GenerateOutgoingSequenceNumber(); |
| transport_->Transmit(message); |
| } |
| |
| SequenceNumber NodeLink::GenerateOutgoingSequenceNumber() { |
| return SequenceNumber(next_outgoing_sequence_number_generator_.fetch_add( |
| 1, std::memory_order_relaxed)); |
| } |
| |
| bool NodeLink::OnRequestIntroduction(msg::RequestIntroduction& request) { |
| // TODO: Support broker-to-broker introduction requests. |
| if (remote_node_type_ != Node::Type::kNormal || |
| node()->type() != Node::Type::kBroker) { |
| return false; |
| } |
| |
| node()->HandleIntroductionRequest(*this, request.params().name); |
| return true; |
| } |
| |
| bool NodeLink::OnAcceptIntroduction(msg::AcceptIntroduction& accept) { |
| if (remote_node_type_ != Node::Type::kBroker) { |
| return false; |
| } |
| |
| if (node()->type() != Node::Type::kNormal) { |
| // TODO: Support broker-to-broker introductions. |
| return false; |
| } |
| |
| auto memory = DriverMemory(accept.TakeDriverObject(accept.params().memory)); |
| if (!memory.is_valid()) { |
| return false; |
| } |
| |
| auto mapping = memory.Map(); |
| if (!mapping.is_valid()) { |
| return false; |
| } |
| |
| auto transport = MakeRefCounted<DriverTransport>( |
| accept.TakeDriverObject(accept.params().transport)); |
| node()->AcceptIntroduction( |
| *this, accept.params().name, accept.params().link_side, |
| accept.params().remote_protocol_version, std::move(transport), |
| NodeLinkMemory::Create(node(), std::move(mapping))); |
| return true; |
| } |
| |
| bool NodeLink::OnRejectIntroduction(msg::RejectIntroduction& reject) { |
| if (remote_node_type_ != Node::Type::kBroker) { |
| return false; |
| } |
| |
| if (node()->type() != Node::Type::kNormal) { |
| // TODO: Support broker-to-broker introductions. |
| return false; |
| } |
| |
| return node()->CancelIntroduction(reject.params().name); |
| } |
| |
| bool NodeLink::OnAddBlockBuffer(msg::AddBlockBuffer& add) { |
| DriverMemory buffer(add.TakeDriverObject(add.params().buffer)); |
| if (!buffer.is_valid()) { |
| return false; |
| } |
| return memory().AddBlockBuffer(add.params().id, add.params().block_size, |
| buffer.Map()); |
| } |
| |
| bool NodeLink::OnAcceptParcel(msg::AcceptParcel& accept) { |
| absl::Span<const uint8_t> parcel_data = |
| accept.GetArrayView<uint8_t>(accept.params().parcel_data); |
| absl::Span<const HandleType> handle_types = |
| accept.GetArrayView<HandleType>(accept.params().handle_types); |
| absl::Span<const RouterDescriptor> new_routers = |
| accept.GetArrayView<RouterDescriptor>(accept.params().new_routers); |
| auto driver_objects = accept.driver_objects(); |
| |
| // Note that on any validation failure below, we defer rejection at least |
| // until any deserialized objects are stored in a new Parcel object. This |
| // ensures that they're properly cleaned up before we return. |
| bool parcel_valid = true; |
| std::vector<Ref<APIObject>> objects(handle_types.size()); |
| for (size_t i = 0; i < handle_types.size(); ++i) { |
| switch (handle_types[i]) { |
| case HandleType::kPortal: { |
| if (new_routers.empty()) { |
| parcel_valid = false; |
| continue; |
| } |
| |
| Ref<Router> new_router = Router::Deserialize(new_routers[0], *this); |
| if (!new_router) { |
| parcel_valid = false; |
| continue; |
| } |
| |
| objects[i] = MakeRefCounted<Portal>(node_, std::move(new_router)); |
| new_routers.remove_prefix(1); |
| break; |
| } |
| |
| case HandleType::kBox: { |
| if (driver_objects.empty()) { |
| return false; |
| } |
| |
| objects[i] = MakeRefCounted<Box>(std::move(driver_objects[0])); |
| driver_objects.remove_prefix(1); |
| break; |
| } |
| |
| default: |
| parcel_valid = false; |
| break; |
| } |
| } |
| |
| if (!new_routers.empty() || !driver_objects.empty()) { |
| // There should be no unclaimed routers. If there are, it's a validation |
| // failure. |
| parcel_valid = false; |
| } |
| |
| const SublinkId for_sublink = accept.params().sublink; |
| Parcel parcel(accept.params().sequence_number); |
| parcel.SetObjects(std::move(objects)); |
| if (!parcel_valid) { |
| return false; |
| } |
| |
| parcel.SetInlinedData( |
| std::vector<uint8_t>(parcel_data.begin(), parcel_data.end())); |
| |
| const absl::optional<Sublink> sublink = GetSublink(for_sublink); |
| if (!sublink) { |
| DVLOG(4) << "Dropping " << parcel.Describe() << " at " |
| << local_node_name_.ToString() << ", arriving from " |
| << remote_node_name_.ToString() << " via unknown sublink " |
| << for_sublink; |
| return true; |
| } |
| const LinkType link_type = sublink->router_link->GetType(); |
| if (link_type.is_outward()) { |
| DVLOG(4) << "Accepting inbound " << parcel.Describe() << " at " |
| << sublink->router_link->Describe(); |
| return sublink->receiver->AcceptInboundParcel(parcel); |
| } |
| |
| ABSL_ASSERT(link_type.is_peripheral_inward()); |
| DVLOG(4) << "Accepting outbound " << parcel.Describe() << " at " |
| << sublink->router_link->Describe(); |
| return sublink->receiver->AcceptOutboundParcel(parcel); |
| } |
| |
| bool NodeLink::OnRouteClosed(msg::RouteClosed& route_closed) { |
| absl::optional<Sublink> sublink = GetSublink(route_closed.params().sublink); |
| if (!sublink) { |
| // The sublink may have already been removed, for example if the application |
| // has already closed the associated router. It is therefore not considered |
| // an error to receive a RouteClosed message for an unknown sublink. |
| return true; |
| } |
| |
| return sublink->receiver->AcceptRouteClosureFrom( |
| sublink->router_link->GetType(), route_closed.params().sequence_length); |
| } |
| |
| bool NodeLink::OnRouteDisconnected(msg::RouteDisconnected& route_closed) { |
| absl::optional<Sublink> sublink = GetSublink(route_closed.params().sublink); |
| if (!sublink) { |
| return true; |
| } |
| |
| DVLOG(4) << "Accepting RouteDisconnected at " |
| << sublink->router_link->Describe(); |
| |
| return sublink->receiver->AcceptRouteDisconnectedFrom( |
| sublink->router_link->GetType()); |
| } |
| |
| bool NodeLink::OnBypassPeer(msg::BypassPeer& bypass) { |
| absl::optional<Sublink> sublink = GetSublink(bypass.params().sublink); |
| if (!sublink) { |
| return true; |
| } |
| |
| // NOTE: This request is authenticated by the receiving Router, within |
| // BypassPeer(). |
| return sublink->receiver->BypassPeer(*sublink->router_link, |
| bypass.params().bypass_target_node, |
| bypass.params().bypass_target_sublink); |
| } |
| |
| bool NodeLink::OnAcceptBypassLink(msg::AcceptBypassLink& accept) { |
| Ref<NodeLink> node_link_to_peer = |
| node_->GetLink(accept.params().current_peer_node); |
| if (!node_link_to_peer) { |
| // If the link to the peer has been severed for whatever reason, the |
| // relevant route will be torn down anyway. It's safe to ignore this |
| // request in that case. |
| return true; |
| } |
| |
| const Ref<Router> receiver = |
| node_link_to_peer->GetRouter(accept.params().current_peer_sublink); |
| if (!receiver) { |
| // Similar to above, if the targeted Router cannot be resolved from the |
| // given sublink, this implies that the route has already been at least |
| // partially torn down. It's safe to ignore this request. |
| return true; |
| } |
| |
| auto link_state = MaybeAdoptFragmentRef<RouterLinkState>( |
| memory(), accept.params().new_link_state_fragment); |
| if (link_state.is_null()) { |
| // Bypass links must always come with a valid fragment for their |
| // RouterLinkState. If one has not been provided, that's a validation |
| // failure. |
| return false; |
| } |
| |
| return receiver->AcceptBypassLink( |
| *this, accept.params().new_sublink, std::move(link_state), |
| accept.params().inbound_sequence_length_from_bypassed_link); |
| } |
| |
| bool NodeLink::OnStopProxying(msg::StopProxying& stop) { |
| Ref<Router> router = GetRouter(stop.params().sublink); |
| if (!router) { |
| return true; |
| } |
| |
| return router->StopProxying(stop.params().inbound_sequence_length, |
| stop.params().outbound_sequence_length); |
| } |
| |
| bool NodeLink::OnProxyWillStop(msg::ProxyWillStop& will_stop) { |
| Ref<Router> router = GetRouter(will_stop.params().sublink); |
| if (!router) { |
| return true; |
| } |
| |
| return router->NotifyProxyWillStop( |
| will_stop.params().inbound_sequence_length); |
| } |
| |
| bool NodeLink::OnBypassPeerWithLink(msg::BypassPeerWithLink& bypass) { |
| Ref<Router> router = GetRouter(bypass.params().sublink); |
| if (!router) { |
| return true; |
| } |
| |
| auto link_state = MaybeAdoptFragmentRef<RouterLinkState>( |
| memory(), bypass.params().new_link_state_fragment); |
| if (link_state.is_null()) { |
| return false; |
| } |
| return router->AcceptBypassLink(*this, bypass.params().new_sublink, |
| std::move(link_state), |
| bypass.params().inbound_sequence_length); |
| } |
| |
| bool NodeLink::OnStopProxyingToLocalPeer(msg::StopProxyingToLocalPeer& stop) { |
| Ref<Router> router = GetRouter(stop.params().sublink); |
| if (!router) { |
| return true; |
| } |
| |
| return router->StopProxyingToLocalPeer( |
| stop.params().outbound_sequence_length); |
| } |
| |
| bool NodeLink::OnFlushRouter(msg::FlushRouter& flush) { |
| if (Ref<Router> router = GetRouter(flush.params().sublink)) { |
| router->Flush(Router::kForceProxyBypassAttempt); |
| } |
| return true; |
| } |
| |
| bool NodeLink::OnRequestMemory(msg::RequestMemory& request) { |
| DriverMemory memory(node_->driver(), request.params().size); |
| msg::ProvideMemory provide; |
| provide.params().size = request.params().size; |
| provide.params().buffer = |
| provide.AppendDriverObject(memory.TakeDriverObject()); |
| Transmit(provide); |
| return true; |
| } |
| |
| bool NodeLink::OnProvideMemory(msg::ProvideMemory& provide) { |
| DriverMemory memory(provide.TakeDriverObject(provide.params().buffer)); |
| RequestMemoryCallback callback; |
| { |
| absl::MutexLock lock(&mutex_); |
| auto it = pending_memory_requests_.find(provide.params().size); |
| if (it == pending_memory_requests_.end()) { |
| return false; |
| } |
| |
| std::list<RequestMemoryCallback>& callbacks = it->second; |
| ABSL_ASSERT(!callbacks.empty()); |
| callback = std::move(callbacks.front()); |
| callbacks.pop_front(); |
| if (callbacks.empty()) { |
| pending_memory_requests_.erase(it); |
| } |
| } |
| |
| callback(std::move(memory)); |
| return true; |
| } |
| |
| void NodeLink::OnTransportError() { |
| SublinkMap sublinks; |
| { |
| absl::MutexLock lock(&mutex_); |
| sublinks.swap(sublinks_); |
| } |
| |
| for (auto& [id, sublink] : sublinks) { |
| DVLOG(4) << "NodeLink disconnection dropping " |
| << sublink.router_link->Describe() << " which is bound to router " |
| << sublink.receiver.get(); |
| sublink.receiver->NotifyLinkDisconnected(*sublink.router_link); |
| } |
| |
| Ref<NodeLink> self = WrapRefCounted(this); |
| node_->DropLink(remote_node_name_); |
| } |
| |
| NodeLink::Sublink::Sublink(Ref<RemoteRouterLink> router_link, |
| Ref<Router> receiver) |
| : router_link(std::move(router_link)), receiver(std::move(receiver)) {} |
| |
| NodeLink::Sublink::Sublink(Sublink&&) = default; |
| |
| NodeLink::Sublink::Sublink(const Sublink&) = default; |
| |
| NodeLink::Sublink& NodeLink::Sublink::operator=(Sublink&&) = default; |
| |
| NodeLink::Sublink& NodeLink::Sublink::operator=(const Sublink&) = default; |
| |
| NodeLink::Sublink::~Sublink() = default; |
| |
| } // namespace ipcz |