| // Copyright 2016 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include <inttypes.h> |
| #include <stdio.h> |
| #include <stdlib.h> |
| #include <string.h> |
| |
| #include <map> |
| #include <sstream> |
| #include <utility> |
| |
| #include "base/bind.h" |
| #include "base/callback.h" |
| #include "base/containers/queue.h" |
| #include "base/logging.h" |
| #include "base/memory/ref_counted.h" |
| #include "base/strings/string_piece.h" |
| #include "base/strings/stringprintf.h" |
| #include "base/synchronization/lock.h" |
| #include "base/synchronization/waitable_event.h" |
| #include "base/test/scoped_task_environment.h" |
| #include "base/threading/thread.h" |
| #include "mojo/core/ports/event.h" |
| #include "mojo/core/ports/node.h" |
| #include "mojo/core/ports/node_delegate.h" |
| #include "mojo/core/ports/user_message.h" |
| #include "testing/gtest/include/gtest/gtest.h" |
| |
| namespace mojo { |
| namespace core { |
| namespace ports { |
| namespace test { |
| |
| namespace { |
| |
| // TODO(rockot): Remove this unnecessary alias. |
| using ScopedMessage = std::unique_ptr<UserMessageEvent>; |
| |
| class TestMessage : public UserMessage { |
| public: |
| static const TypeInfo kUserMessageTypeInfo; |
| |
| TestMessage(const base::StringPiece& payload) |
| : UserMessage(&kUserMessageTypeInfo), payload_(payload) {} |
| ~TestMessage() override {} |
| |
| const std::string& payload() const { return payload_; } |
| |
| private: |
| std::string payload_; |
| }; |
| |
| const UserMessage::TypeInfo TestMessage::kUserMessageTypeInfo = {}; |
| |
| ScopedMessage NewUserMessageEvent(const base::StringPiece& payload, |
| size_t num_ports) { |
| auto event = std::make_unique<UserMessageEvent>(num_ports); |
| event->AttachMessage(std::make_unique<TestMessage>(payload)); |
| return event; |
| } |
| |
| bool MessageEquals(const ScopedMessage& message, const base::StringPiece& s) { |
| return message->GetMessage<TestMessage>()->payload() == s; |
| } |
| |
| class TestNode; |
| |
| class MessageRouter { |
| public: |
| virtual ~MessageRouter() {} |
| |
| virtual void ForwardEvent(TestNode* from_node, |
| const NodeName& node_name, |
| ScopedEvent event) = 0; |
| virtual void BroadcastEvent(TestNode* from_node, ScopedEvent event) = 0; |
| }; |
| |
| class TestNode : public NodeDelegate { |
| public: |
| explicit TestNode(uint64_t id) |
| : node_name_(id, 1), |
| node_(node_name_, this), |
| node_thread_(base::StringPrintf("Node %" PRIu64 " thread", id)), |
| events_available_event_( |
| base::WaitableEvent::ResetPolicy::AUTOMATIC, |
| base::WaitableEvent::InitialState::NOT_SIGNALED), |
| idle_event_(base::WaitableEvent::ResetPolicy::MANUAL, |
| base::WaitableEvent::InitialState::SIGNALED) {} |
| |
| ~TestNode() override { |
| StopWhenIdle(); |
| node_thread_.Stop(); |
| } |
| |
| const NodeName& name() const { return node_name_; } |
| |
| // NOTE: Node is thread-safe. |
| Node& node() { return node_; } |
| |
| base::WaitableEvent& idle_event() { return idle_event_; } |
| |
| bool IsIdle() { |
| base::AutoLock lock(lock_); |
| return started_ && !dispatching_ && |
| (incoming_events_.empty() || (block_on_event_ && blocked_)); |
| } |
| |
| void BlockOnEvent(Event::Type type) { |
| base::AutoLock lock(lock_); |
| blocked_event_type_ = type; |
| block_on_event_ = true; |
| } |
| |
| void Unblock() { |
| base::AutoLock lock(lock_); |
| block_on_event_ = false; |
| events_available_event_.Signal(); |
| } |
| |
| void Start(MessageRouter* router) { |
| router_ = router; |
| node_thread_.Start(); |
| node_thread_.task_runner()->PostTask( |
| FROM_HERE, |
| base::BindOnce(&TestNode::ProcessEvents, base::Unretained(this))); |
| } |
| |
| void StopWhenIdle() { |
| base::AutoLock lock(lock_); |
| should_quit_ = true; |
| events_available_event_.Signal(); |
| } |
| |
| void WakeUp() { events_available_event_.Signal(); } |
| |
| int SendStringMessage(const PortRef& port, const std::string& s) { |
| return node_.SendUserMessage(port, NewUserMessageEvent(s, 0)); |
| } |
| |
| int SendStringMessageWithPort(const PortRef& port, |
| const std::string& s, |
| const PortName& sent_port_name) { |
| auto event = NewUserMessageEvent(s, 1); |
| event->ports()[0] = sent_port_name; |
| return node_.SendUserMessage(port, std::move(event)); |
| } |
| |
| int SendStringMessageWithPort(const PortRef& port, |
| const std::string& s, |
| const PortRef& sent_port) { |
| return SendStringMessageWithPort(port, s, sent_port.name()); |
| } |
| |
| void set_drop_messages(bool value) { |
| base::AutoLock lock(lock_); |
| drop_messages_ = value; |
| } |
| |
| void set_save_messages(bool value) { |
| base::AutoLock lock(lock_); |
| save_messages_ = value; |
| } |
| |
| bool ReadMessage(const PortRef& port, ScopedMessage* message) { |
| return node_.GetMessage(port, message, nullptr) == OK && *message; |
| } |
| |
| bool GetSavedMessage(ScopedMessage* message) { |
| base::AutoLock lock(lock_); |
| if (saved_messages_.empty()) { |
| message->reset(); |
| return false; |
| } |
| std::swap(*message, saved_messages_.front()); |
| saved_messages_.pop(); |
| return true; |
| } |
| |
| void EnqueueEvent(ScopedEvent event) { |
| idle_event_.Reset(); |
| |
| // NOTE: This may be called from ForwardMessage and thus must not reenter |
| // |node_|. |
| base::AutoLock lock(lock_); |
| incoming_events_.emplace(std::move(event)); |
| events_available_event_.Signal(); |
| } |
| |
| void ForwardEvent(const NodeName& node_name, ScopedEvent event) override { |
| { |
| base::AutoLock lock(lock_); |
| if (drop_messages_) { |
| DVLOG(1) << "Dropping ForwardMessage from node " << node_name_ << " to " |
| << node_name; |
| |
| base::AutoUnlock unlock(lock_); |
| ClosePortsInEvent(event.get()); |
| return; |
| } |
| } |
| |
| DCHECK(router_); |
| DVLOG(1) << "ForwardEvent from node " << node_name_ << " to " << node_name; |
| router_->ForwardEvent(this, node_name, std::move(event)); |
| } |
| |
| void BroadcastEvent(ScopedEvent event) override { |
| router_->BroadcastEvent(this, std::move(event)); |
| } |
| |
| void PortStatusChanged(const PortRef& port) override { |
| // The port may be closed, in which case we ignore the notification. |
| base::AutoLock lock(lock_); |
| if (!save_messages_) |
| return; |
| |
| for (;;) { |
| ScopedMessage message; |
| { |
| base::AutoUnlock unlock(lock_); |
| if (!ReadMessage(port, &message)) |
| break; |
| } |
| |
| saved_messages_.emplace(std::move(message)); |
| } |
| } |
| |
| void ClosePortsInEvent(Event* event) { |
| if (event->type() != Event::Type::kUserMessage) |
| return; |
| |
| UserMessageEvent* message_event = static_cast<UserMessageEvent*>(event); |
| for (size_t i = 0; i < message_event->num_ports(); ++i) { |
| PortRef port; |
| ASSERT_EQ(OK, node_.GetPort(message_event->ports()[i], &port)); |
| EXPECT_EQ(OK, node_.ClosePort(port)); |
| } |
| } |
| |
| private: |
| void ProcessEvents() { |
| for (;;) { |
| events_available_event_.Wait(); |
| base::AutoLock lock(lock_); |
| |
| if (should_quit_) |
| return; |
| |
| dispatching_ = true; |
| while (!incoming_events_.empty()) { |
| if (block_on_event_ && |
| incoming_events_.front()->type() == blocked_event_type_) { |
| blocked_ = true; |
| // Go idle if we hit a blocked event type. |
| break; |
| } else { |
| blocked_ = false; |
| } |
| ScopedEvent event = std::move(incoming_events_.front()); |
| incoming_events_.pop(); |
| |
| // NOTE: AcceptMessage() can re-enter this object to call any of the |
| // NodeDelegate interface methods. |
| base::AutoUnlock unlock(lock_); |
| node_.AcceptEvent(std::move(event)); |
| } |
| |
| dispatching_ = false; |
| started_ = true; |
| idle_event_.Signal(); |
| }; |
| } |
| |
| const NodeName node_name_; |
| Node node_; |
| MessageRouter* router_ = nullptr; |
| |
| base::Thread node_thread_; |
| base::WaitableEvent events_available_event_; |
| base::WaitableEvent idle_event_; |
| |
| // Guards fields below. |
| base::Lock lock_; |
| bool started_ = false; |
| bool dispatching_ = false; |
| bool should_quit_ = false; |
| bool drop_messages_ = false; |
| bool save_messages_ = false; |
| bool blocked_ = false; |
| bool block_on_event_ = false; |
| Event::Type blocked_event_type_; |
| base::queue<ScopedEvent> incoming_events_; |
| base::queue<ScopedMessage> saved_messages_; |
| }; |
| |
| class PortsTest : public testing::Test, public MessageRouter { |
| public: |
| void AddNode(TestNode* node) { |
| { |
| base::AutoLock lock(lock_); |
| nodes_[node->name()] = node; |
| } |
| node->Start(this); |
| } |
| |
| void RemoveNode(TestNode* node) { |
| { |
| base::AutoLock lock(lock_); |
| nodes_.erase(node->name()); |
| } |
| |
| for (const auto& entry : nodes_) |
| entry.second->node().LostConnectionToNode(node->name()); |
| } |
| |
| // Waits until all known Nodes are idle. Message forwarding and processing |
| // is handled in such a way that idleness is a stable state: once all nodes in |
| // the system are idle, they will remain idle until the test explicitly |
| // initiates some further event (e.g. sending a message, closing a port, or |
| // removing a Node). |
| void WaitForIdle() { |
| for (;;) { |
| base::AutoLock global_lock(global_lock_); |
| bool all_nodes_idle = true; |
| for (const auto& entry : nodes_) { |
| if (!entry.second->IsIdle()) |
| all_nodes_idle = false; |
| entry.second->WakeUp(); |
| } |
| if (all_nodes_idle) |
| return; |
| |
| // Wait for any Node to signal that it's idle. |
| base::AutoUnlock global_unlock(global_lock_); |
| std::vector<base::WaitableEvent*> events; |
| for (const auto& entry : nodes_) |
| events.push_back(&entry.second->idle_event()); |
| base::WaitableEvent::WaitMany(events.data(), events.size()); |
| } |
| } |
| |
| void CreatePortPair(TestNode* node0, |
| PortRef* port0, |
| TestNode* node1, |
| PortRef* port1) { |
| if (node0 == node1) { |
| EXPECT_EQ(OK, node0->node().CreatePortPair(port0, port1)); |
| } else { |
| EXPECT_EQ(OK, node0->node().CreateUninitializedPort(port0)); |
| EXPECT_EQ(OK, node1->node().CreateUninitializedPort(port1)); |
| EXPECT_EQ(OK, node0->node().InitializePort(*port0, node1->name(), |
| port1->name())); |
| EXPECT_EQ(OK, node1->node().InitializePort(*port1, node0->name(), |
| port0->name())); |
| } |
| } |
| |
| private: |
| // MessageRouter: |
| void ForwardEvent(TestNode* from_node, |
| const NodeName& node_name, |
| ScopedEvent event) override { |
| base::AutoLock global_lock(global_lock_); |
| base::AutoLock lock(lock_); |
| // Drop messages from nodes that have been removed. |
| if (nodes_.find(from_node->name()) == nodes_.end()) { |
| from_node->ClosePortsInEvent(event.get()); |
| return; |
| } |
| |
| auto it = nodes_.find(node_name); |
| if (it == nodes_.end()) { |
| DVLOG(1) << "Node not found: " << node_name; |
| return; |
| } |
| |
| it->second->EnqueueEvent(std::move(event)); |
| } |
| |
| void BroadcastEvent(TestNode* from_node, ScopedEvent event) override { |
| base::AutoLock global_lock(global_lock_); |
| base::AutoLock lock(lock_); |
| |
| // Drop messages from nodes that have been removed. |
| if (nodes_.find(from_node->name()) == nodes_.end()) |
| return; |
| |
| for (const auto& entry : nodes_) { |
| TestNode* node = entry.second; |
| // Broadcast doesn't deliver to the local node. |
| if (node == from_node) |
| continue; |
| node->EnqueueEvent(event->Clone()); |
| } |
| } |
| |
| base::test::ScopedTaskEnvironment scoped_task_environment_; |
| |
| // Acquired before any operation which makes a Node busy, and before testing |
| // if all nodes are idle. |
| base::Lock global_lock_; |
| |
| base::Lock lock_; |
| std::map<NodeName, TestNode*> nodes_; |
| }; |
| |
| } // namespace |
| |
| TEST_F(PortsTest, Basic1) { |
| TestNode node0(0); |
| AddNode(&node0); |
| |
| TestNode node1(1); |
| AddNode(&node1); |
| |
| PortRef x0, x1; |
| CreatePortPair(&node0, &x0, &node1, &x1); |
| |
| PortRef a0, a1; |
| EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1)); |
| EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "hello", a1)); |
| EXPECT_EQ(OK, node0.node().ClosePort(a0)); |
| |
| EXPECT_EQ(OK, node0.node().ClosePort(x0)); |
| EXPECT_EQ(OK, node1.node().ClosePort(x1)); |
| |
| WaitForIdle(); |
| |
| EXPECT_TRUE(node0.node().CanShutdownCleanly()); |
| EXPECT_TRUE(node1.node().CanShutdownCleanly()); |
| } |
| |
| TEST_F(PortsTest, Basic2) { |
| TestNode node0(0); |
| AddNode(&node0); |
| |
| TestNode node1(1); |
| AddNode(&node1); |
| |
| PortRef x0, x1; |
| CreatePortPair(&node0, &x0, &node1, &x1); |
| |
| PortRef b0, b1; |
| EXPECT_EQ(OK, node0.node().CreatePortPair(&b0, &b1)); |
| EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "hello", b1)); |
| EXPECT_EQ(OK, node0.SendStringMessage(b0, "hello again")); |
| |
| EXPECT_EQ(OK, node0.node().ClosePort(b0)); |
| |
| EXPECT_EQ(OK, node0.node().ClosePort(x0)); |
| EXPECT_EQ(OK, node1.node().ClosePort(x1)); |
| |
| WaitForIdle(); |
| |
| EXPECT_TRUE(node0.node().CanShutdownCleanly()); |
| EXPECT_TRUE(node1.node().CanShutdownCleanly()); |
| } |
| |
| TEST_F(PortsTest, Basic3) { |
| TestNode node0(0); |
| AddNode(&node0); |
| |
| TestNode node1(1); |
| AddNode(&node1); |
| |
| PortRef x0, x1; |
| CreatePortPair(&node0, &x0, &node1, &x1); |
| |
| PortRef a0, a1; |
| EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1)); |
| |
| EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "hello", a1)); |
| EXPECT_EQ(OK, node0.SendStringMessage(a0, "hello again")); |
| |
| EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "foo", a0)); |
| |
| PortRef b0, b1; |
| EXPECT_EQ(OK, node0.node().CreatePortPair(&b0, &b1)); |
| EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "bar", b1)); |
| EXPECT_EQ(OK, node0.SendStringMessage(b0, "baz")); |
| |
| EXPECT_EQ(OK, node0.node().ClosePort(b0)); |
| |
| EXPECT_EQ(OK, node0.node().ClosePort(x0)); |
| EXPECT_EQ(OK, node1.node().ClosePort(x1)); |
| |
| WaitForIdle(); |
| |
| EXPECT_TRUE(node0.node().CanShutdownCleanly()); |
| EXPECT_TRUE(node1.node().CanShutdownCleanly()); |
| } |
| |
| TEST_F(PortsTest, LostConnectionToNode1) { |
| TestNode node0(0); |
| AddNode(&node0); |
| |
| TestNode node1(1); |
| AddNode(&node1); |
| node1.set_drop_messages(true); |
| |
| PortRef x0, x1; |
| CreatePortPair(&node0, &x0, &node1, &x1); |
| |
| // Transfer a port to node1 and simulate a lost connection to node1. |
| |
| PortRef a0, a1; |
| EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1)); |
| EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "foo", a1)); |
| |
| WaitForIdle(); |
| |
| RemoveNode(&node1); |
| |
| WaitForIdle(); |
| |
| EXPECT_EQ(OK, node0.node().ClosePort(a0)); |
| EXPECT_EQ(OK, node0.node().ClosePort(x0)); |
| EXPECT_EQ(OK, node1.node().ClosePort(x1)); |
| |
| WaitForIdle(); |
| |
| EXPECT_TRUE(node0.node().CanShutdownCleanly()); |
| EXPECT_TRUE(node1.node().CanShutdownCleanly()); |
| } |
| |
| TEST_F(PortsTest, LostConnectionToNode2) { |
| TestNode node0(0); |
| AddNode(&node0); |
| |
| TestNode node1(1); |
| AddNode(&node1); |
| |
| PortRef x0, x1; |
| CreatePortPair(&node0, &x0, &node1, &x1); |
| |
| PortRef a0, a1; |
| EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1)); |
| EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "take a1", a1)); |
| |
| WaitForIdle(); |
| |
| node1.set_drop_messages(true); |
| |
| RemoveNode(&node1); |
| |
| WaitForIdle(); |
| |
| // a0 should have eventually detected peer closure after node loss. |
| ScopedMessage message; |
| EXPECT_EQ(ERROR_PORT_PEER_CLOSED, |
| node0.node().GetMessage(a0, &message, nullptr)); |
| EXPECT_FALSE(message); |
| |
| EXPECT_EQ(OK, node0.node().ClosePort(a0)); |
| |
| EXPECT_EQ(OK, node0.node().ClosePort(x0)); |
| |
| EXPECT_EQ(OK, node1.node().GetMessage(x1, &message, nullptr)); |
| EXPECT_TRUE(message); |
| node1.ClosePortsInEvent(message.get()); |
| |
| EXPECT_EQ(OK, node1.node().ClosePort(x1)); |
| |
| WaitForIdle(); |
| |
| EXPECT_TRUE(node0.node().CanShutdownCleanly()); |
| EXPECT_TRUE(node1.node().CanShutdownCleanly()); |
| } |
| |
| TEST_F(PortsTest, LostConnectionToNodeWithSecondaryProxy) { |
| // Tests that a proxy gets cleaned up when its indirect peer lives on a lost |
| // node. |
| |
| TestNode node0(0); |
| AddNode(&node0); |
| |
| TestNode node1(1); |
| AddNode(&node1); |
| |
| TestNode node2(2); |
| AddNode(&node2); |
| |
| // Create A-B spanning nodes 0 and 1 and C-D spanning 1 and 2. |
| PortRef A, B, C, D; |
| CreatePortPair(&node0, &A, &node1, &B); |
| CreatePortPair(&node1, &C, &node2, &D); |
| |
| // Create E-F and send F over A to node 1. |
| PortRef E, F; |
| EXPECT_EQ(OK, node0.node().CreatePortPair(&E, &F)); |
| EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, ".", F)); |
| |
| WaitForIdle(); |
| |
| ScopedMessage message; |
| ASSERT_TRUE(node1.ReadMessage(B, &message)); |
| ASSERT_EQ(1u, message->num_ports()); |
| |
| EXPECT_EQ(OK, node1.node().GetPort(message->ports()[0], &F)); |
| |
| // Send F over C to node 2 and then simulate node 2 loss from node 1. Node 1 |
| // will trivially become aware of the loss, and this test verifies that the |
| // port A on node 0 will eventually also become aware of it. |
| |
| // Make sure node2 stops processing events when it encounters an ObserveProxy. |
| node2.BlockOnEvent(Event::Type::kObserveProxy); |
| |
| EXPECT_EQ(OK, node1.SendStringMessageWithPort(C, ".", F)); |
| WaitForIdle(); |
| |
| // Simulate node 1 and 2 disconnecting. |
| EXPECT_EQ(OK, node1.node().LostConnectionToNode(node2.name())); |
| |
| // Let node2 continue processing events and wait for everyone to go idle. |
| node2.Unblock(); |
| WaitForIdle(); |
| |
| // Port F should be gone. |
| EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.node().GetPort(F.name(), &F)); |
| |
| // Port E should have detected peer closure despite the fact that there is |
| // no longer a continuous route from F to E over which the event could travel. |
| PortStatus status; |
| EXPECT_EQ(OK, node0.node().GetStatus(E, &status)); |
| EXPECT_TRUE(status.peer_closed); |
| |
| EXPECT_EQ(OK, node0.node().ClosePort(A)); |
| EXPECT_EQ(OK, node1.node().ClosePort(B)); |
| EXPECT_EQ(OK, node1.node().ClosePort(C)); |
| EXPECT_EQ(OK, node0.node().ClosePort(E)); |
| |
| WaitForIdle(); |
| |
| EXPECT_TRUE(node0.node().CanShutdownCleanly()); |
| EXPECT_TRUE(node1.node().CanShutdownCleanly()); |
| } |
| |
| TEST_F(PortsTest, LostConnectionToNodeWithLocalProxy) { |
| // Tests that a proxy gets cleaned up when its direct peer lives on a lost |
| // node and it's predecessor lives on the same node. |
| |
| TestNode node0(0); |
| AddNode(&node0); |
| |
| TestNode node1(1); |
| AddNode(&node1); |
| |
| PortRef A, B; |
| CreatePortPair(&node0, &A, &node1, &B); |
| |
| PortRef C, D; |
| EXPECT_EQ(OK, node0.node().CreatePortPair(&C, &D)); |
| |
| // Send D but block node0 on an ObserveProxy event. |
| node0.BlockOnEvent(Event::Type::kObserveProxy); |
| EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, ".", D)); |
| |
| // node0 won't collapse the proxy but node1 will receive the message before |
| // going idle. |
| WaitForIdle(); |
| |
| ScopedMessage message; |
| ASSERT_TRUE(node1.ReadMessage(B, &message)); |
| ASSERT_EQ(1u, message->num_ports()); |
| PortRef E; |
| EXPECT_EQ(OK, node1.node().GetPort(message->ports()[0], &E)); |
| |
| RemoveNode(&node1); |
| |
| node0.Unblock(); |
| WaitForIdle(); |
| |
| // Port C should have detected peer closure. |
| PortStatus status; |
| EXPECT_EQ(OK, node0.node().GetStatus(C, &status)); |
| EXPECT_TRUE(status.peer_closed); |
| |
| EXPECT_EQ(OK, node0.node().ClosePort(A)); |
| EXPECT_EQ(OK, node1.node().ClosePort(B)); |
| EXPECT_EQ(OK, node0.node().ClosePort(C)); |
| EXPECT_EQ(OK, node1.node().ClosePort(E)); |
| |
| EXPECT_TRUE(node0.node().CanShutdownCleanly()); |
| EXPECT_TRUE(node1.node().CanShutdownCleanly()); |
| } |
| |
| TEST_F(PortsTest, GetMessage1) { |
| TestNode node(0); |
| AddNode(&node); |
| |
| PortRef a0, a1; |
| EXPECT_EQ(OK, node.node().CreatePortPair(&a0, &a1)); |
| |
| ScopedMessage message; |
| EXPECT_EQ(OK, node.node().GetMessage(a0, &message, nullptr)); |
| EXPECT_FALSE(message); |
| |
| EXPECT_EQ(OK, node.node().ClosePort(a1)); |
| |
| WaitForIdle(); |
| |
| EXPECT_EQ(ERROR_PORT_PEER_CLOSED, |
| node.node().GetMessage(a0, &message, nullptr)); |
| EXPECT_FALSE(message); |
| |
| EXPECT_EQ(OK, node.node().ClosePort(a0)); |
| |
| WaitForIdle(); |
| |
| EXPECT_TRUE(node.node().CanShutdownCleanly()); |
| } |
| |
| TEST_F(PortsTest, GetMessage2) { |
| TestNode node(0); |
| AddNode(&node); |
| |
| PortRef a0, a1; |
| EXPECT_EQ(OK, node.node().CreatePortPair(&a0, &a1)); |
| |
| EXPECT_EQ(OK, node.SendStringMessage(a1, "1")); |
| |
| ScopedMessage message; |
| EXPECT_EQ(OK, node.node().GetMessage(a0, &message, nullptr)); |
| |
| ASSERT_TRUE(message); |
| EXPECT_TRUE(MessageEquals(message, "1")); |
| |
| EXPECT_EQ(OK, node.node().ClosePort(a0)); |
| EXPECT_EQ(OK, node.node().ClosePort(a1)); |
| |
| EXPECT_TRUE(node.node().CanShutdownCleanly()); |
| } |
| |
| TEST_F(PortsTest, GetMessage3) { |
| TestNode node(0); |
| AddNode(&node); |
| |
| PortRef a0, a1; |
| EXPECT_EQ(OK, node.node().CreatePortPair(&a0, &a1)); |
| |
| const char* kStrings[] = {"1", "2", "3"}; |
| |
| for (size_t i = 0; i < sizeof(kStrings) / sizeof(kStrings[0]); ++i) |
| EXPECT_EQ(OK, node.SendStringMessage(a1, kStrings[i])); |
| |
| ScopedMessage message; |
| for (size_t i = 0; i < sizeof(kStrings) / sizeof(kStrings[0]); ++i) { |
| EXPECT_EQ(OK, node.node().GetMessage(a0, &message, nullptr)); |
| ASSERT_TRUE(message); |
| EXPECT_TRUE(MessageEquals(message, kStrings[i])); |
| } |
| |
| EXPECT_EQ(OK, node.node().ClosePort(a0)); |
| EXPECT_EQ(OK, node.node().ClosePort(a1)); |
| |
| EXPECT_TRUE(node.node().CanShutdownCleanly()); |
| } |
| |
| TEST_F(PortsTest, Delegation1) { |
| TestNode node0(0); |
| AddNode(&node0); |
| |
| TestNode node1(1); |
| AddNode(&node1); |
| |
| PortRef x0, x1; |
| CreatePortPair(&node0, &x0, &node1, &x1); |
| |
| // In this test, we send a message to a port that has been moved. |
| |
| PortRef a0, a1; |
| EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1)); |
| EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "a1", a1)); |
| WaitForIdle(); |
| |
| ScopedMessage message; |
| ASSERT_TRUE(node1.ReadMessage(x1, &message)); |
| ASSERT_EQ(1u, message->num_ports()); |
| EXPECT_TRUE(MessageEquals(message, "a1")); |
| |
| // This is "a1" from the point of view of node1. |
| PortName a2_name = message->ports()[0]; |
| EXPECT_EQ(OK, node1.SendStringMessageWithPort(x1, "a2", a2_name)); |
| EXPECT_EQ(OK, node0.SendStringMessage(a0, "hello")); |
| |
| WaitForIdle(); |
| |
| ASSERT_TRUE(node0.ReadMessage(x0, &message)); |
| ASSERT_EQ(1u, message->num_ports()); |
| EXPECT_TRUE(MessageEquals(message, "a2")); |
| |
| // This is "a2" from the point of view of node1. |
| PortName a3_name = message->ports()[0]; |
| |
| PortRef a3; |
| EXPECT_EQ(OK, node0.node().GetPort(a3_name, &a3)); |
| |
| ASSERT_TRUE(node0.ReadMessage(a3, &message)); |
| EXPECT_EQ(0u, message->num_ports()); |
| EXPECT_TRUE(MessageEquals(message, "hello")); |
| |
| EXPECT_EQ(OK, node0.node().ClosePort(a0)); |
| EXPECT_EQ(OK, node0.node().ClosePort(a3)); |
| |
| EXPECT_EQ(OK, node0.node().ClosePort(x0)); |
| EXPECT_EQ(OK, node1.node().ClosePort(x1)); |
| |
| EXPECT_TRUE(node0.node().CanShutdownCleanly()); |
| EXPECT_TRUE(node1.node().CanShutdownCleanly()); |
| } |
| |
| TEST_F(PortsTest, Delegation2) { |
| TestNode node0(0); |
| AddNode(&node0); |
| |
| TestNode node1(1); |
| AddNode(&node1); |
| |
| for (int i = 0; i < 100; ++i) { |
| // Setup pipe a<->b between node0 and node1. |
| PortRef A, B; |
| CreatePortPair(&node0, &A, &node1, &B); |
| |
| PortRef C, D; |
| EXPECT_EQ(OK, node0.node().CreatePortPair(&C, &D)); |
| |
| PortRef E, F; |
| EXPECT_EQ(OK, node0.node().CreatePortPair(&E, &F)); |
| |
| node1.set_save_messages(true); |
| |
| // Pass D over A to B. |
| EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, "1", D)); |
| |
| // Pass F over C to D. |
| EXPECT_EQ(OK, node0.SendStringMessageWithPort(C, "1", F)); |
| |
| // This message should find its way to node1. |
| EXPECT_EQ(OK, node0.SendStringMessage(E, "hello")); |
| |
| WaitForIdle(); |
| |
| EXPECT_EQ(OK, node0.node().ClosePort(C)); |
| EXPECT_EQ(OK, node0.node().ClosePort(E)); |
| |
| EXPECT_EQ(OK, node0.node().ClosePort(A)); |
| EXPECT_EQ(OK, node1.node().ClosePort(B)); |
| |
| bool got_hello = false; |
| ScopedMessage message; |
| while (node1.GetSavedMessage(&message)) { |
| node1.ClosePortsInEvent(message.get()); |
| if (MessageEquals(message, "hello")) { |
| got_hello = true; |
| break; |
| } |
| } |
| |
| EXPECT_TRUE(got_hello); |
| |
| WaitForIdle(); // Because closing ports may have generated tasks. |
| } |
| |
| EXPECT_TRUE(node0.node().CanShutdownCleanly()); |
| EXPECT_TRUE(node1.node().CanShutdownCleanly()); |
| } |
| |
| TEST_F(PortsTest, SendUninitialized) { |
| TestNode node(0); |
| AddNode(&node); |
| |
| PortRef x0; |
| EXPECT_EQ(OK, node.node().CreateUninitializedPort(&x0)); |
| EXPECT_EQ(ERROR_PORT_STATE_UNEXPECTED, node.SendStringMessage(x0, "oops")); |
| EXPECT_EQ(OK, node.node().ClosePort(x0)); |
| EXPECT_TRUE(node.node().CanShutdownCleanly()); |
| } |
| |
| TEST_F(PortsTest, SendFailure) { |
| TestNode node(0); |
| AddNode(&node); |
| |
| node.set_save_messages(true); |
| |
| PortRef A, B; |
| EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B)); |
| |
| // Try to send A over itself. |
| |
| EXPECT_EQ(ERROR_PORT_CANNOT_SEND_SELF, |
| node.SendStringMessageWithPort(A, "oops", A)); |
| |
| // Try to send B over A. |
| |
| EXPECT_EQ(ERROR_PORT_CANNOT_SEND_PEER, |
| node.SendStringMessageWithPort(A, "nope", B)); |
| |
| // B should be closed immediately. |
| EXPECT_EQ(ERROR_PORT_UNKNOWN, node.node().GetPort(B.name(), &B)); |
| |
| WaitForIdle(); |
| |
| // There should have been no messages accepted. |
| ScopedMessage message; |
| EXPECT_FALSE(node.GetSavedMessage(&message)); |
| |
| EXPECT_EQ(OK, node.node().ClosePort(A)); |
| |
| WaitForIdle(); |
| |
| EXPECT_TRUE(node.node().CanShutdownCleanly()); |
| } |
| |
| TEST_F(PortsTest, DontLeakUnreceivedPorts) { |
| TestNode node(0); |
| AddNode(&node); |
| |
| PortRef A, B, C, D; |
| EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B)); |
| EXPECT_EQ(OK, node.node().CreatePortPair(&C, &D)); |
| |
| EXPECT_EQ(OK, node.SendStringMessageWithPort(A, "foo", D)); |
| |
| EXPECT_EQ(OK, node.node().ClosePort(C)); |
| EXPECT_EQ(OK, node.node().ClosePort(A)); |
| EXPECT_EQ(OK, node.node().ClosePort(B)); |
| |
| WaitForIdle(); |
| |
| EXPECT_TRUE(node.node().CanShutdownCleanly()); |
| } |
| |
| TEST_F(PortsTest, AllowShutdownWithLocalPortsOpen) { |
| TestNode node(0); |
| AddNode(&node); |
| |
| PortRef A, B, C, D; |
| EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B)); |
| EXPECT_EQ(OK, node.node().CreatePortPair(&C, &D)); |
| |
| EXPECT_EQ(OK, node.SendStringMessageWithPort(A, "foo", D)); |
| |
| ScopedMessage message; |
| EXPECT_TRUE(node.ReadMessage(B, &message)); |
| ASSERT_EQ(1u, message->num_ports()); |
| EXPECT_TRUE(MessageEquals(message, "foo")); |
| PortRef E; |
| ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E)); |
| |
| EXPECT_TRUE( |
| node.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS)); |
| |
| WaitForIdle(); |
| |
| EXPECT_TRUE( |
| node.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS)); |
| EXPECT_FALSE(node.node().CanShutdownCleanly()); |
| |
| EXPECT_EQ(OK, node.node().ClosePort(A)); |
| EXPECT_EQ(OK, node.node().ClosePort(B)); |
| EXPECT_EQ(OK, node.node().ClosePort(C)); |
| EXPECT_EQ(OK, node.node().ClosePort(E)); |
| |
| WaitForIdle(); |
| |
| EXPECT_TRUE(node.node().CanShutdownCleanly()); |
| } |
| |
| TEST_F(PortsTest, ProxyCollapse1) { |
| TestNode node(0); |
| AddNode(&node); |
| |
| PortRef A, B; |
| EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B)); |
| |
| PortRef X, Y; |
| EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y)); |
| |
| ScopedMessage message; |
| |
| // Send B and receive it as C. |
| EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B)); |
| ASSERT_TRUE(node.ReadMessage(Y, &message)); |
| ASSERT_EQ(1u, message->num_ports()); |
| PortRef C; |
| ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C)); |
| |
| // Send C and receive it as D. |
| EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", C)); |
| ASSERT_TRUE(node.ReadMessage(Y, &message)); |
| ASSERT_EQ(1u, message->num_ports()); |
| PortRef D; |
| ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &D)); |
| |
| // Send D and receive it as E. |
| EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", D)); |
| ASSERT_TRUE(node.ReadMessage(Y, &message)); |
| ASSERT_EQ(1u, message->num_ports()); |
| PortRef E; |
| ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E)); |
| |
| EXPECT_EQ(OK, node.node().ClosePort(X)); |
| EXPECT_EQ(OK, node.node().ClosePort(Y)); |
| |
| EXPECT_EQ(OK, node.node().ClosePort(A)); |
| EXPECT_EQ(OK, node.node().ClosePort(E)); |
| |
| // The node should not idle until all proxies are collapsed. |
| WaitForIdle(); |
| |
| EXPECT_TRUE(node.node().CanShutdownCleanly()); |
| } |
| |
| TEST_F(PortsTest, ProxyCollapse2) { |
| TestNode node(0); |
| AddNode(&node); |
| |
| PortRef A, B; |
| EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B)); |
| |
| PortRef X, Y; |
| EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y)); |
| |
| ScopedMessage message; |
| |
| // Send B and A to create proxies in each direction. |
| EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B)); |
| EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", A)); |
| |
| EXPECT_EQ(OK, node.node().ClosePort(X)); |
| EXPECT_EQ(OK, node.node().ClosePort(Y)); |
| |
| // At this point we have a scenario with: |
| // |
| // D -> [B] -> C -> [A] |
| // |
| // Ensure that the proxies can collapse. The sent ports will be closed |
| // eventually as a result of Y's closure. |
| |
| WaitForIdle(); |
| |
| EXPECT_TRUE(node.node().CanShutdownCleanly()); |
| } |
| |
| TEST_F(PortsTest, SendWithClosedPeer) { |
| // This tests that if a port is sent when its peer is already known to be |
| // closed, the newly created port will be aware of that peer closure, and the |
| // proxy will eventually collapse. |
| |
| TestNode node(0); |
| AddNode(&node); |
| |
| // Send a message from A to B, then close A. |
| PortRef A, B; |
| EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B)); |
| EXPECT_EQ(OK, node.SendStringMessage(A, "hey")); |
| EXPECT_EQ(OK, node.node().ClosePort(A)); |
| |
| // Now send B over X-Y as new port C. |
| PortRef X, Y; |
| EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y)); |
| EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B)); |
| ScopedMessage message; |
| ASSERT_TRUE(node.ReadMessage(Y, &message)); |
| ASSERT_EQ(1u, message->num_ports()); |
| PortRef C; |
| ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C)); |
| |
| EXPECT_EQ(OK, node.node().ClosePort(X)); |
| EXPECT_EQ(OK, node.node().ClosePort(Y)); |
| |
| WaitForIdle(); |
| |
| // C should have received the message originally sent to B, and it should also |
| // be aware of A's closure. |
| |
| ASSERT_TRUE(node.ReadMessage(C, &message)); |
| EXPECT_TRUE(MessageEquals(message, "hey")); |
| |
| PortStatus status; |
| EXPECT_EQ(OK, node.node().GetStatus(C, &status)); |
| EXPECT_FALSE(status.receiving_messages); |
| EXPECT_FALSE(status.has_messages); |
| EXPECT_TRUE(status.peer_closed); |
| |
| node.node().ClosePort(C); |
| |
| WaitForIdle(); |
| |
| EXPECT_TRUE(node.node().CanShutdownCleanly()); |
| } |
| |
| TEST_F(PortsTest, SendWithClosedPeerSent) { |
| // This tests that if a port is closed while some number of proxies are still |
| // routing messages (directly or indirectly) to it, that the peer port is |
| // eventually notified of the closure, and the dead-end proxies will |
| // eventually be removed. |
| |
| TestNode node(0); |
| AddNode(&node); |
| |
| PortRef X, Y; |
| EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y)); |
| |
| PortRef A, B; |
| EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B)); |
| |
| ScopedMessage message; |
| |
| // Send A as new port C. |
| EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", A)); |
| |
| ASSERT_TRUE(node.ReadMessage(Y, &message)); |
| ASSERT_EQ(1u, message->num_ports()); |
| PortRef C; |
| ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C)); |
| |
| // Send C as new port D. |
| EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", C)); |
| |
| ASSERT_TRUE(node.ReadMessage(Y, &message)); |
| ASSERT_EQ(1u, message->num_ports()); |
| PortRef D; |
| ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &D)); |
| |
| // Send a message to B through D, then close D. |
| EXPECT_EQ(OK, node.SendStringMessage(D, "hey")); |
| EXPECT_EQ(OK, node.node().ClosePort(D)); |
| |
| // Now send B as new port E. |
| |
| EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B)); |
| EXPECT_EQ(OK, node.node().ClosePort(X)); |
| |
| ASSERT_TRUE(node.ReadMessage(Y, &message)); |
| ASSERT_EQ(1u, message->num_ports()); |
| PortRef E; |
| ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E)); |
| |
| EXPECT_EQ(OK, node.node().ClosePort(Y)); |
| |
| WaitForIdle(); |
| |
| // E should receive the message originally sent to B, and it should also be |
| // aware of D's closure. |
| |
| ASSERT_TRUE(node.ReadMessage(E, &message)); |
| EXPECT_TRUE(MessageEquals(message, "hey")); |
| |
| PortStatus status; |
| EXPECT_EQ(OK, node.node().GetStatus(E, &status)); |
| EXPECT_FALSE(status.receiving_messages); |
| EXPECT_FALSE(status.has_messages); |
| EXPECT_TRUE(status.peer_closed); |
| |
| EXPECT_EQ(OK, node.node().ClosePort(E)); |
| |
| WaitForIdle(); |
| |
| EXPECT_TRUE(node.node().CanShutdownCleanly()); |
| } |
| |
| TEST_F(PortsTest, MergePorts) { |
| TestNode node0(0); |
| AddNode(&node0); |
| |
| TestNode node1(1); |
| AddNode(&node1); |
| |
| // Setup two independent port pairs, A-B on node0 and C-D on node1. |
| PortRef A, B, C, D; |
| EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B)); |
| EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D)); |
| |
| // Write a message on A. |
| EXPECT_EQ(OK, node0.SendStringMessage(A, "hey")); |
| |
| // Initiate a merge between B and C. |
| EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name())); |
| |
| WaitForIdle(); |
| |
| // Expect all proxies to be gone once idle. |
| EXPECT_TRUE( |
| node0.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS)); |
| EXPECT_TRUE( |
| node1.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS)); |
| |
| // Expect D to have received the message sent on A. |
| ScopedMessage message; |
| ASSERT_TRUE(node1.ReadMessage(D, &message)); |
| EXPECT_TRUE(MessageEquals(message, "hey")); |
| |
| EXPECT_EQ(OK, node0.node().ClosePort(A)); |
| EXPECT_EQ(OK, node1.node().ClosePort(D)); |
| |
| // No more ports should be open. |
| EXPECT_TRUE(node0.node().CanShutdownCleanly()); |
| EXPECT_TRUE(node1.node().CanShutdownCleanly()); |
| } |
| |
| TEST_F(PortsTest, MergePortWithClosedPeer1) { |
| // This tests that the right thing happens when initiating a merge on a port |
| // whose peer has already been closed. |
| |
| TestNode node0(0); |
| AddNode(&node0); |
| |
| TestNode node1(1); |
| AddNode(&node1); |
| |
| // Setup two independent port pairs, A-B on node0 and C-D on node1. |
| PortRef A, B, C, D; |
| EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B)); |
| EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D)); |
| |
| // Write a message on A. |
| EXPECT_EQ(OK, node0.SendStringMessage(A, "hey")); |
| |
| // Close A. |
| EXPECT_EQ(OK, node0.node().ClosePort(A)); |
| |
| // Initiate a merge between B and C. |
| EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name())); |
| |
| WaitForIdle(); |
| |
| // Expect all proxies to be gone once idle. node0 should have no ports since |
| // A was explicitly closed. |
| EXPECT_TRUE(node0.node().CanShutdownCleanly()); |
| EXPECT_TRUE( |
| node1.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS)); |
| |
| // Expect D to have received the message sent on A. |
| ScopedMessage message; |
| ASSERT_TRUE(node1.ReadMessage(D, &message)); |
| EXPECT_TRUE(MessageEquals(message, "hey")); |
| |
| EXPECT_EQ(OK, node1.node().ClosePort(D)); |
| |
| // No more ports should be open. |
| EXPECT_TRUE(node0.node().CanShutdownCleanly()); |
| EXPECT_TRUE(node1.node().CanShutdownCleanly()); |
| } |
| |
| TEST_F(PortsTest, MergePortWithClosedPeer2) { |
| // This tests that the right thing happens when merging into a port whose peer |
| // has already been closed. |
| |
| TestNode node0(0); |
| AddNode(&node0); |
| |
| TestNode node1(1); |
| AddNode(&node1); |
| |
| // Setup two independent port pairs, A-B on node0 and C-D on node1. |
| PortRef A, B, C, D; |
| EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B)); |
| EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D)); |
| |
| // Write a message on D and close it. |
| EXPECT_EQ(OK, node0.SendStringMessage(D, "hey")); |
| EXPECT_EQ(OK, node1.node().ClosePort(D)); |
| |
| // Initiate a merge between B and C. |
| EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name())); |
| |
| WaitForIdle(); |
| |
| // Expect all proxies to be gone once idle. node1 should have no ports since |
| // D was explicitly closed. |
| EXPECT_TRUE( |
| node0.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS)); |
| EXPECT_TRUE(node1.node().CanShutdownCleanly()); |
| |
| // Expect A to have received the message sent on D. |
| ScopedMessage message; |
| ASSERT_TRUE(node0.ReadMessage(A, &message)); |
| EXPECT_TRUE(MessageEquals(message, "hey")); |
| |
| EXPECT_EQ(OK, node0.node().ClosePort(A)); |
| |
| // No more ports should be open. |
| EXPECT_TRUE(node0.node().CanShutdownCleanly()); |
| EXPECT_TRUE(node1.node().CanShutdownCleanly()); |
| } |
| |
| TEST_F(PortsTest, MergePortsWithClosedPeers) { |
| // This tests that no residual ports are left behind if two ports are merged |
| // when both of their peers have been closed. |
| |
| TestNode node0(0); |
| AddNode(&node0); |
| |
| TestNode node1(1); |
| AddNode(&node1); |
| |
| // Setup two independent port pairs, A-B on node0 and C-D on node1. |
| PortRef A, B, C, D; |
| EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B)); |
| EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D)); |
| |
| // Close A and D. |
| EXPECT_EQ(OK, node0.node().ClosePort(A)); |
| EXPECT_EQ(OK, node1.node().ClosePort(D)); |
| |
| WaitForIdle(); |
| |
| // Initiate a merge between B and C. |
| EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name())); |
| |
| WaitForIdle(); |
| |
| // Expect everything to have gone away. |
| EXPECT_TRUE(node0.node().CanShutdownCleanly()); |
| EXPECT_TRUE(node1.node().CanShutdownCleanly()); |
| } |
| |
| TEST_F(PortsTest, MergePortsWithMovedPeers) { |
| // This tests that ports can be merged successfully even if their peers are |
| // moved around. |
| |
| TestNode node0(0); |
| AddNode(&node0); |
| |
| TestNode node1(1); |
| AddNode(&node1); |
| |
| // Setup two independent port pairs, A-B on node0 and C-D on node1. |
| PortRef A, B, C, D; |
| EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B)); |
| EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D)); |
| |
| // Set up another pair X-Y for moving ports on node0. |
| PortRef X, Y; |
| EXPECT_EQ(OK, node0.node().CreatePortPair(&X, &Y)); |
| |
| ScopedMessage message; |
| |
| // Move A to new port E. |
| EXPECT_EQ(OK, node0.SendStringMessageWithPort(X, "foo", A)); |
| ASSERT_TRUE(node0.ReadMessage(Y, &message)); |
| ASSERT_EQ(1u, message->num_ports()); |
| PortRef E; |
| ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &E)); |
| |
| EXPECT_EQ(OK, node0.node().ClosePort(X)); |
| EXPECT_EQ(OK, node0.node().ClosePort(Y)); |
| |
| // Write messages on E and D. |
| EXPECT_EQ(OK, node0.SendStringMessage(E, "hey")); |
| EXPECT_EQ(OK, node1.SendStringMessage(D, "hi")); |
| |
| // Initiate a merge between B and C. |
| EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name())); |
| |
| WaitForIdle(); |
| |
| // Expect to receive D's message on E and E's message on D. |
| ASSERT_TRUE(node0.ReadMessage(E, &message)); |
| EXPECT_TRUE(MessageEquals(message, "hi")); |
| ASSERT_TRUE(node1.ReadMessage(D, &message)); |
| EXPECT_TRUE(MessageEquals(message, "hey")); |
| |
| // Close E and D. |
| EXPECT_EQ(OK, node0.node().ClosePort(E)); |
| EXPECT_EQ(OK, node1.node().ClosePort(D)); |
| |
| WaitForIdle(); |
| |
| // Expect everything to have gone away. |
| EXPECT_TRUE(node0.node().CanShutdownCleanly()); |
| EXPECT_TRUE(node1.node().CanShutdownCleanly()); |
| } |
| |
| TEST_F(PortsTest, MergePortsFailsGracefully) { |
| // This tests that the system remains in a well-defined state if something |
| // goes wrong during port merge. |
| |
| TestNode node0(0); |
| AddNode(&node0); |
| |
| TestNode node1(1); |
| AddNode(&node1); |
| |
| // Setup two independent port pairs, A-B on node0 and C-D on node1. |
| PortRef A, B, C, D; |
| EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B)); |
| EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D)); |
| |
| ScopedMessage message; |
| PortRef X, Y; |
| EXPECT_EQ(OK, node0.node().CreateUninitializedPort(&X)); |
| EXPECT_EQ(OK, node1.node().CreateUninitializedPort(&Y)); |
| EXPECT_EQ(OK, node0.node().InitializePort(X, node1.name(), Y.name())); |
| EXPECT_EQ(OK, node1.node().InitializePort(Y, node0.name(), X.name())); |
| |
| // Block the merge from proceeding until we can do something stupid with port |
| // C. This avoids the test logic racing with async merge logic. |
| node1.BlockOnEvent(Event::Type::kMergePort); |
| |
| // Initiate the merge between B and C. |
| EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name())); |
| |
| // Move C to a new port E. This is not a sane use of Node's public API but |
| // is still hypothetically possible. It allows us to force a merge failure |
| // because C will be in an invalid state by the time the merge is processed. |
| // As a result, B should be closed. |
| EXPECT_EQ(OK, node1.SendStringMessageWithPort(Y, "foo", C)); |
| |
| node1.Unblock(); |
| |
| WaitForIdle(); |
| |
| ASSERT_TRUE(node0.ReadMessage(X, &message)); |
| ASSERT_EQ(1u, message->num_ports()); |
| PortRef E; |
| ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &E)); |
| |
| EXPECT_EQ(OK, node0.node().ClosePort(X)); |
| EXPECT_EQ(OK, node1.node().ClosePort(Y)); |
| |
| WaitForIdle(); |
| |
| // C goes away as a result of normal proxy removal. B should have been closed |
| // cleanly by the failed MergePorts. |
| EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.node().GetPort(C.name(), &C)); |
| EXPECT_EQ(ERROR_PORT_UNKNOWN, node0.node().GetPort(B.name(), &B)); |
| |
| // Close A, D, and E. |
| EXPECT_EQ(OK, node0.node().ClosePort(A)); |
| EXPECT_EQ(OK, node1.node().ClosePort(D)); |
| EXPECT_EQ(OK, node0.node().ClosePort(E)); |
| |
| WaitForIdle(); |
| |
| // Expect everything to have gone away. |
| EXPECT_TRUE(node0.node().CanShutdownCleanly()); |
| EXPECT_TRUE(node1.node().CanShutdownCleanly()); |
| } |
| |
| TEST_F(PortsTest, RemotePeerStatus) { |
| TestNode node0(0); |
| AddNode(&node0); |
| |
| TestNode node1(1); |
| AddNode(&node1); |
| |
| // Create a local port pair. Neither port should appear to have a remote peer. |
| PortRef a, b; |
| PortStatus status; |
| node0.node().CreatePortPair(&a, &b); |
| ASSERT_EQ(OK, node0.node().GetStatus(a, &status)); |
| EXPECT_FALSE(status.peer_remote); |
| ASSERT_EQ(OK, node0.node().GetStatus(b, &status)); |
| EXPECT_FALSE(status.peer_remote); |
| |
| // Create a port pair spanning the two nodes. Both spanning ports should |
| // immediately appear to have a remote peer. |
| PortRef x0, x1; |
| CreatePortPair(&node0, &x0, &node1, &x1); |
| |
| ASSERT_EQ(OK, node0.node().GetStatus(x0, &status)); |
| EXPECT_TRUE(status.peer_remote); |
| ASSERT_EQ(OK, node1.node().GetStatus(x1, &status)); |
| EXPECT_TRUE(status.peer_remote); |
| |
| PortRef x2, x3; |
| CreatePortPair(&node0, &x2, &node1, &x3); |
| |
| // Transfer |b| to |node1| and |x1| to |node0|. i.e., make the local peers |
| // remote and the remote peers local. |
| EXPECT_EQ(OK, node0.SendStringMessageWithPort(x2, "foo", b)); |
| EXPECT_EQ(OK, node1.SendStringMessageWithPort(x3, "bar", x1)); |
| WaitForIdle(); |
| |
| ScopedMessage message; |
| ASSERT_TRUE(node0.ReadMessage(x2, &message)); |
| ASSERT_EQ(1u, message->num_ports()); |
| ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &x1)); |
| |
| ASSERT_TRUE(node1.ReadMessage(x3, &message)); |
| ASSERT_EQ(1u, message->num_ports()); |
| ASSERT_EQ(OK, node1.node().GetPort(message->ports()[0], &b)); |
| |
| // Now x0-x1 should be local to node0 and a-b should span the nodes. |
| ASSERT_EQ(OK, node0.node().GetStatus(x0, &status)); |
| EXPECT_FALSE(status.peer_remote); |
| ASSERT_EQ(OK, node0.node().GetStatus(x1, &status)); |
| EXPECT_FALSE(status.peer_remote); |
| ASSERT_EQ(OK, node0.node().GetStatus(a, &status)); |
| EXPECT_TRUE(status.peer_remote); |
| ASSERT_EQ(OK, node1.node().GetStatus(b, &status)); |
| EXPECT_TRUE(status.peer_remote); |
| |
| // And swap them back one more time. |
| EXPECT_EQ(OK, node0.SendStringMessageWithPort(x2, "foo", x1)); |
| EXPECT_EQ(OK, node1.SendStringMessageWithPort(x3, "bar", b)); |
| WaitForIdle(); |
| |
| ASSERT_TRUE(node0.ReadMessage(x2, &message)); |
| ASSERT_EQ(1u, message->num_ports()); |
| ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &b)); |
| |
| ASSERT_TRUE(node1.ReadMessage(x3, &message)); |
| ASSERT_EQ(1u, message->num_ports()); |
| ASSERT_EQ(OK, node1.node().GetPort(message->ports()[0], &x1)); |
| |
| ASSERT_EQ(OK, node0.node().GetStatus(x0, &status)); |
| EXPECT_TRUE(status.peer_remote); |
| ASSERT_EQ(OK, node1.node().GetStatus(x1, &status)); |
| EXPECT_TRUE(status.peer_remote); |
| ASSERT_EQ(OK, node0.node().GetStatus(a, &status)); |
| EXPECT_FALSE(status.peer_remote); |
| ASSERT_EQ(OK, node0.node().GetStatus(b, &status)); |
| EXPECT_FALSE(status.peer_remote); |
| |
| EXPECT_EQ(OK, node0.node().ClosePort(x0)); |
| EXPECT_EQ(OK, node1.node().ClosePort(x1)); |
| EXPECT_EQ(OK, node0.node().ClosePort(x2)); |
| EXPECT_EQ(OK, node1.node().ClosePort(x3)); |
| EXPECT_EQ(OK, node0.node().ClosePort(a)); |
| EXPECT_EQ(OK, node0.node().ClosePort(b)); |
| |
| EXPECT_TRUE(node0.node().CanShutdownCleanly()); |
| EXPECT_TRUE(node1.node().CanShutdownCleanly()); |
| } |
| |
| TEST_F(PortsTest, RemotePeerStatusAfterLocalPortMerge) { |
| TestNode node0(0); |
| AddNode(&node0); |
| |
| TestNode node1(1); |
| AddNode(&node1); |
| |
| // Set up a-b on node0 and c-d spanning node0-node1. |
| PortRef a, b, c, d; |
| node0.node().CreatePortPair(&a, &b); |
| CreatePortPair(&node0, &c, &node1, &d); |
| |
| PortStatus status; |
| ASSERT_EQ(OK, node0.node().GetStatus(a, &status)); |
| EXPECT_FALSE(status.peer_remote); |
| ASSERT_EQ(OK, node0.node().GetStatus(b, &status)); |
| EXPECT_FALSE(status.peer_remote); |
| ASSERT_EQ(OK, node0.node().GetStatus(c, &status)); |
| EXPECT_TRUE(status.peer_remote); |
| ASSERT_EQ(OK, node1.node().GetStatus(d, &status)); |
| EXPECT_TRUE(status.peer_remote); |
| |
| EXPECT_EQ(OK, node0.node().MergeLocalPorts(b, c)); |
| WaitForIdle(); |
| |
| ASSERT_EQ(OK, node0.node().GetStatus(a, &status)); |
| EXPECT_TRUE(status.peer_remote); |
| ASSERT_EQ(OK, node1.node().GetStatus(d, &status)); |
| EXPECT_TRUE(status.peer_remote); |
| |
| EXPECT_EQ(OK, node0.node().ClosePort(a)); |
| EXPECT_EQ(OK, node1.node().ClosePort(d)); |
| EXPECT_TRUE(node0.node().CanShutdownCleanly()); |
| EXPECT_TRUE(node1.node().CanShutdownCleanly()); |
| } |
| |
| TEST_F(PortsTest, RemotePeerStatusAfterRemotePortMerge) { |
| TestNode node0(0); |
| AddNode(&node0); |
| |
| TestNode node1(1); |
| AddNode(&node1); |
| |
| // Set up a-b on node0 and c-d on node1. |
| PortRef a, b, c, d; |
| node0.node().CreatePortPair(&a, &b); |
| node1.node().CreatePortPair(&c, &d); |
| |
| PortStatus status; |
| ASSERT_EQ(OK, node0.node().GetStatus(a, &status)); |
| EXPECT_FALSE(status.peer_remote); |
| ASSERT_EQ(OK, node0.node().GetStatus(b, &status)); |
| EXPECT_FALSE(status.peer_remote); |
| ASSERT_EQ(OK, node1.node().GetStatus(c, &status)); |
| EXPECT_FALSE(status.peer_remote); |
| ASSERT_EQ(OK, node1.node().GetStatus(d, &status)); |
| EXPECT_FALSE(status.peer_remote); |
| |
| EXPECT_EQ(OK, node0.node().MergePorts(b, node1.name(), c.name())); |
| WaitForIdle(); |
| |
| ASSERT_EQ(OK, node0.node().GetStatus(a, &status)); |
| EXPECT_TRUE(status.peer_remote); |
| ASSERT_EQ(OK, node1.node().GetStatus(d, &status)); |
| EXPECT_TRUE(status.peer_remote); |
| |
| EXPECT_EQ(OK, node0.node().ClosePort(a)); |
| EXPECT_EQ(OK, node1.node().ClosePort(d)); |
| EXPECT_TRUE(node0.node().CanShutdownCleanly()); |
| EXPECT_TRUE(node1.node().CanShutdownCleanly()); |
| } |
| |
| TEST_F(PortsTest, RetransmitUserMessageEvents) { |
| // Ensures that user message events can be retransmitted properly. |
| TestNode node0(0); |
| AddNode(&node0); |
| |
| PortRef a, b; |
| node0.node().CreatePortPair(&a, &b); |
| |
| // Ping. |
| const char* kMessage = "hey"; |
| ScopedMessage message; |
| EXPECT_EQ(OK, node0.SendStringMessage(a, kMessage)); |
| ASSERT_TRUE(node0.ReadMessage(b, &message)); |
| EXPECT_TRUE(MessageEquals(message, kMessage)); |
| |
| // Pong. |
| EXPECT_EQ(OK, node0.node().SendUserMessage(b, std::move(message))); |
| EXPECT_FALSE(message); |
| ASSERT_TRUE(node0.ReadMessage(a, &message)); |
| EXPECT_TRUE(MessageEquals(message, kMessage)); |
| |
| // Ping again. |
| EXPECT_EQ(OK, node0.node().SendUserMessage(a, std::move(message))); |
| EXPECT_FALSE(message); |
| ASSERT_TRUE(node0.ReadMessage(b, &message)); |
| EXPECT_TRUE(MessageEquals(message, kMessage)); |
| |
| // Pong again! |
| EXPECT_EQ(OK, node0.node().SendUserMessage(b, std::move(message))); |
| EXPECT_FALSE(message); |
| ASSERT_TRUE(node0.ReadMessage(a, &message)); |
| EXPECT_TRUE(MessageEquals(message, kMessage)); |
| |
| EXPECT_EQ(OK, node0.node().ClosePort(a)); |
| EXPECT_EQ(OK, node0.node().ClosePort(b)); |
| } |
| |
| } // namespace test |
| } // namespace ports |
| } // namespace core |
| } // namespace mojo |