blob: d4164356e83fc4297b58fc971b9904b84370cef1 [file] [log] [blame]
// Copyright 2019 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "remoting/signaling/ftl_messaging_client.h"
#include <utility>
#include "base/bind_helpers.h"
#include "base/callback.h"
#include "base/guid.h"
#include "base/logging.h"
#include "base/time/time.h"
#include "remoting/base/grpc_support/grpc_async_server_streaming_request.h"
#include "remoting/base/grpc_support/grpc_async_unary_request.h"
#include "remoting/base/grpc_support/grpc_authenticated_executor.h"
#include "remoting/base/grpc_support/grpc_executor.h"
#include "remoting/signaling/ftl_grpc_context.h"
#include "remoting/signaling/ftl_message_reception_channel.h"
#include "remoting/signaling/registration_manager.h"
namespace remoting {
namespace {
void AddMessageToAckRequest(const ftl::InboxMessage& message,
ftl::AckMessagesRequest* request) {
ftl::ReceiverMessage* receiver_message = request->add_messages();
receiver_message->set_message_id(message.message_id());
receiver_message->set_allocated_receiver_id(
new ftl::Id(message.receiver_id()));
}
constexpr base::TimeDelta kInboxMessageTtl = base::TimeDelta::FromMinutes(1);
} // namespace
FtlMessagingClient::FtlMessagingClient(
OAuthTokenGetter* token_getter,
RegistrationManager* registration_manager,
SignalingTracker* signaling_tracker)
: FtlMessagingClient(
std::make_unique<GrpcAuthenticatedExecutor>(token_getter),
registration_manager,
std::make_unique<FtlMessageReceptionChannel>(signaling_tracker)) {}
FtlMessagingClient::FtlMessagingClient(
std::unique_ptr<GrpcExecutor> executor,
RegistrationManager* registration_manager,
std::unique_ptr<MessageReceptionChannel> channel) {
DCHECK(executor);
DCHECK(registration_manager);
DCHECK(channel);
executor_ = std::move(executor);
registration_manager_ = registration_manager;
messaging_stub_ = Messaging::NewStub(FtlGrpcContext::CreateChannel());
reception_channel_ = std::move(channel);
reception_channel_->Initialize(
base::BindRepeating(&FtlMessagingClient::OpenReceiveMessagesStream,
base::Unretained(this)),
base::BindRepeating(&FtlMessagingClient::OnMessageReceived,
base::Unretained(this)));
}
FtlMessagingClient::~FtlMessagingClient() = default;
std::unique_ptr<FtlMessagingClient::MessageCallbackSubscription>
FtlMessagingClient::RegisterMessageCallback(const MessageCallback& callback) {
return callback_list_.Add(callback);
}
void FtlMessagingClient::PullMessages(DoneCallback on_done) {
ftl::PullMessagesRequest request;
*request.mutable_header() = FtlGrpcContext::CreateRequestHeader(
registration_manager_->GetFtlAuthToken());
auto grpc_request = CreateGrpcAsyncUnaryRequest(
base::BindOnce(&Messaging::Stub::AsyncPullMessages,
base::Unretained(messaging_stub_.get())),
request,
base::BindOnce(&FtlMessagingClient::OnPullMessagesResponse,
base::Unretained(this), std::move(on_done)));
FtlGrpcContext::FillClientContext(grpc_request->context());
executor_->ExecuteRpc(std::move(grpc_request));
}
void FtlMessagingClient::SendMessage(
const std::string& destination,
const std::string& destination_registration_id,
const ftl::ChromotingMessage& message,
DoneCallback on_done) {
ftl::InboxSendRequest request;
*request.mutable_header() = FtlGrpcContext::CreateRequestHeader(
registration_manager_->GetFtlAuthToken());
request.set_time_to_live(kInboxMessageTtl.InMicroseconds());
// TODO(yuweih): See if we need to set requester_id
*request.mutable_dest_id() = FtlGrpcContext::CreateIdFromString(destination);
std::string serialized_message;
bool succeeded = message.SerializeToString(&serialized_message);
DCHECK(succeeded);
request.mutable_message()->set_message(serialized_message);
request.mutable_message()->set_message_id(base::GenerateGUID());
request.mutable_message()->set_message_type(
ftl::InboxMessage_MessageType_CHROMOTING_MESSAGE);
request.mutable_message()->set_message_class(
ftl::InboxMessage_MessageClass_STATUS);
if (!destination_registration_id.empty()) {
request.add_dest_registration_ids(destination_registration_id);
}
auto grpc_request = CreateGrpcAsyncUnaryRequest(
base::BindOnce(&Messaging::Stub::AsyncSendMessage,
base::Unretained(messaging_stub_.get())),
request,
base::BindOnce(&FtlMessagingClient::OnSendMessageResponse,
base::Unretained(this), std::move(on_done)));
FtlGrpcContext::FillClientContext(grpc_request->context());
executor_->ExecuteRpc(std::move(grpc_request));
}
void FtlMessagingClient::StartReceivingMessages(base::OnceClosure on_ready,
DoneCallback on_closed) {
reception_channel_->StartReceivingMessages(std::move(on_ready),
std::move(on_closed));
}
void FtlMessagingClient::StopReceivingMessages() {
reception_channel_->StopReceivingMessages();
}
bool FtlMessagingClient::IsReceivingMessages() const {
return reception_channel_->IsReceivingMessages();
}
void FtlMessagingClient::OnPullMessagesResponse(
DoneCallback on_done,
const grpc::Status& status,
const ftl::PullMessagesResponse& response) {
if (!status.ok()) {
LOG(ERROR) << "Failed to pull messages. "
<< "Error code: " << status.error_code()
<< ", message: " << status.error_message();
std::move(on_done).Run(status);
return;
}
ftl::AckMessagesRequest ack_request;
*ack_request.mutable_header() = FtlGrpcContext::CreateRequestHeader(
registration_manager_->GetFtlAuthToken());
for (const auto& message : response.messages()) {
RunMessageCallbacks(message);
AddMessageToAckRequest(message, &ack_request);
}
if (ack_request.messages_size() == 0) {
LOG(WARNING) << "No new message is received.";
std::move(on_done).Run(status);
return;
}
VLOG(1) << "Acking " << ack_request.messages_size() << " messages";
AckMessages(ack_request, std::move(on_done));
}
void FtlMessagingClient::OnSendMessageResponse(
DoneCallback on_done,
const grpc::Status& status,
const ftl::InboxSendResponse& response) {
std::move(on_done).Run(status);
}
void FtlMessagingClient::AckMessages(const ftl::AckMessagesRequest& request,
DoneCallback on_done) {
auto grpc_request = CreateGrpcAsyncUnaryRequest(
base::BindOnce(&Messaging::Stub::AsyncAckMessages,
base::Unretained(messaging_stub_.get())),
request,
base::BindOnce(&FtlMessagingClient::OnAckMessagesResponse,
base::Unretained(this), std::move(on_done)));
FtlGrpcContext::FillClientContext(grpc_request->context());
executor_->ExecuteRpc(std::move(grpc_request));
}
void FtlMessagingClient::OnAckMessagesResponse(
DoneCallback on_done,
const grpc::Status& status,
const ftl::AckMessagesResponse& response) {
// TODO(yuweih): Handle failure.
std::move(on_done).Run(status);
}
std::unique_ptr<ScopedGrpcServerStream>
FtlMessagingClient::OpenReceiveMessagesStream(
base::OnceClosure on_channel_ready,
const base::RepeatingCallback<void(const ftl::ReceiveMessagesResponse&)>&
on_incoming_msg,
base::OnceCallback<void(const grpc::Status&)> on_channel_closed) {
ftl::ReceiveMessagesRequest request;
*request.mutable_header() = FtlGrpcContext::CreateRequestHeader(
registration_manager_->GetFtlAuthToken());
std::unique_ptr<ScopedGrpcServerStream> stream;
auto grpc_request = CreateGrpcAsyncServerStreamingRequest(
base::BindOnce(&Messaging::Stub::AsyncReceiveMessages,
base::Unretained(messaging_stub_.get())),
request, std::move(on_channel_ready), on_incoming_msg,
std::move(on_channel_closed), &stream);
FtlGrpcContext::FillClientContext(grpc_request->context());
executor_->ExecuteRpc(std::move(grpc_request));
return stream;
}
void FtlMessagingClient::RunMessageCallbacks(const ftl::InboxMessage& message) {
if (message_tracker_.IsIdTracked(message.message_id())) {
LOG(WARNING) << "Found message with duplicated message ID: "
<< message.message_id();
return;
}
message_tracker_.TrackId(message.message_id());
if (message.sender_id().type() != ftl::IdType_Type_SYSTEM &&
message.sender_registration_id().empty()) {
LOG(WARNING) << "Ignored peer message with no sender registration ID.";
return;
}
if (message.message_type() !=
ftl::InboxMessage_MessageType_CHROMOTING_MESSAGE) {
LOG(WARNING) << "Received message with unknown type: "
<< message.message_type()
<< ", sender: " << message.sender_id().id();
return;
}
ftl::ChromotingMessage chromoting_message;
chromoting_message.ParseFromString(message.message());
callback_list_.Notify(message.sender_id(), message.sender_registration_id(),
chromoting_message);
}
void FtlMessagingClient::OnMessageReceived(const ftl::InboxMessage& message) {
RunMessageCallbacks(message);
ftl::AckMessagesRequest ack_request;
*ack_request.mutable_header() = FtlGrpcContext::CreateRequestHeader(
registration_manager_->GetFtlAuthToken());
AddMessageToAckRequest(message, &ack_request);
AckMessages(ack_request, base::DoNothing());
}
} // namespace remoting