blob: 9d266da0610fb8027ee83dc5325e10612b6337a3 [file] [log] [blame]
// Copyright 2022 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "ipcz/node.h"
#include <utility>
#include <vector>
#include "ipcz/driver_memory.h"
#include "ipcz/ipcz.h"
#include "ipcz/link_side.h"
#include "ipcz/node_connector.h"
#include "ipcz/node_link.h"
#include "ipcz/node_link_memory.h"
#include "ipcz/portal.h"
#include "ipcz/router.h"
#include "third_party/abseil-cpp/absl/base/macros.h"
#include "third_party/abseil-cpp/absl/container/inlined_vector.h"
#include "third_party/abseil-cpp/absl/synchronization/mutex.h"
#include "third_party/abseil-cpp/absl/types/span.h"
#include "util/log.h"
#include "util/ref_counted.h"
namespace ipcz {
namespace {
// Produces the options structure this node will actually use.
//
// When `options` is non-null, its contents are copied into a zero-initialized
// structure of the current version. The number of bytes copied is the smaller
// of the caller-declared `options->size` and the size of the current structure,
// so input from an older or newer version of the API is coerced into the
// current layout. When `options` is null, the zero-initialized structure is
// used as-is.
//
// In either case the returned structure's `size` field is overwritten with the
// size of the current structure.
IpczCreateNodeOptions CopyOrUseDefaultOptions(
    const IpczCreateNodeOptions* options) {
  IpczCreateNodeOptions result = {0};
  if (options != nullptr) {
    const auto bytes_to_copy = std::min(options->size, sizeof(result));
    memcpy(&result, options, bytes_to_copy);
  }
  result.size = sizeof(result);
  return result;
}
} // namespace
// Bookkeeping for the outstanding introduction request(s) targeting a single
// remote node: which brokers have not yet rejected the request, and which
// callbacks are waiting on the outcome.
class Node::PendingIntroduction {
 public:
  // Begins tracking an introduction to a specific node. `brokers` is the
  // sequence of broker links queried for the introduction; the remote node
  // name of each one is recorded as an outstanding reply.
  explicit PendingIntroduction(absl::Span<const Ref<NodeLink>> brokers) {
    pending_replies_.reserve(brokers.size());
    for (const Ref<NodeLink>& broker_link : brokers) {
      pending_replies_.insert(broker_link->remote_node_name());
    }
  }

  // True once no replies remain outstanding, meaning every queried broker has
  // been marked as failed and the introduction as a whole has failed.
  bool failed() const { return pending_replies_.empty(); }

  // Records that `rejecting_broker` answered with a rejection by removing its
  // remote node name from the set of outstanding replies.
  void NotifyFailureFrom(NodeLink& rejecting_broker) {
    pending_replies_.erase(rejecting_broker.remote_node_name());
  }

  // Appends `callback` to the list run by Finish(), whether the introduction
  // ultimately succeeds or fails.
  void AddCallback(Node::EstablishLinkCallback callback) {
    callbacks_.push_back(std::move(callback));
  }

  // Invokes every registered callback, in registration order, with `result`.
  // `result` is null if the introduction failed; otherwise it is the link to
  // the newly introduced remote node.
  void Finish(NodeLink* result) {
    for (EstablishLinkCallback& waiter : callbacks_) {
      waiter(result);
    }
  }

 private:
  // Names of the brokers from which a reply is still expected for this
  // introduction.
  absl::flat_hash_set<NodeName> pending_replies_;

  // Callbacks to run when the introduction finishes.
  std::vector<EstablishLinkCallback> callbacks_;
};
// Constructs a node of the given `type` on top of `driver`, remembering
// `driver_node` and a normalized copy of `options` (see
// CopyOrUseDefaultOptions()). A broker node immediately assigns itself a
// randomly generated name; any other node starts out without an assigned name.
Node::Node(Type type,
           const IpczDriver& driver,
           IpczDriverHandle driver_node,
           const IpczCreateNodeOptions* options)
    : type_(type),
      driver_(driver),
      driver_node_(driver_node),
      options_(CopyOrUseDefaultOptions(options)) {
  if (type_ != Type::kBroker) {
    DVLOG(4) << "Created new non-broker node " << this;
    return;
  }

  // Only brokers assign their own names.
  assigned_name_ = GenerateRandomName();
  DVLOG(4) << "Created new broker node " << assigned_name_.ToString();
}
// The destructor performs no explicit teardown of its own; members are
// destroyed by the compiler-generated destructor.
Node::~Node() = default;
// Closes this node by running ShutDown(). Always returns IPCZ_RESULT_OK.
IpczResult Node::Close() {
ShutDown();
return IPCZ_RESULT_OK;
}
// Connects this node to another node over `driver_transport`.
//
// One new Portal (each backed by a fresh Router) is created per element of
// `initial_portals`, and each element is overwritten with the handle of the
// corresponding portal. The transport and portals are then handed to
// NodeConnector::ConnectNode(), whose result is returned.
//
// On failure the caller keeps ownership of `driver_transport`, and the portals
// created here are destroyed; the handle values written to `initial_portals`
// must not be used in that case.
IpczResult Node::ConnectNode(IpczDriverHandle driver_transport,
                             IpczConnectNodeFlags flags,
                             absl::Span<IpczHandle> initial_portals) {
  const size_t num_portals = initial_portals.size();
  std::vector<Ref<Portal>> portals(num_portals);
  for (size_t i = 0; i < num_portals; ++i) {
    auto new_portal =
        MakeRefCounted<Portal>(WrapRefCounted(this), MakeRefCounted<Router>());
    // Keep one reference locally and release another as the caller's handle.
    portals[i] = new_portal;
    initial_portals[i] = Portal::ReleaseAsHandle(std::move(new_portal));
  }

  auto transport =
      MakeRefCounted<DriverTransport>(DriverObject(driver_, driver_transport));
  const IpczResult result = NodeConnector::ConnectNode(
      WrapRefCounted(this), transport, flags, portals);
  if (result == IPCZ_RESULT_OK) {
    return IPCZ_RESULT_OK;
  }

  // On failure the caller retains ownership of `driver_transport`. Release
  // it here so it doesn't get closed when `transport` is destroyed.
  transport->Release();

  // Wipe out the initial portals we created, since they are invalid and
  // effectively not returned to the caller on failure. Adopting each pointer
  // reclaims the reference that was released as a handle above.
  for (Ref<Portal>& portal : portals) {
    Ref<Portal> doomed_portal = AdoptRef(portal.get());
  }
  return result;
}
// Returns a copy of this node's currently assigned name, read while holding
// `mutex_`.
NodeName Node::GetAssignedName() {
  absl::MutexLock guard(&mutex_);
  return assigned_name_;
}
// Returns a reference to this node's broker link, read while holding `mutex_`.
// The result is null if no broker link is currently set.
Ref<NodeLink> Node::GetBrokerLink() {
  absl::MutexLock guard(&mutex_);
  return broker_link_;
}
// Stores `name` as this node's assigned name. A name must not already have
// been assigned; this is asserted rather than handled.
void Node::SetAssignedName(const NodeName& name) {
  absl::MutexLock guard(&mutex_);
  ABSL_ASSERT(!assigned_name_.is_valid());
  assigned_name_ = name;
}
// Registers `connection` as this node's connection to `remote_node_name`.
//
// Returns false if a connection to that name is already registered; in that
// case `connection.link` is deactivated (after the lock is released) and
// nothing else changes. Otherwise returns true after:
//  - recording the link in `other_brokers_` when both ends are brokers, or
//  - adopting the link as this node's broker link when only the remote end is
//    a broker, and then running every callback that was waiting for a broker
//    link. Those callbacks run outside the lock.
bool Node::AddConnection(const NodeName& remote_node_name,
                         Connection connection) {
  std::vector<BrokerLinkCallback> waiting_for_broker;
  {
    absl::ReleasableMutexLock lock(&mutex_);
    const bool inserted =
        connections_.insert({remote_node_name, connection}).second;
    if (!inserted) {
      // Drop the lock before calling into the link.
      lock.Release();
      const OperationContext context{OperationContext::kTransportNotification};
      connection.link->Deactivate(context);
      return false;
    }

    const bool local_is_broker = type_ == Type::kBroker;
    const bool remote_is_broker =
        connection.link->remote_node_type() == Type::kBroker;
    if (remote_is_broker) {
      if (local_is_broker) {
        // We're a broker, and this is a link to some other broker. We retain a
        // separate mapping of other brokers so they can be consulted for
        // introductions.
        other_brokers_.insert({remote_node_name, connection.link});
      } else {
        // The first connection accepted by a non-broker must be a connection to
        // its own broker.
        ABSL_ASSERT(connections_.size() == 1);
        ABSL_ASSERT(!broker_link_);
        broker_link_ = connection.link;
        broker_link_callbacks_.swap(waiting_for_broker);
      }
    }
  }

  for (auto& notify : waiting_for_broker) {
    notify(connection.link);
  }
  return true;
}
// Looks up the connection registered for the node called `name`. Returns a
// copy of the connection if one exists, or absl::nullopt otherwise.
absl::optional<Node::Connection> Node::GetConnection(const NodeName& name) {
  absl::MutexLock guard(&mutex_);
  auto entry = connections_.find(name);
  if (entry != connections_.end()) {
    return entry->second;
  }
  return absl::nullopt;
}
// Returns the NodeLink of the connection registered for the node called
// `name`, or null if no such connection exists.
Ref<NodeLink> Node::GetLink(const NodeName& name) {
  absl::MutexLock guard(&mutex_);
  auto entry = connections_.find(name);
  if (entry != connections_.end()) {
    return entry->second.link;
  }
  return nullptr;
}
// Returns a NodeName whose bytes were filled in by the driver's
// GenerateRandomBytes() entry point. The driver call is asserted to succeed.
NodeName Node::GenerateRandomName() const {
  NodeName random_name;
  const IpczResult status = driver_.GenerateRandomBytes(
      sizeof(random_name), IPCZ_NO_FLAGS, nullptr, &random_name);
  ABSL_ASSERT(status == IPCZ_RESULT_OK);
  return random_name;
}
// Installs `link` as the link through which AllocateSharedMemory() requests
// memory. A delegate must not already be set; this is asserted rather than
// handled.
void Node::SetAllocationDelegate(Ref<NodeLink> link) {
  absl::MutexLock guard(&mutex_);
  ABSL_ASSERT(!allocation_delegate_link_);
  allocation_delegate_link_ = std::move(link);
}
void Node::AllocateSharedMemory(size_t size,
AllocateSharedMemoryCallback callback) {
Ref<NodeLink> delegate;
{
absl::MutexLock lock(&mutex_);
delegate = allocation_delegate_link_;
}
if (delegate) {
delegate->RequestMemory(size, std::move(callback));
} else {
callback(DriverMemory(driver_, size));
}
}
// Attempts to obtain a link to the node called `name` and reports the outcome
// through `callback`, which receives the link on success or null on failure.
//
// - If a connection to `name` is already registered, `callback` is invoked
//   with that link before this function returns.
// - Otherwise, if there is at least one broker to ask (this node's own broker
//   link when it is a normal node with a broker, or else every link in
//   `other_brokers_`), `callback` is attached to the PendingIntroduction for
//   `name`. If that introduction was newly created here, an introduction is
//   requested from each of those brokers; if one was already pending, no new
//   requests are sent and the callback just waits on the existing one.
// - Otherwise `callback` is invoked with null before this function returns.
//
// `callback` is never invoked and no requests are sent while `mutex_` is held.
void Node::EstablishLink(const NodeName& name, EstablishLinkCallback callback) {
Ref<NodeLink> existing_link;
absl::InlinedVector<Ref<NodeLink>, 2> brokers_to_query;
{
absl::MutexLock lock(&mutex_);
auto it = connections_.find(name);
if (it != connections_.end()) {
existing_link = it->second.link;
} else {
// No existing connection: work out which brokers, if any, can be asked.
if (type_ == Type::kNormal && broker_link_) {
brokers_to_query.push_back(broker_link_);
} else if (!other_brokers_.empty()) {
ABSL_ASSERT(type_ == Type::kBroker);
brokers_to_query.reserve(other_brokers_.size());
for (const auto& [broker_name, link] : other_brokers_) {
brokers_to_query.push_back(link);
}
}
if (!brokers_to_query.empty()) {
// Find or create the pending introduction for `name`. A newly inserted
// entry starts out null and is populated immediately below.
auto [pending_it, inserted] =
pending_introductions_.insert({name, nullptr});
auto& intro = pending_it->second;
if (!intro) {
intro = std::make_unique<PendingIntroduction>(
absl::MakeSpan(brokers_to_query));
}
// `callback` is consumed here; it is not touched again on any path that
// reaches this point.
intro->AddCallback(std::move(callback));
if (!inserted) {
// There was already a pending introduction we can wait for.
return;
}
}
}
}
// The lock has been released; it is now safe to call out to links.
if (!brokers_to_query.empty()) {
for (const auto& broker : brokers_to_query) {
broker->RequestIntroduction(name);
}
return;
}
// NOTE: `existing_link` may be null here, implying that we have failed.
callback(existing_link.get());
}
// Handles a request, received over `from_node_link`, to introduce that link's
// remote node to the node called `for_node`. Only valid on broker nodes.
//
// If this broker has no connection to `for_node`, it first tries to establish
// one and then retries the request, rejecting the introduction if that fails.
// A request from one broker for an introduction to another broker is logged as
// invalid and dropped. Otherwise this broker either performs the introduction
// itself or forwards the request to the target's broker, as explained below.
void Node::HandleIntroductionRequest(NodeLink& from_node_link,
                                     const NodeName& for_node) {
  // NodeLink must never accept these requests on non-broker nodes.
  ABSL_ASSERT(type_ == Type::kBroker);

  const NodeName requestor = from_node_link.remote_node_name();
  DVLOG(4) << "Broker " << from_node_link.local_node_name().ToString()
           << " received introduction request for " << for_node.ToString()
           << " from " << requestor.ToString();

  const absl::optional<Connection> target_connection = GetConnection(for_node);
  if (!target_connection) {
    // We are not familiar with the requested node. Attempt to establish our own
    // link to it first, then try again.
    EstablishLink(for_node, [self = WrapRefCounted(this),
                             requestor = WrapRefCounted(&from_node_link),
                             name = for_node](NodeLink* link) {
      if (link) {
        self->HandleIntroductionRequest(*requestor, name);
      } else {
        requestor->RejectIntroduction(name);
      }
    });
    return;
  }

  // A connection with no associated broker is one within our own network.
  const bool is_target_in_network = !target_connection->broker;
  const bool is_target_broker =
      target_connection->link == target_connection->broker;
  const bool is_requestor_broker =
      from_node_link.remote_node_type() == Type::kBroker;
  if (is_requestor_broker && is_target_broker) {
    DLOG(ERROR) << "Invalid introduction request from broker "
                << requestor.ToString() << " for broker "
                << for_node.ToString();
    return;
  }

  // If one of the two nodes being introduced is a broker, or if the target
  // is in-network (which implies the requestor is too, if it's not a broker)
  // then we are the only node that can introduce these two nodes.
  //
  // Otherwise if this is an introduction between two non-brokers in separate
  // networks, by convention we can only perform the introduction if we're on
  // side A of the link between the two relevant brokers.
  //
  // The final operand is evaluated only when the target has a broker, so the
  // dereference is safe.
  const bool is_our_responsibility =
      is_target_broker || is_requestor_broker || is_target_in_network ||
      target_connection->broker->link_side().is_side_a();
  if (!is_our_responsibility) {
    // This is an introduction between two non-brokers in separate networks,
    // and we (one of the networks' brokers) are on side B of the link to the
    // other network's broker. This introduction is therefore the other
    // broker's responsibility.
    msg::RequestIndirectIntroduction request;
    request.params().source_node = from_node_link.remote_node_name();
    request.params().target_node = target_connection->link->remote_node_name();
    target_connection->broker->Transmit(request);
    return;
  }

  IntroduceRemoteNodes(from_node_link, *target_connection->link);
}
// Accepts an introduction to the node called `name`, delivered by the broker
// on the other end of `from_node_link`.
//
// A new inactive NodeLink is created from `side`, `remote_node_type`,
// `remote_protocol_version`, `transport` and `memory`. The introduction is
// silently ignored if this is a normal node that no longer has a broker link,
// or if a connection to `name` is already registered. Otherwise the new link
// is registered, activated, and passed to any callbacks that were waiting on
// a pending introduction to `name`. Activation and callbacks happen after
// `mutex_` is released.
void Node::AcceptIntroduction(NodeLink& from_node_link,
                              const NodeName& name,
                              LinkSide side,
                              Node::Type remote_node_type,
                              uint32_t remote_protocol_version,
                              Ref<DriverTransport> transport,
                              Ref<NodeLinkMemory> memory) {
  // NodeLink should never dispatch this method to a node if the introduction
  // didn't come from a broker, so this assertion should always hold.
  ABSL_ASSERT(from_node_link.remote_node_type() == Node::Type::kBroker);

  const NodeName local_name = from_node_link.local_node_name();
  DVLOG(4) << "Node " << local_name.ToString() << " received introduction to "
           << name.ToString() << " from broker "
           << from_node_link.remote_node_name().ToString();

  Ref<NodeLink> new_link = NodeLink::CreateInactive(
      WrapRefCounted(this), side, local_name, name, remote_node_type,
      remote_protocol_version, transport, memory);
  ABSL_ASSERT(new_link);

  std::unique_ptr<PendingIntroduction> waiting_introduction;
  {
    absl::MutexLock guard(&mutex_);
    if (type_ == Type::kNormal && !broker_link_) {
      // If we've lost our broker connection, we should ignore any further
      // introductions that arrive.
      return;
    }

    const bool inserted =
        connections_
            .insert({name,
                     {
                         .link = new_link,
                         .broker = WrapRefCounted(&from_node_link),
                     }})
            .second;
    if (!inserted) {
      // If both nodes race to request an introduction to each other, the
      // broker may send redundant introductions. It does however take care to
      // ensure that they're ordered consistently across both nodes, so
      // redundant introductions can be safely ignored by convention.
      return;
    }

    // If this node requested this introduction, we may have callbacks to run.
    // Note that it is not an error to receive an unrequested introduction,
    // since it is only necessary for one of the introduced nodes to have
    // requested it.
    auto pending = pending_introductions_.find(name);
    if (pending != pending_introductions_.end()) {
      waiting_introduction = std::move(pending->second);
      pending_introductions_.erase(pending);
    }
  }

  new_link->Activate();
  if (waiting_introduction) {
    waiting_introduction->Finish(new_link.get());
  }
}
// Records that `from_broker` was unable to introduce this node to the node
// called `name`.
//
// Does nothing if no introduction to `name` is pending. Otherwise the broker is
// struck from the pending introduction's outstanding replies; once none
// remain, the introduction is removed and its callbacks are run with null,
// after `mutex_` has been released.
void Node::NotifyIntroductionFailed(NodeLink& from_broker,
                                    const NodeName& name) {
  std::unique_ptr<PendingIntroduction> exhausted_introduction;
  {
    absl::MutexLock guard(&mutex_);
    auto pending = pending_introductions_.find(name);
    if (pending == pending_introductions_.end()) {
      return;
    }

    PendingIntroduction& introduction = *pending->second;
    introduction.NotifyFailureFrom(from_broker);
    if (!introduction.failed()) {
      // We're still waiting for replies from one or more other brokers.
      return;
    }

    exhausted_introduction = std::move(pending->second);
    pending_introductions_.erase(pending);
  }

  exhausted_introduction->Finish(nullptr);
}
bool Node::RelayMessage(const NodeName& from_node, msg::RelayMessage& relay) {
ABSL_ASSERT(type_ == Type::kBroker);
auto link = GetLink(relay.params().destination);
if (!link) {
return true;
}
absl::Span<uint8_t> data = relay.GetArrayView<uint8_t>(relay.params().data);
msg::AcceptRelayedMessage accept;
accept.params().source = from_node;
accept.params().data = accept.AllocateArray<uint8_t>(data.size());
memcpy(accept.GetArrayData(accept.params().data), data.data(), data.size());
accept.params().driver_objects =
accept.AppendDriverObjects(relay.driver_objects());
link->Transmit(accept);
return true;
}
// Delivers a relayed message to the link for the node named by the message's
// `source` parameter. The message is dropped if no such link exists. Returns
// true in every case.
bool Node::AcceptRelayedMessage(msg::AcceptRelayedMessage& accept) {
  auto source_link = GetLink(accept.params().source);
  if (source_link) {
    source_link->DispatchRelayedMessage(accept);
  }
  return true;
}
void Node::DropConnection(const OperationContext& context,
const NodeName& name) {
Ref<NodeLink> link;
std::vector<NodeName> pending_introductions;
bool lost_broker = false;
{
absl::MutexLock lock(&mutex_);
auto it = connections_.find(name);
if (it == connections_.end()) {
return;
}
link = std::move(it->second.link);
connections_.erase(it);
const NodeName& local_name = link->local_node_name();
DVLOG(4) << "Node " << local_name.ToString() << " dropping "
<< "link to " << link->remote_node_name().ToString();
if (link == broker_link_) {
DVLOG(4) << "Node " << local_name.ToString() << " lost its broker link";
broker_link_.reset();
lost_broker = true;
} else if (link->remote_node_type() == Type::kBroker) {
other_brokers_.erase(link->remote_node_name());
}
if (link == allocation_delegate_link_) {
DVLOG(4) << "Node " << local_name.ToString()
<< " lost its allocation delegate";
allocation_delegate_link_.reset();
}
// Accumulate the set of currently pending introductions. If any of them are
// awaiting a response from the dropped link, their expectations will be
// updated accordingly by NotifyIntroductionFailed() below.
pending_introductions.reserve(pending_introductions_.size());
for (auto& [target, intro] : pending_introductions_) {
pending_introductions.push_back(target);
}
}
link->Deactivate(context);
if (lost_broker) {
CancelAllIntroductions();
} else {
for (auto& target : pending_introductions) {
NotifyIntroductionFailed(*link, target);
}
}
}
// Runs `callback` with this node's broker link. If a broker link is already
// set, `callback` is invoked before this function returns, outside the lock.
// Otherwise it is queued in `broker_link_callbacks_` for later.
void Node::WaitForBrokerLinkAsync(BrokerLinkCallback callback) {
  Ref<NodeLink> current_broker;
  {
    absl::MutexLock guard(&mutex_);
    if (broker_link_) {
      current_broker = broker_link_;
    } else {
      broker_link_callbacks_.push_back(std::move(callback));
      return;
    }
  }

  callback(std::move(current_broker));
}
// Handles a request from another broker (over `from_node_link`) to introduce
// `our_node` to `their_node`. Always returns true.
//
// If this broker has no connection to `their_node` yet, it tries to establish
// one and then retries; should that fail, `our_node` (if still connected) is
// told the introduction was rejected. If the connection to `our_node` is
// missing, `their_node` is told the introduction was rejected. Otherwise the
// two nodes are introduced to each other.
bool Node::HandleIndirectIntroductionRequest(NodeLink& from_node_link,
                                             const NodeName& our_node,
                                             const NodeName& their_node) {
  // Enforced by NodeLink before dispatching to this Node.
  ABSL_ASSERT(type_ == Type::kBroker);
  ABSL_ASSERT(from_node_link.remote_node_type() == Type::kBroker);
  ABSL_ASSERT(from_node_link.link_side().is_side_a());

  absl::optional<Connection> their_connection = GetConnection(their_node);
  if (!their_connection) {
    // We need to establish our own direct connection to `their_node` before we
    // can help introduce it to `our_node`.
    EstablishLink(
        their_node,
        [self = WrapRefCounted(this),
         their_broker = WrapRefCounted(&from_node_link), our_node,
         their_node](NodeLink* link) {
          if (link) {
            self->HandleIndirectIntroductionRequest(*their_broker, our_node,
                                                    their_node);
            return;
          }

          // If we could not get our own link to the identified node, then we
          // can't complete the introduction request. Notify our own node of
          // the failure in case they were awaiting such an introduction.
          absl::optional<Connection> our_connection =
              self->GetConnection(our_node);
          if (our_connection) {
            our_connection->link->RejectIntroduction(their_node);
          }
        });
    return true;
  }

  absl::optional<Connection> our_connection = GetConnection(our_node);
  if (our_connection) {
    IntroduceRemoteNodes(*our_connection->link, *their_connection->link);
    return true;
  }

  // We may have lost this connection for any number of reasons, but in any
  // case we can't fulfill the request.
  their_connection->link->RejectIntroduction(our_node);
  return true;
}
void Node::ShutDown() {
ConnectionMap connections;
{
absl::MutexLock lock(&mutex_);
connections_.swap(connections);
broker_link_.reset();
allocation_delegate_link_.reset();
other_brokers_.clear();
}
const OperationContext context{OperationContext::kAPICall};
for (const auto& entry : connections) {
entry.second.link->Deactivate(context);
}
CancelAllIntroductions();
}
void Node::CancelAllIntroductions() {
PendingIntroductionMap introductions;
{
absl::MutexLock lock(&mutex_);
introductions.swap(pending_introductions_);
}
for (auto& [name, intro] : introductions) {
intro->Finish(nullptr);
}
}
// Introduces the remote nodes on the far ends of `first` and `second` to each
// other.
//
// A new transport pair and a new link memory buffer are created; each node is
// sent one transport endpoint and a handle to the shared buffer, with `first`
// assigned side A and `second` side B of the new link. The unordered pair of
// node names is tracked in `in_progress_introductions_` for the duration of
// the call, and the call returns without doing anything if that pair is
// already being introduced.
void Node::IntroduceRemoteNodes(NodeLink& first, NodeLink& second) {
  const NodeName& first_name = first.remote_node_name();
  const NodeName& second_name = second.remote_node_name();

  // Order the names so that (a, b) and (b, a) yield the same key.
  const IntroductionKey key = (first_name < second_name)
                                  ? IntroductionKey(first_name, second_name)
                                  : IntroductionKey(second_name, first_name);

  // Ensure that no other thread does the same introduction concurrently.
  {
    absl::MutexLock guard(&mutex_);
    if (!in_progress_introductions_.insert(key).second) {
      return;
    }
  }

  DriverMemoryWithMapping buffer = NodeLinkMemory::AllocateMemory(driver_);
  auto [transport_for_first_node, transport_for_second_node] =
      DriverTransport::CreatePair(driver_, first.transport().get(),
                                  second.transport().get());

  // `first` gets a clone of the buffer's memory handle; `second` takes the
  // original.
  first.AcceptIntroduction(second_name, LinkSide::kA, second.remote_node_type(),
                           second.remote_protocol_version(),
                           std::move(transport_for_first_node),
                           buffer.memory.Clone());
  second.AcceptIntroduction(first_name, LinkSide::kB, first.remote_node_type(),
                            first.remote_protocol_version(),
                            std::move(transport_for_second_node),
                            std::move(buffer.memory));

  absl::MutexLock guard(&mutex_);
  in_progress_introductions_.erase(key);
}
} // namespace ipcz