| // Copyright 2015 The Chromium Authors |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include <stddef.h> |
| #include <stdint.h> |
| |
| #include <algorithm> |
| #include <array> |
| #include <memory> |
| #include <tuple> |
| #include <utility> |
| |
| #include "base/functional/bind.h" |
| #include "base/functional/callback.h" |
| #include "base/functional/callback_helpers.h" |
| #include "base/memory/ptr_util.h" |
| #include "base/memory/raw_ptr.h" |
| #include "base/run_loop.h" |
| #include "base/synchronization/waitable_event.h" |
| #include "base/task/sequenced_task_runner.h" |
| #include "base/task/single_thread_task_runner.h" |
| #include "base/task/thread_pool.h" |
| #include "base/test/bind.h" |
| #include "base/test/scoped_feature_list.h" |
| #include "base/test/task_environment.h" |
| #include "base/test/test_future.h" |
| #include "base/threading/sequence_bound.h" |
| #include "base/threading/thread.h" |
| #include "base/threading/thread_restrictions.h" |
| #include "mojo/public/cpp/bindings/associated_receiver.h" |
| #include "mojo/public/cpp/bindings/associated_remote.h" |
| #include "mojo/public/cpp/bindings/lib/multiplex_router.h" |
| #include "mojo/public/cpp/bindings/pending_associated_receiver.h" |
| #include "mojo/public/cpp/bindings/pending_associated_remote.h" |
| #include "mojo/public/cpp/bindings/receiver.h" |
| #include "mojo/public/cpp/bindings/remote.h" |
| #include "mojo/public/cpp/bindings/shared_associated_remote.h" |
| #include "mojo/public/cpp/bindings/tests/associated_interface_unittest.test-mojom.h" |
| #include "mojo/public/cpp/bindings/unique_associated_receiver_set.h" |
| #include "mojo/public/cpp/system/functions.h" |
| #include "mojo/public/interfaces/bindings/tests/ping_service.test-mojom.h" |
| #include "mojo/public/interfaces/bindings/tests/test_associated_interfaces.test-mojom.h" |
| #include "mojo/public/interfaces/bindings/tests/test_sync_methods.test-mojom.h" |
| #include "testing/gtest/include/gtest/gtest.h" |
| |
| namespace mojo { |
| namespace test { |
| namespace associated_interface_unittest { |
| namespace { |
| |
| using mojo::internal::MultiplexRouter; |
| |
| class IntegerSenderImpl : public IntegerSender { |
| public: |
| IntegerSenderImpl() = default; |
| explicit IntegerSenderImpl(PendingAssociatedReceiver<IntegerSender> receiver) |
| : receiver_(this, std::move(receiver)) {} |
| |
| ~IntegerSenderImpl() override = default; |
| |
| void set_notify_send_method_called( |
| base::RepeatingCallback<void(int32_t)> callback) { |
| notify_send_method_called_ = std::move(callback); |
| } |
| |
| void Echo(int32_t value, EchoCallback callback) override { |
| if (value == -1) { |
| receiver_.ReportBadMessage("Reporting bad message for value == -1"); |
| return; |
| } |
| std::move(callback).Run(value); |
| } |
| void Send(int32_t value) override { notify_send_method_called_.Run(value); } |
| |
| AssociatedReceiver<IntegerSender>* receiver() { return &receiver_; } |
| |
| private: |
| AssociatedReceiver<IntegerSender> receiver_{this}; |
| base::RepeatingCallback<void(int32_t)> notify_send_method_called_; |
| }; |
| |
| class IntegerSenderConnectionImpl : public IntegerSenderConnection { |
| public: |
| explicit IntegerSenderConnectionImpl( |
| PendingReceiver<IntegerSenderConnection> receiver) |
| : receiver_(this, std::move(receiver)) {} |
| |
| ~IntegerSenderConnectionImpl() override = default; |
| |
| void GetSender(PendingAssociatedReceiver<IntegerSender> receiver) override { |
| DCHECK(receiver.is_valid()); |
| senders_.Add(std::make_unique<IntegerSenderImpl>(), std::move(receiver)); |
| } |
| |
| void AsyncGetSender(AsyncGetSenderCallback callback) override { |
| PendingAssociatedRemote<IntegerSender> remote; |
| GetSender(remote.InitWithNewEndpointAndPassReceiver()); |
| std::move(callback).Run(std::move(remote)); |
| } |
| |
| Receiver<IntegerSenderConnection>* receiver() { return &receiver_; } |
| |
| private: |
| Receiver<IntegerSenderConnection> receiver_; |
| UniqueAssociatedReceiverSet<IntegerSender> senders_; |
| }; |
| |
| class AssociatedInterfaceTest : public testing::Test { |
| public: |
| AssociatedInterfaceTest() |
| : main_runner_(base::SingleThreadTaskRunner::GetCurrentDefault()) {} |
| ~AssociatedInterfaceTest() override = default; |
| |
| template <typename T> |
| PendingAssociatedRemote<T> EmulatePassingAssociatedRemote( |
| PendingAssociatedRemote<T> remote, |
| scoped_refptr<MultiplexRouter> source, |
| scoped_refptr<MultiplexRouter> target) { |
| ScopedInterfaceEndpointHandle handle = remote.PassHandle(); |
| CHECK(handle.pending_association()); |
| auto id = source->AssociateInterface(std::move(handle)); |
| return PendingAssociatedRemote<T>(target->CreateLocalEndpointHandle(id), |
| remote.version()); |
| } |
| |
| void CreateRouterPair(scoped_refptr<MultiplexRouter>* router0, |
| scoped_refptr<MultiplexRouter>* router1) { |
| MessagePipe pipe; |
| *router0 = MultiplexRouter::CreateAndStartReceiving( |
| std::move(pipe.handle0), MultiplexRouter::MULTI_INTERFACE, true, |
| main_runner_); |
| *router1 = MultiplexRouter::CreateAndStartReceiving( |
| std::move(pipe.handle1), MultiplexRouter::MULTI_INTERFACE, false, |
| main_runner_); |
| } |
| |
| void CreateIntegerSenderWithExistingRouters( |
| scoped_refptr<MultiplexRouter> router0, |
| PendingAssociatedRemote<IntegerSender>* remote0, |
| scoped_refptr<MultiplexRouter> router1, |
| PendingAssociatedReceiver<IntegerSender>* receiver1) { |
| *receiver1 = remote0->InitWithNewEndpointAndPassReceiver(); |
| *remote0 = |
| EmulatePassingAssociatedRemote(std::move(*remote0), router1, router0); |
| } |
| |
| void CreateIntegerSender(PendingAssociatedRemote<IntegerSender>* remote, |
| PendingAssociatedReceiver<IntegerSender>* receiver) { |
| scoped_refptr<MultiplexRouter> router0; |
| scoped_refptr<MultiplexRouter> router1; |
| CreateRouterPair(&router0, &router1); |
| CreateIntegerSenderWithExistingRouters(router1, remote, router0, receiver); |
| } |
| |
| private: |
| base::test::TaskEnvironment task_environment; |
| scoped_refptr<base::SequencedTaskRunner> main_runner_; |
| }; |
| |
| void Fail() { |
| FAIL() << "Unexpected connection error"; |
| } |
| |
| TEST_F(AssociatedInterfaceTest, InterfacesAtBothEnds) { |
| // Bind to the same pipe two associated interfaces, whose implementation lives |
| // at different ends. Test that the two don't interfere with each other. |
| |
| scoped_refptr<MultiplexRouter> router0; |
| scoped_refptr<MultiplexRouter> router1; |
| CreateRouterPair(&router0, &router1); |
| |
| PendingAssociatedReceiver<IntegerSender> receiver; |
| PendingAssociatedRemote<IntegerSender> remote; |
| |
| CreateIntegerSenderWithExistingRouters(router1, &remote, router0, &receiver); |
| IntegerSenderImpl impl0(std::move(receiver)); |
| AssociatedRemote<IntegerSender> remote0(std::move(remote)); |
| |
| CreateIntegerSenderWithExistingRouters(router0, &remote, router1, &receiver); |
| IntegerSenderImpl impl1(std::move(receiver)); |
| AssociatedRemote<IntegerSender> remote1(std::move(remote)); |
| |
| base::RunLoop run_loop, run_loop2; |
| bool remote0_callback_run = false; |
| remote0->Echo(123, base::BindLambdaForTesting([&](int32_t value) { |
| EXPECT_EQ(123, value); |
| remote0_callback_run = true; |
| run_loop.Quit(); |
| })); |
| |
| bool remote1_callback_run = false; |
| remote1->Echo(456, base::BindLambdaForTesting([&](int32_t value) { |
| EXPECT_EQ(456, value); |
| remote1_callback_run = true; |
| run_loop2.Quit(); |
| })); |
| |
| run_loop.Run(); |
| run_loop2.Run(); |
| EXPECT_TRUE(remote0_callback_run); |
| EXPECT_TRUE(remote1_callback_run); |
| |
| bool remote0_disconnect_handler_run = false; |
| base::RunLoop run_loop3; |
| remote0.set_disconnect_handler(base::BindLambdaForTesting([&] { |
| remote0_disconnect_handler_run = true; |
| run_loop3.Quit(); |
| })); |
| |
| impl0.receiver()->reset(); |
| run_loop3.Run(); |
| EXPECT_TRUE(remote0_disconnect_handler_run); |
| |
| bool remote1_disconnect_handler_run = false; |
| base::RunLoop run_loop4; |
| impl1.receiver()->set_disconnect_handler(base::BindLambdaForTesting([&] { |
| remote1_disconnect_handler_run = true; |
| run_loop4.Quit(); |
| })); |
| |
| remote1.reset(); |
| run_loop4.Run(); |
| EXPECT_TRUE(remote1_disconnect_handler_run); |
| } |
| |
| class TestSender { |
| public: |
| TestSender() |
| : task_runner_(base::ThreadPool::CreateSequencedTaskRunner({})), |
| next_sender_(nullptr), |
| max_value_to_send_(-1) {} |
| |
| // The following three methods are called on the corresponding sender thread. |
| void SetUp(PendingAssociatedRemote<IntegerSender> remote, |
| TestSender* next_sender, |
| int32_t max_value_to_send) { |
| CHECK(task_runner()->RunsTasksInCurrentSequence()); |
| |
| remote_.Bind(std::move(remote)); |
| next_sender_ = next_sender ? next_sender : this; |
| max_value_to_send_ = max_value_to_send; |
| } |
| |
| void Send(int32_t value) { |
| CHECK(task_runner()->RunsTasksInCurrentSequence()); |
| |
| if (value > max_value_to_send_) |
| return; |
| |
| remote_->Send(value); |
| |
| next_sender_->task_runner()->PostTask( |
| FROM_HERE, base::BindOnce(&TestSender::Send, |
| base::Unretained(next_sender_), ++value)); |
| } |
| |
| void TearDown() { |
| CHECK(task_runner()->RunsTasksInCurrentSequence()); |
| |
| remote_.reset(); |
| } |
| |
| base::SequencedTaskRunner* task_runner() { return task_runner_.get(); } |
| |
| private: |
| scoped_refptr<base::SequencedTaskRunner> task_runner_; |
| raw_ptr<TestSender> next_sender_; |
| int32_t max_value_to_send_; |
| |
| AssociatedRemote<IntegerSender> remote_; |
| }; |
| |
| class TestReceiver { |
| public: |
| TestReceiver() |
| : task_runner_(base::ThreadPool::CreateSequencedTaskRunner({})), |
| expected_calls_(0) {} |
| |
| void SetUp(PendingAssociatedReceiver<IntegerSender> receiver0, |
| PendingAssociatedReceiver<IntegerSender> receiver1, |
| size_t expected_calls, |
| base::OnceClosure notify_finish) { |
| CHECK(task_runner()->RunsTasksInCurrentSequence()); |
| |
| impl0_ = std::make_unique<IntegerSenderImpl>(std::move(receiver0)); |
| impl0_->set_notify_send_method_called(base::BindRepeating( |
| &TestReceiver::SendMethodCalled, base::Unretained(this))); |
| impl1_ = std::make_unique<IntegerSenderImpl>(std::move(receiver1)); |
| impl1_->set_notify_send_method_called(base::BindRepeating( |
| &TestReceiver::SendMethodCalled, base::Unretained(this))); |
| |
| expected_calls_ = expected_calls; |
| notify_finish_ = std::move(notify_finish); |
| } |
| |
| void TearDown() { |
| CHECK(task_runner()->RunsTasksInCurrentSequence()); |
| |
| impl0_.reset(); |
| impl1_.reset(); |
| } |
| |
| base::SequencedTaskRunner* task_runner() { return task_runner_.get(); } |
| const std::vector<int32_t>& values() const { return values_; } |
| |
| private: |
| void SendMethodCalled(int32_t value) { |
| values_.push_back(value); |
| |
| if (values_.size() >= expected_calls_) |
| std::move(notify_finish_).Run(); |
| } |
| |
| scoped_refptr<base::SequencedTaskRunner> task_runner_; |
| size_t expected_calls_; |
| |
| std::unique_ptr<IntegerSenderImpl> impl0_; |
| std::unique_ptr<IntegerSenderImpl> impl1_; |
| |
| std::vector<int32_t> values_; |
| |
| base::OnceClosure notify_finish_; |
| }; |
| |
| class NotificationCounter { |
| public: |
| NotificationCounter(size_t total_count, base::OnceClosure notify_finish) |
| : total_count_(total_count), |
| current_count_(0), |
| notify_finish_(std::move(notify_finish)) {} |
| |
| ~NotificationCounter() = default; |
| |
| // Okay to call from any thread. |
| void OnGotNotification() { |
| bool finshed = false; |
| { |
| base::AutoLock locker(lock_); |
| CHECK_LT(current_count_, total_count_); |
| current_count_++; |
| finshed = current_count_ == total_count_; |
| } |
| |
| if (finshed) |
| std::move(notify_finish_).Run(); |
| } |
| |
| private: |
| base::Lock lock_; |
| const size_t total_count_; |
| size_t current_count_; |
| base::OnceClosure notify_finish_; |
| }; |
| |
| TEST_F(AssociatedInterfaceTest, MultiThreadAccess) { |
| // Set up four associated interfaces on a message pipe. Use the inteface |
| // pointers on four threads in parallel; run the interface implementations on |
| // two threads. Test that multi-threaded access works. |
| |
| const int32_t kMaxValue = 1000; |
| MessagePipe pipe; |
| scoped_refptr<MultiplexRouter> router0; |
| scoped_refptr<MultiplexRouter> router1; |
| CreateRouterPair(&router0, &router1); |
| |
| std::array<PendingAssociatedReceiver<IntegerSender>, 4> pending_receivers; |
| std::array<PendingAssociatedRemote<IntegerSender>, 4> pending_remotes; |
| for (size_t i = 0; i < 4; ++i) { |
| CreateIntegerSenderWithExistingRouters(router1, &pending_remotes[i], |
| router0, &pending_receivers[i]); |
| } |
| |
| std::array<TestSender, 4> senders; |
| for (size_t i = 0; i < 4; ++i) { |
| senders[i].task_runner()->PostTask( |
| FROM_HERE, |
| base::BindOnce(&TestSender::SetUp, base::Unretained(&senders[i]), |
| std::move(pending_remotes[i]), nullptr, |
| kMaxValue * (i + 1) / 4)); |
| } |
| |
| base::RunLoop run_loop; |
| std::array<TestReceiver, 2> receivers; |
| NotificationCounter counter(2, run_loop.QuitClosure()); |
| for (size_t i = 0; i < 2; ++i) { |
| receivers[i].task_runner()->PostTask( |
| FROM_HERE, |
| base::BindOnce(&TestReceiver::SetUp, base::Unretained(&receivers[i]), |
| std::move(pending_receivers[2 * i]), |
| std::move(pending_receivers[2 * i + 1]), |
| static_cast<size_t>(kMaxValue / 2), |
| base::BindOnce(&NotificationCounter::OnGotNotification, |
| base::Unretained(&counter)))); |
| } |
| |
| for (size_t i = 0; i < 4; ++i) { |
| senders[i].task_runner()->PostTask( |
| FROM_HERE, |
| base::BindOnce(&TestSender::Send, base::Unretained(&senders[i]), |
| kMaxValue * i / 4 + 1)); |
| } |
| |
| run_loop.Run(); |
| |
| for (size_t i = 0; i < 4; ++i) { |
| base::RunLoop run_loop2; |
| senders[i].task_runner()->PostTaskAndReply( |
| FROM_HERE, |
| base::BindOnce(&TestSender::TearDown, base::Unretained(&senders[i])), |
| run_loop2.QuitClosure()); |
| run_loop2.Run(); |
| } |
| |
| for (size_t i = 0; i < 2; ++i) { |
| base::RunLoop run_loop2; |
| receivers[i].task_runner()->PostTaskAndReply( |
| FROM_HERE, |
| base::BindOnce(&TestReceiver::TearDown, |
| base::Unretained(&receivers[i])), |
| run_loop2.QuitClosure()); |
| run_loop2.Run(); |
| } |
| |
| EXPECT_EQ(static_cast<size_t>(kMaxValue / 2), receivers[0].values().size()); |
| EXPECT_EQ(static_cast<size_t>(kMaxValue / 2), receivers[1].values().size()); |
| |
| std::vector<int32_t> all_values; |
| all_values.insert(all_values.end(), receivers[0].values().begin(), |
| receivers[0].values().end()); |
| all_values.insert(all_values.end(), receivers[1].values().begin(), |
| receivers[1].values().end()); |
| |
| std::sort(all_values.begin(), all_values.end()); |
| for (size_t i = 0; i < all_values.size(); ++i) |
| ASSERT_EQ(static_cast<int32_t>(i + 1), all_values[i]); |
| } |
| |
| TEST_F(AssociatedInterfaceTest, FIFO) { |
| // Set up four associated interfaces on a message pipe. Use the inteface |
| // pointers on four threads; run the interface implementations on two threads. |
| // Take turns to make calls using the four pointers. Test that FIFO-ness is |
| // preserved. |
| |
| const int32_t kMaxValue = 100; |
| MessagePipe pipe; |
| scoped_refptr<MultiplexRouter> router0; |
| scoped_refptr<MultiplexRouter> router1; |
| CreateRouterPair(&router0, &router1); |
| |
| std::array<PendingAssociatedReceiver<IntegerSender>, 4> pending_receivers; |
| std::array<PendingAssociatedRemote<IntegerSender>, 4> pending_remotes; |
| for (size_t i = 0; i < 4; ++i) { |
| CreateIntegerSenderWithExistingRouters(router1, &pending_remotes[i], |
| router0, &pending_receivers[i]); |
| } |
| |
| std::array<TestSender, 4> senders; |
| for (size_t i = 0; i < 4; ++i) { |
| senders[i].task_runner()->PostTask( |
| FROM_HERE, |
| base::BindOnce(&TestSender::SetUp, base::Unretained(&senders[i]), |
| std::move(pending_remotes[i]), |
| base::Unretained(&senders[(i + 1) % 4]), kMaxValue)); |
| } |
| |
| base::RunLoop run_loop; |
| std::array<TestReceiver, 2> receivers; |
| NotificationCounter counter(2, run_loop.QuitClosure()); |
| for (size_t i = 0; i < 2; ++i) { |
| receivers[i].task_runner()->PostTask( |
| FROM_HERE, |
| base::BindOnce(&TestReceiver::SetUp, base::Unretained(&receivers[i]), |
| std::move(pending_receivers[2 * i]), |
| std::move(pending_receivers[2 * i + 1]), |
| static_cast<size_t>(kMaxValue / 2), |
| base::BindOnce(&NotificationCounter::OnGotNotification, |
| base::Unretained(&counter)))); |
| } |
| |
| senders[0].task_runner()->PostTask( |
| FROM_HERE, |
| base::BindOnce(&TestSender::Send, base::Unretained(&senders[0]), 1)); |
| |
| run_loop.Run(); |
| |
| for (size_t i = 0; i < 4; ++i) { |
| base::RunLoop run_loop2; |
| senders[i].task_runner()->PostTaskAndReply( |
| FROM_HERE, |
| base::BindOnce(&TestSender::TearDown, base::Unretained(&senders[i])), |
| run_loop2.QuitClosure()); |
| run_loop2.Run(); |
| } |
| |
| for (size_t i = 0; i < 2; ++i) { |
| base::RunLoop run_loop2; |
| receivers[i].task_runner()->PostTaskAndReply( |
| FROM_HERE, |
| base::BindOnce(&TestReceiver::TearDown, |
| base::Unretained(&receivers[i])), |
| run_loop2.QuitClosure()); |
| run_loop2.Run(); |
| } |
| |
| EXPECT_EQ(static_cast<size_t>(kMaxValue / 2), receivers[0].values().size()); |
| EXPECT_EQ(static_cast<size_t>(kMaxValue / 2), receivers[1].values().size()); |
| |
| for (size_t i = 0; i < 2; ++i) { |
| for (size_t j = 1; j < receivers[i].values().size(); ++j) |
| EXPECT_LT(receivers[i].values()[j - 1], receivers[i].values()[j]); |
| } |
| } |
| |
| TEST_F(AssociatedInterfaceTest, PassAssociatedInterfaces) { |
| Remote<IntegerSenderConnection> connection_remote; |
| IntegerSenderConnectionImpl connection( |
| connection_remote.BindNewPipeAndPassReceiver()); |
| |
| AssociatedRemote<IntegerSender> sender0; |
| connection_remote->GetSender(sender0.BindNewEndpointAndPassReceiver()); |
| |
| base::RunLoop run_loop; |
| sender0->Echo(123, base::BindLambdaForTesting([&](int32_t value) { |
| EXPECT_EQ(123, value); |
| run_loop.Quit(); |
| })); |
| run_loop.Run(); |
| |
| AssociatedRemote<IntegerSender> sender1; |
| base::RunLoop run_loop2; |
| connection_remote->AsyncGetSender(base::BindLambdaForTesting( |
| [&](PendingAssociatedRemote<IntegerSender> sender) { |
| sender1.Bind(std::move(sender)); |
| run_loop2.Quit(); |
| })); |
| run_loop2.Run(); |
| EXPECT_TRUE(sender1); |
| |
| base::RunLoop run_loop3; |
| sender1->Echo(456, base::BindLambdaForTesting([&](int32_t value) { |
| EXPECT_EQ(456, value); |
| run_loop3.Quit(); |
| })); |
| run_loop3.Run(); |
| } |
| |
| TEST_F(AssociatedInterfaceTest, |
| ReceiverWaitAndPauseWhenNoAssociatedInterfaces) { |
| Remote<IntegerSenderConnection> connection_remote; |
| IntegerSenderConnectionImpl connection( |
| connection_remote.BindNewPipeAndPassReceiver()); |
| |
| AssociatedRemote<IntegerSender> sender0; |
| connection_remote->GetSender(sender0.BindNewEndpointAndPassReceiver()); |
| |
| EXPECT_FALSE( |
| connection.receiver()->internal_state()->HasAssociatedInterfaces()); |
| |
| // There are no associated interfaces running on the pipe yet. It is okay to |
| // pause. |
| connection.receiver()->Pause(); |
| connection.receiver()->Resume(); |
| |
| // There are no associated interfaces running on the pipe yet. It is okay to |
| // wait. |
| EXPECT_TRUE(connection.receiver()->WaitForIncomingCall()); |
| |
| // The previous wait has dispatched the GetSender request message, therefore |
| // an associated interface has been set up on the pipe. It is not allowed to |
| // wait or pause. |
| EXPECT_TRUE( |
| connection.receiver()->internal_state()->HasAssociatedInterfaces()); |
| } |
| |
| class PingServiceImpl : public PingService { |
| public: |
| explicit PingServiceImpl(PendingAssociatedReceiver<PingService> receiver) |
| : receiver_(this, std::move(receiver)) {} |
| ~PingServiceImpl() override = default; |
| |
| AssociatedReceiver<PingService>& receiver() { return receiver_; } |
| |
| void set_ping_handler(base::RepeatingClosure handler) { |
| ping_handler_ = std::move(handler); |
| } |
| |
| // PingService: |
| void Ping(PingCallback callback) override { |
| if (ping_handler_) |
| ping_handler_.Run(); |
| std::move(callback).Run(); |
| } |
| |
| private: |
| AssociatedReceiver<PingService> receiver_; |
| base::RepeatingClosure ping_handler_; |
| }; |
| |
| class PingProviderImpl : public AssociatedPingProvider { |
| public: |
| explicit PingProviderImpl(PendingReceiver<AssociatedPingProvider> receiver) |
| : receiver_(this, std::move(receiver)) {} |
| ~PingProviderImpl() override = default; |
| |
| // AssociatedPingProvider: |
| void GetPing(PendingAssociatedReceiver<PingService> receiver) override { |
| ping_services_.emplace_back(new PingServiceImpl(std::move(receiver))); |
| |
| if (expected_receivers_count_ > 0 && |
| ping_services_.size() == expected_receivers_count_ && quit_waiting_) { |
| expected_receivers_count_ = 0; |
| std::move(quit_waiting_).Run(); |
| } |
| } |
| |
| std::vector<std::unique_ptr<PingServiceImpl>>& ping_services() { |
| return ping_services_; |
| } |
| |
| void WaitForReceivers(size_t count) { |
| DCHECK(!quit_waiting_); |
| expected_receivers_count_ = count; |
| base::RunLoop loop; |
| quit_waiting_ = loop.QuitClosure(); |
| loop.Run(); |
| } |
| |
| private: |
| Receiver<AssociatedPingProvider> receiver_; |
| std::vector<std::unique_ptr<PingServiceImpl>> ping_services_; |
| size_t expected_receivers_count_ = 0; |
| base::OnceClosure quit_waiting_; |
| }; |
| |
| class CallbackFilter : public MessageFilter { |
| public: |
| explicit CallbackFilter(base::RepeatingClosure callback) |
| : callback_(std::move(callback)) {} |
| ~CallbackFilter() override = default; |
| |
| static std::unique_ptr<CallbackFilter> Wrap(base::RepeatingClosure callback) { |
| return std::make_unique<CallbackFilter>(std::move(callback)); |
| } |
| |
| // MessageFilter: |
| bool WillDispatch(Message* message) override { |
| callback_.Run(); |
| return true; |
| } |
| |
| void DidDispatchOrReject(Message* message, bool accepted) override {} |
| |
| private: |
| base::RepeatingClosure callback_; |
| }; |
| |
| // Verifies that filters work as expected on associated receivers, i.e. that |
| // they're notified in order, before dispatch; and that each associated |
| // receiver in a group operates with its own set of filters. |
| TEST_F(AssociatedInterfaceTest, ReceiverWithFilters) { |
| Remote<AssociatedPingProvider> provider; |
| PingProviderImpl provider_impl(provider.BindNewPipeAndPassReceiver()); |
| |
| AssociatedRemote<PingService> ping_a, ping_b; |
| provider->GetPing(ping_a.BindNewEndpointAndPassReceiver()); |
| provider->GetPing(ping_b.BindNewEndpointAndPassReceiver()); |
| provider_impl.WaitForReceivers(2); |
| |
| ASSERT_EQ(2u, provider_impl.ping_services().size()); |
| PingServiceImpl& ping_a_impl = *provider_impl.ping_services()[0]; |
| PingServiceImpl& ping_b_impl = *provider_impl.ping_services()[1]; |
| |
| int a_status = 0; |
| int b_status = 0; |
| |
| ping_a_impl.receiver().SetFilter( |
| CallbackFilter::Wrap(base::BindLambdaForTesting([&] { |
| EXPECT_EQ(0, a_status); |
| EXPECT_EQ(0, b_status); |
| a_status = 1; |
| }))); |
| |
| ping_b_impl.receiver().SetFilter( |
| CallbackFilter::Wrap(base::BindLambdaForTesting([&] { |
| EXPECT_EQ(1, a_status); |
| EXPECT_EQ(0, b_status); |
| b_status = 1; |
| }))); |
| |
| for (int i = 0; i < 10; ++i) { |
| a_status = 0; |
| b_status = 0; |
| |
| { |
| base::RunLoop loop; |
| ping_a->Ping(loop.QuitClosure()); |
| loop.Run(); |
| } |
| |
| EXPECT_EQ(1, a_status); |
| EXPECT_EQ(0, b_status); |
| |
| { |
| base::RunLoop loop; |
| ping_b->Ping(loop.QuitClosure()); |
| loop.Run(); |
| } |
| |
| EXPECT_EQ(1, a_status); |
| EXPECT_EQ(1, b_status); |
| } |
| } |
| |
| TEST_F(AssociatedInterfaceTest, AssociatedRemoteFlushForTesting) { |
| PendingAssociatedReceiver<IntegerSender> receiver; |
| PendingAssociatedRemote<IntegerSender> remote; |
| CreateIntegerSender(&remote, &receiver); |
| |
| IntegerSenderImpl impl0(std::move(receiver)); |
| AssociatedRemote<IntegerSender> remote0(std::move(remote)); |
| remote0.set_disconnect_handler(base::BindOnce(&Fail)); |
| |
| bool remote0_callback_run = false; |
| remote0->Echo(123, base::BindLambdaForTesting([&](int32_t value) { |
| EXPECT_EQ(123, value); |
| remote0_callback_run = true; |
| })); |
| remote0.FlushForTesting(); |
| EXPECT_TRUE(remote0_callback_run); |
| } |
| |
| TEST_F(AssociatedInterfaceTest, AssociatedRemoteFlushForTestingWithClosedPeer) { |
| PendingAssociatedReceiver<IntegerSender> receiver; |
| PendingAssociatedRemote<IntegerSender> remote; |
| CreateIntegerSender(&remote, &receiver); |
| |
| AssociatedRemote<IntegerSender> remote0(std::move(remote)); |
| bool called = false; |
| remote0.set_disconnect_handler( |
| base::BindLambdaForTesting([&] { called = true; })); |
| receiver.reset(); |
| |
| remote0.FlushForTesting(); |
| EXPECT_TRUE(called); |
| remote0.FlushForTesting(); |
| } |
| |
| TEST_F(AssociatedInterfaceTest, AssociatedBindingFlushForTesting) { |
| PendingAssociatedReceiver<IntegerSender> receiver; |
| PendingAssociatedRemote<IntegerSender> remote; |
| CreateIntegerSender(&remote, &receiver); |
| |
| IntegerSenderImpl impl0(std::move(receiver)); |
| impl0.receiver()->set_disconnect_handler(base::BindOnce(&Fail)); |
| AssociatedRemote<IntegerSender> remote0(std::move(remote)); |
| |
| bool remote0_callback_run = false; |
| remote0->Echo(123, base::BindLambdaForTesting([&](int32_t value) { |
| EXPECT_EQ(123, value); |
| remote0_callback_run = true; |
| })); |
| |
| // Because the flush is sent from the receiver, it only guarantees that the |
| // request has been received, not the response. The second flush waits for the |
| // response to be received. |
| impl0.receiver()->FlushForTesting(); |
| impl0.receiver()->FlushForTesting(); |
| EXPECT_TRUE(remote0_callback_run); |
| } |
| |
| TEST_F(AssociatedInterfaceTest, |
| AssociatedReceiverFlushForTestingWithClosedPeer) { |
| scoped_refptr<MultiplexRouter> router0; |
| scoped_refptr<MultiplexRouter> router1; |
| CreateRouterPair(&router0, &router1); |
| |
| PendingAssociatedReceiver<IntegerSender> receiver; |
| { |
| PendingAssociatedRemote<IntegerSender> remote; |
| CreateIntegerSenderWithExistingRouters(router1, &remote, router0, |
| &receiver); |
| } |
| |
| IntegerSenderImpl impl(std::move(receiver)); |
| bool called = false; |
| impl.receiver()->set_disconnect_handler( |
| base::BindLambdaForTesting([&] { called = true; })); |
| impl.receiver()->FlushForTesting(); |
| EXPECT_TRUE(called); |
| impl.receiver()->FlushForTesting(); |
| } |
| |
| TEST_F(AssociatedInterfaceTest, ReceiverFlushForTesting) { |
| Remote<IntegerSenderConnection> remote; |
| IntegerSenderConnectionImpl impl(remote.BindNewPipeAndPassReceiver()); |
| bool called = false; |
| remote->AsyncGetSender(base::BindLambdaForTesting( |
| [&](PendingAssociatedRemote<IntegerSender> remote) { called = true; })); |
| EXPECT_FALSE(called); |
| impl.receiver()->set_disconnect_handler(base::BindOnce(&Fail)); |
| |
| // Because the flush is sent from the receiver, it only guarantees that the |
| // request has been received, not the response. The second flush waits for the |
| // response to be received. |
| impl.receiver()->FlushForTesting(); |
| impl.receiver()->FlushForTesting(); |
| |
| EXPECT_TRUE(called); |
| } |
| |
| TEST_F(AssociatedInterfaceTest, ReceiverFlushForTestingWithClosedPeer) { |
| Remote<IntegerSenderConnection> remote; |
| IntegerSenderConnectionImpl impl(remote.BindNewPipeAndPassReceiver()); |
| bool called = false; |
| impl.receiver()->set_disconnect_handler( |
| base::BindLambdaForTesting([&] { called = true; })); |
| remote.reset(); |
| EXPECT_FALSE(called); |
| impl.receiver()->FlushForTesting(); |
| EXPECT_TRUE(called); |
| impl.receiver()->FlushForTesting(); |
| } |
| |
| TEST_F(AssociatedInterfaceTest, RemoteFlushForTesting) { |
| Remote<IntegerSenderConnection> remote; |
| IntegerSenderConnectionImpl impl(remote.BindNewPipeAndPassReceiver()); |
| bool called = false; |
| remote.set_disconnect_handler(base::BindOnce(&Fail)); |
| remote->AsyncGetSender(base::BindLambdaForTesting( |
| [&](PendingAssociatedRemote<IntegerSender> remote) { called = true; })); |
| EXPECT_FALSE(called); |
| remote.FlushForTesting(); |
| EXPECT_TRUE(called); |
| } |
| |
| TEST_F(AssociatedInterfaceTest, RemoteFlushForTestingWithClosedPeer) { |
| Remote<IntegerSenderConnection> remote; |
| std::ignore = remote.BindNewPipeAndPassReceiver(); |
| bool called = false; |
| remote.set_disconnect_handler( |
| base::BindLambdaForTesting([&] { called = true; })); |
| EXPECT_FALSE(called); |
| remote.FlushForTesting(); |
| EXPECT_TRUE(called); |
| remote.FlushForTesting(); |
| } |
| |
| TEST_F(AssociatedInterfaceTest, AssociatedReceiverConnectionErrorWithReason) { |
| PendingAssociatedReceiver<IntegerSender> pending_receiver; |
| PendingAssociatedRemote<IntegerSender> pending_remote; |
| CreateIntegerSender(&pending_remote, &pending_receiver); |
| |
| IntegerSenderImpl impl(std::move(pending_receiver)); |
| AssociatedRemote<IntegerSender> remote(std::move(pending_remote)); |
| |
| base::RunLoop run_loop; |
| impl.receiver()->set_disconnect_with_reason_handler( |
| base::BindLambdaForTesting( |
| [&](uint32_t custom_reason, const std::string& description) { |
| EXPECT_EQ(123u, custom_reason); |
| EXPECT_EQ("farewell", description); |
| run_loop.Quit(); |
| })); |
| |
| remote.ResetWithReason(123u, "farewell"); |
| |
| run_loop.Run(); |
| } |
| |
| TEST_F(AssociatedInterfaceTest, |
| PendingAssociatedReceiverConnectionErrorWithReason) { |
| // Test that AssociatedReceiver is notified with connection error when the |
| // interface hasn't associated with a message pipe and the peer is closed. |
| |
| AssociatedRemote<IntegerSender> remote; |
| IntegerSenderImpl impl(remote.BindNewEndpointAndPassReceiver()); |
| |
| base::RunLoop run_loop; |
| impl.receiver()->set_disconnect_with_reason_handler( |
| base::BindLambdaForTesting( |
| [&](uint32_t custom_reason, const std::string& description) { |
| EXPECT_EQ(123u, custom_reason); |
| EXPECT_EQ("farewell", description); |
| run_loop.Quit(); |
| })); |
| |
| remote.ResetWithReason(123u, "farewell"); |
| run_loop.Run(); |
| } |
| |
| TEST_F(AssociatedInterfaceTest, AssociatedRemoteConnectionErrorWithReason) { |
| PendingAssociatedReceiver<IntegerSender> pending_receiver; |
| PendingAssociatedRemote<IntegerSender> pending_remote; |
| CreateIntegerSender(&pending_remote, &pending_receiver); |
| |
| IntegerSenderImpl impl(std::move(pending_receiver)); |
| AssociatedRemote<IntegerSender> remote(std::move(pending_remote)); |
| |
| base::RunLoop run_loop; |
| remote.set_disconnect_with_reason_handler(base::BindLambdaForTesting( |
| [&](uint32_t custom_reason, const std::string& description) { |
| EXPECT_EQ(456u, custom_reason); |
| EXPECT_EQ("farewell", description); |
| run_loop.Quit(); |
| })); |
| |
| impl.receiver()->ResetWithReason(456u, "farewell"); |
| run_loop.Run(); |
| } |
| |
| TEST_F(AssociatedInterfaceTest, |
| PendingAssociatedRemoteConnectionErrorWithReason) { |
| // Test that AssociatedInterfacePtr is notified with connection error when the |
| // interface hasn't associated with a message pipe and the peer is closed. |
| |
| AssociatedRemote<IntegerSender> remote; |
| auto pending_receiver = remote.BindNewEndpointAndPassReceiver(); |
| |
| base::RunLoop run_loop; |
| remote.set_disconnect_with_reason_handler(base::BindLambdaForTesting( |
| [&](uint32_t custom_reason, const std::string& description) { |
| EXPECT_EQ(456u, custom_reason); |
| EXPECT_EQ("farewell", description); |
| run_loop.Quit(); |
| })); |
| |
| pending_receiver.ResetWithReason(456u, "farewell"); |
| run_loop.Run(); |
| } |
| |
| TEST_F(AssociatedInterfaceTest, AssociatedRequestResetWithReason) { |
| PendingAssociatedReceiver<IntegerSender> pending_receiver; |
| PendingAssociatedRemote<IntegerSender> pending_remote; |
| CreateIntegerSender(&pending_remote, &pending_receiver); |
| |
| AssociatedRemote<IntegerSender> remote(std::move(pending_remote)); |
| |
| base::RunLoop run_loop; |
| remote.set_disconnect_with_reason_handler(base::BindLambdaForTesting( |
| [&](uint32_t custom_reason, const std::string& description) { |
| EXPECT_EQ(789u, custom_reason); |
| EXPECT_EQ("long time no see", description); |
| run_loop.Quit(); |
| })); |
| |
| pending_receiver.ResetWithReason(789u, "long time no see"); |
| |
| run_loop.Run(); |
| } |
| |
| TEST_F(AssociatedInterfaceTest, SharedAssociatedRemote) { |
| Remote<IntegerSenderConnection> connection_remote; |
| IntegerSenderConnectionImpl connection( |
| connection_remote.BindNewPipeAndPassReceiver()); |
| |
| PendingAssociatedRemote<IntegerSender> pending_remote; |
| connection_remote->GetSender( |
| pending_remote.InitWithNewEndpointAndPassReceiver()); |
| |
| SharedAssociatedRemote<IntegerSender> shared_sender( |
| std::move(pending_remote)); |
| |
| { |
| // Test the thread safe pointer can be used from the interface ptr thread. |
| base::RunLoop run_loop; |
| shared_sender->Echo(123, base::BindLambdaForTesting([&](int32_t value) { |
| EXPECT_EQ(123, value); |
| run_loop.Quit(); |
| })); |
| run_loop.Run(); |
| } |
| |
| // Test the thread safe pointer can be used from another thread. |
| base::RunLoop run_loop; |
| |
| auto sender_task_runner = base::ThreadPool::CreateSequencedTaskRunner({}); |
| auto quit_closure = run_loop.QuitClosure(); |
| sender_task_runner->PostTask( |
| FROM_HERE, base::BindLambdaForTesting([&] { |
| shared_sender->Echo( |
| 123, base::BindLambdaForTesting([&](int32_t value) { |
| EXPECT_EQ(123, value); |
| EXPECT_TRUE(sender_task_runner->RunsTasksInCurrentSequence()); |
| std::move(quit_closure).Run(); |
| })); |
| })); |
| |
| // Block until the method callback is called on the background thread. |
| run_loop.Run(); |
| } |
| |
| struct ForwarderTestContext { |
| Remote<IntegerSenderConnection> connection_remote; |
| std::unique_ptr<IntegerSenderConnectionImpl> interface_impl; |
| PendingAssociatedReceiver<IntegerSender> sender_receiver; |
| }; |
| |
| TEST_F(AssociatedInterfaceTest, SharedAssociatedRemoteWithTaskRunner) { |
| const scoped_refptr<base::SequencedTaskRunner> other_thread_task_runner = |
| base::ThreadPool::CreateSequencedTaskRunner({}); |
| |
| ForwarderTestContext* context = new ForwarderTestContext(); |
| PendingAssociatedRemote<IntegerSender> pending_remote; |
| base::WaitableEvent sender_bound_event( |
| base::WaitableEvent::ResetPolicy::MANUAL, |
| base::WaitableEvent::InitialState::NOT_SIGNALED); |
| other_thread_task_runner->PostTask( |
| FROM_HERE, base::BindLambdaForTesting([&] { |
| context->interface_impl = std::make_unique<IntegerSenderConnectionImpl>( |
| context->connection_remote.BindNewPipeAndPassReceiver()); |
| context->connection_remote->GetSender( |
| pending_remote.InitWithNewEndpointAndPassReceiver()); |
| sender_bound_event.Signal(); |
| })); |
| |
| sender_bound_event.Wait(); |
| |
| // Create a SharedAssociatedRemote that binds on the background thread and is |
| // associated with |connection_remote| there. |
| SharedAssociatedRemote<IntegerSender> shared_sender(std::move(pending_remote), |
| other_thread_task_runner); |
| |
| // Issue a call on the shared remote immediately. Note that this may happen |
| // before the interface is bound on the background thread, and that must be |
| // OK. |
| base::RunLoop run_loop; |
| shared_sender->Echo(123, base::BindLambdaForTesting([&](int32_t value) { |
| EXPECT_EQ(123, value); |
| run_loop.Quit(); |
| })); |
| run_loop.Run(); |
| |
| other_thread_task_runner->DeleteSoon(FROM_HERE, context); |
| |
| shared_sender.reset(); |
| } |
| |
| class DiscardingAssociatedPingProviderProvider |
| : public AssociatedPingProviderProvider { |
| public: |
| void GetPingProvider( |
| PendingAssociatedReceiver<AssociatedPingProvider> receiver) override {} |
| }; |
| |
| TEST_F(AssociatedInterfaceTest, CloseWithoutBindingAssociatedReceiver) { |
| DiscardingAssociatedPingProviderProvider ping_provider_provider; |
| mojo::Receiver<AssociatedPingProviderProvider> receiver( |
| &ping_provider_provider); |
| Remote<AssociatedPingProviderProvider> provider_provider; |
| receiver.Bind(provider_provider.BindNewPipeAndPassReceiver()); |
| AssociatedRemote<AssociatedPingProvider> provider; |
| provider_provider->GetPingProvider(provider.BindNewEndpointAndPassReceiver()); |
| AssociatedRemote<PingService> ping; |
| provider->GetPing(ping.BindNewEndpointAndPassReceiver()); |
| base::RunLoop run_loop; |
| ping.set_disconnect_handler(run_loop.QuitClosure()); |
| run_loop.Run(); |
| } |
| |
| TEST_F(AssociatedInterfaceTest, AssociateWithDisconnectedPipe) { |
| AssociatedRemote<IntegerSender> sender; |
| AssociateWithDisconnectedPipe( |
| sender.BindNewEndpointAndPassReceiver().PassHandle()); |
| sender->Send(42); |
| } |
| |
| TEST_F(AssociatedInterfaceTest, AsyncErrorHandlersWhenClosingPrimaryInterface) { |
| // Ensures that associated interface error handlers are not invoked |
| // synchronously when the primary interface pipe is closed. Regression test |
| // for https://crbug.com/864731. |
| |
| Remote<IntegerSenderConnection> connection_remote; |
| IntegerSenderConnectionImpl connection( |
| connection_remote.BindNewPipeAndPassReceiver()); |
| |
| base::RunLoop loop; |
| bool error_handler_invoked = false; |
| AssociatedRemote<IntegerSender> sender0; |
| connection_remote->GetSender(sender0.BindNewEndpointAndPassReceiver()); |
| sender0.set_disconnect_handler(base::BindLambdaForTesting([&] { |
| error_handler_invoked = true; |
| loop.Quit(); |
| })); |
| |
| // This should not trigger the error handler synchronously... |
| connection_remote.reset(); |
| EXPECT_FALSE(error_handler_invoked); |
| |
| // ...but it should be triggered once we spin the scheduler. |
| loop.Run(); |
| EXPECT_TRUE(error_handler_invoked); |
| } |
| |
| TEST_F(AssociatedInterfaceTest, AssociatedReceiverReportBadMessage) { |
| PendingAssociatedReceiver<IntegerSender> pending_receiver; |
| PendingAssociatedRemote<IntegerSender> pending_remote; |
| CreateIntegerSender(&pending_remote, &pending_receiver); |
| |
| IntegerSenderImpl impl(std::move(pending_receiver)); |
| AssociatedRemote<IntegerSender> remote(std::move(pending_remote)); |
| |
| bool called = false; |
| base::RunLoop run_loop; |
| remote.set_disconnect_handler(base::BindLambdaForTesting([&] { |
| called = true; |
| run_loop.Quit(); |
| })); |
| |
| std::string received_error; |
| SetDefaultProcessErrorHandler(base::BindLambdaForTesting( |
| [&](const std::string& error) { received_error = error; })); |
| |
| remote->Echo(-1, IntegerSenderImpl::EchoCallback()); |
| EXPECT_FALSE(called); |
| run_loop.Run(); |
| EXPECT_TRUE(called); |
| EXPECT_EQ("Reporting bad message for value == -1", received_error); |
| |
| SetDefaultProcessErrorHandler(base::NullCallback()); |
| } |
| |
| TEST_F(AssociatedInterfaceTest, AssociatedReceiverDedicatedPipe) { |
| PendingAssociatedRemote<IntegerSender> pending_remote; |
| PendingAssociatedReceiver<IntegerSender> pending_receiver = |
| pending_remote.InitWithNewEndpointAndPassReceiver(); |
| pending_receiver.EnableUnassociatedUsage(); |
| IntegerSenderImpl impl(std::move(pending_receiver)); |
| AssociatedRemote<IntegerSender> remote(std::move(pending_remote)); |
| |
| { |
| base::RunLoop run_loop; |
| impl.set_notify_send_method_called( |
| base::BindLambdaForTesting([&](int32_t x) { |
| EXPECT_EQ(88, x); |
| run_loop.Quit(); |
| })); |
| |
| remote->Send(88); |
| run_loop.Run(); |
| } |
| |
| { |
| base::RunLoop run_loop; |
| remote->Echo(888, base::BindLambdaForTesting([&](int32_t x) { |
| EXPECT_EQ(888, x); |
| run_loop.Quit(); |
| })); |
| } |
| } |
| |
| TEST_F(AssociatedInterfaceTest, AssociatedRemoteDedicatedPipe) { |
| PendingAssociatedRemote<IntegerSender> pending_remote; |
| PendingAssociatedReceiver<IntegerSender> pending_receiver = |
| pending_remote.InitWithNewEndpointAndPassReceiver(); |
| IntegerSenderImpl impl(std::move(pending_receiver)); |
| pending_remote.EnableUnassociatedUsage(); |
| AssociatedRemote<IntegerSender> remote(std::move(pending_remote)); |
| |
| { |
| base::RunLoop run_loop; |
| impl.set_notify_send_method_called( |
| base::BindLambdaForTesting([&](int32_t x) { |
| EXPECT_EQ(88, x); |
| run_loop.Quit(); |
| })); |
| |
| remote->Send(88); |
| run_loop.Run(); |
| } |
| |
| { |
| base::RunLoop run_loop; |
| remote->Echo(888, base::BindLambdaForTesting([&](int32_t x) { |
| EXPECT_EQ(888, x); |
| run_loop.Quit(); |
| })); |
| } |
| } |
| |
| class ClumsyBinderImpl : public mojom::ClumsyBinder { |
| public: |
| explicit ClumsyBinderImpl(PendingReceiver<mojom::ClumsyBinder> receiver) |
| : receiver_(this, std::move(receiver)) {} |
| ~ClumsyBinderImpl() override = default; |
| |
| // mojom::ClumsyBinder: |
| void DropAssociatedBinder( |
| PendingAssociatedReceiver<mojom::AssociatedBinder> receiver) override { |
| // Nothing to do but drop the receiver so it's closed. |
| } |
| |
| private: |
| Receiver<mojom::ClumsyBinder> receiver_; |
| }; |
| |
| TEST_F(AssociatedInterfaceTest, CloseSerializedAssociatedEndpoints) { |
| // Regression test for https://crbug.com/331636067. Verifies that endpoint |
| // lifetime is properly managed when associated endpoints are serialized into |
| // a message that gets dropped before transmission. |
| |
| Remote<mojom::ClumsyBinder> binder; |
| ClumsyBinderImpl binder_impl(binder.BindNewPipeAndPassReceiver()); |
| |
| AssociatedRemote<mojom::AssociatedBinder> associated_binder; |
| binder->DropAssociatedBinder( |
| associated_binder.BindNewEndpointAndPassReceiver()); |
| |
| // Wait for disconnection to be observed. This way we know any subsequent |
| // outgoing messages on `associated_binder` will not be sent. |
| base::RunLoop loop1; |
| associated_binder.set_disconnect_handler(loop1.QuitClosure()); |
| loop1.Run(); |
| |
| // Send another endpoint over. This receiver will be dropped, and the remote |
| // should be properly notified of peer closure to terminate this loop. |
| base::RunLoop loop2; |
| AssociatedRemote<mojom::AssociatedBinder> another_binder; |
| associated_binder->Bind(another_binder.BindNewEndpointAndPassReceiver()); |
| another_binder.set_disconnect_handler(loop2.QuitClosure()); |
| loop2.Run(); |
| } |
| |
| class TestSyncImpl : public TestSync { |
| public: |
| void Ping(PingCallback callback) override { std::move(callback).Run(); } |
| void NoInterruptPing(NoInterruptPingCallback callback) override { |
| std::move(callback).Run(); |
| } |
| void Echo(int32_t value, EchoCallback callback) override { |
| std::move(callback).Run(value); |
| } |
| void AsyncEcho(int32_t, AsyncEchoCallback) override { NOTREACHED(); } |
| }; |
| |
| class TestSyncPrimaryImpl : public TestSyncPrimary, public TestSync { |
| public: |
| explicit TestSyncPrimaryImpl(PendingReceiver<TestSyncPrimary> receiver) |
| : receiver_(this, std::move(receiver)), test_sync_receiver_(this) {} |
| ~TestSyncPrimaryImpl() override = default; |
| |
| static constexpr int32_t kReceivedPing = 0b001; |
| static constexpr int32_t kSyncCallWasAborted = 0b010; |
| static constexpr int32_t kSyncCall2WasAborted = 0b100; |
| static constexpr int32_t kNoInterruptPingReplied = 0b1000; |
| |
| void Ping(PingCallback callback) override { |
| result_ |= kReceivedPing; |
| std::move(callback).Run(); |
| } |
| void NoInterruptPing(NoInterruptPingCallback) override { NOTREACHED(); } |
| void Echo(int32_t value, EchoCallback callback) override { |
| std::move(callback).Run(result_); |
| } |
| void AsyncEcho(int32_t, AsyncEchoCallback) override { NOTREACHED(); } |
| |
| void SendRemote(PendingAssociatedRemote<TestSync> remote) override { |
| if (!test_sync_remote_.is_bound()) { |
| test_sync_remote_.Bind(std::move(remote)); |
| } else { |
| test_sync_remote2_.Bind(std::move(remote)); |
| } |
| CHECK(!test_sync_receiver_.is_bound()); |
| } |
| |
| void SendReceiver(PendingAssociatedReceiver<TestSync> receiver) override { |
| test_sync_receiver_.Bind(std::move(receiver)); |
| { |
| base::ScopedAllowBaseSyncPrimitivesForTesting allow_sync; |
| DoTest(); |
| } |
| } |
| |
| virtual void DoTest() { |
| CHECK(test_sync_remote_.is_bound()); |
| int reply = -1; |
| // If we got a second test sync remote, make the first sync call on that |
| // remote to verify behavior when the peer closed event comes in while |
| // we're blocked on a different interface. |
| if (test_sync_remote2_.is_bound()) { |
| if (test_sync_remote2_->NoInterruptPing()) { |
| result_ |= kNoInterruptPingReplied; |
| } |
| // Make another sync call on the secondary remote. This doesn't change |
| // anything for most test cases, but is important for |
| // TestHangOnDisconnectWithSignaledWatcher to make sure |
| // SequenceLocalSyncEventWatcher has cleared out its "ready" watchers. |
| test_sync_remote2_->Ping(); |
| } |
| bool call_result = test_sync_remote_->Echo(123, &reply); |
| if (!call_result) { |
| result_ |= kSyncCallWasAborted; |
| } |
| // Make a second sync call to make sure that one gets correctly aborted as |
| // well. |
| call_result = test_sync_remote_->Echo(456, &reply); |
| if (!call_result) { |
| result_ |= kSyncCall2WasAborted; |
| } |
| } |
| |
| protected: |
| Receiver<TestSyncPrimary> receiver_; |
| AssociatedRemote<TestSync> test_sync_remote_; |
| AssociatedRemote<TestSync> test_sync_remote2_; |
| AssociatedReceiver<TestSync> test_sync_receiver_; |
| int32_t result_ = 0; |
| }; |
| |
| // Regression test for https://crbug.com/435493653 and |
| // https://crbug.com/436965298. Verifies that sync calls made on an associated |
| // remote are correctly aborted if the receiver endpoint is closed even if other |
| // messages are queued on the same message pipe first. |
| TEST_F(AssociatedInterfaceTest, TestHangOnDisconnect) { |
| Remote<TestSyncPrimary> primary_remote; |
| base::SequenceBound<TestSyncPrimaryImpl> primary_impl( |
| base::ThreadPool::CreateSequencedTaskRunner({}), |
| primary_remote.BindNewPipeAndPassReceiver()); |
| |
| TestSyncImpl sync_impl; |
| AssociatedReceiver<TestSync> sync_receiver(&sync_impl); |
| AssociatedRemote<TestSync> sync_remote; |
| primary_remote->SendRemote(sync_receiver.BindNewEndpointAndPassRemote()); |
| primary_remote->SendReceiver(sync_remote.BindNewEndpointAndPassReceiver()); |
| |
| sync_remote->Ping(base::DoNothing()); |
| sync_remote.reset(); |
| |
| sync_receiver.reset(); |
| |
| base::test::TestFuture<int32_t> result; |
| primary_remote->Echo(0, result.GetCallback()); |
| EXPECT_EQ(TestSyncPrimaryImpl::kReceivedPing | |
| TestSyncPrimaryImpl::kSyncCallWasAborted | |
| TestSyncPrimaryImpl::kSyncCall2WasAborted, |
| result.Get()); |
| |
| primary_impl.SynchronouslyResetForTest(); |
| } |
| |
| // Slight variation of the above test, where the peer disconnect happens while |
| // blocked on a different interface than the one being disconnected. |
| // Additionally this test makes sure the disconnect event arrives while blocked |
| // on a NoInterrupt sync call, since that code path is slightly more |
| // complicated. |
| TEST_F(AssociatedInterfaceTest, TestHangOnDisconnectDifferentEndpoint) { |
| Remote<TestSyncPrimary> primary_remote; |
| base::SequenceBound<TestSyncPrimaryImpl> primary_impl( |
| base::ThreadPool::CreateSequencedTaskRunner({}), |
| primary_remote.BindNewPipeAndPassReceiver()); |
| |
| TestSyncImpl sync_impl; |
| AssociatedReceiver<TestSync> sync_receiver(&sync_impl); |
| AssociatedReceiver<TestSync> sync_receiver2(&sync_impl); |
| AssociatedRemote<TestSync> sync_remote; |
| primary_remote->SendRemote(sync_receiver.BindNewEndpointAndPassRemote()); |
| primary_remote->SendRemote(sync_receiver2.BindNewEndpointAndPassRemote()); |
| primary_remote->SendReceiver(sync_remote.BindNewEndpointAndPassReceiver()); |
| |
| sync_remote.reset(); |
| |
| sync_receiver.reset(); |
| |
| base::test::TestFuture<int32_t> result; |
| primary_remote->Echo(0, result.GetCallback()); |
| EXPECT_EQ(TestSyncPrimaryImpl::kSyncCallWasAborted | |
| TestSyncPrimaryImpl::kSyncCall2WasAborted | |
| TestSyncPrimaryImpl::kNoInterruptPingReplied, |
| result.Get()); |
| |
| primary_impl.SynchronouslyResetForTest(); |
| } |
| |
| // Variation of TestHangOnDisconnectDifferentEndpoint, that additionally sets |
| // things up such that MultiplexRouter::EndPoint will have its sync_watcher_ |
| // populated at the time the peer closed event is handled, by carefully ordering |
| // the messages and events sent over the message pipe. |
| TEST_F(AssociatedInterfaceTest, TestHangOnDisconnectWithSignaledWatcher) { |
| Remote<TestSyncPrimary> primary_remote; |
| // By making one extra sync call before the rest of the test body we can make |
| // sure that the `sync_watcher_` field for the relevant end point has been |
| // initialized. |
| class TestSyncPrimaryImplWithExtraSyncCall : public TestSyncPrimaryImpl { |
| public: |
| using TestSyncPrimaryImpl::TestSyncPrimaryImpl; |
| void DoTest() override { |
| test_sync_remote_->Ping(); |
| TestSyncPrimaryImpl::DoTest(); |
| } |
| }; |
| |
| // For this test we need to make sure to disconnect the interface that will |
| // hang while a sync call on a secondary interface is being made. So override |
| // NoInterruptPing to do that disconnect. |
| class TestSyncImplWithCallback : public TestSyncImpl { |
| public: |
| void NoInterruptPing(NoInterruptPingCallback callback) override { |
| if (no_interrupt_ping_callback_) { |
| std::move(no_interrupt_ping_callback_).Run(); |
| } |
| std::move(callback).Run(); |
| } |
| |
| base::OnceClosure no_interrupt_ping_callback_; |
| }; |
| |
| base::SequenceBound<TestSyncPrimaryImplWithExtraSyncCall> primary_impl( |
| base::ThreadPool::CreateSequencedTaskRunner({}), |
| primary_remote.BindNewPipeAndPassReceiver()); |
| |
| TestSyncImplWithCallback sync_impl; |
| AssociatedReceiver<TestSync> sync_receiver(&sync_impl); |
| AssociatedReceiver<TestSync> sync_receiver2(&sync_impl); |
| |
| sync_impl.no_interrupt_ping_callback_ = |
| base::BindLambdaForTesting([&]() { sync_receiver.reset(); }); |
| |
| AssociatedRemote<TestSync> sync_remote; |
| primary_remote->SendRemote(sync_receiver.BindNewEndpointAndPassRemote()); |
| primary_remote->SendRemote(sync_receiver2.BindNewEndpointAndPassRemote()); |
| primary_remote->SendReceiver(sync_remote.BindNewEndpointAndPassReceiver()); |
| |
| base::test::TestFuture<int32_t> result; |
| primary_remote->Echo(0, result.GetCallback()); |
| EXPECT_EQ(TestSyncPrimaryImpl::kSyncCallWasAborted | |
| TestSyncPrimaryImpl::kSyncCall2WasAborted | |
| TestSyncPrimaryImpl::kNoInterruptPingReplied, |
| result.Get()); |
| |
| primary_impl.SynchronouslyResetForTest(); |
| } |
| |
| } // namespace |
| } // namespace associated_interface_unittest |
| } // namespace test |
| } // namespace mojo |