[remoting][FTL] Implement ReceiveMessages stream

This CL implements the ReceiveMessages stream and allows
FtlSignalingPlayground to read from the stream.

Bug: 927962
Change-Id: I4c3b155ccf0f4aa0a5f6b241dd8763cb9fd9f928
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/1510052
Commit-Queue: Yuwei Huang <yuweih@chromium.org>
Reviewed-by: Joe Downing <joedow@chromium.org>
Cr-Commit-Position: refs/heads/master@{#643513}
diff --git a/remoting/signaling/BUILD.gn b/remoting/signaling/BUILD.gn
index a292bda3..e025bbc 100644
--- a/remoting/signaling/BUILD.gn
+++ b/remoting/signaling/BUILD.gn
@@ -10,6 +10,8 @@
     "delegating_signal_strategy.h",
     "ftl_grpc_context.cc",
     "ftl_grpc_context.h",
+    "ftl_message_reception_channel.cc",
+    "ftl_message_reception_channel.h",
     "ftl_messaging_client.cc",
     "ftl_messaging_client.h",
     "iq_sender.cc",
@@ -18,6 +20,7 @@
     "jid_util.h",
     "log_to_server.cc",
     "log_to_server.h",
+    "message_reception_channel.h",
     "push_notification_subscriber.cc",
     "push_notification_subscriber.h",
     "server_log_entry.cc",
@@ -61,6 +64,8 @@
     sources -= [
       "ftl_grpc_context.cc",
       "ftl_grpc_context.h",
+      "ftl_message_reception_channel.cc",
+      "ftl_message_reception_channel.h",
       "ftl_messaging_client.cc",
       "ftl_messaging_client.h",
       "log_to_server.cc",
@@ -106,6 +111,7 @@
 
   sources = [
     "ftl_grpc_context_unittest.cc",
+    "ftl_message_reception_channel_unittest.cc",
     "ftl_messaging_client_unittest.cc",
     "iq_sender_unittest.cc",
     "jid_util_unittest.cc",
diff --git a/remoting/signaling/ftl.proto b/remoting/signaling/ftl.proto
index c44550cef..8a7eb04 100644
--- a/remoting/signaling/ftl.proto
+++ b/remoting/signaling/ftl.proto
@@ -194,3 +194,25 @@
 message AckMessagesResponse {
   ResponseHeader header = 1;
 }
+
+message ReceiveMessagesRequest {
+  RequestHeader header = 1;
+}
+
+message ReceiveMessagesResponse {
+  message Pong {}
+
+  message StartOfBatch { int32 count = 1; }
+
+  message EndOfBatch {}
+
+  message RefreshResult {}
+
+  oneof body {
+    InboxMessage inbox_message = 2;
+    Pong pong = 3;
+    StartOfBatch start_of_batch = 4;
+    EndOfBatch end_of_batch = 5;
+    RefreshResult refresh_result = 6;
+  }
+}
diff --git a/remoting/signaling/ftl_grpc_context.h b/remoting/signaling/ftl_grpc_context.h
index f513153..e28f6da4 100644
--- a/remoting/signaling/ftl_grpc_context.h
+++ b/remoting/signaling/ftl_grpc_context.h
@@ -15,6 +15,7 @@
 #include "remoting/base/oauth_token_getter.h"
 #include "remoting/signaling/ftl_services.grpc.pb.h"
 #include "remoting/signaling/grpc_support/grpc_async_dispatcher.h"
+#include "remoting/signaling/grpc_support/scoped_grpc_server_stream.h"
 #include "third_party/grpc/src/include/grpcpp/support/status.h"
 
 namespace remoting {
@@ -25,6 +26,8 @@
   template <typename ResponseType>
   using RpcCallback =
       base::OnceCallback<void(const grpc::Status&, const ResponseType&)>;
+  using StreamStartedCallback =
+      base::OnceCallback<void(std::unique_ptr<ScopedGrpcServerStream>)>;
 
   static std::string GetChromotingAppIdentifier();
 
@@ -47,6 +50,26 @@
     GetOAuthTokenAndExecuteRpc(std::move(execute_rpc_with_context));
   }
 
+  // |request| doesn't need to set the header field since this class will set
+  // it for you.
+  template <typename RequestType, typename ResponseType>
+  void ExecuteServerStreamingRpc(
+      GrpcAsyncDispatcher::AsyncServerStreamingRpcFunction<RequestType,
+                                                           ResponseType> rpc,
+      const RequestType& request,
+      StreamStartedCallback on_stream_started,
+      const GrpcAsyncDispatcher::RpcStreamCallback<ResponseType>&
+          on_incoming_msg,
+      GrpcAsyncDispatcher::RpcChannelClosedCallback on_channel_closed) {
+    auto execute_rpc_with_context = base::BindOnce(
+        &FtlGrpcContext::ExecuteServerStreamingRpcWithContext<RequestType,
+                                                              ResponseType>,
+        weak_factory_.GetWeakPtr(), std::move(rpc), request,
+        std::move(on_stream_started), on_incoming_msg,
+        std::move(on_channel_closed));
+    GetOAuthTokenAndExecuteRpc(std::move(execute_rpc_with_context));
+  }
+
   std::shared_ptr<grpc::ChannelInterface> channel() { return channel_; }
 
   void SetChannelForTesting(std::shared_ptr<grpc::ChannelInterface> channel);
@@ -66,6 +89,23 @@
                                 std::move(callback));
   }
 
+  template <typename RequestType, typename ResponseType>
+  void ExecuteServerStreamingRpcWithContext(
+      GrpcAsyncDispatcher::AsyncServerStreamingRpcFunction<RequestType,
+                                                           ResponseType> rpc,
+      RequestType request,
+      StreamStartedCallback on_stream_started,
+      const GrpcAsyncDispatcher::RpcStreamCallback<ResponseType>&
+          on_incoming_msg,
+      GrpcAsyncDispatcher::RpcChannelClosedCallback on_channel_closed,
+      std::unique_ptr<grpc::ClientContext> context) {
+    request.set_allocated_header(BuildRequestHeader().release());
+    auto scoped_stream = dispatcher_.ExecuteAsyncServerStreamingRpc(
+        std::move(rpc), std::move(context), request, on_incoming_msg,
+        std::move(on_channel_closed));
+    std::move(on_stream_started).Run(std::move(scoped_stream));
+  }
+
   void GetOAuthTokenAndExecuteRpc(
       ExecuteRpcWithContextCallback execute_rpc_with_context);
 
diff --git a/remoting/signaling/ftl_grpc_context_unittest.cc b/remoting/signaling/ftl_grpc_context_unittest.cc
index 325bd3b..018561a 100644
--- a/remoting/signaling/ftl_grpc_context_unittest.cc
+++ b/remoting/signaling/ftl_grpc_context_unittest.cc
@@ -28,11 +28,16 @@
 
 constexpr char kFakeUserEmail[] = "fake@gmail.com";
 constexpr char kFakeAccessToken[] = "Dummy Token";
+constexpr char kFakeFtlAuthToken[] = "Dummy FTL Token";
 
 using PullMessagesCallback =
     FtlGrpcContext::RpcCallback<ftl::PullMessagesResponse>;
+using IncomingMessageCallback =
+    GrpcAsyncDispatcher::RpcStreamCallback<ftl::ReceiveMessagesResponse>;
 using PullMessagesResponder =
     test::GrpcServerResponder<ftl::PullMessagesResponse>;
+using ReceiveMessagesResponder =
+    test::GrpcServerStreamResponder<ftl::ReceiveMessagesResponse>;
 
 PullMessagesCallback QuitRunLoopOnPullMessagesCallback(
     base::RunLoop* run_loop) {
@@ -54,8 +59,14 @@
       google::internal::communications::instantmessaging::v1::Messaging;
 
   void SendFakePullMessagesRequest(PullMessagesCallback on_response);
+  void SendFakeReceiveMessagesRequest(
+      FtlGrpcContext::StreamStartedCallback on_stream_started,
+      const IncomingMessageCallback& on_incoming_msg,
+      GrpcAsyncDispatcher::RpcChannelClosedCallback on_channel_closed);
   std::unique_ptr<PullMessagesResponder> HandlePullMessages(
       ftl::PullMessagesRequest* request);
+  std::unique_ptr<ReceiveMessagesResponder> HandleReceiveMessages(
+      ftl::ReceiveMessagesRequest* request);
 
   scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
   std::unique_ptr<FtlGrpcContext> context_;
@@ -93,12 +104,30 @@
                        request, std::move(on_response));
 }
 
+void FtlGrpcContextTest::SendFakeReceiveMessagesRequest(
+    FtlGrpcContext::StreamStartedCallback on_stream_started,
+    const IncomingMessageCallback& on_incoming_msg,
+    GrpcAsyncDispatcher::RpcChannelClosedCallback on_channel_closed) {
+  context_->ExecuteServerStreamingRpc(
+      base::BindOnce(&Messaging::Stub::AsyncReceiveMessages,
+                     base::Unretained(stub_.get())),
+      ftl::ReceiveMessagesRequest(), std::move(on_stream_started),
+      on_incoming_msg, std::move(on_channel_closed));
+}
+
 std::unique_ptr<PullMessagesResponder> FtlGrpcContextTest::HandlePullMessages(
     ftl::PullMessagesRequest* request) {
   return server_->HandleRequest(&Messaging::AsyncService::RequestPullMessages,
                                 request);
 }
 
+std::unique_ptr<ReceiveMessagesResponder>
+FtlGrpcContextTest::HandleReceiveMessages(
+    ftl::ReceiveMessagesRequest* request) {
+  return server_->HandleStreamRequest(
+      &Messaging::AsyncService::RequestReceiveMessages, request);
+}
+
 TEST_F(FtlGrpcContextTest, VerifyAPIKeyIsProvided) {
   base::RunLoop run_loop;
 
@@ -153,10 +182,8 @@
 }
 
 TEST_F(FtlGrpcContextTest, HasAuthTokenIfSet) {
-  constexpr char dummy_ftl_token[] = "Dummy FTL Token";
-
   base::RunLoop run_loop;
-  context_->SetAuthToken(dummy_ftl_token);
+  context_->SetAuthToken(kFakeFtlAuthToken);
 
   SendFakePullMessagesRequest(QuitRunLoopOnPullMessagesCallback(&run_loop));
 
@@ -164,12 +191,57 @@
       FROM_HERE, base::BindLambdaForTesting([&]() {
         ftl::PullMessagesRequest request;
         auto responder = HandlePullMessages(&request);
-        ASSERT_EQ(dummy_ftl_token, request.header().auth_token_payload());
+        ASSERT_EQ(kFakeFtlAuthToken, request.header().auth_token_payload());
         responder->Respond(ftl::PullMessagesResponse(), grpc::Status::OK);
       }));
   run_loop.Run();
 }
 
+TEST_F(FtlGrpcContextTest, ServerStreamingScenario) {
+  base::RunLoop run_loop;
+  context_->SetAuthToken(kFakeFtlAuthToken);
+
+  ftl::InboxMessage fake_inbox_message;
+  fake_inbox_message.set_message_id("msg_1");
+
+  std::unique_ptr<ScopedGrpcServerStream> server_stream;
+  std::unique_ptr<ReceiveMessagesResponder> responder;
+
+  SendFakeReceiveMessagesRequest(
+      // On stream started
+      base::BindLambdaForTesting(
+          [&](std::unique_ptr<ScopedGrpcServerStream> stream) {
+            server_stream = std::move(stream);
+
+            ftl::ReceiveMessagesRequest request;
+            responder = HandleReceiveMessages(&request);
+            ASSERT_TRUE(request.has_header());
+            ASSERT_FALSE(request.header().request_id().empty());
+            ASSERT_FALSE(request.header().app().empty());
+            ASSERT_TRUE(request.header().has_client_info());
+            ASSERT_EQ(kFakeFtlAuthToken, request.header().auth_token_payload());
+
+            ftl::ReceiveMessagesResponse response;
+            response.set_allocated_inbox_message(
+                new ftl::InboxMessage(fake_inbox_message));
+            responder->SendMessage(response);
+          }),
+
+      // On message received
+      base::BindLambdaForTesting(
+          [&](const ftl::ReceiveMessagesResponse& response) {
+            ASSERT_EQ(fake_inbox_message.message_id(),
+                      response.inbox_message().message_id());
+            responder->Close(grpc::Status::OK);
+          }),
+
+      // On channel closed
+      test::CheckStatusThenQuitRunLoopCallback(FROM_HERE, grpc::StatusCode::OK,
+                                               &run_loop));
+
+  run_loop.Run();
+}
+
 // TODO(yuweih): Ideally we should verify access token is properly attached too,
 // but currently this information seems to be lost in ServiceContext.
 
diff --git a/remoting/signaling/ftl_message_reception_channel.cc b/remoting/signaling/ftl_message_reception_channel.cc
new file mode 100644
index 0000000..ab74d048
--- /dev/null
+++ b/remoting/signaling/ftl_message_reception_channel.cc
@@ -0,0 +1,215 @@
+// Copyright 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "remoting/signaling/ftl_message_reception_channel.h"
+
+#include <utility>
+
+#include "base/bind_helpers.h"
+#include "base/callback.h"
+#include "base/logging.h"
+#include "remoting/signaling/ftl_grpc_context.h"
+#include "remoting/signaling/ftl_services.grpc.pb.h"
+#include "remoting/signaling/grpc_support/scoped_grpc_server_stream.h"
+
+namespace remoting {
+
+namespace {
+
+const net::BackoffEntry::Policy kBackoffPolicy = {
+    // Number of initial errors (in sequence) to ignore before applying
+    // exponential back-off rules.
+    0,
+
+    // Initial delay for exponential back-off in ms.
+    FtlMessageReceptionChannel::kBackoffInitialDelay.InMilliseconds(),
+
+    // Factor by which the waiting time will be multiplied.
+    2,
+
+    // Fuzzing percentage. ex: 10% will spread requests randomly
+    // between 90%-100% of the calculated time.
+    0.5,
+
+    // Maximum amount of time we are willing to delay our request in ms.
+    FtlMessageReceptionChannel::kBackoffMaxDelay.InMilliseconds(),
+
+    // Time to keep an entry from being discarded even when it
+    // has no significant state, -1 to never discard.
+    -1,
+
+    // Starts with initial delay.
+    false,
+};
+
+}  // namespace
+
+constexpr base::TimeDelta FtlMessageReceptionChannel::kPongTimeout;
+constexpr base::TimeDelta FtlMessageReceptionChannel::kStreamLifetime;
+constexpr base::TimeDelta FtlMessageReceptionChannel::kBackoffInitialDelay;
+constexpr base::TimeDelta FtlMessageReceptionChannel::kBackoffMaxDelay;
+
+FtlMessageReceptionChannel::FtlMessageReceptionChannel()
+    : reconnect_retry_backoff_(&kBackoffPolicy), weak_factory_(this) {}
+
+FtlMessageReceptionChannel::~FtlMessageReceptionChannel() = default;
+
+void FtlMessageReceptionChannel::Initialize(
+    const StreamOpener& stream_opener,
+    const MessageCallback& on_incoming_msg) {
+  DCHECK(stream_opener);
+  DCHECK(on_incoming_msg);
+  DCHECK(!stream_opener_);
+  DCHECK(!on_incoming_msg_);
+  stream_opener_ = stream_opener;
+  on_incoming_msg_ = on_incoming_msg;
+}
+
+void FtlMessageReceptionChannel::StartReceivingMessages(DoneCallback on_done) {
+  if (state_ == State::STARTED) {
+    std::move(on_done).Run(grpc::Status::OK);
+    return;
+  }
+  start_receiving_messages_callbacks_.push_back(std::move(on_done));
+  if (state_ == State::STARTING) {
+    return;
+  }
+  StartReceivingMessagesInternal();
+}
+
+void FtlMessageReceptionChannel::StopReceivingMessages() {
+  if (state_ == State::STOPPED) {
+    return;
+  }
+  StopReceivingMessagesInternal();
+  RunStartReceivingMessagesCallbacks(grpc::Status::CANCELLED);
+}
+
+const net::BackoffEntry&
+FtlMessageReceptionChannel::GetReconnectRetryBackoffEntryForTesting() const {
+  return reconnect_retry_backoff_;
+}
+
+void FtlMessageReceptionChannel::OnReceiveMessagesStreamStarted(
+    std::unique_ptr<ScopedGrpcServerStream> stream) {
+  receive_messages_stream_ = std::move(stream);
+}
+
+void FtlMessageReceptionChannel::OnReceiveMessagesStreamClosed(
+    const grpc::Status& status) {
+  if (state_ == State::STOPPED) {
+    // Previously closed by the caller.
+    return;
+  }
+  if (status.error_code() == grpc::StatusCode::ABORTED ||
+      status.error_code() == grpc::StatusCode::UNAVAILABLE) {
+    // These are 'soft' connection errors that should be retried.
+    // Other errors should be ignored.  See this file for more info:
+    // third_party/grpc/src/include/grpcpp/impl/codegen/status_code_enum.h
+    RetryStartReceivingMessagesWithBackoff();
+    return;
+  }
+  StopReceivingMessagesInternal();
+  RunStartReceivingMessagesCallbacks(status);
+}
+
+void FtlMessageReceptionChannel::OnMessageReceived(
+    const ftl::ReceiveMessagesResponse& response) {
+  switch (response.body_case()) {
+    case ftl::ReceiveMessagesResponse::BodyCase::kInboxMessage: {
+      VLOG(0) << "Received message";
+      on_incoming_msg_.Run(response.inbox_message());
+      break;
+    }
+    case ftl::ReceiveMessagesResponse::BodyCase::kPong:
+      VLOG(0) << "Received pong";
+      stream_pong_timer_->Reset();
+      break;
+    case ftl::ReceiveMessagesResponse::BodyCase::kStartOfBatch:
+      state_ = State::STARTED;
+      RunStartReceivingMessagesCallbacks(grpc::Status::OK);
+      BeginStreamTimers();
+      break;
+    case ftl::ReceiveMessagesResponse::BodyCase::kEndOfBatch:
+      VLOG(0) << "Received end of batch";
+      break;
+    default:
+      LOG(WARNING) << "Received unknown message type: " << response.body_case();
+      break;
+  }
+}
+
+void FtlMessageReceptionChannel::RunStartReceivingMessagesCallbacks(
+    const grpc::Status& status) {
+  if (start_receiving_messages_callbacks_.empty()) {
+    return;
+  }
+  for (DoneCallback& callback : start_receiving_messages_callbacks_) {
+    std::move(callback).Run(status);
+  }
+  start_receiving_messages_callbacks_.clear();
+}
+
+void FtlMessageReceptionChannel::RetryStartReceivingMessagesWithBackoff() {
+  reconnect_retry_backoff_.InformOfRequest(false);
+  VLOG(0) << "RetryStartReceivingMessages will be called with backoff: "
+          << reconnect_retry_backoff_.GetTimeUntilRelease();
+  reconnect_retry_timer_.Start(
+      FROM_HERE, reconnect_retry_backoff_.GetTimeUntilRelease(),
+      base::BindOnce(&FtlMessageReceptionChannel::RetryStartReceivingMessages,
+                     base::Unretained(this)));
+}
+
+void FtlMessageReceptionChannel::RetryStartReceivingMessages() {
+  VLOG(0) << "RetryStartReceivingMessages called";
+  StopReceivingMessagesInternal();
+  StartReceivingMessagesInternal();
+}
+
+void FtlMessageReceptionChannel::StartReceivingMessagesInternal() {
+  DCHECK_EQ(State::STOPPED, state_);
+  state_ = State::STARTING;
+  stream_opener_.Run(
+      base::BindOnce(
+          &FtlMessageReceptionChannel::OnReceiveMessagesStreamStarted,
+          weak_factory_.GetWeakPtr()),
+      base::BindRepeating(&FtlMessageReceptionChannel::OnMessageReceived,
+                          weak_factory_.GetWeakPtr()),
+      base::BindOnce(&FtlMessageReceptionChannel::OnReceiveMessagesStreamClosed,
+                     weak_factory_.GetWeakPtr()));
+}
+
+void FtlMessageReceptionChannel::StopReceivingMessagesInternal() {
+  DCHECK_NE(State::STOPPED, state_);
+  state_ = State::STOPPED;
+  receive_messages_stream_.reset();
+  reconnect_retry_timer_.Stop();
+  stream_lifetime_timer_.Stop();
+  stream_pong_timer_.reset();
+}
+
+void FtlMessageReceptionChannel::BeginStreamTimers() {
+  reconnect_retry_backoff_.Reset();
+  stream_pong_timer_ = std::make_unique<base::DelayTimer>(
+      FROM_HERE, kPongTimeout, this,
+      &FtlMessageReceptionChannel::OnPongTimeout);
+  stream_pong_timer_->Reset();
+  stream_lifetime_timer_.Start(
+      FROM_HERE, kStreamLifetime,
+      base::BindOnce(&FtlMessageReceptionChannel::OnStreamLifetimeExceeded,
+                     base::Unretained(this)));
+}
+
+void FtlMessageReceptionChannel::OnPongTimeout() {
+  LOG(WARNING) << "Timed out waiting for PONG message from server.";
+  RetryStartReceivingMessagesWithBackoff();
+}
+
+void FtlMessageReceptionChannel::OnStreamLifetimeExceeded() {
+  VLOG(0) << "Reached maximum lifetime for current stream.";
+  reconnect_retry_backoff_.Reset();
+  RetryStartReceivingMessages();
+}
+
+}  // namespace remoting
diff --git a/remoting/signaling/ftl_message_reception_channel.h b/remoting/signaling/ftl_message_reception_channel.h
new file mode 100644
index 0000000..a105d67
--- /dev/null
+++ b/remoting/signaling/ftl_message_reception_channel.h
@@ -0,0 +1,88 @@
+// Copyright 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef REMOTING_SIGNALING_FTL_MESSAGE_RECEPTION_CHANNEL_H_
+#define REMOTING_SIGNALING_FTL_MESSAGE_RECEPTION_CHANNEL_H_
+
+#include <list>
+#include <memory>
+
+#include "base/callback_forward.h"
+#include "base/macros.h"
+#include "base/memory/weak_ptr.h"
+#include "base/time/time.h"
+#include "base/timer/timer.h"
+#include "net/base/backoff_entry.h"
+#include "remoting/signaling/message_reception_channel.h"
+
+namespace remoting {
+
+// Handles the lifetime and validity of the messaging stream used for FTL.
+class FtlMessageReceptionChannel final : public MessageReceptionChannel {
+ public:
+  static constexpr base::TimeDelta kPongTimeout =
+      base::TimeDelta::FromSeconds(15);
+  static constexpr base::TimeDelta kStreamLifetime =
+      base::TimeDelta::FromMinutes(13);
+  static constexpr base::TimeDelta kBackoffInitialDelay =
+      base::TimeDelta::FromSeconds(1);
+  static constexpr base::TimeDelta kBackoffMaxDelay =
+      base::TimeDelta::FromMinutes(1);
+
+  FtlMessageReceptionChannel();
+  ~FtlMessageReceptionChannel() override;
+
+  // MessageReceptionChannel implementations.
+  void Initialize(const StreamOpener& stream_opener,
+                  const MessageCallback& on_incoming_msg) override;
+  void StartReceivingMessages(DoneCallback on_done) override;
+  void StopReceivingMessages() override;
+
+  const net::BackoffEntry& GetReconnectRetryBackoffEntryForTesting() const;
+
+ private:
+  enum class State {
+    // TODO(yuweih): Evaluate if this class needs to be reusable.
+    STOPPED,
+
+    // StartReceivingMessages() is called but the channel hasn't received a
+    // signal from the server yet.
+    STARTING,
+
+    // Stream is started, or is dropped but being retried.
+    STARTED,
+  };
+
+  void OnReceiveMessagesStreamStarted(
+      std::unique_ptr<ScopedGrpcServerStream> stream);
+  void OnReceiveMessagesStreamClosed(const grpc::Status& status);
+  void OnMessageReceived(const ftl::ReceiveMessagesResponse& response);
+
+  void RunStartReceivingMessagesCallbacks(const grpc::Status& status);
+  void RetryStartReceivingMessagesWithBackoff();
+  void RetryStartReceivingMessages();
+  void StartReceivingMessagesInternal();
+  void StopReceivingMessagesInternal();
+
+  void BeginStreamTimers();
+  void OnPongTimeout();
+  void OnStreamLifetimeExceeded();
+
+  StreamOpener stream_opener_;
+  MessageCallback on_incoming_msg_;
+  std::unique_ptr<ScopedGrpcServerStream> receive_messages_stream_;
+  std::list<DoneCallback> start_receiving_messages_callbacks_;
+  State state_ = State::STOPPED;
+  net::BackoffEntry reconnect_retry_backoff_;
+  base::OneShotTimer reconnect_retry_timer_;
+  base::OneShotTimer stream_lifetime_timer_;
+  std::unique_ptr<base::DelayTimer> stream_pong_timer_;
+
+  base::WeakPtrFactory<FtlMessageReceptionChannel> weak_factory_;
+  DISALLOW_COPY_AND_ASSIGN(FtlMessageReceptionChannel);
+};
+
+}  // namespace remoting
+
+#endif  // REMOTING_SIGNALING_FTL_MESSAGE_RECEPTION_CHANNEL_H_
diff --git a/remoting/signaling/ftl_message_reception_channel_unittest.cc b/remoting/signaling/ftl_message_reception_channel_unittest.cc
new file mode 100644
index 0000000..659c8848
--- /dev/null
+++ b/remoting/signaling/ftl_message_reception_channel_unittest.cc
@@ -0,0 +1,454 @@
+// Copyright 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "remoting/signaling/ftl_message_reception_channel.h"
+
+#include <algorithm>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "base/bind.h"
+#include "base/logging.h"
+#include "base/memory/weak_ptr.h"
+#include "base/run_loop.h"
+#include "base/test/bind_test_util.h"
+#include "base/test/mock_callback.h"
+#include "base/test/scoped_task_environment.h"
+#include "remoting/signaling/ftl.pb.h"
+#include "remoting/signaling/grpc_support/grpc_test_util.h"
+#include "remoting/signaling/grpc_support/scoped_grpc_server_stream.h"
+#include "testing/gmock/include/gmock/gmock.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace remoting {
+
+namespace {
+
+using ::testing::_;
+using ::testing::Expectation;
+using ::testing::Invoke;
+using ::testing::Property;
+using ::testing::Return;
+
+// Fake stream implementation to allow probing if a stream is closed by client.
+class FakeScopedGrpcServerStream : public ScopedGrpcServerStream {
+ public:
+  FakeScopedGrpcServerStream()
+      : ScopedGrpcServerStream(nullptr), weak_factory_(this) {}
+  ~FakeScopedGrpcServerStream() override = default;
+
+  base::WeakPtr<FakeScopedGrpcServerStream> GetWeakPtr() {
+    return weak_factory_.GetWeakPtr();
+  }
+
+ private:
+  base::WeakPtrFactory<FakeScopedGrpcServerStream> weak_factory_;
+  DISALLOW_COPY_AND_ASSIGN(FakeScopedGrpcServerStream);
+};
+
+std::unique_ptr<FakeScopedGrpcServerStream> CreateFakeServerStream() {
+  return std::make_unique<FakeScopedGrpcServerStream>();
+}
+
+ftl::ReceiveMessagesResponse CreateStartOfBatchResponse() {
+  ftl::ReceiveMessagesResponse response;
+  response.mutable_start_of_batch();
+  return response;
+}
+
+base::OnceCallback<void(const grpc::Status& status)> AssertOkCallback() {
+  return base::BindOnce(
+      [](const grpc::Status& status) { ASSERT_TRUE(status.ok()); });
+}
+
+}  // namespace
+
+class FtlMessageReceptionChannelTest : public testing::Test {
+ public:
+  void SetUp() override;
+  void TearDown() override;
+
+ protected:
+  base::TimeDelta GetTimeUntilRetry() const;
+  int GetRetryFailureCount() const;
+
+  base::test::ScopedTaskEnvironment scoped_task_environment_{
+      base::test::ScopedTaskEnvironment::MainThreadType::MOCK_TIME,
+      base::test::ScopedTaskEnvironment::NowSource::MAIN_THREAD_MOCK_TIME};
+  std::unique_ptr<FtlMessageReceptionChannel> channel_;
+  base::MockCallback<FtlMessageReceptionChannel::StreamOpener>
+      mock_stream_opener_;
+  base::MockCallback<base::RepeatingCallback<void(const ftl::InboxMessage&)>>
+      mock_on_incoming_msg_;
+};
+
+void FtlMessageReceptionChannelTest::SetUp() {
+  channel_ = std::make_unique<FtlMessageReceptionChannel>();
+  channel_->Initialize(mock_stream_opener_.Get(), mock_on_incoming_msg_.Get());
+}
+
+void FtlMessageReceptionChannelTest::TearDown() {
+  channel_.reset();
+  scoped_task_environment_.FastForwardUntilNoTasksRemain();
+}
+
+base::TimeDelta FtlMessageReceptionChannelTest::GetTimeUntilRetry() const {
+  return channel_->GetReconnectRetryBackoffEntryForTesting()
+      .GetTimeUntilRelease();
+}
+
+int FtlMessageReceptionChannelTest::GetRetryFailureCount() const {
+  return channel_->GetReconnectRetryBackoffEntryForTesting().failure_count();
+}
+
+TEST_F(FtlMessageReceptionChannelTest,
+       TestStartReceivingMessages_StoppedImmediately) {
+  base::RunLoop run_loop;
+
+  EXPECT_CALL(mock_stream_opener_, Run(_, _, _))
+      .WillOnce(Invoke(
+          [&](base::OnceCallback<void(std::unique_ptr<ScopedGrpcServerStream>)>
+                  on_stream_started,
+              const base::RepeatingCallback<void(
+                  const ftl::ReceiveMessagesResponse&)>& on_incoming_msg,
+              base::OnceCallback<void(const grpc::Status&)> on_channel_closed) {
+            std::move(on_stream_started).Run(CreateFakeServerStream());
+            channel_->StopReceivingMessages();
+          }));
+
+  channel_->StartReceivingMessages(test::CheckStatusThenQuitRunLoopCallback(
+      FROM_HERE, grpc::StatusCode::CANCELLED, &run_loop));
+
+  run_loop.Run();
+}
+
+TEST_F(FtlMessageReceptionChannelTest,
+       TestStartReceivingMessages_NotAuthenticated) {
+  base::RunLoop run_loop;
+
+  EXPECT_CALL(mock_stream_opener_, Run(_, _, _))
+      .WillOnce(Invoke(
+          [&](base::OnceCallback<void(std::unique_ptr<ScopedGrpcServerStream>)>
+                  on_stream_started,
+              const base::RepeatingCallback<void(
+                  const ftl::ReceiveMessagesResponse&)>& on_incoming_msg,
+              base::OnceCallback<void(const grpc::Status&)> on_channel_closed) {
+            std::move(on_stream_started).Run(CreateFakeServerStream());
+            std::move(on_channel_closed)
+                .Run(grpc::Status(grpc::StatusCode::UNAUTHENTICATED, ""));
+          }));
+
+  channel_->StartReceivingMessages(test::CheckStatusThenQuitRunLoopCallback(
+      FROM_HERE, grpc::StatusCode::UNAUTHENTICATED, &run_loop));
+
+  run_loop.Run();
+}
+
+TEST_F(FtlMessageReceptionChannelTest,
+       TestStartReceivingMessages_StreamStarted) {
+  base::RunLoop run_loop;
+
+  EXPECT_CALL(mock_stream_opener_, Run(_, _, _))
+      .WillOnce(Invoke(
+          [&](base::OnceCallback<void(std::unique_ptr<ScopedGrpcServerStream>)>
+                  on_stream_started,
+              const base::RepeatingCallback<void(
+                  const ftl::ReceiveMessagesResponse&)>& on_incoming_msg,
+              base::OnceCallback<void(const grpc::Status&)> on_channel_closed) {
+            std::move(on_stream_started).Run(CreateFakeServerStream());
+            on_incoming_msg.Run(CreateStartOfBatchResponse());
+          }));
+
+  channel_->StartReceivingMessages(test::CheckStatusThenQuitRunLoopCallback(
+      FROM_HERE, grpc::StatusCode::OK, &run_loop));
+
+  run_loop.Run();
+}
+
+TEST_F(FtlMessageReceptionChannelTest,
+       TestStartReceivingMessages_RecoverableStreamError) {
+  base::RunLoop run_loop;
+
+  base::WeakPtr<FakeScopedGrpcServerStream> old_stream;
+  EXPECT_CALL(mock_stream_opener_, Run(_, _, _))
+      .WillOnce(Invoke(
+          [&](base::OnceCallback<void(std::unique_ptr<ScopedGrpcServerStream>)>
+                  on_stream_started,
+              const base::RepeatingCallback<void(
+                  const ftl::ReceiveMessagesResponse&)>& on_incoming_msg,
+              base::OnceCallback<void(const grpc::Status&)> on_channel_closed) {
+            // The first open stream attempt fails with UNAVAILABLE error.
+            auto fake_server_stream = CreateFakeServerStream();
+            old_stream = fake_server_stream->GetWeakPtr();
+            std::move(on_stream_started).Run(std::move(fake_server_stream));
+
+            ASSERT_EQ(0, GetRetryFailureCount());
+
+            std::move(on_channel_closed)
+                .Run(grpc::Status(grpc::StatusCode::UNAVAILABLE, ""));
+
+            ASSERT_EQ(1, GetRetryFailureCount());
+            ASSERT_NEAR(
+                FtlMessageReceptionChannel::kBackoffInitialDelay.InSecondsF(),
+                GetTimeUntilRetry().InSecondsF(), 0.5);
+
+            // This will make the channel reopen the stream.
+            scoped_task_environment_.FastForwardBy(GetTimeUntilRetry());
+          }))
+      .WillOnce(Invoke(
+          [&](base::OnceCallback<void(std::unique_ptr<ScopedGrpcServerStream>)>
+                  on_stream_started,
+              const base::RepeatingCallback<void(
+                  const ftl::ReceiveMessagesResponse&)>& on_incoming_msg,
+              base::OnceCallback<void(const grpc::Status&)> on_channel_closed) {
+            // Second open stream attempt succeeds.
+
+            // Assert old stream closed.
+            ASSERT_FALSE(old_stream);
+
+            // Send a StartOfBatch and verify it resets the failure counter.
+            std::move(on_stream_started).Run(CreateFakeServerStream());
+            on_incoming_msg.Run(CreateStartOfBatchResponse());
+
+            ASSERT_EQ(0, GetRetryFailureCount());
+          }));
+
+  channel_->StartReceivingMessages(test::CheckStatusThenQuitRunLoopCallback(
+      FROM_HERE, grpc::StatusCode::OK, &run_loop));
+
+  run_loop.Run();
+}
+
+TEST_F(FtlMessageReceptionChannelTest,
+       TestStartReceivingMessages_MultipleCalls) {
+  base::RunLoop run_loop;
+
+  base::MockCallback<FtlMessageReceptionChannel::DoneCallback>
+      start_receiving_messages_callback;
+
+  // Exits the run loop iff the callback is called three times with OK.
+  EXPECT_CALL(start_receiving_messages_callback,
+              Run(Property(&grpc::Status::error_code, grpc::StatusCode::OK)))
+      .WillOnce(Return())
+      .WillOnce(Return())
+      .WillOnce(Invoke([&](const grpc::Status& status) { run_loop.Quit(); }));
+
+  EXPECT_CALL(mock_stream_opener_, Run(_, _, _))
+      .WillOnce(Invoke(
+          [&](base::OnceCallback<void(std::unique_ptr<ScopedGrpcServerStream>)>
+                  on_stream_started,
+              const base::RepeatingCallback<void(
+                  const ftl::ReceiveMessagesResponse&)>& on_incoming_msg,
+              base::OnceCallback<void(const grpc::Status&)> on_channel_closed) {
+            std::move(on_stream_started).Run(CreateFakeServerStream());
+            on_incoming_msg.Run(CreateStartOfBatchResponse());
+          }));
+
+  channel_->StartReceivingMessages(start_receiving_messages_callback.Get());
+  channel_->StartReceivingMessages(start_receiving_messages_callback.Get());
+  channel_->StartReceivingMessages(start_receiving_messages_callback.Get());
+
+  run_loop.Run();
+}
+
+TEST_F(FtlMessageReceptionChannelTest, StreamsTwoMessages) {
+  base::RunLoop run_loop;
+
+  constexpr char kMessage1Id[] = "msg_1";
+  constexpr char kMessage2Id[] = "msg_2";
+
+  ftl::InboxMessage message_1;
+  message_1.set_message_id(kMessage1Id);
+  ftl::InboxMessage message_2;
+  message_2.set_message_id(kMessage2Id);
+
+  EXPECT_CALL(mock_on_incoming_msg_,
+              Run(Property(&ftl::InboxMessage::message_id, kMessage1Id)))
+      .WillOnce(Return());
+  EXPECT_CALL(mock_on_incoming_msg_,
+              Run(Property(&ftl::InboxMessage::message_id, kMessage2Id)))
+      .WillOnce(Invoke([&](const ftl::InboxMessage&) { run_loop.Quit(); }));
+
+  EXPECT_CALL(mock_stream_opener_, Run(_, _, _))
+      .WillOnce(Invoke(
+          [&](base::OnceCallback<void(std::unique_ptr<ScopedGrpcServerStream>)>
+                  on_stream_started,
+              const base::RepeatingCallback<void(
+                  const ftl::ReceiveMessagesResponse&)>& on_incoming_msg,
+              base::OnceCallback<void(const grpc::Status&)> on_channel_closed) {
+            std::move(on_stream_started).Run(CreateFakeServerStream());
+
+            on_incoming_msg.Run(CreateStartOfBatchResponse());
+
+            ftl::ReceiveMessagesResponse response;
+            *response.mutable_inbox_message() = message_1;
+            on_incoming_msg.Run(response);
+            response.Clear();
+
+            *response.mutable_inbox_message() = message_2;
+            on_incoming_msg.Run(response);
+            response.Clear();
+
+            std::move(on_channel_closed).Run(grpc::Status::OK);
+          }));
+
+  channel_->StartReceivingMessages(AssertOkCallback());
+
+  run_loop.Run();
+}
+
+TEST_F(FtlMessageReceptionChannelTest, NoPongWithinTimeout_ResetsStream) {
+  base::RunLoop run_loop;
+
+  base::WeakPtr<FakeScopedGrpcServerStream> old_stream;
+  EXPECT_CALL(mock_stream_opener_, Run(_, _, _))
+      .WillOnce(Invoke(
+          [&](base::OnceCallback<void(std::unique_ptr<ScopedGrpcServerStream>)>
+                  on_stream_started,
+              const base::RepeatingCallback<void(
+                  const ftl::ReceiveMessagesResponse&)>& on_incoming_msg,
+              base::OnceCallback<void(const grpc::Status&)> on_channel_closed) {
+            auto fake_server_stream = CreateFakeServerStream();
+            old_stream = fake_server_stream->GetWeakPtr();
+            std::move(on_stream_started).Run(std::move(fake_server_stream));
+            on_incoming_msg.Run(CreateStartOfBatchResponse());
+            scoped_task_environment_.FastForwardBy(
+                FtlMessageReceptionChannel::kPongTimeout);
+
+            ASSERT_EQ(1, GetRetryFailureCount());
+            ASSERT_NEAR(
+                FtlMessageReceptionChannel::kBackoffInitialDelay.InSecondsF(),
+                GetTimeUntilRetry().InSecondsF(), 0.5);
+
+            // This will make the channel reopen the stream.
+            scoped_task_environment_.FastForwardBy(GetTimeUntilRetry());
+          }))
+      .WillOnce(Invoke(
+          [&](base::OnceCallback<void(std::unique_ptr<ScopedGrpcServerStream>)>
+                  on_stream_started,
+              const base::RepeatingCallback<void(
+                  const ftl::ReceiveMessagesResponse&)>& on_incoming_msg,
+              base::OnceCallback<void(const grpc::Status&)> on_channel_closed) {
+            // Stream is reopened.
+
+            // Assert old stream closed.
+            ASSERT_FALSE(old_stream);
+
+            std::move(on_stream_started).Run(CreateFakeServerStream());
+
+            // Sends a StartOfBatch and verify that it resets the failure
+            // counter.
+            on_incoming_msg.Run(CreateStartOfBatchResponse());
+            ASSERT_EQ(0, GetRetryFailureCount());
+            run_loop.Quit();
+          }));
+
+  channel_->StartReceivingMessages(AssertOkCallback());
+
+  run_loop.Run();
+}
+
+TEST_F(FtlMessageReceptionChannelTest, LifetimeExceeded_ResetsStream) {
+  base::RunLoop run_loop;
+
+  EXPECT_CALL(mock_stream_opener_, Run(_, _, _))
+      .WillOnce(Invoke(
+          [&](base::OnceCallback<void(std::unique_ptr<ScopedGrpcServerStream>)>
+                  on_stream_started,
+              const base::RepeatingCallback<void(
+                  const ftl::ReceiveMessagesResponse&)>& on_incoming_msg,
+              base::OnceCallback<void(const grpc::Status&)> on_channel_closed) {
+            auto fake_server_stream = CreateFakeServerStream();
+            base::WeakPtr<FakeScopedGrpcServerStream> old_stream =
+                fake_server_stream->GetWeakPtr();
+            std::move(on_stream_started).Run(std::move(fake_server_stream));
+            on_incoming_msg.Run(CreateStartOfBatchResponse());
+
+            // Keep sending pong until lifetime exceeded.
+            base::TimeDelta pong_period =
+                FtlMessageReceptionChannel::kPongTimeout -
+                base::TimeDelta::FromSeconds(1);
+            ASSERT_LT(base::TimeDelta(), pong_period);
+            base::TimeDelta ticked_time;
+
+            // The last FastForwardBy() will make the channel reopen the stream.
+            while (ticked_time <= FtlMessageReceptionChannel::kPongTimeout) {
+              scoped_task_environment_.FastForwardBy(pong_period);
+              ticked_time += pong_period;
+            }
+            ASSERT_FALSE(old_stream);
+          }))
+      .WillOnce(Invoke(
+          [&](base::OnceCallback<void(std::unique_ptr<ScopedGrpcServerStream>)>
+                  on_stream_started,
+              const base::RepeatingCallback<void(
+                  const ftl::ReceiveMessagesResponse&)>& on_incoming_msg,
+              base::OnceCallback<void(const grpc::Status&)> on_channel_closed) {
+            // Reopening the stream. Send StartOfBatch and verify that it resets
+            // the failure counter.
+            std::move(on_stream_started).Run(CreateFakeServerStream());
+            on_incoming_msg.Run(CreateStartOfBatchResponse());
+            ASSERT_EQ(0, GetRetryFailureCount());
+            run_loop.Quit();
+          }));
+
+  channel_->StartReceivingMessages(AssertOkCallback());
+
+  run_loop.Run();
+}
+
+TEST_F(FtlMessageReceptionChannelTest, TimeoutIncreasesToMaximum) {
+  base::RunLoop run_loop;
+
+  int failure_count = 0;
+  int hitting_max_delay_count = 0;
+  EXPECT_CALL(mock_stream_opener_, Run(_, _, _))
+      .WillRepeatedly(Invoke(
+          [&](base::OnceCallback<void(std::unique_ptr<ScopedGrpcServerStream>)>
+                  on_stream_started,
+              const base::RepeatingCallback<void(
+                  const ftl::ReceiveMessagesResponse&)>& on_incoming_msg,
+              base::OnceCallback<void(const grpc::Status&)> on_channel_closed) {
+            std::move(on_stream_started).Run(CreateFakeServerStream());
+
+            // Quit if delay is ~kBackoffMaxDelay three times.
+            if (hitting_max_delay_count == 3) {
+              on_incoming_msg.Run(CreateStartOfBatchResponse());
+              ASSERT_EQ(0, GetRetryFailureCount());
+              run_loop.Quit();
+              return;
+            }
+
+            // Otherwise send UNAVAILABLE to reset the stream.
+
+            std::move(on_channel_closed)
+                .Run(grpc::Status(grpc::StatusCode::UNAVAILABLE, ""));
+
+            int new_failure_count = GetRetryFailureCount();
+            ASSERT_LT(failure_count, new_failure_count);
+            failure_count = new_failure_count;
+
+            base::TimeDelta time_until_retry = GetTimeUntilRetry();
+
+            base::TimeDelta max_delay_diff =
+                time_until_retry - FtlMessageReceptionChannel::kBackoffMaxDelay;
+
+            // Adjust for fuzziness.
+            if (max_delay_diff.magnitude() <
+                base::TimeDelta::FromMilliseconds(500)) {
+              hitting_max_delay_count++;
+            }
+
+            // This will tail-recursively call the stream opener.
+            scoped_task_environment_.FastForwardBy(time_until_retry);
+          }));
+
+  channel_->StartReceivingMessages(AssertOkCallback());
+
+  run_loop.Run();
+}
+
+}  // namespace remoting
diff --git a/remoting/signaling/ftl_messaging_client.cc b/remoting/signaling/ftl_messaging_client.cc
index 106f71d..46d0aa1 100644
--- a/remoting/signaling/ftl_messaging_client.cc
+++ b/remoting/signaling/ftl_messaging_client.cc
@@ -6,9 +6,11 @@
 
 #include <utility>
 
+#include "base/bind_helpers.h"
 #include "base/callback.h"
 #include "base/logging.h"
 #include "remoting/signaling/ftl_grpc_context.h"
+#include "remoting/signaling/ftl_message_reception_channel.h"
 
 namespace remoting {
 
@@ -28,6 +30,12 @@
     : weak_factory_(this) {
   context_ = context;
   messaging_stub_ = Messaging::NewStub(context_->channel());
+  reception_channel_ = std::make_unique<FtlMessageReceptionChannel>();
+  reception_channel_->Initialize(
+      base::BindRepeating(&FtlMessagingClient::OpenReceiveMessagesStream,
+                          weak_factory_.GetWeakPtr()),
+      base::BindRepeating(&FtlMessagingClient::OnMessageReceived,
+                          weak_factory_.GetWeakPtr()));
 }
 
 FtlMessagingClient::~FtlMessagingClient() = default;
@@ -46,6 +54,24 @@
                      weak_factory_.GetWeakPtr(), std::move(on_done)));
 }
 
+void FtlMessagingClient::StartReceivingMessages(DoneCallback on_done) {
+  reception_channel_->StartReceivingMessages(std::move(on_done));
+}
+
+void FtlMessagingClient::StopReceivingMessages() {
+  reception_channel_->StopReceivingMessages();
+}
+
+void FtlMessagingClient::SetMessageReceptionChannelForTesting(
+    std::unique_ptr<MessageReceptionChannel> channel) {
+  reception_channel_ = std::move(channel);
+  reception_channel_->Initialize(
+      base::BindRepeating(&FtlMessagingClient::OpenReceiveMessagesStream,
+                          weak_factory_.GetWeakPtr()),
+      base::BindRepeating(&FtlMessagingClient::OnMessageReceived,
+                          weak_factory_.GetWeakPtr()));
+}
+
 void FtlMessagingClient::OnPullMessagesResponse(
     DoneCallback on_done,
     const grpc::Status& status,
@@ -70,7 +96,6 @@
     return;
   }
 
-  // TODO(yuweih): May need retry logic.
   VLOG(0) << "Acking " << ack_request.messages_size() << " messages";
 
   AckMessages(ack_request, std::move(on_done));
@@ -90,9 +115,23 @@
     DoneCallback on_done,
     const grpc::Status& status,
     const ftl::AckMessagesResponse& response) {
+  // TODO(yuweih): Handle failure.
   std::move(on_done).Run(status);
 }
 
+void FtlMessagingClient::OpenReceiveMessagesStream(
+    base::OnceCallback<void(std::unique_ptr<ScopedGrpcServerStream>)>
+        on_stream_started,
+    const base::RepeatingCallback<void(const ftl::ReceiveMessagesResponse&)>&
+        on_incoming_msg,
+    base::OnceCallback<void(const grpc::Status&)> on_channel_closed) {
+  context_->ExecuteServerStreamingRpc(
+      base::BindOnce(&Messaging::Stub::AsyncReceiveMessages,
+                     base::Unretained(messaging_stub_.get())),
+      ftl::ReceiveMessagesRequest(), std::move(on_stream_started),
+      on_incoming_msg, std::move(on_channel_closed));
+}
+
 void FtlMessagingClient::RunMessageCallbacks(const ftl::InboxMessage& message) {
   if (message.message_type() !=
       ftl::InboxMessage_MessageType_CHROMOTING_MESSAGE) {
@@ -107,4 +146,11 @@
   callback_list_.Notify(message.sender_id().id(), chromoting_message.message());
 }
 
+void FtlMessagingClient::OnMessageReceived(const ftl::InboxMessage& message) {
+  RunMessageCallbacks(message);
+  ftl::AckMessagesRequest ack_request;
+  AddMessageToAckRequest(message, &ack_request);
+  AckMessages(ack_request, base::DoNothing());
+}
+
 }  // namespace remoting
diff --git a/remoting/signaling/ftl_messaging_client.h b/remoting/signaling/ftl_messaging_client.h
index f5f7e42..3e70a7d0 100644
--- a/remoting/signaling/ftl_messaging_client.h
+++ b/remoting/signaling/ftl_messaging_client.h
@@ -17,6 +17,8 @@
 namespace remoting {
 
 class FtlGrpcContext;
+class MessageReceptionChannel;
+class ScopedGrpcServerStream;
 
 // A class for sending and receiving messages via the FTL API.
 class FtlMessagingClient final {
@@ -44,6 +46,18 @@
   // server's inbox.
   void PullMessages(DoneCallback on_done);
 
+  // Opens a stream to continuously receive new messages from the server and
+  // calls the registered MessageCallback once a new message is received.
+  // |on_done| is called once the stream is successfully opened or failed to
+  // open due to an error.
+  void StartReceivingMessages(DoneCallback on_done);
+
+  // Stops the stream for continuously receiving new messages.
+  void StopReceivingMessages();
+
+  void SetMessageReceptionChannelForTesting(
+      std::unique_ptr<MessageReceptionChannel> channel);
+
  private:
   using Messaging =
       google::internal::communications::instantmessaging::v1::Messaging;
@@ -59,10 +73,20 @@
                              const grpc::Status& status,
                              const ftl::AckMessagesResponse& response);
 
+  void OpenReceiveMessagesStream(
+      base::OnceCallback<void(std::unique_ptr<ScopedGrpcServerStream>)>
+          on_stream_started,
+      const base::RepeatingCallback<void(const ftl::ReceiveMessagesResponse&)>&
+          on_incoming_msg,
+      base::OnceCallback<void(const grpc::Status&)> on_channel_closed);
+
   void RunMessageCallbacks(const ftl::InboxMessage& message);
 
+  void OnMessageReceived(const ftl::InboxMessage& message);
+
   FtlGrpcContext* context_;
   std::unique_ptr<Messaging::Stub> messaging_stub_;
+  std::unique_ptr<MessageReceptionChannel> reception_channel_;
   base::CallbackList<void(const std::string&, const std::string&)>
       callback_list_;
 
diff --git a/remoting/signaling/ftl_messaging_client_unittest.cc b/remoting/signaling/ftl_messaging_client_unittest.cc
index 5b27d4b..6f5ac52 100644
--- a/remoting/signaling/ftl_messaging_client_unittest.cc
+++ b/remoting/signaling/ftl_messaging_client_unittest.cc
@@ -22,6 +22,7 @@
 #include "remoting/signaling/ftl_services.grpc.pb.h"
 #include "remoting/signaling/grpc_support/grpc_async_test_server.h"
 #include "remoting/signaling/grpc_support/grpc_test_util.h"
+#include "remoting/signaling/message_reception_channel.h"
 #include "testing/gmock/include/gmock/gmock.h"
 #include "testing/gtest/include/gtest/gtest.h"
 
@@ -29,6 +30,9 @@
 
 namespace {
 
+using ::testing::_;
+using ::testing::Invoke;
+using ::testing::Property;
 using ::testing::Return;
 
 using PullMessagesResponder =
@@ -59,6 +63,32 @@
   return message;
 }
 
+class MockMessageReceptionChannel : public MessageReceptionChannel {
+ public:
+  MockMessageReceptionChannel() = default;
+  ~MockMessageReceptionChannel() override = default;
+
+  // MessageReceptionChannel implementations.
+  void Initialize(const StreamOpener& stream_opener,
+                  const MessageCallback& on_incoming_msg) override {
+    stream_opener_ = stream_opener;
+    on_incoming_msg_ = on_incoming_msg;
+  }
+
+  MOCK_METHOD1(StartReceivingMessages, void(DoneCallback));
+  MOCK_METHOD0(StopReceivingMessages, void());
+
+  StreamOpener* stream_opener() { return &stream_opener_; }
+
+  MessageCallback* on_incoming_msg() { return &on_incoming_msg_; }
+
+ private:
+  StreamOpener stream_opener_;
+  MessageCallback on_incoming_msg_;
+
+  DISALLOW_COPY_AND_ASSIGN(MockMessageReceptionChannel);
+};
+
 }  // namespace
 
 class FtlMessagingClientTest : public testing::Test {
@@ -67,6 +97,9 @@
   void TearDown() override;
 
  protected:
+  using Messaging =
+      google::internal::communications::instantmessaging::v1::Messaging;
+
   // Calls are scheduled sequentially and handled on the server thread.
   void ServerWaitAndRespondToPullMessagesRequest(
       const ftl::PullMessagesResponse& response,
@@ -75,15 +108,13 @@
       base::OnceCallback<grpc::Status(const ftl::AckMessagesRequest&)> handler,
       base::OnceClosure on_done);
 
+  std::unique_ptr<test::GrpcAsyncTestServer> server_;
+  scoped_refptr<base::SequencedTaskRunner> server_task_runner_;
   std::unique_ptr<FtlMessagingClient> messaging_client_;
+  MockMessageReceptionChannel* mock_message_reception_channel_;
 
  private:
-  using Messaging =
-      google::internal::communications::instantmessaging::v1::Messaging;
-
-  std::unique_ptr<test::GrpcAsyncTestServer> server_;
   std::unique_ptr<test::FtlGrpcTestEnvironment> test_environment_;
-  scoped_refptr<base::SequencedTaskRunner> server_task_runner_;
   base::test::ScopedTaskEnvironment scoped_task_environment_;
 };
 
@@ -96,6 +127,9 @@
       server_->CreateInProcessChannel());
   messaging_client_ =
       std::make_unique<FtlMessagingClient>(test_environment_->context());
+  auto channel = std::make_unique<MockMessageReceptionChannel>();
+  mock_message_reception_channel_ = channel.get();
+  messaging_client_->SetMessageReceptionChannelForTesting(std::move(channel));
 }
 
 void FtlMessagingClientTest::TearDown() {
@@ -231,4 +265,96 @@
   run_loop.Run();
 }
 
+TEST_F(FtlMessagingClientTest,
+       TestStartReceivingMessages_DoneCallbackForwarded) {
+  base::RunLoop run_loop;
+
+  EXPECT_CALL(*mock_message_reception_channel_, StartReceivingMessages(_))
+      .WillOnce(Invoke([&](FtlMessagingClient::DoneCallback callback) {
+        std::move(callback).Run(
+            grpc::Status(grpc::StatusCode::UNAUTHENTICATED, ""));
+      }));
+
+  messaging_client_->StartReceivingMessages(
+      base::BindLambdaForTesting([&](const grpc::Status& status) {
+        ASSERT_EQ(grpc::StatusCode::UNAUTHENTICATED, status.error_code());
+        run_loop.Quit();
+      }));
+
+  run_loop.Run();
+}
+
+TEST_F(FtlMessagingClientTest, TestStopReceivingMessages_CallForwarded) {
+  EXPECT_CALL(*mock_message_reception_channel_, StopReceivingMessages())
+      .WillOnce(Return());
+  messaging_client_->StopReceivingMessages();
+}
+
+TEST_F(FtlMessagingClientTest,
+       TestStreamOpener_StreamsTwoMessagesThenCloseByClient) {
+  base::RunLoop run_loop;
+
+  std::unique_ptr<test::GrpcServerStreamResponder<ftl::ReceiveMessagesResponse>>
+      responder;
+  std::unique_ptr<ScopedGrpcServerStream> scoped_stream;
+
+  ftl::ReceiveMessagesResponse response_1;
+  response_1.mutable_inbox_message()->set_message_id(kMessage1Id);
+  ftl::ReceiveMessagesResponse response_2;
+  response_2.mutable_pong();
+
+  server_task_runner_->PostTask(
+      FROM_HERE, base::BindLambdaForTesting([&]() {
+        ftl::ReceiveMessagesRequest request;
+        // This blocks the server thread until the client opens the stream.
+        responder = server_->HandleStreamRequest(
+            &Messaging::AsyncService::RequestReceiveMessages, &request);
+        ASSERT_TRUE(responder->SendMessage(response_1));
+        ASSERT_TRUE(responder->SendMessage(response_2));
+      }));
+
+  base::MockCallback<
+      base::RepeatingCallback<void(const ftl::ReceiveMessagesResponse&)>>
+      mock_on_incoming_msg;
+  EXPECT_CALL(mock_on_incoming_msg, Run(_))
+      .WillOnce([&](const ftl::ReceiveMessagesResponse& response) {
+        ASSERT_EQ(kMessage1Id, response.inbox_message().message_id());
+      })
+      .WillOnce([&](const ftl::ReceiveMessagesResponse& response) {
+        ASSERT_TRUE(response.has_pong());
+        DCHECK(scoped_stream);
+        scoped_stream.reset();
+        run_loop.QuitWhenIdle();
+      });
+
+  mock_message_reception_channel_->stream_opener()->Run(
+      base::BindLambdaForTesting(
+          [&](std::unique_ptr<ScopedGrpcServerStream> stream) {
+            scoped_stream = std::move(stream);
+          }),
+      mock_on_incoming_msg.Get(),
+      base::BindOnce([](const grpc::Status&) { NOTREACHED(); }));
+
+  run_loop.Run();
+}
+
+TEST_F(FtlMessagingClientTest,
+       TestOnMessageReceived_MessagePassedToSubscriberAndAcked) {
+  base::RunLoop run_loop;
+
+  ftl::InboxMessage message = CreateMessage(kMessage1Id, kMessage1Text);
+  mock_message_reception_channel_->on_incoming_msg()->Run(message);
+
+  ServerWaitAndRespondToAckMessagesRequest(
+      base::BindLambdaForTesting([&](const ftl::AckMessagesRequest& request) {
+        EXPECT_EQ(1, request.messages_size());
+        EXPECT_EQ(kFakeReceiverId, request.messages(0).receiver_id().id());
+        EXPECT_EQ(kMessage1Id, request.messages(0).message_id());
+        return grpc::Status::OK;
+      }),
+      run_loop.QuitWhenIdleClosure());
+
+  run_loop.Run();
+}
+
 }  // namespace remoting
diff --git a/remoting/signaling/ftl_services.proto b/remoting/signaling/ftl_services.proto
index b523408..9c0a1e5 100644
--- a/remoting/signaling/ftl_services.proto
+++ b/remoting/signaling/ftl_services.proto
@@ -27,4 +27,6 @@
       returns (remoting.ftl.PullMessagesResponse) {}
   rpc AckMessages(remoting.ftl.AckMessagesRequest)
       returns (remoting.ftl.AckMessagesResponse) {}
+  rpc ReceiveMessages(remoting.ftl.ReceiveMessagesRequest)
+      returns (stream remoting.ftl.ReceiveMessagesResponse) {}
 }
diff --git a/remoting/signaling/grpc_support/grpc_test_util.cc b/remoting/signaling/grpc_support/grpc_test_util.cc
index 4220077..4a74791 100644
--- a/remoting/signaling/grpc_support/grpc_test_util.cc
+++ b/remoting/signaling/grpc_support/grpc_test_util.cc
@@ -25,7 +25,7 @@
 void WaitForCompletionAndAssertOk(const base::Location& from_here,
                                   grpc::CompletionQueue* completion_queue,
                                   void* expected_tag) {
-  bool ok = WaitForCompletion(FROM_HERE, completion_queue, expected_tag);
+  bool ok = WaitForCompletion(from_here, completion_queue, expected_tag);
   DCHECK(ok) << "Event is not ok. Location: " << from_here.ToString();
 }
 
diff --git a/remoting/signaling/grpc_support/scoped_grpc_server_stream.h b/remoting/signaling/grpc_support/scoped_grpc_server_stream.h
index 219be189..7e257e4 100644
--- a/remoting/signaling/grpc_support/scoped_grpc_server_stream.h
+++ b/remoting/signaling/grpc_support/scoped_grpc_server_stream.h
@@ -20,7 +20,7 @@
  public:
   explicit ScopedGrpcServerStream(
       base::WeakPtr<internal::GrpcAsyncServerStreamingCallDataBase> call_data);
-  ~ScopedGrpcServerStream();
+  virtual ~ScopedGrpcServerStream();
 
  private:
   base::WeakPtr<internal::GrpcAsyncServerStreamingCallDataBase> call_data_;
diff --git a/remoting/signaling/message_reception_channel.h b/remoting/signaling/message_reception_channel.h
new file mode 100644
index 0000000..2a746b08
--- /dev/null
+++ b/remoting/signaling/message_reception_channel.h
@@ -0,0 +1,54 @@
+// Copyright 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef REMOTING_SIGNALING_MESSAGE_RECEPTION_CHANNEL_H_
+#define REMOTING_SIGNALING_MESSAGE_RECEPTION_CHANNEL_H_
+
+#include <list>
+#include <memory>
+
+#include "base/callback_forward.h"
+#include "base/macros.h"
+#include "remoting/signaling/ftl.pb.h"
+#include "third_party/grpc/src/include/grpcpp/support/status.h"
+
+namespace remoting {
+
+class ScopedGrpcServerStream;
+
+// Interface for starting or closing the server stream to receive messages from
+// FTL backend.
+class MessageReceptionChannel {
+ public:
+  using StreamOpener = base::RepeatingCallback<void(
+      base::OnceCallback<void(std::unique_ptr<ScopedGrpcServerStream>)>
+          on_stream_started,
+      const base::RepeatingCallback<void(const ftl::ReceiveMessagesResponse&)>&
+          on_incoming_msg,
+      base::OnceCallback<void(const grpc::Status&)> on_channel_closed)>;
+  using MessageCallback =
+      base::RepeatingCallback<void(const ftl::InboxMessage&)>;
+  using DoneCallback = base::OnceCallback<void(const grpc::Status& status)>;
+
+  MessageReceptionChannel() = default;
+  virtual ~MessageReceptionChannel() = default;
+
+  virtual void Initialize(const StreamOpener& stream_opener,
+                          const MessageCallback& on_incoming_msg) = 0;
+
+  // Opens a server streaming channel to the FTL API to enable message reception
+  // over the fast path. |on_done| is called to signal success or failure for
+  // starting the stream.
+  virtual void StartReceivingMessages(DoneCallback on_done) = 0;
+
+  // Closes the streaming channel.
+  virtual void StopReceivingMessages() = 0;
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(MessageReceptionChannel);
+};
+
+}  // namespace remoting
+
+#endif  // REMOTING_SIGNALING_MESSAGE_RECEPTION_CHANNEL_H_
diff --git a/remoting/test/ftl_signaling_playground.cc b/remoting/test/ftl_signaling_playground.cc
index 01702d2..1baf00d 100644
--- a/remoting/test/ftl_signaling_playground.cc
+++ b/remoting/test/ftl_signaling_playground.cc
@@ -19,6 +19,7 @@
 #include "base/path_service.h"
 #include "base/strings/string_number_conversions.h"
 #include "base/strings/stringprintf.h"
+#include "base/task/post_task.h"
 #include "remoting/base/fake_oauth_token_getter.h"
 #include "remoting/base/oauth_token_getter_impl.h"
 #include "remoting/signaling/ftl_services.grpc.pb.h"
@@ -62,6 +63,35 @@
   return ReadString();
 }
 
+// Wait for the user to press enter key on an anonymous thread and calls
+// |on_done| once it is done.
+void WaitForEnterKey(base::OnceClosure on_done) {
+  base::PostTaskWithTraitsAndReply(FROM_HERE, {base::MayBlock()},
+                                   base::BindOnce([]() { getchar(); }),
+                                   std::move(on_done));
+}
+
+void PrintGrpcStatusError(const grpc::Status& status) {
+  DCHECK(!status.ok());
+  fprintf(stderr, "RPC failed. Code=%d, Message=%s\n", status.error_code(),
+          status.error_message().c_str());
+  switch (status.error_code()) {
+    case grpc::StatusCode::UNAVAILABLE:
+      fprintf(stderr,
+              "Set the GRPC_DEFAULT_SSL_ROOTS_FILE_PATH environment variable "
+              "to third_party/grpc/src/etc/roots.pem if gRPC cannot locate the "
+              "root certificates.\n");
+      break;
+    case grpc::StatusCode::UNAUTHENTICATED:
+      fprintf(
+          stderr,
+          "Request is unauthenticated. Make sure you have run SignInGaia.\n");
+      break;
+    default:
+      break;
+  }
+}
+
 }  // namespace
 
 namespace remoting {
@@ -116,25 +146,29 @@
   while (true) {
     printf(
         "\nOptions:\n"
-        "  1. GetIceServer\n"
-        "  2. SignInGaia\n"
+        "  1. SignInGaia\n"
+        "  2. GetIceServer\n"
         "  3. PullMessages\n"
-        "  4. Quit\n\n"
+        "  4. ReceiveMessages\n"
+        "  5. Quit\n\n"
         "Your choice [number]: ");
     int choice = 0;
     base::StringToInt(ReadString(), &choice);
     base::RunLoop run_loop;
     switch (choice) {
       case 1:
-        GetIceServer(run_loop.QuitClosure());
+        SignInGaia(run_loop.QuitClosure());
         break;
       case 2:
-        SignInGaia(run_loop.QuitClosure());
+        GetIceServer(run_loop.QuitClosure());
         break;
       case 3:
         PullMessages(run_loop.QuitClosure());
         break;
       case 4:
+        StartReceivingMessages(run_loop.QuitWhenIdleClosure());
+        break;
+      case 5:
         return;
       default:
         fprintf(stderr, "Unknown option\n");
@@ -200,27 +234,6 @@
   std::move(on_done).Run();
 }
 
-void FtlSignalingPlayground::HandleGrpcStatusError(const grpc::Status& status) {
-  DCHECK(!status.ok());
-  LOG(ERROR) << "RPC failed. Code=" << status.error_code() << ", "
-             << "Message=" << status.error_message();
-  switch (status.error_code()) {
-    case grpc::StatusCode::UNAVAILABLE:
-      VLOG(0)
-          << "Set the GRPC_DEFAULT_SSL_ROOTS_FILE_PATH environment variable "
-          << "to third_party/grpc/src/etc/roots.pem if gRPC cannot locate the "
-          << "root certificates.";
-      break;
-    case grpc::StatusCode::UNAUTHENTICATED:
-      VLOG(0) << "Grpc request failed to authenticate. "
-              << "Trying to reauthenticate...";
-      AuthenticateAndResetServices();
-      break;
-    default:
-      break;
-  }
-}
-
 void FtlSignalingPlayground::GetIceServer(base::OnceClosure on_done) {
   DCHECK(peer_to_peer_stub_);
   VLOG(0) << "Running GetIceServer...";
@@ -254,7 +267,7 @@
       }
     }
   } else {
-    HandleGrpcStatusError(status);
+    PrintGrpcStatusError(status);
   }
   std::move(on_done).Run();
 }
@@ -316,7 +329,13 @@
     ftl_context_->SetAuthToken(response.auth_token().payload());
     VLOG(0) << "Auth token set on FtlClient";
   } else {
-    HandleGrpcStatusError(status);
+    if (status.error_code() == grpc::StatusCode::UNAUTHENTICATED) {
+      VLOG(0) << "Grpc request failed to authenticate. "
+              << "Trying to reauthenticate...";
+      AuthenticateAndResetServices();
+    } else {
+      PrintGrpcStatusError(status);
+    }
   }
   std::move(on_done).Run();
 }
@@ -334,15 +353,23 @@
     base::OnceClosure on_done,
     const grpc::Status& status) {
   if (!status.ok()) {
-    if (status.error_code() == grpc::StatusCode::UNAUTHENTICATED) {
-      fprintf(stderr, "Please run SignInGaia first\n");
-    } else {
-      HandleGrpcStatusError(status);
-    }
+    PrintGrpcStatusError(status);
   }
   std::move(on_done).Run();
 }
 
+void FtlSignalingPlayground::StartReceivingMessages(base::OnceClosure on_done) {
+  VLOG(0) << "Running StartReceivingMessages...";
+  messaging_client_->StartReceivingMessages(
+      base::BindOnce(&FtlSignalingPlayground::OnStartReceivingMessagesDone,
+                     weak_factory_.GetWeakPtr(), std::move(on_done)));
+}
+
+void FtlSignalingPlayground::StopReceivingMessages(base::OnceClosure on_done) {
+  messaging_client_->StopReceivingMessages();
+  std::move(on_done).Run();
+}
+
 void FtlSignalingPlayground::OnMessageReceived(const std::string& sender_id,
                                                const std::string& message) {
   printf(
@@ -352,4 +379,23 @@
       sender_id.c_str(), message.c_str());
 }
 
+void FtlSignalingPlayground::OnStartReceivingMessagesDone(
+    base::OnceClosure on_done,
+    const grpc::Status& status) {
+  if (status.error_code() == grpc::StatusCode::CANCELLED) {
+    printf("ReceiveMessages stream canceled by client.\n");
+    std::move(on_done).Run();
+    return;
+  }
+  if (!status.ok()) {
+    PrintGrpcStatusError(status);
+    std::move(on_done).Run();
+    return;
+  }
+  printf("Started receiving messages. Press enter to stop streaming...\n");
+  WaitForEnterKey(base::BindOnce(&FtlSignalingPlayground::StopReceivingMessages,
+                                 weak_factory_.GetWeakPtr(),
+                                 std::move(on_done)));
+}
+
 }  // namespace remoting
diff --git a/remoting/test/ftl_signaling_playground.h b/remoting/test/ftl_signaling_playground.h
index 910ffa5..abd39ab 100644
--- a/remoting/test/ftl_signaling_playground.h
+++ b/remoting/test/ftl_signaling_playground.h
@@ -45,7 +45,6 @@
                      OAuthTokenGetter::Status status,
                      const std::string& user_email,
                      const std::string& access_token);
-  void HandleGrpcStatusError(const grpc::Status& status);
 
   void GetIceServer(base::OnceClosure on_done);
   void OnGetIceServerResponse(base::OnceClosure on_done,
@@ -60,8 +59,12 @@
   void PullMessages(base::OnceClosure on_done);
   void OnPullMessagesResponse(base::OnceClosure on_done,
                               const grpc::Status& status);
+  void StartReceivingMessages(base::OnceClosure on_done);
+  void StopReceivingMessages(base::OnceClosure on_done);
   void OnMessageReceived(const std::string& sender_id,
                          const std::string& message);
+  void OnStartReceivingMessagesDone(base::OnceClosure on_done,
+                                    const grpc::Status& status);
 
   std::unique_ptr<test::TestTokenStorage> storage_;
   std::unique_ptr<TestOAuthTokenGetterFactory> token_getter_factory_;