blob: 659c8848ad8836a61641fa11b8000a4d816c210a [file] [log] [blame]
// Copyright 2019 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "remoting/signaling/ftl_message_reception_channel.h"
#include <algorithm>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "base/bind.h"
#include "base/logging.h"
#include "base/memory/weak_ptr.h"
#include "base/run_loop.h"
#include "base/test/bind_test_util.h"
#include "base/test/mock_callback.h"
#include "base/test/scoped_task_environment.h"
#include "remoting/signaling/ftl.pb.h"
#include "remoting/signaling/grpc_support/grpc_test_util.h"
#include "remoting/signaling/grpc_support/scoped_grpc_server_stream.h"
#include "testing/gmock/include/gmock/gmock.h"
#include "testing/gtest/include/gtest/gtest.h"
namespace remoting {
namespace {
using ::testing::_;
using ::testing::Expectation;
using ::testing::Invoke;
using ::testing::Property;
using ::testing::Return;
// Test double for ScopedGrpcServerStream. It vends weak pointers to itself so
// that a test can observe whether the code under test has destroyed (i.e.
// closed) the stream it was handed.
class FakeScopedGrpcServerStream : public ScopedGrpcServerStream {
 public:
  FakeScopedGrpcServerStream() : ScopedGrpcServerStream(nullptr) {}
  FakeScopedGrpcServerStream(const FakeScopedGrpcServerStream&) = delete;
  FakeScopedGrpcServerStream& operator=(const FakeScopedGrpcServerStream&) =
      delete;
  ~FakeScopedGrpcServerStream() override = default;

  // Returns a weak pointer that tests use to probe the stream's liveness.
  base::WeakPtr<FakeScopedGrpcServerStream> GetWeakPtr() {
    return weak_factory_.GetWeakPtr();
  }

 private:
  base::WeakPtrFactory<FakeScopedGrpcServerStream> weak_factory_{this};
};
// Allocates a new fake server stream and hands ownership to the caller.
std::unique_ptr<FakeScopedGrpcServerStream> CreateFakeServerStream() {
  auto fake_stream = std::make_unique<FakeScopedGrpcServerStream>();
  return fake_stream;
}
// Builds a ReceiveMessagesResponse whose start_of_batch field is populated
// (with a default-constructed value).
ftl::ReceiveMessagesResponse CreateStartOfBatchResponse() {
  ftl::ReceiveMessagesResponse start_of_batch_response;
  // Calling the mutable accessor is enough to mark the field as present.
  start_of_batch_response.mutable_start_of_batch();
  return start_of_batch_response;
}
// Returns a one-shot callback that raises a fatal test failure if it is run
// with a non-OK status.
base::OnceCallback<void(const grpc::Status& status)> AssertOkCallback() {
  auto expect_ok = [](const grpc::Status& result) {
    ASSERT_TRUE(result.ok());
  };
  return base::BindOnce(std::move(expect_ok));
}
} // namespace
// Test fixture for FtlMessageReceptionChannel. It owns the channel under test
// together with mock callbacks for the stream opener and the incoming-message
// handler, and runs everything on a task environment that uses mock time.
class FtlMessageReceptionChannelTest : public testing::Test {
public:
void SetUp() override;
void TearDown() override;
protected:
// Returns the time remaining until the channel's reconnect backoff entry is
// released.
base::TimeDelta GetTimeUntilRetry() const;
// Returns the failure count recorded by the channel's reconnect backoff entry.
int GetRetryFailureCount() const;
// Mock-time task environment; tests advance time explicitly with
// FastForwardBy(). Declared first so that it is constructed before, and
// destroyed after, the other members.
base::test::ScopedTaskEnvironment scoped_task_environment_{
base::test::ScopedTaskEnvironment::MainThreadType::MOCK_TIME,
base::test::ScopedTaskEnvironment::NowSource::MAIN_THREAD_MOCK_TIME};
// The channel under test; created in SetUp() and destroyed in TearDown().
std::unique_ptr<FtlMessageReceptionChannel> channel_;
// Mock for the stream opener passed to the channel in SetUp(); tests set
// expectations on it to script how each stream-open attempt behaves.
base::MockCallback<FtlMessageReceptionChannel::StreamOpener>
mock_stream_opener_;
// Mock for the incoming-message callback passed to the channel in SetUp().
base::MockCallback<base::RepeatingCallback<void(const ftl::InboxMessage&)>>
mock_on_incoming_msg_;
};
// Creates a fresh channel and wires it to the fixture's mock callbacks.
void FtlMessageReceptionChannelTest::SetUp() {
  auto new_channel = std::make_unique<FtlMessageReceptionChannel>();
  new_channel->Initialize(mock_stream_opener_.Get(),
                          mock_on_incoming_msg_.Get());
  channel_ = std::move(new_channel);
}
// Destroys the channel first, then drains whatever tasks are still queued on
// the mock-time task environment.
void FtlMessageReceptionChannelTest::TearDown() {
  channel_ = nullptr;
  scoped_task_environment_.FastForwardUntilNoTasksRemain();
}
// Reports how long the channel's reconnect backoff entry will still hold off
// the next attempt.
base::TimeDelta FtlMessageReceptionChannelTest::GetTimeUntilRetry() const {
  const auto& backoff_entry =
      channel_->GetReconnectRetryBackoffEntryForTesting();
  return backoff_entry.GetTimeUntilRelease();
}
// Reports the failure count stored in the channel's reconnect backoff entry.
int FtlMessageReceptionChannelTest::GetRetryFailureCount() const {
  const auto& backoff_entry =
      channel_->GetReconnectRetryBackoffEntryForTesting();
  return backoff_entry.failure_count();
}
// The stream opener reports the stream as started and then stops the channel
// right away; the start request is expected to finish with CANCELLED.
TEST_F(FtlMessageReceptionChannelTest,
       TestStartReceivingMessages_StoppedImmediately) {
  base::RunLoop run_loop;

  // Scripted behavior for the single expected stream-open attempt.
  auto start_then_stop =
      [&](base::OnceCallback<void(std::unique_ptr<ScopedGrpcServerStream>)>
              stream_started_cb,
          const base::RepeatingCallback<void(
              const ftl::ReceiveMessagesResponse&)>& incoming_msg_cb,
          base::OnceCallback<void(const grpc::Status&)> channel_closed_cb) {
        std::move(stream_started_cb).Run(CreateFakeServerStream());
        channel_->StopReceivingMessages();
      };
  EXPECT_CALL(mock_stream_opener_, Run(_, _, _))
      .WillOnce(Invoke(start_then_stop));

  auto on_done = test::CheckStatusThenQuitRunLoopCallback(
      FROM_HERE, grpc::StatusCode::CANCELLED, &run_loop);
  channel_->StartReceivingMessages(std::move(on_done));
  run_loop.Run();
}
// The stream is closed with UNAUTHENTICATED right after it starts; the start
// request is expected to finish with that same status.
TEST_F(FtlMessageReceptionChannelTest,
       TestStartReceivingMessages_NotAuthenticated) {
  base::RunLoop run_loop;

  // Scripted behavior for the single expected stream-open attempt.
  auto start_then_reject =
      [&](base::OnceCallback<void(std::unique_ptr<ScopedGrpcServerStream>)>
              stream_started_cb,
          const base::RepeatingCallback<void(
              const ftl::ReceiveMessagesResponse&)>& incoming_msg_cb,
          base::OnceCallback<void(const grpc::Status&)> channel_closed_cb) {
        std::move(stream_started_cb).Run(CreateFakeServerStream());
        const grpc::Status unauthenticated(grpc::StatusCode::UNAUTHENTICATED,
                                           "");
        std::move(channel_closed_cb).Run(unauthenticated);
      };
  EXPECT_CALL(mock_stream_opener_, Run(_, _, _))
      .WillOnce(Invoke(start_then_reject));

  auto on_done = test::CheckStatusThenQuitRunLoopCallback(
      FROM_HERE, grpc::StatusCode::UNAUTHENTICATED, &run_loop);
  channel_->StartReceivingMessages(std::move(on_done));
  run_loop.Run();
}
// The stream starts and delivers a StartOfBatch message; the start request is
// expected to finish with OK.
TEST_F(FtlMessageReceptionChannelTest,
       TestStartReceivingMessages_StreamStarted) {
  base::RunLoop run_loop;

  // Scripted behavior for the single expected stream-open attempt.
  auto start_and_send_batch_start =
      [&](base::OnceCallback<void(std::unique_ptr<ScopedGrpcServerStream>)>
              stream_started_cb,
          const base::RepeatingCallback<void(
              const ftl::ReceiveMessagesResponse&)>& incoming_msg_cb,
          base::OnceCallback<void(const grpc::Status&)> channel_closed_cb) {
        std::move(stream_started_cb).Run(CreateFakeServerStream());
        incoming_msg_cb.Run(CreateStartOfBatchResponse());
      };
  EXPECT_CALL(mock_stream_opener_, Run(_, _, _))
      .WillOnce(Invoke(start_and_send_batch_start));

  auto on_done = test::CheckStatusThenQuitRunLoopCallback(
      FROM_HERE, grpc::StatusCode::OK, &run_loop);
  channel_->StartReceivingMessages(std::move(on_done));
  run_loop.Run();
}
// The first stream fails with UNAVAILABLE. The test checks that one failure is
// recorded with roughly the initial backoff delay, that the stream is reopened
// after that delay, and that a StartOfBatch on the new stream clears the
// failure count. The start request is expected to finish with OK.
TEST_F(FtlMessageReceptionChannelTest,
       TestStartReceivingMessages_RecoverableStreamError) {
  base::RunLoop run_loop;

  // Tracks the stream handed out by the first attempt so that the second
  // attempt can check it has been destroyed.
  base::WeakPtr<FakeScopedGrpcServerStream> first_stream;

  // First attempt: the stream starts and is then closed with UNAVAILABLE.
  auto failing_attempt =
      [&](base::OnceCallback<void(std::unique_ptr<ScopedGrpcServerStream>)>
              stream_started_cb,
          const base::RepeatingCallback<void(
              const ftl::ReceiveMessagesResponse&)>& incoming_msg_cb,
          base::OnceCallback<void(const grpc::Status&)> channel_closed_cb) {
        auto stream = CreateFakeServerStream();
        first_stream = stream->GetWeakPtr();
        std::move(stream_started_cb).Run(std::move(stream));
        ASSERT_EQ(0, GetRetryFailureCount());

        const grpc::Status unavailable(grpc::StatusCode::UNAVAILABLE, "");
        std::move(channel_closed_cb).Run(unavailable);
        ASSERT_EQ(1, GetRetryFailureCount());
        ASSERT_NEAR(
            FtlMessageReceptionChannel::kBackoffInitialDelay.InSecondsF(),
            GetTimeUntilRetry().InSecondsF(), 0.5);

        // Advancing past the backoff delay makes the channel reopen the
        // stream, which runs the second attempt below.
        scoped_task_environment_.FastForwardBy(GetTimeUntilRetry());
      };

  // Second attempt: the stream starts and stays healthy.
  auto succeeding_attempt =
      [&](base::OnceCallback<void(std::unique_ptr<ScopedGrpcServerStream>)>
              stream_started_cb,
          const base::RepeatingCallback<void(
              const ftl::ReceiveMessagesResponse&)>& incoming_msg_cb,
          base::OnceCallback<void(const grpc::Status&)> channel_closed_cb) {
        // The stream from the first attempt must be gone by now.
        ASSERT_FALSE(first_stream);

        // A StartOfBatch message should reset the failure counter.
        std::move(stream_started_cb).Run(CreateFakeServerStream());
        incoming_msg_cb.Run(CreateStartOfBatchResponse());
        ASSERT_EQ(0, GetRetryFailureCount());
      };

  EXPECT_CALL(mock_stream_opener_, Run(_, _, _))
      .WillOnce(Invoke(failing_attempt))
      .WillOnce(Invoke(succeeding_attempt));

  auto on_done = test::CheckStatusThenQuitRunLoopCallback(
      FROM_HERE, grpc::StatusCode::OK, &run_loop);
  channel_->StartReceivingMessages(std::move(on_done));
  run_loop.Run();
}
// Three start requests are issued back to back while the stream opener is
// expected to run only once; each request's done callback is expected to be
// run with OK.
TEST_F(FtlMessageReceptionChannelTest,
       TestStartReceivingMessages_MultipleCalls) {
  base::RunLoop run_loop;

  base::MockCallback<FtlMessageReceptionChannel::DoneCallback>
      mock_done_callback;

  // The run loop is only quit by the third OK invocation of the callback.
  EXPECT_CALL(mock_done_callback,
              Run(Property(&grpc::Status::error_code, grpc::StatusCode::OK)))
      .WillOnce(Return())
      .WillOnce(Return())
      .WillOnce(Invoke([&](const grpc::Status&) { run_loop.Quit(); }));

  // Scripted behavior for the single expected stream-open attempt.
  auto start_and_send_batch_start =
      [&](base::OnceCallback<void(std::unique_ptr<ScopedGrpcServerStream>)>
              stream_started_cb,
          const base::RepeatingCallback<void(
              const ftl::ReceiveMessagesResponse&)>& incoming_msg_cb,
          base::OnceCallback<void(const grpc::Status&)> channel_closed_cb) {
        std::move(stream_started_cb).Run(CreateFakeServerStream());
        incoming_msg_cb.Run(CreateStartOfBatchResponse());
      };
  EXPECT_CALL(mock_stream_opener_, Run(_, _, _))
      .WillOnce(Invoke(start_and_send_batch_start));

  for (int call = 0; call < 3; ++call) {
    channel_->StartReceivingMessages(mock_done_callback.Get());
  }
  run_loop.Run();
}
// Two inbox messages are delivered over the stream; each one is expected to be
// forwarded to the incoming-message callback, identified by its message id.
TEST_F(FtlMessageReceptionChannelTest, StreamsTwoMessages) {
  base::RunLoop run_loop;

  constexpr char kMessage1Id[] = "msg_1";
  constexpr char kMessage2Id[] = "msg_2";

  ftl::InboxMessage first_message;
  first_message.set_message_id(kMessage1Id);
  ftl::InboxMessage second_message;
  second_message.set_message_id(kMessage2Id);

  EXPECT_CALL(mock_on_incoming_msg_,
              Run(Property(&ftl::InboxMessage::message_id, kMessage1Id)))
      .WillOnce(Return());
  // Receiving the second message ends the test.
  EXPECT_CALL(mock_on_incoming_msg_,
              Run(Property(&ftl::InboxMessage::message_id, kMessage2Id)))
      .WillOnce(Invoke([&](const ftl::InboxMessage&) { run_loop.Quit(); }));

  // Starts the stream, sends StartOfBatch followed by both inbox messages,
  // then closes the channel with OK.
  auto stream_two_messages =
      [&](base::OnceCallback<void(std::unique_ptr<ScopedGrpcServerStream>)>
              stream_started_cb,
          const base::RepeatingCallback<void(
              const ftl::ReceiveMessagesResponse&)>& incoming_msg_cb,
          base::OnceCallback<void(const grpc::Status&)> channel_closed_cb) {
        std::move(stream_started_cb).Run(CreateFakeServerStream());
        incoming_msg_cb.Run(CreateStartOfBatchResponse());

        ftl::ReceiveMessagesResponse first_response;
        *first_response.mutable_inbox_message() = first_message;
        incoming_msg_cb.Run(first_response);

        ftl::ReceiveMessagesResponse second_response;
        *second_response.mutable_inbox_message() = second_message;
        incoming_msg_cb.Run(second_response);

        std::move(channel_closed_cb).Run(grpc::Status::OK);
      };
  EXPECT_CALL(mock_stream_opener_, Run(_, _, _))
      .WillOnce(Invoke(stream_two_messages));

  channel_->StartReceivingMessages(AssertOkCallback());
  run_loop.Run();
}
// After StartOfBatch no further message arrives for kPongTimeout. The test
// checks that this is recorded as one failure with roughly the initial backoff
// delay, that the stream is reopened afterwards, and that a StartOfBatch on
// the new stream clears the failure count.
TEST_F(FtlMessageReceptionChannelTest, NoPongWithinTimeout_ResetsStream) {
  base::RunLoop run_loop;

  // Tracks the stream handed out by the first attempt so that the second
  // attempt can check it has been destroyed.
  base::WeakPtr<FakeScopedGrpcServerStream> first_stream;

  // First attempt: the stream starts, then goes silent.
  auto silent_attempt =
      [&](base::OnceCallback<void(std::unique_ptr<ScopedGrpcServerStream>)>
              stream_started_cb,
          const base::RepeatingCallback<void(
              const ftl::ReceiveMessagesResponse&)>& incoming_msg_cb,
          base::OnceCallback<void(const grpc::Status&)> channel_closed_cb) {
        auto stream = CreateFakeServerStream();
        first_stream = stream->GetWeakPtr();
        std::move(stream_started_cb).Run(std::move(stream));
        incoming_msg_cb.Run(CreateStartOfBatchResponse());

        // Let the pong timeout elapse without delivering anything.
        scoped_task_environment_.FastForwardBy(
            FtlMessageReceptionChannel::kPongTimeout);

        ASSERT_EQ(1, GetRetryFailureCount());
        ASSERT_NEAR(
            FtlMessageReceptionChannel::kBackoffInitialDelay.InSecondsF(),
            GetTimeUntilRetry().InSecondsF(), 0.5);

        // Advancing past the backoff delay makes the channel reopen the
        // stream, which runs the second attempt below.
        scoped_task_environment_.FastForwardBy(GetTimeUntilRetry());
      };

  // Second attempt: the stream has been reopened.
  auto reopened_attempt =
      [&](base::OnceCallback<void(std::unique_ptr<ScopedGrpcServerStream>)>
              stream_started_cb,
          const base::RepeatingCallback<void(
              const ftl::ReceiveMessagesResponse&)>& incoming_msg_cb,
          base::OnceCallback<void(const grpc::Status&)> channel_closed_cb) {
        // The stream from the first attempt must be gone by now.
        ASSERT_FALSE(first_stream);

        std::move(stream_started_cb).Run(CreateFakeServerStream());

        // A StartOfBatch message should reset the failure counter.
        incoming_msg_cb.Run(CreateStartOfBatchResponse());
        ASSERT_EQ(0, GetRetryFailureCount());
        run_loop.Quit();
      };

  EXPECT_CALL(mock_stream_opener_, Run(_, _, _))
      .WillOnce(Invoke(silent_attempt))
      .WillOnce(Invoke(reopened_attempt));

  channel_->StartReceivingMessages(AssertOkCallback());
  run_loop.Run();
}
// Keeps a stream healthy by delivering a pong slightly more often than
// kPongTimeout, and checks that the channel nevertheless closes and reopens
// the stream once it has been open for longer than kStreamLifetime, and that
// a StartOfBatch on the new stream leaves the failure count at zero.
//
// The pongs matter: without them the stream would be reset by the pong
// timeout long before the lifetime limit is reached, and this test would
// merely duplicate NoPongWithinTimeout_ResetsStream.
TEST_F(FtlMessageReceptionChannelTest, LifetimeExceeded_ResetsStream) {
  base::RunLoop run_loop;

  EXPECT_CALL(mock_stream_opener_, Run(_, _, _))
      .WillOnce(Invoke(
          [&](base::OnceCallback<void(std::unique_ptr<ScopedGrpcServerStream>)>
                  on_stream_started,
              const base::RepeatingCallback<void(
                  const ftl::ReceiveMessagesResponse&)>& on_incoming_msg,
              base::OnceCallback<void(const grpc::Status&)> on_channel_closed) {
            auto fake_server_stream = CreateFakeServerStream();
            base::WeakPtr<FakeScopedGrpcServerStream> old_stream =
                fake_server_stream->GetWeakPtr();
            std::move(on_stream_started).Run(std::move(fake_server_stream));
            on_incoming_msg.Run(CreateStartOfBatchResponse());

            // Keep sending pong until lifetime exceeded. The period is kept
            // just below the pong timeout so that the timeout never fires.
            base::TimeDelta pong_period =
                FtlMessageReceptionChannel::kPongTimeout -
                base::TimeDelta::FromSeconds(1);
            ASSERT_LT(base::TimeDelta(), pong_period);
            base::TimeDelta ticked_time;

            // The last FastForwardBy() will make the channel reopen the stream.
            while (ticked_time <= FtlMessageReceptionChannel::kStreamLifetime) {
              scoped_task_environment_.FastForwardBy(pong_period);
              ticked_time += pong_period;

              // Send a pong to keep the pong-timeout path out of the picture.
              ftl::ReceiveMessagesResponse pong;
              pong.mutable_pong();
              on_incoming_msg.Run(pong);
            }

            // The original stream must have been closed by the lifetime reset.
            ASSERT_FALSE(old_stream);
          }))
      .WillOnce(Invoke(
          [&](base::OnceCallback<void(std::unique_ptr<ScopedGrpcServerStream>)>
                  on_stream_started,
              const base::RepeatingCallback<void(
                  const ftl::ReceiveMessagesResponse&)>& on_incoming_msg,
              base::OnceCallback<void(const grpc::Status&)> on_channel_closed) {
            // Reopening the stream. Send StartOfBatch and verify that it resets
            // the failure counter.
            std::move(on_stream_started).Run(CreateFakeServerStream());
            on_incoming_msg.Run(CreateStartOfBatchResponse());
            ASSERT_EQ(0, GetRetryFailureCount());
            run_loop.Quit();
          }));

  channel_->StartReceivingMessages(AssertOkCallback());
  run_loop.Run();
}
// Every stream-open attempt fails with UNAVAILABLE until the retry delay has
// been observed at roughly kBackoffMaxDelay three times. The test checks that
// the failure count grows on each failure and that a StartOfBatch on the final
// attempt resets it.
TEST_F(FtlMessageReceptionChannelTest, TimeoutIncreasesToMaximum) {
  base::RunLoop run_loop;

  int last_failure_count = 0;
  int max_delay_hits = 0;

  auto attempt =
      [&](base::OnceCallback<void(std::unique_ptr<ScopedGrpcServerStream>)>
              stream_started_cb,
          const base::RepeatingCallback<void(
              const ftl::ReceiveMessagesResponse&)>& incoming_msg_cb,
          base::OnceCallback<void(const grpc::Status&)> channel_closed_cb) {
        std::move(stream_started_cb).Run(CreateFakeServerStream());

        // Once the delay has been ~kBackoffMaxDelay three times, let the
        // stream succeed and finish the test.
        if (max_delay_hits == 3) {
          incoming_msg_cb.Run(CreateStartOfBatchResponse());
          ASSERT_EQ(0, GetRetryFailureCount());
          run_loop.Quit();
          return;
        }

        // Otherwise fail the stream with UNAVAILABLE so that it gets reset.
        const grpc::Status unavailable(grpc::StatusCode::UNAVAILABLE, "");
        std::move(channel_closed_cb).Run(unavailable);

        const int current_failure_count = GetRetryFailureCount();
        ASSERT_LT(last_failure_count, current_failure_count);
        last_failure_count = current_failure_count;

        const base::TimeDelta retry_delay = GetTimeUntilRetry();
        const base::TimeDelta distance_from_max =
            retry_delay - FtlMessageReceptionChannel::kBackoffMaxDelay;
        // Allow half a second of slack to account for fuzziness in the delay.
        const base::TimeDelta tolerance =
            base::TimeDelta::FromMilliseconds(500);
        if (distance_from_max.magnitude() < tolerance) {
          max_delay_hits++;
        }

        // Advancing by the retry delay makes the channel call the stream
        // opener again, re-entering this lambda.
        scoped_task_environment_.FastForwardBy(retry_delay);
      };
  EXPECT_CALL(mock_stream_opener_, Run(_, _, _))
      .WillRepeatedly(Invoke(attempt));

  channel_->StartReceivingMessages(AssertOkCallback());
  run_loop.Run();
}
} // namespace remoting