blob: f2f31a68c8c8117fa2040e8036466c2ac814ea27 [file] [log] [blame]
// Copyright 2020 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "chrome/browser/policy/messaging_layer/public/report_client.h"
#include <memory>
#include <utility>
#include "base/functional/bind.h"
#include "base/functional/callback.h"
#include "base/memory/ptr_util.h"
#include "base/memory/scoped_refptr.h"
#include "base/metrics/histogram_functions.h"
#include "base/no_destructor.h"
#include "base/path_service.h"
#include "base/strings/strcat.h"
#include "base/strings/string_number_conversions.h"
#include "base/threading/sequence_bound.h"
#include "base/types/expected.h"
#include "build/build_config.h"
#include "chrome/browser/policy/messaging_layer/storage_selector/storage_selector.h"
#include "chrome/browser/policy/messaging_layer/util/dm_token_retriever_provider.h"
#include "chrome/browser/policy/messaging_layer/util/reporting_server_connector.h"
#include "chrome/common/chrome_paths.h"
#include "components/reporting/client/dm_token_retriever.h"
#include "components/reporting/client/report_queue_configuration.h"
#include "components/reporting/storage/storage_module_interface.h"
#include "components/reporting/util/reporting_errors.h"
#include "components/reporting/util/status.h"
#include "components/reporting/util/statusor.h"
#if !BUILDFLAG(IS_CHROMEOS)
#include "base/task/bind_post_task.h"
#include "base/task/thread_pool.h"
#include "chrome/browser/policy/messaging_layer/upload/upload_provider.h"
#include "components/reporting/encryption/verification.h"
#endif // !BUILDFLAG(IS_CHROMEOS)
namespace reporting {
#if !BUILDFLAG(IS_CHROMEOS)
namespace {
const base::FilePath::CharType kReportingDirectory[] =
FILE_PATH_LITERAL("reporting");
} // namespace
#endif // !BUILDFLAG(IS_CHROMEOS)
ReportingClient::ReportingClient(
scoped_refptr<base::SequencedTaskRunner> sequenced_task_runner)
: ReportQueueProvider(
base::BindRepeating(
[](base::OnceCallback<void(
StatusOr<scoped_refptr<StorageModuleInterface>>)>
storage_created_cb) {
#if BUILDFLAG(IS_CHROMEOS)
StorageSelector::CreateMissiveStorageModule(
std::move(storage_created_cb));
#else // !BUILDFLAG(IS_CHROMEOS)
base::FilePath reporting_path;
const auto res = base::PathService::Get(chrome::DIR_USER_DATA,
&reporting_path);
CHECK(res) << "Could not retrieve base path";
reporting_path = reporting_path.Append(kReportingDirectory);
StorageSelector::CreateLocalStorageModule(
reporting_path, SignatureVerifier::VerificationKey(),
CompressionInformation::COMPRESSION_SNAPPY,
base::BindPostTask(
ReportQueueProvider::GetInstance()
->sequenced_task_runner(),
base::BindRepeating(
&ReportingClient::AsyncStartUploader,
ReportQueueProvider::GetInstance()->GetWeakPtr())),
std::move(storage_created_cb));
#endif // !BUILDFLAG(IS_CHROMEOS)
}),
sequenced_task_runner) {
// Register itself as observer to connector.
ReportingServerConnector::GetInstance()->AddObserver(this);
}
ReportingClient::~ReportingClient() {
// Unregister itself as observer to connector.
ReportingServerConnector::GetInstance()->RemoveObserver(this);
}
void ReportingClient::OnConnected() {
// Immediately perform dummy upload that will retrieve encryption key, if
// `Storage` does not have it yet. This is done to provide the key for later
// events posting even in case the device goes offline very soon after
// enrollment - `Flush` tries to request encryption key from the server. It
// is expected to succeed, but even if it fails, later enqueue operations
// have a good chance to succeed if the server is available. Note that we
// cannot use Speculative Report Queue - as opposed to `Enqueue`, `Flush`
// does wait for the underlying actual queue to be created.
ReportQueueProvider::CreateQueue(
ReportQueueConfiguration::Create(
{
.destination = Destination::HEARTBEAT_EVENTS // Unused
})
.Build()
.value(),
base::BindOnce([](StatusOr<std::unique_ptr<ReportQueue>> queue_result) {
if (!queue_result.has_value()) {
LOG(WARNING) << "Failed to create queue for initial flush";
return;
}
// Flush SECURITY queue since it is usually empty (events are uploaded
// immediately after they are posted).
queue_result.value()->Flush(
Priority::SECURITY, base::BindOnce([](Status status) {
LOG_IF(WARNING, !status.ok()) << "Initial flush error=" << status;
}));
}));
}
void ReportingClient::OnDisconnected() {
// No action required upon disconnect.
}
// static
ReportQueueProvider::SmartPtr<ReportingClient> ReportingClient::Create(
scoped_refptr<base::SequencedTaskRunner> sequenced_task_runner) {
return SmartPtr<ReportingClient>(
new ReportingClient(sequenced_task_runner),
base::OnTaskRunnerDeleter(sequenced_task_runner));
}
void ReportingClient::ConfigureReportQueue(
std::unique_ptr<ReportQueueConfiguration> configuration,
ReportQueueProvider::ReportQueueConfiguredCallback completion_cb) {
// If DM token has already been set (only likely for testing purposes or until
// pre-existing events are migrated over to use event types instead), we do
// nothing and trigger completion callback with report queue config.
if (!configuration->dm_token().empty()) {
std::move(completion_cb).Run(std::move(configuration));
return;
}
auto dm_token_retriever_provider =
std::make_unique<DMTokenRetrieverProvider>();
auto dm_token_retriever =
std::move(dm_token_retriever_provider)
->GetDMTokenRetrieverForEventType(configuration->event_type());
// Trigger completion callback with an internal error if no DM token retriever
// found
if (!dm_token_retriever) {
std::move(completion_cb)
.Run(base::unexpected(
Status(error::INTERNAL,
base::StrCat({"No DM token retriever found for event type=",
base::NumberToString(static_cast<int>(
configuration->event_type()))}))));
return;
}
std::move(dm_token_retriever)
->RetrieveDMToken(base::BindOnce(
[](std::unique_ptr<ReportQueueConfiguration> configuration,
ReportQueueProvider::ReportQueueConfiguredCallback completion_cb,
StatusOr<std::string> dm_token_result) {
// Trigger completion callback with error if there was an error
// retrieving DM token.
if (!dm_token_result.has_value()) {
std::move(completion_cb)
.Run(base::unexpected(dm_token_result.error()));
return;
}
// Set DM token in config and trigger completion callback with the
// corresponding result.
auto config_result =
configuration->SetDMToken(dm_token_result.value());
// Fail on error
if (!config_result.ok()) {
std::move(completion_cb).Run(base::unexpected(config_result));
return;
}
// Success, run completion callback with updated config
std::move(completion_cb).Run(std::move(configuration));
},
std::move(configuration), std::move(completion_cb)));
}
#if !BUILDFLAG(IS_CHROMEOS)
// Uploader is passed to Storage in order to upload messages using
// `UploadClient`.
class ReportingClient::Uploader : public UploaderInterface {
public:
using UploadCallback = base::OnceCallback<
Status(bool, std::vector<EncryptedRecord>, ScopedReservation)>;
static std::unique_ptr<Uploader> Create(bool need_encryption_key,
UploadCallback upload_callback) {
return base::WrapUnique(
new Uploader(need_encryption_key, std::move(upload_callback)));
}
~Uploader() override = default;
Uploader(const Uploader& other) = delete;
Uploader& operator=(const Uploader& other) = delete;
void ProcessRecord(EncryptedRecord data,
ScopedReservation scoped_reservation,
base::OnceCallback<void(bool)> processed_cb) override {
helper_.AsyncCall(&Helper::ProcessRecord)
.WithArgs(std::move(data), std::move(scoped_reservation),
std::move(processed_cb));
}
void ProcessGap(SequenceInformation start,
uint64_t count,
base::OnceCallback<void(bool)> processed_cb) override {
helper_.AsyncCall(&Helper::ProcessGap)
.WithArgs(std::move(start), count, std::move(processed_cb));
}
void Completed(Status final_status) override {
helper_.AsyncCall(&Helper::Completed).WithArgs(final_status);
}
private:
// Helper class that performs actions, wrapped in SequenceBound by |Uploader|.
class Helper {
public:
Helper(bool need_encryption_key, UploadCallback upload_callback);
Helper(const Helper& other) = delete;
Helper& operator=(const Helper& other) = delete;
void ProcessRecord(EncryptedRecord data,
ScopedReservation scoped_reservation,
base::OnceCallback<void(bool)> processed_cb);
void ProcessGap(SequenceInformation start,
uint64_t count,
base::OnceCallback<void(bool)> processed_cb);
void Completed(Status final_status);
private:
bool completed_ GUARDED_BY_CONTEXT(sequence_checker_){false};
const bool need_encryption_key_;
std::vector<EncryptedRecord> encrypted_records_;
ScopedReservation encrypted_records_reservation_;
SEQUENCE_CHECKER(sequence_checker_);
UploadCallback upload_callback_;
};
Uploader(bool need_encryption_key, UploadCallback upload_callback)
: helper_(base::ThreadPool::CreateSequencedTaskRunner({}),
need_encryption_key,
std::move(upload_callback)) {}
base::SequenceBound<Helper> helper_;
};
ReportingClient::Uploader::Helper::Helper(
bool need_encryption_key,
ReportingClient::Uploader::UploadCallback upload_callback)
: need_encryption_key_(need_encryption_key),
upload_callback_(std::move(upload_callback)) {
// Constructor is called on the task assigned by |SequenceBound|.
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
}
void ReportingClient::Uploader::Helper::ProcessRecord(
EncryptedRecord data,
ScopedReservation scoped_reservation,
base::OnceCallback<void(bool)> processed_cb) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (completed_) {
std::move(processed_cb).Run(false);
return;
}
encrypted_records_.emplace_back(std::move(data));
encrypted_records_reservation_.HandOver(scoped_reservation);
std::move(processed_cb).Run(true);
}
void ReportingClient::Uploader::Helper::ProcessGap(
SequenceInformation start,
uint64_t count,
base::OnceCallback<void(bool)> processed_cb) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (completed_) {
std::move(processed_cb).Run(false);
return;
}
for (uint64_t i = 0; i < count; ++i) {
encrypted_records_.emplace_back();
*encrypted_records_.rbegin()->mutable_sequence_information() = start;
start.set_sequencing_id(start.sequencing_id() + 1);
}
std::move(processed_cb).Run(true);
}
void ReportingClient::Uploader::Helper::Completed(Status final_status) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (!final_status.ok()) {
// No work to do - something went wrong with storage and it no longer
// wants to upload the records. Let the records die with |this|.
return;
}
if (completed_) {
// Upload has already been invoked. Return.
return;
}
completed_ = true;
if (encrypted_records_.empty() && !need_encryption_key_) {
return;
}
CHECK(upload_callback_);
Status upload_status =
std::move(upload_callback_)
.Run(need_encryption_key_, std::move(encrypted_records_),
std::move(encrypted_records_reservation_));
if (!upload_status.ok()) {
LOG(ERROR) << "Unable to upload records: " << upload_status;
}
}
// static
std::unique_ptr<EncryptedReportingUploadProvider>
ReportingClient::CreateLocalUploadProvider(
scoped_refptr<StorageModuleInterface> storage_module) {
// Note: access local storage inside the callbacks, because it may be not
// yet stored in the client at the moment EncryptedReportingUploadProvider
// is instantiated.
return std::make_unique<EncryptedReportingUploadProvider>(
base::BindPostTask(
ReportQueueProvider::GetInstance()->sequenced_task_runner(),
StorageSelector::GetLocalReportSuccessfulUploadCb(storage_module)),
base::BindPostTask(
ReportQueueProvider::GetInstance()->sequenced_task_runner(),
StorageSelector::GetLocalEncryptionKeyAttachedCb(storage_module)),
// The configuration file feature is only available in CrOS.
/*update_config_in_missive_cb=*/base::DoNothing());
}
// static
void ReportingClient::AsyncStartUploader(
base::WeakPtr<ReportQueueProvider> instance,
UploaderInterface::UploadReason reason,
UploaderInterface::UploaderInterfaceResultCb start_uploader_cb) {
if (!instance) {
std::move(start_uploader_cb)
.Run(base::unexpected(
Status(error::UNAVAILABLE, "Client not available")));
base::UmaHistogramEnumeration(
reporting::kUmaUnavailableErrorReason,
UnavailableErrorReason::REPORTING_CLIENT_IS_NULL,
UnavailableErrorReason::MAX_VALUE);
return;
}
auto* const client = static_cast<ReportingClient*>(instance.get());
CHECK(client);
client->DeliverAsyncStartUploader(reason, std::move(start_uploader_cb));
}
void ReportingClient::DeliverAsyncStartUploader(
UploaderInterface::UploadReason reason,
UploaderInterface::UploaderInterfaceResultCb start_uploader_cb) {
if (!upload_provider_) {
// If non-missived uploading is enabled, it will need upload
// provider. In case of missived Uploader will be provided by
// EncryptedReportingServiceProvider so it does not need to be
// enabled here.
if (StorageSelector::is_use_missive() ||
storage() == nullptr // report queue provider is not (yet?) ready
) {
std::move(start_uploader_cb)
.Run(base::unexpected(
Status(error::UNAVAILABLE, "Uploader not available")));
base::UmaHistogramEnumeration(
reporting::kUmaUnavailableErrorReason,
UnavailableErrorReason::UPLOAD_PROVIDER_IS_NULL,
UnavailableErrorReason::MAX_VALUE);
return;
}
upload_provider_ = CreateLocalUploadProvider(storage());
}
auto uploader = Uploader::Create(
/*need_encryption_key=*/
reason == UploaderInterface::UploadReason::KEY_DELIVERY,
base::BindOnce(
[](base::WeakPtr<EncryptedReportingUploadProvider> upload_provider,
bool need_encryption_key, std::vector<EncryptedRecord> records,
ScopedReservation scoped_reservation) {
if (!upload_provider) {
base::UmaHistogramEnumeration(
reporting::kUmaUnavailableErrorReason,
UnavailableErrorReason::UPLOAD_PROVIDER_IS_NULL,
UnavailableErrorReason::MAX_VALUE);
return Status{error::UNAVAILABLE, "Uploader not available"};
}
upload_provider->RequestUploadEncryptedRecords(
need_encryption_key, std::move(records),
std::move(scoped_reservation), base::DoNothing());
return Status::StatusOK();
},
upload_provider_->GetWeakPtr()));
std::move(start_uploader_cb).Run(std::move(uploader));
}
#endif // !BUILDFLAG(IS_CHROMEOS)
} // namespace reporting