| // Copyright 2022 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "ipcz/node.h" |
| |
| #include <utility> |
| #include <vector> |
| |
| #include "ipcz/driver_memory.h" |
| #include "ipcz/ipcz.h" |
| #include "ipcz/link_side.h" |
| #include "ipcz/node_connector.h" |
| #include "ipcz/node_link.h" |
| #include "ipcz/node_link_memory.h" |
| #include "ipcz/portal.h" |
| #include "ipcz/router.h" |
| #include "third_party/abseil-cpp/absl/base/macros.h" |
| #include "third_party/abseil-cpp/absl/synchronization/mutex.h" |
| #include "util/log.h" |
| #include "util/ref_counted.h" |
| |
| namespace ipcz { |
| |
| Node::Node(Type type, const IpczDriver& driver, IpczDriverHandle driver_node) |
| : type_(type), driver_(driver), driver_node_(driver_node) { |
| if (type_ == Type::kBroker) { |
| // Only brokers assign their own names. |
| assigned_name_ = GenerateRandomName(); |
| DVLOG(4) << "Created new broker node " << assigned_name_.ToString(); |
| } else { |
| DVLOG(4) << "Created new non-broker node " << this; |
| } |
| } |
| |
| Node::~Node() = default; |
| |
| IpczResult Node::Close() { |
| ShutDown(); |
| return IPCZ_RESULT_OK; |
| } |
| |
| IpczResult Node::ConnectNode(IpczDriverHandle driver_transport, |
| IpczConnectNodeFlags flags, |
| absl::Span<IpczHandle> initial_portals) { |
| std::vector<Ref<Portal>> portals(initial_portals.size()); |
| for (size_t i = 0; i < initial_portals.size(); ++i) { |
| auto portal = |
| MakeRefCounted<Portal>(WrapRefCounted(this), MakeRefCounted<Router>()); |
| portals[i] = portal; |
| initial_portals[i] = Portal::ReleaseAsHandle(std::move(portal)); |
| } |
| |
| auto transport = |
| MakeRefCounted<DriverTransport>(DriverObject(driver_, driver_transport)); |
| IpczResult result = NodeConnector::ConnectNode(WrapRefCounted(this), |
| transport, flags, portals); |
| if (result != IPCZ_RESULT_OK) { |
| // On failure the caller retains ownership of `driver_transport`. Release |
| // it here so it doesn't get closed when `transport` is destroyed. |
| transport->Release(); |
| |
| // Wipe out the initial portals we created, since they are invalid and |
| // effectively not returned to the caller on failure. |
| for (Ref<Portal>& portal : portals) { |
| Ref<Portal> doomed_portal = AdoptRef(portal.get()); |
| } |
| return result; |
| } |
| return IPCZ_RESULT_OK; |
| } |
| |
| NodeName Node::GetAssignedName() { |
| absl::MutexLock lock(&mutex_); |
| return assigned_name_; |
| } |
| |
| Ref<NodeLink> Node::GetBrokerLink() { |
| absl::MutexLock lock(&mutex_); |
| return broker_link_; |
| } |
| |
| void Node::SetBrokerLink(Ref<NodeLink> link) { |
| absl::MutexLock lock(&mutex_); |
| ABSL_ASSERT(!broker_link_); |
| broker_link_ = std::move(link); |
| } |
| |
| void Node::SetAssignedName(const NodeName& name) { |
| absl::MutexLock lock(&mutex_); |
| ABSL_ASSERT(!assigned_name_.is_valid()); |
| assigned_name_ = name; |
| } |
| |
| bool Node::AddLink(const NodeName& remote_node_name, Ref<NodeLink> link) { |
| { |
| absl::MutexLock lock(&mutex_); |
| auto [it, inserted] = node_links_.insert({remote_node_name, link}); |
| if (inserted) { |
| return true; |
| } |
| } |
| |
| link->Deactivate(); |
| return false; |
| } |
| |
| Ref<NodeLink> Node::GetLink(const NodeName& name) { |
| absl::MutexLock lock(&mutex_); |
| auto it = node_links_.find(name); |
| if (it == node_links_.end()) { |
| return nullptr; |
| } |
| return it->second; |
| } |
| |
| NodeName Node::GenerateRandomName() const { |
| NodeName name; |
| IpczResult result = |
| driver_.GenerateRandomBytes(sizeof(name), IPCZ_NO_FLAGS, nullptr, &name); |
| ABSL_ASSERT(result == IPCZ_RESULT_OK); |
| return name; |
| } |
| |
| void Node::SetAllocationDelegate(Ref<NodeLink> link) { |
| absl::MutexLock lock(&mutex_); |
| ABSL_ASSERT(!allocation_delegate_link_); |
| allocation_delegate_link_ = std::move(link); |
| } |
| |
| void Node::AllocateSharedMemory(size_t size, |
| AllocateSharedMemoryCallback callback) { |
| Ref<NodeLink> delegate; |
| { |
| absl::MutexLock lock(&mutex_); |
| delegate = allocation_delegate_link_; |
| } |
| |
| if (delegate) { |
| delegate->RequestMemory(size, std::move(callback)); |
| } else { |
| callback(DriverMemory(driver_, size)); |
| } |
| } |
| |
| void Node::EstablishLink(const NodeName& name, EstablishLinkCallback callback) { |
| Ref<NodeLink> broker; |
| Ref<NodeLink> link; |
| { |
| absl::MutexLock lock(&mutex_); |
| broker = broker_link_; |
| auto it = node_links_.find(name); |
| if (it != node_links_.end()) { |
| link = it->second; |
| } else if (type_ == Type::kNormal && broker) { |
| auto [pending_it, inserted] = pending_introductions_.insert({name, {}}); |
| pending_it->second.push_back(std::move(callback)); |
| if (!inserted) { |
| // There's already an introduction request out for this node, so there's |
| // nothing more we need to do. |
| return; |
| } |
| } |
| } |
| |
| if (broker && !link) { |
| broker->RequestIntroduction(name); |
| } else { |
| callback(link.get()); |
| } |
| } |
| |
| void Node::HandleIntroductionRequest(NodeLink& from_node_link, |
| const NodeName& for_node) { |
| // NodeLink must never accept these requests on non-broker nodes. |
| ABSL_ASSERT(type_ == Type::kBroker); |
| |
| const NodeName requestor = from_node_link.remote_node_name(); |
| |
| DVLOG(4) << "Broker " << from_node_link.local_node_name().ToString() |
| << " received introduction request for " << for_node.ToString() |
| << " from " << requestor.ToString(); |
| |
| // A key which uniquely identifies the pair of nodes being introduced |
| // regardless of who requested the introduction. |
| const auto key = (requestor < for_node) |
| ? IntroductionKey(requestor, for_node) |
| : IntroductionKey(for_node, requestor); |
| |
| Ref<NodeLink> target_link; |
| { |
| absl::MutexLock lock(&mutex_); |
| auto it = node_links_.find(for_node); |
| if (it != node_links_.end()) { |
| target_link = it->second; |
| |
| auto [intro_it, inserted] = in_progress_introductions_.insert(key); |
| if (!inserted) { |
| // We're already introducing the same two nodes, so drop this request. |
| return; |
| } |
| } |
| } |
| |
| if (!target_link) { |
| from_node_link.RejectIntroduction(for_node); |
| return; |
| } |
| |
| DriverMemoryWithMapping buffer = NodeLinkMemory::AllocateMemory(driver_); |
| auto [transport_for_target, transport_for_requestor] = |
| DriverTransport::CreatePair(driver_, target_link->transport().get(), |
| from_node_link.transport().get()); |
| target_link->AcceptIntroduction( |
| requestor, LinkSide::kA, from_node_link.remote_protocol_version(), |
| std::move(transport_for_target), buffer.memory.Clone()); |
| from_node_link.AcceptIntroduction( |
| for_node, LinkSide::kB, target_link->remote_protocol_version(), |
| std::move(transport_for_requestor), std::move(buffer.memory)); |
| |
| absl::MutexLock lock(&mutex_); |
| in_progress_introductions_.erase(key); |
| } |
| |
| void Node::AcceptIntroduction(NodeLink& from_node_link, |
| const NodeName& name, |
| LinkSide side, |
| uint32_t remote_protocol_version, |
| Ref<DriverTransport> transport, |
| Ref<NodeLinkMemory> memory) { |
| // NodeLink should never dispatch this method to a node if the introduction |
| // didn't come from a broker, so this assertion should always hold. |
| ABSL_ASSERT(from_node_link.remote_node_type() == Node::Type::kBroker); |
| |
| const NodeName local_name = from_node_link.local_node_name(); |
| |
| DVLOG(4) << "Node " << local_name.ToString() << " received introduction to " |
| << name.ToString() << " from broker " |
| << from_node_link.remote_node_name().ToString(); |
| |
| Ref<NodeLink> new_link = NodeLink::CreateInactive( |
| WrapRefCounted(this), side, local_name, name, Type::kNormal, |
| remote_protocol_version, transport, memory); |
| ABSL_ASSERT(new_link); |
| |
| std::vector<EstablishLinkCallback> callbacks; |
| { |
| absl::MutexLock lock(&mutex_); |
| auto [link_it, inserted] = node_links_.insert({name, new_link}); |
| if (!inserted) { |
| // If both nodes race to request an introduction to each other, the |
| // broker may send redundant introductions. It does however take care to |
| // ensure that they're ordered consistently across both nodes, so |
| // redundant introductions can be safely ignored by convention. |
| return; |
| } |
| |
| // If this node requested this introduction, we may have callbacks to run. |
| // Note that it is not an error to receive an unrequested introduction, |
| // since it is only necessary for one of the introduced nodes to have |
| // requested it. |
| auto it = pending_introductions_.find(name); |
| if (it != pending_introductions_.end()) { |
| callbacks = std::move(it->second); |
| pending_introductions_.erase(it); |
| } |
| } |
| |
| new_link->Activate(); |
| for (auto& callback : callbacks) { |
| callback(new_link.get()); |
| } |
| } |
| |
| bool Node::CancelIntroduction(const NodeName& name) { |
| std::vector<EstablishLinkCallback> callbacks; |
| { |
| absl::MutexLock lock(&mutex_); |
| auto it = pending_introductions_.find(name); |
| if (it == pending_introductions_.end()) { |
| return false; |
| } |
| callbacks = std::move(it->second); |
| pending_introductions_.erase(it); |
| } |
| |
| for (auto& callback : callbacks) { |
| callback(nullptr); |
| } |
| |
| return true; |
| } |
| |
| bool Node::RelayMessage(const NodeName& from_node, msg::RelayMessage& relay) { |
| ABSL_ASSERT(type_ == Type::kBroker); |
| auto link = GetLink(relay.params().destination); |
| if (!link) { |
| return true; |
| } |
| |
| absl::Span<uint8_t> data = relay.GetArrayView<uint8_t>(relay.params().data); |
| msg::AcceptRelayedMessage accept; |
| accept.params().source = from_node; |
| accept.params().data = accept.AllocateArray<uint8_t>(data.size()); |
| memcpy(accept.GetArrayData(accept.params().data), data.data(), data.size()); |
| accept.params().driver_objects = |
| accept.AppendDriverObjects(relay.driver_objects()); |
| link->Transmit(accept); |
| return true; |
| } |
| |
| bool Node::AcceptRelayedMessage(msg::AcceptRelayedMessage& accept) { |
| if (auto link = GetLink(accept.params().source)) { |
| link->DispatchRelayedMessage(accept); |
| } |
| return true; |
| } |
| |
| void Node::DropLink(const NodeName& name) { |
| Ref<NodeLink> link; |
| bool lost_broker = false; |
| { |
| absl::MutexLock lock(&mutex_); |
| auto it = node_links_.find(name); |
| if (it == node_links_.end()) { |
| return; |
| } |
| link = std::move(it->second); |
| node_links_.erase(it); |
| |
| const NodeName& local_name = link->local_node_name(); |
| DVLOG(4) << "Node " << local_name.ToString() << " dropping " |
| << " link to " << link->remote_node_name().ToString(); |
| if (link == broker_link_) { |
| DVLOG(4) << "Node " << local_name.ToString() << " lost its broker link"; |
| broker_link_.reset(); |
| lost_broker = true; |
| } |
| |
| if (link == allocation_delegate_link_) { |
| DVLOG(4) << "Node " << local_name.ToString() |
| << " lost its allocation delegate"; |
| allocation_delegate_link_.reset(); |
| } |
| } |
| |
| link->Deactivate(); |
| |
| if (lost_broker) { |
| CancelAllIntroductions(); |
| } |
| } |
| |
| void Node::ShutDown() { |
| NodeLinkMap node_links; |
| { |
| absl::MutexLock lock(&mutex_); |
| std::swap(node_links_, node_links); |
| broker_link_.reset(); |
| allocation_delegate_link_.reset(); |
| } |
| |
| for (const auto& entry : node_links) { |
| entry.second->Deactivate(); |
| } |
| |
| CancelAllIntroductions(); |
| } |
| |
| void Node::CancelAllIntroductions() { |
| PendingIntroductionMap introductions; |
| { |
| absl::MutexLock lock(&mutex_); |
| introductions.swap(pending_introductions_); |
| } |
| |
| for (auto& [name, callbacks] : introductions) { |
| for (auto& callback : callbacks) { |
| callback(nullptr); |
| } |
| } |
| } |
| |
| } // namespace ipcz |