ipcz: Async reference driver
This differs from the sync driver only by its transport implementation.
Instead of synchronous and reentrant message dispatch, each transport's
events are queued and dispatched in-order on a dedicated background
thread.
This effectively simulates the same non-deterministic behavior inherent
in multiprocess usage, without the complexity of a multiprocess
environment. Multinode tests are run in this mode to provide additional
coverage and expose any potential race conditions introduced into ipcz
proper.
Bug: 1299283
Change-Id: I78be1fe6a63546677378ca471a915da731085c30
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/3724043
Commit-Queue: Ken Rockot <rockot@google.com>
Reviewed-by: Daniel Cheng <dcheng@chromium.org>
Cr-Commit-Position: refs/heads/main@{#1019679}
NOKEYCHECK=True
GitOrigin-RevId: 90ea32dff9126a07ded84387c0ac9ef91745ae56
diff --git a/src/BUILD.gn b/src/BUILD.gn
index 90436cb..75aef4a 100644
--- a/src/BUILD.gn
+++ b/src/BUILD.gn
@@ -137,11 +137,13 @@
testonly = true
public = [
+ "reference_drivers/async_reference_driver.h",
"reference_drivers/blob.h",
"reference_drivers/sync_reference_driver.h",
]
sources = [
+ "reference_drivers/async_reference_driver.cc",
"reference_drivers/blob.cc",
"reference_drivers/object.cc",
"reference_drivers/object.h",
diff --git a/src/ipcz/router.cc b/src/ipcz/router.cc
index 2634bab..d9e9acc 100644
--- a/src/ipcz/router.cc
+++ b/src/ipcz/router.cc
@@ -152,20 +152,19 @@
return true;
}
-bool Router::AcceptRouteClosureFrom(LinkType link_type,
- SequenceNumber sequence_length) {
- Ref<RouterLink> inward_forwarding_link;
+bool Router::AcceptRouteClosureFrom(
+ LinkType link_type,
+ absl::optional<SequenceNumber> sequence_length) {
TrapEventDispatcher dispatcher;
{
absl::MutexLock lock(&mutex_);
if (link_type.is_outward()) {
- if (!inbound_parcels_.SetFinalSequenceLength(sequence_length)) {
+ if (!inbound_parcels_.SetFinalSequenceLength(sequence_length.value_or(
+ inbound_parcels_.GetCurrentSequenceLength()))) {
return false;
}
- if (inward_link_) {
- inward_forwarding_link = inward_link_;
- } else {
+ if (!inward_link_) {
status_.flags |= IPCZ_PORTAL_STATUS_PEER_CLOSED;
if (inbound_parcels_.IsSequenceFullyConsumed()) {
status_.flags |= IPCZ_PORTAL_STATUS_DEAD;
@@ -174,16 +173,13 @@
dispatcher);
}
} else if (link_type.is_peripheral_inward()) {
- if (!outbound_parcels_.SetFinalSequenceLength(sequence_length)) {
+ if (!outbound_parcels_.SetFinalSequenceLength(sequence_length.value_or(
+ outbound_parcels_.GetCurrentSequenceLength()))) {
return false;
}
}
}
- if (inward_forwarding_link) {
- inward_forwarding_link->AcceptRouteClosure(sequence_length);
- }
-
Flush();
return true;
}
@@ -336,18 +332,16 @@
void Router::BeginProxyingToNewRouter(NodeLink& to_node_link,
const RouterDescriptor& descriptor) {
- // Acquire a reference to the sublink created by an earlier call to
- // SerializeNewRouter().
- const absl::optional<NodeLink::Sublink> new_sublink =
- to_node_link.GetSublink(descriptor.new_sublink);
- if (!new_sublink) {
- // The sublink has been torn down, presumably because of node disconnection.
- // Nowhere to proxy now, so we're done.
- return;
+ // Acquire a reference to the RemoteRouterLink created by an earlier call to
+ // SerializeNewRouter(). If the NodeLink has already been disconnected, this
+ // may be null.
+ Ref<RemoteRouterLink> new_router_link;
+ if (auto new_sublink = to_node_link.GetSublink(descriptor.new_sublink)) {
+ new_router_link = new_sublink->router_link;
}
bool deactivate_link = false;
- {
+ if (new_router_link) {
absl::MutexLock lock(&mutex_);
ABSL_ASSERT(!inward_link_);
@@ -359,12 +353,20 @@
deactivate_link = true;
} else {
// TODO: Initiate proxy removal ASAP now that we're proxying.
- inward_link_ = new_sublink->router_link;
+ inward_link_ = new_router_link;
}
}
if (deactivate_link) {
- new_sublink->router_link->Deactivate();
+ new_router_link->Deactivate();
+ return;
+ }
+
+ if (!to_node_link.GetSublink(descriptor.new_sublink)) {
+ // If the NodeLink was disconnected since we entered this method but before
+ // `inward_link_` was set above, disconnection will not have been propagated
+ // inward. Remedy that.
+ AcceptRouteClosureFrom(LinkType::kPeripheralInward);
return;
}
@@ -409,6 +411,7 @@
Ref<RouterLink> dead_outward_link;
absl::InlinedVector<Parcel, 2> inbound_parcels;
absl::InlinedVector<Parcel, 2> outbound_parcels;
+ absl::optional<SequenceNumber> final_inward_sequence_length;
absl::optional<SequenceNumber> final_outward_sequence_length;
{
absl::MutexLock lock(&mutex_);
@@ -448,7 +451,20 @@
// exists an ultimate destination for any forwarded inbound parcels. So we
// drop both links now.
dead_outward_link = std::move(outward_link_);
- dead_inward_link = std::move(inward_link_);
+ } else if (!inbound_parcels_.ExpectsMoreElements()) {
+ // If the other end of the route is gone and we've received all its
+ // parcels, we can simply drop the outward link in that case.
+ dead_outward_link = std::move(outward_link_);
+ }
+
+ if (inbound_parcels_.IsSequenceFullyConsumed()) {
+ // We won't be receiving anything new from our peer, and if we're a proxy
+ // then we've also forwarded everything already. We can propagate closure
+ // inward and drop the inward link, if applicable.
+ final_inward_sequence_length = inbound_parcels_.final_sequence_length();
+ if (inward_link_) {
+ dead_inward_link = std::move(inward_link_);
+ }
}
}
@@ -460,15 +476,17 @@
inward_link->AcceptParcel(parcel);
}
- if (final_outward_sequence_length) {
- outward_link->AcceptRouteClosure(*final_outward_sequence_length);
- }
-
if (dead_outward_link) {
+ if (final_outward_sequence_length) {
+ dead_outward_link->AcceptRouteClosure(*final_outward_sequence_length);
+ }
dead_outward_link->Deactivate();
}
if (dead_inward_link) {
+ if (final_inward_sequence_length) {
+ dead_inward_link->AcceptRouteClosure(*final_inward_sequence_length);
+ }
dead_inward_link->Deactivate();
}
}
diff --git a/src/ipcz/router.h b/src/ipcz/router.h
index 2a09f30..01d21ab 100644
--- a/src/ipcz/router.h
+++ b/src/ipcz/router.h
@@ -15,6 +15,7 @@
#include "ipcz/sequence_number.h"
#include "ipcz/trap_set.h"
#include "third_party/abseil-cpp/absl/synchronization/mutex.h"
+#include "third_party/abseil-cpp/absl/types/optional.h"
#include "util/ref_counted.h"
namespace ipcz {
@@ -92,9 +93,12 @@
// Accepts notification that the other end of the route has been closed and
// that the close end transmitted a total of `sequence_length` parcels before
- // closing.
- bool AcceptRouteClosureFrom(LinkType link_type,
- SequenceNumber sequence_length);
+ // closing. If `sequence_length` is unknown and omitted (due to closure being
+ // forced by disconnection), the current sequence length in the appropriate
+ // direction is used.
+ bool AcceptRouteClosureFrom(
+ LinkType link_type,
+ absl::optional<SequenceNumber> sequence_length = absl::nullopt);
// Retrieves the next available inbound parcel from this Router, if present.
IpczResult GetNextInboundParcel(IpczGetFlags flags,
diff --git a/src/reference_drivers/async_reference_driver.cc b/src/reference_drivers/async_reference_driver.cc
new file mode 100644
index 0000000..647e423
--- /dev/null
+++ b/src/reference_drivers/async_reference_driver.cc
@@ -0,0 +1,239 @@
+// Copyright 2022 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "reference_drivers/async_reference_driver.h"
+
+#include <cstdint>
+#include <memory>
+#include <thread>
+#include <tuple>
+#include <utility>
+#include <vector>
+
+#include "ipcz/ipcz.h"
+#include "reference_drivers/object.h"
+#include "reference_drivers/single_process_reference_driver_base.h"
+#include "third_party/abseil-cpp/absl/base/macros.h"
+#include "third_party/abseil-cpp/absl/synchronization/mutex.h"
+#include "third_party/abseil-cpp/absl/synchronization/notification.h"
+#include "third_party/abseil-cpp/absl/types/span.h"
+#include "util/ref_counted.h"
+
+namespace ipcz::reference_drivers {
+
+namespace {
+
+// The driver transport implementation for the async reference driver. Each
+// AsyncTransport holds a direct reference to its peer, and transmissions are
+// tasks posted to the peer's task queue. Task are run on a dedicated background
+// thread for each transport.
+class AsyncTransport : public ObjectImpl<AsyncTransport, Object::kTransport> {
+ public:
+ using Pair = std::pair<Ref<AsyncTransport>, Ref<AsyncTransport>>;
+ static Pair CreatePair() {
+ Pair pair{MakeRefCounted<AsyncTransport>(),
+ MakeRefCounted<AsyncTransport>()};
+ std::tie(pair.second->peer_, pair.first->peer_) = pair;
+ return pair;
+ }
+
+ void Activate(IpczHandle transport, IpczTransportActivityHandler handler) {
+ absl::MutexLock lock(&mutex_);
+ transport_ = transport;
+ handler_ = handler;
+ active_ = true;
+ task_thread_ =
+ std::make_unique<std::thread>(&RunTaskThread, WrapRefCounted(this));
+ }
+
+ void Deactivate() {
+ std::unique_ptr<std::thread> task_thread_to_join;
+ {
+ absl::MutexLock lock(&mutex_);
+ // Join the task thread if we're not on it; otherwise detach it.
+ // Detachment is safe: the thread owns a ref to `this` as long as it's
+ // running, and it will terminate very soon after deactivation.
+ ABSL_HARDENING_ASSERT(task_thread_);
+ if (task_thread_->get_id() != std::this_thread::get_id()) {
+ task_thread_to_join = std::move(task_thread_);
+ } else {
+ task_thread_->detach();
+ task_thread_.reset();
+ }
+ active_ = false;
+ NotifyTaskThread();
+ }
+ if (task_thread_to_join) {
+ task_thread_to_join->join();
+ }
+ }
+
+ IpczResult Transmit(absl::Span<const uint8_t> data,
+ absl::Span<const IpczDriverHandle> handles) {
+ peer_->PostTask({data, handles});
+ return IPCZ_RESULT_OK;
+ }
+
+ // Object:
+ IpczResult Close() override {
+ peer_->PostTask(Task{IPCZ_TRANSPORT_ACTIVITY_ERROR});
+ peer_.reset();
+ return IPCZ_RESULT_OK;
+ }
+
+ private:
+ class Task {
+ public:
+ Task(absl::Span<const uint8_t> data,
+ absl::Span<const IpczDriverHandle> handles)
+ : data_(data.begin(), data.end()),
+ handles_(handles.begin(), handles.end()) {}
+ explicit Task(IpczTransportActivityFlags flags) : flags_(flags) {}
+ Task(Task&&) = default;
+ ~Task() {
+ for (IpczDriverHandle handle : handles_) {
+ Object::TakeFromHandle(handle)->Close();
+ }
+ }
+
+ IpczResult Run(AsyncTransport& transport) {
+ std::vector<IpczDriverHandle> handles = std::move(handles_);
+ return transport.Notify(flags_, data_, handles);
+ }
+
+ private:
+ std::vector<uint8_t> data_;
+ std::vector<IpczDriverHandle> handles_;
+ IpczTransportActivityFlags flags_ = IPCZ_NO_FLAGS;
+ };
+
+ void PostTask(Task task) {
+ absl::MutexLock lock(&mutex_);
+ tasks_.push_back(std::move(task));
+ NotifyTaskThread();
+ }
+
+ static void RunTaskThread(Ref<AsyncTransport> transport) {
+ transport->RunTasksUntilDeactivation();
+ transport->Notify(IPCZ_TRANSPORT_ACTIVITY_DEACTIVATED);
+ }
+
+ void RunTasksUntilDeactivation() {
+ for (;;) {
+ std::vector<Task> tasks;
+ absl::Notification* wait_for_task = nullptr;
+ {
+ absl::MutexLock lock(&mutex_);
+ if (!active_) {
+ return;
+ }
+ tasks.swap(tasks_);
+ if (tasks.empty()) {
+ wait_for_task = wait_for_task_.get();
+ }
+ }
+
+ if (wait_for_task) {
+ wait_for_task->WaitForNotification();
+ absl::MutexLock lock(&mutex_);
+ wait_for_task_ = std::make_unique<absl::Notification>();
+ continue;
+ }
+
+ for (Task& task : tasks) {
+ const IpczResult result = task.Run(*this);
+ if (result != IPCZ_RESULT_OK && result != IPCZ_RESULT_UNIMPLEMENTED) {
+ Notify(IPCZ_TRANSPORT_ACTIVITY_ERROR);
+ return;
+ }
+ }
+ }
+ }
+
+ IpczResult Notify(IpczTransportActivityFlags flags,
+ absl::Span<const uint8_t> data = {},
+ absl::Span<const IpczDriverHandle> handles = {}) {
+ return handler_(transport_, data.data(), data.size(), handles.data(),
+ handles.size(), flags, nullptr);
+ }
+
+ void NotifyTaskThread() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
+ if (wait_for_task_ && !wait_for_task_->HasBeenNotified()) {
+ wait_for_task_->Notify();
+ }
+ }
+
+ Ref<AsyncTransport> peer_;
+ IpczHandle transport_ = IPCZ_INVALID_HANDLE;
+ IpczTransportActivityHandler handler_;
+
+ absl::Mutex mutex_;
+ bool active_ ABSL_GUARDED_BY(mutex_) = false;
+ std::unique_ptr<std::thread> task_thread_ ABSL_GUARDED_BY(mutex_);
+ std::vector<Task> tasks_ ABSL_GUARDED_BY(mutex_);
+ std::unique_ptr<absl::Notification> wait_for_task_ ABSL_GUARDED_BY(mutex_) =
+ std::make_unique<absl::Notification>();
+};
+
+IpczResult IPCZ_API CreateTransports(IpczDriverHandle,
+ IpczDriverHandle,
+ uint32_t,
+ const void*,
+ IpczDriverHandle* transport0,
+ IpczDriverHandle* transport1) {
+ auto [first, second] = AsyncTransport::CreatePair();
+ *transport0 = Object::ReleaseAsHandle(std::move(first));
+ *transport1 = Object::ReleaseAsHandle(std::move(second));
+ return IPCZ_RESULT_OK;
+}
+
+IpczResult IPCZ_API ActivateTransport(IpczDriverHandle transport,
+ IpczHandle ipcz_transport,
+ IpczTransportActivityHandler handler,
+ uint32_t,
+ const void*) {
+ AsyncTransport::FromHandle(transport)->Activate(ipcz_transport, handler);
+ return IPCZ_RESULT_OK;
+}
+
+IpczResult IPCZ_API DeactivateTransport(IpczDriverHandle transport,
+ uint32_t,
+ const void*) {
+ AsyncTransport::FromHandle(transport)->Deactivate();
+ return IPCZ_RESULT_OK;
+}
+
+IpczResult IPCZ_API Transmit(IpczDriverHandle transport,
+ const void* data,
+ size_t num_bytes,
+ const IpczDriverHandle* handles,
+ size_t num_handles,
+ uint32_t,
+ const void*) {
+ AsyncTransport::FromHandle(transport)->Transmit(
+ {static_cast<const uint8_t*>(data), num_bytes}, {handles, num_handles});
+ return IPCZ_RESULT_OK;
+}
+
+} // namespace
+
+// Note that this driver inherits most of its implementation from the baseline
+// single-process driver. Only transport operation is overridden here.
+const IpczDriver kAsyncReferenceDriver = {
+ sizeof(kAsyncReferenceDriver),
+ kSingleProcessReferenceDriverBase.Close,
+ kSingleProcessReferenceDriverBase.Serialize,
+ kSingleProcessReferenceDriverBase.Deserialize,
+ CreateTransports,
+ ActivateTransport,
+ DeactivateTransport,
+ Transmit,
+ kSingleProcessReferenceDriverBase.AllocateSharedMemory,
+ kSingleProcessReferenceDriverBase.GetSharedMemoryInfo,
+ kSingleProcessReferenceDriverBase.DuplicateSharedMemory,
+ kSingleProcessReferenceDriverBase.MapSharedMemory,
+ kSingleProcessReferenceDriverBase.GenerateRandomBytes,
+};
+
+} // namespace ipcz::reference_drivers
diff --git a/src/reference_drivers/async_reference_driver.h b/src/reference_drivers/async_reference_driver.h
new file mode 100644
index 0000000..e596e60
--- /dev/null
+++ b/src/reference_drivers/async_reference_driver.h
@@ -0,0 +1,20 @@
+// Copyright 2022 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef IPCZ_SRC_DRIVERS_ASYNC_REFERENCE_DRIVER_H_
+#define IPCZ_SRC_DRIVERS_ASYNC_REFERENCE_DRIVER_H_
+
+#include "ipcz/ipcz.h"
+
+namespace ipcz::reference_drivers {
+
+// An async driver for single-process tests. Each transport runs its own thread
+// with a simple task queue. Transmission from a transport posts a task to its
+// peer's queue. The resulting non-determinism effectively simulates a typical
+// production driver, without the complexity of a multiprocess environment.
+extern const IpczDriver kAsyncReferenceDriver;
+
+} // namespace ipcz::reference_drivers
+
+#endif // IPCZ_SRC_DRIVERS_ASYNC_REFERENCE_DRIVER_H_
diff --git a/src/remote_portal_test.cc b/src/remote_portal_test.cc
index bbcef8d..2d41f4f 100644
--- a/src/remote_portal_test.cc
+++ b/src/remote_portal_test.cc
@@ -80,6 +80,7 @@
EXPECT_EQ(kTestMessage1, message);
}
+ PingPong(b);
CloseAll({q, b});
}
@@ -99,6 +100,7 @@
EXPECT_EQ(kTestMessage2, message);
}
+ PingPong(b);
CloseAll({p, b});
}
@@ -110,6 +112,8 @@
EXPECT_EQ(IPCZ_RESULT_OK, WaitToGet(c1, nullptr, {&p, 1}));
EXPECT_EQ(IPCZ_RESULT_OK, Put(c2, "", {&p, 1}));
+ WaitForPingAndReply(c1);
+ WaitForPingAndReply(c2);
CloseAll({c1, c2});
}
diff --git a/src/test/multinode_test.cc b/src/test/multinode_test.cc
index 91a1059..920895d 100644
--- a/src/test/multinode_test.cc
+++ b/src/test/multinode_test.cc
@@ -9,6 +9,7 @@
#include <thread>
#include "ipcz/ipcz.h"
+#include "reference_drivers/async_reference_driver.h"
#include "reference_drivers/sync_reference_driver.h"
#include "third_party/abseil-cpp/absl/base/macros.h"
#include "third_party/abseil-cpp/absl/types/optional.h"
@@ -114,6 +115,9 @@
case DriverMode::kSync:
return reference_drivers::kSyncReferenceDriver;
+ case DriverMode::kAsync:
+ return reference_drivers::kAsyncReferenceDriver;
+
#if BUILDFLAG(ENABLE_IPCZ_MULTIPROCESS_TESTS)
case DriverMode::kMultiprocess:
return reference_drivers::kMultiprocessReferenceDriver;
diff --git a/src/test/multinode_test.h b/src/test/multinode_test.h
index 5ae41e8..7f0b98b 100644
--- a/src/test/multinode_test.h
+++ b/src/test/multinode_test.h
@@ -307,6 +307,7 @@
INSTANTIATE_TEST_SUITE_P( \
, suite, \
::testing::Values( \
- ipcz::test::DriverMode::kSync IPCZ_EXTRA_DRIVER_MODES))
+ ipcz::test::DriverMode::kSync, \
+ ipcz::test::DriverMode::kAsync IPCZ_EXTRA_DRIVER_MODES))
#endif // IPCZ_SRC_TEST_MULTINODE_TEST_H_
diff --git a/src/test/test_base.cc b/src/test/test_base.cc
index 5bc6674..f10e0ff 100644
--- a/src/test/test_base.cc
+++ b/src/test/test_base.cc
@@ -155,6 +155,16 @@
return Get(portal, message, handles);
}
+void TestBase::PingPong(IpczHandle portal) {
+ EXPECT_EQ(IPCZ_RESULT_OK, Put(portal, {}));
+ EXPECT_EQ(IPCZ_RESULT_OK, WaitToGet(portal));
+}
+
+void TestBase::WaitForPingAndReply(IpczHandle portal) {
+ EXPECT_EQ(IPCZ_RESULT_OK, WaitToGet(portal));
+ EXPECT_EQ(IPCZ_RESULT_OK, Put(portal, {}));
+}
+
void TestBase::VerifyEndToEnd(IpczHandle portal) {
static const char kTestMessage[] = "Ping!!!";
std::string message;
diff --git a/src/test/test_base.h b/src/test/test_base.h
index b9e7cc1..f9e4a08 100644
--- a/src/test/test_base.h
+++ b/src/test/test_base.h
@@ -83,6 +83,13 @@
std::string* message = nullptr,
absl::Span<IpczHandle> handles = {});
+ // Sends an empty parcel from `portal` and expects an empty parcel in return.
+ void PingPong(IpczHandle portal);
+
+ // Waits for an empty parcel on `portal` and then sends an empty parcel in
+ // return.
+ void WaitForPingAndReply(IpczHandle portal);
+
// Sends a parcel from `portal` and expects to receive a parcel back with the
// same contents. Typical usage is to call this from two different nodes, on
// a pair of connected portals, in order to verify working communication