blob: 70bb2181807235c7fd65fbc248a03b0c5868d73c [file] [log] [blame]
// Copyright 2017 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "mojo/core/channel.h"
#include <atomic>
#include <optional>
#include "base/functional/bind.h"
#include "base/memory/page_size.h"
#include "base/memory/ptr_util.h"
#include "base/message_loop/message_pump_type.h"
#include "base/process/process_handle.h"
#include "base/run_loop.h"
#include "base/strings/stringprintf.h"
#include "base/task/single_thread_task_runner.h"
#include "base/test/bind.h"
#include "base/test/task_environment.h"
#include "base/threading/thread.h"
#include "build/build_config.h"
#include "mojo/core/platform_handle_utils.h"
#include "mojo/public/cpp/platform/platform_channel.h"
#include "testing/gmock/include/gmock/gmock.h"
#include "testing/gtest/include/gtest/gtest.h"
namespace mojo {
namespace core {
namespace {
class TestChannel : public Channel {
public:
TestChannel(Channel::Delegate* delegate)
: Channel(delegate, Channel::HandlePolicy::kAcceptHandles) {}
char* GetReadBufferTest(size_t* buffer_capacity) {
return GetReadBuffer(buffer_capacity);
}
bool OnReadCompleteTest(size_t bytes_read, size_t* next_read_size_hint) {
return OnReadComplete(bytes_read, next_read_size_hint);
}
MOCK_METHOD7(GetReadPlatformHandles,
bool(const void* payload,
size_t payload_size,
size_t num_handles,
const void* extra_header,
size_t extra_header_size,
std::vector<PlatformHandle>* handles,
bool* deferred));
MOCK_METHOD2(GetReadPlatformHandlesForIpcz,
bool(size_t, std::vector<PlatformHandle>&));
MOCK_METHOD0(Start, void());
MOCK_METHOD0(ShutDownImpl, void());
MOCK_METHOD0(LeakHandle, void());
void Write(MessagePtr message) override {}
protected:
~TestChannel() override = default;
};
// Not using GMock as I don't think it supports movable types.
class MockChannelDelegate : public Channel::Delegate {
public:
MockChannelDelegate() = default;
size_t GetReceivedPayloadSize() const { return payload_size_; }
const void* GetReceivedPayload() const { return payload_.get(); }
protected:
void OnChannelMessage(const void* payload,
size_t payload_size,
std::vector<PlatformHandle> handles) override {
payload_.reset(new char[payload_size]);
memcpy(payload_.get(), payload, payload_size);
payload_size_ = payload_size;
}
// Notify that an error has occured and the Channel will cease operation.
void OnChannelError(Channel::Error error) override {}
private:
size_t payload_size_ = 0;
std::unique_ptr<char[]> payload_;
};
Channel::MessagePtr CreateDefaultMessage(bool legacy_message) {
const size_t payload_size = 100;
Channel::MessagePtr message = Channel::Message::CreateMessage(
payload_size, 0,
legacy_message ? Channel::Message::MessageType::NORMAL_LEGACY
: Channel::Message::MessageType::NORMAL);
char* payload = static_cast<char*>(message->mutable_payload());
for (size_t i = 0; i < payload_size; i++) {
payload[i] = static_cast<char>(i);
}
return message;
}
void TestMemoryEqual(const void* data1,
size_t data1_size,
const void* data2,
size_t data2_size) {
ASSERT_EQ(data1_size, data2_size);
const unsigned char* data1_char = static_cast<const unsigned char*>(data1);
const unsigned char* data2_char = static_cast<const unsigned char*>(data2);
for (size_t i = 0; i < data1_size; i++) {
// ASSERT so we don't log tons of errors if the data is different.
ASSERT_EQ(data1_char[i], data2_char[i]);
}
}
void TestMessagesAreEqual(Channel::Message* message1,
Channel::Message* message2,
bool legacy_messages) {
// If any of the message is null, this is probably not what you wanted to
// test.
ASSERT_NE(nullptr, message1);
ASSERT_NE(nullptr, message2);
ASSERT_EQ(message1->payload_size(), message2->payload_size());
EXPECT_EQ(message1->has_handles(), message2->has_handles());
TestMemoryEqual(message1->payload(), message1->payload_size(),
message2->payload(), message2->payload_size());
if (legacy_messages)
return;
ASSERT_EQ(message1->extra_header_size(), message2->extra_header_size());
TestMemoryEqual(message1->extra_header(), message1->extra_header_size(),
message2->extra_header(), message2->extra_header_size());
}
TEST(ChannelTest, LegacyMessageDeserialization) {
Channel::MessagePtr message = CreateDefaultMessage(true /* legacy_message */);
Channel::MessagePtr deserialized_message =
Channel::Message::Deserialize(message->data(), message->data_num_bytes(),
Channel::HandlePolicy::kAcceptHandles);
TestMessagesAreEqual(message.get(), deserialized_message.get(),
true /* legacy_message */);
}
TEST(ChannelTest, NonLegacyMessageDeserialization) {
Channel::MessagePtr message =
CreateDefaultMessage(false /* legacy_message */);
Channel::MessagePtr deserialized_message =
Channel::Message::Deserialize(message->data(), message->data_num_bytes(),
Channel::HandlePolicy::kAcceptHandles);
TestMessagesAreEqual(message.get(), deserialized_message.get(),
false /* legacy_message */);
}
TEST(ChannelTest, OnReadLegacyMessage) {
size_t buffer_size = 100 * 1024;
Channel::MessagePtr message = CreateDefaultMessage(true /* legacy_message */);
MockChannelDelegate channel_delegate;
scoped_refptr<TestChannel> channel = new TestChannel(&channel_delegate);
char* read_buffer = channel->GetReadBufferTest(&buffer_size);
ASSERT_LT(message->data_num_bytes(),
buffer_size); // Bad test. Increase buffer
// size.
memcpy(read_buffer, message->data(), message->data_num_bytes());
size_t next_read_size_hint = 0;
EXPECT_TRUE(channel->OnReadCompleteTest(message->data_num_bytes(),
&next_read_size_hint));
TestMemoryEqual(message->payload(), message->payload_size(),
channel_delegate.GetReceivedPayload(),
channel_delegate.GetReceivedPayloadSize());
}
TEST(ChannelTest, OnReadNonLegacyMessage) {
size_t buffer_size = 100 * 1024;
Channel::MessagePtr message =
CreateDefaultMessage(false /* legacy_message */);
MockChannelDelegate channel_delegate;
scoped_refptr<TestChannel> channel = new TestChannel(&channel_delegate);
char* read_buffer = channel->GetReadBufferTest(&buffer_size);
ASSERT_LT(message->data_num_bytes(),
buffer_size); // Bad test. Increase buffer
// size.
memcpy(read_buffer, message->data(), message->data_num_bytes());
size_t next_read_size_hint = 0;
EXPECT_TRUE(channel->OnReadCompleteTest(message->data_num_bytes(),
&next_read_size_hint));
TestMemoryEqual(message->payload(), message->payload_size(),
channel_delegate.GetReceivedPayload(),
channel_delegate.GetReceivedPayloadSize());
}
class ChannelTestShutdownAndWriteDelegate : public Channel::Delegate {
public:
ChannelTestShutdownAndWriteDelegate(
PlatformChannelEndpoint endpoint,
scoped_refptr<base::SingleThreadTaskRunner> task_runner,
scoped_refptr<Channel> client_channel,
std::unique_ptr<base::Thread> client_thread,
base::RepeatingClosure quit_closure)
: quit_closure_(std::move(quit_closure)),
client_channel_(std::move(client_channel)),
client_thread_(std::move(client_thread)) {
channel_ = Channel::Create(this, ConnectionParams(std::move(endpoint)),
Channel::HandlePolicy::kAcceptHandles,
std::move(task_runner));
channel_->Start();
}
~ChannelTestShutdownAndWriteDelegate() override { channel_->ShutDown(); }
// Channel::Delegate implementation
void OnChannelMessage(const void* payload,
size_t payload_size,
std::vector<PlatformHandle> handles) override {
++message_count_;
// If |client_channel_| exists then close it and its thread.
if (client_channel_) {
// Write a fresh message, making our channel readable again.
Channel::MessagePtr message = CreateDefaultMessage(false);
client_thread_->task_runner()->PostTask(
FROM_HERE,
base::BindOnce(&Channel::Write, client_channel_, std::move(message)));
// Close the channel and wait for it to shutdown.
client_channel_->ShutDown();
client_channel_ = nullptr;
client_thread_->Stop();
client_thread_ = nullptr;
}
// Write a message to the channel, to verify whether this triggers an
// OnChannelError callback before all messages were read.
Channel::MessagePtr message = CreateDefaultMessage(false);
channel_->Write(std::move(message));
}
void OnChannelError(Channel::Error error) override {
EXPECT_EQ(2, message_count_);
quit_closure_.Run();
}
base::RepeatingClosure quit_closure_;
int message_count_ = 0;
scoped_refptr<Channel> channel_;
scoped_refptr<Channel> client_channel_;
std::unique_ptr<base::Thread> client_thread_;
};
TEST(ChannelTest, PeerShutdownDuringRead) {
base::test::SingleThreadTaskEnvironment task_environment(
base::test::TaskEnvironment::MainThreadType::IO);
PlatformChannel channel;
// Create a "client" Channel with one end of the pipe, and Start() it.
std::unique_ptr<base::Thread> client_thread =
std::make_unique<base::Thread>("clientio_thread");
client_thread->StartWithOptions(
base::Thread::Options(base::MessagePumpType::IO, 0));
scoped_refptr<Channel> client_channel = Channel::Create(
nullptr, ConnectionParams(channel.TakeRemoteEndpoint()),
Channel::HandlePolicy::kAcceptHandles, client_thread->task_runner());
client_channel->Start();
// On the "client" IO thread, create and write a message.
Channel::MessagePtr message = CreateDefaultMessage(false);
client_thread->task_runner()->PostTask(
FROM_HERE,
base::BindOnce(&Channel::Write, client_channel, std::move(message)));
// Create a "server" Channel with the other end of the pipe, and process the
// messages from it. The |server_delegate| will ShutDown the client end of
// the pipe after the first message, and quit the RunLoop when OnChannelError
// is received.
base::RunLoop run_loop;
ChannelTestShutdownAndWriteDelegate server_delegate(
channel.TakeLocalEndpoint(),
base::SingleThreadTaskRunner::GetCurrentDefault(),
std::move(client_channel), std::move(client_thread),
run_loop.QuitClosure());
run_loop.Run();
}
class RejectHandlesDelegate : public Channel::Delegate {
public:
RejectHandlesDelegate() = default;
RejectHandlesDelegate(const RejectHandlesDelegate&) = delete;
RejectHandlesDelegate& operator=(const RejectHandlesDelegate&) = delete;
size_t num_messages() const { return num_messages_; }
// Channel::Delegate:
void OnChannelMessage(const void* payload,
size_t payload_size,
std::vector<PlatformHandle> handles) override {
++num_messages_;
}
void OnChannelError(Channel::Error error) override {
if (wait_for_error_loop_)
wait_for_error_loop_->Quit();
}
void WaitForError() {
wait_for_error_loop_.emplace();
wait_for_error_loop_->Run();
}
private:
size_t num_messages_ = 0;
std::optional<base::RunLoop> wait_for_error_loop_;
};
TEST(ChannelTest, RejectHandles) {
base::test::SingleThreadTaskEnvironment task_environment(
base::test::TaskEnvironment::MainThreadType::IO);
PlatformChannel platform_channel;
RejectHandlesDelegate receiver_delegate;
scoped_refptr<Channel> receiver =
Channel::Create(&receiver_delegate,
ConnectionParams(platform_channel.TakeLocalEndpoint()),
Channel::HandlePolicy::kRejectHandles,
base::SingleThreadTaskRunner::GetCurrentDefault());
receiver->Start();
RejectHandlesDelegate sender_delegate;
scoped_refptr<Channel> sender = Channel::Create(
&sender_delegate, ConnectionParams(platform_channel.TakeRemoteEndpoint()),
Channel::HandlePolicy::kRejectHandles,
base::SingleThreadTaskRunner::GetCurrentDefault());
sender->Start();
// Create another platform channel just to stuff one of its endpoint handles
// into a message. Sending this message to the receiver should cause the
// receiver to reject it and close the Channel without ever dispatching the
// message.
PlatformChannel dummy_channel;
std::vector<mojo::PlatformHandle> handles;
handles.push_back(dummy_channel.TakeLocalEndpoint().TakePlatformHandle());
auto message = Channel::Message::CreateMessage(0 /* payload_size */,
1 /* max_handles */);
message->SetHandles(std::move(handles));
sender->Write(std::move(message));
receiver_delegate.WaitForError();
EXPECT_EQ(0u, receiver_delegate.num_messages());
}
TEST(ChannelTest, DeserializeMessage_BadExtraHeaderSize) {
// Verifies that a message payload is rejected when the extra header chunk
// size not properly aligned.
constexpr uint16_t kBadAlignment = kChannelMessageAlignment + 1;
constexpr uint16_t kTotalHeaderSize =
sizeof(Channel::Message::Header) + kBadAlignment;
constexpr uint32_t kEmptyPayloadSize = 8;
constexpr uint32_t kMessageSize = kTotalHeaderSize + kEmptyPayloadSize;
char message[kMessageSize];
memset(message, 0, kMessageSize);
Channel::Message::Header* header =
reinterpret_cast<Channel::Message::Header*>(&message[0]);
header->num_bytes = kMessageSize;
header->num_header_bytes = kTotalHeaderSize;
header->message_type = Channel::Message::MessageType::NORMAL;
header->num_handles = 0;
EXPECT_EQ(nullptr,
Channel::Message::Deserialize(&message[0], kMessageSize,
Channel::HandlePolicy::kAcceptHandles,
base::kNullProcessHandle));
}
// This test is only enabled for Linux-based platforms.
#if !BUILDFLAG(IS_WIN) && !BUILDFLAG(IS_APPLE) && !BUILDFLAG(IS_FUCHSIA)
TEST(ChannelTest, DeserializeMessage_NonZeroExtraHeaderSize) {
// Verifies that a message payload is rejected when the extra header chunk
// size anything but zero on Linux, even if it's aligned.
constexpr uint16_t kTotalHeaderSize =
sizeof(Channel::Message::Header) + kChannelMessageAlignment;
constexpr uint32_t kEmptyPayloadSize = 8;
constexpr uint32_t kMessageSize = kTotalHeaderSize + kEmptyPayloadSize;
char message[kMessageSize];
memset(message, 0, kMessageSize);
Channel::Message::Header* header =
reinterpret_cast<Channel::Message::Header*>(&message[0]);
header->num_bytes = kMessageSize;
header->num_header_bytes = kTotalHeaderSize;
header->message_type = Channel::Message::MessageType::NORMAL;
header->num_handles = 0;
EXPECT_EQ(nullptr,
Channel::Message::Deserialize(&message[0], kMessageSize,
Channel::HandlePolicy::kAcceptHandles,
base::kNullProcessHandle));
}
#endif
class CountingChannelDelegate : public Channel::Delegate {
public:
explicit CountingChannelDelegate(base::OnceClosure on_final_message)
: on_final_message_(std::move(on_final_message)) {}
~CountingChannelDelegate() override = default;
void OnChannelMessage(const void* payload,
size_t payload_size,
std::vector<PlatformHandle> handles) override {
// If this is the special "final message", run the closure.
if (payload_size == 1) {
auto* payload_str = reinterpret_cast<const char*>(payload);
if (payload_str[0] == '!') {
std::move(on_final_message_).Run();
return;
}
}
++message_count_;
}
void OnChannelError(Channel::Error error) override { ++error_count_; }
size_t message_count_ = 0;
size_t error_count_ = 0;
private:
base::OnceClosure on_final_message_;
};
TEST(ChannelTest, PeerStressTest) {
constexpr size_t kLotsOfMessages = 1024;
base::test::SingleThreadTaskEnvironment task_environment(
base::test::TaskEnvironment::MainThreadType::IO);
base::RunLoop run_loop;
// Both channels should receive all the messages that each is sent. When
// the count becomes 2 (indicating both channels have received the final
// message), quit the main test thread's run loop.
std::atomic_int count_channels_received_final_message(0);
auto quit_when_both_channels_received_final_message = base::BindRepeating(
[](std::atomic_int* count_channels_received_final_message,
base::OnceClosure quit_closure) {
if (++(*count_channels_received_final_message) == 2) {
std::move(quit_closure).Run();
}
},
base::Unretained(&count_channels_received_final_message),
run_loop.QuitClosure());
// Create a second IO thread for the peer channel.
base::Thread::Options thread_options;
thread_options.message_pump_type = base::MessagePumpType::IO;
base::Thread peer_thread("peer_b_io");
peer_thread.StartWithOptions(std::move(thread_options));
// Create two channels that run on separate threads.
PlatformChannel platform_channel;
CountingChannelDelegate delegate_a(
quit_when_both_channels_received_final_message);
scoped_refptr<Channel> channel_a = Channel::Create(
&delegate_a, ConnectionParams(platform_channel.TakeLocalEndpoint()),
Channel::HandlePolicy::kRejectHandles,
base::SingleThreadTaskRunner::GetCurrentDefault());
CountingChannelDelegate delegate_b(
quit_when_both_channels_received_final_message);
scoped_refptr<Channel> channel_b = Channel::Create(
&delegate_b, ConnectionParams(platform_channel.TakeRemoteEndpoint()),
Channel::HandlePolicy::kRejectHandles, peer_thread.task_runner());
// Send a lot of messages, followed by a final terminating message.
auto send_lots_of_messages = [](scoped_refptr<Channel> channel) {
for (size_t i = 0; i < kLotsOfMessages; ++i) {
channel->Write(Channel::Message::CreateMessage(0, 0));
}
};
auto send_final_message = [](scoped_refptr<Channel> channel) {
auto message = Channel::Message::CreateMessage(1, 0);
auto* payload = static_cast<char*>(message->mutable_payload());
payload[0] = '!';
channel->Write(std::move(message));
};
channel_a->Start();
channel_b->Start();
send_lots_of_messages(channel_a);
send_lots_of_messages(channel_b);
base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
FROM_HERE, base::BindOnce(send_lots_of_messages, channel_a));
base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
FROM_HERE, base::BindOnce(send_lots_of_messages, channel_a));
base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
FROM_HERE, base::BindOnce(send_final_message, channel_a));
peer_thread.task_runner()->PostTask(
FROM_HERE, base::BindOnce(send_lots_of_messages, channel_b));
peer_thread.task_runner()->PostTask(
FROM_HERE, base::BindOnce(send_lots_of_messages, channel_b));
peer_thread.task_runner()->PostTask(
FROM_HERE, base::BindOnce(send_final_message, channel_b));
// Run until quit_when_both_channels_received_final_message quits the loop.
run_loop.Run();
channel_a->ShutDown();
channel_b->ShutDown();
peer_thread.StopSoon();
base::RunLoop().RunUntilIdle();
EXPECT_EQ(kLotsOfMessages * 3, delegate_a.message_count_);
EXPECT_EQ(kLotsOfMessages * 3, delegate_b.message_count_);
EXPECT_EQ(0u, delegate_a.error_count_);
EXPECT_EQ(0u, delegate_b.error_count_);
}
class CallbackChannelDelegate : public Channel::Delegate {
public:
CallbackChannelDelegate() = default;
CallbackChannelDelegate(const CallbackChannelDelegate&) = delete;
CallbackChannelDelegate& operator=(const CallbackChannelDelegate&) = delete;
void OnChannelMessage(const void* payload,
size_t payload_size,
std::vector<PlatformHandle> handles) override {
if (on_message_)
std::move(on_message_).Run();
}
void OnChannelError(Channel::Error error) override {
if (on_error_)
std::move(on_error_).Run();
}
void set_on_message(base::OnceClosure on_message) {
on_message_ = std::move(on_message);
}
void set_on_error(base::OnceClosure on_error) {
on_error_ = std::move(on_error);
}
private:
base::OnceClosure on_message_;
base::OnceClosure on_error_;
};
TEST(ChannelTest, MessageSizeTest) {
base::test::SingleThreadTaskEnvironment task_environment(
base::test::TaskEnvironment::MainThreadType::IO);
PlatformChannel platform_channel;
CallbackChannelDelegate receiver_delegate;
scoped_refptr<Channel> receiver =
Channel::Create(&receiver_delegate,
ConnectionParams(platform_channel.TakeLocalEndpoint()),
Channel::HandlePolicy::kAcceptHandles,
base::SingleThreadTaskRunner::GetCurrentDefault());
receiver->Start();
MockChannelDelegate sender_delegate;
scoped_refptr<Channel> sender = Channel::Create(
&sender_delegate, ConnectionParams(platform_channel.TakeRemoteEndpoint()),
Channel::HandlePolicy::kAcceptHandles,
base::SingleThreadTaskRunner::GetCurrentDefault());
sender->Start();
for (uint32_t i = 0; i < base::GetPageSize() * 4; ++i) {
SCOPED_TRACE(base::StringPrintf("message size %d", i));
auto message = Channel::Message::CreateMessage(i, 0);
memset(message->mutable_payload(), 0xAB, i);
sender->Write(std::move(message));
bool got_message = false, got_error = false;
base::RunLoop loop;
receiver_delegate.set_on_message(
base::BindLambdaForTesting([&got_message, &loop]() {
got_message = true;
loop.Quit();
}));
receiver_delegate.set_on_error(
base::BindLambdaForTesting([&got_error, &loop]() {
got_error = true;
loop.Quit();
}));
loop.Run();
EXPECT_TRUE(got_message);
EXPECT_FALSE(got_error);
}
}
#if BUILDFLAG(IS_MAC)
TEST(ChannelTest, SendToDeadMachPortName) {
base::test::SingleThreadTaskEnvironment task_environment(
base::test::TaskEnvironment::MainThreadType::IO);
// Create a second IO thread for the B channel. It needs to process tasks
// separately from channel A.
base::Thread::Options thread_options;
thread_options.message_pump_type = base::MessagePumpType::IO;
base::Thread peer_thread("channel_b_io");
peer_thread.StartWithOptions(std::move(thread_options));
// Create a PlatformChannel send/receive right pair.
PlatformChannel platform_channel;
mach_port_urefs_t send = 0, dead = 0;
mach_port_t send_name = platform_channel.local_endpoint()
.platform_handle()
.GetMachSendRight()
.get();
auto get_send_name_refs = [&send, &dead, send_name]() {
kern_return_t kr = mach_port_get_refs(mach_task_self(), send_name,
MACH_PORT_RIGHT_SEND, &send);
ASSERT_EQ(kr, KERN_SUCCESS);
kr = mach_port_get_refs(mach_task_self(), send_name,
MACH_PORT_RIGHT_DEAD_NAME, &dead);
ASSERT_EQ(kr, KERN_SUCCESS);
};
get_send_name_refs();
EXPECT_EQ(1u, send);
EXPECT_EQ(0u, dead);
// Add an extra send right.
ASSERT_EQ(KERN_SUCCESS, mach_port_mod_refs(mach_task_self(), send_name,
MACH_PORT_RIGHT_SEND, 1));
get_send_name_refs();
EXPECT_EQ(2u, send);
EXPECT_EQ(0u, dead);
base::apple::ScopedMachSendRight extra_send(send_name);
// Channel A gets created with the Mach send right from |platform_channel|.
CallbackChannelDelegate delegate_a;
scoped_refptr<Channel> channel_a = Channel::Create(
&delegate_a, ConnectionParams(platform_channel.TakeLocalEndpoint()),
Channel::HandlePolicy::kAcceptHandles,
base::SingleThreadTaskRunner::GetCurrentDefault());
channel_a->Start();
// Channel B gets the receive right.
MockChannelDelegate delegate_b;
scoped_refptr<Channel> channel_b = Channel::Create(
&delegate_b, ConnectionParams(platform_channel.TakeRemoteEndpoint()),
Channel::HandlePolicy::kAcceptHandles, peer_thread.task_runner());
channel_b->Start();
// Ensure the channels have started and are talking.
channel_b->Write(Channel::Message::CreateMessage(0, 0));
{
base::RunLoop loop;
delegate_a.set_on_message(loop.QuitClosure());
loop.Run();
}
// Queue two messages from B to A. Two are required so that channel A does
// not immediately process the dead-name notification when channel B shuts
// down.
channel_b->Write(Channel::Message::CreateMessage(0, 0));
channel_b->Write(Channel::Message::CreateMessage(0, 0));
// Turn Channel A's send right into a dead name.
channel_b->ShutDown();
channel_b = nullptr;
// ShutDown() posts a task on the channel's TaskRunner, so wait for that
// to run.
base::WaitableEvent event;
peer_thread.task_runner()->PostTask(
FROM_HERE,
base::BindOnce(&base::WaitableEvent::Signal, base::Unretained(&event)));
event.Wait();
// Force a send-to-dead-name on Channel A.
channel_a->Write(Channel::Message::CreateMessage(0, 0));
{
base::RunLoop loop;
delegate_a.set_on_error(base::BindOnce(
[](scoped_refptr<Channel> channel, base::RunLoop* loop) {
channel->ShutDown();
channel = nullptr;
loop->QuitWhenIdle();
},
channel_a, base::Unretained(&loop)));
loop.Run();
}
// The only remaining ref should be the extra one that was added in the test.
get_send_name_refs();
EXPECT_EQ(0u, send);
EXPECT_EQ(1u, dead);
}
#endif // BUILDFLAG(IS_MAC)
TEST(ChannelTest, ShutDownStress) {
base::test::SingleThreadTaskEnvironment task_environment(
base::test::TaskEnvironment::MainThreadType::IO);
// Create a second IO thread for Channel B.
base::Thread peer_thread("channel_b_io");
peer_thread.StartWithOptions(
base::Thread::Options(base::MessagePumpType::IO, 0));
// Create two channels, A and B, which run on different threads.
PlatformChannel platform_channel;
CallbackChannelDelegate delegate_a;
scoped_refptr<Channel> channel_a = Channel::Create(
&delegate_a, ConnectionParams(platform_channel.TakeLocalEndpoint()),
Channel::HandlePolicy::kRejectHandles,
task_environment.GetMainThreadTaskRunner());
channel_a->Start();
scoped_refptr<Channel> channel_b = Channel::Create(
nullptr, ConnectionParams(platform_channel.TakeRemoteEndpoint()),
Channel::HandlePolicy::kRejectHandles, peer_thread.task_runner());
channel_b->Start();
base::WaitableEvent go_event;
// Warm up the channel to ensure that A and B are connected, then quit.
channel_b->Write(Channel::Message::CreateMessage(0, 0));
{
base::RunLoop run_loop;
delegate_a.set_on_message(run_loop.QuitClosure());
run_loop.Run();
}
// Block the peer thread while some tasks are queued up from the test main
// thread.
peer_thread.task_runner()->PostTask(
FROM_HERE,
base::BindOnce(&base::WaitableEvent::Wait, base::Unretained(&go_event)));
// First, write some messages for Channel B.
for (int i = 0; i < 500; ++i) {
channel_b->Write(Channel::Message::CreateMessage(0, 0));
}
// Then shut down channel B.
channel_b->ShutDown();
// Un-block the peer thread.
go_event.Signal();
// And then flood the channel with messages. This will suss out data races
// during Channel B's shutdown, since Writes can happen across threads
// without a PostTask.
for (int i = 0; i < 1000; ++i) {
channel_b->Write(Channel::Message::CreateMessage(0, 0));
}
// Explicitly join the thread to wait for pending tasks, which may reference
// stack variables, to complete.
peer_thread.Stop();
}
} // namespace
} // namespace core
} // namespace mojo