// blob: 8b046240dabaa0a8df7a2e3c8b452994ae202256 [file] [log] [blame]
// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "sync/internal_api/public/shared_model_type_processor.h"
#include <utility>
#include <vector>
#include "base/bind.h"
#include "base/location.h"
#include "base/memory/ptr_util.h"
#include "base/metrics/histogram.h"
#include "base/threading/thread_task_runner_handle.h"
#include "sync/engine/commit_queue.h"
#include "sync/internal_api/public/activation_context.h"
#include "sync/internal_api/public/processor_entity_tracker.h"
#include "sync/syncable/syncable_util.h"
namespace syncer_v2 {
namespace {
// File-local ModelTypeProcessor implementation that relays each call to
// |processor_| by posting a task to |processor_task_runner_|.
class ModelTypeProcessorProxy : public ModelTypeProcessor {
public:
// |processor| is the relay target; |processor_task_runner| is the task runner
// the relayed calls are posted to.
ModelTypeProcessorProxy(
const base::WeakPtr<ModelTypeProcessor>& processor,
const scoped_refptr<base::SequencedTaskRunner>& processor_task_runner);
~ModelTypeProcessorProxy() override;
// ModelTypeProcessor implementation. Each method posts the matching call on
// |processor_| to |processor_task_runner_|.
void ConnectSync(std::unique_ptr<CommitQueue> worker) override;
void DisconnectSync() override;
void OnCommitCompleted(const sync_pb::DataTypeState& type_state,
const CommitResponseDataList& response_list) override;
void OnUpdateReceived(const sync_pb::DataTypeState& type_state,
const UpdateResponseDataList& updates) override;
private:
// Target of the relayed calls, held weakly.
base::WeakPtr<ModelTypeProcessor> processor_;
// Task runner the relayed calls are posted to.
scoped_refptr<base::SequencedTaskRunner> processor_task_runner_;
};
// Stores the weak |processor| and the |processor_task_runner| used by the
// forwarding methods; performs no other work.
ModelTypeProcessorProxy::ModelTypeProcessorProxy(
const base::WeakPtr<ModelTypeProcessor>& processor,
const scoped_refptr<base::SequencedTaskRunner>& processor_task_runner)
: processor_(processor), processor_task_runner_(processor_task_runner) {}
// Nothing to tear down beyond the members' own destructors.
ModelTypeProcessorProxy::~ModelTypeProcessorProxy() = default;
// Posts ConnectSync() for |processor_| to |processor_task_runner_|. Ownership
// of |worker| is handed to the bound task.
void ModelTypeProcessorProxy::ConnectSync(std::unique_ptr<CommitQueue> worker) {
  auto connect_task =
      base::Bind(&ModelTypeProcessor::ConnectSync, processor_,
                 base::Passed(std::move(worker)));
  processor_task_runner_->PostTask(FROM_HERE, connect_task);
}
// Posts DisconnectSync() for |processor_| to |processor_task_runner_|.
void ModelTypeProcessorProxy::DisconnectSync() {
  auto disconnect_task =
      base::Bind(&ModelTypeProcessor::DisconnectSync, processor_);
  processor_task_runner_->PostTask(FROM_HERE, disconnect_task);
}
// Posts OnCommitCompleted() for |processor_| to |processor_task_runner_|,
// binding |type_state| and |response_list| into the task.
void ModelTypeProcessorProxy::OnCommitCompleted(
    const sync_pb::DataTypeState& type_state,
    const CommitResponseDataList& response_list) {
  auto commit_completed_task =
      base::Bind(&ModelTypeProcessor::OnCommitCompleted, processor_,
                 type_state, response_list);
  processor_task_runner_->PostTask(FROM_HERE, commit_completed_task);
}
// Posts OnUpdateReceived() for |processor_| to |processor_task_runner_|,
// binding |type_state| and |updates| into the task.
void ModelTypeProcessorProxy::OnUpdateReceived(
    const sync_pb::DataTypeState& type_state,
    const UpdateResponseDataList& updates) {
  auto update_received_task =
      base::Bind(&ModelTypeProcessor::OnUpdateReceived, processor_,
                 type_state, updates);
  processor_task_runner_->PostTask(FROM_HERE, update_received_task);
}
} // namespace
// Creates a processor for |type| backed by |service|, which must be non-null.
// The processor starts with no metadata loaded, no pending data loaded, and
// no error handler.
SharedModelTypeProcessor::SharedModelTypeProcessor(syncer::ModelType type,
ModelTypeService* service)
: type_(type),
is_metadata_loaded_(false),
is_initial_pending_data_loaded_(false),
service_(service),
error_handler_(nullptr),
weak_ptr_factory_(this) {
DCHECK(service);
}
// Members clean up after themselves; no explicit teardown is required.
SharedModelTypeProcessor::~SharedModelTypeProcessor() = default;
// static
// Factory returning a new SharedModelTypeProcessor for |type| and |service|,
// typed as the ModelTypeChangeProcessor interface.
std::unique_ptr<ModelTypeChangeProcessor>
SharedModelTypeProcessor::CreateAsChangeProcessor(syncer::ModelType type,
                                                  ModelTypeService* service) {
  std::unique_ptr<ModelTypeChangeProcessor> processor(
      new SharedModelTypeProcessor(type, service));
  return processor;
}
// Records |error_handler| and |start_callback| and then attempts to connect.
// Must not be called while a previous start callback is still pending or
// while already connected. |start_callback| is run by ConnectIfReady() once
// its conditions are met, which may be later than this call.
void SharedModelTypeProcessor::OnSyncStarting(
syncer::DataTypeErrorHandler* error_handler,
const StartCallback& start_callback) {
DCHECK(CalledOnValidThread());
DCHECK(start_callback_.is_null());
DCHECK(!IsConnected());
DCHECK(error_handler);
DVLOG(1) << "Sync is starting for " << ModelTypeToString(type_);
error_handler_ = error_handler;
start_callback_ = start_callback;
// Runs the callback right away if metadata and pending data already loaded.
ConnectIfReady();
}
// Receives the result of the service's metadata load. On |error| the error is
// stored for the start callback. Otherwise entity trackers are rebuilt from
// |batch| when an initial sync was already done, or fresh data type state is
// set up when it was not. Commit data for entities that need it is requested
// from the service before connecting.
void SharedModelTypeProcessor::OnMetadataLoaded(
    syncer::SyncError error,
    std::unique_ptr<MetadataBatch> batch) {
  DCHECK(CalledOnValidThread());
  DCHECK(entities_.empty());
  DCHECK(!is_metadata_loaded_);
  DCHECK(!IsConnected());

  is_metadata_loaded_ = true;
  // Assume no data load is needed; this is cleared again below in the one
  // case where pending commit data has to be fetched.
  is_initial_pending_data_loaded_ = true;

  if (error.IsSet()) {
    start_error_ = error;
    ConnectIfReady();
    return;
  }

  if (!batch->GetDataTypeState().initial_sync_done()) {
    // First time syncing; initialize metadata.
    data_type_state_.mutable_progress_marker()->set_data_type_id(
        GetSpecificsFieldNumberFromModelType(type_));
    ConnectIfReady();
    return;
  }

  EntityMetadataMap metadata_map(batch->TakeAllMetadata());
  std::vector<std::string> tags_needing_data;
  for (auto& tag_and_metadata : metadata_map) {
    std::unique_ptr<ProcessorEntityTracker> tracker =
        ProcessorEntityTracker::CreateFromMetadata(tag_and_metadata.first,
                                                   &tag_and_metadata.second);
    if (tracker->RequiresCommitData())
      tags_needing_data.push_back(tracker->client_tag());
    entities_[tracker->metadata().client_tag_hash()] = std::move(tracker);
  }
  data_type_state_ = batch->GetDataTypeState();

  if (!tags_needing_data.empty()) {
    // Connection is held back until OnInitialPendingDataLoaded() runs.
    is_initial_pending_data_loaded_ = false;
    service_->GetData(
        tags_needing_data,
        base::Bind(&SharedModelTypeProcessor::OnInitialPendingDataLoaded,
                   weak_ptr_factory_.GetWeakPtr()));
  }

  ConnectIfReady();
}
// Runs the pending start callback once metadata is loaded, initial pending
// data is loaded, and a start callback has been supplied. On success the
// callback receives an ActivationContext holding the data type state and a
// proxy to this processor; when |start_error_| is set it receives a null
// context instead.
void SharedModelTypeProcessor::ConnectIfReady() {
  DCHECK(CalledOnValidThread());

  const bool ready = is_metadata_loaded_ &&
                     is_initial_pending_data_loaded_ &&
                     !start_callback_.is_null();
  if (!ready)
    return;

  std::unique_ptr<ActivationContext> context;
  if (!start_error_.IsSet()) {
    context = base::WrapUnique(new ActivationContext);
    context->data_type_state = data_type_state_;
    context->type_processor = base::WrapUnique(
        new ModelTypeProcessorProxy(weak_ptr_factory_.GetWeakPtr(),
                                    base::ThreadTaskRunnerHandle::Get()));
  }

  start_callback_.Run(start_error_, std::move(context));
  start_callback_.Reset();
}
// Returns true once metadata has been loaded (see OnMetadataLoaded()).
bool SharedModelTypeProcessor::IsAllowingChanges() const {
return is_metadata_loaded_;
}
// Returns true while a commit queue worker is attached.
bool SharedModelTypeProcessor::IsConnected() const {
  DCHECK(CalledOnValidThread());
  return worker_ != nullptr;
}
// Asks the service to clear the stored metadata of every tracked entity and
// the stored data type state.
void SharedModelTypeProcessor::DisableSync() {
  DCHECK(CalledOnValidThread());

  std::unique_ptr<MetadataChangeList> change_list =
      service_->CreateMetadataChangeList();
  for (const auto& hash_and_entity : entities_)
    change_list->ClearMetadata(hash_and_entity.second->client_tag());
  change_list->ClearDataTypeState();

  // Nothing to do if this fails, so just ignore the error it might return.
  service_->ApplySyncChanges(std::move(change_list), EntityChangeList());
}
// Builds a SyncError for this type from |location| and |message|. Delegates
// to the error handler when one has been set; otherwise constructs a plain
// DATATYPE_ERROR locally.
syncer::SyncError SharedModelTypeProcessor::CreateAndUploadError(
    const tracked_objects::Location& location,
    const std::string& message) {
  if (!error_handler_) {
    return syncer::SyncError(location, syncer::SyncError::DATATYPE_ERROR,
                             message, type_);
  }
  return error_handler_->CreateAndUploadError(location, message, type_);
}
// Takes ownership of |worker| as the commit queue and immediately tries to
// send any commit requests that are already pending.
void SharedModelTypeProcessor::ConnectSync(
std::unique_ptr<CommitQueue> worker) {
DCHECK(CalledOnValidThread());
DVLOG(1) << "Successfully connected " << ModelTypeToString(type_);
worker_ = std::move(worker);
FlushPendingCommitRequests();
}
// Detaches from the sync backend: invalidates outstanding weak pointers to
// this processor, drops the commit queue worker, and clears the transient
// sync state of every tracked entity. Requires a live connection.
void SharedModelTypeProcessor::DisconnectSync() {
  DCHECK(CalledOnValidThread());
  DCHECK(IsConnected());
  DVLOG(1) << "Disconnecting sync for " << ModelTypeToString(type_);

  weak_ptr_factory_.InvalidateWeakPtrs();
  worker_.reset();

  for (auto& hash_and_entity : entities_)
    hash_and_entity.second->ClearTransientSyncState();
}
void SharedModelTypeProcessor::Put(const std::string& tag,
std::unique_ptr<EntityData> data,
MetadataChangeList* metadata_change_list) {
DCHECK(IsAllowingChanges());
DCHECK(data.get());
DCHECK(!data->is_deleted());
DCHECK(!data->non_unique_name.empty());
DCHECK_EQ(type_, syncer::GetModelTypeFromSpecifics(data->specifics));
if (!data_type_state_.initial_sync_done()) {
// Ignore changes before the initial sync is done.
return;
}
// Fill in some data.
data->client_tag_hash = GetHashForTag(tag);
if (data->modification_time.is_null()) {
data->modification_time = base::Time::Now();
}
ProcessorEntityTracker* entity = GetEntityForTagHash(data->client_tag_hash);
if (entity == nullptr) {
// The service is creating a new entity.
if (data->creation_time.is_null()) {
data->creation_time = data->modification_time;
}
entity = CreateEntity(tag, *data);
} else if (entity->MatchesData(*data)) {
// Ignore changes that don't actually change anything.
return;
}
entity->MakeLocalChange(std::move(data));
metadata_change_list->UpdateMetadata(tag, entity->metadata());
FlushPendingCommitRequests();
}
void SharedModelTypeProcessor::Delete(
const std::string& tag,
MetadataChangeList* metadata_change_list) {
DCHECK(IsAllowingChanges());
if (!data_type_state_.initial_sync_done()) {
// Ignore changes before the initial sync is done.
return;
}
ProcessorEntityTracker* entity = GetEntityForTag(tag);
if (entity == nullptr) {
// That's unusual, but not necessarily a bad thing.
// Missing is as good as deleted as far as the model is concerned.
DLOG(WARNING) << "Attempted to delete missing item."
<< " client tag: " << tag;
return;
}
entity->Delete();
metadata_change_list->UpdateMetadata(tag, entity->metadata());
FlushPendingCommitRequests();
}
void SharedModelTypeProcessor::FlushPendingCommitRequests() {
CommitRequestDataList commit_requests;
// Don't bother sending anything if there's no one to send to.
if (!IsConnected())
return;
// Don't send anything if the type is not ready to handle commits.
if (!data_type_state_.initial_sync_done())
return;
// TODO(rlarocque): Do something smarter than iterate here.
for (auto it = entities_.begin(); it != entities_.end(); ++it) {
ProcessorEntityTracker* entity = it->second.get();
if (entity->RequiresCommitRequest() && !entity->RequiresCommitData()) {
CommitRequestData request;
entity->InitializeCommitRequestData(&request);
commit_requests.push_back(request);
}
}
if (!commit_requests.empty())
worker_->EnqueueForCommit(commit_requests);
}
// Handles the server's response to a commit. Adopts |type_state|, feeds each
// item of |response_list| to its entity tracker, and persists the resulting
// metadata through the service. Entities whose metadata can be cleared are
// dropped from |entities_|.
void SharedModelTypeProcessor::OnCommitCompleted(
    const sync_pb::DataTypeState& type_state,
    const CommitResponseDataList& response_list) {
  std::unique_ptr<MetadataChangeList> metadata_changes =
      service_->CreateMetadataChangeList();

  data_type_state_ = type_state;
  metadata_changes->UpdateDataTypeState(data_type_state_);

  for (const CommitResponseData& response : response_list) {
    ProcessorEntityTracker* tracker =
        GetEntityForTagHash(response.client_tag_hash);
    if (tracker == nullptr) {
      NOTREACHED() << "Received commit response for missing item."
                   << " type: " << type_
                   << " client_tag_hash: " << response.client_tag_hash;
      continue;
    }

    tracker->ReceiveCommitResponse(response);

    if (!tracker->CanClearMetadata()) {
      metadata_changes->UpdateMetadata(tracker->client_tag(),
                                       tracker->metadata());
      continue;
    }
    metadata_changes->ClearMetadata(tracker->client_tag());
    entities_.erase(tracker->metadata().client_tag_hash());
  }

  syncer::SyncError error = service_->ApplySyncChanges(
      std::move(metadata_changes), EntityChangeList());
  if (error.IsSet())
    error_handler_->OnSingleDataTypeUnrecoverableError(error);
}
// Handles a batch of remote |updates| together with the new
// |data_type_state|. The very first batch is routed to
// OnInitialUpdateReceived(). Later batches are applied entity by entity via
// ProcessUpdate(), the resulting metadata and entity changes are handed to
// the service, and any newly pending commits are flushed. A change of
// encryption key name additionally re-queues all other entities for commit.
void SharedModelTypeProcessor::OnUpdateReceived(
    const sync_pb::DataTypeState& data_type_state,
    const UpdateResponseDataList& updates) {
  if (!data_type_state_.initial_sync_done()) {
    OnInitialUpdateReceived(data_type_state, updates);
    return;
  }

  std::unique_ptr<MetadataChangeList> metadata_changes =
      service_->CreateMetadataChangeList();
  EntityChangeList entity_changes;

  metadata_changes->UpdateDataTypeState(data_type_state);
  // Compare against the old state before it is overwritten below.
  bool got_new_encryption_requirements =
      data_type_state_.encryption_key_name() !=
      data_type_state.encryption_key_name();
  data_type_state_ = data_type_state;

  // If new encryption requirements come from the server, the entities that are
  // in |updates| will be recorded here so they can be ignored during the
  // re-encryption phase at the end.
  std::unordered_set<std::string> already_updated;

  for (const UpdateResponseData& update : updates) {
    ProcessorEntityTracker* entity = ProcessUpdate(update, &entity_changes);
    if (!entity) {
      // The update should be ignored.
      continue;
    }

    if (entity->CanClearMetadata()) {
      metadata_changes->ClearMetadata(entity->client_tag());
      // Erasing destroys the tracker, so |entity| must not be used afterwards.
      // The key is copied first because it lives inside that tracker. The
      // entity is not added to |already_updated|: re-encryption only walks
      // |entities_|, which no longer contains it.
      const std::string tag_hash = entity->metadata().client_tag_hash();
      entities_.erase(tag_hash);
      continue;
    }

    metadata_changes->UpdateMetadata(entity->client_tag(),
                                     entity->metadata());
    if (got_new_encryption_requirements) {
      already_updated.insert(entity->client_tag());
    }
  }

  if (got_new_encryption_requirements) {
    RecommitAllForEncryption(already_updated, metadata_changes.get());
  }

  // Inform the service of the new or updated data.
  syncer::SyncError error =
      service_->ApplySyncChanges(std::move(metadata_changes), entity_changes);

  if (error.IsSet()) {
    error_handler_->OnSingleDataTypeUnrecoverableError(error);
  } else {
    // There may be new reasons to commit by the time this function is done.
    FlushPendingCommitRequests();
  }
}
// Applies a single remote |update| to the tracker identified by the update's
// client tag hash, creating the tracker if the item is not tracked yet. Any
// resulting add/update/delete for the service is appended to
// |entity_changes|. Returns the affected tracker, or nullptr when the update
// is ignored (a delete for an untracked item, or an update that is a
// reflection of something already seen).
ProcessorEntityTracker* SharedModelTypeProcessor::ProcessUpdate(
const UpdateResponseData& update,
EntityChangeList* entity_changes) {
const EntityData& data = update.entity.value();
const std::string& client_tag_hash = data.client_tag_hash;
ProcessorEntityTracker* entity = GetEntityForTagHash(client_tag_hash);
if (entity == nullptr) {
if (data.is_deleted()) {
DLOG(WARNING) << "Received remote delete for a non-existing item."
<< " client_tag_hash: " << client_tag_hash;
return nullptr;
}
// Previously unknown item: start tracking it and report it as an add.
entity = CreateEntity(data);
entity_changes->push_back(
EntityChange::CreateAdd(entity->client_tag(), update.entity));
entity->RecordAcceptedUpdate(update);
} else if (entity->UpdateIsReflection(update.response_version)) {
// Seen this update before; just ignore it.
return nullptr;
} else if (entity->IsUnsynced()) {
// Local changes are pending, so the remote update conflicts with them.
ConflictResolution::Type resolution_type =
ResolveConflict(update, entity, entity_changes);
UMA_HISTOGRAM_ENUMERATION("Sync.ResolveConflict", resolution_type,
ConflictResolution::TYPE_SIZE);
} else if (data.is_deleted()) {
// The entity was deleted; inform the service. Note that the local data
// can never be deleted at this point because it would have either been
// acked (the add case) or pending (the conflict case).
DCHECK(!entity->metadata().is_deleted());
entity_changes->push_back(EntityChange::CreateDelete(entity->client_tag()));
entity->RecordAcceptedUpdate(update);
} else if (!entity->MatchesData(data)) {
// Specifics have changed, so update the service.
entity_changes->push_back(
EntityChange::CreateUpdate(entity->client_tag(), update.entity));
entity->RecordAcceptedUpdate(update);
} else {
// No data change; still record that the update was received.
entity->RecordAcceptedUpdate(update);
}
// If the received entity has out of date encryption, we schedule another
// commit to fix it.
if (data_type_state_.encryption_key_name() != update.encryption_key_name) {
DVLOG(2) << ModelTypeToString(type_) << ": Requesting re-encrypt commit "
<< update.encryption_key_name << " -> "
<< data_type_state_.encryption_key_name();
entity->IncrementSequenceNumber();
if (entity->RequiresCommitData()) {
// If there is no pending commit data, then either this update wasn't
// in conflict or the remote data won; either way the remote data is
// the right data to re-queue for commit.
entity->CacheCommitData(update.entity);
}
}
return entity;
}
// Resolves a conflict between the pending local state of |entity| and the
// remote |update|. The conflict is first classified: identical changes, a
// local-only or remote-only re-encryption, or a genuine data conflict, which
// is delegated to the service. The outcome is then applied to |entity|, any
// change the service needs to see is appended to |changes|, and the
// resolution type is returned.
ConflictResolution::Type SharedModelTypeProcessor::ResolveConflict(
const UpdateResponseData& update,
ProcessorEntityTracker* entity,
EntityChangeList* changes) {
const EntityData& remote_data = update.entity.value();
// TYPE_SIZE acts as the "not yet decided" placeholder; every branch below
// overwrites it.
ConflictResolution::Type resolution_type = ConflictResolution::TYPE_SIZE;
// Only populated when the service's resolution supplies replacement data.
std::unique_ptr<EntityData> new_data;
// Determine the type of resolution.
if (entity->MatchesData(remote_data)) {
// The changes are identical so there isn't a real conflict.
resolution_type = ConflictResolution::CHANGES_MATCH;
} else if (entity->RequiresCommitData() ||
entity->MatchesBaseData(entity->commit_data().value())) {
// If commit data needs to be loaded at this point, it can only be due to a
// re-encryption request. If the commit data matches the base data, it also
// must be a re-encryption request. Either way there's no real local change
// and the remote data should win.
resolution_type = ConflictResolution::IGNORE_LOCAL_ENCRYPTION;
} else if (entity->MatchesBaseData(remote_data)) {
// The remote data isn't actually changing from the last remote data that
// was seen, so it must have been a re-encryption and can be ignored.
resolution_type = ConflictResolution::IGNORE_REMOTE_ENCRYPTION;
} else {
// There's a real data conflict here; let the service resolve it.
ConflictResolution resolution =
service_->ResolveConflict(entity->commit_data().value(), remote_data);
resolution_type = resolution.type();
new_data = resolution.ExtractData();
}
// Apply the resolution.
switch (resolution_type) {
case ConflictResolution::CHANGES_MATCH:
// Record the update and squash the pending commit.
entity->RecordForcedUpdate(update);
break;
case ConflictResolution::USE_LOCAL:
case ConflictResolution::IGNORE_REMOTE_ENCRYPTION:
// Record that we received the update from the server but leave the
// pending commit intact.
entity->RecordIgnoredUpdate(update);
break;
case ConflictResolution::USE_REMOTE:
case ConflictResolution::IGNORE_LOCAL_ENCRYPTION:
// Squash the pending commit.
entity->RecordForcedUpdate(update);
// Update client data to match server.
changes->push_back(
EntityChange::CreateUpdate(entity->client_tag(), update.entity));
break;
case ConflictResolution::USE_NEW:
// Record that we received the update.
entity->RecordIgnoredUpdate(update);
// Make a new pending commit to update the server.
entity->MakeLocalChange(std::move(new_data));
// Update the client with the new entity.
changes->push_back(EntityChange::CreateUpdate(entity->client_tag(),
entity->commit_data()));
break;
case ConflictResolution::TYPE_SIZE:
NOTREACHED();
break;
}
// Any replacement data must have been consumed by the USE_NEW branch.
DCHECK(!new_data);
return resolution_type;
}
void SharedModelTypeProcessor::RecommitAllForEncryption(
std::unordered_set<std::string> already_updated,
MetadataChangeList* metadata_changes) {
ModelTypeService::ClientTagList entities_needing_data;
for (auto it = entities_.begin(); it != entities_.end(); ++it) {
ProcessorEntityTracker* entity = it->second.get();
if (already_updated.find(entity->client_tag()) != already_updated.end()) {
continue;
}
entity->IncrementSequenceNumber();
if (entity->RequiresCommitData()) {
entities_needing_data.push_back(entity->client_tag());
}
metadata_changes->UpdateMetadata(entity->client_tag(), entity->metadata());
}
if (!entities_needing_data.empty()) {
service_->GetData(
entities_needing_data,
base::Bind(&SharedModelTypeProcessor::OnDataLoadedForReEncryption,
weak_ptr_factory_.GetWeakPtr()));
}
}
// Handles the first batch of |updates| for this type. Creates a tracker for
// every update, records the new |data_type_state|, and hands the metadata and
// data to the service to merge. Pending commits are flushed when the merge
// succeeds; otherwise the error is reported to the error handler.
void SharedModelTypeProcessor::OnInitialUpdateReceived(
    const sync_pb::DataTypeState& data_type_state,
    const UpdateResponseDataList& updates) {
  DCHECK(entities_.empty());
  // Ensure that initial sync was not already done and that the worker
  // correctly marked initial sync as done for this update.
  DCHECK(!data_type_state_.initial_sync_done());
  DCHECK(data_type_state.initial_sync_done());

  std::unique_ptr<MetadataChangeList> metadata_changes =
      service_->CreateMetadataChangeList();
  EntityDataMap merged_data;

  data_type_state_ = data_type_state;
  metadata_changes->UpdateDataTypeState(data_type_state_);

  for (const UpdateResponseData& update : updates) {
    ProcessorEntityTracker* tracker = CreateEntity(update.entity.value());
    const std::string& client_tag = tracker->client_tag();
    tracker->RecordAcceptedUpdate(update);
    metadata_changes->UpdateMetadata(client_tag, tracker->metadata());
    merged_data[client_tag] = update.entity;
  }

  // Let the service handle associating and merging the data.
  syncer::SyncError error =
      service_->MergeSyncData(std::move(metadata_changes), merged_data);
  if (error.IsSet()) {
    error_handler_->OnSingleDataTypeUnrecoverableError(error);
    return;
  }

  // We may have new reasons to commit by the time this function is done.
  FlushPendingCommitRequests();
}
// Receives the commit data requested by OnMetadataLoaded(). Caches the data
// on success or stores |error| as the start error, then marks the pending
// data as loaded and attempts to connect.
void SharedModelTypeProcessor::OnInitialPendingDataLoaded(
    syncer::SyncError error,
    std::unique_ptr<DataBatch> data_batch) {
  DCHECK(!is_initial_pending_data_loaded_);

  if (!error.IsSet()) {
    ConsumeDataBatch(std::move(data_batch));
  } else {
    start_error_ = error;
  }

  is_initial_pending_data_loaded_ = true;
  ConnectIfReady();
}
// Receives the commit data requested by RecommitAllForEncryption(). On
// success the data is cached and pending commits are flushed; on failure
// |error| is reported to the error handler.
void SharedModelTypeProcessor::OnDataLoadedForReEncryption(
    syncer::SyncError error,
    std::unique_ptr<DataBatch> data_batch) {
  DCHECK(is_initial_pending_data_loaded_);

  if (error.IsSet()) {
    error_handler_->OnSingleDataTypeUnrecoverableError(error);
  } else {
    ConsumeDataBatch(std::move(data_batch));
    FlushPendingCommitRequests();
  }
}
void SharedModelTypeProcessor::ConsumeDataBatch(
std::unique_ptr<DataBatch> data_batch) {
while (data_batch->HasNext()) {
TagAndData data = data_batch->Next();
ProcessorEntityTracker* entity = GetEntityForTag(data.first);
// If the entity wasn't deleted or updated with new commit.
if (entity != nullptr && entity->RequiresCommitData()) {
entity->CacheCommitData(data.second.get());
}
}
}
// Returns the hash of |tag| for this processor's model type, as computed by
// syncer::syncable::GenerateSyncableHash(). The result is the key type used
// to look entities up in |entities_|.
std::string SharedModelTypeProcessor::GetHashForTag(const std::string& tag) {
return syncer::syncable::GenerateSyncableHash(type_, tag);
}
// Looks up the tracker for client |tag|; returns nullptr if it is not
// tracked.
ProcessorEntityTracker* SharedModelTypeProcessor::GetEntityForTag(
    const std::string& tag) {
  const std::string tag_hash = GetHashForTag(tag);
  return GetEntityForTagHash(tag_hash);
}
// Looks up the tracker stored under |tag_hash|; returns nullptr if there is
// none. The returned pointer remains owned by |entities_|.
ProcessorEntityTracker* SharedModelTypeProcessor::GetEntityForTagHash(
    const std::string& tag_hash) {
  const auto found = entities_.find(tag_hash);
  if (found == entities_.end())
    return nullptr;
  return found->second.get();
}
// Creates a new tracker for |tag| from the identifying fields of |data| and
// stores it in |entities_| under the data's client tag hash, which must not
// be in use yet. Returns a non-owning pointer to the stored tracker.
ProcessorEntityTracker* SharedModelTypeProcessor::CreateEntity(
    const std::string& tag,
    const EntityData& data) {
  DCHECK(entities_.find(data.client_tag_hash) == entities_.end());

  auto& slot = entities_[data.client_tag_hash];
  slot = ProcessorEntityTracker::CreateNew(tag, data.client_tag_hash, data.id,
                                           data.creation_time);
  return slot.get();
}
// Creates a new tracker for |data|, asking the service for the client tag.
// The tag's hash is expected to equal the hash already carried by |data|.
ProcessorEntityTracker* SharedModelTypeProcessor::CreateEntity(
    const EntityData& data) {
  // Let the service define |client_tag| based on the entity data.
  const std::string client_tag = service_->GetClientTag(data);

  // This constraint may be relaxed in the future.
  DCHECK_EQ(data.client_tag_hash, GetHashForTag(client_tag));

  return CreateEntity(client_tag, data);
}
} // namespace syncer_v2