// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "sync/internal_api/public/shared_model_type_processor.h"

#include <utility>
#include <vector>

#include "base/bind.h"
#include "base/location.h"
#include "base/memory/ptr_util.h"
#include "base/metrics/histogram.h"
#include "base/threading/thread_task_runner_handle.h"
#include "sync/engine/commit_queue.h"
#include "sync/internal_api/public/activation_context.h"
#include "sync/internal_api/public/processor_entity_tracker.h"
#include "sync/syncable/syncable_util.h"

namespace syncer_v2 {

namespace {

// Forwards ModelTypeProcessor calls to |processor_| by posting each one as a
// task on |processor_task_runner_|. The processor is held through a WeakPtr,
// so this proxy does not own it.
class ModelTypeProcessorProxy : public ModelTypeProcessor {
 public:
  // |processor| is the target of the forwarded calls and
  // |processor_task_runner| is the runner those calls are posted to.
  ModelTypeProcessorProxy(
      const base::WeakPtr<ModelTypeProcessor>& processor,
      const scoped_refptr<base::SequencedTaskRunner>& processor_task_runner);
  ~ModelTypeProcessorProxy() override;

  // ModelTypeProcessor implementation. Each method only posts a task; none of
  // them calls into the processor directly.
  void ConnectSync(std::unique_ptr<CommitQueue> worker) override;
  void DisconnectSync() override;
  void OnCommitCompleted(const sync_pb::DataTypeState& type_state,
                         const CommitResponseDataList& response_list) override;
  void OnUpdateReceived(const sync_pb::DataTypeState& type_state,
                        const UpdateResponseDataList& updates) override;

 private:
  // Target of the forwarded calls.
  base::WeakPtr<ModelTypeProcessor> processor_;
  // Runner that every forwarded call is posted to.
  scoped_refptr<base::SequencedTaskRunner> processor_task_runner_;
};

// Stores the weak processor handle and the task runner used for forwarding.
// Neither argument is dereferenced here.
ModelTypeProcessorProxy::ModelTypeProcessorProxy(
    const base::WeakPtr<ModelTypeProcessor>& processor,
    const scoped_refptr<base::SequencedTaskRunner>& processor_task_runner)
    : processor_(processor),
      processor_task_runner_(processor_task_runner) {
}

// Nothing to clean up beyond what the members release on their own.
ModelTypeProcessorProxy::~ModelTypeProcessorProxy() = default;

// Posts ModelTypeProcessor::ConnectSync() to |processor_task_runner_|.
// Ownership of |worker| is moved into the posted task.
void ModelTypeProcessorProxy::ConnectSync(std::unique_ptr<CommitQueue> worker) {
  auto connect_task = base::Bind(&ModelTypeProcessor::ConnectSync, processor_,
                                 base::Passed(std::move(worker)));
  processor_task_runner_->PostTask(FROM_HERE, connect_task);
}

// Posts ModelTypeProcessor::DisconnectSync() to |processor_task_runner_|.
void ModelTypeProcessorProxy::DisconnectSync() {
  auto disconnect_task =
      base::Bind(&ModelTypeProcessor::DisconnectSync, processor_);
  processor_task_runner_->PostTask(FROM_HERE, disconnect_task);
}

// Posts ModelTypeProcessor::OnCommitCompleted() to |processor_task_runner_|,
// binding |type_state| and |response_list| into the task.
void ModelTypeProcessorProxy::OnCommitCompleted(
    const sync_pb::DataTypeState& type_state,
    const CommitResponseDataList& response_list) {
  auto commit_completed_task =
      base::Bind(&ModelTypeProcessor::OnCommitCompleted, processor_,
                 type_state, response_list);
  processor_task_runner_->PostTask(FROM_HERE, commit_completed_task);
}

// Posts ModelTypeProcessor::OnUpdateReceived() to |processor_task_runner_|,
// binding |type_state| and |updates| into the task.
void ModelTypeProcessorProxy::OnUpdateReceived(
    const sync_pb::DataTypeState& type_state,
    const UpdateResponseDataList& updates) {
  auto update_received_task =
      base::Bind(&ModelTypeProcessor::OnUpdateReceived, processor_, type_state,
                 updates);
  processor_task_runner_->PostTask(FROM_HERE, update_received_task);
}

}  // namespace

// Builds a processor for |type| backed by |service|, which must be non-null.
// The processor starts with no metadata loaded, no pending data loaded and no
// error handler.
SharedModelTypeProcessor::SharedModelTypeProcessor(syncer::ModelType type,
                                                   ModelTypeService* service)
    : type_(type),
      is_metadata_loaded_(false),
      is_initial_pending_data_loaded_(false),
      service_(service),
      error_handler_(nullptr),
      weak_ptr_factory_(this) {
  DCHECK(service_);
}

// Members release their own resources; nothing extra to do.
SharedModelTypeProcessor::~SharedModelTypeProcessor() = default;

// static
// Factory that returns a new SharedModelTypeProcessor for |type| and
// |service| through the ModelTypeChangeProcessor interface.
std::unique_ptr<ModelTypeChangeProcessor>
SharedModelTypeProcessor::CreateAsChangeProcessor(syncer::ModelType type,
                                                  ModelTypeService* service) {
  std::unique_ptr<ModelTypeChangeProcessor> processor(
      new SharedModelTypeProcessor(type, service));
  return processor;
}

// Records the error handler and start callback for this sync session, then
// attempts to connect. |start_callback| is only run once ConnectIfReady()
// finds that metadata and any initial pending data have been loaded.
void SharedModelTypeProcessor::OnSyncStarting(
    syncer::DataTypeErrorHandler* error_handler,
    const StartCallback& start_callback) {
  DCHECK(CalledOnValidThread());
  DCHECK(start_callback_.is_null());
  DCHECK(!IsConnected());
  DCHECK(error_handler);
  DVLOG(1) << "Sync is starting for " << ModelTypeToString(type_);

  start_callback_ = start_callback;
  error_handler_ = error_handler;

  ConnectIfReady();
}

// Receives the metadata loaded by the service.
//  - On |error|, remembers it in |start_error_| so it is reported through the
//    start callback.
//  - If the loaded state says initial sync is done, rebuilds |entities_| from
//    |batch| and asks the service for the data of every entity that still
//    requires commit data.
//  - Otherwise seeds |data_type_state_| with this type's progress marker id.
// In all cases ConnectIfReady() is called at the end.
void SharedModelTypeProcessor::OnMetadataLoaded(
    syncer::SyncError error,
    std::unique_ptr<MetadataBatch> batch) {
  DCHECK(CalledOnValidThread());
  DCHECK(entities_.empty());
  DCHECK(!is_metadata_loaded_);
  DCHECK(!IsConnected());

  is_metadata_loaded_ = true;
  // Assume no data has to be fetched; the single path below that does need
  // data clears this flag again.
  is_initial_pending_data_loaded_ = true;

  if (error.IsSet()) {
    start_error_ = error;
    ConnectIfReady();
    return;
  }

  if (!batch->GetDataTypeState().initial_sync_done()) {
    // First time syncing; initialize metadata.
    data_type_state_.mutable_progress_marker()->set_data_type_id(
        GetSpecificsFieldNumberFromModelType(type_));
    ConnectIfReady();
    return;
  }

  EntityMetadataMap metadata_map(batch->TakeAllMetadata());
  std::vector<std::string> tags_needing_data;

  for (auto& tag_and_metadata : metadata_map) {
    std::unique_ptr<ProcessorEntityTracker> tracker =
        ProcessorEntityTracker::CreateFromMetadata(tag_and_metadata.first,
                                                   &tag_and_metadata.second);
    if (tracker->RequiresCommitData())
      tags_needing_data.push_back(tracker->client_tag());
    entities_[tracker->metadata().client_tag_hash()] = std::move(tracker);
  }
  data_type_state_ = batch->GetDataTypeState();

  if (!tags_needing_data.empty()) {
    // Connecting has to wait until OnInitialPendingDataLoaded() has run.
    is_initial_pending_data_loaded_ = false;
    service_->GetData(
        tags_needing_data,
        base::Bind(&SharedModelTypeProcessor::OnInitialPendingDataLoaded,
                   weak_ptr_factory_.GetWeakPtr()));
  }

  ConnectIfReady();
}

void SharedModelTypeProcessor::ConnectIfReady() {
  DCHECK(CalledOnValidThread());
  if (!is_metadata_loaded_ || !is_initial_pending_data_loaded_ ||
      start_callback_.is_null()) {
    return;
  }

  std::unique_ptr<ActivationContext> activation_context;

  if (!start_error_.IsSet()) {
    activation_context = base::WrapUnique(new ActivationContext);
    activation_context->data_type_state = data_type_state_;
    activation_context->type_processor = base::WrapUnique(
        new ModelTypeProcessorProxy(weak_ptr_factory_.GetWeakPtr(),
                                    base::ThreadTaskRunnerHandle::Get()));
  }

  start_callback_.Run(start_error_, std::move(activation_context));
  start_callback_.Reset();
}

// Returns true once OnMetadataLoaded() has run; Put() and Delete() DCHECK on
// this before accepting a local change.
bool SharedModelTypeProcessor::IsAllowingChanges() const {
  return is_metadata_loaded_;
}

// Returns true while a commit queue (|worker_|) is attached.
bool SharedModelTypeProcessor::IsConnected() const {
  DCHECK(CalledOnValidThread());
  return worker_ != nullptr;
}

// Asks the service to clear the stored metadata of every tracked entity as
// well as the stored data type state. The in-memory |entities_| map is not
// modified here.
void SharedModelTypeProcessor::DisableSync() {
  DCHECK(CalledOnValidThread());
  std::unique_ptr<MetadataChangeList> metadata_changes =
      service_->CreateMetadataChangeList();
  for (const auto& hash_and_entity : entities_) {
    metadata_changes->ClearMetadata(hash_and_entity.second->client_tag());
  }
  metadata_changes->ClearDataTypeState();
  // Nothing to do if this fails, so just ignore the error it might return.
  service_->ApplySyncChanges(std::move(metadata_changes), EntityChangeList());
}

// Builds a SyncError for this type. When an error handler is available the
// error is created through it; otherwise a plain DATATYPE_ERROR is returned.
syncer::SyncError SharedModelTypeProcessor::CreateAndUploadError(
    const tracked_objects::Location& location,
    const std::string& message) {
  if (!error_handler_) {
    return syncer::SyncError(location, syncer::SyncError::DATATYPE_ERROR,
                             message, type_);
  }
  return error_handler_->CreateAndUploadError(location, message, type_);
}

// Takes ownership of |worker| as the commit queue and immediately tries to
// send any commit requests that accumulated while disconnected.
void SharedModelTypeProcessor::ConnectSync(
    std::unique_ptr<CommitQueue> worker) {
  DCHECK(CalledOnValidThread());
  DVLOG(1) << "Successfully connected " << ModelTypeToString(type_);

  worker_ = std::move(worker);

  // IsConnected() is true from here on, so the flush can reach the worker.
  FlushPendingCommitRequests();
}

// Drops the connection to the commit queue: invalidates every outstanding
// weak pointer to this object, releases |worker_|, and clears the transient
// sync state of all tracked entities.
void SharedModelTypeProcessor::DisconnectSync() {
  DCHECK(CalledOnValidThread());
  DCHECK(IsConnected());

  DVLOG(1) << "Disconnecting sync for " << ModelTypeToString(type_);
  weak_ptr_factory_.InvalidateWeakPtrs();
  worker_.reset();

  for (const auto& hash_and_entity : entities_) {
    hash_and_entity.second->ClearTransientSyncState();
  }
}

void SharedModelTypeProcessor::Put(const std::string& tag,
                                   std::unique_ptr<EntityData> data,
                                   MetadataChangeList* metadata_change_list) {
  DCHECK(IsAllowingChanges());
  DCHECK(data.get());
  DCHECK(!data->is_deleted());
  DCHECK(!data->non_unique_name.empty());
  DCHECK_EQ(type_, syncer::GetModelTypeFromSpecifics(data->specifics));

  if (!data_type_state_.initial_sync_done()) {
    // Ignore changes before the initial sync is done.
    return;
  }

  // Fill in some data.
  data->client_tag_hash = GetHashForTag(tag);
  if (data->modification_time.is_null()) {
    data->modification_time = base::Time::Now();
  }

  ProcessorEntityTracker* entity = GetEntityForTagHash(data->client_tag_hash);

  if (entity == nullptr) {
    // The service is creating a new entity.
    if (data->creation_time.is_null()) {
      data->creation_time = data->modification_time;
    }
    entity = CreateEntity(tag, *data);
  } else if (entity->MatchesData(*data)) {
    // Ignore changes that don't actually change anything.
    return;
  }

  entity->MakeLocalChange(std::move(data));
  metadata_change_list->UpdateMetadata(tag, entity->metadata());

  FlushPendingCommitRequests();
}

void SharedModelTypeProcessor::Delete(
    const std::string& tag,
    MetadataChangeList* metadata_change_list) {
  DCHECK(IsAllowingChanges());

  if (!data_type_state_.initial_sync_done()) {
    // Ignore changes before the initial sync is done.
    return;
  }

  ProcessorEntityTracker* entity = GetEntityForTag(tag);
  if (entity == nullptr) {
    // That's unusual, but not necessarily a bad thing.
    // Missing is as good as deleted as far as the model is concerned.
    DLOG(WARNING) << "Attempted to delete missing item."
                  << " client tag: " << tag;
    return;
  }

  entity->Delete();

  metadata_change_list->UpdateMetadata(tag, entity->metadata());
  FlushPendingCommitRequests();
}

void SharedModelTypeProcessor::FlushPendingCommitRequests() {
  CommitRequestDataList commit_requests;

  // Don't bother sending anything if there's no one to send to.
  if (!IsConnected())
    return;

  // Don't send anything if the type is not ready to handle commits.
  if (!data_type_state_.initial_sync_done())
    return;

  // TODO(rlarocque): Do something smarter than iterate here.
  for (auto it = entities_.begin(); it != entities_.end(); ++it) {
    ProcessorEntityTracker* entity = it->second.get();
    if (entity->RequiresCommitRequest() && !entity->RequiresCommitData()) {
      CommitRequestData request;
      entity->InitializeCommitRequestData(&request);
      commit_requests.push_back(request);
    }
  }

  if (!commit_requests.empty())
    worker_->EnqueueForCommit(commit_requests);
}

// Applies the result of a commit. Adopts |type_state|, feeds every response
// to its tracker, and then either clears or updates the stored metadata of
// that entity. The resulting metadata changes are applied through the service
// and any error is reported to |error_handler_|.
void SharedModelTypeProcessor::OnCommitCompleted(
    const sync_pb::DataTypeState& type_state,
    const CommitResponseDataList& response_list) {
  std::unique_ptr<MetadataChangeList> metadata_changes =
      service_->CreateMetadataChangeList();

  data_type_state_ = type_state;
  metadata_changes->UpdateDataTypeState(data_type_state_);

  for (const CommitResponseData& response : response_list) {
    ProcessorEntityTracker* tracker =
        GetEntityForTagHash(response.client_tag_hash);
    if (tracker == nullptr) {
      NOTREACHED() << "Received commit response for missing item."
                   << " type: " << type_
                   << " client_tag_hash: " << response.client_tag_hash;
      continue;
    }

    tracker->ReceiveCommitResponse(response);

    if (!tracker->CanClearMetadata()) {
      metadata_changes->UpdateMetadata(tracker->client_tag(),
                                       tracker->metadata());
      continue;
    }

    // The entity no longer needs tracking: drop its stored metadata and the
    // tracker itself. |tracker| is dangling after the erase.
    metadata_changes->ClearMetadata(tracker->client_tag());
    entities_.erase(tracker->metadata().client_tag_hash());
  }

  syncer::SyncError apply_error = service_->ApplySyncChanges(
      std::move(metadata_changes), EntityChangeList());
  if (apply_error.IsSet())
    error_handler_->OnSingleDataTypeUnrecoverableError(apply_error);
}

// Handles a batch of updates from the server.
//
// Until |data_type_state_| reports that initial sync is done, the batch is
// delegated to OnInitialUpdateReceived(). Afterwards each update goes through
// ProcessUpdate(), the resulting metadata and entity changes are collected,
// and everything is applied through the service in one ApplySyncChanges()
// call. If the encryption key name changed, all entities not touched by this
// batch are re-queued for commit via RecommitAllForEncryption(). On success
// pending commits are flushed; on failure the error goes to |error_handler_|.
void SharedModelTypeProcessor::OnUpdateReceived(
    const sync_pb::DataTypeState& data_type_state,
    const UpdateResponseDataList& updates) {
  if (!data_type_state_.initial_sync_done()) {
    OnInitialUpdateReceived(data_type_state, updates);
    return;
  }

  std::unique_ptr<MetadataChangeList> metadata_changes =
      service_->CreateMetadataChangeList();
  EntityChangeList entity_changes;

  metadata_changes->UpdateDataTypeState(data_type_state);
  // Compare against the old state before it is overwritten below.
  bool got_new_encryption_requirements =
      data_type_state_.encryption_key_name() !=
      data_type_state.encryption_key_name();
  data_type_state_ = data_type_state;

  // If new encryption requirements come from the server, the entities that are
  // in |updates| will be recorded here so they can be ignored during the
  // re-encryption phase at the end.
  std::unordered_set<std::string> already_updated;

  for (const UpdateResponseData& update : updates) {
    ProcessorEntityTracker* entity = ProcessUpdate(update, &entity_changes);

    if (!entity) {
      // The update should be ignored.
      continue;
    }

    // Record the tag while |entity| is still alive: the erase() below destroys
    // the tracker owned by |entities_|, after which |entity| must not be
    // dereferenced.
    if (got_new_encryption_requirements) {
      already_updated.insert(entity->client_tag());
    }

    if (entity->CanClearMetadata()) {
      metadata_changes->ClearMetadata(entity->client_tag());
      entities_.erase(entity->metadata().client_tag_hash());
      // |entity| is dangling from here on.
    } else {
      metadata_changes->UpdateMetadata(entity->client_tag(),
                                       entity->metadata());
    }
  }

  if (got_new_encryption_requirements) {
    RecommitAllForEncryption(already_updated, metadata_changes.get());
  }

  // Inform the service of the new or updated data.
  syncer::SyncError error =
      service_->ApplySyncChanges(std::move(metadata_changes), entity_changes);

  if (error.IsSet()) {
    error_handler_->OnSingleDataTypeUnrecoverableError(error);
  } else {
    // There may be new reasons to commit by the time this function is done.
    FlushPendingCommitRequests();
  }
}

// Applies a single server |update| to the tracker for its client tag hash,
// creating the tracker if needed, and appends any resulting change for the
// service to |entity_changes|.
//
// Returns the affected tracker, or nullptr when the update is to be ignored
// (a remote delete for an unknown item, or a reflection of an update that was
// already seen). The branches below are checked in order and the first match
// wins.
ProcessorEntityTracker* SharedModelTypeProcessor::ProcessUpdate(
    const UpdateResponseData& update,
    EntityChangeList* entity_changes) {
  const EntityData& data = update.entity.value();
  const std::string& client_tag_hash = data.client_tag_hash;
  ProcessorEntityTracker* entity = GetEntityForTagHash(client_tag_hash);
  if (entity == nullptr) {
    if (data.is_deleted()) {
      DLOG(WARNING) << "Received remote delete for a non-existing item."
                    << " client_tag_hash: " << client_tag_hash;
      return nullptr;
    }

    // Unknown, live item: start tracking it and tell the service to add it.
    entity = CreateEntity(data);
    entity_changes->push_back(
        EntityChange::CreateAdd(entity->client_tag(), update.entity));
    entity->RecordAcceptedUpdate(update);
  } else if (entity->UpdateIsReflection(update.response_version)) {
    // Seen this update before; just ignore it.
    return nullptr;
  } else if (entity->IsUnsynced()) {
    // There is a pending local change, so the update has to go through
    // conflict resolution; the outcome is recorded in a histogram.
    ConflictResolution::Type resolution_type =
        ResolveConflict(update, entity, entity_changes);
    UMA_HISTOGRAM_ENUMERATION("Sync.ResolveConflict", resolution_type,
                              ConflictResolution::TYPE_SIZE);
  } else if (data.is_deleted()) {
    // The entity was deleted; inform the service. Note that the local data
    // can never be deleted at this point because it would have either been
    // acked (the add case) or pending (the conflict case).
    DCHECK(!entity->metadata().is_deleted());
    entity_changes->push_back(EntityChange::CreateDelete(entity->client_tag()));
    entity->RecordAcceptedUpdate(update);
  } else if (!entity->MatchesData(data)) {
    // Specifics have changed, so update the service.
    entity_changes->push_back(
        EntityChange::CreateUpdate(entity->client_tag(), update.entity));
    entity->RecordAcceptedUpdate(update);
  } else {
    // No data change; still record that the update was received.
    entity->RecordAcceptedUpdate(update);
  }

  // If the received entity has out of date encryption, we schedule another
  // commit to fix it.
  if (data_type_state_.encryption_key_name() != update.encryption_key_name) {
    DVLOG(2) << ModelTypeToString(type_) << ": Requesting re-encrypt commit "
             << update.encryption_key_name << " -> "
             << data_type_state_.encryption_key_name();

    entity->IncrementSequenceNumber();
    if (entity->RequiresCommitData()) {
      // If there is no pending commit data, then either this update wasn't
      // in conflict or the remote data won; either way the remote data is
      // the right data to re-queue for commit.
      entity->CacheCommitData(update.entity);
    }
  }

  return entity;
}

// Resolves a conflict between the pending local change on |entity| and the
// remote |update|, in two phases: first decide the resolution type, then apply
// it to |entity| and, where the service's data has to change, append an
// update to |changes|. Returns the chosen resolution type.
ConflictResolution::Type SharedModelTypeProcessor::ResolveConflict(
    const UpdateResponseData& update,
    ProcessorEntityTracker* entity,
    EntityChangeList* changes) {
  const EntityData& remote_data = update.entity.value();

  // TYPE_SIZE doubles as the "not decided yet" value; every branch below
  // overwrites it.
  ConflictResolution::Type resolution_type = ConflictResolution::TYPE_SIZE;
  // Only populated when the service is asked to resolve the conflict.
  std::unique_ptr<EntityData> new_data;

  // Determine the type of resolution.
  if (entity->MatchesData(remote_data)) {
    // The changes are identical so there isn't a real conflict.
    resolution_type = ConflictResolution::CHANGES_MATCH;
  } else if (entity->RequiresCommitData() ||
             entity->MatchesBaseData(entity->commit_data().value())) {
    // If commit data needs to be loaded at this point, it can only be due to a
    // re-encryption request. If the commit data matches the base data, it also
    // must be a re-encryption request. Either way there's no real local change
    // and the remote data should win.
    // Note that commit_data() is only read when RequiresCommitData() is false,
    // thanks to the short-circuit in the condition above.
    resolution_type = ConflictResolution::IGNORE_LOCAL_ENCRYPTION;
  } else if (entity->MatchesBaseData(remote_data)) {
    // The remote data isn't actually changing from the last remote data that
    // was seen, so it must have been a re-encryption and can be ignored.
    resolution_type = ConflictResolution::IGNORE_REMOTE_ENCRYPTION;
  } else {
    // There's a real data conflict here; let the service resolve it.
    ConflictResolution resolution =
        service_->ResolveConflict(entity->commit_data().value(), remote_data);
    resolution_type = resolution.type();
    new_data = resolution.ExtractData();
  }

  // Apply the resolution.
  switch (resolution_type) {
    case ConflictResolution::CHANGES_MATCH:
      // Record the update and squash the pending commit.
      entity->RecordForcedUpdate(update);
      break;
    case ConflictResolution::USE_LOCAL:
    case ConflictResolution::IGNORE_REMOTE_ENCRYPTION:
      // Record that we received the update from the server but leave the
      // pending commit intact.
      entity->RecordIgnoredUpdate(update);
      break;
    case ConflictResolution::USE_REMOTE:
    case ConflictResolution::IGNORE_LOCAL_ENCRYPTION:
      // Squash the pending commit.
      entity->RecordForcedUpdate(update);
      // Update client data to match server.
      changes->push_back(
          EntityChange::CreateUpdate(entity->client_tag(), update.entity));
      break;
    case ConflictResolution::USE_NEW:
      // Record that we received the update.
      entity->RecordIgnoredUpdate(update);
      // Make a new pending commit to update the server.
      entity->MakeLocalChange(std::move(new_data));
      // Update the client with the new entity.
      changes->push_back(EntityChange::CreateUpdate(entity->client_tag(),
                                                    entity->commit_data()));
      break;
    case ConflictResolution::TYPE_SIZE:
      NOTREACHED();
      break;
  }
  // |new_data| must have been consumed (USE_NEW) or never set.
  DCHECK(!new_data);

  return resolution_type;
}

void SharedModelTypeProcessor::RecommitAllForEncryption(
    std::unordered_set<std::string> already_updated,
    MetadataChangeList* metadata_changes) {
  ModelTypeService::ClientTagList entities_needing_data;

  for (auto it = entities_.begin(); it != entities_.end(); ++it) {
    ProcessorEntityTracker* entity = it->second.get();
    if (already_updated.find(entity->client_tag()) != already_updated.end()) {
      continue;
    }
    entity->IncrementSequenceNumber();
    if (entity->RequiresCommitData()) {
      entities_needing_data.push_back(entity->client_tag());
    }
    metadata_changes->UpdateMetadata(entity->client_tag(), entity->metadata());
  }

  if (!entities_needing_data.empty()) {
    service_->GetData(
        entities_needing_data,
        base::Bind(&SharedModelTypeProcessor::OnDataLoadedForReEncryption,
                   weak_ptr_factory_.GetWeakPtr()));
  }
}

// Handles the first batch of updates for this type. Adopts |data_type_state|,
// creates a tracker for every update, and passes the collected data to the
// service's MergeSyncData(). On success pending commits are flushed; on
// failure the error is reported to |error_handler_|.
void SharedModelTypeProcessor::OnInitialUpdateReceived(
    const sync_pb::DataTypeState& data_type_state,
    const UpdateResponseDataList& updates) {
  DCHECK(entities_.empty());
  // Ensure that initial sync was not already done and that the worker
  // correctly marked initial sync as done for this update.
  DCHECK(!data_type_state_.initial_sync_done());
  DCHECK(data_type_state.initial_sync_done());

  std::unique_ptr<MetadataChangeList> metadata_changes =
      service_->CreateMetadataChangeList();
  EntityDataMap initial_data;

  data_type_state_ = data_type_state;
  metadata_changes->UpdateDataTypeState(data_type_state_);

  for (const UpdateResponseData& update : updates) {
    ProcessorEntityTracker* tracker = CreateEntity(update.entity.value());
    const std::string& client_tag = tracker->client_tag();
    tracker->RecordAcceptedUpdate(update);
    metadata_changes->UpdateMetadata(client_tag, tracker->metadata());
    initial_data[client_tag] = update.entity;
  }

  // Let the service handle associating and merging the data.
  syncer::SyncError merge_error =
      service_->MergeSyncData(std::move(metadata_changes), initial_data);
  if (merge_error.IsSet()) {
    error_handler_->OnSingleDataTypeUnrecoverableError(merge_error);
    return;
  }

  // We may have new reasons to commit by the time this function is done.
  FlushPendingCommitRequests();
}

// Callback for the GetData() request issued from OnMetadataLoaded(). Caches
// the loaded data on the trackers, or stores |error| in |start_error_|, then
// marks the pending data as loaded and attempts to connect.
void SharedModelTypeProcessor::OnInitialPendingDataLoaded(
    syncer::SyncError error,
    std::unique_ptr<DataBatch> data_batch) {
  DCHECK(!is_initial_pending_data_loaded_);

  if (!error.IsSet()) {
    ConsumeDataBatch(std::move(data_batch));
  } else {
    // Reported to the caller through the start callback.
    start_error_ = error;
  }

  is_initial_pending_data_loaded_ = true;
  ConnectIfReady();
}

// Callback for the GetData() request issued from RecommitAllForEncryption().
// On success the loaded data is cached on the trackers and pending commits
// are flushed; on failure the error is reported to |error_handler_|.
void SharedModelTypeProcessor::OnDataLoadedForReEncryption(
    syncer::SyncError error,
    std::unique_ptr<DataBatch> data_batch) {
  DCHECK(is_initial_pending_data_loaded_);

  if (error.IsSet()) {
    error_handler_->OnSingleDataTypeUnrecoverableError(error);
  } else {
    ConsumeDataBatch(std::move(data_batch));
    FlushPendingCommitRequests();
  }
}

void SharedModelTypeProcessor::ConsumeDataBatch(
    std::unique_ptr<DataBatch> data_batch) {
  while (data_batch->HasNext()) {
    TagAndData data = data_batch->Next();
    ProcessorEntityTracker* entity = GetEntityForTag(data.first);
    // If the entity wasn't deleted or updated with new commit.
    if (entity != nullptr && entity->RequiresCommitData()) {
      entity->CacheCommitData(data.second.get());
    }
  }
}

// Returns the hash that GenerateSyncableHash() produces for |tag| under this
// processor's model type; this is the key used in |entities_|.
std::string SharedModelTypeProcessor::GetHashForTag(const std::string& tag) {
  return syncer::syncable::GenerateSyncableHash(type_, tag);
}

// Looks up the tracker for client tag |tag|; returns nullptr if there is none.
ProcessorEntityTracker* SharedModelTypeProcessor::GetEntityForTag(
    const std::string& tag) {
  const std::string tag_hash = GetHashForTag(tag);
  return GetEntityForTagHash(tag_hash);
}

// Looks up the tracker stored under |tag_hash|; returns nullptr if there is
// none. The returned pointer stays owned by |entities_|.
ProcessorEntityTracker* SharedModelTypeProcessor::GetEntityForTagHash(
    const std::string& tag_hash) {
  const auto found = entities_.find(tag_hash);
  if (found == entities_.end())
    return nullptr;
  return found->second.get();
}

// Creates a tracker for |tag| from |data| and stores it in |entities_| under
// |data.client_tag_hash|. No tracker may exist for that hash yet. Returns a
// pointer to the new tracker, which |entities_| owns.
ProcessorEntityTracker* SharedModelTypeProcessor::CreateEntity(
    const std::string& tag,
    const EntityData& data) {
  DCHECK(entities_.find(data.client_tag_hash) == entities_.end());
  std::unique_ptr<ProcessorEntityTracker> owned_tracker =
      ProcessorEntityTracker::CreateNew(tag, data.client_tag_hash, data.id,
                                        data.creation_time);
  // Grab the raw pointer before ownership moves into the map.
  ProcessorEntityTracker* const raw_tracker = owned_tracker.get();
  entities_[data.client_tag_hash] = std::move(owned_tracker);
  return raw_tracker;
}

// Creates a tracker for |data| when only the entity data is known: the client
// tag is obtained from the service, and must hash to |data.client_tag_hash|.
ProcessorEntityTracker* SharedModelTypeProcessor::CreateEntity(
    const EntityData& data) {
  // Let the service define |client_tag| based on the entity data.
  const std::string client_tag = service_->GetClientTag(data);
  // This constraint may be relaxed in the future.
  DCHECK_EQ(data.client_tag_hash, GetHashForTag(client_tag));
  return CreateEntity(client_tag, data);
}

}  // namespace syncer_v2
