| // Copyright 2019 The Chromium Authors | 
 | // Use of this source code is governed by a BSD-style license that can be | 
 | // found in the LICENSE file. | 
 |  | 
 | #include "remoting/signaling/ftl_message_reception_channel.h" | 
 |  | 
 | #include <algorithm> | 
 | #include <memory> | 
 | #include <string> | 
 | #include <utility> | 
 | #include <vector> | 
 |  | 
 | #include "base/bind.h" | 
 | #include "base/callback_helpers.h" | 
 | #include "base/memory/weak_ptr.h" | 
 | #include "base/notreached.h" | 
 | #include "base/run_loop.h" | 
 | #include "base/task/sequenced_task_runner.h" | 
 | #include "base/test/bind.h" | 
 | #include "base/test/mock_callback.h" | 
 | #include "base/test/task_environment.h" | 
 | #include "base/time/time.h" | 
 | #include "remoting/base/protobuf_http_status.h" | 
 | #include "remoting/base/scoped_protobuf_http_request.h" | 
 | #include "remoting/proto/ftl/v1/ftl_messages.pb.h" | 
 | #include "remoting/signaling/ftl_services_context.h" | 
 | #include "remoting/signaling/signaling_tracker.h" | 
 | #include "testing/gmock/include/gmock/gmock.h" | 
 | #include "testing/gtest/include/gtest/gtest.h" | 
 |  | 
 | namespace remoting { | 
 |  | 
 | namespace { | 
 |  | 
 | using ::testing::_; | 
 | using ::testing::Expectation; | 
 | using ::testing::Invoke; | 
 | using ::testing::Property; | 
 | using ::testing::Return; | 
 |  | 
 | using ReceiveMessagesResponseCallback = base::RepeatingCallback<void( | 
 |     std::unique_ptr<ftl::ReceiveMessagesResponse>)>; | 
 | using StatusCallback = base::OnceCallback<void(const ProtobufHttpStatus&)>; | 
 |  | 
 | class MockSignalingTracker : public SignalingTracker { | 
 |  public: | 
 |   MOCK_METHOD0(OnSignalingActive, void()); | 
 | }; | 
 |  | 
 | // Fake stream implementation to allow probing if a stream is closed by client. | 
 | class FakeScopedProtobufHttpRequest : public ScopedProtobufHttpRequest { | 
 |  public: | 
 |   FakeScopedProtobufHttpRequest() | 
 |       : ScopedProtobufHttpRequest(base::DoNothing()) {} | 
 |  | 
 |   FakeScopedProtobufHttpRequest(const FakeScopedProtobufHttpRequest&) = delete; | 
 |   FakeScopedProtobufHttpRequest& operator=( | 
 |       const FakeScopedProtobufHttpRequest&) = delete; | 
 |  | 
 |   ~FakeScopedProtobufHttpRequest() override = default; | 
 |  | 
 |   base::WeakPtr<FakeScopedProtobufHttpRequest> GetWeakPtr() { | 
 |     return weak_factory_.GetWeakPtr(); | 
 |   } | 
 |  | 
 |  private: | 
 |   base::WeakPtrFactory<FakeScopedProtobufHttpRequest> weak_factory_{this}; | 
 | }; | 
 |  | 
 | std::unique_ptr<FakeScopedProtobufHttpRequest> CreateFakeServerStream() { | 
 |   return std::make_unique<FakeScopedProtobufHttpRequest>(); | 
 | } | 
 |  | 
 | // Creates a gmock EXPECT_CALL action that: | 
 | //   1. Creates a fake server stream and returns it as the start stream result | 
 | //   2. Posts a task to call |on_stream_opened| at the end of current sequence | 
 | //   3. Writes the WeakPtr to the fake server stream to |optional_out_stream| | 
 | //      if it is provided. | 
 | template <typename OnStreamOpenedLambda> | 
 | decltype(auto) StartStream(OnStreamOpenedLambda on_stream_opened, | 
 |                            base::WeakPtr<FakeScopedProtobufHttpRequest>* | 
 |                                optional_out_stream = nullptr) { | 
 |   return [=](base::OnceClosure on_channel_ready, | 
 |              const ReceiveMessagesResponseCallback& on_incoming_msg, | 
 |              StatusCallback on_channel_closed) { | 
 |     auto fake_stream = CreateFakeServerStream(); | 
 |     if (optional_out_stream) { | 
 |       *optional_out_stream = fake_stream->GetWeakPtr(); | 
 |     } | 
 |     auto on_stream_opened_cb = base::BindLambdaForTesting(on_stream_opened); | 
 |     base::SequencedTaskRunner::GetCurrentDefault()->PostTask( | 
 |         FROM_HERE, | 
 |         base::BindOnce(on_stream_opened_cb, std::move(on_channel_ready), | 
 |                        on_incoming_msg, std::move(on_channel_closed))); | 
 |     return fake_stream; | 
 |   }; | 
 | } | 
 |  | 
 | base::OnceClosure NotReachedClosure() { | 
 |   return base::BindOnce([]() { NOTREACHED(); }); | 
 | } | 
 |  | 
 | base::RepeatingCallback<void(const ProtobufHttpStatus&)> | 
 | NotReachedStatusCallback(const base::Location& location) { | 
 |   return base::BindLambdaForTesting([=](const ProtobufHttpStatus& status) { | 
 |     NOTREACHED() << "Location: " << location.ToString() | 
 |                  << ", status code: " << static_cast<int>(status.error_code()); | 
 |   }); | 
 | } | 
 |  | 
 | base::OnceCallback<void(const ProtobufHttpStatus&)> | 
 | CheckStatusThenQuitRunLoopCallback( | 
 |     const base::Location& from_here, | 
 |     ProtobufHttpStatus::Code expected_status_code, | 
 |     base::RunLoop* run_loop) { | 
 |   return base::BindLambdaForTesting([=](const ProtobufHttpStatus& status) { | 
 |     ASSERT_EQ(expected_status_code, status.error_code()) | 
 |         << "Incorrect status code. Location: " << from_here.ToString(); | 
 |     run_loop->QuitWhenIdle(); | 
 |   }); | 
 | } | 
 |  | 
 | }  // namespace | 
 |  | 
 | class FtlMessageReceptionChannelTest : public testing::Test { | 
 |  public: | 
 |   void SetUp() override; | 
 |   void TearDown() override; | 
 |  | 
 |  protected: | 
 |   base::TimeDelta GetTimeUntilRetry() const; | 
 |   int GetRetryFailureCount() const; | 
 |  | 
 |   base::test::TaskEnvironment task_environment_{ | 
 |       base::test::TaskEnvironment::TimeSource::MOCK_TIME}; | 
 |   std::unique_ptr<FtlMessageReceptionChannel> channel_; | 
 |   base::MockCallback<FtlMessageReceptionChannel::StreamOpener> | 
 |       mock_stream_opener_; | 
 |   base::MockCallback<base::RepeatingCallback<void(const ftl::InboxMessage&)>> | 
 |       mock_on_incoming_msg_; | 
 |   MockSignalingTracker mock_signaling_tracker_; | 
 | }; | 
 |  | 
 | void FtlMessageReceptionChannelTest::SetUp() { | 
 |   channel_ = | 
 |       std::make_unique<FtlMessageReceptionChannel>(&mock_signaling_tracker_); | 
 |   channel_->Initialize(mock_stream_opener_.Get(), mock_on_incoming_msg_.Get()); | 
 | } | 
 |  | 
 | void FtlMessageReceptionChannelTest::TearDown() { | 
 |   channel_.reset(); | 
 |   task_environment_.FastForwardUntilNoTasksRemain(); | 
 | } | 
 |  | 
 | base::TimeDelta FtlMessageReceptionChannelTest::GetTimeUntilRetry() const { | 
 |   return channel_->GetReconnectRetryBackoffEntryForTesting() | 
 |       .GetTimeUntilRelease(); | 
 | } | 
 |  | 
 | int FtlMessageReceptionChannelTest::GetRetryFailureCount() const { | 
 |   return channel_->GetReconnectRetryBackoffEntryForTesting().failure_count(); | 
 | } | 
 |  | 
 | TEST_F(FtlMessageReceptionChannelTest, | 
 |        TestStartReceivingMessages_StoppedImmediately) { | 
 |   base::RunLoop run_loop; | 
 |  | 
 |   EXPECT_CALL(mock_stream_opener_, Run(_, _, _)) | 
 |       .WillOnce(StartStream( | 
 |           [&](base::OnceClosure on_channel_ready, | 
 |               const ReceiveMessagesResponseCallback& on_incoming_msg, | 
 |               StatusCallback on_channel_closed) { | 
 |             channel_->StopReceivingMessages(); | 
 |             run_loop.Quit(); | 
 |           })); | 
 |  | 
 |   channel_->StartReceivingMessages(NotReachedClosure(), | 
 |                                    NotReachedStatusCallback(FROM_HERE)); | 
 |  | 
 |   run_loop.Run(); | 
 | } | 
 |  | 
 | TEST_F(FtlMessageReceptionChannelTest, | 
 |        TestStartReceivingMessages_NotAuthenticated) { | 
 |   base::RunLoop run_loop; | 
 |  | 
 |   EXPECT_CALL(mock_stream_opener_, Run(_, _, _)) | 
 |       .WillOnce(StartStream( | 
 |           [&](base::OnceClosure on_channel_ready, | 
 |               const ReceiveMessagesResponseCallback& on_incoming_msg, | 
 |               StatusCallback on_channel_closed) { | 
 |             std::move(on_channel_closed) | 
 |                 .Run(ProtobufHttpStatus( | 
 |                     ProtobufHttpStatus::Code::UNAUTHENTICATED, "")); | 
 |           })); | 
 |  | 
 |   channel_->StartReceivingMessages( | 
 |       NotReachedClosure(), | 
 |       CheckStatusThenQuitRunLoopCallback( | 
 |           FROM_HERE, ProtobufHttpStatus::Code::UNAUTHENTICATED, &run_loop)); | 
 |  | 
 |   run_loop.Run(); | 
 | } | 
 |  | 
 | TEST_F(FtlMessageReceptionChannelTest, | 
 |        TestStartReceivingMessages_StreamStarted) { | 
 |   base::RunLoop run_loop; | 
 |  | 
 |   EXPECT_CALL(mock_stream_opener_, Run(_, _, _)) | 
 |       .WillOnce(StartStream( | 
 |           [&](base::OnceClosure on_channel_ready, | 
 |               const ReceiveMessagesResponseCallback& on_incoming_msg, | 
 |               StatusCallback on_channel_closed) { | 
 |             std::move(on_channel_ready).Run(); | 
 |           })); | 
 |  | 
 |   channel_->StartReceivingMessages(run_loop.QuitClosure(), | 
 |                                    NotReachedStatusCallback(FROM_HERE)); | 
 |  | 
 |   run_loop.Run(); | 
 | } | 
 |  | 
 | TEST_F(FtlMessageReceptionChannelTest, | 
 |        TestStartReceivingMessages_RecoverableStreamError) { | 
 |   base::RunLoop run_loop; | 
 |  | 
 |   base::WeakPtr<FakeScopedProtobufHttpRequest> old_stream; | 
 |   EXPECT_CALL(mock_stream_opener_, Run(_, _, _)) | 
 |       .WillOnce(StartStream( | 
 |           [&](base::OnceClosure on_channel_ready, | 
 |               const ReceiveMessagesResponseCallback& on_incoming_msg, | 
 |               StatusCallback on_channel_closed) { | 
 |             // The first open stream attempt fails with UNAVAILABLE error. | 
 |             ASSERT_EQ(0, GetRetryFailureCount()); | 
 |  | 
 |             std::move(on_channel_closed) | 
 |                 .Run(ProtobufHttpStatus(ProtobufHttpStatus::Code::UNAVAILABLE, | 
 |                                         "")); | 
 |  | 
 |             ASSERT_EQ(1, GetRetryFailureCount()); | 
 |             ASSERT_NEAR(FtlServicesContext::kBackoffInitialDelay.InSecondsF(), | 
 |                         GetTimeUntilRetry().InSecondsF(), 0.5); | 
 |  | 
 |             // This will make the channel reopen the stream. | 
 |             task_environment_.FastForwardBy(GetTimeUntilRetry()); | 
 |           }, | 
 |           &old_stream)) | 
 |       .WillOnce(StartStream( | 
 |           [&](base::OnceClosure on_channel_ready, | 
 |               const ReceiveMessagesResponseCallback& on_incoming_msg, | 
 |               StatusCallback on_channel_closed) { | 
 |             // Second open stream attempt succeeds. | 
 |  | 
 |             // Assert old stream closed. | 
 |             ASSERT_FALSE(old_stream); | 
 |  | 
 |             std::move(on_channel_ready).Run(); | 
 |  | 
 |             ASSERT_EQ(0, GetRetryFailureCount()); | 
 |           })); | 
 |  | 
 |   channel_->StartReceivingMessages(run_loop.QuitClosure(), | 
 |                                    NotReachedStatusCallback(FROM_HERE)); | 
 |  | 
 |   run_loop.Run(); | 
 | } | 
 |  | 
 | TEST_F(FtlMessageReceptionChannelTest, | 
 |        TestStartReceivingMessages_MultipleCalls) { | 
 |   base::RunLoop run_loop; | 
 |  | 
 |   base::MockCallback<base::OnceClosure> stream_ready_callback; | 
 |  | 
 |   // Exits the run loop iff the callback is called three times with OK. | 
 |   EXPECT_CALL(stream_ready_callback, Run()) | 
 |       .WillOnce(Return()) | 
 |       .WillOnce(Return()) | 
 |       .WillOnce([&]() { run_loop.Quit(); }); | 
 |  | 
 |   EXPECT_CALL(mock_stream_opener_, Run(_, _, _)) | 
 |       .WillOnce(StartStream( | 
 |           [&](base::OnceClosure on_channel_ready, | 
 |               const ReceiveMessagesResponseCallback& on_incoming_msg, | 
 |               StatusCallback on_channel_closed) { | 
 |             std::move(on_channel_ready).Run(); | 
 |           })); | 
 |  | 
 |   channel_->StartReceivingMessages(stream_ready_callback.Get(), | 
 |                                    NotReachedStatusCallback(FROM_HERE)); | 
 |   channel_->StartReceivingMessages(stream_ready_callback.Get(), | 
 |                                    NotReachedStatusCallback(FROM_HERE)); | 
 |   channel_->StartReceivingMessages(stream_ready_callback.Get(), | 
 |                                    NotReachedStatusCallback(FROM_HERE)); | 
 |  | 
 |   run_loop.Run(); | 
 | } | 
 |  | 
 | TEST_F(FtlMessageReceptionChannelTest, StreamsTwoMessages) { | 
 |   base::RunLoop run_loop; | 
 |  | 
 |   constexpr char kMessage1Id[] = "msg_1"; | 
 |   constexpr char kMessage2Id[] = "msg_2"; | 
 |  | 
 |   ftl::InboxMessage message_1; | 
 |   message_1.set_message_id(kMessage1Id); | 
 |   ftl::InboxMessage message_2; | 
 |   message_2.set_message_id(kMessage2Id); | 
 |  | 
 |   EXPECT_CALL(mock_on_incoming_msg_, | 
 |               Run(Property(&ftl::InboxMessage::message_id, kMessage1Id))) | 
 |       .WillOnce(Return()); | 
 |   EXPECT_CALL(mock_on_incoming_msg_, | 
 |               Run(Property(&ftl::InboxMessage::message_id, kMessage2Id))) | 
 |       .WillOnce(Invoke([&](const ftl::InboxMessage&) { run_loop.Quit(); })); | 
 |  | 
 |   EXPECT_CALL(mock_stream_opener_, Run(_, _, _)) | 
 |       .WillOnce(StartStream( | 
 |           [&](base::OnceClosure on_channel_ready, | 
 |               const ReceiveMessagesResponseCallback& on_incoming_msg, | 
 |               StatusCallback on_channel_closed) { | 
 |             std::move(on_channel_ready).Run(); | 
 |  | 
 |             auto response = std::make_unique<ftl::ReceiveMessagesResponse>(); | 
 |             *response->mutable_inbox_message() = message_1; | 
 |             on_incoming_msg.Run(std::move(response)); | 
 |  | 
 |             response = std::make_unique<ftl::ReceiveMessagesResponse>(); | 
 |             *response->mutable_inbox_message() = message_2; | 
 |             on_incoming_msg.Run(std::move(response)); | 
 |  | 
 |             const ProtobufHttpStatus kCancel( | 
 |                 ProtobufHttpStatus::Code::CANCELLED, "Cancelled"); | 
 |             std::move(on_channel_closed).Run(kCancel); | 
 |           })); | 
 |  | 
 |   channel_->StartReceivingMessages( | 
 |       base::DoNothing(), | 
 |       CheckStatusThenQuitRunLoopCallback( | 
 |           FROM_HERE, ProtobufHttpStatus::ProtobufHttpStatus::Code::CANCELLED, | 
 |           &run_loop)); | 
 |  | 
 |   run_loop.Run(); | 
 | } | 
 |  | 
 | TEST_F(FtlMessageReceptionChannelTest, ReceivedOnePong_OnSignalingActiveTwice) { | 
 |   base::RunLoop run_loop; | 
 |  | 
 |   base::MockCallback<base::OnceClosure> stream_ready_callback; | 
 |  | 
 |   EXPECT_CALL(mock_signaling_tracker_, OnSignalingActive()) | 
 |       .WillOnce(Return()) | 
 |       .WillOnce([&]() { run_loop.Quit(); }); | 
 |  | 
 |   EXPECT_CALL(mock_stream_opener_, Run(_, _, _)) | 
 |       .WillOnce(StartStream( | 
 |           [&](base::OnceClosure on_channel_ready, | 
 |               const ReceiveMessagesResponseCallback& on_incoming_msg, | 
 |               StatusCallback on_channel_closed) { | 
 |             std::move(on_channel_ready).Run(); | 
 |             auto response = std::make_unique<ftl::ReceiveMessagesResponse>(); | 
 |             response->mutable_pong(); | 
 |             on_incoming_msg.Run(std::move(response)); | 
 |           })); | 
 |  | 
 |   channel_->StartReceivingMessages(base::DoNothing(), | 
 |                                    NotReachedStatusCallback(FROM_HERE)); | 
 |  | 
 |   run_loop.Run(); | 
 | } | 
 |  | 
 | TEST_F(FtlMessageReceptionChannelTest, NoPongWithinTimeout_ResetsStream) { | 
 |   base::RunLoop run_loop; | 
 |  | 
 |   base::WeakPtr<FakeScopedProtobufHttpRequest> old_stream; | 
 |   EXPECT_CALL(mock_stream_opener_, Run(_, _, _)) | 
 |       .WillOnce(StartStream( | 
 |           [&](base::OnceClosure on_channel_ready, | 
 |               const ReceiveMessagesResponseCallback& on_incoming_msg, | 
 |               StatusCallback on_channel_closed) { | 
 |             std::move(on_channel_ready).Run(); | 
 |             task_environment_.FastForwardBy( | 
 |                 FtlMessageReceptionChannel::kPongTimeout); | 
 |  | 
 |             ASSERT_EQ(1, GetRetryFailureCount()); | 
 |             ASSERT_NEAR(FtlServicesContext::kBackoffInitialDelay.InSecondsF(), | 
 |                         GetTimeUntilRetry().InSecondsF(), 0.5); | 
 |  | 
 |             // This will make the channel reopen the stream. | 
 |             task_environment_.FastForwardBy(GetTimeUntilRetry()); | 
 |           }, | 
 |           &old_stream)) | 
 |       .WillOnce(StartStream( | 
 |           [&](base::OnceClosure on_channel_ready, | 
 |               const ReceiveMessagesResponseCallback& on_incoming_msg, | 
 |               StatusCallback on_channel_closed) { | 
 |             // Stream is reopened. | 
 |  | 
 |             // Assert old stream closed. | 
 |             ASSERT_FALSE(old_stream); | 
 |  | 
 |             std::move(on_channel_ready).Run(); | 
 |             ASSERT_EQ(0, GetRetryFailureCount()); | 
 |             run_loop.Quit(); | 
 |           })); | 
 |  | 
 |   channel_->StartReceivingMessages(base::DoNothing(), | 
 |                                    NotReachedStatusCallback(FROM_HERE)); | 
 |  | 
 |   run_loop.Run(); | 
 | } | 
 |  | 
 | TEST_F(FtlMessageReceptionChannelTest, ServerClosesStream_ResetsStream) { | 
 |   base::RunLoop run_loop; | 
 |  | 
 |   base::WeakPtr<FakeScopedProtobufHttpRequest> old_stream; | 
 |   EXPECT_CALL(mock_stream_opener_, Run(_, _, _)) | 
 |       .WillOnce(StartStream( | 
 |           [&](base::OnceClosure on_channel_ready, | 
 |               const ReceiveMessagesResponseCallback& on_incoming_msg, | 
 |               StatusCallback on_channel_closed) { | 
 |             auto fake_server_stream = CreateFakeServerStream(); | 
 |             std::move(on_channel_ready).Run(); | 
 |  | 
 |             // Close the stream with OK. | 
 |             std::move(on_channel_closed).Run(ProtobufHttpStatus::OK()); | 
 |           }, | 
 |           &old_stream)) | 
 |       .WillOnce(StartStream( | 
 |           [&](base::OnceClosure on_channel_ready, | 
 |               const ReceiveMessagesResponseCallback& on_incoming_msg, | 
 |               StatusCallback on_channel_closed) { | 
 |             ASSERT_FALSE(old_stream); | 
 |  | 
 |             std::move(on_channel_ready).Run(); | 
 |             ASSERT_EQ(0, GetRetryFailureCount()); | 
 |             run_loop.Quit(); | 
 |           })); | 
 |  | 
 |   channel_->StartReceivingMessages(base::DoNothing(), | 
 |                                    NotReachedStatusCallback(FROM_HERE)); | 
 |  | 
 |   run_loop.Run(); | 
 | } | 
 |  | 
 | TEST_F(FtlMessageReceptionChannelTest, TimeoutIncreasesToMaximum) { | 
 |   base::RunLoop run_loop; | 
 |  | 
 |   int failure_count = 0; | 
 |   int hitting_max_delay_count = 0; | 
 |   EXPECT_CALL(mock_stream_opener_, Run(_, _, _)) | 
 |       .WillRepeatedly(StartStream( | 
 |           [&](base::OnceClosure on_channel_ready, | 
 |               const ReceiveMessagesResponseCallback& on_incoming_msg, | 
 |               StatusCallback on_channel_closed) { | 
 |             // Quit if delay is ~kBackoffMaxDelay three times. | 
 |             if (hitting_max_delay_count == 3) { | 
 |               std::move(on_channel_ready).Run(); | 
 |               ASSERT_EQ(0, GetRetryFailureCount()); | 
 |               run_loop.Quit(); | 
 |               return; | 
 |             } | 
 |  | 
 |             // Otherwise send UNAVAILABLE to reset the stream. | 
 |  | 
 |             std::move(on_channel_closed) | 
 |                 .Run(ProtobufHttpStatus( | 
 |                     ProtobufHttpStatus::ProtobufHttpStatus::Code::UNAVAILABLE, | 
 |                     "")); | 
 |  | 
 |             int new_failure_count = GetRetryFailureCount(); | 
 |             ASSERT_LT(failure_count, new_failure_count); | 
 |             failure_count = new_failure_count; | 
 |  | 
 |             base::TimeDelta time_until_retry = GetTimeUntilRetry(); | 
 |  | 
 |             base::TimeDelta max_delay_diff = | 
 |                 time_until_retry - FtlServicesContext::kBackoffMaxDelay; | 
 |  | 
 |             // Adjust for fuzziness. | 
 |             if (max_delay_diff.magnitude() < base::Milliseconds(500)) { | 
 |               hitting_max_delay_count++; | 
 |             } | 
 |  | 
 |             // This will tail-recursively call the stream opener. | 
 |             task_environment_.FastForwardBy(time_until_retry); | 
 |           })); | 
 |  | 
 |   channel_->StartReceivingMessages(base::DoNothing(), | 
 |                                    NotReachedStatusCallback(FROM_HERE)); | 
 |  | 
 |   run_loop.Run(); | 
 | } | 
 |  | 
 | TEST_F(FtlMessageReceptionChannelTest, | 
 |        StartStreamFailsWithUnRecoverableErrorAndRetry_TimeoutApplied) { | 
 |   base::RunLoop run_loop; | 
 |  | 
 |   base::WeakPtr<FakeScopedProtobufHttpRequest> old_stream; | 
 |   EXPECT_CALL(mock_stream_opener_, Run(_, _, _)) | 
 |       .WillOnce(StartStream( | 
 |           [&](base::OnceClosure on_channel_ready, | 
 |               const ReceiveMessagesResponseCallback& on_incoming_msg, | 
 |               StatusCallback on_channel_closed) { | 
 |             // The first open stream attempt fails with UNAUTHENTICATED error. | 
 |             ASSERT_EQ(0, GetRetryFailureCount()); | 
 |  | 
 |             std::move(on_channel_closed) | 
 |                 .Run(ProtobufHttpStatus(ProtobufHttpStatus::ProtobufHttpStatus:: | 
 |                                             Code::UNAUTHENTICATED, | 
 |                                         "")); | 
 |  | 
 |             ASSERT_EQ(1, GetRetryFailureCount()); | 
 |             ASSERT_NEAR(FtlServicesContext::kBackoffInitialDelay.InSecondsF(), | 
 |                         GetTimeUntilRetry().InSecondsF(), 0.5); | 
 |           }, | 
 |           &old_stream)) | 
 |       .WillOnce(StartStream( | 
 |           [&](base::OnceClosure on_channel_ready, | 
 |               const ReceiveMessagesResponseCallback& on_incoming_msg, | 
 |               StatusCallback on_channel_closed) { | 
 |             // Second open stream attempt succeeds. | 
 |  | 
 |             // Assert old stream closed. | 
 |             ASSERT_FALSE(old_stream); | 
 |  | 
 |             ASSERT_EQ(1, GetRetryFailureCount()); | 
 |  | 
 |             std::move(on_channel_ready).Run(); | 
 |  | 
 |             ASSERT_EQ(0, GetRetryFailureCount()); | 
 |           })); | 
 |  | 
 |   channel_->StartReceivingMessages( | 
 |       base::DoNothing(), | 
 |       base::BindLambdaForTesting([&](const ProtobufHttpStatus& status) { | 
 |         ASSERT_EQ(ProtobufHttpStatus::ProtobufHttpStatus::Code::UNAUTHENTICATED, | 
 |                   status.error_code()); | 
 |         channel_->StartReceivingMessages(run_loop.QuitClosure(), | 
 |                                          NotReachedStatusCallback(FROM_HERE)); | 
 |       })); | 
 |  | 
 |   run_loop.Run(); | 
 | } | 
 |  | 
 | }  // namespace remoting |