[remoting][FTL] Implement ReceiveMessages stream
This CL implements the ReceiveMessages stream and allows
FtlSignalingPlayground to read from the stream.
Bug: 927962
Change-Id: I4c3b155ccf0f4aa0a5f6b241dd8763cb9fd9f928
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/1510052
Commit-Queue: Yuwei Huang <yuweih@chromium.org>
Reviewed-by: Joe Downing <joedow@chromium.org>
Cr-Commit-Position: refs/heads/master@{#643513}
diff --git a/remoting/signaling/BUILD.gn b/remoting/signaling/BUILD.gn
index a292bda3..e025bbc 100644
--- a/remoting/signaling/BUILD.gn
+++ b/remoting/signaling/BUILD.gn
@@ -10,6 +10,8 @@
"delegating_signal_strategy.h",
"ftl_grpc_context.cc",
"ftl_grpc_context.h",
+ "ftl_message_reception_channel.cc",
+ "ftl_message_reception_channel.h",
"ftl_messaging_client.cc",
"ftl_messaging_client.h",
"iq_sender.cc",
@@ -18,6 +20,7 @@
"jid_util.h",
"log_to_server.cc",
"log_to_server.h",
+ "message_reception_channel.h",
"push_notification_subscriber.cc",
"push_notification_subscriber.h",
"server_log_entry.cc",
@@ -61,6 +64,8 @@
sources -= [
"ftl_grpc_context.cc",
"ftl_grpc_context.h",
+ "ftl_message_reception_channel.cc",
+ "ftl_message_reception_channel.h",
"ftl_messaging_client.cc",
"ftl_messaging_client.h",
"log_to_server.cc",
@@ -106,6 +111,7 @@
sources = [
"ftl_grpc_context_unittest.cc",
+ "ftl_message_reception_channel_unittest.cc",
"ftl_messaging_client_unittest.cc",
"iq_sender_unittest.cc",
"jid_util_unittest.cc",
diff --git a/remoting/signaling/ftl.proto b/remoting/signaling/ftl.proto
index c44550cef..8a7eb04 100644
--- a/remoting/signaling/ftl.proto
+++ b/remoting/signaling/ftl.proto
@@ -194,3 +194,25 @@
message AckMessagesResponse {
ResponseHeader header = 1;
}
+
+message ReceiveMessagesRequest {
+ RequestHeader header = 1;
+}
+
+message ReceiveMessagesResponse {
+ message Pong {}
+
+ message StartOfBatch { int32 count = 1; }
+
+ message EndOfBatch {}
+
+ message RefreshResult {}
+
+ oneof body {
+ InboxMessage inbox_message = 2;
+ Pong pong = 3;
+ StartOfBatch start_of_batch = 4;
+ EndOfBatch end_of_batch = 5;
+ RefreshResult refresh_result = 6;
+ }
+}
diff --git a/remoting/signaling/ftl_grpc_context.h b/remoting/signaling/ftl_grpc_context.h
index f513153..e28f6da4 100644
--- a/remoting/signaling/ftl_grpc_context.h
+++ b/remoting/signaling/ftl_grpc_context.h
@@ -15,6 +15,7 @@
#include "remoting/base/oauth_token_getter.h"
#include "remoting/signaling/ftl_services.grpc.pb.h"
#include "remoting/signaling/grpc_support/grpc_async_dispatcher.h"
+#include "remoting/signaling/grpc_support/scoped_grpc_server_stream.h"
#include "third_party/grpc/src/include/grpcpp/support/status.h"
namespace remoting {
@@ -25,6 +26,8 @@
template <typename ResponseType>
using RpcCallback =
base::OnceCallback<void(const grpc::Status&, const ResponseType&)>;
+ using StreamStartedCallback =
+ base::OnceCallback<void(std::unique_ptr<ScopedGrpcServerStream>)>;
static std::string GetChromotingAppIdentifier();
@@ -47,6 +50,26 @@
GetOAuthTokenAndExecuteRpc(std::move(execute_rpc_with_context));
}
+ // |request| doesn't need to set the header field since this class will set
+ // it for you.
+ template <typename RequestType, typename ResponseType>
+ void ExecuteServerStreamingRpc(
+ GrpcAsyncDispatcher::AsyncServerStreamingRpcFunction<RequestType,
+ ResponseType> rpc,
+ const RequestType& request,
+ StreamStartedCallback on_stream_started,
+ const GrpcAsyncDispatcher::RpcStreamCallback<ResponseType>&
+ on_incoming_msg,
+ GrpcAsyncDispatcher::RpcChannelClosedCallback on_channel_closed) {
+ auto execute_rpc_with_context = base::BindOnce(
+ &FtlGrpcContext::ExecuteServerStreamingRpcWithContext<RequestType,
+ ResponseType>,
+ weak_factory_.GetWeakPtr(), std::move(rpc), request,
+ std::move(on_stream_started), on_incoming_msg,
+ std::move(on_channel_closed));
+ GetOAuthTokenAndExecuteRpc(std::move(execute_rpc_with_context));
+ }
+
std::shared_ptr<grpc::ChannelInterface> channel() { return channel_; }
void SetChannelForTesting(std::shared_ptr<grpc::ChannelInterface> channel);
@@ -66,6 +89,23 @@
std::move(callback));
}
+ template <typename RequestType, typename ResponseType>
+ void ExecuteServerStreamingRpcWithContext(
+ GrpcAsyncDispatcher::AsyncServerStreamingRpcFunction<RequestType,
+ ResponseType> rpc,
+ RequestType request,
+ StreamStartedCallback on_stream_started,
+ const GrpcAsyncDispatcher::RpcStreamCallback<ResponseType>&
+ on_incoming_msg,
+ GrpcAsyncDispatcher::RpcChannelClosedCallback on_channel_closed,
+ std::unique_ptr<grpc::ClientContext> context) {
+ request.set_allocated_header(BuildRequestHeader().release());
+ auto scoped_stream = dispatcher_.ExecuteAsyncServerStreamingRpc(
+ std::move(rpc), std::move(context), request, on_incoming_msg,
+ std::move(on_channel_closed));
+ std::move(on_stream_started).Run(std::move(scoped_stream));
+ }
+
void GetOAuthTokenAndExecuteRpc(
ExecuteRpcWithContextCallback execute_rpc_with_context);
diff --git a/remoting/signaling/ftl_grpc_context_unittest.cc b/remoting/signaling/ftl_grpc_context_unittest.cc
index 325bd3b..018561a 100644
--- a/remoting/signaling/ftl_grpc_context_unittest.cc
+++ b/remoting/signaling/ftl_grpc_context_unittest.cc
@@ -28,11 +28,16 @@
constexpr char kFakeUserEmail[] = "fake@gmail.com";
constexpr char kFakeAccessToken[] = "Dummy Token";
+constexpr char kFakeFtlAuthToken[] = "Dummy FTL Token";
using PullMessagesCallback =
FtlGrpcContext::RpcCallback<ftl::PullMessagesResponse>;
+using IncomingMessageCallback =
+ GrpcAsyncDispatcher::RpcStreamCallback<ftl::ReceiveMessagesResponse>;
using PullMessagesResponder =
test::GrpcServerResponder<ftl::PullMessagesResponse>;
+using ReceiveMessagesResponder =
+ test::GrpcServerStreamResponder<ftl::ReceiveMessagesResponse>;
PullMessagesCallback QuitRunLoopOnPullMessagesCallback(
base::RunLoop* run_loop) {
@@ -54,8 +59,14 @@
google::internal::communications::instantmessaging::v1::Messaging;
void SendFakePullMessagesRequest(PullMessagesCallback on_response);
+ void SendFakeReceiveMessagesRequest(
+ FtlGrpcContext::StreamStartedCallback on_stream_started,
+ const IncomingMessageCallback& on_incoming_msg,
+ GrpcAsyncDispatcher::RpcChannelClosedCallback on_channel_closed);
std::unique_ptr<PullMessagesResponder> HandlePullMessages(
ftl::PullMessagesRequest* request);
+ std::unique_ptr<ReceiveMessagesResponder> HandleReceiveMessages(
+ ftl::ReceiveMessagesRequest* request);
scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
std::unique_ptr<FtlGrpcContext> context_;
@@ -93,12 +104,30 @@
request, std::move(on_response));
}
+void FtlGrpcContextTest::SendFakeReceiveMessagesRequest(
+ FtlGrpcContext::StreamStartedCallback on_stream_started,
+ const IncomingMessageCallback& on_incoming_msg,
+ GrpcAsyncDispatcher::RpcChannelClosedCallback on_channel_closed) {
+ context_->ExecuteServerStreamingRpc(
+ base::BindOnce(&Messaging::Stub::AsyncReceiveMessages,
+ base::Unretained(stub_.get())),
+ ftl::ReceiveMessagesRequest(), std::move(on_stream_started),
+ on_incoming_msg, std::move(on_channel_closed));
+}
+
std::unique_ptr<PullMessagesResponder> FtlGrpcContextTest::HandlePullMessages(
ftl::PullMessagesRequest* request) {
return server_->HandleRequest(&Messaging::AsyncService::RequestPullMessages,
request);
}
+std::unique_ptr<ReceiveMessagesResponder>
+FtlGrpcContextTest::HandleReceiveMessages(
+ ftl::ReceiveMessagesRequest* request) {
+ return server_->HandleStreamRequest(
+ &Messaging::AsyncService::RequestReceiveMessages, request);
+}
+
TEST_F(FtlGrpcContextTest, VerifyAPIKeyIsProvided) {
base::RunLoop run_loop;
@@ -153,10 +182,8 @@
}
TEST_F(FtlGrpcContextTest, HasAuthTokenIfSet) {
- constexpr char dummy_ftl_token[] = "Dummy FTL Token";
-
base::RunLoop run_loop;
- context_->SetAuthToken(dummy_ftl_token);
+ context_->SetAuthToken(kFakeFtlAuthToken);
SendFakePullMessagesRequest(QuitRunLoopOnPullMessagesCallback(&run_loop));
@@ -164,12 +191,57 @@
FROM_HERE, base::BindLambdaForTesting([&]() {
ftl::PullMessagesRequest request;
auto responder = HandlePullMessages(&request);
- ASSERT_EQ(dummy_ftl_token, request.header().auth_token_payload());
+ ASSERT_EQ(kFakeFtlAuthToken, request.header().auth_token_payload());
responder->Respond(ftl::PullMessagesResponse(), grpc::Status::OK);
}));
run_loop.Run();
}
+TEST_F(FtlGrpcContextTest, ServerStreamingScenario) {
+ base::RunLoop run_loop;
+ context_->SetAuthToken(kFakeFtlAuthToken);
+
+ ftl::InboxMessage fake_inbox_message;
+ fake_inbox_message.set_message_id("msg_1");
+
+ std::unique_ptr<ScopedGrpcServerStream> server_stream;
+ std::unique_ptr<ReceiveMessagesResponder> responder;
+
+ SendFakeReceiveMessagesRequest(
+ // On stream started
+ base::BindLambdaForTesting(
+ [&](std::unique_ptr<ScopedGrpcServerStream> stream) {
+ server_stream = std::move(stream);
+
+ ftl::ReceiveMessagesRequest request;
+ responder = HandleReceiveMessages(&request);
+ ASSERT_TRUE(request.has_header());
+ ASSERT_FALSE(request.header().request_id().empty());
+ ASSERT_FALSE(request.header().app().empty());
+ ASSERT_TRUE(request.header().has_client_info());
+ ASSERT_EQ(kFakeFtlAuthToken, request.header().auth_token_payload());
+
+ ftl::ReceiveMessagesResponse response;
+ response.set_allocated_inbox_message(
+ new ftl::InboxMessage(fake_inbox_message));
+ responder->SendMessage(response);
+ }),
+
+ // On message received
+ base::BindLambdaForTesting(
+ [&](const ftl::ReceiveMessagesResponse& response) {
+ ASSERT_EQ(fake_inbox_message.message_id(),
+ response.inbox_message().message_id());
+ responder->Close(grpc::Status::OK);
+ }),
+
+ // On channel closed
+ test::CheckStatusThenQuitRunLoopCallback(FROM_HERE, grpc::StatusCode::OK,
+ &run_loop));
+
+ run_loop.Run();
+}
+
// TODO(yuweih): Ideally we should verify access token is properly attached too,
// but currently this information seems to be lost in ServiceContext.
diff --git a/remoting/signaling/ftl_message_reception_channel.cc b/remoting/signaling/ftl_message_reception_channel.cc
new file mode 100644
index 0000000..ab74d048
--- /dev/null
+++ b/remoting/signaling/ftl_message_reception_channel.cc
@@ -0,0 +1,215 @@
+// Copyright 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "remoting/signaling/ftl_message_reception_channel.h"
+
+#include <utility>
+
+#include "base/bind_helpers.h"
+#include "base/callback.h"
+#include "base/logging.h"
+#include "remoting/signaling/ftl_grpc_context.h"
+#include "remoting/signaling/ftl_services.grpc.pb.h"
+#include "remoting/signaling/grpc_support/scoped_grpc_server_stream.h"
+
+namespace remoting {
+
+namespace {
+
+const net::BackoffEntry::Policy kBackoffPolicy = {
+ // Number of initial errors (in sequence) to ignore before applying
+ // exponential back-off rules.
+ 0,
+
+ // Initial delay for exponential back-off in ms.
+ FtlMessageReceptionChannel::kBackoffInitialDelay.InMilliseconds(),
+
+ // Factor by which the waiting time will be multiplied.
+ 2,
+
+ // Fuzzing percentage. ex: 10% will spread requests randomly
+ // between 90%-100% of the calculated time.
+ 0.5,
+
+ // Maximum amount of time we are willing to delay our request in ms.
+ FtlMessageReceptionChannel::kBackoffMaxDelay.InMilliseconds(),
+
+ // Time to keep an entry from being discarded even when it
+ // has no significant state, -1 to never discard.
+ -1,
+
+ // Starts with initial delay.
+ false,
+};
+
+} // namespace
+
+constexpr base::TimeDelta FtlMessageReceptionChannel::kPongTimeout;
+constexpr base::TimeDelta FtlMessageReceptionChannel::kStreamLifetime;
+constexpr base::TimeDelta FtlMessageReceptionChannel::kBackoffInitialDelay;
+constexpr base::TimeDelta FtlMessageReceptionChannel::kBackoffMaxDelay;
+
+FtlMessageReceptionChannel::FtlMessageReceptionChannel()
+ : reconnect_retry_backoff_(&kBackoffPolicy), weak_factory_(this) {}
+
+FtlMessageReceptionChannel::~FtlMessageReceptionChannel() = default;
+
+void FtlMessageReceptionChannel::Initialize(
+ const StreamOpener& stream_opener,
+ const MessageCallback& on_incoming_msg) {
+ DCHECK(stream_opener);
+ DCHECK(on_incoming_msg);
+ DCHECK(!stream_opener_);
+ DCHECK(!on_incoming_msg_);
+ stream_opener_ = stream_opener;
+ on_incoming_msg_ = on_incoming_msg;
+}
+
+void FtlMessageReceptionChannel::StartReceivingMessages(DoneCallback on_done) {
+ if (state_ == State::STARTED) {
+ std::move(on_done).Run(grpc::Status::OK);
+ return;
+ }
+ start_receiving_messages_callbacks_.push_back(std::move(on_done));
+ if (state_ == State::STARTING) {
+ return;
+ }
+ StartReceivingMessagesInternal();
+}
+
+void FtlMessageReceptionChannel::StopReceivingMessages() {
+ if (state_ == State::STOPPED) {
+ return;
+ }
+ StopReceivingMessagesInternal();
+ RunStartReceivingMessagesCallbacks(grpc::Status::CANCELLED);
+}
+
+const net::BackoffEntry&
+FtlMessageReceptionChannel::GetReconnectRetryBackoffEntryForTesting() const {
+ return reconnect_retry_backoff_;
+}
+
+void FtlMessageReceptionChannel::OnReceiveMessagesStreamStarted(
+ std::unique_ptr<ScopedGrpcServerStream> stream) {
+ receive_messages_stream_ = std::move(stream);
+}
+
+void FtlMessageReceptionChannel::OnReceiveMessagesStreamClosed(
+ const grpc::Status& status) {
+ if (state_ == State::STOPPED) {
+ // Previously closed by the caller.
+ return;
+ }
+ if (status.error_code() == grpc::StatusCode::ABORTED ||
+ status.error_code() == grpc::StatusCode::UNAVAILABLE) {
+ // These are 'soft' connection errors that should be retried.
+ // Other errors should be ignored. See this file for more info:
+ // third_party/grpc/src/include/grpcpp/impl/codegen/status_code_enum.h
+ RetryStartReceivingMessagesWithBackoff();
+ return;
+ }
+ StopReceivingMessagesInternal();
+ RunStartReceivingMessagesCallbacks(status);
+}
+
+void FtlMessageReceptionChannel::OnMessageReceived(
+ const ftl::ReceiveMessagesResponse& response) {
+ switch (response.body_case()) {
+ case ftl::ReceiveMessagesResponse::BodyCase::kInboxMessage: {
+ VLOG(0) << "Received message";
+ on_incoming_msg_.Run(response.inbox_message());
+ break;
+ }
+ case ftl::ReceiveMessagesResponse::BodyCase::kPong:
+ VLOG(0) << "Received pong";
+ stream_pong_timer_->Reset();
+ break;
+ case ftl::ReceiveMessagesResponse::BodyCase::kStartOfBatch:
+ state_ = State::STARTED;
+ RunStartReceivingMessagesCallbacks(grpc::Status::OK);
+ BeginStreamTimers();
+ break;
+ case ftl::ReceiveMessagesResponse::BodyCase::kEndOfBatch:
+ VLOG(0) << "Received end of batch";
+ break;
+ default:
+ LOG(WARNING) << "Received unknown message type: " << response.body_case();
+ break;
+ }
+}
+
+void FtlMessageReceptionChannel::RunStartReceivingMessagesCallbacks(
+ const grpc::Status& status) {
+ if (start_receiving_messages_callbacks_.empty()) {
+ return;
+ }
+ for (DoneCallback& callback : start_receiving_messages_callbacks_) {
+ std::move(callback).Run(status);
+ }
+ start_receiving_messages_callbacks_.clear();
+}
+
+void FtlMessageReceptionChannel::RetryStartReceivingMessagesWithBackoff() {
+ reconnect_retry_backoff_.InformOfRequest(false);
+ VLOG(0) << "RetryStartReceivingMessages will be called with backoff: "
+ << reconnect_retry_backoff_.GetTimeUntilRelease();
+ reconnect_retry_timer_.Start(
+ FROM_HERE, reconnect_retry_backoff_.GetTimeUntilRelease(),
+ base::BindOnce(&FtlMessageReceptionChannel::RetryStartReceivingMessages,
+ base::Unretained(this)));
+}
+
+void FtlMessageReceptionChannel::RetryStartReceivingMessages() {
+ VLOG(0) << "RetryStartReceivingMessages called";
+ StopReceivingMessagesInternal();
+ StartReceivingMessagesInternal();
+}
+
+void FtlMessageReceptionChannel::StartReceivingMessagesInternal() {
+ DCHECK_EQ(State::STOPPED, state_);
+ state_ = State::STARTING;
+ stream_opener_.Run(
+ base::BindOnce(
+ &FtlMessageReceptionChannel::OnReceiveMessagesStreamStarted,
+ weak_factory_.GetWeakPtr()),
+ base::BindRepeating(&FtlMessageReceptionChannel::OnMessageReceived,
+ weak_factory_.GetWeakPtr()),
+ base::BindOnce(&FtlMessageReceptionChannel::OnReceiveMessagesStreamClosed,
+ weak_factory_.GetWeakPtr()));
+}
+
+void FtlMessageReceptionChannel::StopReceivingMessagesInternal() {
+ DCHECK_NE(State::STOPPED, state_);
+ state_ = State::STOPPED;
+ receive_messages_stream_.reset();
+ reconnect_retry_timer_.Stop();
+ stream_lifetime_timer_.Stop();
+ stream_pong_timer_.reset();
+}
+
+void FtlMessageReceptionChannel::BeginStreamTimers() {
+ reconnect_retry_backoff_.Reset();
+ stream_pong_timer_ = std::make_unique<base::DelayTimer>(
+ FROM_HERE, kPongTimeout, this,
+ &FtlMessageReceptionChannel::OnPongTimeout);
+ stream_pong_timer_->Reset();
+ stream_lifetime_timer_.Start(
+ FROM_HERE, kStreamLifetime,
+ base::BindOnce(&FtlMessageReceptionChannel::OnStreamLifetimeExceeded,
+ base::Unretained(this)));
+}
+
+void FtlMessageReceptionChannel::OnPongTimeout() {
+ LOG(WARNING) << "Timed out waiting for PONG message from server.";
+ RetryStartReceivingMessagesWithBackoff();
+}
+
+void FtlMessageReceptionChannel::OnStreamLifetimeExceeded() {
+ VLOG(0) << "Reached maximum lifetime for current stream.";
+ reconnect_retry_backoff_.Reset();
+ RetryStartReceivingMessages();
+}
+
+} // namespace remoting
diff --git a/remoting/signaling/ftl_message_reception_channel.h b/remoting/signaling/ftl_message_reception_channel.h
new file mode 100644
index 0000000..a105d67
--- /dev/null
+++ b/remoting/signaling/ftl_message_reception_channel.h
@@ -0,0 +1,88 @@
+// Copyright 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef REMOTING_SIGNALING_FTL_MESSAGE_RECEPTION_CHANNEL_H_
+#define REMOTING_SIGNALING_FTL_MESSAGE_RECEPTION_CHANNEL_H_
+
+#include <list>
+#include <memory>
+
+#include "base/callback_forward.h"
+#include "base/macros.h"
+#include "base/memory/weak_ptr.h"
+#include "base/time/time.h"
+#include "base/timer/timer.h"
+#include "net/base/backoff_entry.h"
+#include "remoting/signaling/message_reception_channel.h"
+
+namespace remoting {
+
+// Handles the lifetime and validity of the messaging stream used for FTL.
+class FtlMessageReceptionChannel final : public MessageReceptionChannel {
+ public:
+ static constexpr base::TimeDelta kPongTimeout =
+ base::TimeDelta::FromSeconds(15);
+ static constexpr base::TimeDelta kStreamLifetime =
+ base::TimeDelta::FromMinutes(13);
+ static constexpr base::TimeDelta kBackoffInitialDelay =
+ base::TimeDelta::FromSeconds(1);
+ static constexpr base::TimeDelta kBackoffMaxDelay =
+ base::TimeDelta::FromMinutes(1);
+
+ FtlMessageReceptionChannel();
+ ~FtlMessageReceptionChannel() override;
+
+ // MessageReceptionChannel implementations.
+ void Initialize(const StreamOpener& stream_opener,
+ const MessageCallback& on_incoming_msg) override;
+ void StartReceivingMessages(DoneCallback on_done) override;
+ void StopReceivingMessages() override;
+
+ const net::BackoffEntry& GetReconnectRetryBackoffEntryForTesting() const;
+
+ private:
+ enum class State {
+ // TODO(yuweih): Evaluate if this class needs to be reusable.
+ STOPPED,
+
+ // StartReceivingMessages() is called but the channel hasn't received a
+ // signal from the server yet.
+ STARTING,
+
+ // Stream is started, or is dropped but being retried.
+ STARTED,
+ };
+
+ void OnReceiveMessagesStreamStarted(
+ std::unique_ptr<ScopedGrpcServerStream> stream);
+ void OnReceiveMessagesStreamClosed(const grpc::Status& status);
+ void OnMessageReceived(const ftl::ReceiveMessagesResponse& response);
+
+ void RunStartReceivingMessagesCallbacks(const grpc::Status& status);
+ void RetryStartReceivingMessagesWithBackoff();
+ void RetryStartReceivingMessages();
+ void StartReceivingMessagesInternal();
+ void StopReceivingMessagesInternal();
+
+ void BeginStreamTimers();
+ void OnPongTimeout();
+ void OnStreamLifetimeExceeded();
+
+ StreamOpener stream_opener_;
+ MessageCallback on_incoming_msg_;
+ std::unique_ptr<ScopedGrpcServerStream> receive_messages_stream_;
+ std::list<DoneCallback> start_receiving_messages_callbacks_;
+ State state_ = State::STOPPED;
+ net::BackoffEntry reconnect_retry_backoff_;
+ base::OneShotTimer reconnect_retry_timer_;
+ base::OneShotTimer stream_lifetime_timer_;
+ std::unique_ptr<base::DelayTimer> stream_pong_timer_;
+
+ base::WeakPtrFactory<FtlMessageReceptionChannel> weak_factory_;
+ DISALLOW_COPY_AND_ASSIGN(FtlMessageReceptionChannel);
+};
+
+} // namespace remoting
+
+#endif // REMOTING_SIGNALING_FTL_MESSAGE_RECEPTION_CHANNEL_H_
diff --git a/remoting/signaling/ftl_message_reception_channel_unittest.cc b/remoting/signaling/ftl_message_reception_channel_unittest.cc
new file mode 100644
index 0000000..659c8848
--- /dev/null
+++ b/remoting/signaling/ftl_message_reception_channel_unittest.cc
@@ -0,0 +1,454 @@
+// Copyright 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "remoting/signaling/ftl_message_reception_channel.h"
+
+#include <algorithm>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "base/bind.h"
+#include "base/logging.h"
+#include "base/memory/weak_ptr.h"
+#include "base/run_loop.h"
+#include "base/test/bind_test_util.h"
+#include "base/test/mock_callback.h"
+#include "base/test/scoped_task_environment.h"
+#include "remoting/signaling/ftl.pb.h"
+#include "remoting/signaling/grpc_support/grpc_test_util.h"
+#include "remoting/signaling/grpc_support/scoped_grpc_server_stream.h"
+#include "testing/gmock/include/gmock/gmock.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace remoting {
+
+namespace {
+
+using ::testing::_;
+using ::testing::Expectation;
+using ::testing::Invoke;
+using ::testing::Property;
+using ::testing::Return;
+
+// Fake stream implementation to allow probing if a stream is closed by client.
+class FakeScopedGrpcServerStream : public ScopedGrpcServerStream {
+ public:
+ FakeScopedGrpcServerStream()
+ : ScopedGrpcServerStream(nullptr), weak_factory_(this) {}
+ ~FakeScopedGrpcServerStream() override = default;
+
+ base::WeakPtr<FakeScopedGrpcServerStream> GetWeakPtr() {
+ return weak_factory_.GetWeakPtr();
+ }
+
+ private:
+ base::WeakPtrFactory<FakeScopedGrpcServerStream> weak_factory_;
+ DISALLOW_COPY_AND_ASSIGN(FakeScopedGrpcServerStream);
+};
+
+std::unique_ptr<FakeScopedGrpcServerStream> CreateFakeServerStream() {
+ return std::make_unique<FakeScopedGrpcServerStream>();
+}
+
+ftl::ReceiveMessagesResponse CreateStartOfBatchResponse() {
+ ftl::ReceiveMessagesResponse response;
+ response.mutable_start_of_batch();
+ return response;
+}
+
+base::OnceCallback<void(const grpc::Status& status)> AssertOkCallback() {
+ return base::BindOnce(
+ [](const grpc::Status& status) { ASSERT_TRUE(status.ok()); });
+}
+
+} // namespace
+
+class FtlMessageReceptionChannelTest : public testing::Test {
+ public:
+ void SetUp() override;
+ void TearDown() override;
+
+ protected:
+ base::TimeDelta GetTimeUntilRetry() const;
+ int GetRetryFailureCount() const;
+
+ base::test::ScopedTaskEnvironment scoped_task_environment_{
+ base::test::ScopedTaskEnvironment::MainThreadType::MOCK_TIME,
+ base::test::ScopedTaskEnvironment::NowSource::MAIN_THREAD_MOCK_TIME};
+ std::unique_ptr<FtlMessageReceptionChannel> channel_;
+ base::MockCallback<FtlMessageReceptionChannel::StreamOpener>
+ mock_stream_opener_;
+ base::MockCallback<base::RepeatingCallback<void(const ftl::InboxMessage&)>>
+ mock_on_incoming_msg_;
+};
+
+void FtlMessageReceptionChannelTest::SetUp() {
+ channel_ = std::make_unique<FtlMessageReceptionChannel>();
+ channel_->Initialize(mock_stream_opener_.Get(), mock_on_incoming_msg_.Get());
+}
+
+void FtlMessageReceptionChannelTest::TearDown() {
+ channel_.reset();
+ scoped_task_environment_.FastForwardUntilNoTasksRemain();
+}
+
+base::TimeDelta FtlMessageReceptionChannelTest::GetTimeUntilRetry() const {
+ return channel_->GetReconnectRetryBackoffEntryForTesting()
+ .GetTimeUntilRelease();
+}
+
+int FtlMessageReceptionChannelTest::GetRetryFailureCount() const {
+ return channel_->GetReconnectRetryBackoffEntryForTesting().failure_count();
+}
+
+TEST_F(FtlMessageReceptionChannelTest,
+ TestStartReceivingMessages_StoppedImmediately) {
+ base::RunLoop run_loop;
+
+ EXPECT_CALL(mock_stream_opener_, Run(_, _, _))
+ .WillOnce(Invoke(
+ [&](base::OnceCallback<void(std::unique_ptr<ScopedGrpcServerStream>)>
+ on_stream_started,
+ const base::RepeatingCallback<void(
+ const ftl::ReceiveMessagesResponse&)>& on_incoming_msg,
+ base::OnceCallback<void(const grpc::Status&)> on_channel_closed) {
+ std::move(on_stream_started).Run(CreateFakeServerStream());
+ channel_->StopReceivingMessages();
+ }));
+
+ channel_->StartReceivingMessages(test::CheckStatusThenQuitRunLoopCallback(
+ FROM_HERE, grpc::StatusCode::CANCELLED, &run_loop));
+
+ run_loop.Run();
+}
+
+TEST_F(FtlMessageReceptionChannelTest,
+ TestStartReceivingMessages_NotAuthenticated) {
+ base::RunLoop run_loop;
+
+ EXPECT_CALL(mock_stream_opener_, Run(_, _, _))
+ .WillOnce(Invoke(
+ [&](base::OnceCallback<void(std::unique_ptr<ScopedGrpcServerStream>)>
+ on_stream_started,
+ const base::RepeatingCallback<void(
+ const ftl::ReceiveMessagesResponse&)>& on_incoming_msg,
+ base::OnceCallback<void(const grpc::Status&)> on_channel_closed) {
+ std::move(on_stream_started).Run(CreateFakeServerStream());
+ std::move(on_channel_closed)
+ .Run(grpc::Status(grpc::StatusCode::UNAUTHENTICATED, ""));
+ }));
+
+ channel_->StartReceivingMessages(test::CheckStatusThenQuitRunLoopCallback(
+ FROM_HERE, grpc::StatusCode::UNAUTHENTICATED, &run_loop));
+
+ run_loop.Run();
+}
+
+TEST_F(FtlMessageReceptionChannelTest,
+ TestStartReceivingMessages_StreamStarted) {
+ base::RunLoop run_loop;
+
+ EXPECT_CALL(mock_stream_opener_, Run(_, _, _))
+ .WillOnce(Invoke(
+ [&](base::OnceCallback<void(std::unique_ptr<ScopedGrpcServerStream>)>
+ on_stream_started,
+ const base::RepeatingCallback<void(
+ const ftl::ReceiveMessagesResponse&)>& on_incoming_msg,
+ base::OnceCallback<void(const grpc::Status&)> on_channel_closed) {
+ std::move(on_stream_started).Run(CreateFakeServerStream());
+ on_incoming_msg.Run(CreateStartOfBatchResponse());
+ }));
+
+ channel_->StartReceivingMessages(test::CheckStatusThenQuitRunLoopCallback(
+ FROM_HERE, grpc::StatusCode::OK, &run_loop));
+
+ run_loop.Run();
+}
+
+TEST_F(FtlMessageReceptionChannelTest,
+ TestStartReceivingMessages_RecoverableStreamError) {
+ base::RunLoop run_loop;
+
+ base::WeakPtr<FakeScopedGrpcServerStream> old_stream;
+ EXPECT_CALL(mock_stream_opener_, Run(_, _, _))
+ .WillOnce(Invoke(
+ [&](base::OnceCallback<void(std::unique_ptr<ScopedGrpcServerStream>)>
+ on_stream_started,
+ const base::RepeatingCallback<void(
+ const ftl::ReceiveMessagesResponse&)>& on_incoming_msg,
+ base::OnceCallback<void(const grpc::Status&)> on_channel_closed) {
+ // The first open stream attempt fails with UNAVAILABLE error.
+ auto fake_server_stream = CreateFakeServerStream();
+ old_stream = fake_server_stream->GetWeakPtr();
+ std::move(on_stream_started).Run(std::move(fake_server_stream));
+
+ ASSERT_EQ(0, GetRetryFailureCount());
+
+ std::move(on_channel_closed)
+ .Run(grpc::Status(grpc::StatusCode::UNAVAILABLE, ""));
+
+ ASSERT_EQ(1, GetRetryFailureCount());
+ ASSERT_NEAR(
+ FtlMessageReceptionChannel::kBackoffInitialDelay.InSecondsF(),
+ GetTimeUntilRetry().InSecondsF(), 0.5);
+
+ // This will make the channel reopen the stream.
+ scoped_task_environment_.FastForwardBy(GetTimeUntilRetry());
+ }))
+ .WillOnce(Invoke(
+ [&](base::OnceCallback<void(std::unique_ptr<ScopedGrpcServerStream>)>
+ on_stream_started,
+ const base::RepeatingCallback<void(
+ const ftl::ReceiveMessagesResponse&)>& on_incoming_msg,
+ base::OnceCallback<void(const grpc::Status&)> on_channel_closed) {
+ // Second open stream attempt succeeds.
+
+ // Assert old stream closed.
+ ASSERT_FALSE(old_stream);
+
+ // Send a StartOfBatch and verify it resets the failure counter.
+ std::move(on_stream_started).Run(CreateFakeServerStream());
+ on_incoming_msg.Run(CreateStartOfBatchResponse());
+
+ ASSERT_EQ(0, GetRetryFailureCount());
+ }));
+
+ channel_->StartReceivingMessages(test::CheckStatusThenQuitRunLoopCallback(
+ FROM_HERE, grpc::StatusCode::OK, &run_loop));
+
+ run_loop.Run();
+}
+
+TEST_F(FtlMessageReceptionChannelTest,
+ TestStartReceivingMessages_MultipleCalls) {
+ base::RunLoop run_loop;
+
+ base::MockCallback<FtlMessageReceptionChannel::DoneCallback>
+ start_receiving_messages_callback;
+
+ // Exits the run loop iff the callback is called three times with OK.
+ EXPECT_CALL(start_receiving_messages_callback,
+ Run(Property(&grpc::Status::error_code, grpc::StatusCode::OK)))
+ .WillOnce(Return())
+ .WillOnce(Return())
+ .WillOnce(Invoke([&](const grpc::Status& status) { run_loop.Quit(); }));
+
+ EXPECT_CALL(mock_stream_opener_, Run(_, _, _))
+ .WillOnce(Invoke(
+ [&](base::OnceCallback<void(std::unique_ptr<ScopedGrpcServerStream>)>
+ on_stream_started,
+ const base::RepeatingCallback<void(
+ const ftl::ReceiveMessagesResponse&)>& on_incoming_msg,
+ base::OnceCallback<void(const grpc::Status&)> on_channel_closed) {
+ std::move(on_stream_started).Run(CreateFakeServerStream());
+ on_incoming_msg.Run(CreateStartOfBatchResponse());
+ }));
+
+ channel_->StartReceivingMessages(start_receiving_messages_callback.Get());
+ channel_->StartReceivingMessages(start_receiving_messages_callback.Get());
+ channel_->StartReceivingMessages(start_receiving_messages_callback.Get());
+
+ run_loop.Run();
+}
+
+TEST_F(FtlMessageReceptionChannelTest, StreamsTwoMessages) {
+ base::RunLoop run_loop;
+
+ constexpr char kMessage1Id[] = "msg_1";
+ constexpr char kMessage2Id[] = "msg_2";
+
+ ftl::InboxMessage message_1;
+ message_1.set_message_id(kMessage1Id);
+ ftl::InboxMessage message_2;
+ message_2.set_message_id(kMessage2Id);
+
+ EXPECT_CALL(mock_on_incoming_msg_,
+ Run(Property(&ftl::InboxMessage::message_id, kMessage1Id)))
+ .WillOnce(Return());
+ EXPECT_CALL(mock_on_incoming_msg_,
+ Run(Property(&ftl::InboxMessage::message_id, kMessage2Id)))
+ .WillOnce(Invoke([&](const ftl::InboxMessage&) { run_loop.Quit(); }));
+
+ EXPECT_CALL(mock_stream_opener_, Run(_, _, _))
+ .WillOnce(Invoke(
+ [&](base::OnceCallback<void(std::unique_ptr<ScopedGrpcServerStream>)>
+ on_stream_started,
+ const base::RepeatingCallback<void(
+ const ftl::ReceiveMessagesResponse&)>& on_incoming_msg,
+ base::OnceCallback<void(const grpc::Status&)> on_channel_closed) {
+ std::move(on_stream_started).Run(CreateFakeServerStream());
+
+ on_incoming_msg.Run(CreateStartOfBatchResponse());
+
+ ftl::ReceiveMessagesResponse response;
+ *response.mutable_inbox_message() = message_1;
+ on_incoming_msg.Run(response);
+ response.Clear();
+
+ *response.mutable_inbox_message() = message_2;
+ on_incoming_msg.Run(response);
+ response.Clear();
+
+ std::move(on_channel_closed).Run(grpc::Status::OK);
+ }));
+
+ channel_->StartReceivingMessages(AssertOkCallback());
+
+ run_loop.Run();
+}
+
+TEST_F(FtlMessageReceptionChannelTest, NoPongWithinTimeout_ResetsStream) {
+ base::RunLoop run_loop;
+
+ base::WeakPtr<FakeScopedGrpcServerStream> old_stream;
+ EXPECT_CALL(mock_stream_opener_, Run(_, _, _))
+ .WillOnce(Invoke(
+ [&](base::OnceCallback<void(std::unique_ptr<ScopedGrpcServerStream>)>
+ on_stream_started,
+ const base::RepeatingCallback<void(
+ const ftl::ReceiveMessagesResponse&)>& on_incoming_msg,
+ base::OnceCallback<void(const grpc::Status&)> on_channel_closed) {
+ auto fake_server_stream = CreateFakeServerStream();
+ old_stream = fake_server_stream->GetWeakPtr();
+ std::move(on_stream_started).Run(std::move(fake_server_stream));
+ on_incoming_msg.Run(CreateStartOfBatchResponse());
+ scoped_task_environment_.FastForwardBy(
+ FtlMessageReceptionChannel::kPongTimeout);
+
+ ASSERT_EQ(1, GetRetryFailureCount());
+ ASSERT_NEAR(
+ FtlMessageReceptionChannel::kBackoffInitialDelay.InSecondsF(),
+ GetTimeUntilRetry().InSecondsF(), 0.5);
+
+ // This will make the channel reopen the stream.
+ scoped_task_environment_.FastForwardBy(GetTimeUntilRetry());
+ }))
+ .WillOnce(Invoke(
+ [&](base::OnceCallback<void(std::unique_ptr<ScopedGrpcServerStream>)>
+ on_stream_started,
+ const base::RepeatingCallback<void(
+ const ftl::ReceiveMessagesResponse&)>& on_incoming_msg,
+ base::OnceCallback<void(const grpc::Status&)> on_channel_closed) {
+ // Stream is reopened.
+
+ // Assert old stream closed.
+ ASSERT_FALSE(old_stream);
+
+ std::move(on_stream_started).Run(CreateFakeServerStream());
+
+ // Sends a StartOfBatch and verify that it resets the failure
+ // counter.
+ on_incoming_msg.Run(CreateStartOfBatchResponse());
+ ASSERT_EQ(0, GetRetryFailureCount());
+ run_loop.Quit();
+ }));
+
+ channel_->StartReceivingMessages(AssertOkCallback());
+
+ run_loop.Run();
+}
+
+TEST_F(FtlMessageReceptionChannelTest, LifetimeExceeded_ResetsStream) {
+ base::RunLoop run_loop;
+
+ EXPECT_CALL(mock_stream_opener_, Run(_, _, _))
+ .WillOnce(Invoke(
+ [&](base::OnceCallback<void(std::unique_ptr<ScopedGrpcServerStream>)>
+ on_stream_started,
+ const base::RepeatingCallback<void(
+ const ftl::ReceiveMessagesResponse&)>& on_incoming_msg,
+ base::OnceCallback<void(const grpc::Status&)> on_channel_closed) {
+ auto fake_server_stream = CreateFakeServerStream();
+ base::WeakPtr<FakeScopedGrpcServerStream> old_stream =
+ fake_server_stream->GetWeakPtr();
+ std::move(on_stream_started).Run(std::move(fake_server_stream));
+ on_incoming_msg.Run(CreateStartOfBatchResponse());
+
+ // Keep sending pong until lifetime exceeded.
+ base::TimeDelta pong_period =
+ FtlMessageReceptionChannel::kPongTimeout -
+ base::TimeDelta::FromSeconds(1);
+ ASSERT_LT(base::TimeDelta(), pong_period);
+ base::TimeDelta ticked_time;
+
+ // The last FastForwardBy() will make the channel reopen the stream.
+ while (ticked_time <= FtlMessageReceptionChannel::kPongTimeout) {
+ scoped_task_environment_.FastForwardBy(pong_period);
+ ticked_time += pong_period;
+ }
+ ASSERT_FALSE(old_stream);
+ }))
+ .WillOnce(Invoke(
+ [&](base::OnceCallback<void(std::unique_ptr<ScopedGrpcServerStream>)>
+ on_stream_started,
+ const base::RepeatingCallback<void(
+ const ftl::ReceiveMessagesResponse&)>& on_incoming_msg,
+ base::OnceCallback<void(const grpc::Status&)> on_channel_closed) {
+ // Reopening the stream. Send StartOfBatch and verify that it resets
+ // the failure counter.
+ std::move(on_stream_started).Run(CreateFakeServerStream());
+ on_incoming_msg.Run(CreateStartOfBatchResponse());
+ ASSERT_EQ(0, GetRetryFailureCount());
+ run_loop.Quit();
+ }));
+
+ channel_->StartReceivingMessages(AssertOkCallback());
+
+ run_loop.Run();
+}
+
+TEST_F(FtlMessageReceptionChannelTest, TimeoutIncreasesToMaximum) {
+ base::RunLoop run_loop;
+
+ int failure_count = 0;
+ int hitting_max_delay_count = 0;
+ EXPECT_CALL(mock_stream_opener_, Run(_, _, _))
+ .WillRepeatedly(Invoke(
+ [&](base::OnceCallback<void(std::unique_ptr<ScopedGrpcServerStream>)>
+ on_stream_started,
+ const base::RepeatingCallback<void(
+ const ftl::ReceiveMessagesResponse&)>& on_incoming_msg,
+ base::OnceCallback<void(const grpc::Status&)> on_channel_closed) {
+ std::move(on_stream_started).Run(CreateFakeServerStream());
+
+ // Quit if delay is ~kBackoffMaxDelay three times.
+ if (hitting_max_delay_count == 3) {
+ on_incoming_msg.Run(CreateStartOfBatchResponse());
+ ASSERT_EQ(0, GetRetryFailureCount());
+ run_loop.Quit();
+ return;
+ }
+
+ // Otherwise send UNAVAILABLE to reset the stream.
+
+ std::move(on_channel_closed)
+ .Run(grpc::Status(grpc::StatusCode::UNAVAILABLE, ""));
+
+ int new_failure_count = GetRetryFailureCount();
+ ASSERT_LT(failure_count, new_failure_count);
+ failure_count = new_failure_count;
+
+ base::TimeDelta time_until_retry = GetTimeUntilRetry();
+
+ base::TimeDelta max_delay_diff =
+ time_until_retry - FtlMessageReceptionChannel::kBackoffMaxDelay;
+
+ // Adjust for fuzziness.
+ if (max_delay_diff.magnitude() <
+ base::TimeDelta::FromMilliseconds(500)) {
+ hitting_max_delay_count++;
+ }
+
+ // This will tail-recursively call the stream opener.
+ scoped_task_environment_.FastForwardBy(time_until_retry);
+ }));
+
+ channel_->StartReceivingMessages(AssertOkCallback());
+
+ run_loop.Run();
+}
+
+} // namespace remoting
diff --git a/remoting/signaling/ftl_messaging_client.cc b/remoting/signaling/ftl_messaging_client.cc
index 106f71d..46d0aa1 100644
--- a/remoting/signaling/ftl_messaging_client.cc
+++ b/remoting/signaling/ftl_messaging_client.cc
@@ -6,9 +6,11 @@
#include <utility>
+#include "base/bind_helpers.h"
#include "base/callback.h"
#include "base/logging.h"
#include "remoting/signaling/ftl_grpc_context.h"
+#include "remoting/signaling/ftl_message_reception_channel.h"
namespace remoting {
@@ -28,6 +30,12 @@
: weak_factory_(this) {
context_ = context;
messaging_stub_ = Messaging::NewStub(context_->channel());
+ reception_channel_ = std::make_unique<FtlMessageReceptionChannel>();
+ reception_channel_->Initialize(
+ base::BindRepeating(&FtlMessagingClient::OpenReceiveMessagesStream,
+ weak_factory_.GetWeakPtr()),
+ base::BindRepeating(&FtlMessagingClient::OnMessageReceived,
+ weak_factory_.GetWeakPtr()));
}
FtlMessagingClient::~FtlMessagingClient() = default;
@@ -46,6 +54,24 @@
weak_factory_.GetWeakPtr(), std::move(on_done)));
}
+void FtlMessagingClient::StartReceivingMessages(DoneCallback on_done) {
+ reception_channel_->StartReceivingMessages(std::move(on_done));
+}
+
+void FtlMessagingClient::StopReceivingMessages() {
+ reception_channel_->StopReceivingMessages();
+}
+
+void FtlMessagingClient::SetMessageReceptionChannelForTesting(
+ std::unique_ptr<MessageReceptionChannel> channel) {
+ reception_channel_ = std::move(channel);
+ reception_channel_->Initialize(
+ base::BindRepeating(&FtlMessagingClient::OpenReceiveMessagesStream,
+ weak_factory_.GetWeakPtr()),
+ base::BindRepeating(&FtlMessagingClient::OnMessageReceived,
+ weak_factory_.GetWeakPtr()));
+}
+
void FtlMessagingClient::OnPullMessagesResponse(
DoneCallback on_done,
const grpc::Status& status,
@@ -70,7 +96,6 @@
return;
}
- // TODO(yuweih): May need retry logic.
VLOG(0) << "Acking " << ack_request.messages_size() << " messages";
AckMessages(ack_request, std::move(on_done));
@@ -90,9 +115,23 @@
DoneCallback on_done,
const grpc::Status& status,
const ftl::AckMessagesResponse& response) {
+ // TODO(yuweih): Handle failure.
std::move(on_done).Run(status);
}
+void FtlMessagingClient::OpenReceiveMessagesStream(
+ base::OnceCallback<void(std::unique_ptr<ScopedGrpcServerStream>)>
+ on_stream_started,
+ const base::RepeatingCallback<void(const ftl::ReceiveMessagesResponse&)>&
+ on_incoming_msg,
+ base::OnceCallback<void(const grpc::Status&)> on_channel_closed) {
+ context_->ExecuteServerStreamingRpc(
+ base::BindOnce(&Messaging::Stub::AsyncReceiveMessages,
+ base::Unretained(messaging_stub_.get())),
+ ftl::ReceiveMessagesRequest(), std::move(on_stream_started),
+ on_incoming_msg, std::move(on_channel_closed));
+}
+
void FtlMessagingClient::RunMessageCallbacks(const ftl::InboxMessage& message) {
if (message.message_type() !=
ftl::InboxMessage_MessageType_CHROMOTING_MESSAGE) {
@@ -107,4 +146,11 @@
callback_list_.Notify(message.sender_id().id(), chromoting_message.message());
}
+void FtlMessagingClient::OnMessageReceived(const ftl::InboxMessage& message) {
+ RunMessageCallbacks(message);
+ ftl::AckMessagesRequest ack_request;
+ AddMessageToAckRequest(message, &ack_request);
+ AckMessages(ack_request, base::DoNothing());
+}
+
} // namespace remoting
diff --git a/remoting/signaling/ftl_messaging_client.h b/remoting/signaling/ftl_messaging_client.h
index f5f7e42..3e70a7d0 100644
--- a/remoting/signaling/ftl_messaging_client.h
+++ b/remoting/signaling/ftl_messaging_client.h
@@ -17,6 +17,8 @@
namespace remoting {
class FtlGrpcContext;
+class MessageReceptionChannel;
+class ScopedGrpcServerStream;
// A class for sending and receiving messages via the FTL API.
class FtlMessagingClient final {
@@ -44,6 +46,18 @@
// server's inbox.
void PullMessages(DoneCallback on_done);
+ // Opens a stream to continuously receive new messages from the server and
+ // calls the registered MessageCallback once a new message is received.
+ // |on_done| is called once the stream is successfully opened or failed to
+ // open due to an error.
+ void StartReceivingMessages(DoneCallback on_done);
+
+ // Stops the stream for continuously receiving new messages.
+ void StopReceivingMessages();
+
+ void SetMessageReceptionChannelForTesting(
+ std::unique_ptr<MessageReceptionChannel> channel);
+
private:
using Messaging =
google::internal::communications::instantmessaging::v1::Messaging;
@@ -59,10 +73,20 @@
const grpc::Status& status,
const ftl::AckMessagesResponse& response);
+ void OpenReceiveMessagesStream(
+ base::OnceCallback<void(std::unique_ptr<ScopedGrpcServerStream>)>
+ on_stream_started,
+ const base::RepeatingCallback<void(const ftl::ReceiveMessagesResponse&)>&
+ on_incoming_msg,
+ base::OnceCallback<void(const grpc::Status&)> on_channel_closed);
+
void RunMessageCallbacks(const ftl::InboxMessage& message);
+ void OnMessageReceived(const ftl::InboxMessage& message);
+
FtlGrpcContext* context_;
std::unique_ptr<Messaging::Stub> messaging_stub_;
+ std::unique_ptr<MessageReceptionChannel> reception_channel_;
base::CallbackList<void(const std::string&, const std::string&)>
callback_list_;
diff --git a/remoting/signaling/ftl_messaging_client_unittest.cc b/remoting/signaling/ftl_messaging_client_unittest.cc
index 5b27d4b..6f5ac52 100644
--- a/remoting/signaling/ftl_messaging_client_unittest.cc
+++ b/remoting/signaling/ftl_messaging_client_unittest.cc
@@ -22,6 +22,7 @@
#include "remoting/signaling/ftl_services.grpc.pb.h"
#include "remoting/signaling/grpc_support/grpc_async_test_server.h"
#include "remoting/signaling/grpc_support/grpc_test_util.h"
+#include "remoting/signaling/message_reception_channel.h"
#include "testing/gmock/include/gmock/gmock.h"
#include "testing/gtest/include/gtest/gtest.h"
@@ -29,6 +30,9 @@
namespace {
+using ::testing::_;
+using ::testing::Invoke;
+using ::testing::Property;
using ::testing::Return;
using PullMessagesResponder =
@@ -59,6 +63,32 @@
return message;
}
+class MockMessageReceptionChannel : public MessageReceptionChannel {
+ public:
+ MockMessageReceptionChannel() = default;
+ ~MockMessageReceptionChannel() override = default;
+
+ // MessageReceptionChannel implementations.
+ void Initialize(const StreamOpener& stream_opener,
+ const MessageCallback& on_incoming_msg) override {
+ stream_opener_ = stream_opener;
+ on_incoming_msg_ = on_incoming_msg;
+ }
+
+ MOCK_METHOD1(StartReceivingMessages, void(DoneCallback));
+ MOCK_METHOD0(StopReceivingMessages, void());
+
+ StreamOpener* stream_opener() { return &stream_opener_; }
+
+ MessageCallback* on_incoming_msg() { return &on_incoming_msg_; }
+
+ private:
+ StreamOpener stream_opener_;
+ MessageCallback on_incoming_msg_;
+
+ DISALLOW_COPY_AND_ASSIGN(MockMessageReceptionChannel);
+};
+
} // namespace
class FtlMessagingClientTest : public testing::Test {
@@ -67,6 +97,9 @@
void TearDown() override;
protected:
+ using Messaging =
+ google::internal::communications::instantmessaging::v1::Messaging;
+
// Calls are scheduled sequentially and handled on the server thread.
void ServerWaitAndRespondToPullMessagesRequest(
const ftl::PullMessagesResponse& response,
@@ -75,15 +108,13 @@
base::OnceCallback<grpc::Status(const ftl::AckMessagesRequest&)> handler,
base::OnceClosure on_done);
+ std::unique_ptr<test::GrpcAsyncTestServer> server_;
+ scoped_refptr<base::SequencedTaskRunner> server_task_runner_;
std::unique_ptr<FtlMessagingClient> messaging_client_;
+ MockMessageReceptionChannel* mock_message_reception_channel_;
private:
- using Messaging =
- google::internal::communications::instantmessaging::v1::Messaging;
-
- std::unique_ptr<test::GrpcAsyncTestServer> server_;
std::unique_ptr<test::FtlGrpcTestEnvironment> test_environment_;
- scoped_refptr<base::SequencedTaskRunner> server_task_runner_;
base::test::ScopedTaskEnvironment scoped_task_environment_;
};
@@ -96,6 +127,9 @@
server_->CreateInProcessChannel());
messaging_client_ =
std::make_unique<FtlMessagingClient>(test_environment_->context());
+ auto channel = std::make_unique<MockMessageReceptionChannel>();
+ mock_message_reception_channel_ = channel.get();
+ messaging_client_->SetMessageReceptionChannelForTesting(std::move(channel));
}
void FtlMessagingClientTest::TearDown() {
@@ -231,4 +265,96 @@
run_loop.Run();
}
+TEST_F(FtlMessagingClientTest,
+ TestStartReceivingMessages_DoneCallbackForwarded) {
+ base::RunLoop run_loop;
+
+ EXPECT_CALL(*mock_message_reception_channel_, StartReceivingMessages(_))
+ .WillOnce(Invoke([&](FtlMessagingClient::DoneCallback callback) {
+ std::move(callback).Run(
+ grpc::Status(grpc::StatusCode::UNAUTHENTICATED, ""));
+ }));
+
+ messaging_client_->StartReceivingMessages(
+ base::BindLambdaForTesting([&](const grpc::Status& status) {
+ ASSERT_EQ(grpc::StatusCode::UNAUTHENTICATED, status.error_code());
+ run_loop.Quit();
+ }));
+
+ run_loop.Run();
+}
+
+TEST_F(FtlMessagingClientTest, TestStopReceivingMessages_CallForwarded) {
+ EXPECT_CALL(*mock_message_reception_channel_, StopReceivingMessages())
+ .WillOnce(Return());
+ messaging_client_->StopReceivingMessages();
+}
+
+TEST_F(FtlMessagingClientTest,
+ TestStreamOpener_StreamsTwoMessagesThenCloseByClient) {
+ base::RunLoop run_loop;
+
+ std::unique_ptr<test::GrpcServerStreamResponder<ftl::ReceiveMessagesResponse>>
+ responder;
+ std::unique_ptr<ScopedGrpcServerStream> scoped_stream;
+
+ ftl::ReceiveMessagesResponse response_1;
+ response_1.mutable_inbox_message()->set_message_id(kMessage1Id);
+ ftl::ReceiveMessagesResponse response_2;
+ response_2.mutable_pong();
+
+ server_task_runner_->PostTask(
+ FROM_HERE, base::BindLambdaForTesting([&]() {
+ ftl::ReceiveMessagesRequest request;
+ // This blocks the server thread until the client opens the stream.
+ responder = server_->HandleStreamRequest(
+ &Messaging::AsyncService::RequestReceiveMessages, &request);
+ ASSERT_TRUE(responder->SendMessage(response_1));
+ ASSERT_TRUE(responder->SendMessage(response_2));
+ }));
+
+ base::MockCallback<
+ base::RepeatingCallback<void(const ftl::ReceiveMessagesResponse&)>>
+ mock_on_incoming_msg;
+ EXPECT_CALL(mock_on_incoming_msg, Run(_))
+ .WillOnce([&](const ftl::ReceiveMessagesResponse& response) {
+ ASSERT_EQ(kMessage1Id, response.inbox_message().message_id());
+ })
+ .WillOnce([&](const ftl::ReceiveMessagesResponse& response) {
+ ASSERT_TRUE(response.has_pong());
+ DCHECK(scoped_stream);
+ scoped_stream.reset();
+ run_loop.QuitWhenIdle();
+ });
+
+ mock_message_reception_channel_->stream_opener()->Run(
+ base::BindLambdaForTesting(
+ [&](std::unique_ptr<ScopedGrpcServerStream> stream) {
+ scoped_stream = std::move(stream);
+ }),
+ mock_on_incoming_msg.Get(),
+ base::BindOnce([](const grpc::Status&) { NOTREACHED(); }));
+
+ run_loop.Run();
+}
+
+TEST_F(FtlMessagingClientTest,
+ TestOnMessageReceived_MessagePassedToSubscriberAndAcked) {
+ base::RunLoop run_loop;
+
+ ftl::InboxMessage message = CreateMessage(kMessage1Id, kMessage1Text);
+ mock_message_reception_channel_->on_incoming_msg()->Run(message);
+
+ ServerWaitAndRespondToAckMessagesRequest(
+ base::BindLambdaForTesting([&](const ftl::AckMessagesRequest& request) {
+ EXPECT_EQ(1, request.messages_size());
+ EXPECT_EQ(kFakeReceiverId, request.messages(0).receiver_id().id());
+ EXPECT_EQ(kMessage1Id, request.messages(0).message_id());
+ return grpc::Status::OK;
+ }),
+ run_loop.QuitWhenIdleClosure());
+
+ run_loop.Run();
+}
+
} // namespace remoting
diff --git a/remoting/signaling/ftl_services.proto b/remoting/signaling/ftl_services.proto
index b523408..9c0a1e5 100644
--- a/remoting/signaling/ftl_services.proto
+++ b/remoting/signaling/ftl_services.proto
@@ -27,4 +27,6 @@
returns (remoting.ftl.PullMessagesResponse) {}
rpc AckMessages(remoting.ftl.AckMessagesRequest)
returns (remoting.ftl.AckMessagesResponse) {}
+ rpc ReceiveMessages(remoting.ftl.ReceiveMessagesRequest)
+ returns (stream remoting.ftl.ReceiveMessagesResponse) {}
}
diff --git a/remoting/signaling/grpc_support/grpc_test_util.cc b/remoting/signaling/grpc_support/grpc_test_util.cc
index 4220077..4a74791 100644
--- a/remoting/signaling/grpc_support/grpc_test_util.cc
+++ b/remoting/signaling/grpc_support/grpc_test_util.cc
@@ -25,7 +25,7 @@
void WaitForCompletionAndAssertOk(const base::Location& from_here,
grpc::CompletionQueue* completion_queue,
void* expected_tag) {
- bool ok = WaitForCompletion(FROM_HERE, completion_queue, expected_tag);
+ bool ok = WaitForCompletion(from_here, completion_queue, expected_tag);
DCHECK(ok) << "Event is not ok. Location: " << from_here.ToString();
}
diff --git a/remoting/signaling/grpc_support/scoped_grpc_server_stream.h b/remoting/signaling/grpc_support/scoped_grpc_server_stream.h
index 219be189..7e257e4 100644
--- a/remoting/signaling/grpc_support/scoped_grpc_server_stream.h
+++ b/remoting/signaling/grpc_support/scoped_grpc_server_stream.h
@@ -20,7 +20,7 @@
public:
explicit ScopedGrpcServerStream(
base::WeakPtr<internal::GrpcAsyncServerStreamingCallDataBase> call_data);
- ~ScopedGrpcServerStream();
+ virtual ~ScopedGrpcServerStream();
private:
base::WeakPtr<internal::GrpcAsyncServerStreamingCallDataBase> call_data_;
diff --git a/remoting/signaling/message_reception_channel.h b/remoting/signaling/message_reception_channel.h
new file mode 100644
index 0000000..2a746b08
--- /dev/null
+++ b/remoting/signaling/message_reception_channel.h
@@ -0,0 +1,54 @@
+// Copyright 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef REMOTING_SIGNALING_MESSAGE_RECEPTION_CHANNEL_H_
+#define REMOTING_SIGNALING_MESSAGE_RECEPTION_CHANNEL_H_
+
+#include <list>
+#include <memory>
+
+#include "base/callback_forward.h"
+#include "base/macros.h"
+#include "remoting/signaling/ftl.pb.h"
+#include "third_party/grpc/src/include/grpcpp/support/status.h"
+
+namespace remoting {
+
+class ScopedGrpcServerStream;
+
+// Interface for starting or closing the server stream to receive messages from
+// FTL backend.
+class MessageReceptionChannel {
+ public:
+ using StreamOpener = base::RepeatingCallback<void(
+ base::OnceCallback<void(std::unique_ptr<ScopedGrpcServerStream>)>
+ on_stream_started,
+ const base::RepeatingCallback<void(const ftl::ReceiveMessagesResponse&)>&
+ on_incoming_msg,
+ base::OnceCallback<void(const grpc::Status&)> on_channel_closed)>;
+ using MessageCallback =
+ base::RepeatingCallback<void(const ftl::InboxMessage&)>;
+ using DoneCallback = base::OnceCallback<void(const grpc::Status& status)>;
+
+ MessageReceptionChannel() = default;
+ virtual ~MessageReceptionChannel() = default;
+
+ virtual void Initialize(const StreamOpener& stream_opener,
+ const MessageCallback& on_incoming_msg) = 0;
+
+ // Opens a server streaming channel to the FTL API to enable message reception
+ // over the fast path. |on_done| is called to signal success or failure for
+ // starting the stream.
+ virtual void StartReceivingMessages(DoneCallback on_done) = 0;
+
+ // Closes the streaming channel.
+ virtual void StopReceivingMessages() = 0;
+
+ private:
+ DISALLOW_COPY_AND_ASSIGN(MessageReceptionChannel);
+};
+
+} // namespace remoting
+
+#endif // REMOTING_SIGNALING_MESSAGE_RECEPTION_CHANNEL_H_
diff --git a/remoting/test/ftl_signaling_playground.cc b/remoting/test/ftl_signaling_playground.cc
index 01702d2..1baf00d 100644
--- a/remoting/test/ftl_signaling_playground.cc
+++ b/remoting/test/ftl_signaling_playground.cc
@@ -19,6 +19,7 @@
#include "base/path_service.h"
#include "base/strings/string_number_conversions.h"
#include "base/strings/stringprintf.h"
+#include "base/task/post_task.h"
#include "remoting/base/fake_oauth_token_getter.h"
#include "remoting/base/oauth_token_getter_impl.h"
#include "remoting/signaling/ftl_services.grpc.pb.h"
@@ -62,6 +63,35 @@
return ReadString();
}
+// Wait for the user to press enter key on an anonymous thread and calls
+// |on_done| once it is done.
+void WaitForEnterKey(base::OnceClosure on_done) {
+ base::PostTaskWithTraitsAndReply(FROM_HERE, {base::MayBlock()},
+ base::BindOnce([]() { getchar(); }),
+ std::move(on_done));
+}
+
+void PrintGrpcStatusError(const grpc::Status& status) {
+ DCHECK(!status.ok());
+ fprintf(stderr, "RPC failed. Code=%d, Message=%s\n", status.error_code(),
+ status.error_message().c_str());
+ switch (status.error_code()) {
+ case grpc::StatusCode::UNAVAILABLE:
+ fprintf(stderr,
+ "Set the GRPC_DEFAULT_SSL_ROOTS_FILE_PATH environment variable "
+ "to third_party/grpc/src/etc/roots.pem if gRPC cannot locate the "
+ "root certificates.\n");
+ break;
+ case grpc::StatusCode::UNAUTHENTICATED:
+ fprintf(
+ stderr,
+ "Request is unauthenticated. Make sure you have run SignInGaia.\n");
+ break;
+ default:
+ break;
+ }
+}
+
} // namespace
namespace remoting {
@@ -116,25 +146,29 @@
while (true) {
printf(
"\nOptions:\n"
- " 1. GetIceServer\n"
- " 2. SignInGaia\n"
+ " 1. SignInGaia\n"
+ " 2. GetIceServer\n"
" 3. PullMessages\n"
- " 4. Quit\n\n"
+ " 4. ReceiveMessages\n"
+ " 5. Quit\n\n"
"Your choice [number]: ");
int choice = 0;
base::StringToInt(ReadString(), &choice);
base::RunLoop run_loop;
switch (choice) {
case 1:
- GetIceServer(run_loop.QuitClosure());
+ SignInGaia(run_loop.QuitClosure());
break;
case 2:
- SignInGaia(run_loop.QuitClosure());
+ GetIceServer(run_loop.QuitClosure());
break;
case 3:
PullMessages(run_loop.QuitClosure());
break;
case 4:
+ StartReceivingMessages(run_loop.QuitWhenIdleClosure());
+ break;
+ case 5:
return;
default:
fprintf(stderr, "Unknown option\n");
@@ -200,27 +234,6 @@
std::move(on_done).Run();
}
-void FtlSignalingPlayground::HandleGrpcStatusError(const grpc::Status& status) {
- DCHECK(!status.ok());
- LOG(ERROR) << "RPC failed. Code=" << status.error_code() << ", "
- << "Message=" << status.error_message();
- switch (status.error_code()) {
- case grpc::StatusCode::UNAVAILABLE:
- VLOG(0)
- << "Set the GRPC_DEFAULT_SSL_ROOTS_FILE_PATH environment variable "
- << "to third_party/grpc/src/etc/roots.pem if gRPC cannot locate the "
- << "root certificates.";
- break;
- case grpc::StatusCode::UNAUTHENTICATED:
- VLOG(0) << "Grpc request failed to authenticate. "
- << "Trying to reauthenticate...";
- AuthenticateAndResetServices();
- break;
- default:
- break;
- }
-}
-
void FtlSignalingPlayground::GetIceServer(base::OnceClosure on_done) {
DCHECK(peer_to_peer_stub_);
VLOG(0) << "Running GetIceServer...";
@@ -254,7 +267,7 @@
}
}
} else {
- HandleGrpcStatusError(status);
+ PrintGrpcStatusError(status);
}
std::move(on_done).Run();
}
@@ -316,7 +329,13 @@
ftl_context_->SetAuthToken(response.auth_token().payload());
VLOG(0) << "Auth token set on FtlClient";
} else {
- HandleGrpcStatusError(status);
+ if (status.error_code() == grpc::StatusCode::UNAUTHENTICATED) {
+ VLOG(0) << "Grpc request failed to authenticate. "
+ << "Trying to reauthenticate...";
+ AuthenticateAndResetServices();
+ } else {
+ PrintGrpcStatusError(status);
+ }
}
std::move(on_done).Run();
}
@@ -334,15 +353,23 @@
base::OnceClosure on_done,
const grpc::Status& status) {
if (!status.ok()) {
- if (status.error_code() == grpc::StatusCode::UNAUTHENTICATED) {
- fprintf(stderr, "Please run SignInGaia first\n");
- } else {
- HandleGrpcStatusError(status);
- }
+ PrintGrpcStatusError(status);
}
std::move(on_done).Run();
}
+void FtlSignalingPlayground::StartReceivingMessages(base::OnceClosure on_done) {
+ VLOG(0) << "Running StartReceivingMessages...";
+ messaging_client_->StartReceivingMessages(
+ base::BindOnce(&FtlSignalingPlayground::OnStartReceivingMessagesDone,
+ weak_factory_.GetWeakPtr(), std::move(on_done)));
+}
+
+void FtlSignalingPlayground::StopReceivingMessages(base::OnceClosure on_done) {
+ messaging_client_->StopReceivingMessages();
+ std::move(on_done).Run();
+}
+
void FtlSignalingPlayground::OnMessageReceived(const std::string& sender_id,
const std::string& message) {
printf(
@@ -352,4 +379,23 @@
sender_id.c_str(), message.c_str());
}
+void FtlSignalingPlayground::OnStartReceivingMessagesDone(
+ base::OnceClosure on_done,
+ const grpc::Status& status) {
+ if (status.error_code() == grpc::StatusCode::CANCELLED) {
+ printf("ReceiveMessages stream canceled by client.\n");
+ std::move(on_done).Run();
+ return;
+ }
+ if (!status.ok()) {
+ PrintGrpcStatusError(status);
+ std::move(on_done).Run();
+ return;
+ }
+ printf("Started receiving messages. Press enter to stop streaming...\n");
+ WaitForEnterKey(base::BindOnce(&FtlSignalingPlayground::StopReceivingMessages,
+ weak_factory_.GetWeakPtr(),
+ std::move(on_done)));
+}
+
} // namespace remoting
diff --git a/remoting/test/ftl_signaling_playground.h b/remoting/test/ftl_signaling_playground.h
index 910ffa5..abd39ab 100644
--- a/remoting/test/ftl_signaling_playground.h
+++ b/remoting/test/ftl_signaling_playground.h
@@ -45,7 +45,6 @@
OAuthTokenGetter::Status status,
const std::string& user_email,
const std::string& access_token);
- void HandleGrpcStatusError(const grpc::Status& status);
void GetIceServer(base::OnceClosure on_done);
void OnGetIceServerResponse(base::OnceClosure on_done,
@@ -60,8 +59,12 @@
void PullMessages(base::OnceClosure on_done);
void OnPullMessagesResponse(base::OnceClosure on_done,
const grpc::Status& status);
+ void StartReceivingMessages(base::OnceClosure on_done);
+ void StopReceivingMessages(base::OnceClosure on_done);
void OnMessageReceived(const std::string& sender_id,
const std::string& message);
+ void OnStartReceivingMessagesDone(base::OnceClosure on_done,
+ const grpc::Status& status);
std::unique_ptr<test::TestTokenStorage> storage_;
std::unique_ptr<TestOAuthTokenGetterFactory> token_getter_factory_;