blob: 49f8284cac17474b6e6a418253a240149c1cb5c9 [file] [log] [blame]
// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "components/sync/model_impl/client_tag_based_model_type_processor.h"
#include <utility>
#include <vector>
#include "base/bind.h"
#include "base/location.h"
#include "base/metrics/histogram_functions.h"
#include "base/metrics/histogram_macros.h"
#include "base/strings/stringprintf.h"
#include "base/threading/sequenced_task_runner_handle.h"
#include "base/trace_event/memory_usage_estimator.h"
#include "components/sync/base/data_type_histogram.h"
#include "components/sync/base/model_type.h"
#include "components/sync/base/time.h"
#include "components/sync/engine/commit_queue.h"
#include "components/sync/engine/data_type_activation_response.h"
#include "components/sync/engine/model_type_processor_proxy.h"
#include "components/sync/model_impl/client_tag_based_remote_update_handler.h"
#include "components/sync/model_impl/processor_entity.h"
#include "components/sync/protocol/proto_memory_estimations.h"
#include "components/sync/protocol/proto_value_conversions.h"
namespace syncer {
namespace {

// Returns how many entries of |entities| are live, i.e. not tombstones. An
// entity is a tombstone when its metadata is flagged as deleted.
int CountNonTombstoneEntries(
    const std::map<ClientTagHash, std::unique_ptr<ProcessorEntity>>& entities) {
  int live_entries = 0;
  for (const auto& tag_and_entity : entities) {
    if (tag_and_entity.second->metadata().is_deleted())
      continue;  // Tombstones are not counted.
    live_entries += 1;
  }
  return live_entries;
}

}  // namespace
// Convenience constructor: whether the processor runs in commit-only mode is
// derived from membership of |type| in CommitOnlyTypes().
ClientTagBasedModelTypeProcessor::ClientTagBasedModelTypeProcessor(
    ModelType type,
    const base::RepeatingClosure& dump_stack)
    : ClientTagBasedModelTypeProcessor(
          type,
          dump_stack,
          /*commit_only=*/CommitOnlyTypes().Has(type)) {}
// Primary constructor. No bridge is attached yet (that happens in
// OnModelStarting()). |dump_stack| is run when an error is reported, if it is
// set.
ClientTagBasedModelTypeProcessor::ClientTagBasedModelTypeProcessor(
    ModelType type,
    const base::RepeatingClosure& dump_stack,
    bool commit_only)
    : type_(type),
      bridge_(nullptr),
      dump_stack_(dump_stack),
      commit_only_(commit_only) {
  // Begin from the same state as after metadata has been cleared.
  ResetState(CLEAR_METADATA);
}
// The processor must be destroyed on the sequence it was used on.
ClientTagBasedModelTypeProcessor::~ClientTagBasedModelTypeProcessor() {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
}
// Called when sync wants to start this type. Stores |request| and
// |start_callback| and informs the bridge. It then tries to activate right
// away; if the model is not ready yet, ModelReadyToSync() retries later.
void ClientTagBasedModelTypeProcessor::OnSyncStarting(
    const DataTypeActivationRequest& request,
    StartCallback start_callback) {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  DVLOG(1) << "Sync is starting for " << ModelTypeToString(type_);

  // Preconditions: an error handler and a callback are provided, no start is
  // pending and the worker is not connected.
  DCHECK(request.error_handler) << ModelTypeToString(type_);
  DCHECK(start_callback) << ModelTypeToString(type_);
  DCHECK(!start_callback_) << ModelTypeToString(type_);
  DCHECK(!IsConnected()) << ModelTypeToString(type_);

  activation_request_ = request;
  start_callback_ = std::move(start_callback);

  // The bridge has to hear about the start before |start_callback_| runs,
  // because running that callback is what ends up creating the worker.
  bridge_->OnSyncStarting(request);
  ConnectIfReady();
}
// Attaches |bridge|, the model-side counterpart of this processor. It must not
// be null.
void ClientTagBasedModelTypeProcessor::OnModelStarting(
    ModelTypeSyncBridge* bridge) {
  DCHECK(bridge);
  bridge_ = bridge;
}
void ClientTagBasedModelTypeProcessor::ModelReadyToSync(
std::unique_ptr<MetadataBatch> batch) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
DCHECK(entities_.empty());
DCHECK(!model_ready_to_sync_);
model_ready_to_sync_ = true;
// The model already experienced an error; abort;
if (model_error_)
return;
if (batch->GetModelTypeState().initial_sync_done()) {
EntityMetadataMap metadata_map(batch->TakeAllMetadata());
for (auto it = metadata_map.begin(); it != metadata_map.end(); it++) {
std::unique_ptr<sync_pb::EntityMetadata> metadata(std::move(it->second));
// As CreateFromMetadata() takes sync_pb::EntityMetadata by value, move it
// to avoid copying.
std::unique_ptr<ProcessorEntity> entity =
ProcessorEntity::CreateFromMetadata(it->first, std::move(*metadata));
ClientTagHash client_tag_hash =
ClientTagHash::FromHashed(entity->metadata().client_tag_hash());
storage_key_to_tag_hash_[entity->storage_key()] = client_tag_hash;
entities_[client_tag_hash] = std::move(entity);
}
model_type_state_ = batch->GetModelTypeState();
} else {
// In older versions of the binary, commit-only types did not persist
// initial_sync_done(). So this branch can be exercised for commit-only
// types exactly once as an upgrade flow.
// TODO(crbug.com/872360): This DCHECK can currently trigger if the user's
// persisted Sync metadata is in an inconsistent state.
DCHECK(commit_only_ || batch->TakeAllMetadata().empty())
<< ModelTypeToString(type_);
// First time syncing; initialize metadata.
model_type_state_.mutable_progress_marker()->set_data_type_id(
GetSpecificsFieldNumberFromModelType(type_));
}
ConnectIfReady();
}
// Local changes are accepted as soon as the model is ready to sync. They are
// handled correctly even before any pending commit data has been loaded.
bool ClientTagBasedModelTypeProcessor::IsAllowingChanges() const {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  const bool allowed = model_ready_to_sync_;
  return allowed;
}
// Completes activation once both preconditions hold:
//  - OnSyncStarting() has provided |start_callback_|;
//  - the model has loaded its metadata (|model_ready_to_sync_|).
// It returns early while either precondition is missing, so it may be called
// repeatedly. If a model error was recorded, that error is delivered to the
// activation request's error handler and activation is abandoned. Otherwise
// |start_callback_| is run with a DataTypeActivationResponse. The response
// carries the current ModelTypeState and a proxy bound to
// |weak_ptr_factory_for_worker_|.
void ClientTagBasedModelTypeProcessor::ConnectIfReady() {
  // Sync has not asked to start, or the start was already handled.
  if (!start_callback_) {
    return;
  }
  // An error recorded before the error handler was available (see
  // ReportError()) is delivered now.
  if (model_error_) {
    activation_request_.error_handler.Run(model_error_.value());
    start_callback_.Reset();
    return;
  }
  // Wait for ModelReadyToSync(), which calls back into this function.
  if (!model_ready_to_sync_) {
    return;
  }
  if (!model_type_state_.has_cache_guid()) {
    // Initialize the cache_guid for old clients that didn't persist it.
    model_type_state_.set_cache_guid(activation_request_.cache_guid);
  }
  // Check for invalid persisted metadata.
  if (model_type_state_.cache_guid() != activation_request_.cache_guid ||
      model_type_state_.progress_marker().data_type_id() !=
          GetSpecificsFieldNumberFromModelType(type_)) {
    // There is a mismatch between the cache guid or the data type id stored in
    // |model_type_state_| and the one received from sync. This indicates that
    // the stored metadata are invalid (e.g. has been manipulated) and don't
    // belong to the current syncing client.
    if (model_type_state_.progress_marker().data_type_id() !=
        GetSpecificsFieldNumberFromModelType(type_)) {
      UMA_HISTOGRAM_ENUMERATION("Sync.PersistedModelTypeIdMismatch",
                                ModelTypeHistogramValue(type_));
    }
    ClearMetadataAndResetState();
    // The model is still ready to sync (with the same |bridge_|) - replay
    // the initialization.
    // NOTE(review): unlike the CLEAR_METADATA path of OnSyncStopping(), this
    // does not go through ModelReadyToSync(). The cache_guid and the progress
    // marker's data_type_id are therefore only repopulated if
    // ResetState(CLEAR_METADATA) does so; that is not visible here, so
    // confirm.
    model_ready_to_sync_ = true;
    // Notify the bridge sync is starting to simulate an enable event.
    bridge_->OnSyncStarting(activation_request_);
  }
  // Cache GUID verification earlier above guarantees the user is the same.
  model_type_state_.set_authenticated_account_id(
      activation_request_.authenticated_account_id.ToString());
  // For commit-only types, no updates are expected and hence we can consider
  // initial_sync_done(), reflecting that sync is enabled. This is done by
  // simulating an empty full update whose state has initial_sync_done set.
  // NOTE(review): the base::Optional<ModelError> returned by
  // OnFullUpdateReceived() (i.e. by the bridge's MergeSyncData()) is
  // discarded here, whereas OnUpdateReceived() reports it. Confirm whether
  // this is intentional.
  if (commit_only_ && !model_type_state_.initial_sync_done()) {
    sync_pb::ModelTypeState model_type_state = model_type_state_;
    model_type_state.set_initial_sync_done(true);
    OnFullUpdateReceived(model_type_state, UpdateResponseDataList());
    DCHECK(IsTrackingMetadata());
  }
  auto activation_response = std::make_unique<DataTypeActivationResponse>();
  activation_response->model_type_state = model_type_state_;
  // The worker talks back to this processor through a proxy holding a weak
  // pointer. DisconnectSync() invalidates those weak pointers.
  activation_response->type_processor =
      std::make_unique<ModelTypeProcessorProxy>(
          weak_ptr_factory_for_worker_.GetWeakPtr(),
          base::SequencedTaskRunnerHandle::Get());
  // Running the callback consumes |start_callback_|.
  std::move(start_callback_).Run(std::move(activation_response));
}
// The processor counts as connected while it owns a worker. ConnectSync()
// sets the worker and DisconnectSync() resets it.
bool ClientTagBasedModelTypeProcessor::IsConnected() const {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  return worker_ != nullptr;
}
// Stops sync for this type according to |metadata_fate|:
//  - KEEP_METADATA: the bridge is told to stop without a metadata change list
//    (nothing is deleted). The processor state is then reset via
//    ResetState(KEEP_METADATA).
//  - CLEAR_METADATA: all tracked metadata is cleared through
//    ClearMetadataAndResetState(). ModelReadyToSync() is then replayed with
//    an empty batch so the processor is immediately ready to sync again.
// In both cases the processor is expected (DCHECKed) to be disconnected
// afterwards.
void ClientTagBasedModelTypeProcessor::OnSyncStopping(
    SyncStopMetadataFate metadata_fate) {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  // Disabling sync for a type never happens before the model is ready to sync.
  DCHECK(model_ready_to_sync_);
  // No activation may still be pending at this point.
  DCHECK(!start_callback_);
  switch (metadata_fate) {
    case KEEP_METADATA: {
      bridge_->ApplyStopSyncChanges(
          /*delete_metadata_change_list=*/nullptr);
      // The model is still ready to sync (with the same |bridge_|) and same
      // sync metadata.
      ResetState(KEEP_METADATA);
      DCHECK(model_ready_to_sync_);
      break;
    }
    case CLEAR_METADATA: {
      ClearMetadataAndResetState();
      // The model is still ready to sync (with the same |bridge_|) - replay
      // the initialization.
      ModelReadyToSync(std::make_unique<MetadataBatch>());
      DCHECK(model_ready_to_sync_);
      break;
    }
  }
  DCHECK(!IsConnected());
}
// Asks the bridge to delete all persisted sync metadata (entity metadata plus
// the ModelTypeState), then resets the processor's internal state. Before the
// initial sync nothing has been persisted, so the bridge gets a null change
// list.
void ClientTagBasedModelTypeProcessor::ClearMetadataAndResetState() {
  std::unique_ptr<MetadataChangeList> clear_changes;

  if (!model_type_state_.initial_sync_done()) {
    // All changes made before the initial sync were ignored and never
    // persisted by the bridge (prior to MergeSyncData), so no entities should
    // be tracked.
    DCHECK(entities_.empty());
  } else {
    // MergeSyncData() ran before, so there is persisted metadata to clear.
    clear_changes = bridge_->CreateMetadataChangeList();
    for (const auto& tag_and_entity : entities_)
      clear_changes->ClearMetadata(tag_and_entity.second->storage_key());
    clear_changes->ClearModelTypeState();
  }

  bridge_->ApplyStopSyncChanges(std::move(clear_changes));

  // Reset all the internal state of the processor.
  ResetState(CLEAR_METADATA);
}
// Sync metadata is tracked only once the initial sync has been completed.
bool ClientTagBasedModelTypeProcessor::IsTrackingMetadata() {
  const bool initial_sync_done = model_type_state_.initial_sync_done();
  return initial_sync_done;
}
// Returns the account ID that the tracked metadata belongs to. Returns an
// empty string while metadata is not tracked. Before initial sync completes
// (e.g. initial updates are still being fetched, prior to MergeSyncData()) a
// non-empty value would have confusing semantics, so the ID is hidden.
std::string ClientTagBasedModelTypeProcessor::TrackedAccountId() {
  return IsTrackingMetadata() ? model_type_state_.authenticated_account_id()
                              : std::string();
}
// Returns the cache GUID that the tracked metadata belongs to. Returns an
// empty string while metadata is not tracked. It is hidden before initial
// sync completes (prior to MergeSyncData()) to avoid confusing semantics
// while initial updates are still in flight.
std::string ClientTagBasedModelTypeProcessor::TrackedCacheGuid() {
  if (IsTrackingMetadata())
    return model_type_state_.cache_guid();
  return std::string();
}
// Records |error| as this processor's model error; only the first reported
// error is kept. It then:
//  - runs |dump_stack_| if set, to upload a stack trace;
//  - disconnects from the worker;
//  - cancels any pending activation;
//  - forwards the error to the activation request's error handler.
// If no error handler exists yet, delivery is deferred until ConnectIfReady()
// runs after OnSyncStarting().
void ClientTagBasedModelTypeProcessor::ReportError(const ModelError& error) {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);

  const bool already_failed = model_error_.has_value();
  if (already_failed)
    return;
  model_error_ = error;

  if (dump_stack_)
    dump_stack_.Run();

  if (IsConnected())
    DisconnectSync();

  // No further activation attempts once an error happened.
  start_callback_.Reset();

  if (activation_request_.error_handler)
    activation_request_.error_handler.Run(error);
}
// Returns the first model error reported so far, if any.
base::Optional<ModelError> ClientTagBasedModelTypeProcessor::GetError() const {
  base::Optional<ModelError> recorded_error = model_error_;
  return recorded_error;
}
// Returns a weak pointer to this processor, in its ModelTypeControllerDelegate
// role, for use by the controller.
base::WeakPtr<ModelTypeControllerDelegate>
ClientTagBasedModelTypeProcessor::GetControllerDelegate() {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  base::WeakPtr<ModelTypeControllerDelegate> delegate =
      weak_ptr_factory_for_controller_.GetWeakPtr();
  return delegate;
}
// Takes ownership of |worker| and nudges for a commit in case local changes
// are already waiting.
void ClientTagBasedModelTypeProcessor::ConnectSync(
    std::unique_ptr<CommitQueue> worker) {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  DVLOG(1) << "Successfully connected " << ModelTypeToString(type_);

  worker_ = std::move(worker);
  NudgeForCommitIfNeeded();
}
// Severs the connection to the worker. Weak pointers from
// |weak_ptr_factory_for_worker_| are invalidated, the worker is destroyed and
// the transient sync state of every tracked entity is cleared.
void ClientTagBasedModelTypeProcessor::DisconnectSync() {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  DCHECK(IsConnected());
  DVLOG(1) << "Disconnecting sync for " << ModelTypeToString(type_);

  weak_ptr_factory_for_worker_.InvalidateWeakPtrs();
  worker_.reset();

  for (const auto& tag_and_entity : entities_)
    tag_and_entity.second->ClearTransientSyncState();
}
// Records a local creation or update of the entity stored under |storage_key|
// with contents |data|. The resulting metadata is written to
// |metadata_change_list| and a commit is nudged if needed. The call is ignored
// before the initial sync is done. It is also ignored when |data| matches the
// tracked entity's current data; that case is counted in
// Sync.ModelTypeRedundantPut.
void ClientTagBasedModelTypeProcessor::Put(
    const std::string& storage_key,
    std::unique_ptr<EntityData> data,
    MetadataChangeList* metadata_change_list) {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  DCHECK(IsAllowingChanges());
  // Only named, non-deleted, unencrypted entities of this type may be put.
  DCHECK(data);
  DCHECK(!data->is_deleted());
  DCHECK(!data->name.empty());
  DCHECK(!data->specifics.has_encrypted());
  DCHECK_EQ(type_, GetModelTypeFromSpecifics(data->specifics));
  if (!model_type_state_.initial_sync_done()) {
    // Ignore changes before the initial sync is done.
    return;
  }
  ProcessorEntity* entity = GetEntityForStorageKey(storage_key);
  if (entity == nullptr) {
    // The bridge is creating a new entity. The bridge may or may not populate
    // |data->client_tag_hash|, so let's ask for the client tag if needed.
    if (data->client_tag_hash.value().empty()) {
      data->client_tag_hash = GetClientTagHash(storage_key, *data);
    } else if (bridge_->SupportsGetClientTag()) {
      // If the Put() call already included the client tag, let's verify that
      // it's consistent with the bridge's regular GetClientTag() function (if
      // supported by the bridge).
      DCHECK_EQ(
          data->client_tag_hash,
          ClientTagHash::FromUnhashed(type_, bridge_->GetClientTag(*data)));
    }
    // If another entity exists for the same client_tag_hash, it could be the
    // case that the bridge has deleted this entity but the tombstone hasn't
    // been sent to the server yet, and the bridge is trying to re-create this
    // entity with a new storage key. In such case, we should reuse the existing
    // entity.
    entity = GetEntityForTagHash(data->client_tag_hash);
    if (entity != nullptr) {
      DCHECK(storage_key != entity->storage_key());
      DCHECK(entity->metadata().is_deleted());
      // Remove the old storage key from the processor, the entity, and the
      // corresponding metadata record.
      storage_key_to_tag_hash_.erase(entity->storage_key());
      metadata_change_list->ClearMetadata(entity->storage_key());
      entity->ClearStorageKey();
      // Populate the new storage key in the existing entity.
      entity->SetStorageKey(storage_key);
      storage_key_to_tag_hash_[storage_key] = data->client_tag_hash;
    } else {
      // Brand-new entity: default missing timestamps to "now". The
      // modification time defaults to the creation time.
      if (data->creation_time.is_null())
        data->creation_time = base::Time::Now();
      if (data->modification_time.is_null())
        data->modification_time = data->creation_time;
      entity = CreateEntity(storage_key, *data);
    }
  } else if (entity->MatchesData(*data)) {
    // Ignore changes that don't actually change anything.
    UMA_HISTOGRAM_ENUMERATION("Sync.ModelTypeRedundantPut",
                              ModelTypeHistogramValue(type_));
    return;
  }
  // Apply the local change and persist the entity's updated metadata.
  entity->MakeLocalChange(std::move(data));
  metadata_change_list->UpdateMetadata(storage_key, entity->metadata());
  NudgeForCommitIfNeeded();
}
// Records a local deletion of the entity stored under |storage_key|. Ignored
// before the initial sync and for keys that are not tracked. If
// ProcessorEntity::Delete() returns true, the entity is kept and its updated
// metadata is recorded in |metadata_change_list|. Otherwise the entity is
// removed. A commit is nudged afterwards if needed.
void ClientTagBasedModelTypeProcessor::Delete(
    const std::string& storage_key,
    MetadataChangeList* metadata_change_list) {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  DCHECK(IsAllowingChanges());

  // Changes made before the initial sync are ignored.
  if (!model_type_state_.initial_sync_done())
    return;

  ProcessorEntity* entity = GetEntityForStorageKey(storage_key);
  // Missing is as good as deleted as far as the model is concerned.
  if (!entity)
    return;

  const bool keep_entity = entity->Delete();
  if (!keep_entity) {
    RemoveEntity(entity, metadata_change_list);
  } else {
    metadata_change_list->UpdateMetadata(storage_key, entity->metadata());
  }

  NudgeForCommitIfNeeded();
}
void ClientTagBasedModelTypeProcessor::UpdateStorageKey(
const EntityData& entity_data,
const std::string& storage_key,
MetadataChangeList* metadata_change_list) {
const ClientTagHash& client_tag_hash = entity_data.client_tag_hash;
DCHECK(!client_tag_hash.value().empty());
DCHECK(!storage_key.empty());
DCHECK(!bridge_->SupportsGetStorageKey());
DCHECK(model_type_state_.initial_sync_done());
ProcessorEntity* entity = GetEntityForTagHash(client_tag_hash);
DCHECK(entity);
DCHECK(entity->storage_key().empty());
DCHECK(storage_key_to_tag_hash_.find(storage_key) ==
storage_key_to_tag_hash_.end());
storage_key_to_tag_hash_[storage_key] = client_tag_hash;
entity->SetStorageKey(storage_key);
metadata_change_list->UpdateMetadata(storage_key, entity->metadata());
}
// Stops tracking the entity stored under |storage_key|, if there is one. It is
// removed from both lookup maps; no metadata change is recorded here.
void ClientTagBasedModelTypeProcessor::UntrackEntityForStorageKey(
    const std::string& storage_key) {
  DCHECK(model_type_state_.initial_sync_done());

  auto tag_hash_it = storage_key_to_tag_hash_.find(storage_key);
  // Missing is as good as untracked as far as the model is concerned.
  if (tag_hash_it == storage_key_to_tag_hash_.end())
    return;

  entities_.erase(tag_hash_it->second);
  storage_key_to_tag_hash_.erase(tag_hash_it);
}
// Stops tracking the entity for |client_tag_hash|; this is a no-op if none is
// tracked. Only entities without a storage key may be untracked this way.
void ClientTagBasedModelTypeProcessor::UntrackEntityForClientTagHash(
    const ClientTagHash& client_tag_hash) {
  DCHECK(model_type_state_.initial_sync_done());
  DCHECK(!client_tag_hash.value().empty());
  DCHECK(GetEntityForTagHash(client_tag_hash) == nullptr ||
         GetEntityForTagHash(client_tag_hash)->storage_key().empty());

  entities_.erase(client_tag_hash);
}
// Returns the tracked entity's IsUnsynced() state for |storage_key|. Returns
// false when the key is not tracked.
bool ClientTagBasedModelTypeProcessor::IsEntityUnsynced(
    const std::string& storage_key) {
  ProcessorEntity* entity = GetEntityForStorageKey(storage_key);
  return entity != nullptr && entity->IsUnsynced();
}
// Returns the creation time stored in the metadata of the entity under
// |storage_key|. Returns a null base::Time if the key is not tracked.
base::Time ClientTagBasedModelTypeProcessor::GetEntityCreationTime(
    const std::string& storage_key) const {
  const ProcessorEntity* entity = GetEntityForStorageKey(storage_key);
  return entity ? ProtoTimeToTime(entity->metadata().creation_time())
                : base::Time();
}
// Returns the modification time stored in the metadata of the entity under
// |storage_key|. Returns a null base::Time if the key is not tracked.
base::Time ClientTagBasedModelTypeProcessor::GetEntityModificationTime(
    const std::string& storage_key) const {
  const ProcessorEntity* entity = GetEntityForStorageKey(storage_key);
  return entity ? ProtoTimeToTime(entity->metadata().modification_time())
                : base::Time();
}
// Asks the worker to schedule a commit. This only happens when all of these
// hold:
//  - a worker is connected to send to;
//  - the initial sync is done (the type cannot handle commits before that);
//  - at least one entity has local changes requiring a commit request.
void ClientTagBasedModelTypeProcessor::NudgeForCommitIfNeeded() {
  const bool can_commit =
      IsConnected() && model_type_state_.initial_sync_done();
  if (can_commit && HasLocalChanges())
    worker_->NudgeForCommit();
}
// Returns true if any tracked entity requires a commit request. The scan stops
// at the first such entity.
bool ClientTagBasedModelTypeProcessor::HasLocalChanges() const {
  bool found = false;
  for (auto it = entities_.begin(); !found && it != entities_.end(); ++it)
    found = it->second->RequiresCommitRequest();
  return found;
}
// Collects up to |max_entries| pending local changes for the worker and
// passes them to |callback|.
//  - Entities whose commit data is not in memory are first loaded from the
//    bridge asynchronously; the result arrives in OnPendingDataLoaded().
//  - Otherwise the commit requests are built right away.
//  - If a model error was already recorded, an empty list is returned
//    without touching the bridge.
void ClientTagBasedModelTypeProcessor::GetLocalChanges(
    size_t max_entries,
    GetLocalChangesCallback callback) {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  DCHECK_GT(max_entries, 0U);

  // The error must have been reported already but may not have reached the
  // sync engine yet; avoid any further interaction with the bridge.
  if (model_error_) {
    std::move(callback).Run(CommitRequestDataList());
    return;
  }

  std::vector<std::string> keys_to_load;
  for (const auto& tag_and_entity : entities_) {
    ProcessorEntity* entity = tag_and_entity.second.get();
    if (entity->RequiresCommitData())
      keys_to_load.push_back(entity->storage_key());
  }

  if (keys_to_load.empty()) {
    // All commit data is already in memory, e.g. data handed over via Put().
    CommitLocalChanges(max_entries, std::move(callback));
    return;
  }

  // Keep a copy of the requested keys so the completion callback can detect
  // keys that the bridge failed to load.
  std::unordered_set<std::string> requested_keys(keys_to_load.begin(),
                                                 keys_to_load.end());
  bridge_->GetData(
      std::move(keys_to_load),
      base::BindOnce(&ClientTagBasedModelTypeProcessor::OnPendingDataLoaded,
                     weak_ptr_factory_for_worker_.GetWeakPtr(), max_entries,
                     std::move(callback), std::move(requested_keys)));
}
// Handles the worker's report of a finished commit:
//  1. Adopts |model_type_state|.
//  2. Applies each response in |response_list| to its entity and records the
//     outcome:
//     - commit-only types remove entities that are no longer unsynced, and
//       also delete their data;
//     - other types remove entities whose metadata can be cleared, and
//       persist the updated metadata otherwise.
//  3. Clears the transient sync state of all entities, so entities that were
//     not committed get requested again.
//  4. Hands all changes to the bridge; a bridge error is reported.
void ClientTagBasedModelTypeProcessor::OnCommitCompleted(
    const sync_pb::ModelTypeState& model_type_state,
    const CommitResponseDataList& response_list) {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  DCHECK(!model_error_);
  std::unique_ptr<MetadataChangeList> metadata_change_list =
      bridge_->CreateMetadataChangeList();
  EntityChangeList entity_change_list;
  model_type_state_ = model_type_state;
  metadata_change_list->UpdateModelTypeState(model_type_state_);
  for (const CommitResponseData& data : response_list) {
    ProcessorEntity* entity = GetEntityForTagHash(data.client_tag_hash);
    if (entity == nullptr) {
      NOTREACHED() << "Received commit response for missing item."
                   << " type: " << ModelTypeToString(type_)
                   << " client_tag_hash: " << data.client_tag_hash;
      continue;
    }
    entity->ReceiveCommitResponse(data, commit_only_, type_);
    if (commit_only_) {
      // Commit-only data is not kept locally once committed: delete the data
      // and stop tracking the entity.
      if (!entity->IsUnsynced()) {
        entity_change_list.push_back(
            EntityChange::CreateDelete(entity->storage_key()));
        RemoveEntity(entity, metadata_change_list.get());
      }
      // If unsynced, we could theoretically update persisted metadata to have
      // more accurate bookkeeping. However, this wouldn't actually do anything
      // useful, we still need to commit again, and we're not going to include
      // any of the changing metadata in the commit message. So skip updating
      // metadata.
    } else if (entity->CanClearMetadata()) {
      RemoveEntity(entity, metadata_change_list.get());
    } else {
      metadata_change_list->UpdateMetadata(entity->storage_key(),
                                           entity->metadata());
    }
  }
  // Entities not mentioned in response_list weren't committed. We should reset
  // their commit_requested_sequence_number so they are committed again on next
  // sync cycle.
  // TODO(crbug.com/740757): Iterating over all entities is inefficient. It is
  // better to remember in GetLocalChanges which entities are being committed
  // and adjust only them. Alternatively we can make worker return commit status
  // for all entities, not just successful ones and use that to lookup entities
  // to clear.
  for (auto& entity_kv : entities_) {
    entity_kv.second->ClearTransientSyncState();
  }
  base::Optional<ModelError> error = bridge_->ApplySyncChanges(
      std::move(metadata_change_list), std::move(entity_change_list));
  if (error) {
    ReportError(*error);
  }
}
namespace {

// Returns whether |model_type_state|'s progress marker carries a
// version_watermark-based GC directive. Such a directive tells us to clear all
// sync data that's stored locally.
// The helper is only used in this file, so it lives in an unnamed namespace.
// That gives it internal linkage and avoids clashing with a same-named symbol
// in namespace syncer elsewhere.
bool HasClearAllDirective(const sync_pb::ModelTypeState& model_type_state) {
  return model_type_state.progress_marker()
      .gc_directive()
      .has_version_watermark();
}

}  // namespace
// Entry point for updates downloaded by the worker. After ValidateUpdate()
// passes, one of two paths runs:
//  - initial sync, or an update with a version-watermark GC directive: all
//    local sync data is replaced (OnFullUpdateReceived());
//  - otherwise: the updates are applied incrementally.
// A bridge error is reported via ReportError(). For the initial sync, the
// time since configuration start is also recorded in a per-type,
// per-sync-mode histogram.
void ClientTagBasedModelTypeProcessor::OnUpdateReceived(
    const sync_pb::ModelTypeState& model_type_state,
    UpdateResponseDataList updates) {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  DCHECK(model_ready_to_sync_);
  DCHECK(!model_error_);
  if (!ValidateUpdate(model_type_state, updates)) {
    return;
  }
  base::Optional<ModelError> error;
  // We call OnFullUpdateReceived when it's the first sync cycle, or when
  // we get a garbage collection directive from the server telling us to clear
  // all data by version watermark.
  // This means that if we receive a version watermark based GC directive, we
  // always clear all data. We do this to allow the server to replace all data
  // on the client, without having to know exactly which entities the client
  // has.
  const bool is_initial_sync = !model_type_state_.initial_sync_done();
  if (is_initial_sync || HasClearAllDirective(model_type_state)) {
    error = OnFullUpdateReceived(model_type_state, std::move(updates));
  } else {
    error = OnIncrementalUpdateReceived(model_type_state, std::move(updates));
  }
  if (error) {
    ReportError(*error);
    return;
  }
  if (is_initial_sync) {
    base::TimeDelta configuration_duration =
        base::Time::Now() - activation_request_.configuration_start_time;
    base::UmaHistogramCustomTimes(
        base::StringPrintf(
            "Sync.ModelTypeConfigurationTime.%s.%s",
            (activation_request_.sync_mode == SyncMode::kTransportOnly)
                ? "Ephemeral"
                : "Persistent",
            ModelTypeToHistogramSuffix(type_)),
        configuration_duration,
        /*min=*/base::TimeDelta::FromMilliseconds(1),
        /*max=*/base::TimeDelta::FromSeconds(60),
        /*buckets=*/50);
  }
  // If there were entities with empty storage keys, they should have been
  // updated by bridge as part of ApplySyncChanges.
  DCHECK(AllStorageKeysPopulated());
  // There may be new reasons to commit by the time this function is done.
  NudgeForCommitIfNeeded();
}
// Checks that an incoming update matches the bridge's capabilities. It reports
// a model error and returns false in two cases:
//  - a clear-all (version watermark) directive arrives for a bridge that
//    supports incremental updates;
//  - a non-empty update without such a directive arrives for a bridge that
//    does not.
// The check is skipped entirely before the initial sync is done.
bool ClientTagBasedModelTypeProcessor::ValidateUpdate(
    const sync_pb::ModelTypeState& model_type_state,
    const UpdateResponseDataList& updates) {
  if (!model_type_state_.initial_sync_done()) {
    // Due to uss_migrator, initial sync (when migrating from non-USS) does not
    // contain any gc directives. Thus, we cannot expect the conditions below to
    // be satisfied. It is okay to skip the check as for an initial sync, the gc
    // directive does not make any semantical difference.
    return true;
  }

  const bool clears_all = HasClearAllDirective(model_type_state);
  const bool supports_incremental = bridge_->SupportsIncrementalUpdates();

  if (clears_all && supports_incremental) {
    ReportError(ModelError(FROM_HERE,
                           "Received an update with version watermark for "
                           "bridge that supports incremental updates"));
    return false;
  }

  if (!clears_all && !supports_incremental && !updates.empty()) {
    // The server sends an update without clear-all directive to indicate that
    // no data has changed. A non-empty list of updates contradicts that, and
    // the bridge cannot handle it, so fail here. (An empty list still has to
    // reach the bridge, since the progress marker might have changed.)
    ReportError(ModelError(FROM_HERE,
                           "Received a non-empty update without version "
                           "watermark for bridge that does not support "
                           "incremental updates"));
    return false;
  }

  return true;
}
// Handles an update that replaces all local sync data. This is either the
// initial sync, or an update carrying a version-watermark GC directive (in
// which case all existing entries are expired first). An entity is created
// for each remaining update and all data is handed to the bridge's
// MergeSyncData(). Skipped updates:
//  - updates without a client tag hash (e.g. permanent nodes);
//  - tombstones;
//  - updates whose client tag hash disagrees with the bridge's
//    GetClientTag().
// Returns MergeSyncData()'s error, if any.
base::Optional<ModelError>
ClientTagBasedModelTypeProcessor::OnFullUpdateReceived(
    const sync_pb::ModelTypeState& model_type_state,
    UpdateResponseDataList updates) {
  std::unique_ptr<MetadataChangeList> metadata_changes =
      bridge_->CreateMetadataChangeList();
  DCHECK(model_ready_to_sync_);
  // Check that the worker correctly marked initial sync as done
  // for this update.
  DCHECK(model_type_state.initial_sync_done());
  if (HasClearAllDirective(model_type_state)) {
    // The server asked to wipe all local data. ExpireAllEntries() (defined
    // elsewhere) receives |metadata_changes|, presumably to record the
    // removals; confirm.
    ExpireAllEntries(metadata_changes.get());
  } else {
    // Ensure that this is the initial sync, and it was not already marked done.
    DCHECK(!model_type_state_.initial_sync_done());
  }
  // Given that we either just removed all existing sync entities (in the full
  // update case), or we just started sync for the first time, we should not
  // have any entities here.
  DCHECK(entities_.empty());
  EntityChangeList entity_data;
  model_type_state_ = model_type_state;
  metadata_changes->UpdateModelTypeState(model_type_state_);
  for (const std::unique_ptr<syncer::UpdateResponseData>& update : updates) {
    DCHECK(update);
    const ClientTagHash& client_tag_hash = update->entity->client_tag_hash;
    if (client_tag_hash.value().empty()) {
      // Ignore updates missing a client tag hash (e.g. permanent nodes).
      continue;
    }
    if (update->entity->is_deleted()) {
      DLOG(WARNING) << "Ignoring tombstone found during initial update: "
                    << "client_tag_hash = " << client_tag_hash << " for "
                    << ModelTypeToString(type_);
      continue;
    }
    if (bridge_->SupportsGetClientTag() &&
        client_tag_hash != ClientTagHash::FromUnhashed(
                               type_, bridge_->GetClientTag(*update->entity))) {
      DLOG(WARNING) << "Received unexpected client tag hash: "
                    << client_tag_hash << " for " << ModelTypeToString(type_);
      continue;
    }
#if DCHECK_IS_ON()
    // TODO(crbug.com/872360): The CreateEntity() call below assumes that no
    // entity with this client_tag_hash exists already, but in some cases it
    // does.
    if (entities_.find(client_tag_hash) != entities_.end()) {
      DLOG(ERROR) << "Received duplicate client_tag_hash " << client_tag_hash
                  << " for " << ModelTypeToString(type_);
    }
#endif  // DCHECK_IS_ON()
    ProcessorEntity* entity = CreateEntity(*update->entity);
    entity->RecordAcceptedUpdate(*update);
    const std::string& storage_key = entity->storage_key();
    entity_data.push_back(
        EntityChange::CreateAdd(storage_key, std::move(update->entity)));
    // Metadata can only be persisted under a known storage key. Bridges
    // without GetStorageKey() provide one later via UpdateStorageKey().
    if (!storage_key.empty())
      metadata_changes->UpdateMetadata(storage_key, entity->metadata());
  }
  // Let the bridge handle associating and merging the data.
  base::Optional<ModelError> error = bridge_->MergeSyncData(
      std::move(metadata_changes), std::move(entity_data));
  return error;
}
// Applies a regular incremental update: it is neither the initial sync nor a
// clear-all directive. The work is delegated to a
// ClientTagBasedRemoteUpdateHandler that operates directly on this
// processor's ModelTypeState and entity maps. Returns the handler's error, if
// any.
base::Optional<ModelError>
ClientTagBasedModelTypeProcessor::OnIncrementalUpdateReceived(
    const sync_pb::ModelTypeState& model_type_state,
    UpdateResponseDataList updates) {
  DCHECK(model_ready_to_sync_);
  DCHECK(model_type_state.initial_sync_done());

  ClientTagBasedRemoteUpdateHandler handler(type_, bridge_, &model_type_state_,
                                            &storage_key_to_tag_hash_,
                                            &entities_);
  base::Optional<ModelError> error =
      handler.ProcessIncrementalUpdate(model_type_state, std::move(updates));
  return error;
}
void ClientTagBasedModelTypeProcessor::OnPendingDataLoaded(
size_t max_entries,
GetLocalChangesCallback callback,
std::unordered_set<std::string> storage_keys_to_load,
std::unique_ptr<DataBatch> data_batch) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
// The model already experienced an error; abort;
if (model_error_)
return;
ConsumeDataBatch(std::move(storage_keys_to_load), std::move(data_batch));
ConnectIfReady();
CommitLocalChanges(max_entries, std::move(callback));
}
// Stores the data loaded by the bridge in entities that still need commit
// data. It then looks at keys that were requested (|storage_keys_to_load|)
// but not returned. If such a key's entity is still tracked and not deleted,
// the bridge left orphan metadata behind. Those entities are untracked and
// their metadata cleared, as if UntrackEntity() had been called.
void ClientTagBasedModelTypeProcessor::ConsumeDataBatch(
    std::unordered_set<std::string> storage_keys_to_load,
    std::unique_ptr<DataBatch> data_batch) {
  while (data_batch->HasNext()) {
    KeyAndData data = data_batch->Next();
    const std::string& storage_key = data.first;
    // Keys left in |storage_keys_to_load| afterwards were not returned.
    storage_keys_to_load.erase(storage_key);
    ProcessorEntity* entity = GetEntityForStorageKey(storage_key);
    // If the entity wasn't deleted or updated with new commit.
    if (entity != nullptr && entity->RequiresCommitData()) {
      // SetCommitData will update EntityData's fields with values from
      // metadata.
      entity->SetCommitData(std::move(data.second));
    }
  }
  // Detect failed loads that shouldn't have failed.
  std::vector<std::string> storage_keys_to_untrack;
  for (const std::string& storage_key : storage_keys_to_load) {
    ProcessorEntity* entity = GetEntityForStorageKey(storage_key);
    if (entity == nullptr || entity->metadata().is_deleted()) {
      // Skip entities that are not tracked any more or already marked for
      // deletion.
      continue;
    }
    // This scenario indicates a bug in the bridge, which didn't properly
    // propagate a local deletion to the processor, either in the form of
    // Delete() or UntrackEntity(). As a workaround to avoid negative side
    // effects of this inconsistent state, we treat it as if UntrackEntity()
    // had been called.
    storage_keys_to_untrack.push_back(storage_key);
    UMA_HISTOGRAM_ENUMERATION("Sync.ModelTypeOrphanMetadata",
                              ModelTypeHistogramValue(type_));
  }
  if (storage_keys_to_untrack.empty()) {
    return;
  }
  DCHECK(model_ready_to_sync_);
  DCHECK(IsTrackingMetadata());
  std::unique_ptr<MetadataChangeList> metadata_changes =
      bridge_->CreateMetadataChangeList();
  for (const std::string& storage_key : storage_keys_to_untrack) {
    UntrackEntityForStorageKey(storage_key);
    metadata_changes->ClearMetadata(storage_key);
  }
  // NOTE(review): the base::Optional<ModelError> returned by ApplySyncChanges()
  // is discarded here, unlike in OnCommitCompleted(). Reporting it would also
  // require the caller (OnPendingDataLoaded()) to re-check |model_error_|
  // before CommitLocalChanges(), which DCHECKs !model_error_.
  bridge_->ApplySyncChanges(std::move(metadata_changes), EntityChangeList());
}
void ClientTagBasedModelTypeProcessor::CommitLocalChanges(
size_t max_entries,
GetLocalChangesCallback callback) {
DCHECK(!model_error_);
CommitRequestDataList commit_requests;
// TODO(rlarocque): Do something smarter than iterate here.
for (const auto& kv : entities_) {
ProcessorEntity* entity = kv.second.get();
if (entity->RequiresCommitRequest() && !entity->RequiresCommitData()) {
auto request = std::make_unique<CommitRequestData>();
entity->InitializeCommitRequestData(request.get());
commit_requests.push_back(std::move(request));
if (commit_requests.size() >= max_entries) {
break;
}
}
}
std::move(callback).Run(std::move(commit_requests));
}
// Returns the client tag hash for |storage_key|. For a tracked key this is the
// recorded hash; otherwise it is computed from |data| via the bridge's
// GetClientTag(), which the bridge must support.
ClientTagHash ClientTagBasedModelTypeProcessor::GetClientTagHash(
    const std::string& storage_key,
    const EntityData& data) const {
  auto known = storage_key_to_tag_hash_.find(storage_key);
  DCHECK(bridge_->SupportsGetClientTag());
  if (known != storage_key_to_tag_hash_.end())
    return known->second;
  return ClientTagHash::FromUnhashed(type_, bridge_->GetClientTag(data));
}
// Returns the tracked entity stored under |storage_key|, or nullptr if the key
// is unknown.
ProcessorEntity* ClientTagBasedModelTypeProcessor::GetEntityForStorageKey(
    const std::string& storage_key) {
  auto tag_it = storage_key_to_tag_hash_.find(storage_key);
  if (tag_it == storage_key_to_tag_hash_.end())
    return nullptr;
  return GetEntityForTagHash(tag_it->second);
}
// Const variant: returns the tracked entity stored under |storage_key|, or
// nullptr if the key is unknown.
const ProcessorEntity* ClientTagBasedModelTypeProcessor::GetEntityForStorageKey(
    const std::string& storage_key) const {
  auto tag_it = storage_key_to_tag_hash_.find(storage_key);
  if (tag_it == storage_key_to_tag_hash_.end())
    return nullptr;
  return GetEntityForTagHash(tag_it->second);
}
// Returns the tracked entity for |tag_hash|, or nullptr if none is tracked.
// Ownership stays with |entities_|.
ProcessorEntity* ClientTagBasedModelTypeProcessor::GetEntityForTagHash(
    const ClientTagHash& tag_hash) {
  auto found = entities_.find(tag_hash);
  if (found == entities_.end())
    return nullptr;
  return found->second.get();
}
// Const variant: returns the tracked entity for |tag_hash|, or nullptr if none
// is tracked.
const ProcessorEntity* ClientTagBasedModelTypeProcessor::GetEntityForTagHash(
    const ClientTagHash& tag_hash) const {
  auto found = entities_.find(tag_hash);
  if (found == entities_.end())
    return nullptr;
  return found->second.get();
}
// Creates a new entity for |data| and stores it in |entities_| under
// |data.client_tag_hash|. If |storage_key| is non-empty, the entity is also
// indexed in |storage_key_to_tag_hash_|. The returned pointer is non-owning;
// |entities_| keeps ownership.
ProcessorEntity* ClientTagBasedModelTypeProcessor::CreateEntity(
    const std::string& storage_key,
    const EntityData& data) {
  const ClientTagHash& tag_hash = data.client_tag_hash;
  DCHECK(!tag_hash.value().empty());
  DCHECK(entities_.find(tag_hash) == entities_.end());
  DCHECK(!bridge_->SupportsGetStorageKey() || !storage_key.empty());
  DCHECK(storage_key.empty() || storage_key_to_tag_hash_.find(storage_key) ==
                                    storage_key_to_tag_hash_.end());

  auto owned = ProcessorEntity::CreateNew(storage_key, tag_hash, data.id,
                                          data.creation_time);
  ProcessorEntity* const created = owned.get();
  entities_[tag_hash] = std::move(owned);

  // An empty storage key is never indexed.
  if (!storage_key.empty())
    storage_key_to_tag_hash_[storage_key] = tag_hash;
  return created;
}
// Creates an entity for |data|. The storage key comes from the bridge when
// the bridge supports GetStorageKey(); otherwise it is left empty.
ProcessorEntity* ClientTagBasedModelTypeProcessor::CreateEntity(
    const EntityData& data) {
  // If the bridge can compute client tags, check that the hash carried in
  // |data| matches the one derived from the bridge's tag.
  if (bridge_->SupportsGetClientTag()) {
    DCHECK_EQ(data.client_tag_hash,
              ClientTagHash::FromUnhashed(type_, bridge_->GetClientTag(data)));
  }
  const std::string storage_key = bridge_->SupportsGetStorageKey()
                                      ? bridge_->GetStorageKey(data)
                                      : std::string();
  return CreateEntity(storage_key, data);
}
// Returns false as soon as any tracked entity has an empty storage key.
// Returns true when every entity has one, including when there are none.
bool ClientTagBasedModelTypeProcessor::AllStorageKeysPopulated() const {
  for (const auto& tag_and_entity : entities_) {
    if (tag_and_entity.second->storage_key().empty())
      return false;
  }
  return true;
}
// Sums the estimated memory of the processor's own state: the model type
// state, the entity map and the storage-key index. The bridge's sync
// overhead is added only when |bridge_| is set.
size_t ClientTagBasedModelTypeProcessor::EstimateMemoryUsage() const {
  using base::trace_event::EstimateMemoryUsage;
  size_t total = EstimateMemoryUsage(model_type_state_) +
                 EstimateMemoryUsage(entities_) +
                 EstimateMemoryUsage(storage_key_to_tag_hash_);
  if (bridge_)
    total += bridge_->EstimateSyncOverheadMemoryUsage();
  return total;
}
bool ClientTagBasedModelTypeProcessor::HasLocalChangesForTest() const {
return HasLocalChanges();
}
// Test-only: true if |storage_key| is present in the storage-key index.
bool ClientTagBasedModelTypeProcessor::IsTrackingEntityForTest(
    const std::string& storage_key) const {
  return storage_key_to_tag_hash_.find(storage_key) !=
         storage_key_to_tag_hash_.end();
}
// Test-only accessor for |model_ready_to_sync_|.
bool ClientTagBasedModelTypeProcessor::IsModelReadyToSyncForTest() const {
  const bool ready = model_ready_to_sync_;
  return ready;
}
// Removes every entity for which IsUnsynced() is false. For each one, its
// metadata is cleared through |metadata_changes| and it is dropped from both
// |entities_| and |storage_key_to_tag_hash_|. Entities reporting IsUnsynced()
// are kept.
void ClientTagBasedModelTypeProcessor::ExpireAllEntries(
    MetadataChangeList* metadata_changes) {
  DCHECK(metadata_changes);

  // Collect the keys of |entities_| first, so the map is not mutated while it
  // is being iterated.
  std::vector<ClientTagHash> tag_hashes_to_expire;
  for (const auto& kv : entities_) {
    if (!kv.second->IsUnsynced())
      tag_hashes_to_expire.push_back(kv.first);
  }

  for (const ClientTagHash& tag_hash : tag_hashes_to_expire) {
    auto entity_it = entities_.find(tag_hash);
    DCHECK(entity_it != entities_.end());
    const std::string& storage_key = entity_it->second->storage_key();
    metadata_changes->ClearMetadata(storage_key);
    // Erase by tag hash rather than routing through the storage-key index.
    // An entity whose storage key is still empty is never indexed (see
    // CreateEntity()). Looking it up through the index would then dereference
    // end(). erase() by key is simply a no-op for it.
    storage_key_to_tag_hash_.erase(storage_key);
    entities_.erase(entity_it);
  }
}
// Clears |entity|'s metadata through |metadata_change_list|. It then erases
// the entity's storage-key index entry and the |entities_| entry keyed by the
// tag hash recorded in its metadata. Presumably that entry owns |entity|, in
// which case |entity| is destroyed here and must not be used afterwards.
void ClientTagBasedModelTypeProcessor::RemoveEntity(
    ProcessorEntity* entity,
    MetadataChangeList* metadata_change_list) {
  const std::string& storage_key = entity->storage_key();
  metadata_change_list->ClearMetadata(storage_key);
  storage_key_to_tag_hash_.erase(storage_key);

  const ClientTagHash tag_hash =
      ClientTagHash::FromHashed(entity->metadata().client_tag_hash());
  entities_.erase(tag_hash);
}
// Resets the processor's mutable state; |bridge_| is left untouched.
// |worker_| is always dropped and weak pointers are always invalidated.
// Tracked metadata is wiped only for CLEAR_METADATA; KEEP_METADATA leaves it
// in place.
void ClientTagBasedModelTypeProcessor::ResetState(
    SyncStopMetadataFate metadata_fate) {
  worker_.reset();

  if (metadata_fate == CLEAR_METADATA) {
    model_ready_to_sync_ = false;
    entities_.clear();
    storage_key_to_tag_hash_.clear();
    // Start over from a fresh state whose progress marker names this type.
    model_type_state_ = sync_pb::ModelTypeState();
    model_type_state_.mutable_progress_marker()->set_data_type_id(
        GetSpecificsFieldNumberFromModelType(type_));
  }

  // Keep any pending callbacks bound to these weak pointers from running.
  weak_ptr_factory_for_worker_.InvalidateWeakPtrs();
}
// Asks the bridge for all of its data and merges it with the processor's
// metadata in MergeDataWithMetadataForDebugging(), which runs |callback|.
// When |bridge_| is null this returns without running |callback|.
void ClientTagBasedModelTypeProcessor::GetAllNodesForDebugging(
    AllNodesCallback callback) {
  if (bridge_ == nullptr)
    return;
  // Bound via a weak pointer, so ResetState() cancels a pending merge.
  auto merge_callback = base::BindOnce(
      &ClientTagBasedModelTypeProcessor::MergeDataWithMetadataForDebugging,
      weak_ptr_factory_for_worker_.GetWeakPtr(), std::move(callback));
  bridge_->GetAllDataForDebugging(std::move(merge_callback));
}
// Turns every entry of |batch| into a dictionary node. Fields that overlap
// with the processor's metadata are overwritten from that metadata, and the
// metadata is attached when present. A synthetic root folder for this type is
// appended, and the resulting list is passed to |callback|.
void ClientTagBasedModelTypeProcessor::MergeDataWithMetadataForDebugging(
    AllNodesCallback callback,
    std::unique_ptr<DataBatch> batch) {
  const std::string type_string = ModelTypeToString(type_);
  auto all_nodes = std::make_unique<base::ListValue>();

  while (batch->HasNext()) {
    KeyAndData key_and_data = batch->Next();
    std::unique_ptr<EntityData> data = std::move(key_and_data.second);
    // |entity| may be null, e.g. when some changes have not been applied yet.
    ProcessorEntity* entity = GetEntityForStorageKey(key_and_data.first);

    if (entity) {
      // The bridge's EntityData and the processor's EntityMetadata overlap.
      // Where they do, the metadata is the authoritative source of truth.
      const sync_pb::EntityMetadata& metadata = entity->metadata();
      // Format the id the way the Directory does: "s" marks a server id.
      data->id = "s" + metadata.server_id();
      data->creation_time = ProtoTimeToTime(metadata.creation_time());
      data->modification_time = ProtoTimeToTime(metadata.modification_time());
      data->client_tag_hash =
          ClientTagHash::FromHashed(metadata.client_tag_hash());
    }

    std::unique_ptr<base::DictionaryValue> node = data->ToDictionaryValue();
    node->SetString("modelType", type_string);
    // Attach the complete metadata message when the entity is known.
    if (entity)
      node->Set("metadata", EntityMetadataToValue(entity->metadata()));
    all_nodes->Append(std::move(node));
  }

  // The sync server no longer creates root folders, and USS does not migrate
  // root folders from the Directory, so a permanent root folder is created for
  // each data type here.
  // In sync_node_browser.js, isTypeRootNode() uses PARENT_ID and
  // UNIQUE_SERVER_TAG to recognize a root node, and isChildOf() uses modelType
  // to link the root to the data nodes. NON_UNIQUE_NAME is the name displayed
  // for the node.
  auto root_node = std::make_unique<base::DictionaryValue>();
  root_node->SetString("PARENT_ID", "r");
  root_node->SetString("UNIQUE_SERVER_TAG", type_string);
  root_node->SetBoolean("IS_DIR", true);
  root_node->SetString("modelType", type_string);
  root_node->SetString("NON_UNIQUE_NAME", type_string);
  all_nodes->Append(std::move(root_node));

  std::move(callback).Run(type_, std::move(all_nodes));
}
// Reports two counts through |callback|. |num_entries_and_tombstones| is the
// total number of tracked entities. |num_entries| counts only those whose
// metadata is not marked deleted.
void ClientTagBasedModelTypeProcessor::GetStatusCountersForDebugging(
    StatusCountersCallback callback) {
  StatusCounters counters;
  counters.num_entries = CountNonTombstoneEntries(entities_);
  counters.num_entries_and_tombstones = entities_.size();
  std::move(callback).Run(type_, counters);
}
// Records this type's estimated memory usage and its count of non-tombstone
// entities to the per-model-type histograms.
void ClientTagBasedModelTypeProcessor::RecordMemoryUsageAndCountsHistograms() {
  const size_t memory_usage = EstimateMemoryUsage();
  SyncRecordModelTypeMemoryHistogram(type_, memory_usage);

  const int live_entity_count = CountNonTombstoneEntries(entities_);
  SyncRecordModelTypeCountHistogram(type_, live_entity_count);
}
} // namespace syncer