blob: 2d015187982a8846ca73a54ec7e2bc15922dfe7b [file] [log] [blame]
// Copyright 2021 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "components/segmentation_platform/internal/database/signal_database_impl.h"
#include <cstdint>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "base/bind.h"
#include "base/check_op.h"
#include "base/memory/weak_ptr.h"
#include "base/time/clock.h"
#include "base/time/time.h"
#include "base/trace_event/typed_macros.h"
#include "components/leveldb_proto/public/proto_database.h"
#include "components/segmentation_platform/internal/database/metadata_utils.h"
#include "components/segmentation_platform/internal/database/signal_database.h"
#include "components/segmentation_platform/internal/database/signal_key.h"
#include "components/segmentation_platform/internal/proto/signal.pb.h"
#include "components/segmentation_platform/internal/stats.h"
#include "third_party/abseil-cpp/absl/types/optional.h"
namespace segmentation_platform {
namespace {
// TODO(shaktisahu): Maybe make this a class member for ease of testing.
// Key filter used when loading entries for deletion or compaction. Returns
// true only when |signal_key| decodes to a key of the signal identified by
// |signal_type| and |name_hash| whose [range_start, range_end] interval lies
// entirely within [|start_time|, |end_time|]. Keys that fail to decode or that
// belong to another signal are rejected.
//
// Note the parameter order: |end_time| comes before |start_time|.
bool FilterKeyBasedOnRange(proto::SignalType signal_type,
                           uint64_t name_hash,
                           base::Time end_time,
                           base::Time start_time,
                           const std::string& signal_key) {
  DCHECK(start_time <= end_time);

  SignalKey parsed_key;
  const bool parsed = SignalKey::FromBinary(signal_key, &parsed_key);
  if (!parsed)
    return false;
  DCHECK(parsed_key.IsValid());

  // The key must describe exactly the requested signal.
  const bool same_signal =
      parsed_key.kind() ==
          metadata_utils::SignalTypeToSignalKind(signal_type) &&
      parsed_key.name_hash() == name_hash;
  if (!same_signal)
    return false;

  // Accept the key only if its whole range is contained in the given range.
  const bool ends_in_range = parsed_key.range_end() <= end_time;
  return ends_in_range && start_time <= parsed_key.range_start();
}
// Iteration controller used by GetSamples(). For the key encoded in
// |key_bytes| it decides whether the entry should be loaded and whether the
// iteration should go on:
//  - undecodable key, key of a signal other than |start_key|'s, or key whose
//    range starts after |end_time|: skip the entry and stop;
//  - key whose range ends after |end_time|: load the entry and stop;
//  - anything else: load the entry and continue.
leveldb_proto::Enums::KeyIteratorAction GetSamplesIteratorController(
    const SignalKey& start_key,
    base::Time end_time,
    const std::string& key_bytes) {
  using leveldb_proto::Enums;

  SignalKey current;
  if (!SignalKey::FromBinary(key_bytes, &current))
    return Enums::kSkipAndStop;
  DCHECK(current.IsValid());

  const bool same_signal = start_key.kind() == current.kind() &&
                           start_key.name_hash() == current.name_hash();
  // Stop once we reach a different signal, or a key whose samples are all
  // too fresh for the requested range.
  if (!same_signal || current.range_start() > end_time)
    return Enums::kSkipAndStop;

  // A key reaching past |end_time| is the last one with relevant samples.
  return current.range_end() > end_time ? Enums::kLoadAndStop
                                        : Enums::kLoadAndContinue;
}
} // namespace
// Wraps |database|, taking ownership of it, and stores the |clock| pointer,
// which WriteSample() uses to timestamp new samples.
// NOTE(review): |clock| is held as a raw pointer, so it presumably must
// outlive this object -- confirm against callers.
SignalDatabaseImpl::SignalDatabaseImpl(std::unique_ptr<SignalProtoDb> database,
base::Clock* clock)
: database_(std::move(database)), clock_(clock) {}
// Defaulted destructor; no explicit teardown is performed here.
SignalDatabaseImpl::~SignalDatabaseImpl() = default;
// Starts initialization of the underlying database with simple options. The
// result is delivered to OnDatabaseInitialized(), bound through a weak
// pointer, which in turn reports it to |callback|.
void SignalDatabaseImpl::Initialize(SuccessCallback callback) {
  auto on_initialized =
      base::BindOnce(&SignalDatabaseImpl::OnDatabaseInitialized,
                     weak_ptr_factory_.GetWeakPtr(), std::move(callback));
  database_->Init(leveldb_proto::CreateSimpleOptions(),
                  std::move(on_initialized));
}
// Records one sample for the signal identified by |signal_type| and
// |name_hash|, timestamped with the current clock time. |value| is stored only
// when it is set. Samples that map to the same signal key as a recently
// written one are collated with it, so the DB entry for that key is rewritten
// with all of them. |callback| is passed to the database update.
void SignalDatabaseImpl::WriteSample(proto::SignalType signal_type,
                                     uint64_t name_hash,
                                     absl::optional<int32_t> value,
                                     SuccessCallback callback) {
  DCHECK(initialized_);

  const base::Time now = clock_->Now();
  SignalKey key(metadata_utils::SignalTypeToSignalKind(signal_type), name_hash,
                now, now);

  // Start from the cached entry, if any, so that samples sharing a signal key
  // end up in a single DB entry.
  proto::SignalData collated;
  const auto cached = recently_added_signals_.find(key);
  if (cached != recently_added_signals_.end())
    collated = cached->second;

  proto::Sample* new_sample = collated.add_samples();
  if (value)
    new_sample->set_value(*value);

  // Store the time as a delta from UTC midnight: the smaller value needs less
  // storage space in the DB.
  const base::TimeDelta since_midnight = now - now.UTCMidnight();
  new_sample->set_time_sec_delta(since_midnight.InSeconds());

  recently_added_signals_[key] = collated;

  // Write the collated data as a DB entry; nothing is deleted.
  auto to_save = std::make_unique<
      std::vector<std::pair<std::string, proto::SignalData>>>();
  to_save->emplace_back(key.ToBinary(), std::move(collated));
  auto to_delete = std::make_unique<std::vector<std::string>>();
  database_->UpdateEntries(std::move(to_save), std::move(to_delete),
                           std::move(callback));

  CleanupStaleCachedEntries(now);
}
// Asynchronously loads the samples of the signal identified by |signal_type|
// and |name_hash|. Iteration over the database is steered by
// GetSamplesIteratorController(); the loaded entries are post-processed by
// OnGetSamples(), which filters them to [|start_time|, |end_time|] and
// reports them through |callback|.
void SignalDatabaseImpl::GetSamples(proto::SignalType signal_type,
                                    uint64_t name_hash,
                                    base::Time start_time,
                                    base::Time end_time,
                                    SamplesCallback callback) {
  TRACE_EVENT("segmentation_platform", "SignalDatabaseImpl::GetSamples");
  DCHECK(initialized_);
  DCHECK_LE(start_time, end_time);

  const SignalKey start_key(metadata_utils::SignalTypeToSignalKind(signal_type),
                            name_hash, start_time, base::Time());

  auto controller =
      base::BindRepeating(&GetSamplesIteratorController, start_key, end_time);
  auto on_loaded = base::BindOnce(&SignalDatabaseImpl::OnGetSamples,
                                  weak_ptr_factory_.GetWeakPtr(),
                                  std::move(callback), start_time, end_time);
  database_->LoadKeysAndEntriesWhile(start_key.GetPrefixInBinary(),
                                     std::move(controller),
                                     std::move(on_loaded));
}
// Completion handler for GetSamples(). Converts the loaded |entries| into a
// flat list of (timestamp, value) samples, keeping only those whose timestamp
// lies within [|start_time|, |end_time|], and passes the list to |callback|.
// On failure (|success| is false or |entries| is null) an empty list is
// reported. Result metrics are recorded in both cases.
void SignalDatabaseImpl::OnGetSamples(
    SamplesCallback callback,
    base::Time start_time,
    base::Time end_time,
    bool success,
    std::unique_ptr<std::map<std::string, proto::SignalData>> entries) {
  TRACE_EVENT("segmentation_platform", "SignalDatabaseImpl::OnGetSamples");

  std::vector<Sample> out;
  const bool loaded = success && entries != nullptr;
  stats::RecordSignalDatabaseGetSamplesResult(/* success = */ loaded);
  if (!loaded) {
    std::move(callback).Run(out);
    return;
  }

  stats::RecordSignalDatabaseGetSamplesDatabaseEntryCount(entries->size());

  for (const auto& entry : *entries) {
    SignalKey key;
    if (!SignalKey::FromBinary(entry.first, &key))
      continue;
    // TODO(shaktisahu): Remove DCHECK and collect UMA.
    DCHECK(key.IsValid());

    // Sample times are stored as second deltas from the UTC midnight of the
    // key's range start.
    const base::Time midnight = key.range_start().UTCMidnight();
    for (const auto& sample : entry.second.samples()) {
      const base::Time timestamp =
          midnight + base::Seconds(sample.time_sec_delta());
      if (timestamp >= start_time && timestamp <= end_time)
        out.emplace_back(timestamp, sample.value());
    }
  }

  stats::RecordSignalDatabaseGetSamplesSampleCount(out.size());
  std::move(callback).Run(out);
}
// Asynchronously deletes the entries of the signal identified by |signal_type|
// and |name_hash| whose key range ends at or before |end_time|. The matching
// entries are loaded first, using FilterKeyBasedOnRange() with a null start
// time, and then removed in OnGetSamplesForDeletion(), which also receives
// |callback|.
void SignalDatabaseImpl::DeleteSamples(proto::SignalType signal_type,
                                       uint64_t name_hash,
                                       base::Time end_time,
                                       SuccessCallback callback) {
  TRACE_EVENT("segmentation_platform", "SignalDatabaseImpl::DeleteSamples");
  DCHECK(initialized_);

  // Only the prefix of this key is used, so its time range is left null.
  SignalKey prefix_key(metadata_utils::SignalTypeToSignalKind(signal_type),
                       name_hash, base::Time(), base::Time());
  const std::string signal_prefix = prefix_key.GetPrefixInBinary();

  auto key_filter = base::BindRepeating(&FilterKeyBasedOnRange, signal_type,
                                        name_hash, end_time, base::Time());
  auto on_loaded = base::BindOnce(&SignalDatabaseImpl::OnGetSamplesForDeletion,
                                  weak_ptr_factory_.GetWeakPtr(),
                                  std::move(callback));
  database_->LoadKeysAndEntriesWithFilter(std::move(key_filter),
                                          leveldb::ReadOptions(), signal_prefix,
                                          std::move(on_loaded));
}
// Completion handler for the load issued by DeleteSamples(). Deletes every
// entry contained in |entries| from the database. When the load failed or
// returned no map, |callback| is run immediately with |success|; otherwise it
// is handed to the database update.
void SignalDatabaseImpl::OnGetSamplesForDeletion(
    SuccessCallback callback,
    bool success,
    std::unique_ptr<std::map<std::string, proto::SignalData>> entries) {
  TRACE_EVENT("segmentation_platform",
              "SignalDatabaseImpl::OnGetSamplesForDeletion");

  const bool have_entries = success && entries != nullptr;
  if (!have_entries) {
    std::move(callback).Run(success);
    return;
  }

  // Every loaded key is scheduled for deletion.
  auto doomed_keys = std::make_unique<std::vector<std::string>>();
  doomed_keys->reserve(entries->size());
  for (const auto& entry : *entries)
    doomed_keys->emplace_back(entry.first);

  // Nothing is saved; the update only removes keys.
  auto nothing_to_save = std::make_unique<
      std::vector<std::pair<std::string, proto::SignalData>>>();
  database_->UpdateEntries(std::move(nothing_to_save), std::move(doomed_keys),
                           std::move(callback));
}
// Asynchronously compacts the entries of the signal identified by
// |signal_type| and |name_hash| for the UTC day containing |day_start_time|
// into one entry. Entries whose key range lies within that day are loaded via
// FilterKeyBasedOnRange() and merged in OnGetSamplesForCompaction(), which
// also receives |callback|.
void SignalDatabaseImpl::CompactSamplesForDay(proto::SignalType signal_type,
                                              uint64_t name_hash,
                                              base::Time day_start_time,
                                              SuccessCallback callback) {
  TRACE_EVENT("segmentation_platform",
              "SignalDatabaseImpl::CompactSamplesForDay");
  DCHECK(initialized_);

  // The compacted range spans the whole UTC day, from 00:00:00 to 23:59:59.
  const base::Time day_begin = day_start_time.UTCMidnight();
  const base::Time day_end = day_begin + base::Days(1) - base::Seconds(1);

  SignalKey compact_key(metadata_utils::SignalTypeToSignalKind(signal_type),
                        name_hash, day_end, day_begin);

  auto key_filter = base::BindRepeating(&FilterKeyBasedOnRange, signal_type,
                                        name_hash, day_end, day_begin);
  auto on_loaded =
      base::BindOnce(&SignalDatabaseImpl::OnGetSamplesForCompaction,
                     weak_ptr_factory_.GetWeakPtr(), std::move(callback),
                     compact_key.ToBinary());
  database_->LoadKeysAndEntriesWithFilter(std::move(key_filter),
                                          std::move(on_loaded));
}
// Completion handler for the load issued by CompactSamplesForDay(). Merges the
// samples of all loaded |entries| into a single entry stored under
// |compact_key| and deletes the entries that were merged.
//
// When the load failed, returned no map, or found no entries, |callback| is
// run immediately with |success|; otherwise it is handed to the database
// update.
void SignalDatabaseImpl::OnGetSamplesForCompaction(
    SuccessCallback callback,
    std::string compact_key,
    bool success,
    std::unique_ptr<std::map<std::string, proto::SignalData>> entries) {
  TRACE_EVENT("segmentation_platform",
              "SignalDatabaseImpl::OnGetSamplesForCompaction");
  if (!success || !entries || entries->empty()) {
    std::move(callback).Run(success);
    return;
  }

  // We found one or more entries for the day. Aggregate their samples under a
  // new proto and delete the old entries.
  auto keys_to_delete = std::make_unique<std::vector<std::string>>();
  proto::SignalData compact;
  for (const auto& pair : *entries) {
    const auto& signal_data = pair.second;
    for (int i = 0; i < signal_data.samples_size(); ++i)
      compact.add_samples()->CopyFrom(signal_data.samples(i));

    // An entry already stored under |compact_key| (e.g. from an earlier
    // compaction of the same day) is overwritten by the save below. It must
    // not also be listed for deletion: the same UpdateEntries() call would
    // then both write and delete that key, and the merged samples could be
    // lost depending on the order in which the two are applied.
    if (pair.first != compact_key)
      keys_to_delete->emplace_back(pair.first);
  }

  // Write to DB.
  auto entries_to_save = std::make_unique<
      std::vector<std::pair<std::string, proto::SignalData>>>();
  entries_to_save->emplace_back(std::move(compact_key), std::move(compact));
  database_->UpdateEntries(std::move(entries_to_save),
                           std::move(keys_to_delete), std::move(callback));
}
// Completion handler for Initialize(). Remembers whether the database came up
// with status kOK and reports that same result to |callback|.
void SignalDatabaseImpl::OnDatabaseInitialized(
    SuccessCallback callback,
    leveldb_proto::Enums::InitStatus status) {
  const bool init_ok = (status == leveldb_proto::Enums::InitStatus::kOK);
  initialized_ = init_ok;
  std::move(callback).Run(init_ok);
}
// Drops from the in-memory cache of recently written signals every entry
// whose key range ends more than one second before |current_timestamp|.
void SignalDatabaseImpl::CleanupStaleCachedEntries(
    base::Time current_timestamp) {
  const base::Time stale_before = current_timestamp - base::Seconds(1);
  for (auto it = recently_added_signals_.begin();
       it != recently_added_signals_.end();) {
    if (it->first.range_end() < stale_before) {
      // erase() returns the iterator following the removed element.
      it = recently_added_signals_.erase(it);
    } else {
      ++it;
    }
  }
}
} // namespace segmentation_platform