blob: d449a888cbf33cf28dde6f9dc9f278670f5107d2 [file] [log] [blame]
// Copyright 2025 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "remoting/test/corp_messaging_playground.h"
#include <iostream>
#include <memory>
#include <variant>
#include "base/files/file_util.h"
#include "base/functional/bind.h"
#include "base/functional/callback.h"
#include "base/logging.h"
#include "base/memory/scoped_refptr.h"
#include "base/notreached.h"
#include "base/run_loop.h"
#include "base/strings/string_number_conversions.h"
#include "base/strings/string_split.h"
#include "base/task/bind_post_task.h"
#include "base/task/single_thread_task_runner.h"
#include "base/task/thread_pool.h"
#include "base/uuid.h"
#include "net/ssl/client_cert_store.h"
#include "remoting/base/certificate_helpers.h"
#include "remoting/base/http_status.h"
#include "remoting/base/internal_headers.h"
#include "remoting/base/rsa_key_pair.h"
#include "remoting/base/url_request_context_getter.h"
#include "remoting/signaling/corp_messaging_client.h"
#include "services/network/public/cpp/shared_url_loader_factory.h"
#include "services/network/transitional_url_loader_factory_owner.h"
#include "third_party/abseil-cpp/absl/functional/overload.h"
namespace remoting {
namespace {
using internal::BurstStruct;
using internal::PingPongStruct;
using internal::ShareSessionTokenStruct;
using internal::SimpleStruct;
// Squirrel-related messaging constants.
constexpr char kSquirrel[] = "🐿️";
constexpr int kSquirrelCount = 1000000;
constexpr char kSquirrelMsgStart[] = "Ready for lots of squirrels? -> ";
constexpr char kSquirrelMsgEnd[] = " -> Wow! That was nuts!!!";
} // namespace
class CorpMessagingPlayground::Core {
public:
using OnInputCallback = base::RepeatingCallback<void(char)>;
explicit Core(OnInputCallback on_input_callback);
~Core();
Core(const Core&) = delete;
Core& operator=(const Core&) = delete;
void Start();
private:
OnInputCallback on_input_callback_;
};
CorpMessagingPlayground::Core::Core(OnInputCallback on_input_callback)
: on_input_callback_(on_input_callback) {}
CorpMessagingPlayground::Core::~Core() = default;
void CorpMessagingPlayground::Core::Start() {
base::SetNonBlocking(STDIN_FILENO);
std::cout << "Press '1' to send a small message to the client." << std::endl;
std::cout << "Press '2' to send a burst of 10 messages to the client."
<< std::endl;
std::cout << "Press '3' to send a burst of 100 messages to the client."
<< std::endl;
std::cout << "Press '4' to start a ping-pong exchange." << std::endl;
std::cout << "Press '5' to send a large message." << std::endl;
std::cout << "Press 'x' to quit." << std::endl << std::endl;
while (true) {
char c;
int n = read(STDIN_FILENO, &c, 1);
if (n == 1) {
on_input_callback_.Run(c);
} else if (n == -1 && errno != EAGAIN) {
LOG(ERROR) << "Error reading from stdin";
break;
}
}
}
CorpMessagingPlayground::CorpMessagingPlayground(const std::string& username) {
auto url_request_context_getter =
base::MakeRefCounted<URLRequestContextGetter>(
base::SingleThreadTaskRunner::GetCurrentDefault());
url_loader_factory_owner_ =
std::make_unique<network::TransitionalURLLoaderFactoryOwner>(
url_request_context_getter, /* is_trusted= */ true);
client_ = std::make_unique<CorpMessagingClient>(
username, key_pair_->GetPublicKey(),
url_loader_factory_owner_->GetURLLoaderFactory(),
CreateClientCertStoreInstance());
core_ = std::make_unique<Core>(base::BindPostTask(
base::SingleThreadTaskRunner::GetCurrentDefault(),
base::BindRepeating(&CorpMessagingPlayground::OnCharacterInput,
weak_factory_.GetWeakPtr())));
}
CorpMessagingPlayground::~CorpMessagingPlayground() = default;
void CorpMessagingPlayground::Start() {
run_loop_ = std::make_unique<base::RunLoop>();
// `callback_subscription` is automatically unregistered after `run_loop_`
// completes and this function goes out of scope.
auto callback_subscription = client_->RegisterMessageCallback(
base::BindRepeating(&CorpMessagingPlayground::OnPeerMessageReceived,
base::Unretained(this)));
client_->StartReceivingMessages(
base::BindOnce(&CorpMessagingPlayground::OnStreamOpened,
base::Unretained(this)),
base::BindOnce(&CorpMessagingPlayground::OnStreamClosed,
base::Unretained(this)));
base::ThreadPool::PostTask(
FROM_HERE, {base::MayBlock()},
base::BindOnce(&Core::Start, base::Unretained(core_.get())));
run_loop_->Run();
}
void CorpMessagingPlayground::OnStreamOpened() {
LOG(INFO) << "Stream opened...";
}
void CorpMessagingPlayground::OnStreamClosed(const HttpStatus& status) {
LOG(INFO) << "Stream closed: " << status.ok() << ", "
<< static_cast<int>(status.error_code()) << ", "
<< status.error_message();
run_loop_->Quit();
}
void CorpMessagingPlayground::OnPeerMessageReceived(
const internal::PeerMessageStruct& message) {
const auto* system_test =
std::get_if<internal::SystemTestStruct>(&message.payload);
if (!system_test) {
LOG(WARNING) << "Received message with unsupported payload type.";
return;
}
std::visit(absl::Overload(
[this](const PingPongStruct& message) {
if (message.type == PingPongStruct::Type::PONG) {
auto rtt = base::Time::Now() - last_ping_sent_time_;
ping_total_rtt_ += rtt;
LOG(INFO) << "Current RTT: " << rtt.InMilliseconds()
<< "ms, Total RTT: "
<< ping_total_rtt_.InMilliseconds() << "ms";
if (message.current_count >= message.exchange_count) {
LOG(INFO) << "Ping-pong exchange finished. Total RTT: "
<< ping_total_rtt_.InMilliseconds() << "ms";
return;
}
// Send next PING.
internal::PingPongStruct ping_pong;
ping_pong.type = PingPongStruct::Type::PING;
ping_pong.rally_id = message.rally_id;
ping_pong.current_count = message.current_count + 1;
ping_pong.exchange_count = message.exchange_count;
internal::SystemTestStruct response_message;
response_message.test_message = std::move(ping_pong);
last_ping_sent_time_ = base::Time::Now();
client_->SendTestMessage(messaging_authz_token_,
std::move(response_message),
base::DoNothing());
} else if (message.type == PingPongStruct::Type::PING) {
// Send PONG.
internal::PingPongStruct ping_pong;
ping_pong.type = PingPongStruct::Type::PONG;
ping_pong.rally_id = message.rally_id;
ping_pong.current_count = message.current_count;
ping_pong.exchange_count = message.exchange_count;
internal::SystemTestStruct response_message;
response_message.test_message = std::move(ping_pong);
client_->SendTestMessage(messaging_authz_token_,
std::move(response_message),
base::DoNothing());
} else {
NOTREACHED();
}
},
[](const BurstStruct& message) {
LOG(INFO)
<< "Burst message received: index=" << message.index
<< ", burst_count=" << message.burst_count
<< ", payload=" << message.payload;
},
[](const SimpleStruct& simple_message) {
LOG(INFO) << "PeerMessage received: payload="
<< simple_message.payload;
},
[this](const ShareSessionTokenStruct& message) {
LOG(INFO) << "ShareSessionToken received.";
messaging_authz_token_ = message.messaging_authz_token;
}),
system_test->test_message);
}
void CorpMessagingPlayground::OnCharacterInput(char c) {
switch (c) {
case '1':
SendMessage();
break;
case '2':
SendMessage(10);
break;
case '3':
SendMessage(100);
break;
case '4':
StartPingPongRally();
break;
case '5':
SendLargeMessage();
break;
case 'x':
run_loop_->Quit();
break;
}
}
void CorpMessagingPlayground::SendMessage(int count) {
if (messaging_authz_token_.empty()) {
LOG(WARNING) << "No authz token received yet, cannot send message.";
return;
}
if (count > 1) {
for (int i = 0; i < count; i++) {
internal::SystemTestStruct message;
internal::BurstStruct burst;
burst.index = i;
burst.burst_count = count;
burst.payload = "Burst message #" + base::NumberToString(i + 1) + " of " +
base::NumberToString(count);
message.test_message = std::move(burst);
client_->SendTestMessage(messaging_authz_token_, std::move(message),
base::DoNothing());
}
return;
}
client_->SendMessage(messaging_authz_token_, "Hello from the playground!",
base::DoNothing());
}
void CorpMessagingPlayground::StartPingPongRally() {
if (messaging_authz_token_.empty()) {
LOG(WARNING) << "No authz token received yet, cannot start ping-pong.";
return;
}
LOG(INFO) << "Starting a new Ping-Pong rally.";
// TODO: joedow - Use a map to track RTTs for concurrent rallies.
ping_total_rtt_ = {};
last_ping_sent_time_ = base::Time::Now();
internal::SystemTestStruct message;
internal::PingPongStruct ping_pong;
ping_pong.type = PingPongStruct::Type::PING;
ping_pong.rally_id = "chromium-playground-rally-" +
base::Uuid::GenerateRandomV4().AsLowercaseString();
ping_pong.current_count = 1;
ping_pong.exchange_count = 10;
message.test_message = std::move(ping_pong);
client_->SendTestMessage(messaging_authz_token_, std::move(message),
base::DoNothing());
}
void CorpMessagingPlayground::SendLargeMessage() {
if (messaging_authz_token_.empty()) {
LOG(WARNING) << "No authz token received yet, cannot send large message.";
return;
}
std::string payload(kSquirrelMsgStart);
payload.reserve((sizeof(kSquirrelMsgStart) - 1) +
(sizeof(kSquirrelMsgEnd) - 1) +
(sizeof(kSquirrel) - 1) * kSquirrelCount);
for (int i = 0; i < kSquirrelCount; i++) {
payload += kSquirrel;
}
payload += kSquirrelMsgEnd;
client_->SendMessage(messaging_authz_token_, payload, base::DoNothing());
}
} // namespace remoting