| // Copyright 2015 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "blimp/net/blimp_message_output_buffer.h" |
| |
| #include "base/callback_helpers.h" |
| #include "base/logging.h" |
| #include "base/message_loop/message_loop.h" |
| #include "blimp/common/proto/blimp_message.pb.h" |
| #include "blimp/net/test_common.h" |
| #include "net/base/net_errors.h" |
| #include "net/base/test_completion_callback.h" |
| #include "testing/gmock/include/gmock/gmock.h" |
| #include "testing/gtest/include/gtest/gtest.h" |
| |
| using testing::_; |
| using testing::InvokeArgument; |
| using testing::Ref; |
| using testing::Return; |
| using testing::SaveArg; |
| |
| namespace blimp { |
| namespace { |
| |
| class BlimpMessageOutputBufferTest : public testing::Test { |
| public: |
| BlimpMessageOutputBufferTest() {} |
| |
| void SetUp() override { |
| input_msg_.set_type(BlimpMessage::INPUT); |
| input_msg_.set_message_id(1); |
| compositor_msg_.set_type(BlimpMessage::COMPOSITOR); |
| compositor_msg_.set_message_id(2); |
| |
| // Buffer should only have space for two unacknowledged messages |
| // (with message IDs). |
| ASSERT_EQ(input_msg_.ByteSize(), compositor_msg_.ByteSize()); |
| buffer_.reset(new BlimpMessageOutputBuffer(2 * input_msg_.GetCachedSize())); |
| } |
| |
| protected: |
| void AddOutputExpectation(const BlimpMessage& msg) { |
| EXPECT_CALL(output_processor_, MockableProcessMessage(EqualsProto(msg), _)) |
| .WillOnce(SaveArg<1>(&captured_cb_)) |
| .RetiresOnSaturation(); |
| } |
| |
| BlimpMessage WithMessageId(const BlimpMessage& message, int64_t message_id) { |
| BlimpMessage output = message; |
| output.set_message_id(message_id); |
| return output; |
| } |
| |
| BlimpMessage input_msg_; |
| BlimpMessage compositor_msg_; |
| |
| base::MessageLoop message_loop_; |
| net::CompletionCallback captured_cb_; |
| MockBlimpMessageProcessor output_processor_; |
| scoped_ptr<BlimpMessageOutputBuffer> buffer_; |
| testing::InSequence s; |
| }; |
| |
| // Verify batched writes and acknowledgements. |
| TEST_F(BlimpMessageOutputBufferTest, SeparatelyBufferWriteAck) { |
| net::TestCompletionCallback complete_cb_1; |
| net::TestCompletionCallback complete_cb_2; |
| |
| AddOutputExpectation(input_msg_); |
| AddOutputExpectation(compositor_msg_); |
| |
| // Accumulate two messages. |
| buffer_->ProcessMessage(make_scoped_ptr(new BlimpMessage(input_msg_)), |
| complete_cb_1.callback()); |
| buffer_->ProcessMessage(make_scoped_ptr(new BlimpMessage(compositor_msg_)), |
| complete_cb_2.callback()); |
| ASSERT_EQ(2, buffer_->GetBufferByteSizeForTest()); |
| |
| // Write two messages. |
| ASSERT_TRUE(captured_cb_.is_null()); |
| buffer_->SetOutputProcessor(&output_processor_); |
| ASSERT_FALSE(captured_cb_.is_null()); |
| base::ResetAndReturn(&captured_cb_).Run(net::OK); |
| ASSERT_EQ(2, buffer_->GetBufferByteSizeForTest()); |
| ASSERT_EQ(1, buffer_->GetUnacknowledgedMessageCountForTest()); |
| base::ResetAndReturn(&captured_cb_).Run(net::OK); |
| ASSERT_EQ(2, buffer_->GetBufferByteSizeForTest()); |
| ASSERT_EQ(2, buffer_->GetUnacknowledgedMessageCountForTest()); |
| |
| // Both messages are acknowledged by separate checkpoints. |
| buffer_->OnMessageCheckpoint(1); |
| ASSERT_EQ(1, buffer_->GetBufferByteSizeForTest()); |
| ASSERT_EQ(1, buffer_->GetUnacknowledgedMessageCountForTest()); |
| EXPECT_EQ(net::OK, complete_cb_1.WaitForResult()); |
| buffer_->OnMessageCheckpoint(2); |
| ASSERT_EQ(0, buffer_->GetBufferByteSizeForTest()); |
| ASSERT_EQ(0, buffer_->GetUnacknowledgedMessageCountForTest()); |
| EXPECT_EQ(net::OK, complete_cb_2.WaitForResult()); |
| } |
| |
| // Verify buffer writes from an empty state. |
| TEST_F(BlimpMessageOutputBufferTest, WritesFromEmptyBuffer) { |
| net::TestCompletionCallback complete_cb_1; |
| net::TestCompletionCallback complete_cb_2; |
| |
| AddOutputExpectation(input_msg_); |
| AddOutputExpectation(compositor_msg_); |
| |
| ASSERT_TRUE(captured_cb_.is_null()); |
| buffer_->SetOutputProcessor(&output_processor_); |
| |
| // Message #0 is buffered, sent, acknowledged. |
| buffer_->ProcessMessage(make_scoped_ptr(new BlimpMessage(input_msg_)), |
| complete_cb_1.callback()); |
| ASSERT_EQ(1, buffer_->GetBufferByteSizeForTest()); |
| ASSERT_FALSE(captured_cb_.is_null()); |
| base::ResetAndReturn(&captured_cb_).Run(net::OK); |
| ASSERT_EQ(1, buffer_->GetBufferByteSizeForTest()); |
| ASSERT_EQ(1, buffer_->GetUnacknowledgedMessageCountForTest()); |
| buffer_->OnMessageCheckpoint(1); |
| ASSERT_EQ(0, buffer_->GetBufferByteSizeForTest()); |
| ASSERT_EQ(0, buffer_->GetUnacknowledgedMessageCountForTest()); |
| |
| buffer_->ProcessMessage(make_scoped_ptr(new BlimpMessage(compositor_msg_)), |
| complete_cb_2.callback()); |
| ASSERT_EQ(1, buffer_->GetBufferByteSizeForTest()); |
| ASSERT_FALSE(captured_cb_.is_null()); |
| base::ResetAndReturn(&captured_cb_).Run(net::OK); |
| ASSERT_EQ(1, buffer_->GetBufferByteSizeForTest()); |
| ASSERT_EQ(1, buffer_->GetUnacknowledgedMessageCountForTest()); |
| buffer_->OnMessageCheckpoint(2); |
| ASSERT_EQ(0, buffer_->GetBufferByteSizeForTest()); |
| ASSERT_EQ(0, buffer_->GetUnacknowledgedMessageCountForTest()); |
| } |
| |
| // Verify that a single checkpoint can be used to acknowledge two writes. |
| TEST_F(BlimpMessageOutputBufferTest, SharedCheckpoint) { |
| net::TestCompletionCallback complete_cb_1; |
| net::TestCompletionCallback complete_cb_2; |
| |
| AddOutputExpectation(input_msg_); |
| AddOutputExpectation(compositor_msg_); |
| |
| // Message #1 is written but unacknowledged. |
| buffer_->ProcessMessage(make_scoped_ptr(new BlimpMessage(input_msg_)), |
| complete_cb_1.callback()); |
| ASSERT_EQ(1, buffer_->GetBufferByteSizeForTest()); |
| ASSERT_TRUE(captured_cb_.is_null()); |
| buffer_->SetOutputProcessor(&output_processor_); |
| ASSERT_FALSE(captured_cb_.is_null()); |
| base::ResetAndReturn(&captured_cb_).Run(net::OK); |
| ASSERT_EQ(1, buffer_->GetBufferByteSizeForTest()); |
| ASSERT_EQ(1, buffer_->GetUnacknowledgedMessageCountForTest()); |
| |
| // Message #2 is written but unacknowledged. |
| buffer_->ProcessMessage(make_scoped_ptr(new BlimpMessage(compositor_msg_)), |
| complete_cb_2.callback()); |
| ASSERT_EQ(2, buffer_->GetBufferByteSizeForTest()); |
| ASSERT_FALSE(captured_cb_.is_null()); |
| base::ResetAndReturn(&captured_cb_).Run(net::OK); |
| ASSERT_TRUE(captured_cb_.is_null()); |
| ASSERT_EQ(2, buffer_->GetBufferByteSizeForTest()); |
| ASSERT_EQ(2, buffer_->GetUnacknowledgedMessageCountForTest()); |
| |
| // Both messages are acknowledged in one checkpoint. |
| buffer_->OnMessageCheckpoint(2); |
| ASSERT_EQ(0, buffer_->GetBufferByteSizeForTest()); |
| ASSERT_EQ(0, buffer_->GetUnacknowledgedMessageCountForTest()); |
| EXPECT_EQ(net::OK, complete_cb_1.WaitForResult()); |
| EXPECT_EQ(net::OK, complete_cb_2.WaitForResult()); |
| } |
| |
| // Verify that messages that fail to write are kept in a pending write state. |
| TEST_F(BlimpMessageOutputBufferTest, WriteError) { |
| net::TestCompletionCallback complete_cb_1; |
| net::TestCompletionCallback complete_cb_2; |
| |
| AddOutputExpectation(input_msg_); |
| AddOutputExpectation(input_msg_); |
| |
| // Accumulate two messages. |
| buffer_->ProcessMessage(make_scoped_ptr(new BlimpMessage(input_msg_)), |
| complete_cb_1.callback()); |
| ASSERT_EQ(1, buffer_->GetBufferByteSizeForTest()); |
| |
| // First write attempt, which fails. |
| ASSERT_TRUE(captured_cb_.is_null()); |
| buffer_->SetOutputProcessor(&output_processor_); |
| ASSERT_FALSE(captured_cb_.is_null()); |
| base::ResetAndReturn(&captured_cb_).Run(net::ERR_FAILED); |
| ASSERT_EQ(1, buffer_->GetBufferByteSizeForTest()); |
| ASSERT_EQ(0, buffer_->GetUnacknowledgedMessageCountForTest()); |
| |
| // Simulate disconnect. |
| buffer_->SetOutputProcessor(nullptr); |
| |
| // Reconnect. Should immediately try to write the contents of the buffer. |
| buffer_->SetOutputProcessor(&output_processor_); |
| ASSERT_FALSE(captured_cb_.is_null()); |
| base::ResetAndReturn(&captured_cb_).Run(net::OK); |
| ASSERT_EQ(1, buffer_->GetBufferByteSizeForTest()); |
| ASSERT_EQ(1, buffer_->GetUnacknowledgedMessageCountForTest()); |
| buffer_->OnMessageCheckpoint(1); |
| ASSERT_EQ(0, buffer_->GetBufferByteSizeForTest()); |
| ASSERT_EQ(0, buffer_->GetUnacknowledgedMessageCountForTest()); |
| EXPECT_EQ(net::OK, complete_cb_1.WaitForResult()); |
| } |
| |
| // Verify that unacknowledged messages can be moved back to a pending write |
| // state (recovery after a lost connection.) |
| TEST_F(BlimpMessageOutputBufferTest, MessageRetransmit) { |
| net::TestCompletionCallback complete_cb_1; |
| net::TestCompletionCallback complete_cb_2; |
| |
| AddOutputExpectation(input_msg_); |
| AddOutputExpectation(compositor_msg_); |
| AddOutputExpectation(compositor_msg_); // Retransmitted message. |
| |
| // Accumulate two messages. |
| buffer_->ProcessMessage(make_scoped_ptr(new BlimpMessage(input_msg_)), |
| complete_cb_1.callback()); |
| buffer_->ProcessMessage(make_scoped_ptr(new BlimpMessage(compositor_msg_)), |
| complete_cb_2.callback()); |
| ASSERT_EQ(2, buffer_->GetBufferByteSizeForTest()); |
| |
| // Write two messages. |
| ASSERT_TRUE(captured_cb_.is_null()); |
| buffer_->SetOutputProcessor(&output_processor_); |
| ASSERT_FALSE(captured_cb_.is_null()); |
| base::ResetAndReturn(&captured_cb_).Run(net::OK); |
| ASSERT_EQ(2, buffer_->GetBufferByteSizeForTest()); |
| ASSERT_EQ(1, buffer_->GetUnacknowledgedMessageCountForTest()); |
| base::ResetAndReturn(&captured_cb_).Run(net::OK); |
| ASSERT_EQ(2, buffer_->GetBufferByteSizeForTest()); |
| ASSERT_EQ(2, buffer_->GetUnacknowledgedMessageCountForTest()); |
| |
| // Simulate disconnect & reconnect. |
| buffer_->SetOutputProcessor(nullptr); |
| buffer_->SetOutputProcessor(&output_processor_); |
| |
| // Remote end indicates that it only received message #0. |
| // Message #1 should be moved from an unacknowledged state to a pending write |
| // state. |
| ASSERT_TRUE(captured_cb_.is_null()); |
| buffer_->OnMessageCheckpoint(1); |
| buffer_->RetransmitBufferedMessages(); |
| ASSERT_FALSE(captured_cb_.is_null()); |
| ASSERT_EQ(1, buffer_->GetBufferByteSizeForTest()); |
| ASSERT_EQ(0, buffer_->GetUnacknowledgedMessageCountForTest()); |
| base::ResetAndReturn(&captured_cb_).Run(net::OK); |
| ASSERT_EQ(1, buffer_->GetUnacknowledgedMessageCountForTest()); |
| |
| // Remote end acknowledges #1, buffer should be empty. |
| buffer_->OnMessageCheckpoint(2); |
| ASSERT_EQ(0, buffer_->GetBufferByteSizeForTest()); |
| ASSERT_EQ(0, buffer_->GetUnacknowledgedMessageCountForTest()); |
| EXPECT_EQ(net::OK, complete_cb_2.WaitForResult()); |
| } |
| |
| } // namespace |
| } // namespace blimp |