// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "components/sync/engine_impl/model_type_registry.h"

#include <stddef.h>

#include <memory>
#include <string>
#include <utility>

#include "base/bind.h"
#include "base/metrics/histogram_functions.h"
#include "base/metrics/histogram_macros.h"
#include "base/observer_list.h"
#include "base/threading/sequenced_task_runner_handle.h"
#include "components/sync/engine/commit_queue.h"
#include "components/sync/engine/data_type_activation_response.h"
#include "components/sync/engine/model_type_processor.h"
#include "components/sync/engine_impl/cycle/directory_type_debug_info_emitter.h"
#include "components/sync/engine_impl/cycle/non_blocking_type_debug_info_emitter.h"
#include "components/sync/engine_impl/directory_commit_contributor.h"
#include "components/sync/engine_impl/directory_update_handler.h"
#include "components/sync/engine_impl/model_type_worker.h"
#include "components/sync/nigori/cryptographer.h"
#include "components/sync/nigori/keystore_keys_handler.h"
#include "components/sync/syncable/read_transaction.h"
#include "components/sync/syncable/syncable_base_transaction.h"
namespace syncer {
namespace {
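
// A CommitQueue implementation that forwards NudgeForCommit() calls to a
// worker living on the sync thread. Posting through a WeakPtr means the
// nudge is silently dropped if the worker has already been destroyed.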
class CommitQueueProxy : public CommitQueue {
public:
CommitQueueProxy(const base::WeakPtr<CommitQueue>& worker,
const scoped_refptr<base::SequencedTaskRunner>& sync_thread);
~CommitQueueProxy() override;
void NudgeForCommit() override;
private:
base::WeakPtr<CommitQueue> worker_;
scoped_refptr<base::SequencedTaskRunner> sync_thread_;
};
CommitQueueProxy::CommitQueueProxy(
const base::WeakPtr<CommitQueue>& worker,
const scoped_refptr<base::SequencedTaskRunner>& sync_thread)
: worker_(worker), sync_thread_(sync_thread) {}
CommitQueueProxy::~CommitQueueProxy() {}
void CommitQueueProxy::NudgeForCommit() {
sync_thread_->PostTask(FROM_HERE,
base::BindOnce(&CommitQueue::NudgeForCommit, worker_));
}
} // namespace
ModelTypeRegistry::ModelTypeRegistry(
const std::vector<scoped_refptr<ModelSafeWorker>>& workers,
UserShare* user_share,
NudgeHandler* nudge_handler,
const UssMigrator& uss_migrator,
CancelationSignal* cancelation_signal,
KeystoreKeysHandler* keystore_keys_handler)
: user_share_(user_share),
nudge_handler_(nudge_handler),
uss_migrator_(uss_migrator),
cancelation_signal_(cancelation_signal),
keystore_keys_handler_(keystore_keys_handler) {
  for (const scoped_refptr<ModelSafeWorker>& worker : workers) {
    workers_map_.insert(std::make_pair(worker->GetModelSafeGroup(), worker));
  }
}
ModelTypeRegistry::~ModelTypeRegistry() {}
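
// Connects a non-blocking (USS) type: creates a ModelTypeWorker for |type|,
// hooks it up to the type's processor via a CommitQueueProxy, and migrates
// any existing directory data for |type| through |uss_migrator_| if needed.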
void ModelTypeRegistry::ConnectNonBlockingType(
ModelType type,
std::unique_ptr<DataTypeActivationResponse> activation_response) {
DCHECK(update_handler_map_.find(type) == update_handler_map_.end());
DCHECK(commit_contributor_map_.find(type) == commit_contributor_map_.end());
DVLOG(1) << "Enabling an off-thread sync type: " << ModelTypeToString(type);
bool initial_sync_done =
activation_response->model_type_state.initial_sync_done();
  // Attempt migration if the USS initial sync hasn't been done yet, there is
  // a migrator function, the directory has data for this |type|, and |type|
  // is not NIGORI. Nigori is exceptional because it carries only a small
  // amount of data, which can simply be downloaded from the server again.
bool do_migration = !initial_sync_done && !uss_migrator_.is_null() &&
directory()->InitialSyncEndedForType(type) &&
type != NIGORI;
bool trigger_initial_sync = !initial_sync_done && !do_migration;
// Save a raw pointer to the processor for connecting later.
ModelTypeProcessor* type_processor =
activation_response->type_processor.get();
std::unique_ptr<Cryptographer> cryptographer_copy;
if (encrypted_types_.Has(type))
cryptographer_copy = cryptographer_->Clone();
DataTypeDebugInfoEmitter* emitter = GetEmitter(type);
if (emitter == nullptr) {
auto new_emitter = std::make_unique<NonBlockingTypeDebugInfoEmitter>(
type, &type_debug_info_observers_);
emitter = new_emitter.get();
data_type_debug_info_emitter_map_.insert(
std::make_pair(type, std::move(new_emitter)));
}
auto worker = std::make_unique<ModelTypeWorker>(
type, activation_response->model_type_state, trigger_initial_sync,
std::move(cryptographer_copy), passphrase_type_, nudge_handler_,
std::move(activation_response->type_processor), emitter,
cancelation_signal_);
// Save a raw pointer and add the worker to our structures.
ModelTypeWorker* worker_ptr = worker.get();
model_type_workers_.push_back(std::move(worker));
update_handler_map_.insert(std::make_pair(type, worker_ptr));
commit_contributor_map_.insert(std::make_pair(type, worker_ptr));
// Initialize Processor -> Worker communication channel.
type_processor->ConnectSync(std::make_unique<CommitQueueProxy>(
worker_ptr->AsWeakPtr(), base::SequencedTaskRunnerHandle::Get()));
// Attempt migration if necessary.
if (do_migration) {
// TODO(crbug.com/658002): Store a pref before attempting migration
// indicating that it was attempted so we can avoid failure loops.
int migrated_entity_count = 0;
if (uss_migrator_.Run(type, user_share_, worker_ptr,
&migrated_entity_count)) {
UMA_HISTOGRAM_ENUMERATION("Sync.USSMigrationSuccess",
ModelTypeHistogramValue(type));
      // If we successfully migrated, purge the directory of data for the
      // type. Purging removes only the directory's local copy of the data.
directory()->PurgeEntriesWithTypeIn(ModelTypeSet(type), ModelTypeSet(),
ModelTypeSet());
} else {
UMA_HISTOGRAM_ENUMERATION("Sync.USSMigrationFailure",
ModelTypeHistogramValue(type));
}
// Note that a partial failure may still contribute to the counts histogram.
base::UmaHistogramCounts100000(
std::string("Sync.USSMigrationEntityCount.") +
ModelTypeToHistogramSuffix(type),
migrated_entity_count);
}
  // Check that we haven't accidentally enabled both the non-blocking and
  // directory implementations for a given model type. This holds even if
  // migration fails; the fallback in that case is an initial GetUpdates,
  // not the directory implementation.
DCHECK(Intersection(GetEnabledDirectoryTypes(), GetEnabledNonBlockingTypes())
.Empty());
}
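
// Disconnects a non-blocking type by removing its map entries and destroying
// its ModelTypeWorker; destruction invalidates the weak pointer held by the
// processor-side CommitQueueProxy, so late nudges become no-ops.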
void ModelTypeRegistry::DisconnectNonBlockingType(ModelType type) {
DVLOG(1) << "Disabling an off-thread sync type: " << ModelTypeToString(type);
DCHECK(update_handler_map_.find(type) != update_handler_map_.end());
DCHECK(commit_contributor_map_.find(type) != commit_contributor_map_.end());
size_t updaters_erased = update_handler_map_.erase(type);
size_t committers_erased = commit_contributor_map_.erase(type);
DCHECK_EQ(1U, updaters_erased);
DCHECK_EQ(1U, committers_erased);
auto iter = model_type_workers_.begin();
while (iter != model_type_workers_.end()) {
if ((*iter)->GetModelType() == type) {
iter = model_type_workers_.erase(iter);
} else {
++iter;
}
}
}
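
// Registers a directory-backed (legacy) type: creates a debug info emitter,
// a DirectoryUpdateHandler, and a DirectoryCommitContributor for |type|, all
// operating on the shared syncable::Directory.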
void ModelTypeRegistry::RegisterDirectoryType(ModelType type,
ModelSafeGroup group) {
DCHECK(update_handler_map_.find(type) == update_handler_map_.end());
DCHECK(commit_contributor_map_.find(type) == commit_contributor_map_.end());
DCHECK(directory_update_handlers_.find(type) ==
directory_update_handlers_.end());
DCHECK(directory_commit_contributors_.find(type) ==
directory_commit_contributors_.end());
DCHECK(data_type_debug_info_emitter_map_.find(type) ==
data_type_debug_info_emitter_map_.end());
DCHECK_NE(GROUP_NON_BLOCKING, group);
DCHECK(workers_map_.find(group) != workers_map_.end())
<< " for " << ModelTypeToString(type) << " group "
<< ModelSafeGroupToString(group);
auto worker = workers_map_.find(group)->second;
DCHECK(GetEmitter(type) == nullptr);
auto owned_emitter = std::make_unique<DirectoryTypeDebugInfoEmitter>(
directory(), type, &type_debug_info_observers_);
DataTypeDebugInfoEmitter* emitter_ptr = owned_emitter.get();
data_type_debug_info_emitter_map_[type] = std::move(owned_emitter);
auto updater = std::make_unique<DirectoryUpdateHandler>(directory(), type,
worker, emitter_ptr);
auto committer = std::make_unique<DirectoryCommitContributor>(
directory(), type, emitter_ptr);
update_handler_map_[type] = updater.get();
commit_contributor_map_[type] = committer.get();
directory_update_handlers_[type] = std::move(updater);
directory_commit_contributors_[type] = std::move(committer);
DCHECK(Intersection(GetEnabledDirectoryTypes(), GetEnabledNonBlockingTypes())
.Empty());
}
void ModelTypeRegistry::UnregisterDirectoryType(ModelType type) {
DCHECK(update_handler_map_.find(type) != update_handler_map_.end());
DCHECK(commit_contributor_map_.find(type) != commit_contributor_map_.end());
DCHECK(directory_update_handlers_.find(type) !=
directory_update_handlers_.end());
DCHECK(directory_commit_contributors_.find(type) !=
directory_commit_contributors_.end());
DCHECK(data_type_debug_info_emitter_map_.find(type) !=
data_type_debug_info_emitter_map_.end());
update_handler_map_.erase(type);
commit_contributor_map_.erase(type);
directory_update_handlers_.erase(type);
directory_commit_contributors_.erase(type);
data_type_debug_info_emitter_map_.erase(type);
}
ModelTypeSet ModelTypeRegistry::GetEnabledTypes() const {
return Union(GetEnabledDirectoryTypes(), GetEnabledNonBlockingTypes());
}
ModelTypeSet ModelTypeRegistry::GetInitialSyncEndedTypes() const {
  // To prevent initial sync of USS types before we reach the UssMigrator, we
  // collect the initial sync state from the Directory.
  // TODO(crbug.com/981480): Consider cleaning up the configuration flow so
  // that this logic is no longer needed.
ModelTypeSet result = directory()->InitialSyncEndedTypes();
  // The UssMigrator is never applied to NIGORI, so for that type we only
  // need to check the update handler state.
result.Remove(NIGORI);
for (const auto& kv : update_handler_map_) {
if (kv.second->IsInitialSyncEnded())
result.Put(kv.first);
}
return result;
}
const UpdateHandler* ModelTypeRegistry::GetUpdateHandler(ModelType type) const {
auto it = update_handler_map_.find(type);
return it == update_handler_map_.end() ? nullptr : it->second;
}
UpdateHandlerMap* ModelTypeRegistry::update_handler_map() {
return &update_handler_map_;
}
CommitContributorMap* ModelTypeRegistry::commit_contributor_map() {
return &commit_contributor_map_;
}
KeystoreKeysHandler* ModelTypeRegistry::keystore_keys_handler() {
return keystore_keys_handler_;
}
void ModelTypeRegistry::RegisterDirectoryTypeDebugInfoObserver(
TypeDebugInfoObserver* observer) {
if (!type_debug_info_observers_.HasObserver(observer))
type_debug_info_observers_.AddObserver(observer);
}
void ModelTypeRegistry::UnregisterDirectoryTypeDebugInfoObserver(
TypeDebugInfoObserver* observer) {
type_debug_info_observers_.RemoveObserver(observer);
}
bool ModelTypeRegistry::HasDirectoryTypeDebugInfoObserver(
const TypeDebugInfoObserver* observer) const {
return type_debug_info_observers_.HasObserver(observer);
}
void ModelTypeRegistry::RequestEmitDebugInfo() {
for (const auto& kv : data_type_debug_info_emitter_map_) {
kv.second->EmitCommitCountersUpdate();
kv.second->EmitUpdateCountersUpdate();
    // Although it breaks encapsulation to special-case them, status counters
    // are deliberately not emitted here: they have already been requested
    // manually on the UI thread, because USS emitters don't yet have a
    // working implementation for them.
}
}
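
// Reports whether any local changes are waiting to be committed, checking
// both the USS workers and the directory.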
bool ModelTypeRegistry::HasUnsyncedItems() const {
// For model type workers, we ask them individually.
for (const auto& worker : model_type_workers_) {
if (worker->HasLocalChangesForTest()) {
return true;
}
}
// Verify directory state.
ReadTransaction trans(FROM_HERE, user_share_);
return trans.GetWrappedTrans()->directory()->unsynced_entity_count() != 0;
}
base::WeakPtr<ModelTypeConnector> ModelTypeRegistry::AsWeakPtr() {
return weak_ptr_factory_.GetWeakPtr();
}
void ModelTypeRegistry::OnPassphraseRequired(
PassphraseRequiredReason reason,
const KeyDerivationParams& key_derivation_params,
const sync_pb::EncryptedData& pending_keys) {}
void ModelTypeRegistry::OnPassphraseAccepted() {
for (const auto& worker : model_type_workers_) {
if (encrypted_types_.Has(worker->GetModelType())) {
worker->EncryptionAcceptedMaybeApplyUpdates();
}
}
}
void ModelTypeRegistry::OnTrustedVaultKeyRequired() {}
void ModelTypeRegistry::OnTrustedVaultKeyAccepted() {
for (const auto& worker : model_type_workers_) {
if (encrypted_types_.Has(worker->GetModelType())) {
worker->EncryptionAcceptedMaybeApplyUpdates();
}
}
}
void ModelTypeRegistry::OnBootstrapTokenUpdated(
const std::string& bootstrap_token,
BootstrapTokenType type) {}
void ModelTypeRegistry::OnEncryptedTypesChanged(ModelTypeSet encrypted_types,
bool encrypt_everything) {
// TODO(skym): This does not handle reducing the number of encrypted types
// correctly. They're removed from |encrypted_types_| but corresponding
// workers never have their Cryptographers removed. This probably is not a use
// case that currently needs to be supported, but it should be guarded against
// here.
encrypted_types_ = encrypted_types;
OnEncryptionStateChanged();
}
void ModelTypeRegistry::OnEncryptionComplete() {}
void ModelTypeRegistry::OnCryptographerStateChanged(
Cryptographer* cryptographer,
bool has_pending_keys) {
cryptographer_ = cryptographer->Clone();
OnEncryptionStateChanged();
}
void ModelTypeRegistry::OnPassphraseTypeChanged(PassphraseType type,
base::Time passphrase_time) {
passphrase_type_ = type;
for (const auto& worker : model_type_workers_) {
if (encrypted_types_.Has(worker->GetModelType())) {
worker->UpdatePassphraseType(type);
}
}
}
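
// Hands a fresh copy of the current cryptographer to every worker whose
// type is encrypted.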
void ModelTypeRegistry::OnEncryptionStateChanged() {
for (const auto& worker : model_type_workers_) {
if (encrypted_types_.Has(worker->GetModelType())) {
worker->UpdateCryptographer(cryptographer_->Clone());
}
}
}
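
// Returns the debug info emitter registered for |type|, or nullptr if none
// exists yet.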
DataTypeDebugInfoEmitter* ModelTypeRegistry::GetEmitter(ModelType type) {
DataTypeDebugInfoEmitter* raw_emitter = nullptr;
auto it = data_type_debug_info_emitter_map_.find(type);
if (it != data_type_debug_info_emitter_map_.end()) {
raw_emitter = it->second.get();
}
return raw_emitter;
}
ModelTypeSet ModelTypeRegistry::GetEnabledDirectoryTypes() const {
ModelTypeSet enabled_directory_types;
for (const auto& kv : directory_update_handlers_)
enabled_directory_types.Put(kv.first);
return enabled_directory_types;
}
ModelTypeSet ModelTypeRegistry::GetEnabledNonBlockingTypes() const {
ModelTypeSet enabled_non_blocking_types;
for (const auto& worker : model_type_workers_) {
enabled_non_blocking_types.Put(worker->GetModelType());
}
return enabled_non_blocking_types;
}
} // namespace syncer