blob: 64af2681fa1d6a599fe3ee8f4b4c00c7fb489f0e [file] [log] [blame]
// Copyright 2020 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "chrome/browser/policy/messaging_layer/upload/record_handler_impl.h"
#include <utility>
#include "base/base64.h"
#include "base/bind.h"
#include "base/callback.h"
#include "base/containers/queue.h"
#include "base/json/json_reader.h"
#include "base/optional.h"
#include "base/sequenced_task_runner.h"
#include "base/strings/strcat.h"
#include "base/strings/string_number_conversions.h"
#include "base/task/post_task.h"
#include "base/task_runner.h"
#include "base/values.h"
#include "chrome/browser/policy/messaging_layer/upload/dm_server_upload_service.h"
#include "chrome/browser/policy/messaging_layer/upload/record_upload_request_builder.h"
#include "chrome/browser/policy/messaging_layer/util/status.h"
#include "chrome/browser/policy/messaging_layer/util/status_macros.h"
#include "chrome/browser/policy/messaging_layer/util/statusor.h"
#include "chrome/browser/policy/messaging_layer/util/task_runner_context.h"
#include "chrome/browser/profiles/profile_manager.h"
#include "chrome/browser/profiles/reporting_util.h"
#include "components/policy/core/common/cloud/cloud_policy_client.h"
#include "components/policy/proto/record.pb.h"
#include "components/policy/proto/record_constants.pb.h"
#include "content/public/browser/browser_task_traits.h"
#include "content/public/browser/browser_thread.h"
namespace reporting {
// ReportUploader handles enqueuing events on the |report_queue_|,
// and uploading those events with the |client_|.
class RecordHandlerImpl::ReportUploader
: public TaskRunnerContext<DmServerUploadService::CompletionResponse> {
public:
ReportUploader(
bool need_encryption_key,
std::unique_ptr<std::vector<EncryptedRecord>> records,
policy::CloudPolicyClient* client,
DmServerUploadService::CompletionCallback upload_complete_cb,
DmServerUploadService::EncryptionKeyAttachedCallback
encryption_key_attached_cb,
scoped_refptr<base::SequencedTaskRunner> sequenced_task_runner);
private:
~ReportUploader() override;
void OnStart() override;
void StartUpload(const EncryptedRecord& encrypted_record);
void OnUploadComplete(base::Optional<base::Value> response);
void HandleFailedUpload();
void HandleSuccessfulUpload();
void Complete(DmServerUploadService::CompletionResponse result);
// Returns a gap record if it is necessary. Expects the contents of the
// failedUploadedRecord field in the response:
// {
// "sequencingId": 1234
// "generationId": 4321
// "priority": 3
// }
base::Optional<EncryptedRecord> HandleFailedUploadedSequencingInformation(
const base::Value& sequencing_information);
// Helper function for converting a base::Value representation of
// SequencingInformation into a proto. Will return an INVALID_ARGUMENT error
// if the base::Value is not convertable.
StatusOr<SequencingInformation> SequencingInformationValueToProto(
const base::Value& value);
bool need_encryption_key_;
std::unique_ptr<std::vector<EncryptedRecord>> records_;
policy::CloudPolicyClient* client_;
// Encryption key delivery callback.
DmServerUploadService::EncryptionKeyAttachedCallback
encryption_key_attached_cb_;
// Last successful response to be processed.
// Note: I could not find a way to pass it as a parameter,
// so it is a class member variable. |last_response_| must be processed before
// any attempt to retry calling the client, otherwise it will be overwritten.
base::Value last_response_;
// When a record fails to be processed on the server, |ReportUploader| creates
// a gap record to upload in its place.
EncryptedRecord gap_record_;
// Set for the highest record being uploaded.
base::Optional<SequencingInformation> highest_sequencing_information_;
};
RecordHandlerImpl::ReportUploader::ReportUploader(
bool need_encryption_key,
std::unique_ptr<std::vector<EncryptedRecord>> records,
policy::CloudPolicyClient* client,
DmServerUploadService::CompletionCallback client_cb,
DmServerUploadService::EncryptionKeyAttachedCallback
encryption_key_attached_cb,
scoped_refptr<base::SequencedTaskRunner> sequenced_task_runner)
: TaskRunnerContext<DmServerUploadService::CompletionResponse>(
std::move(client_cb),
sequenced_task_runner),
need_encryption_key_(need_encryption_key),
records_(std::move(records)),
client_(client),
encryption_key_attached_cb_(encryption_key_attached_cb) {}
RecordHandlerImpl::ReportUploader::~ReportUploader() = default;
void RecordHandlerImpl::ReportUploader::OnStart() {
if (client_ == nullptr) {
Status null_client = Status(error::INVALID_ARGUMENT, "Client was null");
LOG(ERROR) << null_client;
Complete(null_client);
return;
}
if (records_ == nullptr) {
Status null_records = Status(error::INVALID_ARGUMENT, "records_ was null");
LOG(ERROR) << null_records;
Complete(null_records);
return;
}
if (records_->empty()) {
Status empty_records =
Status(error::INVALID_ARGUMENT, "records_ was empty");
LOG(ERROR) << empty_records;
Complete(empty_records);
return;
}
// We'll be popping records off the back.
std::reverse(records_->begin(), records_->end());
StartUpload(records_->back());
}
void RecordHandlerImpl::ReportUploader::StartUpload(
const EncryptedRecord& encrypted_record) {
auto response_cb =
base::BindOnce(&RecordHandlerImpl::ReportUploader::OnUploadComplete,
base::Unretained(this));
auto request_result =
UploadEncryptedReportingRequestBuilder(need_encryption_key_)
.AddRecord(encrypted_record)
.Build();
if (!request_result.has_value()) {
std::move(response_cb).Run(base::nullopt);
return;
}
base::Value request = std::move(request_result.value());
base::PostTask(
FROM_HERE, {content::BrowserThread::UI},
base::BindOnce(
[](policy::CloudPolicyClient* client, base::Value request,
base::OnceCallback<void(base::Optional<base::Value>)>
response_cb) {
client->UploadEncryptedReport(
std::move(request),
reporting::GetContext(ProfileManager::GetPrimaryUserProfile()),
std::move(response_cb));
},
client_, std::move(request), std::move(response_cb)));
}
void RecordHandlerImpl::ReportUploader::OnUploadComplete(
base::Optional<base::Value> response) {
if (!response.has_value()) {
Schedule(&RecordHandlerImpl::ReportUploader::HandleFailedUpload,
base::Unretained(this));
return;
}
last_response_ = std::move(response.value());
Schedule(&RecordHandlerImpl::ReportUploader::HandleSuccessfulUpload,
base::Unretained(this));
}
void RecordHandlerImpl::ReportUploader::HandleFailedUpload() {
Status data_loss = Status(
error::DATA_LOSS,
base::StrCat({"Record failed uploaded: ",
records_->back().sequencing_information().DebugString()}));
LOG(ERROR) << data_loss;
if (highest_sequencing_information_.has_value()) {
Complete(std::move(highest_sequencing_information_.value()));
return;
}
Complete(data_loss);
}
void RecordHandlerImpl::ReportUploader::HandleSuccessfulUpload() {
// Decypher 'response' containing a base::Value dictionary that looks like:
// {
// "lastSucceedUploadedRecord": ... // SequencingInformation proto
// "firstFailedUploadedRecord": {
// "failedUploadedRecord": ... // SequencingInformation proto
// "failureStatus": ... // Status proto
// }
// "encryptionSettings": ... // EncryptionSettings proto
// }
const base::Value* last_succeed_uploaded_record =
last_response_.FindDictKey("lastSucceedUploadedRecord");
if (last_succeed_uploaded_record != nullptr) {
auto seq_info_result =
SequencingInformationValueToProto(*last_succeed_uploaded_record);
if (seq_info_result.ok()) {
highest_sequencing_information_ = std::move(seq_info_result.ValueOrDie());
} else {
LOG(ERROR) << "Server responded with an invalid SequencingInformation "
"for lastSucceedUploadedRecord:"
<< *last_succeed_uploaded_record;
}
}
// Handle the encryption settings.
// Note: server can attach it to response regardless of whether
// the response indicates success or failure, and whether the client
// set attach_encryption_settings to true in request.
const base::Value* signed_encryption_key_record =
last_response_.FindDictKey("encryptionSettings");
if (signed_encryption_key_record != nullptr) {
const std::string* public_key_str =
signed_encryption_key_record->FindStringKey("publicKey");
const auto public_key_id_result =
signed_encryption_key_record->FindIntKey("publicKeyId");
// TODO(b/170054326): Make signature mandatory too.
// const std::string* public_key_signature_str =
// signed_encryption_key_record->FindStringKey("publicKeySignature");
std::string public_key;
std::string public_key_signature;
if (public_key_str != nullptr &&
base::Base64Decode(*public_key_str, &public_key) &&
// TODO(b/170054326): Make signature mandatory too.
// public_key_signature_str != nullptr
// base::Base64Decode(*public_key_signature_str,
// &public_key_signature) &&
public_key_id_result.has_value()) {
SignedEncryptionInfo signed_encryption_key;
signed_encryption_key.set_public_asymmetric_key(public_key);
signed_encryption_key.set_public_key_id(public_key_id_result.value());
signed_encryption_key.set_signature(public_key_signature);
encryption_key_attached_cb_.Run(signed_encryption_key);
need_encryption_key_ = false;
}
}
// Check if the previous record was unprocessable on the server.
const base::Value* failed_uploaded_record = last_response_.FindDictPath(
"firstFailedUploadedRecord.failedUploadedRecord");
if (failed_uploaded_record != nullptr) {
// The record we uploaded previously was unprocessable by the server, if the
// record was after the current |highest_sequencing_information_| we should
// return a gap record. A gap record consists of an EncryptedRecord with
// just SequencingInformation. The server will report success for the gap
// record and |highest_sequencing_information_| will be updated in the next
// response. In the future there may be recoverable |failureStatus|, but
// for now all the device can do is delete the record.
auto gap_record_result =
HandleFailedUploadedSequencingInformation(*failed_uploaded_record);
if (gap_record_result.has_value()) {
gap_record_ = std::move(gap_record_result.value());
LOG(ERROR) << "Data Loss. Record was unprocessable by the server: "
<< *failed_uploaded_record;
StartUpload(gap_record_);
return;
}
}
// Pop the last record that was processed.
records_->pop_back();
if (!records_->empty()) {
// Upload the next record but do not request encryption key again.
StartUpload(records_->back());
return;
}
// No more records to process. Return the highest_sequencing_information_ if
// available.
if (highest_sequencing_information_.has_value()) {
Complete(highest_sequencing_information_.value());
return;
}
Complete(Status(error::INTERNAL, "Unable to upload any records"));
}
base::Optional<EncryptedRecord>
RecordHandlerImpl::ReportUploader::HandleFailedUploadedSequencingInformation(
const base::Value& sequencing_information) {
if (!highest_sequencing_information_.has_value()) {
LOG(ERROR) << "highest_sequencing_information_ has no value.";
return base::nullopt;
}
auto seq_info_result =
SequencingInformationValueToProto(sequencing_information);
if (!seq_info_result.ok()) {
LOG(ERROR) << "Server responded with an invalid SequencingInformation for "
"firstFailedUploadedRecord.failedUploadedRecord:"
<< sequencing_information;
return base::nullopt;
}
SequencingInformation& seq_info = seq_info_result.ValueOrDie();
// |seq_info| should be of the same generation and priority as
// highest_sequencing_information_, and have the next sequencing_id.
if (seq_info.generation_id() !=
highest_sequencing_information_->generation_id() ||
seq_info.priority() != highest_sequencing_information_->priority() ||
seq_info.sequencing_id() !=
highest_sequencing_information_->sequencing_id() + 1) {
return base::nullopt;
}
// Build a gap record and return it.
EncryptedRecord encrypted_record;
*encrypted_record.mutable_sequencing_information() = std::move(seq_info);
return encrypted_record;
}
void RecordHandlerImpl::ReportUploader::Complete(
DmServerUploadService::CompletionResponse completion_result) {
Schedule(&RecordHandlerImpl::ReportUploader::Response, base::Unretained(this),
completion_result);
}
StatusOr<SequencingInformation>
RecordHandlerImpl::ReportUploader::SequencingInformationValueToProto(
const base::Value& value) {
const std::string* sequencing_id = value.FindStringKey("sequencingId");
const std::string* generation_id = value.FindStringKey("generationId");
const auto priority = value.FindIntKey("priority");
// If any of the previous values don't exist, or are malformed, return error.
int64_t seq_id;
int64_t gen_id;
if (!sequencing_id || sequencing_id->empty() || !generation_id ||
generation_id->empty() || !priority.has_value() ||
!Priority_IsValid(priority.value()) ||
!base::StringToInt64(*sequencing_id, &seq_id) ||
!base::StringToInt64(*generation_id, &gen_id)) {
return Status(error::INVALID_ARGUMENT,
base::StrCat({"Provided value did not conform to a valid "
"SequencingInformation proto: ",
value.DebugString()}));
}
SequencingInformation proto;
proto.set_sequencing_id(seq_id);
proto.set_generation_id(gen_id);
proto.set_priority(Priority(priority.value()));
return proto;
}
RecordHandlerImpl::RecordHandlerImpl(policy::CloudPolicyClient* client)
: RecordHandler(client),
sequenced_task_runner_(base::ThreadPool::CreateSequencedTaskRunner({})) {}
RecordHandlerImpl::~RecordHandlerImpl() = default;
void RecordHandlerImpl::HandleRecords(
bool need_encryption_key,
std::unique_ptr<std::vector<EncryptedRecord>> records,
DmServerUploadService::CompletionCallback upload_complete_cb,
DmServerUploadService::EncryptionKeyAttachedCallback
encryption_key_attached_cb) {
Start<RecordHandlerImpl::ReportUploader>(
need_encryption_key, std::move(records), GetClient(),
std::move(upload_complete_cb), encryption_key_attached_cb,
sequenced_task_runner_);
}
} // namespace reporting