blob: 1b358f19272e2c434b4d2a5063866157f2847e94 [file] [log] [blame]
// Copyright 2022 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef IPCZ_SRC_IPCZ_NODE_LINK_H_
#define IPCZ_SRC_IPCZ_NODE_LINK_H_
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <type_traits>
#include "ipcz/driver_memory.h"
#include "ipcz/driver_transport.h"
#include "ipcz/fragment_ref.h"
#include "ipcz/link_side.h"
#include "ipcz/link_type.h"
#include "ipcz/node.h"
#include "ipcz/node_link_memory.h"
#include "ipcz/node_messages.h"
#include "ipcz/node_name.h"
#include "ipcz/sequence_number.h"
#include "ipcz/sublink_id.h"
#include "third_party/abseil-cpp/absl/container/flat_hash_map.h"
#include "third_party/abseil-cpp/absl/synchronization/mutex.h"
#include "third_party/abseil-cpp/absl/types/optional.h"
#include "third_party/abseil-cpp/absl/types/span.h"
#include "util/ref_counted.h"
namespace ipcz {
class Message;
class Parcel;
class RemoteRouterLink;
class Router;
// A NodeLink instance encapsulates all communication from its owning node to
// exactly one other remote node in the sytem. Each NodeLink manages a
// DriverTransport for general-purpose I/O to and from the remote node.
//
// NodeLinks may also allocate an arbitrary number of sublinks which are used
// to multiplex the link and facilitate point-to-point communication between
// specific Router instances on either end.
class NodeLink : public msg::NodeMessageListener {
public:
struct Sublink {
Sublink(Ref<RemoteRouterLink> link, Ref<Router> receiver);
Sublink(Sublink&&);
Sublink(const Sublink&);
Sublink& operator=(Sublink&&);
Sublink& operator=(const Sublink&);
~Sublink();
Ref<RemoteRouterLink> router_link;
Ref<Router> receiver;
};
// Creates a new NodeLink over a transport which is already active. This is
// only safe to call from within the activity handler of that transport, and
// the returned NodeLink will have effectively already been activated.
static Ref<NodeLink> CreateActive(Ref<Node> node,
LinkSide link_side,
const NodeName& local_node_name,
const NodeName& remote_node_name,
Node::Type remote_node_type,
uint32_t remote_protocol_version,
Ref<DriverTransport> transport,
Ref<NodeLinkMemory> memory);
// Creates a new NodeLink over a transport which is not yet active. This
// NodeLink must be explicitly activated with Activate() before it can be
// used.
static Ref<NodeLink> CreateInactive(Ref<Node> node,
LinkSide link_side,
const NodeName& local_node_name,
const NodeName& remote_node_name,
Node::Type remote_node_type,
uint32_t remote_protocol_version,
Ref<DriverTransport> transport,
Ref<NodeLinkMemory> memory);
const Ref<Node>& node() const { return node_; }
LinkSide link_side() const { return link_side_; }
const NodeName& local_node_name() const { return local_node_name_; }
const NodeName& remote_node_name() const { return remote_node_name_; }
Node::Type remote_node_type() const { return remote_node_type_; }
uint32_t remote_protocol_version() const { return remote_protocol_version_; }
const Ref<DriverTransport>& transport() const { return transport_; }
NodeLinkMemory& memory() { return *memory_; }
const NodeLinkMemory& memory() const { return *memory_; }
// Activates this NodeLink. The NodeLink must have been created with
// CreateInactive() and must not have already been activated.
void Activate();
// Binds `sublink` on this NodeLink to the given `router`. `link_side`
// specifies which side of the link this end identifies as (A or B), and
// `type` specifies the type of link this is, from the perspective of
// `router`.
//
// If `link_state_fragment` is non-null, the given fragment contains the
// shared RouterLinkState structure for the new link. Only central links
// require a RouterLinkState.
Ref<RemoteRouterLink> AddRemoteRouterLink(
SublinkId sublink,
FragmentRef<RouterLinkState> link_state,
LinkType type,
LinkSide side,
Ref<Router> router);
// Removes the route specified by `sublink`. Once removed, any messages
// received for that sublink are ignored.
void RemoveRemoteRouterLink(SublinkId sublink);
// Retrieves the Router and RemoteRouterLink currently bound to `sublink`
// on this NodeLink.
absl::optional<Sublink> GetSublink(SublinkId sublink);
// Retrieves only the Router currently bound to `sublink` on this NodeLink.
Ref<Router> GetRouter(SublinkId sublink);
// Sends a new driver memory object to the remote endpoint to be associated
// with BufferId within the peer NodeLink's associated NodeLinkMemory, and to
// be used to dynamically allocate blocks of `block_size` bytes. The BufferId
// must have already been reserved locally by this NodeLink using
// AllocateNewBufferId().
void AddBlockBuffer(BufferId id, uint32_t block_size, DriverMemory memory);
// Asks the broker on the other end of this link to introduce the local node
// to the node identified by `name`. This will always elicit a response from
// the broker in the form of either an AcceptIntroduction or
// RejectIntroduction message.
void RequestIntroduction(const NodeName& name);
// Introduces the remote node to the node named `name`, with details needed to
// construct a new NodeLink to that node.
void AcceptIntroduction(const NodeName& name,
LinkSide side,
Node::Type remote_node_type,
uint32_t remote_protocol_version,
Ref<DriverTransport> transport,
DriverMemory memory);
// Rejects an introduction request previously sent by the remote node for the
// node identified by `name`.
void RejectIntroduction(const NodeName& name);
// May be called on a link from a non-broker to a broker in order to refer a
// new node to the remote broker. `transport` is a transport whose peer
// endpoint belongs to the referred node, and `num_initial_portals` is the
// number of initial portals expected on the resulting link from this side.
// Upon success, `callback` is invoked with a new NodeLink to the referred
// node and the number of initial portals expected by that side. On failure,
// `callback` is invoked with a null link.
using ReferralCallback = std::function<void(Ref<NodeLink>, uint32_t)>;
void ReferNonBroker(Ref<DriverTransport> transport,
uint32_t num_initial_portals,
ReferralCallback callback);
// Sends a request to the remote node to establish a new RouterLink over this
// this NodeLink, to replace an existing RouterLink between the remote node
// and `current_peer_node`. `current_peer_sublink` identifies the specific
// RouterLink between them which is to be replaced.
//
// `inbound_sequence_length_from_bypassed_link` is the final length of the
// parcel sequence to be routed over the link which is being bypassed.
// `new_sublink` and (optionally null) `new_link_state` can be used to
// establish the new link over the NodeLink transmitting this message.
void AcceptBypassLink(
const NodeName& current_peer_node,
SublinkId current_peer_sublink,
SequenceNumber inbound_sequence_length_from_bypassed_link,
SublinkId new_sublink,
FragmentRef<RouterLinkState> new_link_state);
// Sends a request to allocate a new shared memory region and invokes
// `callback` once the request succeeds or fails. On failure, `callback` is
// invoke with an invalid DriverMemory object.
using RequestMemoryCallback = std::function<void(DriverMemory)>;
void RequestMemory(size_t size, RequestMemoryCallback callback);
// Asks the remote node (which must be a broker) to relay `message` over to
// `to_node`. This is used to transmit driver objects between non-broker nodes
// whenever direct transmission is unsupported by the driver.
void RelayMessage(const NodeName& to_node, Message& message);
// Simulates receipt of a new message from the remote node on this link. This
// is called by the local Node with a message that was relayed to it by its
// broker. All relayed messages land on their destination node through this
// method.
bool DispatchRelayedMessage(msg::AcceptRelayedMessage& relay);
// Permanently deactivates this NodeLink. Once this call returns the NodeLink
// will no longer receive transport messages. It may still be used to transmit
// outgoing messages, but it cannot be reactivated. Transmissions over a
// deactivated transport may or may not guarantee delivery to the peer
// transport, as this is left to driver's discretion.
//
// Must only be called on an activated NodeLink, either one which was created
// with CreateActive(), or one which was activated later by calling
// Activate().
void Deactivate();
// Finalizes serialization of DriverObjects within `message` and transmits it
// to the NodeLink's peer, either over the DriverTransport or through shared
// memory.
void Transmit(Message& message);
private:
enum ActivationState {
kNeverActivated,
kActive,
kDeactivated,
};
NodeLink(Ref<Node> node,
LinkSide link_side,
const NodeName& local_node_name,
const NodeName& remote_node_name,
Node::Type remote_node_type,
uint32_t remote_protocol_version,
Ref<DriverTransport> transport,
Ref<NodeLinkMemory> memory,
ActivationState initial_activation_state);
~NodeLink() override;
SequenceNumber GenerateOutgoingSequenceNumber();
// NodeMessageListener overrides:
bool OnReferNonBroker(msg::ReferNonBroker& refer) override;
bool OnNonBrokerReferralAccepted(
msg::NonBrokerReferralAccepted& accepted) override;
bool OnNonBrokerReferralRejected(
msg::NonBrokerReferralRejected& rejected) override;
bool OnRequestIntroduction(msg::RequestIntroduction& request) override;
bool OnAcceptIntroduction(msg::AcceptIntroduction& accept) override;
bool OnRejectIntroduction(msg::RejectIntroduction& reject) override;
bool OnAddBlockBuffer(msg::AddBlockBuffer& add) override;
bool OnAcceptParcel(msg::AcceptParcel& accept) override;
bool OnAcceptParcelDriverObjects(
msg::AcceptParcelDriverObjects& accept) override;
bool OnRouteClosed(msg::RouteClosed& route_closed) override;
bool OnRouteDisconnected(msg::RouteDisconnected& route_disconnected) override;
bool OnSnapshotPeerQueueState(msg::SnapshotPeerQueueState& snapshot) override;
bool OnBypassPeer(msg::BypassPeer& bypass) override;
bool OnAcceptBypassLink(msg::AcceptBypassLink& accept) override;
bool OnStopProxying(msg::StopProxying& stop) override;
bool OnProxyWillStop(msg::ProxyWillStop& will_stop) override;
bool OnBypassPeerWithLink(msg::BypassPeerWithLink& bypass) override;
bool OnStopProxyingToLocalPeer(msg::StopProxyingToLocalPeer& stop) override;
bool OnFlushRouter(msg::FlushRouter& flush) override;
bool OnRequestMemory(msg::RequestMemory& request) override;
bool OnProvideMemory(msg::ProvideMemory& provide) override;
bool OnRelayMessage(msg::RelayMessage& relay) override;
bool OnAcceptRelayedMessage(msg::AcceptRelayedMessage& accept) override;
void OnTransportError() override;
// Invoked when we receive a Parcel whose data fragment resides in a buffer
// not yet known to the local node. This schedules the parcel for acceptance
// as soon as that buffer is available.
void WaitForParcelFragmentToResolve(SublinkId for_sublink,
Parcel& parcel,
const FragmentDescriptor& descriptor,
bool is_split_parcel);
bool AcceptParcelWithoutDriverObjects(SublinkId for_sublink, Parcel& parcel);
bool AcceptParcelDriverObjects(SublinkId for_sublink, Parcel& parcel);
bool AcceptSplitParcel(SublinkId for_sublink,
Parcel& parcel_without_driver_objects,
Parcel& parcel_with_driver_objects);
bool AcceptCompleteParcel(SublinkId for_sublink, Parcel& parcel);
const Ref<Node> node_;
const LinkSide link_side_;
const NodeName local_node_name_;
const NodeName remote_node_name_;
const Node::Type remote_node_type_;
const uint32_t remote_protocol_version_;
const Ref<DriverTransport> transport_;
const Ref<NodeLinkMemory> memory_;
absl::Mutex mutex_;
ActivationState activation_state_ ABSL_GUARDED_BY(mutex_);
// Messages transmitted from this NodeLink may traverse either the driver
// transport OR a shared memory queue. Messages transmitted from the same
// thread need to be processed in the same order by the receiving node. This
// is used to generate a sequence number for every message so they can be
// reordered on the receiving end.
std::atomic<uint64_t> next_outgoing_sequence_number_generator_{0};
using SublinkMap = absl::flat_hash_map<SublinkId, Sublink>;
SublinkMap sublinks_ ABSL_GUARDED_BY(mutex_);
// Pending memory allocation request callbacks. Keyed by request size, when
// an incoming ProvideMemory message is received, the front of the list for
// that size is removed from the map and invoked with the new memory object.
using MemoryRequestMap =
absl::flat_hash_map<uint32_t, std::list<RequestMemoryCallback>>;
MemoryRequestMap pending_memory_requests_ ABSL_GUARDED_BY(mutex_);
// Tracks partially received contents of split parcels so they can be
// reconstructed for dispatch.
using PartialParcelKey = std::tuple<SublinkId, SequenceNumber>;
using PartialParcelMap = absl::flat_hash_map<PartialParcelKey, Parcel>;
PartialParcelMap partial_parcels_ ABSL_GUARDED_BY(mutex_);
// Tracks pending referrals sent to the broker.
uint64_t next_referral_id_ = 0;
absl::flat_hash_map<uint64_t, ReferralCallback> pending_referrals_
ABSL_GUARDED_BY(mutex_);
};
} // namespace ipcz
#endif // IPCZ_SRC_IPCZ_NODE_LINK_H_