blob: aa675b2b1a6d203e4af985f8fadab579004e79ff [file] [log] [blame]
// Copyright 2020 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "components/reporting/client/report_queue_impl.h"
#include <cstddef>
#include <memory>
#include <optional>
#include <utility>
#include "base/containers/queue.h"
#include "base/json/json_reader.h"
#include "base/memory/ref_counted.h"
#include "base/memory/scoped_refptr.h"
#include "base/test/gtest_util.h"
#include "base/test/task_environment.h"
#include "base/types/expected.h"
#include "base/values.h"
#include "components/reporting/client/mock_report_queue.h"
#include "components/reporting/client/report_queue_configuration.h"
#include "components/reporting/proto/synced/record_constants.pb.h"
#include "components/reporting/proto/test.pb.h"
#include "components/reporting/storage/storage_module_interface.h"
#include "components/reporting/storage/test_storage_module.h"
#include "components/reporting/util/rate_limiter_interface.h"
#include "components/reporting/util/status.h"
#include "components/reporting/util/status_macros.h"
#include "components/reporting/util/statusor.h"
#include "components/reporting/util/test_support_callbacks.h"
#include "testing/gmock/include/gmock/gmock.h"
#include "testing/gtest/include/gtest/gtest.h"
using ::testing::_;
using ::testing::AllOf;
using ::testing::EndsWith;
using ::testing::Eq;
using ::testing::MockFunction;
using ::testing::NiceMock;
using ::testing::Not;
using ::testing::NotNull;
using ::testing::Property;
using ::testing::Return;
using ::testing::StrEq;
using ::testing::WithArg;
using ::reporting::test::TestStorageModule;
namespace reporting {
namespace {
constexpr char kTestMessage[] = "TEST_MESSAGE";
class MockRateLimiter : public RateLimiterInterface {
public:
MOCK_METHOD(bool, Acquire, (size_t event_size), (override));
};
// Creates a |ReportQueue| using |TestStorageModule| and
// |TestEncryptionModule|. Allows access to the storage module for checking
// stored values.
class ReportQueueImplTest : public testing::Test {
protected:
ReportQueueImplTest()
: priority_(Priority::IMMEDIATE),
dm_token_("FAKE_DM_TOKEN"),
destination_(Destination::UPLOAD_EVENTS),
storage_module_(base::MakeRefCounted<TestStorageModule>()),
policy_check_callback_(
base::BindRepeating(&MockFunction<Status()>::Call,
base::Unretained(&mocked_policy_check_))) {}
void SetUp() override {
ON_CALL(mocked_policy_check_, Call())
.WillByDefault(Return(Status::StatusOK()));
auto config_result =
ReportQueueConfiguration::Create({.destination = destination_})
.SetDMToken(dm_token_)
.SetPolicyCheckCallback(policy_check_callback_)
.Build();
ASSERT_TRUE(config_result.has_value()) << config_result.error();
test::TestEvent<StatusOr<std::unique_ptr<ReportQueue>>> report_queue_event;
ReportQueueImpl::Create(std::move(config_result.value()), storage_module_,
report_queue_event.cb());
auto report_queue_result = report_queue_event.result();
ASSERT_TRUE(report_queue_result.has_value()) << report_queue_result.error();
report_queue_ = std::move(report_queue_result.value());
}
Status EnqueueTestRecord(
std::unique_ptr<reporting::ReportQueueConfiguration> config,
test::TestMessage test_message) {
test::TestEvent<StatusOr<std::unique_ptr<ReportQueue>>> report_queue_event;
ReportQueueImpl::Create(std::move(config), storage_module_,
report_queue_event.cb());
auto report_queue_result = report_queue_event.result();
CHECK(report_queue_result.has_value()) << report_queue_result.error();
report_queue_ = std::move(report_queue_result.value());
test::TestEvent<Status> a;
report_queue_->Enqueue(std::make_unique<test::TestMessage>(test_message),
priority_, a.cb());
return a.result();
}
TestStorageModule* test_storage_module() const {
TestStorageModule* const test_storage_module =
static_cast<TestStorageModule*>(storage_module_.get());
EXPECT_THAT(test_storage_module, NotNull());
return test_storage_module;
}
NiceMock<MockFunction<Status()>> mocked_policy_check_;
base::test::TaskEnvironment task_environment_{
base::test::TaskEnvironment::TimeSource::MOCK_TIME};
const Priority priority_;
std::unique_ptr<ReportQueue> report_queue_;
base::OnceCallback<void(Status)> callback_;
const std::string dm_token_;
const Destination destination_;
scoped_refptr<StorageModuleInterface> storage_module_;
ReportQueueConfiguration::PolicyCheckCallback policy_check_callback_;
};
// Enqueues a random string and ensures that the string arrives unaltered in the
// |StorageModuleInterface|.
TEST_F(ReportQueueImplTest, SuccessfulStringRecord) {
test::TestEvent<Status> a;
report_queue_->Enqueue(kTestMessage, priority_, a.cb());
const auto a_result = a.result();
EXPECT_OK(a_result) << a_result;
EXPECT_THAT(test_storage_module()->priority(), Eq(priority_));
EXPECT_THAT(test_storage_module()->record().has_timestamp_us(), Eq(true));
EXPECT_THAT(test_storage_module()->record().data(), StrEq(kTestMessage));
}
// Enqueues a |base::Value| dictionary and ensures it arrives unaltered in the
// |StorageModuleInterface|.
TEST_F(ReportQueueImplTest, SuccessfulBaseValueRecord) {
static constexpr char kTestKey[] = "TEST_KEY";
static constexpr char kTestValue[] = "TEST_VALUE";
base::Value::Dict test_dict;
test_dict.Set(kTestKey, kTestValue);
test::TestEvent<Status> a;
report_queue_->Enqueue(test_dict.Clone(), priority_, a.cb());
const auto a_result = a.result();
EXPECT_OK(a_result) << a_result;
EXPECT_THAT(test_storage_module()->priority(), Eq(priority_));
EXPECT_THAT(test_storage_module()->record().has_timestamp_us(), Eq(true));
std::optional<base::Value> value_result =
base::JSONReader::Read(test_storage_module()->record().data(),
base::JSON_PARSE_CHROMIUM_EXTENSIONS);
ASSERT_TRUE(value_result.has_value());
EXPECT_EQ(value_result.value().GetDict(), test_dict);
}
// Enqueues a |TestMessage| and ensures that it arrives unaltered in the
// |StorageModuleInterface|.
TEST_F(ReportQueueImplTest, SuccessfulProtoRecord) {
test::TestMessage test_message;
test_message.set_test(kTestMessage);
test::TestEvent<Status> a;
report_queue_->Enqueue(std::make_unique<test::TestMessage>(test_message),
priority_, a.cb());
const auto a_result = a.result();
EXPECT_OK(a_result) << a_result;
EXPECT_THAT(test_storage_module()->priority(), Eq(priority_));
EXPECT_THAT(test_storage_module()->record().has_timestamp_us(), Eq(true));
test::TestMessage result_message;
ASSERT_TRUE(
result_message.ParseFromString(test_storage_module()->record().data()));
ASSERT_EQ(result_message.test(), test_message.test());
}
// Verifies that records sent to `Destination::EVENT_METRIC` are `MetricData`
// protos.
TEST_F(ReportQueueImplTest,
NonMetricDataFailsToEnqueueToEventMetricDestination) {
auto config_result = ReportQueueConfiguration::Create(
{.destination = Destination::EVENT_METRIC})
.SetDMToken(dm_token_)
.SetPolicyCheckCallback(policy_check_callback_)
.Build();
ASSERT_TRUE(config_result.has_value()) << config_result.error();
// Test records are not MetricData so they should trigger CHECK.
EXPECT_CHECK_DEATH(static_cast<void>(EnqueueTestRecord(
std::move(config_result.value()), test::TestMessage())));
}
// Verifies that records sent to `Destination::TELEMETRY_METRIC` are
// `MetricData` protos.
TEST_F(ReportQueueImplTest,
NonMetricDataFailsToEnqueueToTelemetryMetricDestination) {
auto config_result = ReportQueueConfiguration::Create(
{.destination = Destination::TELEMETRY_METRIC})
.SetDMToken(dm_token_)
.SetPolicyCheckCallback(policy_check_callback_)
.Build();
ASSERT_TRUE(config_result.has_value()) << config_result.error();
// Test records are not MetricData so they should trigger CHECK.
EXPECT_CHECK_DEATH(static_cast<void>(EnqueueTestRecord(
std::move(config_result.value()), test::TestMessage())));
}
// Verifies that records sent to `Destination::INFO_METRIC` are
// `MetricData` protos.
TEST_F(ReportQueueImplTest,
NonMetricDataFailsToEnqueueToInfoMetricDestination) {
auto config_result = ReportQueueConfiguration::Create(
{.destination = Destination::INFO_METRIC})
.SetDMToken(dm_token_)
.SetPolicyCheckCallback(policy_check_callback_)
.Build();
ASSERT_TRUE(config_result.has_value()) << config_result.error();
// Test records are not MetricData so they should trigger CHECK.
EXPECT_CHECK_DEATH(static_cast<void>(EnqueueTestRecord(
std::move(config_result.value()), test::TestMessage())));
}
TEST_F(ReportQueueImplTest, SuccessfulProtoRecordWithRateLimiter) {
auto rate_limiter = std::make_unique<MockRateLimiter>();
auto* const mock_rate_limiter = rate_limiter.get();
auto config_result =
ReportQueueConfiguration::Create({.destination = destination_})
.SetDMToken(dm_token_)
.SetPolicyCheckCallback(policy_check_callback_)
.SetRateLimiter(std::move(rate_limiter))
.Build();
ASSERT_TRUE(config_result.has_value()) << config_result.error();
test::TestEvent<StatusOr<std::unique_ptr<ReportQueue>>> report_queue_event;
ReportQueueImpl::Create(std::move(config_result.value()), storage_module_,
report_queue_event.cb());
auto report_queue_result = report_queue_event.result();
ASSERT_TRUE(report_queue_result.has_value()) << report_queue_result.error();
report_queue_ = std::move(report_queue_result.value());
test::TestMessage test_message;
test_message.set_test(kTestMessage);
// Fail Enqueue by rate limiter.
EXPECT_CALL(*mock_rate_limiter, Acquire(_)).WillOnce(Return(false));
test::TestEvent<Status> a;
report_queue_->Enqueue(std::make_unique<test::TestMessage>(test_message),
priority_, a.cb());
const auto a_result = a.result();
EXPECT_THAT(a_result,
AllOf(Property(&Status::error_code, Eq(error::OUT_OF_RANGE)),
Property(&Status::error_message,
EndsWith(" rejected by rate limiter"))))
<< a_result;
// Succeed Enqueue by rate limiter.
EXPECT_CALL(*mock_rate_limiter, Acquire(_)).WillOnce(Return(true));
test::TestEvent<Status> b;
report_queue_->Enqueue(std::make_unique<test::TestMessage>(test_message),
priority_, b.cb());
const auto b_result = b.result();
EXPECT_OK(b_result) << b_result;
EXPECT_THAT(test_storage_module()->priority(), Eq(priority_));
EXPECT_THAT(test_storage_module()->record().has_timestamp_us(), Eq(true));
test::TestMessage result_message;
ASSERT_TRUE(
result_message.ParseFromString(test_storage_module()->record().data()));
ASSERT_EQ(result_message.test(), test_message.test());
}
TEST_F(ReportQueueImplTest, SuccessfulProtoRecordWithReservedSpace) {
static constexpr int64_t kReservedSpace = 12345L;
auto config_result =
ReportQueueConfiguration::Create(
{.destination = destination_, .reserved_space = kReservedSpace})
.SetDMToken(dm_token_)
.SetPolicyCheckCallback(policy_check_callback_)
.Build();
ASSERT_TRUE(config_result.has_value()) << config_result.error();
test::TestMessage test_message;
test_message.set_test(kTestMessage);
const auto a_result =
EnqueueTestRecord(std::move(config_result.value()), test_message);
EXPECT_OK(a_result) << a_result;
EXPECT_THAT(test_storage_module()->priority(), Eq(priority_));
EXPECT_THAT(test_storage_module()->record().has_timestamp_us(), Eq(true));
EXPECT_THAT(test_storage_module()->record().reserved_space(),
Eq(kReservedSpace));
test::TestMessage result_message;
ASSERT_TRUE(
result_message.ParseFromString(test_storage_module()->record().data()));
ASSERT_EQ(result_message.test(), test_message.test());
}
TEST_F(ReportQueueImplTest, SuccessfulProtoRecordWithSource) {
SourceInfo source_info;
source_info.set_source(SourceInfo::ASH);
auto config_result =
ReportQueueConfiguration::Create({.destination = destination_})
.SetDMToken(dm_token_)
.SetPolicyCheckCallback(policy_check_callback_)
.SetSourceInfo(source_info)
.Build();
ASSERT_TRUE(config_result.has_value()) << config_result.error();
test::TestMessage test_message;
test_message.set_test(kTestMessage);
const auto a_result =
EnqueueTestRecord(std::move(config_result.value()), test_message);
EXPECT_OK(a_result) << a_result;
EXPECT_THAT(test_storage_module()->priority(), Eq(priority_));
EXPECT_THAT(test_storage_module()->record().has_timestamp_us(), Eq(true));
EXPECT_THAT(test_storage_module()->record().reserved_space(), Eq(0L));
EXPECT_THAT(test_storage_module()->record().source_info().source(),
Eq(source_info.source()));
test::TestMessage result_message;
ASSERT_TRUE(
result_message.ParseFromString(test_storage_module()->record().data()));
EXPECT_THAT(result_message.test(), StrEq(test_message.test()));
}
TEST_F(ReportQueueImplTest, SuccessfulProtoRecordWithSourceVersion) {
SourceInfo source_info;
source_info.set_source(SourceInfo::ASH);
source_info.set_source_version("1.0.0");
auto config_result =
ReportQueueConfiguration::Create({.destination = destination_})
.SetDMToken(dm_token_)
.SetPolicyCheckCallback(policy_check_callback_)
.SetSourceInfo(source_info)
.Build();
ASSERT_TRUE(config_result.has_value()) << config_result.error();
test::TestMessage test_message;
test_message.set_test(kTestMessage);
const auto a_result =
EnqueueTestRecord(std::move(config_result.value()), test_message);
EXPECT_OK(a_result) << a_result;
EXPECT_THAT(test_storage_module()->priority(), Eq(priority_));
EXPECT_THAT(test_storage_module()->record().has_timestamp_us(), Eq(true));
EXPECT_THAT(test_storage_module()->record().reserved_space(), Eq(0L));
EXPECT_THAT(test_storage_module()->record().source_info().source(),
Eq(source_info.source()));
EXPECT_THAT(test_storage_module()->record().source_info().source_version(),
Eq(source_info.source_version()));
test::TestMessage result_message;
ASSERT_TRUE(
result_message.ParseFromString(test_storage_module()->record().data()));
EXPECT_THAT(result_message.test(), StrEq(test_message.test()));
}
// The call to enqueue should succeed, indicating that the storage operation has
// been scheduled. The callback should fail, indicating that storage was
// unsuccessful.
TEST_F(ReportQueueImplTest, CallSuccessCallbackFailure) {
EXPECT_CALL(*test_storage_module(), AddRecord(Eq(priority_), _, _))
.WillOnce(WithArg<2>([](base::OnceCallback<void(Status)> callback) {
std::move(callback).Run(Status(error::UNKNOWN, "Failing for Test"));
}));
test::TestMessage test_message;
test_message.set_test(kTestMessage);
test::TestEvent<Status> a;
report_queue_->Enqueue(std::make_unique<test::TestMessage>(test_message),
priority_, a.cb());
const auto result = a.result();
EXPECT_FALSE(result.ok());
EXPECT_THAT(result.error_code(), Eq(error::UNKNOWN));
}
TEST_F(ReportQueueImplTest, EnqueueStringFailsOnPolicy) {
EXPECT_CALL(mocked_policy_check_, Call())
.WillOnce(Return(Status(error::UNAUTHENTICATED, "Failing for tests")));
test::TestEvent<Status> a;
report_queue_->Enqueue(std::string(kTestMessage), priority_, a.cb());
const auto result = a.result();
EXPECT_FALSE(result.ok());
EXPECT_THAT(result.error_code(), Eq(error::UNAUTHENTICATED));
}
TEST_F(ReportQueueImplTest, EnqueueProtoFailsOnPolicy) {
EXPECT_CALL(mocked_policy_check_, Call())
.WillOnce(Return(Status(error::UNAUTHENTICATED, "Failing for tests")));
test::TestMessage test_message;
test_message.set_test(kTestMessage);
test::TestEvent<Status> a;
report_queue_->Enqueue(std::make_unique<test::TestMessage>(test_message),
priority_, a.cb());
const auto result = a.result();
EXPECT_FALSE(result.ok());
EXPECT_THAT(result.error_code(), Eq(error::UNAUTHENTICATED));
}
TEST_F(ReportQueueImplTest, EnqueueValueFailsOnPolicy) {
EXPECT_CALL(mocked_policy_check_, Call())
.WillOnce(Return(Status(error::UNAUTHENTICATED, "Failing for tests")));
static constexpr char kTestKey[] = "TEST_KEY";
static constexpr char kTestValue[] = "TEST_VALUE";
base::Value::Dict test_dict;
test_dict.Set(kTestKey, kTestValue);
test::TestEvent<Status> a;
report_queue_->Enqueue(test_dict.Clone(), priority_, a.cb());
const auto result = a.result();
EXPECT_FALSE(result.ok());
EXPECT_THAT(result.error_code(), Eq(error::UNAUTHENTICATED));
}
TEST_F(ReportQueueImplTest, EnqueueAndFlushSuccess) {
test::TestMessage test_message;
test_message.set_test(kTestMessage);
test::TestEvent<Status> a;
report_queue_->Enqueue(std::make_unique<test::TestMessage>(test_message),
priority_, a.cb());
const auto a_result = a.result();
EXPECT_OK(a_result) << a_result;
test::TestEvent<Status> f;
report_queue_->Flush(priority_, f.cb());
const auto f_result = f.result();
EXPECT_OK(f_result) << f_result;
}
TEST_F(ReportQueueImplTest, EnqueueSuccessFlushFailure) {
test::TestMessage test_message;
test_message.set_test(kTestMessage);
test::TestEvent<Status> a;
report_queue_->Enqueue(std::make_unique<test::TestMessage>(test_message),
priority_, a.cb());
const auto a_result = a.result();
EXPECT_OK(a_result) << a_result;
EXPECT_CALL(*test_storage_module(), Flush(Eq(priority_), _))
.WillOnce(WithArg<1>([](base::OnceCallback<void(Status)> callback) {
std::move(callback).Run(Status(error::UNKNOWN, "Failing for Test"));
}));
test::TestEvent<Status> f;
report_queue_->Flush(priority_, f.cb());
const auto result = f.result();
EXPECT_FALSE(result.ok());
EXPECT_THAT(result.error_code(), Eq(error::UNKNOWN));
}
// Enqueues a random string into speculative queue, then enqueues a sting,
// attaches actual one and ensures that the string arrives unaltered in the
// |StorageModuleInterface|.
TEST_F(ReportQueueImplTest, SuccessfulSpeculativeStringRecord) {
test::TestEvent<Status> a;
auto speculative_report_queue =
SpeculativeReportQueueImpl::Create({.destination = destination_});
speculative_report_queue->Enqueue(std::string(kTestMessage), priority_,
a.cb());
// Enqueue would not end until actual queue is attached.
speculative_report_queue->PrepareToAttachActualQueue().Run(
std::move(report_queue_));
const auto a_result = a.result();
EXPECT_OK(a_result) << a_result;
// Let everything ongoing to finish.
task_environment_.RunUntilIdle();
EXPECT_THAT(test_storage_module()->priority(), Eq(priority_));
EXPECT_THAT(test_storage_module()->record().has_timestamp_us(), Eq(true));
EXPECT_THAT(test_storage_module()->record().data(), StrEq(kTestMessage));
}
TEST_F(ReportQueueImplTest, SuccessfulSpeculativeStringRecordWithRateLimiter) {
auto rate_limiter = std::make_unique<MockRateLimiter>();
auto* const mock_rate_limiter = rate_limiter.get();
auto config_result =
ReportQueueConfiguration::Create({.destination = destination_})
.SetDMToken(dm_token_)
.SetPolicyCheckCallback(policy_check_callback_)
.SetRateLimiter(std::move(rate_limiter))
.Build();
ASSERT_TRUE(config_result.has_value()) << config_result.error();
test::TestEvent<StatusOr<std::unique_ptr<ReportQueue>>> report_queue_event;
ReportQueueImpl::Create(std::move(config_result.value()), storage_module_,
report_queue_event.cb());
auto report_queue_result = report_queue_event.result();
ASSERT_TRUE(report_queue_result.has_value()) << report_queue_result.error();
report_queue_ = std::move(report_queue_result.value());
EXPECT_CALL(*mock_rate_limiter, Acquire(_)).WillOnce(Return(false));
test::TestEvent<Status> a;
auto speculative_report_queue =
SpeculativeReportQueueImpl::Create({.destination = destination_});
speculative_report_queue->Enqueue(std::string(kTestMessage), priority_,
a.cb());
// Enqueue would not end until actual queue is attached.
speculative_report_queue->PrepareToAttachActualQueue().Run(
std::move(report_queue_));
const auto a_result = a.result();
EXPECT_THAT(a_result,
AllOf(Property(&Status::error_code, Eq(error::OUT_OF_RANGE)),
Property(&Status::error_message,
EndsWith(" rejected by rate limiter"))))
<< a_result;
EXPECT_CALL(*mock_rate_limiter, Acquire(_)).WillOnce(Return(true));
test::TestEvent<Status> b;
speculative_report_queue->Enqueue(std::string(kTestMessage), priority_,
b.cb());
const auto b_result = b.result();
EXPECT_OK(b_result) << b_result;
// Let everything ongoing to finish.
task_environment_.RunUntilIdle();
EXPECT_THAT(test_storage_module()->priority(), Eq(priority_));
EXPECT_THAT(test_storage_module()->record().has_timestamp_us(), Eq(true));
EXPECT_THAT(test_storage_module()->record().data(), StrEq(kTestMessage));
}
TEST_F(ReportQueueImplTest,
SuccessfulSpeculativeStringRecordWithReservedSpace) {
static constexpr int64_t kReservedSpace = 12345L;
auto config_result =
ReportQueueConfiguration::Create(
{.destination = destination_, .reserved_space = kReservedSpace})
.SetDMToken(dm_token_)
.SetPolicyCheckCallback(policy_check_callback_)
.Build();
ASSERT_TRUE(config_result.has_value()) << config_result.error();
test::TestEvent<StatusOr<std::unique_ptr<ReportQueue>>> report_queue_event;
ReportQueueImpl::Create(std::move(config_result.value()), storage_module_,
report_queue_event.cb());
auto report_queue_result = report_queue_event.result();
ASSERT_TRUE(report_queue_result.has_value()) << report_queue_result.error();
report_queue_ = std::move(report_queue_result.value());
test::TestEvent<Status> a;
auto speculative_report_queue =
SpeculativeReportQueueImpl::Create({.destination = destination_});
speculative_report_queue->Enqueue(std::string(kTestMessage), priority_,
a.cb());
// Enqueue would not end until actual queue is attached.
speculative_report_queue->PrepareToAttachActualQueue().Run(
std::move(report_queue_));
const auto a_result = a.result();
EXPECT_OK(a_result) << a_result;
// Let everything ongoing to finish.
task_environment_.RunUntilIdle();
EXPECT_THAT(test_storage_module()->priority(), Eq(priority_));
EXPECT_THAT(test_storage_module()->record().has_timestamp_us(), Eq(true));
EXPECT_THAT(test_storage_module()->record().data(), StrEq(kTestMessage));
EXPECT_THAT(test_storage_module()->record().reserved_space(),
Eq(kReservedSpace));
}
TEST_F(ReportQueueImplTest, SuccessfulSpeculativeStringRecordWithSource) {
SourceInfo source_info;
source_info.set_source(SourceInfo::ASH);
auto config_result =
ReportQueueConfiguration::Create({.destination = destination_})
.SetDMToken(dm_token_)
.SetPolicyCheckCallback(policy_check_callback_)
.SetSourceInfo(source_info)
.Build();
ASSERT_TRUE(config_result.has_value()) << config_result.error();
test::TestEvent<StatusOr<std::unique_ptr<ReportQueue>>> report_queue_event;
ReportQueueImpl::Create(std::move(config_result.value()), storage_module_,
report_queue_event.cb());
auto report_queue_result = report_queue_event.result();
ASSERT_TRUE(report_queue_result.has_value()) << report_queue_result.error();
report_queue_ = std::move(report_queue_result.value());
test::TestEvent<Status> a;
auto speculative_report_queue =
SpeculativeReportQueueImpl::Create({.destination = destination_});
speculative_report_queue->Enqueue(std::string(kTestMessage), priority_,
a.cb());
// Enqueue would not end until actual queue is attached.
speculative_report_queue->PrepareToAttachActualQueue().Run(
std::move(report_queue_));
const auto a_result = a.result();
EXPECT_OK(a_result) << a_result;
// Let everything ongoing to finish.
task_environment_.RunUntilIdle();
EXPECT_THAT(test_storage_module()->priority(), Eq(priority_));
EXPECT_THAT(test_storage_module()->record().has_timestamp_us(), Eq(true));
EXPECT_THAT(test_storage_module()->record().data(), StrEq(kTestMessage));
EXPECT_THAT(test_storage_module()->record().reserved_space(), Eq(0L));
EXPECT_THAT(test_storage_module()->record().source_info().source(),
Eq(source_info.source()));
}
TEST_F(ReportQueueImplTest,
SuccessfulSpeculativeStringRecordWithSourceVersion) {
SourceInfo source_info;
source_info.set_source(SourceInfo::ASH);
source_info.set_source_version("1.0.0");
auto config_result =
ReportQueueConfiguration::Create({.destination = destination_})
.SetDMToken(dm_token_)
.SetPolicyCheckCallback(policy_check_callback_)
.SetSourceInfo(source_info)
.Build();
ASSERT_TRUE(config_result.has_value()) << config_result.error();
test::TestEvent<StatusOr<std::unique_ptr<ReportQueue>>> report_queue_event;
ReportQueueImpl::Create(std::move(config_result.value()), storage_module_,
report_queue_event.cb());
auto report_queue_result = report_queue_event.result();
ASSERT_TRUE(report_queue_result.has_value()) << report_queue_result.error();
report_queue_ = std::move(report_queue_result.value());
test::TestEvent<Status> a;
auto speculative_report_queue =
SpeculativeReportQueueImpl::Create({.destination = destination_});
speculative_report_queue->Enqueue(std::string(kTestMessage), priority_,
a.cb());
// Enqueue would not end until actual queue is attached.
speculative_report_queue->PrepareToAttachActualQueue().Run(
std::move(report_queue_));
const auto a_result = a.result();
EXPECT_OK(a_result) << a_result;
// Let everything ongoing to finish.
task_environment_.RunUntilIdle();
EXPECT_THAT(test_storage_module()->priority(), Eq(priority_));
EXPECT_THAT(test_storage_module()->record().has_timestamp_us(), Eq(true));
EXPECT_THAT(test_storage_module()->record().data(), StrEq(kTestMessage));
EXPECT_THAT(test_storage_module()->record().reserved_space(), Eq(0L));
EXPECT_THAT(test_storage_module()->record().source_info().source(),
Eq(source_info.source()));
EXPECT_THAT(test_storage_module()->record().source_info().source_version(),
StrEq(source_info.source_version()));
}
TEST_F(ReportQueueImplTest, SpeculativeQueueMultipleRecordsAfterCreation) {
static constexpr char kTestString1[] = "record1";
static constexpr char kTestString2[] = "record2";
auto speculative_report_queue =
SpeculativeReportQueueImpl::Create({.destination = destination_});
speculative_report_queue->PrepareToAttachActualQueue().Run(
std::move(report_queue_));
// Let everything ongoing to finish.
task_environment_.RunUntilIdle();
test::TestEvent<Status> test_event1;
speculative_report_queue->Enqueue(kTestString1, Priority::IMMEDIATE,
test_event1.cb());
const auto result1 = test_event1.result();
ASSERT_OK(result1) << result1;
EXPECT_THAT(test_storage_module()->priority(), Eq(Priority::IMMEDIATE));
EXPECT_THAT(test_storage_module()->record().data(), StrEq(kTestString1));
test::TestEvent<Status> test_event2;
speculative_report_queue->Enqueue(kTestString2, Priority::SLOW_BATCH,
test_event2.cb());
const auto result2 = test_event2.result();
ASSERT_OK(result2) << result2;
EXPECT_THAT(test_storage_module()->priority(), Eq(Priority::SLOW_BATCH));
EXPECT_THAT(test_storage_module()->record().data(), StrEq(kTestString2));
}
TEST_F(ReportQueueImplTest, SpeculativeQueueCreationFailedToCreate) {
test::TestEvent<Status> test_event;
{
auto speculative_report_queue =
SpeculativeReportQueueImpl::Create({.destination = destination_});
// Fail to attach queue before calling `Enqueue`.
speculative_report_queue->PrepareToAttachActualQueue().Run(
base::unexpected(Status(error::UNKNOWN, "Failed for Test")));
task_environment_.RunUntilIdle(); // Let `AttachActualQueue` finish.
speculative_report_queue->Enqueue(kTestMessage, Priority::IMMEDIATE,
test_event.cb());
} // Destructs `speculative_report_queue` now, fails all pending Enqueues.
// Unfulfilled pending Enqueue returns the queue failure status.
const auto result = test_event.result();
ASSERT_FALSE(result.ok());
EXPECT_THAT(result.code(), Eq(error::DATA_LOSS)) << result;
}
TEST_F(ReportQueueImplTest, SpeculativeQueueEnqueueAndCreationFailed) {
test::TestEvent<Status> test_event;
auto speculative_report_queue =
SpeculativeReportQueueImpl::Create({.destination = destination_});
speculative_report_queue->Enqueue(kTestMessage, Priority::IMMEDIATE,
test_event.cb());
// Fail to attach queue after calling `Enqueue`.
speculative_report_queue->PrepareToAttachActualQueue().Run(
base::unexpected(Status(error::UNKNOWN, "Failed for Test")));
task_environment_.RunUntilIdle(); // Let `AttachActualQueue` finish.
// Unfulfilled pending Enqueue returns the queue failure status.
const auto result = test_event.result();
ASSERT_FALSE(result.ok());
EXPECT_THAT(result.code(), Eq(error::UNKNOWN)) << result;
}
TEST_F(ReportQueueImplTest, EnqueueRecordWithInvalidPriority) {
test::TestEvent<Status> event;
report_queue_->Enqueue(std::string(kTestMessage),
Priority::UNDEFINED_PRIORITY, event.cb());
const auto result = event.result();
ASSERT_FALSE(result.ok());
EXPECT_THAT(result.code(), Eq(error::INVALID_ARGUMENT));
}
TEST_F(ReportQueueImplTest, FlushSpeculativeReportQueue) {
test::TestEvent<Status> event;
// Set up speculative report queue
auto speculative_report_queue =
SpeculativeReportQueueImpl::Create({.destination = destination_});
speculative_report_queue->PrepareToAttachActualQueue().Run(
std::move(report_queue_));
task_environment_.RunUntilIdle();
EXPECT_CALL(*test_storage_module(), Flush(Eq(priority_), _))
.WillOnce(WithArg<1>([](base::OnceCallback<void(Status)> callback) {
std::move(callback).Run(Status::StatusOK());
}));
speculative_report_queue->Flush(priority_, event.cb());
const auto result = event.result();
ASSERT_OK(result);
}
TEST_F(ReportQueueImplTest, FlushUninitializedSpeculativeReportQueue) {
test::TestEvent<Status> event;
auto speculative_report_queue =
SpeculativeReportQueueImpl::Create({.destination = destination_});
speculative_report_queue->Flush(priority_, event.cb());
const auto result = event.result();
ASSERT_FALSE(result.ok());
EXPECT_THAT(result.error_code(), Eq(error::FAILED_PRECONDITION));
}
TEST_F(ReportQueueImplTest, FlushFailedSpeculativeReportQueue) {
test::TestEvent<Status> event;
{
auto speculative_report_queue =
SpeculativeReportQueueImpl::Create({.destination = destination_});
speculative_report_queue->PrepareToAttachActualQueue().Run(
base::unexpected(Status(error::UNKNOWN, "Failed for Test")));
task_environment_.RunUntilIdle(); // Let `AttachActualQueue` finish.
speculative_report_queue->Flush(priority_, event.cb());
} // Destructs speculative
const auto result = event.result();
ASSERT_FALSE(result.ok());
EXPECT_THAT(result.error_code(), Eq(error::FAILED_PRECONDITION)) << result;
}
TEST_F(ReportQueueImplTest, AsyncProcessingReportQueue) {
auto mock_queue = std::make_unique<MockReportQueue>();
EXPECT_CALL(*mock_queue, AddProducedRecord)
.Times(3)
.WillRepeatedly([](ReportQueue::RecordProducer record_producer,
Priority event_priority,
ReportQueue::EnqueueCallback cb) {
std::move(cb).Run(Status::StatusOK());
});
test::TestEvent<Status> a_string;
mock_queue->Enqueue(std::string(kTestMessage), priority_, a_string.cb());
test::TestEvent<Status> a_proto;
test::TestMessage test_message;
test_message.set_test(kTestMessage);
mock_queue->Enqueue(std::make_unique<test::TestMessage>(test_message),
priority_, a_proto.cb());
test::TestEvent<Status> a_json;
static constexpr char kTestKey[] = "TEST_KEY";
static constexpr char kTestValue[] = "TEST_VALUE";
base::Value::Dict test_dict;
test_dict.Set(kTestKey, kTestValue);
mock_queue->Enqueue(std::move(test_dict), priority_, a_json.cb());
const auto a_string_result = a_string.result();
EXPECT_OK(a_string_result) << a_string_result;
const auto a_proto_result = a_proto.result();
EXPECT_OK(a_proto_result) << a_proto_result;
const auto a_json_result = a_json.result();
EXPECT_OK(a_json_result) << a_json_result;
}
TEST_F(ReportQueueImplTest, AsyncProcessingSpeculativeReportQueue) {
auto speculative_report_queue =
SpeculativeReportQueueImpl::Create({.destination = destination_});
test::TestEvent<Status> a_string;
speculative_report_queue->Enqueue(std::string(kTestMessage), priority_,
a_string.cb());
test::TestEvent<Status> a_proto;
test::TestMessage test_message;
test_message.set_test(kTestMessage);
speculative_report_queue->Enqueue(
std::make_unique<test::TestMessage>(test_message), priority_,
a_proto.cb());
test::TestEvent<Status> a_json;
static constexpr char kTestKey[] = "TEST_KEY";
static constexpr char kTestValue[] = "TEST_VALUE";
base::Value::Dict test_dict;
test_dict.Set(kTestKey, kTestValue);
speculative_report_queue->Enqueue(std::move(test_dict), priority_,
a_json.cb());
auto mock_queue = std::make_unique<MockReportQueue>();
EXPECT_CALL(*mock_queue, AddProducedRecord)
.Times(3)
.WillRepeatedly([](ReportQueue::RecordProducer record_producer,
Priority event_priority,
ReportQueue::EnqueueCallback cb) {
std::move(cb).Run(Status::StatusOK());
});
EXPECT_CALL(*mock_queue, GetDestination()).WillOnce(Return(destination_));
speculative_report_queue->PrepareToAttachActualQueue().Run(
std::move(mock_queue));
const auto a_string_result = a_string.result();
EXPECT_OK(a_string_result) << a_string_result;
const auto a_proto_result = a_proto.result();
EXPECT_OK(a_proto_result) << a_proto_result;
const auto a_json_result = a_json.result();
EXPECT_OK(a_json_result) << a_json_result;
}
TEST_F(ReportQueueImplTest, GetDestinationForReportQueue) {
EXPECT_THAT(report_queue_->GetDestination(), Eq(destination_));
}
TEST_F(ReportQueueImplTest, GetDestinationForSpeculativeReportQueueBeforeInit) {
const auto speculative_report_queue =
SpeculativeReportQueueImpl::Create({.destination = destination_});
EXPECT_THAT(speculative_report_queue->GetDestination(), Eq(destination_));
}
TEST_F(ReportQueueImplTest, GetDestinationForSpeculativeReportQueueAfterInit) {
const auto speculative_report_queue =
SpeculativeReportQueueImpl::Create({.destination = destination_});
speculative_report_queue->PrepareToAttachActualQueue().Run(
std::move(report_queue_));
task_environment_.RunUntilIdle();
EXPECT_THAT(speculative_report_queue->GetDestination(), Eq(destination_));
}
} // namespace
} // namespace reporting