blob: d565cf78c2b19d701918907da34676b21f25d791 [file] [log] [blame]
// Copyright 2015 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include <stddef.h>
#include <stdint.h>
#include <algorithm>
#include <array>
#include <memory>
#include <tuple>
#include <utility>
#include "base/functional/bind.h"
#include "base/functional/callback.h"
#include "base/functional/callback_helpers.h"
#include "base/memory/ptr_util.h"
#include "base/memory/raw_ptr.h"
#include "base/run_loop.h"
#include "base/synchronization/waitable_event.h"
#include "base/task/sequenced_task_runner.h"
#include "base/task/single_thread_task_runner.h"
#include "base/task/thread_pool.h"
#include "base/test/bind.h"
#include "base/test/scoped_feature_list.h"
#include "base/test/task_environment.h"
#include "base/test/test_future.h"
#include "base/threading/sequence_bound.h"
#include "base/threading/thread.h"
#include "base/threading/thread_restrictions.h"
#include "mojo/public/cpp/bindings/associated_receiver.h"
#include "mojo/public/cpp/bindings/associated_remote.h"
#include "mojo/public/cpp/bindings/lib/multiplex_router.h"
#include "mojo/public/cpp/bindings/pending_associated_receiver.h"
#include "mojo/public/cpp/bindings/pending_associated_remote.h"
#include "mojo/public/cpp/bindings/receiver.h"
#include "mojo/public/cpp/bindings/remote.h"
#include "mojo/public/cpp/bindings/shared_associated_remote.h"
#include "mojo/public/cpp/bindings/tests/associated_interface_unittest.test-mojom.h"
#include "mojo/public/cpp/bindings/unique_associated_receiver_set.h"
#include "mojo/public/cpp/system/functions.h"
#include "mojo/public/interfaces/bindings/tests/ping_service.test-mojom.h"
#include "mojo/public/interfaces/bindings/tests/test_associated_interfaces.test-mojom.h"
#include "mojo/public/interfaces/bindings/tests/test_sync_methods.test-mojom.h"
#include "testing/gtest/include/gtest/gtest.h"
namespace mojo {
namespace test {
namespace associated_interface_unittest {
namespace {
using mojo::internal::MultiplexRouter;
class IntegerSenderImpl : public IntegerSender {
public:
IntegerSenderImpl() = default;
explicit IntegerSenderImpl(PendingAssociatedReceiver<IntegerSender> receiver)
: receiver_(this, std::move(receiver)) {}
~IntegerSenderImpl() override = default;
void set_notify_send_method_called(
base::RepeatingCallback<void(int32_t)> callback) {
notify_send_method_called_ = std::move(callback);
}
void Echo(int32_t value, EchoCallback callback) override {
if (value == -1) {
receiver_.ReportBadMessage("Reporting bad message for value == -1");
return;
}
std::move(callback).Run(value);
}
void Send(int32_t value) override { notify_send_method_called_.Run(value); }
AssociatedReceiver<IntegerSender>* receiver() { return &receiver_; }
private:
AssociatedReceiver<IntegerSender> receiver_{this};
base::RepeatingCallback<void(int32_t)> notify_send_method_called_;
};
class IntegerSenderConnectionImpl : public IntegerSenderConnection {
public:
explicit IntegerSenderConnectionImpl(
PendingReceiver<IntegerSenderConnection> receiver)
: receiver_(this, std::move(receiver)) {}
~IntegerSenderConnectionImpl() override = default;
void GetSender(PendingAssociatedReceiver<IntegerSender> receiver) override {
DCHECK(receiver.is_valid());
senders_.Add(std::make_unique<IntegerSenderImpl>(), std::move(receiver));
}
void AsyncGetSender(AsyncGetSenderCallback callback) override {
PendingAssociatedRemote<IntegerSender> remote;
GetSender(remote.InitWithNewEndpointAndPassReceiver());
std::move(callback).Run(std::move(remote));
}
Receiver<IntegerSenderConnection>* receiver() { return &receiver_; }
private:
Receiver<IntegerSenderConnection> receiver_;
UniqueAssociatedReceiverSet<IntegerSender> senders_;
};
class AssociatedInterfaceTest : public testing::Test {
public:
AssociatedInterfaceTest()
: main_runner_(base::SingleThreadTaskRunner::GetCurrentDefault()) {}
~AssociatedInterfaceTest() override = default;
template <typename T>
PendingAssociatedRemote<T> EmulatePassingAssociatedRemote(
PendingAssociatedRemote<T> remote,
scoped_refptr<MultiplexRouter> source,
scoped_refptr<MultiplexRouter> target) {
ScopedInterfaceEndpointHandle handle = remote.PassHandle();
CHECK(handle.pending_association());
auto id = source->AssociateInterface(std::move(handle));
return PendingAssociatedRemote<T>(target->CreateLocalEndpointHandle(id),
remote.version());
}
void CreateRouterPair(scoped_refptr<MultiplexRouter>* router0,
scoped_refptr<MultiplexRouter>* router1) {
MessagePipe pipe;
*router0 = MultiplexRouter::CreateAndStartReceiving(
std::move(pipe.handle0), MultiplexRouter::MULTI_INTERFACE, true,
main_runner_);
*router1 = MultiplexRouter::CreateAndStartReceiving(
std::move(pipe.handle1), MultiplexRouter::MULTI_INTERFACE, false,
main_runner_);
}
void CreateIntegerSenderWithExistingRouters(
scoped_refptr<MultiplexRouter> router0,
PendingAssociatedRemote<IntegerSender>* remote0,
scoped_refptr<MultiplexRouter> router1,
PendingAssociatedReceiver<IntegerSender>* receiver1) {
*receiver1 = remote0->InitWithNewEndpointAndPassReceiver();
*remote0 =
EmulatePassingAssociatedRemote(std::move(*remote0), router1, router0);
}
void CreateIntegerSender(PendingAssociatedRemote<IntegerSender>* remote,
PendingAssociatedReceiver<IntegerSender>* receiver) {
scoped_refptr<MultiplexRouter> router0;
scoped_refptr<MultiplexRouter> router1;
CreateRouterPair(&router0, &router1);
CreateIntegerSenderWithExistingRouters(router1, remote, router0, receiver);
}
private:
base::test::TaskEnvironment task_environment;
scoped_refptr<base::SequencedTaskRunner> main_runner_;
};
void Fail() {
FAIL() << "Unexpected connection error";
}
TEST_F(AssociatedInterfaceTest, InterfacesAtBothEnds) {
// Bind to the same pipe two associated interfaces, whose implementation lives
// at different ends. Test that the two don't interfere with each other.
scoped_refptr<MultiplexRouter> router0;
scoped_refptr<MultiplexRouter> router1;
CreateRouterPair(&router0, &router1);
PendingAssociatedReceiver<IntegerSender> receiver;
PendingAssociatedRemote<IntegerSender> remote;
CreateIntegerSenderWithExistingRouters(router1, &remote, router0, &receiver);
IntegerSenderImpl impl0(std::move(receiver));
AssociatedRemote<IntegerSender> remote0(std::move(remote));
CreateIntegerSenderWithExistingRouters(router0, &remote, router1, &receiver);
IntegerSenderImpl impl1(std::move(receiver));
AssociatedRemote<IntegerSender> remote1(std::move(remote));
base::RunLoop run_loop, run_loop2;
bool remote0_callback_run = false;
remote0->Echo(123, base::BindLambdaForTesting([&](int32_t value) {
EXPECT_EQ(123, value);
remote0_callback_run = true;
run_loop.Quit();
}));
bool remote1_callback_run = false;
remote1->Echo(456, base::BindLambdaForTesting([&](int32_t value) {
EXPECT_EQ(456, value);
remote1_callback_run = true;
run_loop2.Quit();
}));
run_loop.Run();
run_loop2.Run();
EXPECT_TRUE(remote0_callback_run);
EXPECT_TRUE(remote1_callback_run);
bool remote0_disconnect_handler_run = false;
base::RunLoop run_loop3;
remote0.set_disconnect_handler(base::BindLambdaForTesting([&] {
remote0_disconnect_handler_run = true;
run_loop3.Quit();
}));
impl0.receiver()->reset();
run_loop3.Run();
EXPECT_TRUE(remote0_disconnect_handler_run);
bool remote1_disconnect_handler_run = false;
base::RunLoop run_loop4;
impl1.receiver()->set_disconnect_handler(base::BindLambdaForTesting([&] {
remote1_disconnect_handler_run = true;
run_loop4.Quit();
}));
remote1.reset();
run_loop4.Run();
EXPECT_TRUE(remote1_disconnect_handler_run);
}
class TestSender {
public:
TestSender()
: task_runner_(base::ThreadPool::CreateSequencedTaskRunner({})),
next_sender_(nullptr),
max_value_to_send_(-1) {}
// The following three methods are called on the corresponding sender thread.
void SetUp(PendingAssociatedRemote<IntegerSender> remote,
TestSender* next_sender,
int32_t max_value_to_send) {
CHECK(task_runner()->RunsTasksInCurrentSequence());
remote_.Bind(std::move(remote));
next_sender_ = next_sender ? next_sender : this;
max_value_to_send_ = max_value_to_send;
}
void Send(int32_t value) {
CHECK(task_runner()->RunsTasksInCurrentSequence());
if (value > max_value_to_send_)
return;
remote_->Send(value);
next_sender_->task_runner()->PostTask(
FROM_HERE, base::BindOnce(&TestSender::Send,
base::Unretained(next_sender_), ++value));
}
void TearDown() {
CHECK(task_runner()->RunsTasksInCurrentSequence());
remote_.reset();
}
base::SequencedTaskRunner* task_runner() { return task_runner_.get(); }
private:
scoped_refptr<base::SequencedTaskRunner> task_runner_;
raw_ptr<TestSender> next_sender_;
int32_t max_value_to_send_;
AssociatedRemote<IntegerSender> remote_;
};
class TestReceiver {
public:
TestReceiver()
: task_runner_(base::ThreadPool::CreateSequencedTaskRunner({})),
expected_calls_(0) {}
void SetUp(PendingAssociatedReceiver<IntegerSender> receiver0,
PendingAssociatedReceiver<IntegerSender> receiver1,
size_t expected_calls,
base::OnceClosure notify_finish) {
CHECK(task_runner()->RunsTasksInCurrentSequence());
impl0_ = std::make_unique<IntegerSenderImpl>(std::move(receiver0));
impl0_->set_notify_send_method_called(base::BindRepeating(
&TestReceiver::SendMethodCalled, base::Unretained(this)));
impl1_ = std::make_unique<IntegerSenderImpl>(std::move(receiver1));
impl1_->set_notify_send_method_called(base::BindRepeating(
&TestReceiver::SendMethodCalled, base::Unretained(this)));
expected_calls_ = expected_calls;
notify_finish_ = std::move(notify_finish);
}
void TearDown() {
CHECK(task_runner()->RunsTasksInCurrentSequence());
impl0_.reset();
impl1_.reset();
}
base::SequencedTaskRunner* task_runner() { return task_runner_.get(); }
const std::vector<int32_t>& values() const { return values_; }
private:
void SendMethodCalled(int32_t value) {
values_.push_back(value);
if (values_.size() >= expected_calls_)
std::move(notify_finish_).Run();
}
scoped_refptr<base::SequencedTaskRunner> task_runner_;
size_t expected_calls_;
std::unique_ptr<IntegerSenderImpl> impl0_;
std::unique_ptr<IntegerSenderImpl> impl1_;
std::vector<int32_t> values_;
base::OnceClosure notify_finish_;
};
class NotificationCounter {
public:
NotificationCounter(size_t total_count, base::OnceClosure notify_finish)
: total_count_(total_count),
current_count_(0),
notify_finish_(std::move(notify_finish)) {}
~NotificationCounter() = default;
// Okay to call from any thread.
void OnGotNotification() {
bool finshed = false;
{
base::AutoLock locker(lock_);
CHECK_LT(current_count_, total_count_);
current_count_++;
finshed = current_count_ == total_count_;
}
if (finshed)
std::move(notify_finish_).Run();
}
private:
base::Lock lock_;
const size_t total_count_;
size_t current_count_;
base::OnceClosure notify_finish_;
};
TEST_F(AssociatedInterfaceTest, MultiThreadAccess) {
// Set up four associated interfaces on a message pipe. Use the inteface
// pointers on four threads in parallel; run the interface implementations on
// two threads. Test that multi-threaded access works.
const int32_t kMaxValue = 1000;
MessagePipe pipe;
scoped_refptr<MultiplexRouter> router0;
scoped_refptr<MultiplexRouter> router1;
CreateRouterPair(&router0, &router1);
std::array<PendingAssociatedReceiver<IntegerSender>, 4> pending_receivers;
std::array<PendingAssociatedRemote<IntegerSender>, 4> pending_remotes;
for (size_t i = 0; i < 4; ++i) {
CreateIntegerSenderWithExistingRouters(router1, &pending_remotes[i],
router0, &pending_receivers[i]);
}
std::array<TestSender, 4> senders;
for (size_t i = 0; i < 4; ++i) {
senders[i].task_runner()->PostTask(
FROM_HERE,
base::BindOnce(&TestSender::SetUp, base::Unretained(&senders[i]),
std::move(pending_remotes[i]), nullptr,
kMaxValue * (i + 1) / 4));
}
base::RunLoop run_loop;
std::array<TestReceiver, 2> receivers;
NotificationCounter counter(2, run_loop.QuitClosure());
for (size_t i = 0; i < 2; ++i) {
receivers[i].task_runner()->PostTask(
FROM_HERE,
base::BindOnce(&TestReceiver::SetUp, base::Unretained(&receivers[i]),
std::move(pending_receivers[2 * i]),
std::move(pending_receivers[2 * i + 1]),
static_cast<size_t>(kMaxValue / 2),
base::BindOnce(&NotificationCounter::OnGotNotification,
base::Unretained(&counter))));
}
for (size_t i = 0; i < 4; ++i) {
senders[i].task_runner()->PostTask(
FROM_HERE,
base::BindOnce(&TestSender::Send, base::Unretained(&senders[i]),
kMaxValue * i / 4 + 1));
}
run_loop.Run();
for (size_t i = 0; i < 4; ++i) {
base::RunLoop run_loop2;
senders[i].task_runner()->PostTaskAndReply(
FROM_HERE,
base::BindOnce(&TestSender::TearDown, base::Unretained(&senders[i])),
run_loop2.QuitClosure());
run_loop2.Run();
}
for (size_t i = 0; i < 2; ++i) {
base::RunLoop run_loop2;
receivers[i].task_runner()->PostTaskAndReply(
FROM_HERE,
base::BindOnce(&TestReceiver::TearDown,
base::Unretained(&receivers[i])),
run_loop2.QuitClosure());
run_loop2.Run();
}
EXPECT_EQ(static_cast<size_t>(kMaxValue / 2), receivers[0].values().size());
EXPECT_EQ(static_cast<size_t>(kMaxValue / 2), receivers[1].values().size());
std::vector<int32_t> all_values;
all_values.insert(all_values.end(), receivers[0].values().begin(),
receivers[0].values().end());
all_values.insert(all_values.end(), receivers[1].values().begin(),
receivers[1].values().end());
std::sort(all_values.begin(), all_values.end());
for (size_t i = 0; i < all_values.size(); ++i)
ASSERT_EQ(static_cast<int32_t>(i + 1), all_values[i]);
}
TEST_F(AssociatedInterfaceTest, FIFO) {
// Set up four associated interfaces on a message pipe. Use the inteface
// pointers on four threads; run the interface implementations on two threads.
// Take turns to make calls using the four pointers. Test that FIFO-ness is
// preserved.
const int32_t kMaxValue = 100;
MessagePipe pipe;
scoped_refptr<MultiplexRouter> router0;
scoped_refptr<MultiplexRouter> router1;
CreateRouterPair(&router0, &router1);
std::array<PendingAssociatedReceiver<IntegerSender>, 4> pending_receivers;
std::array<PendingAssociatedRemote<IntegerSender>, 4> pending_remotes;
for (size_t i = 0; i < 4; ++i) {
CreateIntegerSenderWithExistingRouters(router1, &pending_remotes[i],
router0, &pending_receivers[i]);
}
std::array<TestSender, 4> senders;
for (size_t i = 0; i < 4; ++i) {
senders[i].task_runner()->PostTask(
FROM_HERE,
base::BindOnce(&TestSender::SetUp, base::Unretained(&senders[i]),
std::move(pending_remotes[i]),
base::Unretained(&senders[(i + 1) % 4]), kMaxValue));
}
base::RunLoop run_loop;
std::array<TestReceiver, 2> receivers;
NotificationCounter counter(2, run_loop.QuitClosure());
for (size_t i = 0; i < 2; ++i) {
receivers[i].task_runner()->PostTask(
FROM_HERE,
base::BindOnce(&TestReceiver::SetUp, base::Unretained(&receivers[i]),
std::move(pending_receivers[2 * i]),
std::move(pending_receivers[2 * i + 1]),
static_cast<size_t>(kMaxValue / 2),
base::BindOnce(&NotificationCounter::OnGotNotification,
base::Unretained(&counter))));
}
senders[0].task_runner()->PostTask(
FROM_HERE,
base::BindOnce(&TestSender::Send, base::Unretained(&senders[0]), 1));
run_loop.Run();
for (size_t i = 0; i < 4; ++i) {
base::RunLoop run_loop2;
senders[i].task_runner()->PostTaskAndReply(
FROM_HERE,
base::BindOnce(&TestSender::TearDown, base::Unretained(&senders[i])),
run_loop2.QuitClosure());
run_loop2.Run();
}
for (size_t i = 0; i < 2; ++i) {
base::RunLoop run_loop2;
receivers[i].task_runner()->PostTaskAndReply(
FROM_HERE,
base::BindOnce(&TestReceiver::TearDown,
base::Unretained(&receivers[i])),
run_loop2.QuitClosure());
run_loop2.Run();
}
EXPECT_EQ(static_cast<size_t>(kMaxValue / 2), receivers[0].values().size());
EXPECT_EQ(static_cast<size_t>(kMaxValue / 2), receivers[1].values().size());
for (size_t i = 0; i < 2; ++i) {
for (size_t j = 1; j < receivers[i].values().size(); ++j)
EXPECT_LT(receivers[i].values()[j - 1], receivers[i].values()[j]);
}
}
TEST_F(AssociatedInterfaceTest, PassAssociatedInterfaces) {
Remote<IntegerSenderConnection> connection_remote;
IntegerSenderConnectionImpl connection(
connection_remote.BindNewPipeAndPassReceiver());
AssociatedRemote<IntegerSender> sender0;
connection_remote->GetSender(sender0.BindNewEndpointAndPassReceiver());
base::RunLoop run_loop;
sender0->Echo(123, base::BindLambdaForTesting([&](int32_t value) {
EXPECT_EQ(123, value);
run_loop.Quit();
}));
run_loop.Run();
AssociatedRemote<IntegerSender> sender1;
base::RunLoop run_loop2;
connection_remote->AsyncGetSender(base::BindLambdaForTesting(
[&](PendingAssociatedRemote<IntegerSender> sender) {
sender1.Bind(std::move(sender));
run_loop2.Quit();
}));
run_loop2.Run();
EXPECT_TRUE(sender1);
base::RunLoop run_loop3;
sender1->Echo(456, base::BindLambdaForTesting([&](int32_t value) {
EXPECT_EQ(456, value);
run_loop3.Quit();
}));
run_loop3.Run();
}
TEST_F(AssociatedInterfaceTest,
ReceiverWaitAndPauseWhenNoAssociatedInterfaces) {
Remote<IntegerSenderConnection> connection_remote;
IntegerSenderConnectionImpl connection(
connection_remote.BindNewPipeAndPassReceiver());
AssociatedRemote<IntegerSender> sender0;
connection_remote->GetSender(sender0.BindNewEndpointAndPassReceiver());
EXPECT_FALSE(
connection.receiver()->internal_state()->HasAssociatedInterfaces());
// There are no associated interfaces running on the pipe yet. It is okay to
// pause.
connection.receiver()->Pause();
connection.receiver()->Resume();
// There are no associated interfaces running on the pipe yet. It is okay to
// wait.
EXPECT_TRUE(connection.receiver()->WaitForIncomingCall());
// The previous wait has dispatched the GetSender request message, therefore
// an associated interface has been set up on the pipe. It is not allowed to
// wait or pause.
EXPECT_TRUE(
connection.receiver()->internal_state()->HasAssociatedInterfaces());
}
class PingServiceImpl : public PingService {
public:
explicit PingServiceImpl(PendingAssociatedReceiver<PingService> receiver)
: receiver_(this, std::move(receiver)) {}
~PingServiceImpl() override = default;
AssociatedReceiver<PingService>& receiver() { return receiver_; }
void set_ping_handler(base::RepeatingClosure handler) {
ping_handler_ = std::move(handler);
}
// PingService:
void Ping(PingCallback callback) override {
if (ping_handler_)
ping_handler_.Run();
std::move(callback).Run();
}
private:
AssociatedReceiver<PingService> receiver_;
base::RepeatingClosure ping_handler_;
};
class PingProviderImpl : public AssociatedPingProvider {
public:
explicit PingProviderImpl(PendingReceiver<AssociatedPingProvider> receiver)
: receiver_(this, std::move(receiver)) {}
~PingProviderImpl() override = default;
// AssociatedPingProvider:
void GetPing(PendingAssociatedReceiver<PingService> receiver) override {
ping_services_.emplace_back(new PingServiceImpl(std::move(receiver)));
if (expected_receivers_count_ > 0 &&
ping_services_.size() == expected_receivers_count_ && quit_waiting_) {
expected_receivers_count_ = 0;
std::move(quit_waiting_).Run();
}
}
std::vector<std::unique_ptr<PingServiceImpl>>& ping_services() {
return ping_services_;
}
void WaitForReceivers(size_t count) {
DCHECK(!quit_waiting_);
expected_receivers_count_ = count;
base::RunLoop loop;
quit_waiting_ = loop.QuitClosure();
loop.Run();
}
private:
Receiver<AssociatedPingProvider> receiver_;
std::vector<std::unique_ptr<PingServiceImpl>> ping_services_;
size_t expected_receivers_count_ = 0;
base::OnceClosure quit_waiting_;
};
class CallbackFilter : public MessageFilter {
public:
explicit CallbackFilter(base::RepeatingClosure callback)
: callback_(std::move(callback)) {}
~CallbackFilter() override = default;
static std::unique_ptr<CallbackFilter> Wrap(base::RepeatingClosure callback) {
return std::make_unique<CallbackFilter>(std::move(callback));
}
// MessageFilter:
bool WillDispatch(Message* message) override {
callback_.Run();
return true;
}
void DidDispatchOrReject(Message* message, bool accepted) override {}
private:
base::RepeatingClosure callback_;
};
// Verifies that filters work as expected on associated receivers, i.e. that
// they're notified in order, before dispatch; and that each associated
// receiver in a group operates with its own set of filters.
TEST_F(AssociatedInterfaceTest, ReceiverWithFilters) {
Remote<AssociatedPingProvider> provider;
PingProviderImpl provider_impl(provider.BindNewPipeAndPassReceiver());
AssociatedRemote<PingService> ping_a, ping_b;
provider->GetPing(ping_a.BindNewEndpointAndPassReceiver());
provider->GetPing(ping_b.BindNewEndpointAndPassReceiver());
provider_impl.WaitForReceivers(2);
ASSERT_EQ(2u, provider_impl.ping_services().size());
PingServiceImpl& ping_a_impl = *provider_impl.ping_services()[0];
PingServiceImpl& ping_b_impl = *provider_impl.ping_services()[1];
int a_status = 0;
int b_status = 0;
ping_a_impl.receiver().SetFilter(
CallbackFilter::Wrap(base::BindLambdaForTesting([&] {
EXPECT_EQ(0, a_status);
EXPECT_EQ(0, b_status);
a_status = 1;
})));
ping_b_impl.receiver().SetFilter(
CallbackFilter::Wrap(base::BindLambdaForTesting([&] {
EXPECT_EQ(1, a_status);
EXPECT_EQ(0, b_status);
b_status = 1;
})));
for (int i = 0; i < 10; ++i) {
a_status = 0;
b_status = 0;
{
base::RunLoop loop;
ping_a->Ping(loop.QuitClosure());
loop.Run();
}
EXPECT_EQ(1, a_status);
EXPECT_EQ(0, b_status);
{
base::RunLoop loop;
ping_b->Ping(loop.QuitClosure());
loop.Run();
}
EXPECT_EQ(1, a_status);
EXPECT_EQ(1, b_status);
}
}
TEST_F(AssociatedInterfaceTest, AssociatedRemoteFlushForTesting) {
PendingAssociatedReceiver<IntegerSender> receiver;
PendingAssociatedRemote<IntegerSender> remote;
CreateIntegerSender(&remote, &receiver);
IntegerSenderImpl impl0(std::move(receiver));
AssociatedRemote<IntegerSender> remote0(std::move(remote));
remote0.set_disconnect_handler(base::BindOnce(&Fail));
bool remote0_callback_run = false;
remote0->Echo(123, base::BindLambdaForTesting([&](int32_t value) {
EXPECT_EQ(123, value);
remote0_callback_run = true;
}));
remote0.FlushForTesting();
EXPECT_TRUE(remote0_callback_run);
}
TEST_F(AssociatedInterfaceTest, AssociatedRemoteFlushForTestingWithClosedPeer) {
PendingAssociatedReceiver<IntegerSender> receiver;
PendingAssociatedRemote<IntegerSender> remote;
CreateIntegerSender(&remote, &receiver);
AssociatedRemote<IntegerSender> remote0(std::move(remote));
bool called = false;
remote0.set_disconnect_handler(
base::BindLambdaForTesting([&] { called = true; }));
receiver.reset();
remote0.FlushForTesting();
EXPECT_TRUE(called);
remote0.FlushForTesting();
}
TEST_F(AssociatedInterfaceTest, AssociatedBindingFlushForTesting) {
PendingAssociatedReceiver<IntegerSender> receiver;
PendingAssociatedRemote<IntegerSender> remote;
CreateIntegerSender(&remote, &receiver);
IntegerSenderImpl impl0(std::move(receiver));
impl0.receiver()->set_disconnect_handler(base::BindOnce(&Fail));
AssociatedRemote<IntegerSender> remote0(std::move(remote));
bool remote0_callback_run = false;
remote0->Echo(123, base::BindLambdaForTesting([&](int32_t value) {
EXPECT_EQ(123, value);
remote0_callback_run = true;
}));
// Because the flush is sent from the receiver, it only guarantees that the
// request has been received, not the response. The second flush waits for the
// response to be received.
impl0.receiver()->FlushForTesting();
impl0.receiver()->FlushForTesting();
EXPECT_TRUE(remote0_callback_run);
}
TEST_F(AssociatedInterfaceTest,
AssociatedReceiverFlushForTestingWithClosedPeer) {
scoped_refptr<MultiplexRouter> router0;
scoped_refptr<MultiplexRouter> router1;
CreateRouterPair(&router0, &router1);
PendingAssociatedReceiver<IntegerSender> receiver;
{
PendingAssociatedRemote<IntegerSender> remote;
CreateIntegerSenderWithExistingRouters(router1, &remote, router0,
&receiver);
}
IntegerSenderImpl impl(std::move(receiver));
bool called = false;
impl.receiver()->set_disconnect_handler(
base::BindLambdaForTesting([&] { called = true; }));
impl.receiver()->FlushForTesting();
EXPECT_TRUE(called);
impl.receiver()->FlushForTesting();
}
TEST_F(AssociatedInterfaceTest, ReceiverFlushForTesting) {
Remote<IntegerSenderConnection> remote;
IntegerSenderConnectionImpl impl(remote.BindNewPipeAndPassReceiver());
bool called = false;
remote->AsyncGetSender(base::BindLambdaForTesting(
[&](PendingAssociatedRemote<IntegerSender> remote) { called = true; }));
EXPECT_FALSE(called);
impl.receiver()->set_disconnect_handler(base::BindOnce(&Fail));
// Because the flush is sent from the receiver, it only guarantees that the
// request has been received, not the response. The second flush waits for the
// response to be received.
impl.receiver()->FlushForTesting();
impl.receiver()->FlushForTesting();
EXPECT_TRUE(called);
}
TEST_F(AssociatedInterfaceTest, ReceiverFlushForTestingWithClosedPeer) {
Remote<IntegerSenderConnection> remote;
IntegerSenderConnectionImpl impl(remote.BindNewPipeAndPassReceiver());
bool called = false;
impl.receiver()->set_disconnect_handler(
base::BindLambdaForTesting([&] { called = true; }));
remote.reset();
EXPECT_FALSE(called);
impl.receiver()->FlushForTesting();
EXPECT_TRUE(called);
impl.receiver()->FlushForTesting();
}
TEST_F(AssociatedInterfaceTest, RemoteFlushForTesting) {
Remote<IntegerSenderConnection> remote;
IntegerSenderConnectionImpl impl(remote.BindNewPipeAndPassReceiver());
bool called = false;
remote.set_disconnect_handler(base::BindOnce(&Fail));
remote->AsyncGetSender(base::BindLambdaForTesting(
[&](PendingAssociatedRemote<IntegerSender> remote) { called = true; }));
EXPECT_FALSE(called);
remote.FlushForTesting();
EXPECT_TRUE(called);
}
TEST_F(AssociatedInterfaceTest, RemoteFlushForTestingWithClosedPeer) {
Remote<IntegerSenderConnection> remote;
std::ignore = remote.BindNewPipeAndPassReceiver();
bool called = false;
remote.set_disconnect_handler(
base::BindLambdaForTesting([&] { called = true; }));
EXPECT_FALSE(called);
remote.FlushForTesting();
EXPECT_TRUE(called);
remote.FlushForTesting();
}
TEST_F(AssociatedInterfaceTest, AssociatedReceiverConnectionErrorWithReason) {
PendingAssociatedReceiver<IntegerSender> pending_receiver;
PendingAssociatedRemote<IntegerSender> pending_remote;
CreateIntegerSender(&pending_remote, &pending_receiver);
IntegerSenderImpl impl(std::move(pending_receiver));
AssociatedRemote<IntegerSender> remote(std::move(pending_remote));
base::RunLoop run_loop;
impl.receiver()->set_disconnect_with_reason_handler(
base::BindLambdaForTesting(
[&](uint32_t custom_reason, const std::string& description) {
EXPECT_EQ(123u, custom_reason);
EXPECT_EQ("farewell", description);
run_loop.Quit();
}));
remote.ResetWithReason(123u, "farewell");
run_loop.Run();
}
TEST_F(AssociatedInterfaceTest,
PendingAssociatedReceiverConnectionErrorWithReason) {
// Test that AssociatedReceiver is notified with connection error when the
// interface hasn't associated with a message pipe and the peer is closed.
AssociatedRemote<IntegerSender> remote;
IntegerSenderImpl impl(remote.BindNewEndpointAndPassReceiver());
base::RunLoop run_loop;
impl.receiver()->set_disconnect_with_reason_handler(
base::BindLambdaForTesting(
[&](uint32_t custom_reason, const std::string& description) {
EXPECT_EQ(123u, custom_reason);
EXPECT_EQ("farewell", description);
run_loop.Quit();
}));
remote.ResetWithReason(123u, "farewell");
run_loop.Run();
}
TEST_F(AssociatedInterfaceTest, AssociatedRemoteConnectionErrorWithReason) {
PendingAssociatedReceiver<IntegerSender> pending_receiver;
PendingAssociatedRemote<IntegerSender> pending_remote;
CreateIntegerSender(&pending_remote, &pending_receiver);
IntegerSenderImpl impl(std::move(pending_receiver));
AssociatedRemote<IntegerSender> remote(std::move(pending_remote));
base::RunLoop run_loop;
remote.set_disconnect_with_reason_handler(base::BindLambdaForTesting(
[&](uint32_t custom_reason, const std::string& description) {
EXPECT_EQ(456u, custom_reason);
EXPECT_EQ("farewell", description);
run_loop.Quit();
}));
impl.receiver()->ResetWithReason(456u, "farewell");
run_loop.Run();
}
TEST_F(AssociatedInterfaceTest,
PendingAssociatedRemoteConnectionErrorWithReason) {
// Test that AssociatedInterfacePtr is notified with connection error when the
// interface hasn't associated with a message pipe and the peer is closed.
AssociatedRemote<IntegerSender> remote;
auto pending_receiver = remote.BindNewEndpointAndPassReceiver();
base::RunLoop run_loop;
remote.set_disconnect_with_reason_handler(base::BindLambdaForTesting(
[&](uint32_t custom_reason, const std::string& description) {
EXPECT_EQ(456u, custom_reason);
EXPECT_EQ("farewell", description);
run_loop.Quit();
}));
pending_receiver.ResetWithReason(456u, "farewell");
run_loop.Run();
}
TEST_F(AssociatedInterfaceTest, AssociatedRequestResetWithReason) {
PendingAssociatedReceiver<IntegerSender> pending_receiver;
PendingAssociatedRemote<IntegerSender> pending_remote;
CreateIntegerSender(&pending_remote, &pending_receiver);
AssociatedRemote<IntegerSender> remote(std::move(pending_remote));
base::RunLoop run_loop;
remote.set_disconnect_with_reason_handler(base::BindLambdaForTesting(
[&](uint32_t custom_reason, const std::string& description) {
EXPECT_EQ(789u, custom_reason);
EXPECT_EQ("long time no see", description);
run_loop.Quit();
}));
pending_receiver.ResetWithReason(789u, "long time no see");
run_loop.Run();
}
TEST_F(AssociatedInterfaceTest, SharedAssociatedRemote) {
Remote<IntegerSenderConnection> connection_remote;
IntegerSenderConnectionImpl connection(
connection_remote.BindNewPipeAndPassReceiver());
PendingAssociatedRemote<IntegerSender> pending_remote;
connection_remote->GetSender(
pending_remote.InitWithNewEndpointAndPassReceiver());
SharedAssociatedRemote<IntegerSender> shared_sender(
std::move(pending_remote));
{
// Test the thread safe pointer can be used from the interface ptr thread.
base::RunLoop run_loop;
shared_sender->Echo(123, base::BindLambdaForTesting([&](int32_t value) {
EXPECT_EQ(123, value);
run_loop.Quit();
}));
run_loop.Run();
}
// Test the thread safe pointer can be used from another thread.
base::RunLoop run_loop;
auto sender_task_runner = base::ThreadPool::CreateSequencedTaskRunner({});
auto quit_closure = run_loop.QuitClosure();
sender_task_runner->PostTask(
FROM_HERE, base::BindLambdaForTesting([&] {
shared_sender->Echo(
123, base::BindLambdaForTesting([&](int32_t value) {
EXPECT_EQ(123, value);
EXPECT_TRUE(sender_task_runner->RunsTasksInCurrentSequence());
std::move(quit_closure).Run();
}));
}));
// Block until the method callback is called on the background thread.
run_loop.Run();
}
struct ForwarderTestContext {
Remote<IntegerSenderConnection> connection_remote;
std::unique_ptr<IntegerSenderConnectionImpl> interface_impl;
PendingAssociatedReceiver<IntegerSender> sender_receiver;
};
TEST_F(AssociatedInterfaceTest, SharedAssociatedRemoteWithTaskRunner) {
const scoped_refptr<base::SequencedTaskRunner> other_thread_task_runner =
base::ThreadPool::CreateSequencedTaskRunner({});
ForwarderTestContext* context = new ForwarderTestContext();
PendingAssociatedRemote<IntegerSender> pending_remote;
base::WaitableEvent sender_bound_event(
base::WaitableEvent::ResetPolicy::MANUAL,
base::WaitableEvent::InitialState::NOT_SIGNALED);
other_thread_task_runner->PostTask(
FROM_HERE, base::BindLambdaForTesting([&] {
context->interface_impl = std::make_unique<IntegerSenderConnectionImpl>(
context->connection_remote.BindNewPipeAndPassReceiver());
context->connection_remote->GetSender(
pending_remote.InitWithNewEndpointAndPassReceiver());
sender_bound_event.Signal();
}));
sender_bound_event.Wait();
// Create a SharedAssociatedRemote that binds on the background thread and is
// associated with |connection_remote| there.
SharedAssociatedRemote<IntegerSender> shared_sender(std::move(pending_remote),
other_thread_task_runner);
// Issue a call on the shared remote immediately. Note that this may happen
// before the interface is bound on the background thread, and that must be
// OK.
base::RunLoop run_loop;
shared_sender->Echo(123, base::BindLambdaForTesting([&](int32_t value) {
EXPECT_EQ(123, value);
run_loop.Quit();
}));
run_loop.Run();
other_thread_task_runner->DeleteSoon(FROM_HERE, context);
shared_sender.reset();
}
class DiscardingAssociatedPingProviderProvider
: public AssociatedPingProviderProvider {
public:
void GetPingProvider(
PendingAssociatedReceiver<AssociatedPingProvider> receiver) override {}
};
TEST_F(AssociatedInterfaceTest, CloseWithoutBindingAssociatedReceiver) {
DiscardingAssociatedPingProviderProvider ping_provider_provider;
mojo::Receiver<AssociatedPingProviderProvider> receiver(
&ping_provider_provider);
Remote<AssociatedPingProviderProvider> provider_provider;
receiver.Bind(provider_provider.BindNewPipeAndPassReceiver());
AssociatedRemote<AssociatedPingProvider> provider;
provider_provider->GetPingProvider(provider.BindNewEndpointAndPassReceiver());
AssociatedRemote<PingService> ping;
provider->GetPing(ping.BindNewEndpointAndPassReceiver());
base::RunLoop run_loop;
ping.set_disconnect_handler(run_loop.QuitClosure());
run_loop.Run();
}
TEST_F(AssociatedInterfaceTest, AssociateWithDisconnectedPipe) {
AssociatedRemote<IntegerSender> sender;
AssociateWithDisconnectedPipe(
sender.BindNewEndpointAndPassReceiver().PassHandle());
sender->Send(42);
}
TEST_F(AssociatedInterfaceTest, AsyncErrorHandlersWhenClosingPrimaryInterface) {
// Ensures that associated interface error handlers are not invoked
// synchronously when the primary interface pipe is closed. Regression test
// for https://crbug.com/864731.
Remote<IntegerSenderConnection> connection_remote;
IntegerSenderConnectionImpl connection(
connection_remote.BindNewPipeAndPassReceiver());
base::RunLoop loop;
bool error_handler_invoked = false;
AssociatedRemote<IntegerSender> sender0;
connection_remote->GetSender(sender0.BindNewEndpointAndPassReceiver());
sender0.set_disconnect_handler(base::BindLambdaForTesting([&] {
error_handler_invoked = true;
loop.Quit();
}));
// This should not trigger the error handler synchronously...
connection_remote.reset();
EXPECT_FALSE(error_handler_invoked);
// ...but it should be triggered once we spin the scheduler.
loop.Run();
EXPECT_TRUE(error_handler_invoked);
}
TEST_F(AssociatedInterfaceTest, AssociatedReceiverReportBadMessage) {
PendingAssociatedReceiver<IntegerSender> pending_receiver;
PendingAssociatedRemote<IntegerSender> pending_remote;
CreateIntegerSender(&pending_remote, &pending_receiver);
IntegerSenderImpl impl(std::move(pending_receiver));
AssociatedRemote<IntegerSender> remote(std::move(pending_remote));
bool called = false;
base::RunLoop run_loop;
remote.set_disconnect_handler(base::BindLambdaForTesting([&] {
called = true;
run_loop.Quit();
}));
std::string received_error;
SetDefaultProcessErrorHandler(base::BindLambdaForTesting(
[&](const std::string& error) { received_error = error; }));
remote->Echo(-1, IntegerSenderImpl::EchoCallback());
EXPECT_FALSE(called);
run_loop.Run();
EXPECT_TRUE(called);
EXPECT_EQ("Reporting bad message for value == -1", received_error);
SetDefaultProcessErrorHandler(base::NullCallback());
}
TEST_F(AssociatedInterfaceTest, AssociatedReceiverDedicatedPipe) {
PendingAssociatedRemote<IntegerSender> pending_remote;
PendingAssociatedReceiver<IntegerSender> pending_receiver =
pending_remote.InitWithNewEndpointAndPassReceiver();
pending_receiver.EnableUnassociatedUsage();
IntegerSenderImpl impl(std::move(pending_receiver));
AssociatedRemote<IntegerSender> remote(std::move(pending_remote));
{
base::RunLoop run_loop;
impl.set_notify_send_method_called(
base::BindLambdaForTesting([&](int32_t x) {
EXPECT_EQ(88, x);
run_loop.Quit();
}));
remote->Send(88);
run_loop.Run();
}
{
base::RunLoop run_loop;
remote->Echo(888, base::BindLambdaForTesting([&](int32_t x) {
EXPECT_EQ(888, x);
run_loop.Quit();
}));
}
}
TEST_F(AssociatedInterfaceTest, AssociatedRemoteDedicatedPipe) {
PendingAssociatedRemote<IntegerSender> pending_remote;
PendingAssociatedReceiver<IntegerSender> pending_receiver =
pending_remote.InitWithNewEndpointAndPassReceiver();
IntegerSenderImpl impl(std::move(pending_receiver));
pending_remote.EnableUnassociatedUsage();
AssociatedRemote<IntegerSender> remote(std::move(pending_remote));
{
base::RunLoop run_loop;
impl.set_notify_send_method_called(
base::BindLambdaForTesting([&](int32_t x) {
EXPECT_EQ(88, x);
run_loop.Quit();
}));
remote->Send(88);
run_loop.Run();
}
{
base::RunLoop run_loop;
remote->Echo(888, base::BindLambdaForTesting([&](int32_t x) {
EXPECT_EQ(888, x);
run_loop.Quit();
}));
}
}
class ClumsyBinderImpl : public mojom::ClumsyBinder {
public:
explicit ClumsyBinderImpl(PendingReceiver<mojom::ClumsyBinder> receiver)
: receiver_(this, std::move(receiver)) {}
~ClumsyBinderImpl() override = default;
// mojom::ClumsyBinder:
void DropAssociatedBinder(
PendingAssociatedReceiver<mojom::AssociatedBinder> receiver) override {
// Nothing to do but drop the receiver so it's closed.
}
private:
Receiver<mojom::ClumsyBinder> receiver_;
};
TEST_F(AssociatedInterfaceTest, CloseSerializedAssociatedEndpoints) {
// Regression test for https://crbug.com/331636067. Verifies that endpoint
// lifetime is properly managed when associated endpoints are serialized into
// a message that gets dropped before transmission.
Remote<mojom::ClumsyBinder> binder;
ClumsyBinderImpl binder_impl(binder.BindNewPipeAndPassReceiver());
AssociatedRemote<mojom::AssociatedBinder> associated_binder;
binder->DropAssociatedBinder(
associated_binder.BindNewEndpointAndPassReceiver());
// Wait for disconnection to be observed. This way we know any subsequent
// outgoing messages on `associated_binder` will not be sent.
base::RunLoop loop1;
associated_binder.set_disconnect_handler(loop1.QuitClosure());
loop1.Run();
// Send another endpoint over. This receiver will be dropped, and the remote
// should be properly notified of peer closure to terminate this loop.
base::RunLoop loop2;
AssociatedRemote<mojom::AssociatedBinder> another_binder;
associated_binder->Bind(another_binder.BindNewEndpointAndPassReceiver());
another_binder.set_disconnect_handler(loop2.QuitClosure());
loop2.Run();
}
class TestSyncImpl : public TestSync {
public:
void Ping(PingCallback callback) override { std::move(callback).Run(); }
void NoInterruptPing(NoInterruptPingCallback callback) override {
std::move(callback).Run();
}
void Echo(int32_t value, EchoCallback callback) override {
std::move(callback).Run(value);
}
void AsyncEcho(int32_t, AsyncEchoCallback) override { NOTREACHED(); }
};
class TestSyncPrimaryImpl : public TestSyncPrimary, public TestSync {
public:
explicit TestSyncPrimaryImpl(PendingReceiver<TestSyncPrimary> receiver)
: receiver_(this, std::move(receiver)), test_sync_receiver_(this) {}
~TestSyncPrimaryImpl() override = default;
static constexpr int32_t kReceivedPing = 0b001;
static constexpr int32_t kSyncCallWasAborted = 0b010;
static constexpr int32_t kSyncCall2WasAborted = 0b100;
static constexpr int32_t kNoInterruptPingReplied = 0b1000;
void Ping(PingCallback callback) override {
result_ |= kReceivedPing;
std::move(callback).Run();
}
void NoInterruptPing(NoInterruptPingCallback) override { NOTREACHED(); }
void Echo(int32_t value, EchoCallback callback) override {
std::move(callback).Run(result_);
}
void AsyncEcho(int32_t, AsyncEchoCallback) override { NOTREACHED(); }
void SendRemote(PendingAssociatedRemote<TestSync> remote) override {
if (!test_sync_remote_.is_bound()) {
test_sync_remote_.Bind(std::move(remote));
} else {
test_sync_remote2_.Bind(std::move(remote));
}
CHECK(!test_sync_receiver_.is_bound());
}
void SendReceiver(PendingAssociatedReceiver<TestSync> receiver) override {
test_sync_receiver_.Bind(std::move(receiver));
{
base::ScopedAllowBaseSyncPrimitivesForTesting allow_sync;
DoTest();
}
}
virtual void DoTest() {
CHECK(test_sync_remote_.is_bound());
int reply = -1;
// If we got a second test sync remote, make the first sync call on that
// remote to verify behavior when the peer closed event comes in while
// we're blocked on a different interface.
if (test_sync_remote2_.is_bound()) {
if (test_sync_remote2_->NoInterruptPing()) {
result_ |= kNoInterruptPingReplied;
}
// Make another sync call on the secondary remote. This doesn't change
// anything for most test cases, but is important for
// TestHangOnDisconnectWithSignaledWatcher to make sure
// SequenceLocalSyncEventWatcher has cleared out its "ready" watchers.
test_sync_remote2_->Ping();
}
bool call_result = test_sync_remote_->Echo(123, &reply);
if (!call_result) {
result_ |= kSyncCallWasAborted;
}
// Make a second sync call to make sure that one gets correctly aborted as
// well.
call_result = test_sync_remote_->Echo(456, &reply);
if (!call_result) {
result_ |= kSyncCall2WasAborted;
}
}
protected:
Receiver<TestSyncPrimary> receiver_;
AssociatedRemote<TestSync> test_sync_remote_;
AssociatedRemote<TestSync> test_sync_remote2_;
AssociatedReceiver<TestSync> test_sync_receiver_;
int32_t result_ = 0;
};
// Regression test for https://crbug.com/435493653 and
// https://crbug.com/436965298. Verifies that sync calls made on an associated
// remote are correctly aborted if the receiver endpoint is closed even if other
// messages are queued on the same message pipe first.
TEST_F(AssociatedInterfaceTest, TestHangOnDisconnect) {
Remote<TestSyncPrimary> primary_remote;
base::SequenceBound<TestSyncPrimaryImpl> primary_impl(
base::ThreadPool::CreateSequencedTaskRunner({}),
primary_remote.BindNewPipeAndPassReceiver());
TestSyncImpl sync_impl;
AssociatedReceiver<TestSync> sync_receiver(&sync_impl);
AssociatedRemote<TestSync> sync_remote;
primary_remote->SendRemote(sync_receiver.BindNewEndpointAndPassRemote());
primary_remote->SendReceiver(sync_remote.BindNewEndpointAndPassReceiver());
sync_remote->Ping(base::DoNothing());
sync_remote.reset();
sync_receiver.reset();
base::test::TestFuture<int32_t> result;
primary_remote->Echo(0, result.GetCallback());
EXPECT_EQ(TestSyncPrimaryImpl::kReceivedPing |
TestSyncPrimaryImpl::kSyncCallWasAborted |
TestSyncPrimaryImpl::kSyncCall2WasAborted,
result.Get());
primary_impl.SynchronouslyResetForTest();
}
// Slight variation of the above test, where the peer disconnect happens while
// blocked on a different interface than the one being disconnected.
// Additionally this test makes sure the disconnect event arrives while blocked
// on a NoInterrupt sync call, since that code path is slightly more
// complicated.
TEST_F(AssociatedInterfaceTest, TestHangOnDisconnectDifferentEndpoint) {
Remote<TestSyncPrimary> primary_remote;
base::SequenceBound<TestSyncPrimaryImpl> primary_impl(
base::ThreadPool::CreateSequencedTaskRunner({}),
primary_remote.BindNewPipeAndPassReceiver());
TestSyncImpl sync_impl;
AssociatedReceiver<TestSync> sync_receiver(&sync_impl);
AssociatedReceiver<TestSync> sync_receiver2(&sync_impl);
AssociatedRemote<TestSync> sync_remote;
primary_remote->SendRemote(sync_receiver.BindNewEndpointAndPassRemote());
primary_remote->SendRemote(sync_receiver2.BindNewEndpointAndPassRemote());
primary_remote->SendReceiver(sync_remote.BindNewEndpointAndPassReceiver());
sync_remote.reset();
sync_receiver.reset();
base::test::TestFuture<int32_t> result;
primary_remote->Echo(0, result.GetCallback());
EXPECT_EQ(TestSyncPrimaryImpl::kSyncCallWasAborted |
TestSyncPrimaryImpl::kSyncCall2WasAborted |
TestSyncPrimaryImpl::kNoInterruptPingReplied,
result.Get());
primary_impl.SynchronouslyResetForTest();
}
// Variation of TestHangOnDisconnectDifferentEndpoint, that additionally sets
// things up such that MultiplexRouter::EndPoint will have its sync_watcher_
// populated at the time the peer closed event is handled, by carefully ordering
// the messages and events sent over the message pipe.
TEST_F(AssociatedInterfaceTest, TestHangOnDisconnectWithSignaledWatcher) {
Remote<TestSyncPrimary> primary_remote;
// By making one extra sync call before the rest of the test body we can make
// sure that the `sync_watcher_` field for the relevant end point has been
// initialized.
class TestSyncPrimaryImplWithExtraSyncCall : public TestSyncPrimaryImpl {
public:
using TestSyncPrimaryImpl::TestSyncPrimaryImpl;
void DoTest() override {
test_sync_remote_->Ping();
TestSyncPrimaryImpl::DoTest();
}
};
// For this test we need to make sure to disconnect the interface that will
// hang while a sync call on a secondary interface is being made. So override
// NoInterruptPing to do that disconnect.
class TestSyncImplWithCallback : public TestSyncImpl {
public:
void NoInterruptPing(NoInterruptPingCallback callback) override {
if (no_interrupt_ping_callback_) {
std::move(no_interrupt_ping_callback_).Run();
}
std::move(callback).Run();
}
base::OnceClosure no_interrupt_ping_callback_;
};
base::SequenceBound<TestSyncPrimaryImplWithExtraSyncCall> primary_impl(
base::ThreadPool::CreateSequencedTaskRunner({}),
primary_remote.BindNewPipeAndPassReceiver());
TestSyncImplWithCallback sync_impl;
AssociatedReceiver<TestSync> sync_receiver(&sync_impl);
AssociatedReceiver<TestSync> sync_receiver2(&sync_impl);
sync_impl.no_interrupt_ping_callback_ =
base::BindLambdaForTesting([&]() { sync_receiver.reset(); });
AssociatedRemote<TestSync> sync_remote;
primary_remote->SendRemote(sync_receiver.BindNewEndpointAndPassRemote());
primary_remote->SendRemote(sync_receiver2.BindNewEndpointAndPassRemote());
primary_remote->SendReceiver(sync_remote.BindNewEndpointAndPassReceiver());
base::test::TestFuture<int32_t> result;
primary_remote->Echo(0, result.GetCallback());
EXPECT_EQ(TestSyncPrimaryImpl::kSyncCallWasAborted |
TestSyncPrimaryImpl::kSyncCall2WasAborted |
TestSyncPrimaryImpl::kNoInterruptPingReplied,
result.Get());
primary_impl.SynchronouslyResetForTest();
}
} // namespace
} // namespace associated_interface_unittest
} // namespace test
} // namespace mojo