ipcz: Fix DriverTransport Listener lifetimes

Both NodeConnector and NodeLink make assumptions that they're safe to
destroy immediately after calling their transport's Deactivate(). This
is not a valid assumption, because the transport holds an unowned
reference to its Listener. They aren't safe to destroy until
deactivation is *complete*, thus guaranteeing that the Listener will no longer be invoked.

This changes DriverTransport::Listener to be RefCounted and has
DriverTransport retain a reference to its active Listener.

Also plumbs an OnTransportDeactivated() event down to the Listener
in case they want to observe deactivation being completed by the
driver.

Bug: 1299283
Change-Id: Ia39be1ee8f274a669ab4d94d8cf7431be3a395fa
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/3703668
Commit-Queue: Ken Rockot <rockot@google.com>
Reviewed-by: Alex Gough <ajgo@chromium.org>
Cr-Commit-Position: refs/heads/main@{#1017647}
NOKEYCHECK=True
GitOrigin-RevId: dc756fc2c4d90248e44f4cb64f3185f0357dea4c
diff --git a/src/ipcz/driver_transport.cc b/src/ipcz/driver_transport.cc
index 5928cfa..d5865d4 100644
--- a/src/ipcz/driver_transport.cc
+++ b/src/ipcz/driver_transport.cc
@@ -34,6 +34,7 @@
   if (flags & IPCZ_TRANSPORT_ACTIVITY_DEACTIVATED) {
     const Ref<DriverTransport> doomed_transport =
         DriverTransport::TakeFromHandle(transport);
+    doomed_transport->NotifyDeactivated();
     return IPCZ_RESULT_OK;
   }
 
@@ -88,12 +89,26 @@
 
 bool DriverTransport::Notify(const RawMessage& message) {
   ABSL_ASSERT(listener_);
-  return listener_->OnTransportMessage(message, *this);
+  // Listener methods may set a new Listener on this DriverTransport, and that
+  // may drop their own last reference. Keep a reference here to ensure this
+  // Listener remains alive through the extent of its notification.
+  Ref<Listener> listener = listener_;
+  return listener->OnTransportMessage(message, *this);
 }
 
 void DriverTransport::NotifyError() {
   ABSL_ASSERT(listener_);
-  listener_->OnTransportError();
+  // Listener methods may set a new Listener on this DriverTransport, and that
+  // may drop their own last reference. Keep a reference here to ensure this
+  // Listener remains alive through the extent of its notification.
+  Ref<Listener> listener = listener_;
+  return listener->OnTransportError();
+}
+
+void DriverTransport::NotifyDeactivated() {
+  ABSL_ASSERT(listener_);
+  Ref<Listener> listener = std::move(listener_);
+  listener->OnTransportDeactivated();
 }
 
 IpczResult DriverTransport::Close() {
diff --git a/src/ipcz/driver_transport.h b/src/ipcz/driver_transport.h
index e18c7f1..1dfadc4 100644
--- a/src/ipcz/driver_transport.h
+++ b/src/ipcz/driver_transport.h
@@ -37,9 +37,9 @@
   };
 
   // A Listener to receive message and error events from the driver.
-  class Listener {
+  class Listener : public RefCounted {
    public:
-    virtual ~Listener() = default;
+    ~Listener() override = default;
 
     // Accepts a raw message from the transport. Note that this is called
     // without *any* validation of the size or content of `message`.
@@ -48,6 +48,10 @@
 
     // Indicates that some unrecoverable error has occurred with the transport.
     virtual void OnTransportError() = 0;
+
+    // Indicates that dectivation has been completed by the driver, meaning that
+    // no further methods will be invoked on this Listener.
+    virtual void OnTransportDeactivated() {}
   };
 
   // Constructs a new DriverTransport object over the driver-created transport
@@ -57,8 +61,8 @@
   // Set the object handling any incoming message or error notifications. This
   // is only safe to set before Activate() is called, or from within one of the
   // Listener methods when invoked by this DriverTransport (because invocations
-  // are mutually exclusive). `listener` must outlive this DriverTransport.
-  void set_listener(Listener* listener) { listener_ = listener; }
+  // are mutually exclusive).
+  void set_listener(Ref<Listener> listener) { listener_ = std::move(listener); }
 
   // Exposes the underlying driver handle for this transport.
   const DriverObject& driver_object() const { return transport_; }
@@ -97,6 +101,10 @@
   bool Notify(const RawMessage& message);
   void NotifyError();
 
+  // Invoked once the driver has finalized deactivation of this transport, as
+  // previously requested by a call to Deactivate().
+  void NotifyDeactivated();
+
   // APIObject:
   IpczResult Close() override;
 
@@ -105,7 +113,7 @@
 
   DriverObject transport_;
 
-  Listener* listener_ = nullptr;
+  Ref<Listener> listener_;
 };
 
 }  // namespace ipcz
diff --git a/src/ipcz/node.cc b/src/ipcz/node.cc
index 33462bc..bc5e1ec 100644
--- a/src/ipcz/node.cc
+++ b/src/ipcz/node.cc
@@ -89,9 +89,16 @@
 }
 
 bool Node::AddLink(const NodeName& remote_node_name, Ref<NodeLink> link) {
-  absl::MutexLock lock(&mutex_);
-  auto [it, inserted] = node_links_.insert({remote_node_name, std::move(link)});
-  return inserted;
+  {
+    absl::MutexLock lock(&mutex_);
+    auto [it, inserted] = node_links_.insert({remote_node_name, link});
+    if (inserted) {
+      return true;
+    }
+  }
+
+  link->Deactivate();
+  return false;
 }
 
 NodeName Node::GenerateRandomName() const {
diff --git a/src/ipcz/node_connector.cc b/src/ipcz/node_connector.cc
index 76d4514..1013577 100644
--- a/src/ipcz/node_connector.cc
+++ b/src/ipcz/node_connector.cc
@@ -68,6 +68,7 @@
     DVLOG(4) << "Accepting ConnectFromNonBrokerToBroker on broker "
              << broker_name_.ToString() << " from new node "
              << new_remote_node_name_.ToString();
+
     AcceptConnection(
         NodeLink::Create(node_, LinkSide::kA, broker_name_,
                          new_remote_node_name_, Node::Type::kNormal,
@@ -233,7 +234,6 @@
     callback_(new_link);
   }
   EstablishWaitingPortals(std::move(new_link), link_side, num_remote_portals);
-  active_self_.reset();
 }
 
 void NodeConnector::RejectConnection() {
@@ -242,12 +242,10 @@
   }
   EstablishWaitingPortals(nullptr, LinkSide::kA, 0);
   transport_->Deactivate();
-  active_self_.reset();
 }
 
 bool NodeConnector::ActivateTransportAndConnect() {
-  active_self_ = WrapRefCounted(this);
-  transport_->set_listener(this);
+  transport_->set_listener(WrapRefCounted(this));
   if (transport_->Activate() != IPCZ_RESULT_OK) {
     RejectConnection();
     return false;
diff --git a/src/ipcz/node_connector.h b/src/ipcz/node_connector.h
index 091561c..e5345d0 100644
--- a/src/ipcz/node_connector.h
+++ b/src/ipcz/node_connector.h
@@ -29,7 +29,7 @@
 // Once an initial handshake is complete the underlying transport is adopted by
 // a new NodeLink and handed off to the local Node to communicate with the
 // remote node, and this object is destroyed.
-class NodeConnector : public RefCounted, public msg::NodeMessageListener {
+class NodeConnector : public msg::NodeMessageListener {
  public:
   // Constructs a new NodeConnector to send and receive a handshake on
   // `transport`. The specific type of connector to create is determined by a
@@ -77,7 +77,6 @@
   const Ref<DriverTransport> transport_;
   const IpczConnectNodeFlags flags_;
   const std::vector<Ref<Portal>> waiting_portals_;
-  Ref<NodeConnector> active_self_;
 
  private:
   bool ActivateTransportAndConnect();
diff --git a/src/ipcz/node_link.cc b/src/ipcz/node_link.cc
index 5cffcf1..6fb7f58 100644
--- a/src/ipcz/node_link.cc
+++ b/src/ipcz/node_link.cc
@@ -57,13 +57,12 @@
       remote_protocol_version_(remote_protocol_version),
       transport_(std::move(transport)),
       memory_(std::move(memory)) {
-  transport_->set_listener(this);
+  transport_->set_listener(WrapRefCounted(this));
 }
 
 NodeLink::~NodeLink() {
-  // Ensure this NodeLink is deactivated even if it was never adopted by a Node.
-  // If it was already deactivated, this is a no-op.
-  Deactivate();
+  absl::MutexLock lock(&mutex_);
+  ABSL_HARDENING_ASSERT(!active_);
 }
 
 Ref<RemoteRouterLink> NodeLink::AddRemoteRouterLink(SublinkId sublink,
diff --git a/src/ipcz/node_link.h b/src/ipcz/node_link.h
index ed1aa12..47c20ee 100644
--- a/src/ipcz/node_link.h
+++ b/src/ipcz/node_link.h
@@ -39,7 +39,7 @@
 // NodeLinks may also allocate an arbitrary number of sublinks which are used
 // to multiplex the link and facilitate point-to-point communication between
 // specific Router instances on either end.
-class NodeLink : public RefCounted, private msg::NodeMessageListener {
+class NodeLink : public msg::NodeMessageListener {
  public:
   struct Sublink {
     Sublink(Ref<RemoteRouterLink> link, Ref<Router> receiver);
diff --git a/src/test/test_transport_listener.cc b/src/test/test_transport_listener.cc
index f7eb836..757649f 100644
--- a/src/test/test_transport_listener.cc
+++ b/src/test/test_transport_listener.cc
@@ -12,18 +12,60 @@
 #include "ipcz/node.h"
 #include "testing/gtest/include/gtest/gtest.h"
 #include "third_party/abseil-cpp/absl/base/macros.h"
+#include "third_party/abseil-cpp/absl/synchronization/notification.h"
 #include "util/ref_counted.h"
 
 namespace ipcz::test {
 
+class TestTransportListener::ListenerImpl : public DriverTransport::Listener {
+ public:
+  ListenerImpl() = default;
+
+  void set_message_handler(
+      TestTransportListener::GenericMessageHandler handler) {
+    ABSL_ASSERT(!message_handler_);
+    message_handler_ = std::move(handler);
+  }
+
+  void set_error_handler(TestTransportListener::ErrorHandler handler) {
+    ABSL_ASSERT(!error_handler_);
+    error_handler_ = std::move(handler);
+  }
+
+  void WaitForDeactivation() { deactivation_.WaitForNotification(); }
+
+  // DriverTransport::Listener:
+  bool OnTransportMessage(const DriverTransport::RawMessage& message,
+                          const DriverTransport& transport) override {
+    ABSL_ASSERT(message_handler_);
+    return message_handler_(message);
+  }
+
+  void OnTransportError() override {
+    if (error_handler_) {
+      error_handler_();
+    }
+  }
+
+  void OnTransportDeactivated() override { deactivation_.Notify(); }
+
+ private:
+  ~ListenerImpl() override = default;
+
+  TestTransportListener::GenericMessageHandler message_handler_ = nullptr;
+  TestTransportListener::ErrorHandler error_handler_ = nullptr;
+  absl::Notification deactivation_;
+};
+
 TestTransportListener::TestTransportListener(IpczHandle node,
                                              IpczDriverHandle handle)
     : TestTransportListener(MakeRefCounted<DriverTransport>(
           DriverObject(reinterpret_cast<Node*>(node)->driver(), handle))) {}
 
 TestTransportListener::TestTransportListener(Ref<DriverTransport> transport)
-    : transport_(std::move(transport)) {
-  transport_->set_listener(this);
+    : transport_(std::move(transport)),
+      listener_(MakeRefCounted<ListenerImpl>()) {
+  transport_->set_listener(listener_);
 }
 
 TestTransportListener::~TestTransportListener() {
@@ -37,11 +79,11 @@
 
   transport_->Deactivate();
   activated_ = false;
+  listener_->WaitForDeactivation();
 }
 
 void TestTransportListener::OnRawMessage(GenericMessageHandler handler) {
-  ABSL_ASSERT(!message_handler_);
-  message_handler_ = std::move(handler);
+  listener_->set_message_handler(std::move(handler));
   ActivateTransportIfNecessary();
 }
 
@@ -56,8 +98,7 @@
 }
 
 void TestTransportListener::OnError(ErrorHandler handler) {
-  ABSL_ASSERT(!error_handler_);
-  error_handler_ = std::move(handler);
+  listener_->set_error_handler(std::move(handler));
 
   // Since the caller only cares about handling errors, ensure all valid
   // messages are cleanly discarded. This also activates the transport.
@@ -76,17 +117,4 @@
   transport_->Activate();
 }
 
-bool TestTransportListener::OnTransportMessage(
-    const DriverTransport::RawMessage& message,
-    const DriverTransport& transport) {
-  ABSL_ASSERT(message_handler_);
-  return message_handler_(message);
-}
-
-void TestTransportListener::OnTransportError() {
-  if (error_handler_) {
-    error_handler_();
-  }
-};
-
 }  // namespace ipcz::test
diff --git a/src/test/test_transport_listener.h b/src/test/test_transport_listener.h
index d2a3e0d..b196ef4 100644
--- a/src/test/test_transport_listener.h
+++ b/src/test/test_transport_listener.h
@@ -15,7 +15,7 @@
 
 // Helper for tests to conveniently listen for incoming messages on a driver
 // transport.
-class TestTransportListener : public DriverTransport::Listener {
+class TestTransportListener {
  public:
   using GenericMessageHandler =
       std::function<bool(const DriverTransport::RawMessage& message)>;
@@ -23,7 +23,7 @@
 
   TestTransportListener(IpczHandle node, IpczDriverHandle handle);
   explicit TestTransportListener(Ref<DriverTransport> transport);
-  ~TestTransportListener() override;
+  ~TestTransportListener();
 
   // Deactivates the transport and stops listening for messages and/or errors.
   void StopListening();
@@ -66,17 +66,13 @@
   }
 
  private:
+  class ListenerImpl;
+
   void ActivateTransportIfNecessary();
 
-  // DriverTransport::Listener:
-  bool OnTransportMessage(const DriverTransport::RawMessage& message,
-                          const DriverTransport& transport) override;
-  void OnTransportError() override;
-
   const Ref<DriverTransport> transport_;
+  const Ref<ListenerImpl> listener_;
   bool activated_ = false;
-  GenericMessageHandler message_handler_;
-  ErrorHandler error_handler_;
 };
 
 }  // namespace ipcz::test