blob: 99e3ee5ea5dea8be9118ddc947572ce44ed0ce27 [file] [log] [blame]
// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "sync/notifier/sync_system_resources.h"
#include <cstdlib>
#include <cstring>
#include <string>
#include "base/bind.h"
#include "base/logging.h"
#include "base/message_loop/message_loop.h"
#include "base/stl_util.h"
#include "base/strings/string_util.h"
#include "base/strings/stringprintf.h"
#include "google/cacheinvalidation/client_gateway.pb.h"
#include "google/cacheinvalidation/deps/callback.h"
#include "google/cacheinvalidation/include/types.h"
#include "sync/notifier/invalidation_util.h"
namespace syncer {
SyncLogger::SyncLogger() {}
SyncLogger::~SyncLogger() {}
void SyncLogger::Log(LogLevel level, const char* file, int line,
const char* format, ...) {
logging::LogSeverity log_severity = -2; // VLOG(2)
bool emit_log = false;
switch (level) {
case FINE_LEVEL:
log_severity = -2; // VLOG(2)
emit_log = VLOG_IS_ON(2);
break;
case INFO_LEVEL:
log_severity = -1; // VLOG(1)
emit_log = VLOG_IS_ON(1);
break;
case WARNING_LEVEL:
log_severity = logging::LOG_WARNING;
emit_log = LOG_IS_ON(WARNING);
break;
case SEVERE_LEVEL:
log_severity = logging::LOG_ERROR;
emit_log = LOG_IS_ON(ERROR);
break;
}
if (emit_log) {
va_list ap;
va_start(ap, format);
std::string result;
base::StringAppendV(&result, format, ap);
logging::LogMessage(file, line, log_severity).stream() << result;
va_end(ap);
}
}
void SyncLogger::SetSystemResources(invalidation::SystemResources* resources) {
// Do nothing.
}
SyncInvalidationScheduler::SyncInvalidationScheduler()
: created_on_loop_(base::MessageLoop::current()),
is_started_(false),
is_stopped_(false),
weak_factory_(this) {
CHECK(created_on_loop_);
}
SyncInvalidationScheduler::~SyncInvalidationScheduler() {
CHECK_EQ(created_on_loop_, base::MessageLoop::current());
CHECK(is_stopped_);
}
void SyncInvalidationScheduler::Start() {
CHECK_EQ(created_on_loop_, base::MessageLoop::current());
CHECK(!is_started_);
is_started_ = true;
is_stopped_ = false;
weak_factory_.InvalidateWeakPtrs();
}
void SyncInvalidationScheduler::Stop() {
CHECK_EQ(created_on_loop_, base::MessageLoop::current());
is_stopped_ = true;
is_started_ = false;
weak_factory_.InvalidateWeakPtrs();
STLDeleteElements(&posted_tasks_);
posted_tasks_.clear();
}
void SyncInvalidationScheduler::Schedule(invalidation::TimeDelta delay,
invalidation::Closure* task) {
DCHECK(invalidation::IsCallbackRepeatable(task));
CHECK_EQ(created_on_loop_, base::MessageLoop::current());
if (!is_started_) {
delete task;
return;
}
posted_tasks_.insert(task);
base::MessageLoop::current()->PostDelayedTask(
FROM_HERE, base::Bind(&SyncInvalidationScheduler::RunPostedTask,
weak_factory_.GetWeakPtr(), task),
delay);
}
bool SyncInvalidationScheduler::IsRunningOnThread() const {
return created_on_loop_ == base::MessageLoop::current();
}
invalidation::Time SyncInvalidationScheduler::GetCurrentTime() const {
CHECK_EQ(created_on_loop_, base::MessageLoop::current());
return base::Time::Now();
}
void SyncInvalidationScheduler::SetSystemResources(
invalidation::SystemResources* resources) {
// Do nothing.
}
void SyncInvalidationScheduler::RunPostedTask(invalidation::Closure* task) {
CHECK_EQ(created_on_loop_, base::MessageLoop::current());
task->Run();
posted_tasks_.erase(task);
delete task;
}
SyncNetworkChannel::SyncNetworkChannel()
: invalidator_state_(DEFAULT_INVALIDATION_ERROR),
scheduling_hash_(0) {
}
SyncNetworkChannel::~SyncNetworkChannel() {
STLDeleteElements(&network_status_receivers_);
}
void SyncNetworkChannel::SendMessage(const std::string& outgoing_message) {
std::string encoded_message;
EncodeMessage(&encoded_message, outgoing_message, service_context_,
scheduling_hash_);
SendEncodedMessage(encoded_message);
}
void SyncNetworkChannel::SetMessageReceiver(
invalidation::MessageCallback* incoming_receiver) {
incoming_receiver_.reset(incoming_receiver);
}
void SyncNetworkChannel::AddNetworkStatusReceiver(
invalidation::NetworkStatusCallback* network_status_receiver) {
network_status_receiver->Run(invalidator_state_ == INVALIDATIONS_ENABLED);
network_status_receivers_.push_back(network_status_receiver);
}
void SyncNetworkChannel::SetSystemResources(
invalidation::SystemResources* resources) {
// Do nothing.
}
void SyncNetworkChannel::AddObserver(Observer* observer) {
observers_.AddObserver(observer);
}
void SyncNetworkChannel::RemoveObserver(Observer* observer) {
observers_.RemoveObserver(observer);
}
const std::string& SyncNetworkChannel::GetServiceContextForTest() const {
return service_context_;
}
int64 SyncNetworkChannel::GetSchedulingHashForTest() const {
return scheduling_hash_;
}
std::string SyncNetworkChannel::EncodeMessageForTest(
const std::string& message, const std::string& service_context,
int64 scheduling_hash) {
std::string encoded_message;
EncodeMessage(&encoded_message, message, service_context, scheduling_hash);
return encoded_message;
}
bool SyncNetworkChannel::DecodeMessageForTest(
const std::string& data,
std::string* message,
std::string* service_context,
int64* scheduling_hash) {
return DecodeMessage(data, message, service_context, scheduling_hash);
}
void SyncNetworkChannel::NotifyStateChange(InvalidatorState invalidator_state) {
// Remember state for future NetworkStatusReceivers.
invalidator_state_ = invalidator_state;
// Notify NetworkStatusReceivers in cacheinvalidation.
for (NetworkStatusReceiverList::const_iterator it =
network_status_receivers_.begin();
it != network_status_receivers_.end(); ++it) {
(*it)->Run(invalidator_state_ == INVALIDATIONS_ENABLED);
}
// Notify observers.
FOR_EACH_OBSERVER(Observer, observers_,
OnNetworkChannelStateChanged(invalidator_state_));
}
void SyncNetworkChannel::DeliverIncomingMessage(const std::string& data) {
if (!incoming_receiver_) {
DLOG(ERROR) << "No receiver for incoming notification";
return;
}
std::string message;
if (!DecodeMessage(data,
&message, &service_context_, &scheduling_hash_)) {
DLOG(ERROR) << "Could not parse ClientGatewayMessage";
return;
}
incoming_receiver_->Run(message);
}
void SyncNetworkChannel::EncodeMessage(
std::string* encoded_message,
const std::string& message,
const std::string& service_context,
int64 scheduling_hash) {
ipc::invalidation::ClientGatewayMessage envelope;
envelope.set_is_client_to_server(true);
if (!service_context.empty()) {
envelope.set_service_context(service_context);
envelope.set_rpc_scheduling_hash(scheduling_hash);
}
envelope.set_network_message(message);
envelope.SerializeToString(encoded_message);
}
bool SyncNetworkChannel::DecodeMessage(
const std::string& data,
std::string* message,
std::string* service_context,
int64* scheduling_hash) {
ipc::invalidation::ClientGatewayMessage envelope;
if (!envelope.ParseFromString(data)) {
return false;
}
*message = envelope.network_message();
if (envelope.has_service_context()) {
*service_context = envelope.service_context();
}
if (envelope.has_rpc_scheduling_hash()) {
*scheduling_hash = envelope.rpc_scheduling_hash();
}
return true;
}
SyncStorage::SyncStorage(StateWriter* state_writer,
invalidation::Scheduler* scheduler)
: state_writer_(state_writer),
scheduler_(scheduler) {
DCHECK(state_writer_);
DCHECK(scheduler_);
}
SyncStorage::~SyncStorage() {}
void SyncStorage::WriteKey(const std::string& key, const std::string& value,
invalidation::WriteKeyCallback* done) {
CHECK(state_writer_);
// TODO(ghc): actually write key,value associations, and don't invoke the
// callback until the operation completes.
state_writer_->WriteState(value);
cached_state_ = value;
// According to the cache invalidation API folks, we can do this as
// long as we make sure to clear the persistent state that we start
// up the cache invalidation client with. However, we musn't do it
// right away, as we may be called under a lock that the callback
// uses.
scheduler_->Schedule(
invalidation::Scheduler::NoDelay(),
invalidation::NewPermanentCallback(
this, &SyncStorage::RunAndDeleteWriteKeyCallback,
done));
}
void SyncStorage::ReadKey(const std::string& key,
invalidation::ReadKeyCallback* done) {
DCHECK(scheduler_->IsRunningOnThread()) << "not running on scheduler thread";
RunAndDeleteReadKeyCallback(done, cached_state_);
}
void SyncStorage::DeleteKey(const std::string& key,
invalidation::DeleteKeyCallback* done) {
// TODO(ghc): Implement.
LOG(WARNING) << "ignoring call to DeleteKey(" << key << ", callback)";
}
void SyncStorage::ReadAllKeys(invalidation::ReadAllKeysCallback* done) {
// TODO(ghc): Implement.
LOG(WARNING) << "ignoring call to ReadAllKeys(callback)";
}
void SyncStorage::SetSystemResources(
invalidation::SystemResources* resources) {
// Do nothing.
}
void SyncStorage::RunAndDeleteWriteKeyCallback(
invalidation::WriteKeyCallback* callback) {
callback->Run(
invalidation::Status(invalidation::Status::SUCCESS, std::string()));
delete callback;
}
void SyncStorage::RunAndDeleteReadKeyCallback(
invalidation::ReadKeyCallback* callback, const std::string& value) {
callback->Run(std::make_pair(
invalidation::Status(invalidation::Status::SUCCESS, std::string()),
value));
delete callback;
}
SyncSystemResources::SyncSystemResources(
SyncNetworkChannel* sync_network_channel,
StateWriter* state_writer)
: is_started_(false),
logger_(new SyncLogger()),
internal_scheduler_(new SyncInvalidationScheduler()),
listener_scheduler_(new SyncInvalidationScheduler()),
storage_(new SyncStorage(state_writer, internal_scheduler_.get())),
sync_network_channel_(sync_network_channel) {
}
SyncSystemResources::~SyncSystemResources() {
Stop();
}
void SyncSystemResources::Start() {
internal_scheduler_->Start();
listener_scheduler_->Start();
is_started_ = true;
}
void SyncSystemResources::Stop() {
internal_scheduler_->Stop();
listener_scheduler_->Stop();
}
bool SyncSystemResources::IsStarted() const {
return is_started_;
}
void SyncSystemResources::set_platform(const std::string& platform) {
platform_ = platform;
}
std::string SyncSystemResources::platform() const {
return platform_;
}
SyncLogger* SyncSystemResources::logger() {
return logger_.get();
}
SyncStorage* SyncSystemResources::storage() {
return storage_.get();
}
SyncNetworkChannel* SyncSystemResources::network() {
return sync_network_channel_;
}
SyncInvalidationScheduler* SyncSystemResources::internal_scheduler() {
return internal_scheduler_.get();
}
SyncInvalidationScheduler* SyncSystemResources::listener_scheduler() {
return listener_scheduler_.get();
}
} // namespace syncer