blob: 722914a1515115a12b9a49db991f3eb8521603ad [file] [log] [blame]
// Copyright 2025 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "remoting/signaling/message_channel.h"
#include <utility>
#include "base/functional/bind.h"
#include "base/functional/callback.h"
#include "base/functional/callback_helpers.h"
#include "base/logging.h"
#include "remoting/base/http_status.h"
#include "remoting/base/scoped_protobuf_http_request.h"
#include "remoting/signaling/ftl_services_context.h"
#include "remoting/signaling/message_channel_strategy.h"
#include "remoting/signaling/signaling_tracker.h"
namespace remoting {
MessageChannel::MessageChannel(std::unique_ptr<MessageChannelStrategy> strategy,
SignalingTracker* signaling_tracker)
: strategy_(std::move(strategy)),
// TODO: joedow - Move backoff policy to a shared location.
reconnect_retry_backoff_(&FtlServicesContext::GetBackoffPolicy()),
signaling_tracker_(signaling_tracker) {
strategy_->set_on_channel_active(base::BindRepeating(
&MessageChannel::OnChannelActive, weak_factory_.GetWeakPtr()));
}
MessageChannel::~MessageChannel() = default;
void MessageChannel::StartReceivingMessages(base::OnceClosure on_ready,
DoneCallback on_closed) {
stream_closed_callbacks_.push_back(std::move(on_closed));
if (state_ == State::STARTED) {
std::move(on_ready).Run();
return;
}
stream_ready_callbacks_.push_back(std::move(on_ready));
if (state_ == State::STARTING) {
return;
}
state_ = State::STARTING;
RetryStartReceivingMessagesWithBackoff();
}
void MessageChannel::StopReceivingMessages() {
if (state_ == State::STOPPED) {
return;
}
// Current stream callbacks shouldn't receive notification for future streams.
stream_ready_callbacks_.clear();
stream_closed_callbacks_.clear();
StopReceivingMessagesInternal();
}
const net::BackoffEntry&
MessageChannel::GetReconnectRetryBackoffEntryForTesting() const {
return reconnect_retry_backoff_;
}
void MessageChannel::OnReceiveMessagesStreamReady() {
DCHECK_EQ(State::STARTING, state_);
state_ = State::STARTED;
if (signaling_tracker_) {
signaling_tracker_->OnSignalingActive();
}
RunStreamReadyCallbacks();
BeginStreamTimers();
}
void MessageChannel::OnReceiveMessagesStreamClosed(const HttpStatus& status) {
if (state_ == State::STOPPED) {
// Previously closed by the caller.
return;
}
if (status.ok()) {
// The backend closes the stream. This is not an error so we restart it
// without backoff.
VLOG(1) << "Stream has been closed by the server. Reconnecting...";
reconnect_retry_backoff_.Reset();
RetryStartReceivingMessages();
return;
}
reconnect_retry_backoff_.InformOfRequest(false);
if (status.error_code() == HttpStatus::Code::ABORTED ||
status.error_code() == HttpStatus::Code::UNAVAILABLE ||
status.error_code() == HttpStatus::Code::NETWORK_ERROR) {
// These are 'soft' connection errors that should be retried.
// Other errors should be ignored.
RetryStartReceivingMessagesWithBackoff();
return;
}
stream_ready_callbacks_.clear();
StopReceivingMessagesInternal();
RunStreamClosedCallbacks(status);
}
void MessageChannel::RunStreamReadyCallbacks() {
if (stream_ready_callbacks_.empty()) {
return;
}
// The swap is to make StartReceivingMessages() reentrant.
std::list<base::OnceClosure> callbacks;
callbacks.swap(stream_ready_callbacks_);
for (base::OnceClosure& callback : callbacks) {
std::move(callback).Run();
}
}
void MessageChannel::RunStreamClosedCallbacks(const HttpStatus& status) {
if (stream_closed_callbacks_.empty()) {
return;
}
// The swap is to make StartReceivingMessages() reentrant.
std::list<DoneCallback> callbacks;
callbacks.swap(stream_closed_callbacks_);
for (DoneCallback& callback : callbacks) {
std::move(callback).Run(status);
}
}
void MessageChannel::RetryStartReceivingMessagesWithBackoff() {
VLOG(1) << "RetryStartReceivingMessages will be called with backoff: "
<< reconnect_retry_backoff_.GetTimeUntilRelease();
reconnect_retry_timer_.Start(
FROM_HERE, reconnect_retry_backoff_.GetTimeUntilRelease(),
base::BindOnce(&MessageChannel::RetryStartReceivingMessages,
base::Unretained(this)));
}
void MessageChannel::RetryStartReceivingMessages() {
VLOG(1) << "RetryStartReceivingMessages called";
StopReceivingMessagesInternal();
StartReceivingMessagesInternal();
}
void MessageChannel::StartReceivingMessagesInternal() {
DCHECK_EQ(State::STOPPED, state_);
state_ = State::STARTING;
receive_messages_stream_ = strategy_->CreateChannel(
base::BindOnce(&MessageChannel::OnReceiveMessagesStreamReady,
weak_factory_.GetWeakPtr()),
base::BindOnce(&MessageChannel::OnReceiveMessagesStreamClosed,
weak_factory_.GetWeakPtr()));
}
void MessageChannel::StopReceivingMessagesInternal() {
DCHECK_NE(State::STOPPED, state_);
state_ = State::STOPPED;
receive_messages_stream_.reset();
reconnect_retry_timer_.Stop();
stream_inactivity_timer_.reset();
}
bool MessageChannel::IsReceivingMessages() const {
return receive_messages_stream_.get() != nullptr;
}
void MessageChannel::BeginStreamTimers() {
reconnect_retry_backoff_.Reset();
stream_inactivity_timer_ = std::make_unique<base::DelayTimer>(
FROM_HERE, strategy_->GetInactivityTimeout(), this,
&MessageChannel::OnInactivityTimeout);
stream_inactivity_timer_->Reset();
}
void MessageChannel::OnInactivityTimeout() {
LOG(WARNING) << "Timed out waiting for message from server.";
reconnect_retry_backoff_.InformOfRequest(false);
RetryStartReceivingMessagesWithBackoff();
}
void MessageChannel::OnChannelActive() {
stream_inactivity_timer_->Reset();
if (signaling_tracker_) {
signaling_tracker_->OnSignalingActive();
}
}
} // namespace remoting