// Copyright 2020 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "content/browser/attribution_reporting/attribution_storage_sql.h"
#include <stdint.h>
#include <string>
#include <utility>
#include "base/bind.h"
#include "base/check_op.h"
#include "base/compiler_specific.h"
#include "base/containers/flat_map.h"
#include "base/containers/flat_set.h"
#include "base/files/file_util.h"
#include "base/guid.h"
#include "base/ignore_result.h"
#include "base/logging.h"
#include "base/metrics/histogram_functions.h"
#include "base/metrics/histogram_macros.h"
#include "base/time/clock.h"
#include "base/time/time.h"
#include "content/browser/attribution_reporting/attribution_report.h"
#include "content/browser/attribution_reporting/attribution_storage_sql_migrations.h"
#include "content/browser/attribution_reporting/sql_utils.h"
#include "content/browser/attribution_reporting/storable_source.h"
#include "content/browser/attribution_reporting/storable_trigger.h"
#include "sql/database.h"
#include "sql/recovery.h"
#include "sql/statement.h"
#include "sql/transaction.h"
#include "third_party/abseil-cpp/absl/types/optional.h"
#include "url/origin.h"
namespace content {
namespace {
using CreateReportResult = ::content::AttributionStorage::CreateReportResult;
using CreateReportStatus =
::content::AttributionStorage::CreateReportResult::Status;
using DeactivatedSource = ::content::AttributionStorage::DeactivatedSource;
const base::FilePath::CharType kInMemoryPath[] = FILE_PATH_LITERAL(":memory");
const base::FilePath::CharType kDatabasePath[] =
FILE_PATH_LITERAL("Conversions");
// Version number of the database.
//
// Version 1 - 2020/01/27 - https://crrev.com/c/1965450
//
// Version 2 - 2020/11/03 - https://crrev.com/c/2194182
//
// Version 2 adds a new impression column "conversion_destination" based on
// registerable domain which is used for attribution instead of
// "conversion_origin".
//
// Version 3 - 2021/03/08 - https://crrev.com/c/2743337
//
// Version 3 adds new impression columns source_type and attributed_truthfully.
//
// Version 4 - 2021/03/16 - https://crrev.com/c/2716913
//
// Version 4 adds a new rate_limits table.
//
// Version 5 - 2021/04/30 - https://crrev.com/c/2860056
//
// Version 5 drops the conversions.attribution_credit column.
//
// Version 6 - 2021/05/06 - https://crrev.com/c/2878235
//
// Version 6 adds the impressions.priority column.
//
// Version 7 - 2021/06/03 - https://crrev.com/c/2904386
//
// Version 7 adds the impressions.impression_site column.
//
// Version 8 - 2021/06/30 - https://crrev.com/c/2888906
//
// Version 8 changes the conversions.conversion_data and
// impressions.impression_data columns from TEXT to INTEGER and makes
// conversions.impression_id NOT NULL.
//
// Version 9 - 2021/06/30 - https://crrev.com/c/2951620
//
// Version 9 adds the conversions.priority column.
//
// Version 10 - 2021/07/16 - https://crrev.com/c/2983439
//
// Version 10 adds the dedup_keys table.
//
// Version 11 - 2021/08/10 - https://crrev.com/c/3087755
//
// Version 11 replaces impression_site_idx with
// event_source_impression_site_idx, which stores less data.
//
// Version 12 - 2021/08/18 - https://crrev.com/c/3085887
//
// Version 12 adds the rate_limits.bucket and rate_limits.value columns and
// makes rate_limits.rate_limit_id NOT NULL.
//
// Version 13 - 2021/09/08 - https://crrev.com/c/3149550
//
// Version 13 makes the impressions.impression_id and conversions.conversion_id
// columns NOT NULL and AUTOINCREMENT, the latter to prevent ID reuse, which is
// prone to race conditions between the queuing logic and deletions.
//
// Version 14 - 2021/09/22 - https://crrev.com/c/3138353
//
// Version 14 adds the conversions.failed_send_attempts column.
//
// Version 15 - 2021/11/13 - https://crrev.com/c/3180180
//
// Version 15 adds the conversions.external_report_id column.
const int kCurrentVersionNumber = 15;
// Earliest version which can use a |kCurrentVersionNumber| database
// without failing.
const int kCompatibleVersionNumber = 15;
// Latest version of the database that cannot be upgraded to
// |kCurrentVersionNumber| without razing the database. No versions are
// currently deprecated.
const int kDeprecatedVersionNumber = 0;
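// For reference, InitializeSchema() below handles these constants roughly as
// follows: a database at or below |kDeprecatedVersionNumber|, or whose
// compatible version exceeds |kCurrentVersionNumber|, is razed and recreated;
// any other older version is migrated in place via
// UpgradeAttributionStorageSqlSchema().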
void RecordInitializationStatus(
const AttributionStorageSql::InitStatus status) {
base::UmaHistogramEnumeration("Conversions.Storage.Sql.InitStatus2", status);
}
void RecordSourcesDeleted(int count) {
UMA_HISTOGRAM_COUNTS_1000(
"Conversions.ImpressionsDeletedInDataClearOperation", count);
}
void RecordReportsDeleted(int count) {
UMA_HISTOGRAM_COUNTS_1000("Conversions.ReportsDeletedInDataClearOperation",
count);
}
WARN_UNUSED_RESULT int SerializeAttributionLogic(
StorableSource::AttributionLogic val) {
return static_cast<int>(val);
}
WARN_UNUSED_RESULT absl::optional<StorableSource::AttributionLogic>
DeserializeAttributionLogic(int val) {
switch (val) {
case static_cast<int>(StorableSource::AttributionLogic::kNever):
return StorableSource::AttributionLogic::kNever;
case static_cast<int>(StorableSource::AttributionLogic::kTruthfully):
return StorableSource::AttributionLogic::kTruthfully;
case static_cast<int>(StorableSource::AttributionLogic::kFalsely):
return StorableSource::AttributionLogic::kFalsely;
default:
return absl::nullopt;
}
}
WARN_UNUSED_RESULT int SerializeSourceType(StorableSource::SourceType val) {
return static_cast<int>(val);
}
WARN_UNUSED_RESULT absl::optional<StorableSource::SourceType>
DeserializeSourceType(int val) {
switch (val) {
case static_cast<int>(StorableSource::SourceType::kNavigation):
return StorableSource::SourceType::kNavigation;
case static_cast<int>(StorableSource::SourceType::kEvent):
return StorableSource::SourceType::kEvent;
default:
return absl::nullopt;
}
}
struct SourceToAttribute {
StorableSource source;
int num_conversions;
};
WARN_UNUSED_RESULT absl::optional<SourceToAttribute> ReadSourceToAttribute(
sql::Database* db,
StorableSource::Id source_id,
const url::Origin& reporting_origin) {
static constexpr char kReadSourceToAttributeSql[] =
"SELECT impression_origin,impression_time,priority,"
"conversion_origin,attributed_truthfully,source_type,num_conversions,"
"impression_data,expiry_time "
"FROM impressions "
"WHERE impression_id = ?";
sql::Statement statement(
db->GetCachedStatement(SQL_FROM_HERE, kReadSourceToAttributeSql));
statement.BindInt64(0, *source_id);
if (!statement.Step())
return absl::nullopt;
url::Origin impression_origin = DeserializeOrigin(statement.ColumnString(0));
if (impression_origin.opaque())
return absl::nullopt;
base::Time impression_time = statement.ColumnTime(1);
int64_t priority = statement.ColumnInt64(2);
url::Origin conversion_origin = DeserializeOrigin(statement.ColumnString(3));
if (conversion_origin.opaque())
return absl::nullopt;
absl::optional<StorableSource::AttributionLogic> attribution_logic =
DeserializeAttributionLogic(statement.ColumnInt(4));
// There should never be an unattributed source with `kFalsely`.
if (!attribution_logic.has_value() ||
attribution_logic == StorableSource::AttributionLogic::kFalsely) {
return absl::nullopt;
}
absl::optional<StorableSource::SourceType> source_type =
DeserializeSourceType(statement.ColumnInt(5));
if (!source_type.has_value())
return absl::nullopt;
int num_conversions = statement.ColumnInt(6);
if (num_conversions < 0)
return absl::nullopt;
uint64_t source_event_id = DeserializeUint64(statement.ColumnInt64(7));
base::Time expiry_time = statement.ColumnTime(8);
return SourceToAttribute{
.source = StorableSource(source_event_id, std::move(impression_origin),
std::move(conversion_origin), reporting_origin,
impression_time, expiry_time, *source_type,
priority, *attribution_logic, source_id),
.num_conversions = num_conversions,
};
}
// Helper to deserialize source rows. See `GetActiveSources()` for the
// expected ordering of columns used for the input to this function.
absl::optional<StorableSource> ReadSourceFromStatement(
sql::Statement& statement) {
StorableSource::Id source_id(statement.ColumnInt64(0));
uint64_t source_event_id = DeserializeUint64(statement.ColumnInt64(1));
url::Origin impression_origin = DeserializeOrigin(statement.ColumnString(2));
url::Origin conversion_origin = DeserializeOrigin(statement.ColumnString(3));
url::Origin reporting_origin = DeserializeOrigin(statement.ColumnString(4));
base::Time impression_time = statement.ColumnTime(5);
base::Time expiry_time = statement.ColumnTime(6);
absl::optional<StorableSource::SourceType> source_type =
DeserializeSourceType(statement.ColumnInt(7));
absl::optional<StorableSource::AttributionLogic> attribution_logic =
DeserializeAttributionLogic(statement.ColumnInt(8));
int64_t priority = statement.ColumnInt64(9);
if (!source_type.has_value() || !attribution_logic.has_value())
return absl::nullopt;
return StorableSource(source_event_id, std::move(impression_origin),
std::move(conversion_origin),
std::move(reporting_origin), impression_time,
expiry_time, *source_type, priority, *attribution_logic,
source_id);
}
} // namespace
// static
void AttributionStorageSql::RunInMemoryForTesting() {
g_run_in_memory_ = true;
}
// static
bool AttributionStorageSql::g_run_in_memory_ = false;
AttributionStorageSql::AttributionStorageSql(
const base::FilePath& path_to_database,
std::unique_ptr<Delegate> delegate,
const base::Clock* clock)
: path_to_database_(g_run_in_memory_
? base::FilePath(kInMemoryPath)
: path_to_database.Append(kDatabasePath)),
rate_limit_table_(delegate.get(), clock),
clock_(clock),
delegate_(std::move(delegate)),
weak_factory_(this) {
DCHECK(delegate_);
DETACH_FROM_SEQUENCE(sequence_checker_);
}
AttributionStorageSql::~AttributionStorageSql() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
}
absl::optional<std::vector<DeactivatedSource>>
AttributionStorageSql::DeactivateSources(
const std::string& serialized_conversion_destination,
const std::string& serialized_reporting_origin,
int return_limit) {
std::vector<DeactivatedSource> deactivated_sources;
if (return_limit != 0) {
// Get at most `return_limit` sources that will be deactivated. We do this
// first, instead of using a RETURNING clause in the UPDATE, because we
// cannot limit the number of returned results there, and we want to avoid
// bringing all results into memory.
static constexpr char kGetSourcesToReturnSql[] =
"SELECT impression_id,impression_data,impression_origin,"
"conversion_origin,reporting_origin,impression_time,expiry_time,"
"source_type,attributed_truthfully,priority "
"FROM impressions "
DCHECK_SQL_INDEXED_BY("conversion_destination_idx")
"WHERE conversion_destination = ? AND reporting_origin = ? AND "
"active = 1 AND num_conversions > 0 LIMIT ?";
sql::Statement get_statement(
db_->GetCachedStatement(SQL_FROM_HERE, kGetSourcesToReturnSql));
get_statement.BindString(0, serialized_conversion_destination);
get_statement.BindString(1, serialized_reporting_origin);
get_statement.BindInt(2, return_limit);
while (get_statement.Step()) {
absl::optional<StorableSource> source =
ReadSourceFromStatement(get_statement);
if (!source.has_value())
return absl::nullopt;
deactivated_sources.emplace_back(
std::move(*source),
DeactivatedSource::Reason::kReplacedByNewerSource);
}
if (!get_statement.Succeeded())
return absl::nullopt;
// If nothing was returned, we know the UPDATE below will do nothing, so
// just return early.
if (deactivated_sources.empty())
return deactivated_sources;
}
static constexpr char kDeactivateSourcesSql[] =
"UPDATE impressions "
DCHECK_SQL_INDEXED_BY("conversion_destination_idx")
"SET active = 0 "
"WHERE conversion_destination = ? AND reporting_origin = ? AND "
"active = 1 AND num_conversions > 0";
sql::Statement deactivate_statement(
db_->GetCachedStatement(SQL_FROM_HERE, kDeactivateSourcesSql));
deactivate_statement.BindString(0, serialized_conversion_destination);
deactivate_statement.BindString(1, serialized_reporting_origin);
if (!deactivate_statement.Run())
return absl::nullopt;
for (auto& deactivated_source : deactivated_sources) {
absl::optional<std::vector<int64_t>> dedup_keys =
ReadDedupKeys(*deactivated_source.source.impression_id());
if (!dedup_keys.has_value())
return absl::nullopt;
deactivated_source.source.SetDedupKeys(std::move(*dedup_keys));
}
return deactivated_sources;
}
std::vector<DeactivatedSource> AttributionStorageSql::StoreSource(
const StorableSource& source,
int deactivated_source_return_limit) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
// Force the creation of the database if it doesn't exist, as we need to
// persist the source.
if (!LazyInit(DbCreationPolicy::kCreateIfAbsent))
return {};
// Only delete expired impressions periodically to avoid excessive DB
// operations.
const base::TimeDelta delete_frequency =
delegate_->GetDeleteExpiredSourcesFrequency();
DCHECK_GE(delete_frequency, base::TimeDelta());
const base::Time now = clock_->Now();
if (now - last_deleted_expired_sources_ >= delete_frequency) {
if (!DeleteExpiredSources())
return {};
last_deleted_expired_sources_ = now;
}
// TODO(csharrison): Thread this failure to the caller and report a console
// error.
const std::string serialized_impression_origin =
SerializeOrigin(source.impression_origin());
if (!HasCapacityForStoringSource(serialized_impression_origin))
return {};
// Wrap the deactivation and insertion in the same transaction. If the
// deactivation fails, we do not want to store the new source as we may
// return the wrong set of sources for a trigger.
sql::Transaction transaction(db_.get());
if (!transaction.Begin())
return {};
if (!EnsureCapacityForPendingDestinationLimit(source))
return {};
const std::string serialized_conversion_destination =
source.ConversionDestination().Serialize();
const std::string serialized_reporting_origin =
SerializeOrigin(source.reporting_origin());
// In the case where we get a new source for a given <reporting_origin,
// conversion_destination> we should mark all active, converted impressions
// with the matching <reporting_origin, conversion_destination> as not active.
absl::optional<std::vector<DeactivatedSource>> deactivated_sources =
DeactivateSources(serialized_conversion_destination,
serialized_reporting_origin,
deactivated_source_return_limit);
if (!deactivated_sources.has_value())
return {};
static constexpr char kInsertImpressionSql[] =
"INSERT INTO impressions"
"(impression_data,impression_origin,conversion_origin,"
"conversion_destination,"
"reporting_origin,impression_time,expiry_time,source_type,"
"attributed_truthfully,priority,impression_site,"
"num_conversions,active)"
"VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?)";
sql::Statement statement(
db_->GetCachedStatement(SQL_FROM_HERE, kInsertImpressionSql));
statement.BindInt64(0, SerializeUint64(source.source_event_id()));
statement.BindString(1, serialized_impression_origin);
statement.BindString(2, SerializeOrigin(source.conversion_origin()));
statement.BindString(3, serialized_conversion_destination);
statement.BindString(4, serialized_reporting_origin);
statement.BindTime(5, source.impression_time());
statement.BindTime(6, source.expiry_time());
statement.BindInt(7, SerializeSourceType(source.source_type()));
statement.BindInt(8, SerializeAttributionLogic(source.attribution_logic()));
statement.BindInt64(9, source.priority());
statement.BindString(10, source.ImpressionSite().Serialize());
if (source.attribution_logic() ==
StorableSource::AttributionLogic::kFalsely) {
// Falsely attributed impressions are immediately stored with
// `num_conversions == 1` and `active == 0`, as they will be attributed via
// the below call to `StoreReport()` in the same transaction.
statement.BindInt(11, 1); // num_conversions
statement.BindInt(12, 0); // active
} else {
statement.BindInt(11, 0); // num_conversions
statement.BindInt(12, 1); // active
}
if (!statement.Run())
return {};
if (source.attribution_logic() ==
StorableSource::AttributionLogic::kFalsely) {
DCHECK_EQ(StorableSource::SourceType::kEvent, source.source_type());
StorableSource::Id source_id(db_->GetLastInsertRowId());
uint64_t event_source_trigger_data =
delegate_->GetFakeEventSourceTriggerData();
const base::Time conversion_time = source.impression_time();
const base::Time report_time =
delegate_->GetReportTime(source, conversion_time);
AttributionReport report(source, event_source_trigger_data,
/*conversion_time=*/conversion_time,
/*report_time=*/report_time,
/*priority=*/0,
/*external_report_id=*/
delegate_->NewReportID(),
/*conversion_id=*/absl::nullopt);
if (!StoreReport(report, source_id))
return {};
}
if (!transaction.Commit())
return {};
return *deactivated_sources;
}
// Checks whether a new report is allowed to be stored for the given source
// based on `GetMaxAttributionsPerSource()`. If there's sufficient capacity,
// the new report should be stored. Otherwise, if all existing reports were
// from an earlier window, the corresponding source is deactivated and the new
// report should be dropped. If there's insufficient capacity within the same
// window, the new report's priority is checked against all existing ones for
// the same source: if all existing ones have greater or equal priority, the
// new report should be dropped; otherwise, the existing one with the lowest
// priority is deleted and the new one should be stored.
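// Illustrative example: with GetMaxAttributionsPerSource() == 3 and three
// reports already stored for the same source and report window with
// priorities {5, 2, 2}, a new report with priority 4 replaces the priority-2
// report whose conversion_time is latest, whereas a new report with priority
// 1 or 2 is dropped.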
AttributionStorageSql::MaybeReplaceLowerPriorityReportResult
AttributionStorageSql::MaybeReplaceLowerPriorityReport(
const AttributionReport& report,
int num_conversions,
int64_t conversion_priority,
absl::optional<AttributionReport>& replaced_report) {
DCHECK(report.impression.impression_id().has_value());
DCHECK_GE(num_conversions, 0);
// If there's already capacity for the new report, there's nothing to do.
if (num_conversions <
delegate_->GetMaxAttributionsPerSource(report.impression.source_type())) {
return MaybeReplaceLowerPriorityReportResult::kAddNewReport;
}
// Prioritization is scoped within report windows.
// This is reasonably optimized as-is because we only store a ~small number
// of reports per impression_id. Selects the report with the lowest priority,
// and uses the greatest conversion_time to break ties. This favors sending
// reports for conversions closer to the source time.
static constexpr char kMinPrioritySql[] =
"SELECT priority,conversion_id "
"FROM conversions "
"WHERE impression_id = ? AND report_time = ? "
"ORDER BY priority ASC, conversion_time DESC "
"LIMIT 1";
sql::Statement min_priority_statement(
db_->GetCachedStatement(SQL_FROM_HERE, kMinPrioritySql));
min_priority_statement.BindInt64(0,
*report.impression.impression_id().value());
min_priority_statement.BindTime(1, report.report_time);
const bool has_matching_report = min_priority_statement.Step();
if (!min_priority_statement.Succeeded())
return MaybeReplaceLowerPriorityReportResult::kError;
// Deactivate the source as a new report will never be generated in the
// future.
if (!has_matching_report) {
static constexpr char kDeactivateSql[] =
"UPDATE impressions SET active = 0 WHERE impression_id = ?";
sql::Statement deactivate_statement(
db_->GetCachedStatement(SQL_FROM_HERE, kDeactivateSql));
deactivate_statement.BindInt64(0,
*report.impression.impression_id().value());
return deactivate_statement.Run()
? MaybeReplaceLowerPriorityReportResult::
kDropNewReportSourceDeactivated
: MaybeReplaceLowerPriorityReportResult::kError;
}
int64_t min_priority = min_priority_statement.ColumnInt64(0);
AttributionReport::Id conversion_id_with_min_priority(
min_priority_statement.ColumnInt64(1));
// If the new report's priority is less than all existing ones, or if its
// priority is equal to the minimum existing one and it is more recent, drop
// it. We could explicitly check the trigger time here, but it would only
// be relevant in the case of an ill-behaved clock, in which case the rest of
// the attribution functionality would probably also break.
if (conversion_priority <= min_priority) {
return MaybeReplaceLowerPriorityReportResult::kDropNewReport;
}
absl::optional<AttributionReport> replaced =
GetReport(conversion_id_with_min_priority);
if (!replaced.has_value()) {
return MaybeReplaceLowerPriorityReportResult::kError;
}
// Otherwise, delete the existing report with the lowest priority.
if (!DeleteReportInternal(conversion_id_with_min_priority)) {
return MaybeReplaceLowerPriorityReportResult::kError;
}
replaced_report = std::move(replaced);
return MaybeReplaceLowerPriorityReportResult::kReplaceOldReport;
}
CreateReportResult AttributionStorageSql::MaybeCreateAndStoreReport(
const StorableTrigger& trigger) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
// We don't bother creating the DB here if it doesn't exist, because it's not
// possible for there to be a matching source if there's no DB.
if (!LazyInit(DbCreationPolicy::kIgnoreIfAbsent)) {
return CreateReportResult(CreateReportStatus::kNoMatchingImpressions);
}
const net::SchemefulSite& conversion_destination =
trigger.conversion_destination();
const std::string serialized_conversion_destination =
conversion_destination.Serialize();
const url::Origin& reporting_origin = trigger.reporting_origin();
DCHECK(!conversion_destination.opaque());
DCHECK(!reporting_origin.opaque());
base::Time current_time = clock_->Now();
// Get all sources that match this <reporting_origin,
// conversion_destination> pair. Only get sources that are active and not
// past their expiry time. The sources are fetched in order so that the
// first one is the one that will be attributed; the others will be deleted.
static constexpr char kGetMatchingSourcesSql[] =
"SELECT impression_id FROM impressions "
DCHECK_SQL_INDEXED_BY("conversion_destination_idx")
"WHERE conversion_destination = ? AND reporting_origin = ? "
"AND active = 1 AND expiry_time > ? "
"ORDER BY priority DESC,impression_time DESC";
sql::Statement matching_sources_statement(
db_->GetCachedStatement(SQL_FROM_HERE, kGetMatchingSourcesSql));
matching_sources_statement.BindString(0, serialized_conversion_destination);
matching_sources_statement.BindString(1, SerializeOrigin(reporting_origin));
matching_sources_statement.BindTime(2, current_time);
// If there are no matching sources, return early.
if (!matching_sources_statement.Step()) {
return CreateReportResult(matching_sources_statement.Succeeded()
? CreateReportStatus::kNoMatchingImpressions
: CreateReportStatus::kInternalError);
}
// The first one returned will be attributed; it has the highest priority.
StorableSource::Id source_id_to_attribute(
matching_sources_statement.ColumnInt64(0));
// Any others will be deleted.
std::vector<StorableSource::Id> source_ids_to_delete;
while (matching_sources_statement.Step()) {
StorableSource::Id source_id(matching_sources_statement.ColumnInt64(0));
source_ids_to_delete.push_back(source_id);
}
// Exit early if the last statement wasn't valid.
if (!matching_sources_statement.Succeeded()) {
return CreateReportResult(CreateReportStatus::kInternalError);
}
switch (ReportAlreadyStored(source_id_to_attribute, trigger.dedup_key())) {
case ReportAlreadyStoredStatus::kNotStored:
break;
case ReportAlreadyStoredStatus::kStored:
return CreateReportResult(CreateReportStatus::kDeduplicated);
case ReportAlreadyStoredStatus::kError:
return CreateReportResult(CreateReportStatus::kInternalError);
}
switch (CapacityForStoringReport(serialized_conversion_destination)) {
case ConversionCapacityStatus::kHasCapacity:
break;
case ConversionCapacityStatus::kNoCapacity:
return CreateReportResult(
CreateReportStatus::kNoCapacityForConversionDestination);
case ConversionCapacityStatus::kError:
return CreateReportResult(CreateReportStatus::kInternalError);
}
absl::optional<SourceToAttribute> source_to_attribute = ReadSourceToAttribute(
db_.get(), source_id_to_attribute, reporting_origin);
// This is only possible if there is a corrupt DB.
if (!source_to_attribute.has_value()) {
return CreateReportResult(CreateReportStatus::kInternalError);
}
const uint64_t trigger_data = source_to_attribute->source.source_type() ==
StorableSource::SourceType::kEvent
? trigger.event_source_trigger_data()
: trigger.trigger_data();
const base::Time report_time =
delegate_->GetReportTime(source_to_attribute->source,
/*trigger_time=*/current_time);
AttributionReport report(std::move(source_to_attribute->source), trigger_data,
/*conversion_time=*/current_time,
/*report_time=*/report_time, trigger.priority(),
/*external_report_id=*/delegate_->NewReportID(),
/*conversion_id=*/absl::nullopt);
switch (
rate_limit_table_.AttributionAllowed(db_.get(), report, current_time)) {
case RateLimitTable::AttributionAllowedStatus::kAllowed:
break;
case RateLimitTable::AttributionAllowedStatus::kNotAllowed:
return CreateReportResult(CreateReportStatus::kRateLimited);
case RateLimitTable::AttributionAllowedStatus::kError:
return CreateReportResult(CreateReportStatus::kInternalError);
}
sql::Transaction transaction(db_.get());
if (!transaction.Begin()) {
return CreateReportResult(CreateReportStatus::kInternalError);
}
absl::optional<AttributionReport> replaced_report;
const auto maybe_replace_lower_priority_report_result =
MaybeReplaceLowerPriorityReport(report,
source_to_attribute->num_conversions,
trigger.priority(), replaced_report);
if (maybe_replace_lower_priority_report_result ==
MaybeReplaceLowerPriorityReportResult::kError) {
return CreateReportResult(CreateReportStatus::kInternalError);
}
if (maybe_replace_lower_priority_report_result ==
MaybeReplaceLowerPriorityReportResult::kDropNewReport ||
maybe_replace_lower_priority_report_result ==
MaybeReplaceLowerPriorityReportResult::
kDropNewReportSourceDeactivated) {
if (!transaction.Commit()) {
return CreateReportResult(CreateReportStatus::kInternalError);
}
return CreateReportResult(
CreateReportStatus::kPriorityTooLow, std::move(report),
maybe_replace_lower_priority_report_result ==
MaybeReplaceLowerPriorityReportResult::
kDropNewReportSourceDeactivated
? absl::make_optional(
DeactivatedSource::Reason::kReachedAttributionLimit)
: absl::nullopt);
}
// Sources with `AttributionLogic::kNever` should be included in all
// attribution operations and matching, but only `kTruthfully` sources should
// generate reports that get sent.
const bool create_report = report.impression.attribution_logic() ==
StorableSource::AttributionLogic::kTruthfully;
if (create_report) {
DCHECK(report.impression.impression_id().has_value());
if (!StoreReport(report, *report.impression.impression_id())) {
return CreateReportResult(CreateReportStatus::kInternalError);
}
}
// If a dedup key is present, store it. We do this regardless of whether
// `create_report` is true to avoid leaking whether the report was actually
// stored.
if (trigger.dedup_key().has_value()) {
static constexpr char kInsertDedupKeySql[] =
"INSERT INTO dedup_keys(impression_id,dedup_key)VALUES(?,?)";
sql::Statement insert_dedup_key_statement(
db_->GetCachedStatement(SQL_FROM_HERE, kInsertDedupKeySql));
insert_dedup_key_statement.BindInt64(
0, *report.impression.impression_id().value());
insert_dedup_key_statement.BindInt64(1, *trigger.dedup_key());
if (!insert_dedup_key_statement.Run()) {
return CreateReportResult(CreateReportStatus::kInternalError);
}
}
// Only increment the number of conversions associated with the source if
// we are adding a new one, rather than replacing a dropped one.
if (maybe_replace_lower_priority_report_result ==
MaybeReplaceLowerPriorityReportResult::kAddNewReport) {
static constexpr char kUpdateImpressionForConversionSql[] =
"UPDATE impressions SET num_conversions = num_conversions + 1 "
"WHERE impression_id = ?";
sql::Statement impression_update_statement(db_->GetCachedStatement(
SQL_FROM_HERE, kUpdateImpressionForConversionSql));
// Update the attributed source.
impression_update_statement.BindInt64(
0, *report.impression.impression_id().value());
if (!impression_update_statement.Run()) {
return CreateReportResult(CreateReportStatus::kInternalError);
}
}
// Delete all unattributed sources.
if (!DeleteSources(source_ids_to_delete)) {
return CreateReportResult(CreateReportStatus::kInternalError);
}
// Based on the deletion logic here and the fact that we deactivate sources
// with |num_conversions > 0| when there is a new matching source in
// |StoreSource()|, we should be guaranteed that these sources all
// have |num_conversions == 0|, and that they never contributed to a rate
// limit. Therefore, we don't need to call
// |RateLimitTable::ClearDataForSourceIds()| here.
if (create_report && !rate_limit_table_.AddRateLimit(db_.get(), report)) {
return CreateReportResult(CreateReportStatus::kInternalError);
}
if (!transaction.Commit()) {
return CreateReportResult(CreateReportStatus::kInternalError);
}
if (!create_report) {
return CreateReportResult(CreateReportStatus::kDroppedForNoise,
std::move(report));
}
return CreateReportResult(
maybe_replace_lower_priority_report_result ==
MaybeReplaceLowerPriorityReportResult::kReplaceOldReport
? CreateReportStatus::kSuccessDroppedLowerPriority
: CreateReportStatus::kSuccess,
std::move(replaced_report));
}
bool AttributionStorageSql::StoreReport(const AttributionReport& report,
StorableSource::Id source_id) {
static constexpr char kStoreReportSql[] =
"INSERT INTO conversions"
"(impression_id,conversion_data,conversion_time,report_time,"
"priority,failed_send_attempts,external_report_id)VALUES(?,?,?,?,?,0,?)";
sql::Statement store_report_statement(
db_->GetCachedStatement(SQL_FROM_HERE, kStoreReportSql));
store_report_statement.BindInt64(0, *source_id);
store_report_statement.BindInt64(1, SerializeUint64(report.trigger_data));
store_report_statement.BindTime(2, report.conversion_time);
store_report_statement.BindTime(3, report.report_time);
store_report_statement.BindInt64(4, report.priority);
store_report_statement.BindString(
5, report.external_report_id.AsLowercaseString());
return store_report_statement.Run();
}
namespace {
// Helper to deserialize report rows. See `GetReport()` for the expected
// ordering of columns used for the input to this function.
absl::optional<AttributionReport> ReadReportFromStatement(
sql::Statement& statement) {
uint64_t trigger_data = DeserializeUint64(statement.ColumnInt64(0));
base::Time conversion_time = statement.ColumnTime(1);
base::Time report_time = statement.ColumnTime(2);
AttributionReport::Id conversion_id(statement.ColumnInt64(3));
int64_t conversion_priority = statement.ColumnInt64(4);
int failed_send_attempts = statement.ColumnInt(5);
base::GUID external_report_id =
base::GUID::ParseLowercase(statement.ColumnString(6));
url::Origin impression_origin = DeserializeOrigin(statement.ColumnString(7));
url::Origin conversion_origin = DeserializeOrigin(statement.ColumnString(8));
url::Origin reporting_origin = DeserializeOrigin(statement.ColumnString(9));
uint64_t source_event_id = DeserializeUint64(statement.ColumnInt64(10));
base::Time impression_time = statement.ColumnTime(11);
base::Time expiry_time = statement.ColumnTime(12);
StorableSource::Id source_id(statement.ColumnInt64(13));
absl::optional<StorableSource::SourceType> source_type =
DeserializeSourceType(statement.ColumnInt(14));
int64_t attribution_source_priority = statement.ColumnInt64(15);
absl::optional<StorableSource::AttributionLogic> attribution_logic =
DeserializeAttributionLogic(statement.ColumnInt(16));
// Ensure origins are valid before continuing. This could happen if there is
// database corruption.
// TODO(csharrison): This should be an extremely rare occurrence but it
// would entail that some records will remain in the DB as vestigial if a
// report is never sent. We should delete these entries from the DB.
// TODO(apaseltiner): Should we raze the DB if we've detected corruption?
if (impression_origin.opaque() || conversion_origin.opaque() ||
reporting_origin.opaque() || !source_type.has_value() ||
!attribution_logic.has_value() || failed_send_attempts < 0 ||
!external_report_id.is_valid()) {
return absl::nullopt;
}
// Create the source and AttributionReport objects from the retrieved
// columns.
StorableSource source(source_event_id, std::move(impression_origin),
std::move(conversion_origin),
std::move(reporting_origin), impression_time,
expiry_time, *source_type, attribution_source_priority,
*attribution_logic, source_id);
AttributionReport report(std::move(source), trigger_data, conversion_time,
report_time, conversion_priority,
std::move(external_report_id), conversion_id);
report.failed_send_attempts = failed_send_attempts;
return report;
}
} // namespace
std::vector<AttributionReport> AttributionStorageSql::GetAttributionsToReport(
base::Time max_report_time,
int limit) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (!LazyInit(DbCreationPolicy::kIgnoreIfAbsent))
return {};
// Get at most |limit| entries in the conversions table with a |report_time|
// less than |max_report_time| and their matching information from the
// impression table. Negatives are treated as no limit
// (https://sqlite.org/lang_select.html#limitoffset).
static constexpr char kGetReportsSql[] =
"SELECT C.conversion_data,C.conversion_time,C.report_time,"
"C.conversion_id,C.priority,C.failed_send_attempts,C.external_report_id,"
"I.impression_origin,I.conversion_origin,I.reporting_origin,"
"I.impression_data,I.impression_time,I.expiry_time,I.impression_id,"
"I.source_type,I.priority,I.attributed_truthfully "
"FROM conversions C JOIN impressions I ON "
"C.impression_id = I.impression_id WHERE C.report_time <= ? "
"LIMIT ?";
sql::Statement statement(
db_->GetCachedStatement(SQL_FROM_HERE, kGetReportsSql));
statement.BindTime(0, max_report_time);
statement.BindInt(1, limit);
std::vector<AttributionReport> reports;
while (statement.Step()) {
absl::optional<AttributionReport> report =
ReadReportFromStatement(statement);
if (report.has_value())
reports.push_back(std::move(*report));
}
if (!statement.Succeeded())
return {};
return reports;
}
absl::optional<AttributionReport> AttributionStorageSql::GetReport(
AttributionReport::Id conversion_id) {
static constexpr char kGetReportSql[] =
"SELECT C.conversion_data,C.conversion_time,C.report_time,"
"C.conversion_id,C.priority,C.failed_send_attempts,C.external_report_id,"
"I.impression_origin,I.conversion_origin,I.reporting_origin,"
"I.impression_data,I.impression_time,I.expiry_time,I.impression_id,"
"I.source_type,I.priority,I.attributed_truthfully "
"FROM conversions C JOIN impressions I ON "
"C.impression_id = I.impression_id WHERE C.conversion_id = ?";
sql::Statement statement(
db_->GetCachedStatement(SQL_FROM_HERE, kGetReportSql));
statement.BindInt64(0, *conversion_id);
if (!statement.Step())
return absl::nullopt;
return ReadReportFromStatement(statement);
}
bool AttributionStorageSql::DeleteExpiredSources() {
const int kMaxDeletesPerBatch = 100;
auto delete_sources_from_paged_select =
[this](sql::Statement& statement)
VALID_CONTEXT_REQUIRED(sequence_checker_) -> bool {
while (true) {
std::vector<StorableSource::Id> source_ids;
while (statement.Step()) {
StorableSource::Id source_id(statement.ColumnInt64(0));
source_ids.push_back(source_id);
}
if (!statement.Succeeded())
return false;
if (source_ids.empty())
return true;
if (!DeleteSources(source_ids))
return false;
// Deliberately retain the existing bound vars so that the limit, etc. are
// the same.
statement.Reset(/*clear_bound_vars=*/false);
}
};
// Delete all sources that have no associated reports and are past
// their expiry time. Optimized by |kImpressionExpiryIndexSql|.
static constexpr char kSelectExpiredSourcesSql[] =
"SELECT impression_id FROM impressions "
DCHECK_SQL_INDEXED_BY("impression_expiry_idx")
"WHERE expiry_time <= ? AND "
"impression_id NOT IN("
"SELECT impression_id FROM conversions"
DCHECK_SQL_INDEXED_BY("conversion_impression_id_idx")
")LIMIT ?";
sql::Statement select_expired_statement(
db_->GetCachedStatement(SQL_FROM_HERE, kSelectExpiredSourcesSql));
select_expired_statement.BindTime(0, clock_->Now());
select_expired_statement.BindInt(1, kMaxDeletesPerBatch);
if (!delete_sources_from_paged_select(select_expired_statement))
return false;
// Delete all sources that have no associated reports and are
// inactive. This is done in a separate statement from
// |kSelectExpiredSourcesSql| so that each query is optimized by an index.
// Optimized by |kConversionDestinationIndexSql|.
static constexpr char kSelectInactiveSourcesSql[] =
"SELECT impression_id FROM impressions "
DCHECK_SQL_INDEXED_BY("conversion_destination_idx")
"WHERE active = 0 AND "
"impression_id NOT IN("
"SELECT impression_id FROM conversions"
DCHECK_SQL_INDEXED_BY("conversion_impression_id_idx")
")LIMIT ?";
sql::Statement select_inactive_statement(
db_->GetCachedStatement(SQL_FROM_HERE, kSelectInactiveSourcesSql));
select_inactive_statement.BindInt(0, kMaxDeletesPerBatch);
return delete_sources_from_paged_select(select_inactive_statement);
}
bool AttributionStorageSql::DeleteReport(AttributionReport::Id report_id) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (!LazyInit(DbCreationPolicy::kIgnoreIfAbsent))
return true;
return DeleteReportInternal(report_id);
}
bool AttributionStorageSql::DeleteReportInternal(
AttributionReport::Id report_id) {
static constexpr char kDeleteReportSql[] =
"DELETE FROM conversions WHERE conversion_id = ?";
sql::Statement statement(
db_->GetCachedStatement(SQL_FROM_HERE, kDeleteReportSql));
statement.BindInt64(0, *report_id);
return statement.Run();
}
bool AttributionStorageSql::UpdateReportForSendFailure(
AttributionReport::Id conversion_id,
base::Time new_report_time) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (!LazyInit(DbCreationPolicy::kIgnoreIfAbsent))
return false;
static constexpr char kUpdateFailedReportSql[] =
"UPDATE conversions SET report_time = ?,"
"failed_send_attempts = failed_send_attempts + 1 "
"WHERE conversion_id = ?";
sql::Statement statement(
db_->GetCachedStatement(SQL_FROM_HERE, kUpdateFailedReportSql));
statement.BindTime(0, new_report_time);
statement.BindInt64(1, *conversion_id);
return statement.Run() && db_->GetLastChangeCount() == 1;
}
void AttributionStorageSql::ClearData(
base::Time delete_begin,
base::Time delete_end,
base::RepeatingCallback<bool(const url::Origin&)> filter) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (!LazyInit(DbCreationPolicy::kIgnoreIfAbsent))
return;
SCOPED_UMA_HISTOGRAM_TIMER("Conversions.ClearDataTime");
if (filter.is_null()) {
ClearAllDataInRange(delete_begin, delete_end);
return;
}
// Measure the time it takes to perform a clear with a filter separately from
// the above histogram.
SCOPED_UMA_HISTOGRAM_TIMER("Conversions.Storage.ClearDataWithFilterDuration");
// TODO(csharrison, johnidel): This query can be split up and optimized by
// adding indexes on the impression_time and conversion_time columns.
// See this comment for more information:
// crrev.com/c/2150071/4/content/browser/conversions/conversion_storage_sql.cc#342
static constexpr char kScanCandidateData[] =
"SELECT I.impression_origin,I.conversion_origin,I.reporting_origin,"
"I.impression_id,C.conversion_id "
"FROM impressions I LEFT JOIN conversions C ON "
"C.impression_id = I.impression_id WHERE"
"(I.impression_time BETWEEN ?1 AND ?2)OR"
"(C.conversion_time BETWEEN ?1 AND ?2)";
sql::Statement statement(
db_->GetCachedStatement(SQL_FROM_HERE, kScanCandidateData));
statement.BindTime(0, delete_begin);
statement.BindTime(1, delete_end);
std::vector<StorableSource::Id> source_ids_to_delete;
std::vector<AttributionReport::Id> conversion_ids_to_delete;
while (statement.Step()) {
if (filter.Run(DeserializeOrigin(statement.ColumnString(0))) ||
filter.Run(DeserializeOrigin(statement.ColumnString(1))) ||
filter.Run(DeserializeOrigin(statement.ColumnString(2)))) {
source_ids_to_delete.emplace_back(statement.ColumnInt64(3));
if (statement.GetColumnType(4) != sql::ColumnType::kNull)
conversion_ids_to_delete.emplace_back(statement.ColumnInt64(4));
}
}
// TODO(csharrison, johnidel): Should we consider poisoning the DB if some of
// the delete operations fail?
if (!statement.Succeeded())
return;
// Since multiple reports can be associated with a single source,
// deduplicate source IDs using a set to avoid redundant DB operations
// below.
source_ids_to_delete =
base::flat_set<StorableSource::Id>(std::move(source_ids_to_delete))
.extract();
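// Note: constructing the base::flat_set sorts the IDs and drops duplicates,
// and extract() returns the underlying sorted vector.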
// Delete the data in a transaction to avoid cases where the source part
// of a report is deleted without deleting the associated report, or
// vice versa.
sql::Transaction transaction(db_.get());
if (!transaction.Begin())
return;
if (!DeleteSources(source_ids_to_delete))
return;
for (AttributionReport::Id conversion_id : conversion_ids_to_delete) {
if (!DeleteReportInternal(conversion_id))
return;
}
int num_reports_deleted = static_cast<int>(conversion_ids_to_delete.size());
// Careful! At this point we can still have some vestigial entries in the DB.
// For example, if a source has two reports, and one report is
// deleted, the above logic will delete the source as well, leaving the
// second report in limbo (it was not in the deletion time range).
// Delete all unattributed reports here to ensure everything is cleaned
// up.
static constexpr char kDeleteVestigialConversionSql[] =
"DELETE FROM conversions WHERE impression_id = ?";
sql::Statement delete_vestigial_statement(
db_->GetCachedStatement(SQL_FROM_HERE, kDeleteVestigialConversionSql));
for (StorableSource::Id source_id : source_ids_to_delete) {
delete_vestigial_statement.Reset(/*clear_bound_vars=*/true);
delete_vestigial_statement.BindInt64(0, *source_id);
if (!delete_vestigial_statement.Run())
return;
num_reports_deleted += db_->GetLastChangeCount();
}
if (!rate_limit_table_.ClearDataForSourceIds(db_.get(),
source_ids_to_delete)) {
return;
}
if (!rate_limit_table_.ClearDataForOriginsInRange(db_.get(), delete_begin,
delete_end, filter)) {
return;
}
if (!transaction.Commit())
return;
RecordSourcesDeleted(static_cast<int>(source_ids_to_delete.size()));
RecordReportsDeleted(num_reports_deleted);
}
void AttributionStorageSql::ClearAllDataInRange(base::Time delete_begin,
base::Time delete_end) {
// Browsing data remover will call this with null |delete_begin|, but also
// perform the ClearAllDataAllTime optimization if |delete_begin| is
// base::Time::Min().
if ((delete_begin.is_null() || delete_begin.is_min()) &&
delete_end.is_max()) {
ClearAllDataAllTime();
return;
}
sql::Transaction transaction(db_.get());
if (!transaction.Begin())
return;
// Select all sources and reports in the given time range.
// Note: This should follow the same basic logic in ClearData, with the
// assumption that all origins match the filter. We cannot use a DELETE
// statement, because we need the list of |impression_id|s to delete from the
// |rate_limits| table.
//
// Optimizing these queries is also tough; see this comment for an idea:
// http://crrev.com/c/2150071/12/content/browser/conversions/conversion_storage_sql.cc#468
static constexpr char kSelectSourceRangeSql[] =
"SELECT impression_id FROM impressions WHERE(impression_time BETWEEN ?1 "
"AND ?2)OR "
"impression_id IN(SELECT impression_id FROM conversions "
"WHERE conversion_time BETWEEN ?1 AND ?2)";
sql::Statement select_sources_statement(
db_->GetCachedStatement(SQL_FROM_HERE, kSelectSourceRangeSql));
select_sources_statement.BindTime(0, delete_begin);
select_sources_statement.BindTime(1, delete_end);
std::vector<StorableSource::Id> source_ids_to_delete;
while (select_sources_statement.Step()) {
StorableSource::Id source_id(select_sources_statement.ColumnInt64(0));
source_ids_to_delete.push_back(source_id);
}
if (!select_sources_statement.Succeeded())
return;
if (!DeleteSources(source_ids_to_delete))
return;
static constexpr char kDeleteReportRangeSql[] =
"DELETE FROM conversions WHERE(conversion_time BETWEEN ? AND ?)"
"OR impression_id NOT IN(SELECT impression_id FROM impressions)";
sql::Statement delete_reports_statement(
db_->GetCachedStatement(SQL_FROM_HERE, kDeleteReportRangeSql));
delete_reports_statement.BindTime(0, delete_begin);
delete_reports_statement.BindTime(1, delete_end);
if (!delete_reports_statement.Run())
return;
int num_reports_deleted = db_->GetLastChangeCount();
if (!rate_limit_table_.ClearDataForSourceIds(db_.get(), source_ids_to_delete))
return;
if (!rate_limit_table_.ClearAllDataInRange(db_.get(), delete_begin,
delete_end))
return;
if (!transaction.Commit())
return;
RecordSourcesDeleted(static_cast<int>(source_ids_to_delete.size()));
RecordReportsDeleted(num_reports_deleted);
}
void AttributionStorageSql::ClearAllDataAllTime() {
sql::Transaction transaction(db_.get());
if (!transaction.Begin())
return;
static constexpr char kDeleteAllReportsSql[] = "DELETE FROM conversions";
sql::Statement delete_all_reports_statement(
db_->GetCachedStatement(SQL_FROM_HERE, kDeleteAllReportsSql));
if (!delete_all_reports_statement.Run())
return;
int num_reports_deleted = db_->GetLastChangeCount();
static constexpr char kDeleteAllSourcesSql[] = "DELETE FROM impressions";
sql::Statement delete_all_sources_statement(
db_->GetCachedStatement(SQL_FROM_HERE, kDeleteAllSourcesSql));
if (!delete_all_sources_statement.Run())
return;
int num_sources_deleted = db_->GetLastChangeCount();
static constexpr char kDeleteAllDedupKeysSql[] = "DELETE FROM dedup_keys";
sql::Statement delete_all_dedup_keys_statement(
db_->GetCachedStatement(SQL_FROM_HERE, kDeleteAllDedupKeysSql));
if (!delete_all_dedup_keys_statement.Run())
return;
if (!rate_limit_table_.ClearAllDataAllTime(db_.get()))
return;
if (!transaction.Commit())
return;
RecordSourcesDeleted(num_sources_deleted);
RecordReportsDeleted(num_reports_deleted);
}
bool AttributionStorageSql::HasCapacityForStoringSource(
const std::string& serialized_origin) {
static constexpr char kCountSourcesSql[] =
// clang-format off
"SELECT COUNT(impression_origin)FROM impressions "
DCHECK_SQL_INDEXED_BY("impression_origin_idx")
"WHERE impression_origin = ?"; // clang-format on
sql::Statement statement(
db_->GetCachedStatement(SQL_FROM_HERE, kCountSourcesSql));
statement.BindString(0, serialized_origin);
if (!statement.Step())
return false;
int64_t count = statement.ColumnInt64(0);
return count < delegate_->GetMaxSourcesPerOrigin();
}
AttributionStorageSql::ReportAlreadyStoredStatus
AttributionStorageSql::ReportAlreadyStored(StorableSource::Id source_id,
absl::optional<int64_t> dedup_key) {
if (!dedup_key.has_value())
return ReportAlreadyStoredStatus::kNotStored;
static constexpr char kCountReportsSql[] =
"SELECT COUNT(*)FROM dedup_keys "
"WHERE impression_id = ? AND dedup_key = ?";
sql::Statement statement(
db_->GetCachedStatement(SQL_FROM_HERE, kCountReportsSql));
statement.BindInt64(0, *source_id);
statement.BindInt64(1, *dedup_key);
// If there's an error, return kError so `MaybeCreateAndStoreReport()`
// returns early.
if (!statement.Step())
return ReportAlreadyStoredStatus::kError;
int64_t count = statement.ColumnInt64(0);
return count > 0 ? ReportAlreadyStoredStatus::kStored
: ReportAlreadyStoredStatus::kNotStored;
}
AttributionStorageSql::ConversionCapacityStatus
AttributionStorageSql::CapacityForStoringReport(
const std::string& serialized_origin) {
// This query should be reasonably optimized via
// `kConversionDestinationIndexSql`. The conversion destination is the second
// column in a multi-column index where the first column is just a boolean.
// Therefore the second column in the index should be very well-sorted.
//
// Note: to take advantage of this, we need to hint to the query planner that
// |active| is a boolean, so include it in the conditional.
static constexpr char kCountReportsSql[] =
"SELECT COUNT(conversion_id)FROM conversions C "
"JOIN impressions I "
DCHECK_SQL_INDEXED_BY("conversion_destination_idx")
"ON I.impression_id = C.impression_id "
"WHERE I.conversion_destination = ? AND(active BETWEEN 0 AND 1)";
sql::Statement statement(
db_->GetCachedStatement(SQL_FROM_HERE, kCountReportsSql));
statement.BindString(0, serialized_origin);
if (!statement.Step())
return ConversionCapacityStatus::kError;
int64_t count = statement.ColumnInt64(0);
return count < delegate_->GetMaxAttributionsPerOrigin()
? ConversionCapacityStatus::kHasCapacity
: ConversionCapacityStatus::kNoCapacity;
}
std::vector<StorableSource> AttributionStorageSql::GetActiveSources(int limit) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (!LazyInit(DbCreationPolicy::kIgnoreIfAbsent))
return {};
// Negatives are treated as no limit
// (https://sqlite.org/lang_select.html#limitoffset).
static constexpr char kGetActiveSourcesSql[] =
"SELECT impression_id,impression_data,impression_origin,"
"conversion_origin,reporting_origin,impression_time,expiry_time,"
"source_type,attributed_truthfully,priority "
"FROM impressions "
"WHERE active = 1 and expiry_time > ? "
"LIMIT ?";
sql::Statement statement(
db_->GetCachedStatement(SQL_FROM_HERE, kGetActiveSourcesSql));
statement.BindTime(0, clock_->Now());
statement.BindInt(1, limit);
std::vector<StorableSource> sources;
while (statement.Step()) {
absl::optional<StorableSource> source = ReadSourceFromStatement(statement);
if (source.has_value())
sources.push_back(std::move(*source));
}
if (!statement.Succeeded())
return {};
for (auto& source : sources) {
absl::optional<std::vector<int64_t>> dedup_keys =
ReadDedupKeys(*source.impression_id());
if (!dedup_keys.has_value())
return {};
source.SetDedupKeys(std::move(*dedup_keys));
}
return sources;
}
absl::optional<std::vector<int64_t>> AttributionStorageSql::ReadDedupKeys(
StorableSource::Id source_id) {
static constexpr char kDedupKeySql[] =
"SELECT dedup_key FROM dedup_keys WHERE impression_id = ?";
sql::Statement statement(
db_->GetCachedStatement(SQL_FROM_HERE, kDedupKeySql));
statement.BindInt64(0, *source_id);
std::vector<int64_t> dedup_keys;
while (statement.Step()) {
dedup_keys.push_back(statement.ColumnInt64(0));
}
if (!statement.Succeeded())
return absl::nullopt;
return dedup_keys;
}
void AttributionStorageSql::HandleInitializationFailure(
const InitStatus status) {
RecordInitializationStatus(status);
db_.reset();
db_init_status_ = DbStatus::kClosed;
}
bool AttributionStorageSql::LazyInit(DbCreationPolicy creation_policy) {
if (!db_init_status_) {
if (g_run_in_memory_) {
db_init_status_ = DbStatus::kDeferringCreation;
} else {
db_init_status_ = base::PathExists(path_to_database_)
? DbStatus::kDeferringOpen
: DbStatus::kDeferringCreation;
}
}
switch (*db_init_status_) {
// If the database file has not been created, we defer creation until
// storage needs to be used for an operation which needs to operate even on
// an empty database.
case DbStatus::kDeferringCreation:
if (creation_policy == DbCreationPolicy::kIgnoreIfAbsent)
return false;
break;
case DbStatus::kDeferringOpen:
break;
case DbStatus::kClosed:
return false;
case DbStatus::kOpen:
return true;
}
db_ = std::make_unique<sql::Database>(sql::DatabaseOptions{
.exclusive_locking = true, .page_size = 4096, .cache_size = 32});
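// These options are assumed to request exclusive file locking, a 4096-byte
// page size, and a page cache of 32 pages.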
db_->set_histogram_tag("Conversions");
// Supply this callback with a weak_ptr to avoid calling the error callback
// after |this| has been deleted.
db_->set_error_callback(
base::BindRepeating(&AttributionStorageSql::DatabaseErrorCallback,
weak_factory_.GetWeakPtr()));
if (path_to_database_.value() == kInMemoryPath) {
if (!db_->OpenInMemory()) {
HandleInitializationFailure(InitStatus::kFailedToOpenDbInMemory);
return false;
}
} else {
const base::FilePath& dir = path_to_database_.DirName();
const bool dir_exists_or_was_created =
base::DirectoryExists(dir) || base::CreateDirectory(dir);
if (!dir_exists_or_was_created) {
DLOG(ERROR) << "Failed to create directory for Conversion database";
HandleInitializationFailure(InitStatus::kFailedToCreateDir);
return false;
}
if (!db_->Open(path_to_database_)) {
HandleInitializationFailure(InitStatus::kFailedToOpenDbFile);
return false;
}
}
if (!InitializeSchema(db_init_status_ == DbStatus::kDeferringCreation)) {
HandleInitializationFailure(InitStatus::kFailedToInitializeSchema);
return false;
}
db_init_status_ = DbStatus::kOpen;
RecordInitializationStatus(InitStatus::kSuccess);
return true;
}
bool AttributionStorageSql::InitializeSchema(bool db_empty) {
if (db_empty)
return CreateSchema();
int current_version = kCurrentVersionNumber;
if (!sql::MetaTable::DoesTableExist(db_.get())) {
// Version 1 of the schema did not have a metadata table.
current_version = 1;
if (!meta_table_.Init(db_.get(), current_version, kCompatibleVersionNumber))
return false;
} else {
if (!meta_table_.Init(db_.get(), current_version, kCompatibleVersionNumber))
return false;
current_version = meta_table_.GetVersionNumber();
}
if (current_version == kCurrentVersionNumber)
return true;
if (current_version <= kDeprecatedVersionNumber) {
// Note that this also razes the meta table, so it will need to be
// initialized again.
db_->Raze();
return CreateSchema();
}
if (meta_table_.GetCompatibleVersionNumber() > kCurrentVersionNumber) {
// In this case the database version is too new to be used. The DB will
// never work until Chrome is re-upgraded. Assume the user will continue
// using this Chrome version and raze the DB to get attribution reporting
// working.
db_->Raze();
return CreateSchema();
}
return UpgradeAttributionStorageSqlSchema(db_.get(), &meta_table_,
delegate_.get());
}
bool AttributionStorageSql::CreateSchema() {
base::ThreadTicks start_timestamp = base::ThreadTicks::Now();
// TODO(johnidel, csharrison): Many sources will share a target origin and
// a reporting origin, so it makes sense to make a "shared string" table for
// these to save disk / memory. However, this complicates the schema a lot, so
// probably best to only do it if there are performance problems here.
//
// Origins usually aren't _that_ big compared to a 64-bit integer (8 bytes).
//
// All of the columns in this table are designed to be "const" except for
// |num_conversions| and |active| which are updated when a new report is
// received. |num_conversions| is the number of times a report has
// been created for a given source. |delegate_| can choose to enforce a
// maximum limit on this. |active| indicates whether a source is able to
// create new associated reports. |active| can be unset on a number
// of conditions:
// - A source converted too many times.
// - A new source was stored after a source converted, making it
// ineligible for new reports due to the attribution model documented
// in `StoreSource()`.
// - A source has expired but still has unsent reports in the
// conversions table meaning it cannot be deleted yet.
// |source_type| is the type of the source, currently either |kNavigation| or
// |kEvent|.
// |attributed_truthfully| corresponds to the
// |StorableSource::AttributionLogic| enum.
// |impression_site| is used to optimize the lookup of sources;
// |StorableSource::ImpressionSite| is always derived from the origin.
//
// |impression_id| uses AUTOINCREMENT to ensure that IDs aren't reused over
// the lifetime of the DB.
static constexpr char kImpressionTableSql[] =
"CREATE TABLE IF NOT EXISTS impressions"
"(impression_id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,"
"impression_data INTEGER NOT NULL,"
"impression_origin TEXT NOT NULL,"
"conversion_origin TEXT NOT NULL,"
"reporting_origin TEXT NOT NULL,"
"impression_time INTEGER NOT NULL,"
"expiry_time INTEGER NOT NULL,"
"num_conversions INTEGER DEFAULT 0,"
"active INTEGER DEFAULT 1,"
"conversion_destination TEXT NOT NULL,"
"source_type INTEGER NOT NULL,"
"attributed_truthfully INTEGER NOT NULL,"
"priority INTEGER NOT NULL,"
"impression_site TEXT NOT NULL)";
if (!db_->Execute(kImpressionTableSql))
return false;
// Optimizes source lookup by conversion destination/reporting origin
// during calls to `MaybeCreateAndStoreReport()`,
// `StoreSource()`, `DeleteExpiredSources()`. Sources and
// triggers are considered matching if they share this pair. These calls
// need to distinguish between active and inactive reports, so include
// |active| in the index.
static constexpr char kConversionDestinationIndexSql[] =
"CREATE INDEX IF NOT EXISTS conversion_destination_idx "
"ON impressions(active,conversion_destination,reporting_origin)";
if (!db_->Execute(kConversionDestinationIndexSql))
return false;
// Optimizes calls to `DeleteExpiredSources()` and
// `MaybeCreateAndStoreReport()` by indexing sources by expiry
// time. Both calls require only returning sources that expire after a
// given time.
static constexpr char kImpressionExpiryIndexSql[] =
"CREATE INDEX IF NOT EXISTS impression_expiry_idx "
"ON impressions(expiry_time)";
if (!db_->Execute(kImpressionExpiryIndexSql))
return false;
// Optimizes counting sources by source origin.
static constexpr char kImpressionOriginIndexSql[] =
"CREATE INDEX IF NOT EXISTS impression_origin_idx "
"ON impressions(impression_origin)";
if (!db_->Execute(kImpressionOriginIndexSql))
return false;
// Optimizes `EnsureCapacityForPendingDestinationLimit()`, which only needs to
// examine active, unconverted, event-source sources.
static constexpr char kEventSourceImpressionSiteIndexSql[] =
"CREATE INDEX IF NOT EXISTS event_source_impression_site_idx "
"ON impressions(impression_site)"
"WHERE active = 1 AND num_conversions = 0 AND source_type = 1";
if (!db_->Execute(kEventSourceImpressionSiteIndexSql))
return false;
// All columns in this table are const except |report_time| and
// |failed_send_attempts|,
// which are updated when a report fails to send, as part of retries.
// |impression_id| is the primary key of a row in the [impressions] table,
// [impressions.impression_id]. |conversion_time| is the time at which the
// trigger was registered, and should be used for clearing site data.
// |report_time| is the time a <report, source> pair should be
// reported, and is specified by |delegate_|.
//
// |conversion_id| uses AUTOINCREMENT to ensure that IDs aren't reused over
// the lifetime of the DB.
static constexpr char kConversionTableSql[] =
"CREATE TABLE IF NOT EXISTS conversions"
"(conversion_id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,"
"impression_id INTEGER NOT NULL,"
"conversion_data INTEGER NOT NULL,"
"conversion_time INTEGER NOT NULL,"
"report_time INTEGER NOT NULL,"
"priority INTEGER NOT NULL,"
"failed_send_attempts INTEGER NOT NULL,"
"external_report_id TEXT NOT NULL)";
if (!db_->Execute(kConversionTableSql))
return false;
// Optimize sorting reports by report time for calls to
// GetAttributionsToReport(). The reports with the earliest report times are
// periodically fetched from storage to be sent.
static constexpr char kConversionReportTimeIndexSql[] =
"CREATE INDEX IF NOT EXISTS conversion_report_idx "
"ON conversions(report_time)";
if (!db_->Execute(kConversionReportTimeIndexSql))
return false;
// Want to optimize report look up by source id. This allows us to
// quickly know if an expired source can be deleted safely if it has no
// corresponding pending reports during calls to
// `DeleteExpiredSources()`.
static constexpr char kConversionImpressionIdIndexSql[] =
"CREATE INDEX IF NOT EXISTS conversion_impression_id_idx "
"ON conversions(impression_id)";
if (!db_->Execute(kConversionImpressionIdIndexSql))
return false;
if (!rate_limit_table_.CreateTable(db_.get()))
return false;
static constexpr char kDedupKeyTableSql[] =
"CREATE TABLE IF NOT EXISTS dedup_keys"
"(impression_id INTEGER NOT NULL,"
"dedup_key INTEGER NOT NULL,"
"PRIMARY KEY(impression_id,dedup_key))WITHOUT ROWID";
if (!db_->Execute(kDedupKeyTableSql))
return false;
if (!meta_table_.Init(db_.get(), kCurrentVersionNumber,
kCompatibleVersionNumber)) {
return false;
}
base::UmaHistogramMediumTimes("Conversions.Storage.CreationTime",
base::ThreadTicks::Now() - start_timestamp);
return true;
}
void AttributionStorageSql::DatabaseErrorCallback(int extended_error,
sql::Statement* stmt) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
// Attempt to recover a corrupt database, unless it is setup in memory.
if (sql::Recovery::ShouldRecover(extended_error) &&
(path_to_database_.value() != kInMemoryPath)) {
// Prevent reentrant calls.
db_->reset_error_callback();
// After this call, the |db_| handle is poisoned so that future calls will
// return errors until the handle is re-opened.
sql::Recovery::RecoverDatabaseWithMetaVersion(db_.get(), path_to_database_);
// The DLOG(FATAL) below is intended to draw immediate attention to errors
// in newly-written code. Database corruption is generally a result of OS
// or hardware issues, not coding errors at the client level, so displaying
// the error would probably lead to confusion. The ignored call signals the
// test-expectation framework that the error was handled.
ignore_result(sql::Database::IsExpectedSqliteError(extended_error));
return;
}
// The default handling is to assert on debug and to ignore on release.
if (!sql::Database::IsExpectedSqliteError(extended_error) &&
!ignore_errors_for_testing_)
DLOG(FATAL) << db_->GetErrorMessage();
// Consider the database closed if we did not attempt to recover so we did
// not produce further errors.
db_init_status_ = DbStatus::kClosed;
}
bool AttributionStorageSql::EnsureCapacityForPendingDestinationLimit(
const StorableSource& source) {
// TODO(apaseltiner): Add metrics for how this behaves so we can see how often
// sites are hitting the limit.
if (source.source_type() != StorableSource::SourceType::kEvent)
return true;
static_assert(static_cast<int>(StorableSource::SourceType::kEvent) == 1,
"Update the SQL statement below and this condition");
const std::string serialized_conversion_destination =
source.ConversionDestination().Serialize();
// Optimized by `kEventSourceImpressionSiteIndexSql`.
static constexpr char kSelectSourcesSql[] =
"SELECT impression_id,conversion_destination FROM impressions "
DCHECK_SQL_INDEXED_BY("event_source_impression_site_idx")
"WHERE impression_site = ? AND source_type = 1 "
"AND active = 1 AND num_conversions = 0 "
"ORDER BY impression_time ASC";
sql::Statement statement(
db_->GetCachedStatement(SQL_FROM_HERE, kSelectSourcesSql));
statement.BindString(0, source.ImpressionSite().Serialize());
base::flat_map<std::string, size_t> conversion_destinations;
struct SourceData {
StorableSource::Id source_id;
std::string conversion_destination;
};
std::vector<SourceData> sources_by_impression_time;
while (statement.Step()) {
SourceData impression_data = {
.source_id = StorableSource::Id(statement.ColumnInt64(0)),
.conversion_destination = statement.ColumnString(1),
};
// If there's already a source matching the to-be-stored
// `impression_site` and `conversion_destination`, then the unique count
// won't be changed, so there's nothing else to do.
if (impression_data.conversion_destination ==
serialized_conversion_destination) {
return true;
}
conversion_destinations[impression_data.conversion_destination]++;
sources_by_impression_time.push_back(std::move(impression_data));
}
if (!statement.Succeeded())
return false;
const int max = delegate_->GetMaxAttributionDestinationsPerEventSource();
// TODO(apaseltiner): We could just make
// `GetMaxAttributionDestinationsPerEventSource()` return `size_t`, but it
// would be inconsistent with the other `AttributionStorage::Delegate`
// methods.
DCHECK_GT(max, 0);
// Otherwise, if there's capacity for the new `conversion_destination` to be
// stored for the `impression_site`, there's nothing else to do.
if (conversion_destinations.size() < static_cast<size_t>(max))
return true;
// Otherwise, delete sources in order by `impression_time` until the
// number of distinct `conversion_destination`s is under `max`.
std::vector<StorableSource::Id> source_ids_to_delete;
for (const auto& source_data : sources_by_impression_time) {
auto it = conversion_destinations.find(source_data.conversion_destination);
DCHECK(it != conversion_destinations.end());
it->second--;
if (it->second == 0)
conversion_destinations.erase(it);
source_ids_to_delete.push_back(source_data.source_id);
if (conversion_destinations.size() < static_cast<size_t>(max))
break;
}
return DeleteSources(source_ids_to_delete);
// Because this is limited to active sources with `num_conversions = 0`,
// we should be guaranteed that there is not any corresponding data in the
// rate limit table or the report table.
}
bool AttributionStorageSql::DeleteSources(
const std::vector<StorableSource::Id>& source_ids) {
sql::Transaction transaction(db_.get());
if (!transaction.Begin())
return false;
static constexpr char kDeleteSourcesSql[] =
"DELETE FROM impressions WHERE impression_id = ?";
sql::Statement delete_impression_statement(
db_->GetCachedStatement(SQL_FROM_HERE, kDeleteSourcesSql));
for (StorableSource::Id source_id : source_ids) {
delete_impression_statement.Reset(/*clear_bound_vars=*/true);
delete_impression_statement.BindInt64(0, *source_id);
if (!delete_impression_statement.Run())
return false;
}
static constexpr char kDeleteDedupKeySql[] =
"DELETE FROM dedup_keys WHERE impression_id = ?";
sql::Statement delete_dedup_key_statement(
db_->GetCachedStatement(SQL_FROM_HERE, kDeleteDedupKeySql));
for (StorableSource::Id source_id : source_ids) {
delete_dedup_key_statement.Reset(/*clear_bound_vars=*/true);
delete_dedup_key_statement.BindInt64(0, *source_id);
if (!delete_dedup_key_statement.Run())
return false;
}
return transaction.Commit();
}
} // namespace content