blob: 4c34261351e0f1700329959d1557d0096a3b1b4b [file] [log] [blame]
// Copyright 2018 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "components/sync/model_impl/syncable_service_based_bridge.h"
#include <stdint.h>
#include <utility>
#include "base/bind.h"
#include "base/bind_helpers.h"
#include "base/location.h"
#include "base/trace_event/memory_usage_estimator.h"
#include "components/sync/base/data_type_histogram.h"
#include "components/sync/base/hash_util.h"
#include "components/sync/model/mutable_data_batch.h"
#include "components/sync/model/sync_change.h"
#include "components/sync/model/sync_error_factory.h"
#include "components/sync/model/syncable_service.h"
#include "components/sync/model_impl/client_tag_based_model_type_processor.h"
#include "components/sync/protocol/persisted_entity_data.pb.h"
#include "components/sync/protocol/proto_memory_estimations.h"
namespace syncer {
namespace {
// Same as kInvalidId in syncable/base_node.h.
constexpr int64_t kInvalidNodeId = 0;
std::unique_ptr<EntityData> ConvertPersistedToEntityData(
const std::string& client_tag_hash,
sync_pb::PersistedEntityData data) {
DCHECK(!client_tag_hash.empty());
auto entity_data = std::make_unique<EntityData>();
entity_data->non_unique_name = std::move(*data.mutable_non_unique_name());
entity_data->specifics = std::move(*data.mutable_specifics());
entity_data->client_tag_hash = client_tag_hash;
// Purposefully crash if we have client only data, as this could result in
// sending password in plain text.
CHECK(!entity_data->specifics.password().has_client_only_encrypted_data());
return entity_data;
}
sync_pb::PersistedEntityData CreatePersistedFromEntityData(
const EntityData& entity_data) {
sync_pb::PersistedEntityData persisted;
persisted.set_non_unique_name(entity_data.non_unique_name);
*persisted.mutable_specifics() = entity_data.specifics;
return persisted;
}
sync_pb::PersistedEntityData CreatePersistedFromSyncData(
const SyncDataLocal& sync_data) {
DCHECK(!sync_data.GetTitle().empty());
sync_pb::PersistedEntityData persisted;
persisted.set_non_unique_name(sync_data.GetTitle());
*persisted.mutable_specifics() = sync_data.GetSpecifics();
return persisted;
}
SyncChange::SyncChangeType ConvertToSyncChangeType(
EntityChange::ChangeType type) {
switch (type) {
case EntityChange::ACTION_DELETE:
return SyncChange::ACTION_DELETE;
case EntityChange::ACTION_ADD:
return SyncChange::ACTION_ADD;
case EntityChange::ACTION_UPDATE:
return SyncChange::ACTION_UPDATE;
}
NOTREACHED();
return SyncChange::ACTION_INVALID;
}
base::Optional<ModelError> ConvertToModelError(const SyncError& sync_error) {
if (sync_error.IsSet()) {
return ModelError(sync_error.location(), sync_error.message());
}
return base::nullopt;
}
// Parses the content of |record_list| into |*in_memory_store|. The output
// parameter is first for binding purposes.
base::Optional<ModelError> ParseInMemoryStoreOnBackendSequence(
SyncableServiceBasedBridge::InMemoryStore* in_memory_store,
std::unique_ptr<ModelTypeStore::RecordList> record_list) {
DCHECK(in_memory_store);
DCHECK(in_memory_store->empty());
DCHECK(record_list);
for (const ModelTypeStore::Record& record : *record_list) {
sync_pb::PersistedEntityData persisted_entity;
if (!persisted_entity.ParseFromString(record.value)) {
return ModelError(FROM_HERE, "Failed deserializing data.");
}
in_memory_store->emplace(record.id,
std::move(*persisted_entity.mutable_specifics()));
}
return base::nullopt;
}
// Object to propagate local changes to the bridge, which will ultimately
// propagate them to the server.
class LocalChangeProcessor : public SyncChangeProcessor {
public:
LocalChangeProcessor(
ModelType type,
const base::RepeatingCallback<void(const base::Optional<ModelError>&)>&
error_callback,
ModelTypeStore* store,
SyncableServiceBasedBridge::InMemoryStore* in_memory_store,
ModelTypeChangeProcessor* other)
: type_(type),
error_callback_(error_callback),
store_(store),
in_memory_store_(in_memory_store),
other_(other) {
DCHECK(store);
DCHECK(other);
}
~LocalChangeProcessor() override {}
SyncError ProcessSyncChanges(const base::Location& from_here,
const SyncChangeList& change_list) override {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
// Reject changes if the processor has already experienced errors.
base::Optional<ModelError> processor_error = other_->GetError();
if (processor_error) {
return SyncError(processor_error->location(),
SyncError::UNRECOVERABLE_ERROR,
processor_error->message(), type_);
}
std::unique_ptr<ModelTypeStore::WriteBatch> batch =
store_->CreateWriteBatch();
for (const SyncChange& change : change_list) {
switch (change.change_type()) {
case SyncChange::ACTION_INVALID:
NOTREACHED() << " from " << change.location().ToString();
break;
case SyncChange::ACTION_ADD:
case SyncChange::ACTION_UPDATE: {
DCHECK_EQ(type_, change.sync_data().GetDataType());
DCHECK(change.sync_data().IsLocal())
<< " from " << change.location().ToString();
SyncDataLocal sync_data(change.sync_data());
DCHECK(sync_data.IsValid())
<< " from " << change.location().ToString();
const std::string storage_key =
GenerateSyncableHash(type_, sync_data.GetTag());
DCHECK(!storage_key.empty());
(*in_memory_store_)[storage_key] = sync_data.GetSpecifics();
sync_pb::PersistedEntityData persisted_entity =
CreatePersistedFromSyncData(sync_data);
batch->WriteData(storage_key, persisted_entity.SerializeAsString());
other_->Put(
storage_key,
ConvertPersistedToEntityData(
/*client_tag_hash=*/storage_key, std::move(persisted_entity)),
batch->GetMetadataChangeList());
break;
}
case SyncChange::ACTION_DELETE: {
std::string storage_key;
// Both SyncDataLocal and SyncDataRemote are allowed for deletions.
if (change.sync_data().IsLocal()) {
SyncDataLocal sync_data(change.sync_data());
DCHECK(sync_data.IsValid())
<< " from " << change.location().ToString();
storage_key = GenerateSyncableHash(type_, sync_data.GetTag());
} else {
SyncDataRemote sync_data(change.sync_data());
storage_key = sync_data.GetClientTagHash();
}
DCHECK(!storage_key.empty())
<< " from " << change.location().ToString();
if (IsActOnceDataType(type_)) {
if (other_->IsEntityUnsynced(storage_key)) {
// Ignore the local deletion if the entity hasn't been committed
// yet, similarly to how WriteNode::Drop() does it.
continue;
}
batch->GetMetadataChangeList()->ClearMetadata(storage_key);
other_->UntrackEntityForStorageKey(storage_key);
} else {
other_->Delete(storage_key, batch->GetMetadataChangeList());
}
in_memory_store_->erase(storage_key);
batch->DeleteData(storage_key);
break;
}
}
}
store_->CommitWriteBatch(std::move(batch), error_callback_);
return SyncError();
}
SyncDataList GetAllSyncData(ModelType type) const override {
// This function is not supported and not exercised by the relevant
// datatypes (that are integrated with this bridge).
NOTREACHED();
return SyncDataList();
}
SyncError UpdateDataTypeContext(ModelType type,
ContextRefreshStatus refresh_status,
const std::string& context) override {
// This function is not supported and not exercised by anyone, since
// the USS flow doesn't use SharedChangeProcessor.
// TODO(crbug.com/870624): Remove this function altogether when the
// directory codebase is removed.
NOTREACHED();
return SyncError();
}
void AddLocalChangeObserver(LocalChangeObserver* observer) override {
// This function is not supported and not exercised by the relevant
// datatypes (that are integrated with this bridge).
NOTREACHED();
}
void RemoveLocalChangeObserver(LocalChangeObserver* observer) override {
// This function is not supported and not exercised by the relevant
// datatypes (that are integrated with this bridge).
NOTREACHED();
}
private:
const ModelType type_;
const base::RepeatingCallback<void(const base::Optional<ModelError>&)>
error_callback_;
ModelTypeStore* const store_;
SyncableServiceBasedBridge::InMemoryStore* const in_memory_store_;
ModelTypeChangeProcessor* const other_;
SEQUENCE_CHECKER(sequence_checker_);
DISALLOW_COPY_AND_ASSIGN(LocalChangeProcessor);
};
class SyncErrorFactoryImpl : public SyncErrorFactory {
public:
explicit SyncErrorFactoryImpl(ModelType type) : type_(type) {}
~SyncErrorFactoryImpl() override = default;
SyncError CreateAndUploadError(const base::Location& location,
const std::string& message) override {
// Uploading is not supported, we simply return the error.
return SyncError(location, SyncError::DATATYPE_ERROR, message, type_);
}
private:
const ModelType type_;
DISALLOW_COPY_AND_ASSIGN(SyncErrorFactoryImpl);
};
} // namespace
SyncableServiceBasedBridge::SyncableServiceBasedBridge(
ModelType type,
OnceModelTypeStoreFactory store_factory,
std::unique_ptr<ModelTypeChangeProcessor> change_processor,
SyncableService* syncable_service)
: ModelTypeSyncBridge(std::move(change_processor)),
type_(type),
syncable_service_(syncable_service),
store_factory_(std::move(store_factory)),
syncable_service_started_(false),
weak_ptr_factory_(this) {
DCHECK(store_factory_);
DCHECK(syncable_service_);
}
SyncableServiceBasedBridge::~SyncableServiceBasedBridge() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
// Stop the syncable service to make sure instances of LocalChangeProcessor
// are not continued to be used.
if (syncable_service_started_) {
syncable_service_->StopSyncing(type_);
}
}
std::unique_ptr<MetadataChangeList>
SyncableServiceBasedBridge::CreateMetadataChangeList() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
return ModelTypeStore::WriteBatch::CreateMetadataChangeList();
}
void SyncableServiceBasedBridge::OnSyncStarting(
const DataTypeActivationRequest& request) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
DCHECK(!syncable_service_started_);
if (!store_factory_) {
// Sync was have been started earlier, and |store_| is guaranteed to be
// initialized because stopping of the datatype cannot be completed before
// ModelReadyToSync().
DCHECK(store_);
ReportErrorIfSet(MaybeStartSyncableService());
return;
}
syncable_service_->WaitUntilReadyToSync(
base::BindOnce(&SyncableServiceBasedBridge::OnSyncableServiceReady,
weak_ptr_factory_.GetWeakPtr()));
}
base::Optional<ModelError> SyncableServiceBasedBridge::MergeSyncData(
std::unique_ptr<MetadataChangeList> metadata_change_list,
EntityChangeList entity_change_list) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
DCHECK(store_);
DCHECK(change_processor()->IsTrackingMetadata());
DCHECK(!syncable_service_started_);
DCHECK(in_memory_store_.empty());
StoreAndConvertRemoteChanges(std::move(metadata_change_list),
std::move(entity_change_list));
// We ignore the output of previous call of StoreAndConvertRemoteChanges() at
// this point and let MaybeStartSyncableService() read from
// |in_memory_store_|, which has been updated above as part of
// StoreAndConvertRemoteChanges().
return MaybeStartSyncableService();
}
base::Optional<ModelError> SyncableServiceBasedBridge::ApplySyncChanges(
std::unique_ptr<MetadataChangeList> metadata_change_list,
EntityChangeList entity_change_list) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
DCHECK(store_);
DCHECK(change_processor()->IsTrackingMetadata());
DCHECK(syncable_service_started_);
SyncChangeList sync_change_list = StoreAndConvertRemoteChanges(
std::move(metadata_change_list), std::move(entity_change_list));
if (sync_change_list.empty()) {
return base::nullopt;
}
return ConvertToModelError(
syncable_service_->ProcessSyncChanges(FROM_HERE, sync_change_list));
}
void SyncableServiceBasedBridge::GetData(StorageKeyList storage_keys,
DataCallback callback) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
DCHECK(store_);
store_->ReadData(
storage_keys,
base::BindOnce(&SyncableServiceBasedBridge::OnReadDataForProcessor,
weak_ptr_factory_.GetWeakPtr(), std::move(callback)));
}
void SyncableServiceBasedBridge::GetAllDataForDebugging(DataCallback callback) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
DCHECK(store_);
store_->ReadAllData(
base::BindOnce(&SyncableServiceBasedBridge::OnReadAllDataForProcessor,
weak_ptr_factory_.GetWeakPtr(), std::move(callback)));
}
std::string SyncableServiceBasedBridge::GetClientTag(
const EntityData& entity_data) {
NOTREACHED();
return std::string();
}
std::string SyncableServiceBasedBridge::GetStorageKey(
const EntityData& entity_data) {
// Not supported as per SupportsGetStorageKey().
NOTREACHED();
return std::string();
}
bool SyncableServiceBasedBridge::SupportsGetClientTag() const {
return false;
}
bool SyncableServiceBasedBridge::SupportsGetStorageKey() const {
return false;
}
ConflictResolution SyncableServiceBasedBridge::ResolveConflict(
const std::string& storage_key,
const EntityData& remote_data) const {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (!remote_data.is_deleted()) {
return ConflictResolution::UseRemote();
}
// Ignore local changes for extensions/apps when server had a delete, to
// avoid unwanted reinstall of an uninstalled extension.
if (type_ == EXTENSIONS || type_ == APPS) {
DVLOG(1) << "Resolving conflict, ignoring local changes for extension/app";
return ConflictResolution::UseRemote();
}
return ConflictResolution::UseLocal();
}
void SyncableServiceBasedBridge::ApplyStopSyncChanges(
std::unique_ptr<MetadataChangeList> delete_metadata_change_list) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
DCHECK(store_);
if (delete_metadata_change_list) {
in_memory_store_.clear();
store_->DeleteAllDataAndMetadata(base::DoNothing());
}
if (syncable_service_started_) {
syncable_service_->StopSyncing(type_);
syncable_service_started_ = false;
}
}
size_t SyncableServiceBasedBridge::EstimateSyncOverheadMemoryUsage() const {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
return base::trace_event::EstimateMemoryUsage(in_memory_store_);
}
// static
std::unique_ptr<SyncChangeProcessor>
SyncableServiceBasedBridge::CreateLocalChangeProcessorForTesting(
ModelType type,
ModelTypeStore* store,
InMemoryStore* in_memory_store,
ModelTypeChangeProcessor* other) {
return std::make_unique<LocalChangeProcessor>(
type, /*error_callback=*/base::DoNothing(), store, in_memory_store,
other);
}
void SyncableServiceBasedBridge::OnSyncableServiceReady() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
std::move(store_factory_)
.Run(type_, base::BindOnce(&SyncableServiceBasedBridge::OnStoreCreated,
weak_ptr_factory_.GetWeakPtr()));
DCHECK(!store_factory_);
}
void SyncableServiceBasedBridge::OnStoreCreated(
const base::Optional<ModelError>& error,
std::unique_ptr<ModelTypeStore> store) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (error) {
change_processor()->ReportError(*error);
return;
}
DCHECK(store);
store_ = std::move(store);
auto in_memory_store = std::make_unique<InMemoryStore>();
InMemoryStore* raw_in_memory_store = in_memory_store.get();
store_->ReadAllDataAndPreprocess(
base::BindOnce(&ParseInMemoryStoreOnBackendSequence,
base::Unretained(raw_in_memory_store)),
base::BindOnce(&SyncableServiceBasedBridge::OnReadAllDataForInit,
weak_ptr_factory_.GetWeakPtr(),
std::move(in_memory_store)));
}
void SyncableServiceBasedBridge::OnReadAllDataForInit(
std::unique_ptr<InMemoryStore> in_memory_store,
const base::Optional<ModelError>& error) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
DCHECK(in_memory_store.get());
DCHECK(in_memory_store_.empty());
if (error) {
change_processor()->ReportError(*error);
return;
}
in_memory_store_ = std::move(*in_memory_store);
store_->ReadAllMetadata(
base::BindOnce(&SyncableServiceBasedBridge::OnReadAllMetadataForInit,
weak_ptr_factory_.GetWeakPtr()));
}
void SyncableServiceBasedBridge::OnReadAllMetadataForInit(
const base::Optional<ModelError>& error,
std::unique_ptr<MetadataBatch> metadata_batch) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
DCHECK(!syncable_service_started_);
if (error) {
change_processor()->ReportError(*error);
return;
}
// Guard against inconsistent state, and recover from it by starting from
// scratch, which will cause the eventual refetching of all entities from the
// server.
if (!metadata_batch->GetModelTypeState().initial_sync_done() &&
!in_memory_store_.empty()) {
in_memory_store_.clear();
store_->DeleteAllDataAndMetadata(base::DoNothing());
change_processor()->ModelReadyToSync(std::make_unique<MetadataBatch>());
DCHECK(!change_processor()->IsTrackingMetadata());
return;
}
change_processor()->ModelReadyToSync(std::move(metadata_batch));
ReportErrorIfSet(MaybeStartSyncableService());
}
base::Optional<ModelError>
SyncableServiceBasedBridge::MaybeStartSyncableService() {
DCHECK(!syncable_service_started_);
DCHECK(store_);
// If sync wasn't enabled according to the loaded metadata, let's wait until
// MergeSyncData() is called before starting the SyncableService.
if (!change_processor()->IsTrackingMetadata()) {
return base::nullopt;
}
const base::TimeTicks start_time = base::TimeTicks::Now();
// Sync enabled, so exercise MergeDataAndStartSyncing() immediately, since
// this function is reached only if sync is starting already.
SyncDataList initial_sync_data;
initial_sync_data.reserve(in_memory_store_.size());
for (const std::pair<const std::string, sync_pb::EntitySpecifics>& record :
in_memory_store_) {
initial_sync_data.push_back(SyncData::CreateRemoteData(
/*id=*/kInvalidNodeId, std::move(record.second),
/*client_tag_hash=*/record.first));
}
auto error_callback =
base::BindRepeating(&SyncableServiceBasedBridge::ReportErrorIfSet,
weak_ptr_factory_.GetWeakPtr());
auto local_change_processor = std::make_unique<LocalChangeProcessor>(
type_, error_callback, store_.get(), &in_memory_store_,
change_processor());
const base::Optional<ModelError> merge_error = ConvertToModelError(
syncable_service_
->MergeDataAndStartSyncing(
type_, initial_sync_data, std::move(local_change_processor),
std::make_unique<SyncErrorFactoryImpl>(type_))
.error());
RecordAssociationTime(base::TimeTicks::Now() - start_time);
if (!merge_error) {
syncable_service_started_ = true;
}
return merge_error;
}
SyncChangeList SyncableServiceBasedBridge::StoreAndConvertRemoteChanges(
std::unique_ptr<MetadataChangeList> initial_metadata_change_list,
EntityChangeList input_entity_change_list) {
std::unique_ptr<ModelTypeStore::WriteBatch> batch =
store_->CreateWriteBatch();
batch->TakeMetadataChangesFrom(std::move(initial_metadata_change_list));
SyncChangeList output_sync_change_list;
output_sync_change_list.reserve(input_entity_change_list.size());
for (const std::unique_ptr<EntityChange>& change : input_entity_change_list) {
switch (change->type()) {
case EntityChange::ACTION_DELETE: {
const std::string& storage_key = change->storage_key();
DCHECK_NE(0U, in_memory_store_.count(storage_key));
DVLOG(1) << ModelTypeToString(type_)
<< ": Processing deletion with storage key: " << storage_key;
output_sync_change_list.emplace_back(
FROM_HERE, SyncChange::ACTION_DELETE,
SyncData::CreateRemoteData(
/*id=*/kInvalidNodeId, in_memory_store_[storage_key],
/*client_tag_hash=*/""));
// For tombstones, there is no actual data, which means no client tag
// hash either, but the processor provides the storage key.
DCHECK(!storage_key.empty());
batch->DeleteData(storage_key);
in_memory_store_.erase(storage_key);
break;
}
case EntityChange::ACTION_ADD:
// Because we use the client tag hash as storage key, let the processor
// know.
change_processor()->UpdateStorageKey(
change->data(), /*storage_key=*/change->data().client_tag_hash,
batch->GetMetadataChangeList());
FALLTHROUGH;
case EntityChange::ACTION_UPDATE: {
const std::string& storage_key = change->data().client_tag_hash;
DVLOG(1) << ModelTypeToString(type_)
<< ": Processing add/update with key: " << storage_key;
output_sync_change_list.emplace_back(
FROM_HERE, ConvertToSyncChangeType(change->type()),
SyncData::CreateRemoteData(
/*id=*/kInvalidNodeId, change->data().specifics,
change->data().client_tag_hash));
batch->WriteData(
storage_key,
CreatePersistedFromEntityData(change->data()).SerializeAsString());
in_memory_store_[storage_key] = change->data().specifics;
break;
}
}
}
store_->CommitWriteBatch(
std::move(batch),
base::BindOnce(&SyncableServiceBasedBridge::ReportErrorIfSet,
weak_ptr_factory_.GetWeakPtr()));
return output_sync_change_list;
}
void SyncableServiceBasedBridge::OnReadDataForProcessor(
DataCallback callback,
const base::Optional<ModelError>& error,
std::unique_ptr<ModelTypeStore::RecordList> record_list,
std::unique_ptr<ModelTypeStore::IdList> missing_id_list) {
OnReadAllDataForProcessor(std::move(callback), error, std::move(record_list));
}
void SyncableServiceBasedBridge::OnReadAllDataForProcessor(
DataCallback callback,
const base::Optional<ModelError>& error,
std::unique_ptr<ModelTypeStore::RecordList> record_list) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (error) {
change_processor()->ReportError(*error);
return;
}
auto batch = std::make_unique<MutableDataBatch>();
for (const ModelTypeStore::Record& record : *record_list) {
sync_pb::PersistedEntityData persisted_entity;
if (record.id.empty() || !persisted_entity.ParseFromString(record.value)) {
change_processor()->ReportError(
{FROM_HERE, "Failed deserializing data."});
return;
}
// Note that client tag hash is used as storage key too.
batch->Put(record.id,
ConvertPersistedToEntityData(
/*client_tag_hash=*/record.id, std::move(persisted_entity)));
}
std::move(callback).Run(std::move(batch));
}
void SyncableServiceBasedBridge::ReportErrorIfSet(
const base::Optional<ModelError>& error) {
if (error) {
change_processor()->ReportError(*error);
}
}
void SyncableServiceBasedBridge::RecordAssociationTime(
base::TimeDelta time) const {
// This mimics the implementation in SharedChangeProcessor.
#define PER_DATA_TYPE_MACRO(type_str) \
UMA_HISTOGRAM_TIMES("Sync." type_str "AssociationTime", time);
SYNC_DATA_TYPE_HISTOGRAM(type_);
#undef PER_DATA_TYPE_MACRO
}
} // namespace syncer