ipcz: Async reference driver

This differs from the sync driver only by its transport implementation.
Instead of synchronous and reentrant message dispatch, each transport's
events are queued and dispatched in-order on a dedicated background
thread.

This effectively simulates the same non-deterministic behavior inherent
in multiprocess usage, without the complexity of a multiprocess
environment. Multinode tests are run in this mode to provide additional
coverage and expose any potential race conditions introduced into ipcz
proper.

Bug: 1299283
Change-Id: I78be1fe6a63546677378ca471a915da731085c30
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/3724043
Commit-Queue: Ken Rockot <rockot@google.com>
Reviewed-by: Daniel Cheng <dcheng@chromium.org>
Cr-Commit-Position: refs/heads/main@{#1019679}
NOKEYCHECK=True
GitOrigin-RevId: 90ea32dff9126a07ded84387c0ac9ef91745ae56
diff --git a/src/BUILD.gn b/src/BUILD.gn
index 90436cb..75aef4a 100644
--- a/src/BUILD.gn
+++ b/src/BUILD.gn
@@ -137,11 +137,13 @@
   testonly = true
 
   public = [
+    "reference_drivers/async_reference_driver.h",
     "reference_drivers/blob.h",
     "reference_drivers/sync_reference_driver.h",
   ]
 
   sources = [
+    "reference_drivers/async_reference_driver.cc",
     "reference_drivers/blob.cc",
     "reference_drivers/object.cc",
     "reference_drivers/object.h",
diff --git a/src/ipcz/router.cc b/src/ipcz/router.cc
index 2634bab..d9e9acc 100644
--- a/src/ipcz/router.cc
+++ b/src/ipcz/router.cc
@@ -152,20 +152,19 @@
   return true;
 }
 
-bool Router::AcceptRouteClosureFrom(LinkType link_type,
-                                    SequenceNumber sequence_length) {
-  Ref<RouterLink> inward_forwarding_link;
+bool Router::AcceptRouteClosureFrom(
+    LinkType link_type,
+    absl::optional<SequenceNumber> sequence_length) {
   TrapEventDispatcher dispatcher;
   {
     absl::MutexLock lock(&mutex_);
     if (link_type.is_outward()) {
-      if (!inbound_parcels_.SetFinalSequenceLength(sequence_length)) {
+      if (!inbound_parcels_.SetFinalSequenceLength(sequence_length.value_or(
+              inbound_parcels_.GetCurrentSequenceLength()))) {
         return false;
       }
 
-      if (inward_link_) {
-        inward_forwarding_link = inward_link_;
-      } else {
+      if (!inward_link_) {
         status_.flags |= IPCZ_PORTAL_STATUS_PEER_CLOSED;
         if (inbound_parcels_.IsSequenceFullyConsumed()) {
           status_.flags |= IPCZ_PORTAL_STATUS_DEAD;
@@ -174,16 +173,13 @@
                                   dispatcher);
       }
     } else if (link_type.is_peripheral_inward()) {
-      if (!outbound_parcels_.SetFinalSequenceLength(sequence_length)) {
+      if (!outbound_parcels_.SetFinalSequenceLength(sequence_length.value_or(
+              outbound_parcels_.GetCurrentSequenceLength()))) {
         return false;
       }
     }
   }
 
-  if (inward_forwarding_link) {
-    inward_forwarding_link->AcceptRouteClosure(sequence_length);
-  }
-
   Flush();
   return true;
 }
@@ -336,18 +332,16 @@
 
 void Router::BeginProxyingToNewRouter(NodeLink& to_node_link,
                                       const RouterDescriptor& descriptor) {
-  // Acquire a reference to the sublink created by an earlier call to
-  // SerializeNewRouter().
-  const absl::optional<NodeLink::Sublink> new_sublink =
-      to_node_link.GetSublink(descriptor.new_sublink);
-  if (!new_sublink) {
-    // The sublink has been torn down, presumably because of node disconnection.
-    // Nowhere to proxy now, so we're done.
-    return;
+  // Acquire a reference to the RemoteRouterLink created by an earlier call to
+  // SerializeNewRouter(). If the NodeLink has already been disconnected, this
+  // may be null.
+  Ref<RemoteRouterLink> new_router_link;
+  if (auto new_sublink = to_node_link.GetSublink(descriptor.new_sublink)) {
+    new_router_link = new_sublink->router_link;
   }
 
   bool deactivate_link = false;
-  {
+  if (new_router_link) {
     absl::MutexLock lock(&mutex_);
     ABSL_ASSERT(!inward_link_);
 
@@ -359,12 +353,20 @@
       deactivate_link = true;
     } else {
       // TODO: Initiate proxy removal ASAP now that we're proxying.
-      inward_link_ = new_sublink->router_link;
+      inward_link_ = new_router_link;
     }
   }
 
   if (deactivate_link) {
-    new_sublink->router_link->Deactivate();
+    new_router_link->Deactivate();
+    return;
+  }
+
+  if (!to_node_link.GetSublink(descriptor.new_sublink)) {
+    // If the NodeLink was disconnected since we entered this method but before
+    // `inward_link_` was set above, disconnection will not have been propagated
+    // inward. Remedy that.
+    AcceptRouteClosureFrom(LinkType::kPeripheralInward);
     return;
   }
 
@@ -409,6 +411,7 @@
   Ref<RouterLink> dead_outward_link;
   absl::InlinedVector<Parcel, 2> inbound_parcels;
   absl::InlinedVector<Parcel, 2> outbound_parcels;
+  absl::optional<SequenceNumber> final_inward_sequence_length;
   absl::optional<SequenceNumber> final_outward_sequence_length;
   {
     absl::MutexLock lock(&mutex_);
@@ -448,7 +451,20 @@
       // exists an ultimate destination for any forwarded inbound parcels. So we
       // drop both links now.
       dead_outward_link = std::move(outward_link_);
-      dead_inward_link = std::move(inward_link_);
+    } else if (!inbound_parcels_.ExpectsMoreElements()) {
+      // If the other end of the route is gone and we've received all its
+      // parcels, we can simply drop the outward link in that case.
+      dead_outward_link = std::move(outward_link_);
+    }
+
+    if (inbound_parcels_.IsSequenceFullyConsumed()) {
+      // We won't be receiving anything new from our peer, and if we're a proxy
+      // then we've also forwarded everything already. We can propagate closure
+      // inward and drop the inward link, if applicable.
+      final_inward_sequence_length = inbound_parcels_.final_sequence_length();
+      if (inward_link_) {
+        dead_inward_link = std::move(inward_link_);
+      }
     }
   }
 
@@ -460,15 +476,17 @@
     inward_link->AcceptParcel(parcel);
   }
 
-  if (final_outward_sequence_length) {
-    outward_link->AcceptRouteClosure(*final_outward_sequence_length);
-  }
-
   if (dead_outward_link) {
+    if (final_outward_sequence_length) {
+      dead_outward_link->AcceptRouteClosure(*final_outward_sequence_length);
+    }
     dead_outward_link->Deactivate();
   }
 
   if (dead_inward_link) {
+    if (final_inward_sequence_length) {
+      dead_inward_link->AcceptRouteClosure(*final_inward_sequence_length);
+    }
     dead_inward_link->Deactivate();
   }
 }
diff --git a/src/ipcz/router.h b/src/ipcz/router.h
index 2a09f30..01d21ab 100644
--- a/src/ipcz/router.h
+++ b/src/ipcz/router.h
@@ -15,6 +15,7 @@
 #include "ipcz/sequence_number.h"
 #include "ipcz/trap_set.h"
 #include "third_party/abseil-cpp/absl/synchronization/mutex.h"
+#include "third_party/abseil-cpp/absl/types/optional.h"
 #include "util/ref_counted.h"
 
 namespace ipcz {
@@ -92,9 +93,12 @@
 
   // Accepts notification that the other end of the route has been closed and
   // that the close end transmitted a total of `sequence_length` parcels before
-  // closing.
-  bool AcceptRouteClosureFrom(LinkType link_type,
-                              SequenceNumber sequence_length);
+  // closing. If `sequence_length` is unknown and omitted (due to closure being
+  // forced by disconnection), the current sequence length in the appropriate
+  // direction is used.
+  bool AcceptRouteClosureFrom(
+      LinkType link_type,
+      absl::optional<SequenceNumber> sequence_length = absl::nullopt);
 
   // Retrieves the next available inbound parcel from this Router, if present.
   IpczResult GetNextInboundParcel(IpczGetFlags flags,
diff --git a/src/reference_drivers/async_reference_driver.cc b/src/reference_drivers/async_reference_driver.cc
new file mode 100644
index 0000000..647e423
--- /dev/null
+++ b/src/reference_drivers/async_reference_driver.cc
@@ -0,0 +1,239 @@
+// Copyright 2022 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "reference_drivers/async_reference_driver.h"
+
+#include <cstdint>
+#include <memory>
+#include <thread>
+#include <tuple>
+#include <utility>
+#include <vector>
+
+#include "ipcz/ipcz.h"
+#include "reference_drivers/object.h"
+#include "reference_drivers/single_process_reference_driver_base.h"
+#include "third_party/abseil-cpp/absl/base/macros.h"
+#include "third_party/abseil-cpp/absl/synchronization/mutex.h"
+#include "third_party/abseil-cpp/absl/synchronization/notification.h"
+#include "third_party/abseil-cpp/absl/types/span.h"
+#include "util/ref_counted.h"
+
+namespace ipcz::reference_drivers {
+
+namespace {
+
+// The driver transport implementation for the async reference driver. Each
+// AsyncTransport holds a direct reference to its peer, and transmissions are
+// tasks posted to the peer's task queue. Task are run on a dedicated background
+// thread for each transport.
+class AsyncTransport : public ObjectImpl<AsyncTransport, Object::kTransport> {
+ public:
+  using Pair = std::pair<Ref<AsyncTransport>, Ref<AsyncTransport>>;
+  static Pair CreatePair() {
+    Pair pair{MakeRefCounted<AsyncTransport>(),
+              MakeRefCounted<AsyncTransport>()};
+    std::tie(pair.second->peer_, pair.first->peer_) = pair;
+    return pair;
+  }
+
+  void Activate(IpczHandle transport, IpczTransportActivityHandler handler) {
+    absl::MutexLock lock(&mutex_);
+    transport_ = transport;
+    handler_ = handler;
+    active_ = true;
+    task_thread_ =
+        std::make_unique<std::thread>(&RunTaskThread, WrapRefCounted(this));
+  }
+
+  void Deactivate() {
+    std::unique_ptr<std::thread> task_thread_to_join;
+    {
+      absl::MutexLock lock(&mutex_);
+      // Join the task thread if we're not on it; otherwise detach it.
+      // Detachment is safe: the thread owns a ref to `this` as long as it's
+      // running, and it will terminate very soon after deactivation.
+      ABSL_HARDENING_ASSERT(task_thread_);
+      if (task_thread_->get_id() != std::this_thread::get_id()) {
+        task_thread_to_join = std::move(task_thread_);
+      } else {
+        task_thread_->detach();
+        task_thread_.reset();
+      }
+      active_ = false;
+      NotifyTaskThread();
+    }
+    if (task_thread_to_join) {
+      task_thread_to_join->join();
+    }
+  }
+
+  IpczResult Transmit(absl::Span<const uint8_t> data,
+                      absl::Span<const IpczDriverHandle> handles) {
+    peer_->PostTask({data, handles});
+    return IPCZ_RESULT_OK;
+  }
+
+  // Object:
+  IpczResult Close() override {
+    peer_->PostTask(Task{IPCZ_TRANSPORT_ACTIVITY_ERROR});
+    peer_.reset();
+    return IPCZ_RESULT_OK;
+  }
+
+ private:
+  class Task {
+   public:
+    Task(absl::Span<const uint8_t> data,
+         absl::Span<const IpczDriverHandle> handles)
+        : data_(data.begin(), data.end()),
+          handles_(handles.begin(), handles.end()) {}
+    explicit Task(IpczTransportActivityFlags flags) : flags_(flags) {}
+    Task(Task&&) = default;
+    ~Task() {
+      for (IpczDriverHandle handle : handles_) {
+        Object::TakeFromHandle(handle)->Close();
+      }
+    }
+
+    IpczResult Run(AsyncTransport& transport) {
+      std::vector<IpczDriverHandle> handles = std::move(handles_);
+      return transport.Notify(flags_, data_, handles);
+    }
+
+   private:
+    std::vector<uint8_t> data_;
+    std::vector<IpczDriverHandle> handles_;
+    IpczTransportActivityFlags flags_ = IPCZ_NO_FLAGS;
+  };
+
+  void PostTask(Task task) {
+    absl::MutexLock lock(&mutex_);
+    tasks_.push_back(std::move(task));
+    NotifyTaskThread();
+  }
+
+  static void RunTaskThread(Ref<AsyncTransport> transport) {
+    transport->RunTasksUntilDeactivation();
+    transport->Notify(IPCZ_TRANSPORT_ACTIVITY_DEACTIVATED);
+  }
+
+  void RunTasksUntilDeactivation() {
+    for (;;) {
+      std::vector<Task> tasks;
+      absl::Notification* wait_for_task = nullptr;
+      {
+        absl::MutexLock lock(&mutex_);
+        if (!active_) {
+          return;
+        }
+        tasks.swap(tasks_);
+        if (tasks.empty()) {
+          wait_for_task = wait_for_task_.get();
+        }
+      }
+
+      if (wait_for_task) {
+        wait_for_task->WaitForNotification();
+        absl::MutexLock lock(&mutex_);
+        wait_for_task_ = std::make_unique<absl::Notification>();
+        continue;
+      }
+
+      for (Task& task : tasks) {
+        const IpczResult result = task.Run(*this);
+        if (result != IPCZ_RESULT_OK && result != IPCZ_RESULT_UNIMPLEMENTED) {
+          Notify(IPCZ_TRANSPORT_ACTIVITY_ERROR);
+          return;
+        }
+      }
+    }
+  }
+
+  IpczResult Notify(IpczTransportActivityFlags flags,
+                    absl::Span<const uint8_t> data = {},
+                    absl::Span<const IpczDriverHandle> handles = {}) {
+    return handler_(transport_, data.data(), data.size(), handles.data(),
+                    handles.size(), flags, nullptr);
+  }
+
+  void NotifyTaskThread() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
+    if (wait_for_task_ && !wait_for_task_->HasBeenNotified()) {
+      wait_for_task_->Notify();
+    }
+  }
+
+  Ref<AsyncTransport> peer_;
+  IpczHandle transport_ = IPCZ_INVALID_HANDLE;
+  IpczTransportActivityHandler handler_;
+
+  absl::Mutex mutex_;
+  bool active_ ABSL_GUARDED_BY(mutex_) = false;
+  std::unique_ptr<std::thread> task_thread_ ABSL_GUARDED_BY(mutex_);
+  std::vector<Task> tasks_ ABSL_GUARDED_BY(mutex_);
+  std::unique_ptr<absl::Notification> wait_for_task_ ABSL_GUARDED_BY(mutex_) =
+      std::make_unique<absl::Notification>();
+};
+
+IpczResult IPCZ_API CreateTransports(IpczDriverHandle,
+                                     IpczDriverHandle,
+                                     uint32_t,
+                                     const void*,
+                                     IpczDriverHandle* transport0,
+                                     IpczDriverHandle* transport1) {
+  auto [first, second] = AsyncTransport::CreatePair();
+  *transport0 = Object::ReleaseAsHandle(std::move(first));
+  *transport1 = Object::ReleaseAsHandle(std::move(second));
+  return IPCZ_RESULT_OK;
+}
+
+IpczResult IPCZ_API ActivateTransport(IpczDriverHandle transport,
+                                      IpczHandle ipcz_transport,
+                                      IpczTransportActivityHandler handler,
+                                      uint32_t,
+                                      const void*) {
+  AsyncTransport::FromHandle(transport)->Activate(ipcz_transport, handler);
+  return IPCZ_RESULT_OK;
+}
+
+IpczResult IPCZ_API DeactivateTransport(IpczDriverHandle transport,
+                                        uint32_t,
+                                        const void*) {
+  AsyncTransport::FromHandle(transport)->Deactivate();
+  return IPCZ_RESULT_OK;
+}
+
+IpczResult IPCZ_API Transmit(IpczDriverHandle transport,
+                             const void* data,
+                             size_t num_bytes,
+                             const IpczDriverHandle* handles,
+                             size_t num_handles,
+                             uint32_t,
+                             const void*) {
+  AsyncTransport::FromHandle(transport)->Transmit(
+      {static_cast<const uint8_t*>(data), num_bytes}, {handles, num_handles});
+  return IPCZ_RESULT_OK;
+}
+
+}  // namespace
+
+// Note that this driver inherits most of its implementation from the baseline
+// single-process driver. Only transport operation is overridden here.
+const IpczDriver kAsyncReferenceDriver = {
+    sizeof(kAsyncReferenceDriver),
+    kSingleProcessReferenceDriverBase.Close,
+    kSingleProcessReferenceDriverBase.Serialize,
+    kSingleProcessReferenceDriverBase.Deserialize,
+    CreateTransports,
+    ActivateTransport,
+    DeactivateTransport,
+    Transmit,
+    kSingleProcessReferenceDriverBase.AllocateSharedMemory,
+    kSingleProcessReferenceDriverBase.GetSharedMemoryInfo,
+    kSingleProcessReferenceDriverBase.DuplicateSharedMemory,
+    kSingleProcessReferenceDriverBase.MapSharedMemory,
+    kSingleProcessReferenceDriverBase.GenerateRandomBytes,
+};
+
+}  // namespace ipcz::reference_drivers
diff --git a/src/reference_drivers/async_reference_driver.h b/src/reference_drivers/async_reference_driver.h
new file mode 100644
index 0000000..e596e60
--- /dev/null
+++ b/src/reference_drivers/async_reference_driver.h
@@ -0,0 +1,20 @@
+// Copyright 2022 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef IPCZ_SRC_DRIVERS_ASYNC_REFERENCE_DRIVER_H_
+#define IPCZ_SRC_DRIVERS_ASYNC_REFERENCE_DRIVER_H_
+
+#include "ipcz/ipcz.h"
+
+namespace ipcz::reference_drivers {
+
+// An async driver for single-process tests. Each transport runs its own thread
+// with a simple task queue. Transmission from a transport posts a task to its
+// peer's queue. The resulting non-determinism effectively simulates a typical
+// production driver, without the complexity of a multiprocess environment.
+extern const IpczDriver kAsyncReferenceDriver;
+
+}  // namespace ipcz::reference_drivers
+
+#endif  // IPCZ_SRC_DRIVERS_ASYNC_REFERENCE_DRIVER_H_
diff --git a/src/remote_portal_test.cc b/src/remote_portal_test.cc
index bbcef8d..2d41f4f 100644
--- a/src/remote_portal_test.cc
+++ b/src/remote_portal_test.cc
@@ -80,6 +80,7 @@
     EXPECT_EQ(kTestMessage1, message);
   }
 
+  PingPong(b);
   CloseAll({q, b});
 }
 
@@ -99,6 +100,7 @@
     EXPECT_EQ(kTestMessage2, message);
   }
 
+  PingPong(b);
   CloseAll({p, b});
 }
 
@@ -110,6 +112,8 @@
   EXPECT_EQ(IPCZ_RESULT_OK, WaitToGet(c1, nullptr, {&p, 1}));
   EXPECT_EQ(IPCZ_RESULT_OK, Put(c2, "", {&p, 1}));
 
+  WaitForPingAndReply(c1);
+  WaitForPingAndReply(c2);
   CloseAll({c1, c2});
 }
 
diff --git a/src/test/multinode_test.cc b/src/test/multinode_test.cc
index 91a1059..920895d 100644
--- a/src/test/multinode_test.cc
+++ b/src/test/multinode_test.cc
@@ -9,6 +9,7 @@
 #include <thread>
 
 #include "ipcz/ipcz.h"
+#include "reference_drivers/async_reference_driver.h"
 #include "reference_drivers/sync_reference_driver.h"
 #include "third_party/abseil-cpp/absl/base/macros.h"
 #include "third_party/abseil-cpp/absl/types/optional.h"
@@ -114,6 +115,9 @@
     case DriverMode::kSync:
       return reference_drivers::kSyncReferenceDriver;
 
+    case DriverMode::kAsync:
+      return reference_drivers::kAsyncReferenceDriver;
+
 #if BUILDFLAG(ENABLE_IPCZ_MULTIPROCESS_TESTS)
     case DriverMode::kMultiprocess:
       return reference_drivers::kMultiprocessReferenceDriver;
diff --git a/src/test/multinode_test.h b/src/test/multinode_test.h
index 5ae41e8..7f0b98b 100644
--- a/src/test/multinode_test.h
+++ b/src/test/multinode_test.h
@@ -307,6 +307,7 @@
   INSTANTIATE_TEST_SUITE_P(                       \
       , suite,                                    \
       ::testing::Values(                          \
-          ipcz::test::DriverMode::kSync IPCZ_EXTRA_DRIVER_MODES))
+          ipcz::test::DriverMode::kSync,          \
+          ipcz::test::DriverMode::kAsync IPCZ_EXTRA_DRIVER_MODES))
 
 #endif  // IPCZ_SRC_TEST_MULTINODE_TEST_H_
diff --git a/src/test/test_base.cc b/src/test/test_base.cc
index 5bc6674..f10e0ff 100644
--- a/src/test/test_base.cc
+++ b/src/test/test_base.cc
@@ -155,6 +155,16 @@
   return Get(portal, message, handles);
 }
 
+void TestBase::PingPong(IpczHandle portal) {
+  EXPECT_EQ(IPCZ_RESULT_OK, Put(portal, {}));
+  EXPECT_EQ(IPCZ_RESULT_OK, WaitToGet(portal));
+}
+
+void TestBase::WaitForPingAndReply(IpczHandle portal) {
+  EXPECT_EQ(IPCZ_RESULT_OK, WaitToGet(portal));
+  EXPECT_EQ(IPCZ_RESULT_OK, Put(portal, {}));
+}
+
 void TestBase::VerifyEndToEnd(IpczHandle portal) {
   static const char kTestMessage[] = "Ping!!!";
   std::string message;
diff --git a/src/test/test_base.h b/src/test/test_base.h
index b9e7cc1..f9e4a08 100644
--- a/src/test/test_base.h
+++ b/src/test/test_base.h
@@ -83,6 +83,13 @@
                        std::string* message = nullptr,
                        absl::Span<IpczHandle> handles = {});
 
+  // Sends an empty parcel from `portal` and expects an empty parcel in return.
+  void PingPong(IpczHandle portal);
+
+  // Waits for an empty parcel on `portal` and then sends an empty parcel in
+  // return.
+  void WaitForPingAndReply(IpczHandle portal);
+
   // Sends a parcel from `portal` and expects to receive a parcel back with the
   // same contents. Typical usage is to call this from two different nodes, on
   // a pair of connected portals, in order to verify working communication