// blob: 5e341436926b9ac10138fa1b68a7e19d6e98f3f8 [file] [log] [blame]
// Copyright 2013 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include <stdlib.h>
#include <string.h>
#include <string>
#include "gtest/gtest.h"
#include "mojo/public/cpp/bindings/lib/connector.h"
#include "mojo/public/cpp/bindings/lib/message_builder.h"
#include "mojo/public/cpp/bindings/tests/message_queue.h"
#include "mojo/public/cpp/environment/logging.h"
#include "mojo/public/cpp/system/macros.h"
#include "mojo/public/cpp/utility/run_loop.h"
namespace mojo {
namespace test {
namespace {
// Test fixture that holds the two ends of a message pipe (created in SetUp())
// together with the RunLoop used to pump messages in the tests below.
class ConnectorTest : public testing::Test {
 public:
  ConnectorTest() {}

  // Creates the message pipe whose ends are stored in |handle0_| and
  // |handle1_|.
  void SetUp() override { CreateMessagePipe(nullptr, &handle0_, &handle1_); }

  void TearDown() override {}

  // Builds a message with name 1 whose payload is a copy of |text| (including
  // the terminating NUL) and moves it into |message|.
  void AllocMessage(const char* text, Message* message) {
    const size_t num_bytes = strlen(text) + 1;  // Plus null terminator.
    MessageBuilder builder(1, num_bytes);
    void* const payload = builder.buffer()->Allocate(num_bytes);
    memcpy(payload, text, num_bytes);
    builder.message()->MoveTo(message);
  }

  // Runs the fixture's RunLoop until it is idle.
  void PumpMessages() { loop_.RunUntilIdle(); }

 protected:
  // The two ends of the pipe; tests hand these over to Connectors.
  ScopedMessagePipeHandle handle0_;
  ScopedMessagePipeHandle handle1_;

 private:
  RunLoop loop_;

  MOJO_DISALLOW_COPY_AND_ASSIGN(ConnectorTest);
};
// A MessageReceiver that pushes every message it accepts onto a queue so that
// tests can pop and inspect them afterwards.
class MessageAccumulator : public MessageReceiver {
 public:
  MessageAccumulator() {}

  // Queues |message| and always reports success.
  bool Accept(Message* message) override {
    queue_.Push(message);
    return true;
  }

  // Returns whether the underlying queue is empty.
  bool IsEmpty() const { return queue_.IsEmpty(); }

  // Pops from the underlying queue into |message|.
  void Pop(Message* message) { queue_.Pop(message); }

 private:
  MessageQueue queue_;

  MOJO_DISALLOW_COPY_AND_ASSIGN(MessageAccumulator);
};
// Sends a single message, attaches the receiver afterwards, pumps the run
// loop, and checks that the payload arrives intact.
TEST_F(ConnectorTest, Basic) {
  internal::Connector connector0(handle0_.Pass());
  internal::Connector connector1(handle1_.Pass());

  const char kText[] = "hello world";

  Message outgoing;
  AllocMessage(kText, &outgoing);
  connector0.Accept(&outgoing);

  // The receiver is only installed once the message has been written.
  MessageAccumulator accumulator;
  connector1.set_incoming_receiver(&accumulator);

  PumpMessages();

  ASSERT_FALSE(accumulator.IsEmpty());

  Message incoming;
  accumulator.Pop(&incoming);

  const std::string received_text(
      reinterpret_cast<const char*>(incoming.payload()));
  EXPECT_EQ(std::string(kText), received_text);
}
// Like Basic, but the message is retrieved with a blocking
// WaitForIncomingMessage() call instead of pumping the run loop.
TEST_F(ConnectorTest, Basic_Synchronous) {
  internal::Connector connector0(handle0_.Pass());
  internal::Connector connector1(handle1_.Pass());

  const char kText[] = "hello world";

  Message outgoing;
  AllocMessage(kText, &outgoing);
  connector0.Accept(&outgoing);

  MessageAccumulator accumulator;
  connector1.set_incoming_receiver(&accumulator);

  connector1.WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE);

  ASSERT_FALSE(accumulator.IsEmpty());

  Message incoming;
  accumulator.Pop(&incoming);

  const std::string received_text(
      reinterpret_cast<const char*>(incoming.payload()));
  EXPECT_EQ(std::string(kText), received_text);
}
// Like Basic, but the incoming receiver is installed before the message is
// sent rather than after.
TEST_F(ConnectorTest, Basic_EarlyIncomingReceiver) {
  internal::Connector connector0(handle0_.Pass());
  internal::Connector connector1(handle1_.Pass());

  // Install the receiver first.
  MessageAccumulator accumulator;
  connector1.set_incoming_receiver(&accumulator);

  const char kText[] = "hello world";

  Message outgoing;
  AllocMessage(kText, &outgoing);
  connector0.Accept(&outgoing);

  PumpMessages();

  ASSERT_FALSE(accumulator.IsEmpty());

  Message incoming;
  accumulator.Pop(&incoming);

  const std::string received_text(
      reinterpret_cast<const char*>(incoming.payload()));
  EXPECT_EQ(std::string(kText), received_text);
}
// Sends two messages back to back and checks that pumping the run loop
// delivers both of them, in the order they were sent.
TEST_F(ConnectorTest, Basic_TwoMessages) {
  internal::Connector connector0(handle0_.Pass());
  internal::Connector connector1(handle1_.Pass());

  const char* kText[] = {"hello", "world"};

  for (const char* text : kText) {
    Message outgoing;
    AllocMessage(text, &outgoing);
    connector0.Accept(&outgoing);
  }

  MessageAccumulator accumulator;
  connector1.set_incoming_receiver(&accumulator);

  PumpMessages();

  // Messages are popped in the same order as |kText|.
  for (const char* expected_text : kText) {
    ASSERT_FALSE(accumulator.IsEmpty());

    Message incoming;
    accumulator.Pop(&incoming);

    const std::string received_text(
        reinterpret_cast<const char*>(incoming.payload()));
    EXPECT_EQ(std::string(expected_text), received_text);
  }
}
// Sends two messages, then makes a single blocking WaitForIncomingMessage()
// call. The test expects exactly one message (the first) to have been handed
// to the receiver afterwards.
TEST_F(ConnectorTest, Basic_TwoMessages_Synchronous) {
  internal::Connector connector0(handle0_.Pass());
  internal::Connector connector1(handle1_.Pass());

  const char* kText[] = {"hello", "world"};

  for (const char* text : kText) {
    Message outgoing;
    AllocMessage(text, &outgoing);
    connector0.Accept(&outgoing);
  }

  MessageAccumulator accumulator;
  connector1.set_incoming_receiver(&accumulator);

  connector1.WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE);

  ASSERT_FALSE(accumulator.IsEmpty());

  Message incoming;
  accumulator.Pop(&incoming);

  const std::string received_text(
      reinterpret_cast<const char*>(incoming.payload()));
  EXPECT_EQ(std::string(kText[0]), received_text);

  // The second message must not have been dispatched by the single wait.
  ASSERT_TRUE(accumulator.IsEmpty());
}
// Writes to a connector whose peer handle has been closed: the write itself
// reports success, and the error only becomes visible after the run loop has
// been pumped.
TEST_F(ConnectorTest, WriteToClosedPipe) {
  internal::Connector connector0(handle0_.Pass());

  const char kText[] = "hello world";

  Message outgoing;
  AllocMessage(kText, &outgoing);

  // Close the other end of the pipe.
  handle1_.reset();

  // Not observed yet because we haven't spun the RunLoop yet.
  EXPECT_FALSE(connector0.encountered_error());

  // Write failures are not reported.
  const bool write_accepted = connector0.Accept(&outgoing);
  EXPECT_TRUE(write_accepted);

  // Still not observed.
  EXPECT_FALSE(connector0.encountered_error());

  // Spin the RunLoop, and then we should start observing the closed pipe.
  PumpMessages();

  EXPECT_TRUE(connector0.encountered_error());
}
// Sends a message that carries one end of a second message pipe, then checks
// that the transferred handle is usable: a message written to it arrives at
// the other end of that second pipe.
TEST_F(ConnectorTest, MessageWithHandles) {
  internal::Connector connector0(handle0_.Pass());
  internal::Connector connector1(handle1_.Pass());

  const char kText[] = "hello world";

  Message first_message;
  AllocMessage(kText, &first_message);

  // Attach one end of a fresh pipe to the outgoing message.
  MessagePipe extra_pipe;
  first_message.mutable_handles()->push_back(extra_pipe.handle0.release());

  connector0.Accept(&first_message);

  // The message should have been transferred, releasing the handles.
  EXPECT_TRUE(first_message.handles()->empty());

  MessageAccumulator accumulator;
  connector1.set_incoming_receiver(&accumulator);

  PumpMessages();

  ASSERT_FALSE(accumulator.IsEmpty());

  Message incoming;
  accumulator.Pop(&incoming);

  const std::string first_text(
      reinterpret_cast<const char*>(incoming.payload()));
  EXPECT_EQ(std::string(kText), first_text);
  ASSERT_EQ(1U, incoming.handles()->size());

  // Now send a message to the transferred handle and confirm it's sent through
  // to the original pipe.
  // TODO(vtl): Do we need a better way of "downcasting" the handle types?
  ScopedMessagePipeHandle received_end;
  received_end.reset(MessagePipeHandle(incoming.handles()->front().value()));
  incoming.mutable_handles()->front() = Handle();
  // |received_end| now owns this handle.

  internal::Connector connector_received(received_end.Pass());
  internal::Connector connector_original(extra_pipe.handle1.Pass());

  Message second_message;
  AllocMessage(kText, &second_message);

  connector_received.Accept(&second_message);
  connector_original.set_incoming_receiver(&accumulator);
  PumpMessages();

  ASSERT_FALSE(accumulator.IsEmpty());

  // |incoming| is reused for the second received message.
  accumulator.Pop(&incoming);

  const std::string second_text(
      reinterpret_cast<const char*>(incoming.payload()));
  EXPECT_EQ(std::string(kText), second_text);
}
// A blocking wait on a connector whose peer has been closed must report
// failure.
TEST_F(ConnectorTest, WaitForIncomingMessageWithError) {
  internal::Connector connector0(handle0_.Pass());

  // Close the other end of the pipe.
  handle1_.reset();

  const bool wait_succeeded =
      connector0.WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE);
  ASSERT_FALSE(wait_succeeded);
}
// A MessageAccumulator that, on every Accept(), deletes the Connector pointed
// to by |*connector| and nulls that pointer before queueing the message. Used
// to check that a Connector can be destroyed from inside its own incoming
// receiver.
class ConnectorDeletingMessageAccumulator : public MessageAccumulator {
 public:
  // |connector| is the address of the caller's Connector pointer; the pointee
  // is deleted and the caller's pointer is reset when a message is accepted.
  explicit ConnectorDeletingMessageAccumulator(internal::Connector** connector)
      : connector_(connector) {}

  // Destroys the connector, clears the caller's pointer, then queues
  // |message| via the base class.
  bool Accept(Message* message) override {
    delete *connector_;
    *connector_ = nullptr;
    return MessageAccumulator::Accept(message);
  }

 private:
  internal::Connector** connector_;

  MOJO_DISALLOW_COPY_AND_ASSIGN(ConnectorDeletingMessageAccumulator);
};
// Checks that a Connector may be deleted by its incoming receiver while a
// blocking WaitForIncomingMessage() call on it is in progress.
TEST_F(ConnectorTest, WaitForIncomingMessageWithDeletion) {
  internal::Connector connector0(handle0_.Pass());
  // Heap-allocated because the accumulator below deletes it from Accept().
  internal::Connector* connector1 = new internal::Connector(handle1_.Pass());

  const char kText[] = "hello world";

  Message outgoing;
  AllocMessage(kText, &outgoing);
  connector0.Accept(&outgoing);

  ConnectorDeletingMessageAccumulator accumulator(&connector1);
  connector1->set_incoming_receiver(&accumulator);

  connector1->WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE);

  // The accumulator nulls |connector1| after deleting it.
  ASSERT_FALSE(connector1);
  ASSERT_FALSE(accumulator.IsEmpty());

  Message incoming;
  accumulator.Pop(&incoming);

  const std::string received_text(
      reinterpret_cast<const char*>(incoming.payload()));
  EXPECT_EQ(std::string(kText), received_text);
}
// A MessageAccumulator that, while handling its first message, re-enters the
// connector with a blocking WaitForIncomingMessage() call. It also counts how
// many times Accept() queued a message.
class ReentrantMessageAccumulator : public MessageAccumulator {
 public:
  // |connector| is the connector to re-enter; it is not owned.
  explicit ReentrantMessageAccumulator(internal::Connector* connector)
      : connector_(connector), number_of_calls_(0) {}

  // Queues |message|. On the first call only, additionally waits on the
  // connector for another incoming message and returns that wait's result.
  bool Accept(Message* message) override {
    if (!MessageAccumulator::Accept(message))
      return false;
    ++number_of_calls_;
    if (number_of_calls_ == 1) {
      return connector_->WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE);
    }
    return true;
  }

  // Number of messages successfully queued by Accept() so far.
  int number_of_calls() const { return number_of_calls_; }

 private:
  internal::Connector* connector_;
  int number_of_calls_;

  MOJO_DISALLOW_COPY_AND_ASSIGN(ReentrantMessageAccumulator);
};
// Sends two messages to a receiver that re-enters WaitForIncomingMessage()
// while handling the first one, and checks that both messages are delivered
// in order with exactly two Accept() calls.
TEST_F(ConnectorTest, WaitForIncomingMessageWithReentrancy) {
  internal::Connector connector0(handle0_.Pass());
  internal::Connector connector1(handle1_.Pass());

  const char* kText[] = {"hello", "world"};

  for (const char* text : kText) {
    Message outgoing;
    AllocMessage(text, &outgoing);
    connector0.Accept(&outgoing);
  }

  ReentrantMessageAccumulator accumulator(&connector1);
  connector1.set_incoming_receiver(&accumulator);

  PumpMessages();

  for (const char* expected_text : kText) {
    ASSERT_FALSE(accumulator.IsEmpty());

    Message incoming;
    accumulator.Pop(&incoming);

    const std::string received_text(
        reinterpret_cast<const char*>(incoming.payload()));
    EXPECT_EQ(std::string(expected_text), received_text);
  }

  ASSERT_EQ(2, accumulator.number_of_calls());
}
// A message receiver that accepts every message and answers each one by
// sending a new message, whose name is one greater, to another fixed receiver.
class NoTaskStarvationReplier : public MessageReceiver {
 public:
  // |reply_to| receives the replies; it must not be this object.
  explicit NoTaskStarvationReplier(MessageReceiver* reply_to)
      : reply_to_(reply_to) {
    MOJO_CHECK(reply_to_ != this);
  }

  // Counts the message, posts a quit task once the message name has reached
  // 10, and (unless the starvation threshold was exceeded) sends the reply.
  bool Accept(Message* message) override {
    num_accepted_++;

    const uint32_t name = message->name();

    if (name >= 10u) {
      RunLoop::current()->PostDelayedTask([]() { RunLoop::current()->Quit(); },
                                          0);
    }

    // We don't necessarily expect the quit task to be processed immediately,
    // but if some large number (say, ten thousand-ish) messages have been
    // processed, we can say that starvation has occurred.
    static const uint32_t kStarvationThreshold = 10000;
    EXPECT_LE(name, kStarvationThreshold);

    // We'd prefer our test not hang, so only send the reply in the
    // non-failing case.
    if (name <= kStarvationThreshold) {
      MessageBuilder builder(name + 1u, 0u);
      MOJO_CHECK(reply_to_->Accept(builder.message()));
    }
    return true;
  }

  // Number of messages passed to Accept() so far.
  unsigned num_accepted() const { return num_accepted_; }

 private:
  MessageReceiver* const reply_to_;
  unsigned num_accepted_ = 0;

  MOJO_DISALLOW_COPY_AND_ASSIGN(NoTaskStarvationReplier);
};
// Bounces messages between the two connectors and checks that the quit task
// posted by the replier gets to run, i.e. that message handling does not
// starve other run loop tasks.
// TODO(vtl): This test currently fails. See the discussion on issue #604
// (https://github.com/domokit/mojo/issues/604).
TEST_F(ConnectorTest, DISABLED_NoTaskStarvation) {
  internal::Connector connector0(handle0_.Pass());
  internal::Connector connector1(handle1_.Pass());

  // The replier will bounce messages to |connector0|, and will receive
  // messages from |connector1|.
  NoTaskStarvationReplier replier(&connector0);
  connector1.set_incoming_receiver(&replier);

  // Kick things off by sending a message on |connector0| (starting with a
  // "name" of 1).
  MessageBuilder builder(1u, 0u);
  ASSERT_TRUE(connector0.Accept(builder.message()));

  PumpMessages();

  EXPECT_GE(replier.num_accepted(), 10u);
}
} // namespace
} // namespace test
} // namespace mojo