blob: 531286f7fa55769f5eb2198dc33568143a9d8166 [file] [log] [blame]
// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "sync/notifier/non_blocking_invalidation_notifier.h"
#include "base/logging.h"
#include "base/memory/scoped_ptr.h"
#include "base/message_loop.h"
#include "base/threading/thread.h"
#include "jingle/notifier/listener/push_client.h"
#include "sync/notifier/invalidation_notifier.h"
namespace sync_notifier {
class NonBlockingInvalidationNotifier::Core
: public base::RefCountedThreadSafe<NonBlockingInvalidationNotifier::Core>,
// SyncNotifierObserver to observe the InvalidationNotifier we create.
public SyncNotifierObserver {
public:
// Called on parent thread. |delegate_observer| should be
// initialized.
explicit Core(
const browser_sync::WeakHandle<SyncNotifierObserver>&
delegate_observer);
// Helpers called on I/O thread.
void Initialize(
const notifier::NotifierOptions& notifier_options,
const InvalidationVersionMap& initial_max_invalidation_versions,
const std::string& initial_invalidation_state,
const browser_sync::WeakHandle<InvalidationStateTracker>&
invalidation_state_tracker,
const std::string& client_info);
void Teardown();
void SetUniqueId(const std::string& unique_id);
void SetStateDeprecated(const std::string& state);
void UpdateCredentials(const std::string& email, const std::string& token);
void UpdateEnabledTypes(syncable::ModelTypeSet enabled_types);
// SyncNotifierObserver implementation (all called on I/O thread by
// InvalidationNotifier).
virtual void OnIncomingNotification(
const syncable::ModelTypePayloadMap& type_payloads,
IncomingNotificationSource source);
virtual void OnNotificationStateChange(bool notifications_enabled);
private:
friend class
base::RefCountedThreadSafe<NonBlockingInvalidationNotifier::Core>;
// Called on parent or I/O thread.
~Core();
// The variables below should be used only on the I/O thread.
const browser_sync::WeakHandle<SyncNotifierObserver> delegate_observer_;
scoped_ptr<InvalidationNotifier> invalidation_notifier_;
scoped_refptr<base::MessageLoopProxy> io_message_loop_proxy_;
DISALLOW_COPY_AND_ASSIGN(Core);
};
NonBlockingInvalidationNotifier::Core::Core(
const browser_sync::WeakHandle<SyncNotifierObserver>&
delegate_observer)
: delegate_observer_(delegate_observer) {
DCHECK(delegate_observer_.IsInitialized());
}
NonBlockingInvalidationNotifier::Core::~Core() {
}
void NonBlockingInvalidationNotifier::Core::Initialize(
const notifier::NotifierOptions& notifier_options,
const InvalidationVersionMap& initial_max_invalidation_versions,
const std::string& initial_invalidation_state,
const browser_sync::WeakHandle<InvalidationStateTracker>&
invalidation_state_tracker,
const std::string& client_info) {
DCHECK(notifier_options.request_context_getter);
DCHECK_EQ(notifier::NOTIFICATION_SERVER,
notifier_options.notification_method);
io_message_loop_proxy_ = notifier_options.request_context_getter->
GetIOMessageLoopProxy();
DCHECK(io_message_loop_proxy_->BelongsToCurrentThread());
invalidation_notifier_.reset(
new InvalidationNotifier(
notifier::PushClient::CreateDefaultOnIOThread(notifier_options),
initial_max_invalidation_versions,
initial_invalidation_state,
invalidation_state_tracker,
client_info));
invalidation_notifier_->AddObserver(this);
}
void NonBlockingInvalidationNotifier::Core::Teardown() {
DCHECK(io_message_loop_proxy_->BelongsToCurrentThread());
invalidation_notifier_->RemoveObserver(this);
invalidation_notifier_.reset();
io_message_loop_proxy_ = NULL;
}
void NonBlockingInvalidationNotifier::Core::SetUniqueId(
const std::string& unique_id) {
DCHECK(io_message_loop_proxy_->BelongsToCurrentThread());
invalidation_notifier_->SetUniqueId(unique_id);
}
void NonBlockingInvalidationNotifier::Core::SetStateDeprecated(
const std::string& state) {
DCHECK(io_message_loop_proxy_->BelongsToCurrentThread());
invalidation_notifier_->SetStateDeprecated(state);
}
void NonBlockingInvalidationNotifier::Core::UpdateCredentials(
const std::string& email, const std::string& token) {
DCHECK(io_message_loop_proxy_->BelongsToCurrentThread());
invalidation_notifier_->UpdateCredentials(email, token);
}
void NonBlockingInvalidationNotifier::Core::UpdateEnabledTypes(
syncable::ModelTypeSet enabled_types) {
DCHECK(io_message_loop_proxy_->BelongsToCurrentThread());
invalidation_notifier_->UpdateEnabledTypes(enabled_types);
}
void NonBlockingInvalidationNotifier::Core::OnIncomingNotification(
const syncable::ModelTypePayloadMap& type_payloads,
IncomingNotificationSource source) {
DCHECK(io_message_loop_proxy_->BelongsToCurrentThread());
delegate_observer_.Call(FROM_HERE,
&SyncNotifierObserver::OnIncomingNotification,
type_payloads,
source);
}
void NonBlockingInvalidationNotifier::Core::OnNotificationStateChange(
bool notifications_enabled) {
DCHECK(io_message_loop_proxy_->BelongsToCurrentThread());
delegate_observer_.Call(FROM_HERE,
&SyncNotifierObserver::OnNotificationStateChange,
notifications_enabled);
}
NonBlockingInvalidationNotifier::NonBlockingInvalidationNotifier(
const notifier::NotifierOptions& notifier_options,
const InvalidationVersionMap& initial_max_invalidation_versions,
const std::string& initial_invalidation_state,
const browser_sync::WeakHandle<InvalidationStateTracker>&
invalidation_state_tracker,
const std::string& client_info)
: weak_ptr_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)),
core_(
new Core(browser_sync::MakeWeakHandle(
weak_ptr_factory_.GetWeakPtr()))),
parent_message_loop_proxy_(
base::MessageLoopProxy::current()),
io_message_loop_proxy_(notifier_options.request_context_getter->
GetIOMessageLoopProxy()) {
if (!io_message_loop_proxy_->PostTask(
FROM_HERE,
base::Bind(
&NonBlockingInvalidationNotifier::Core::Initialize,
core_.get(),
notifier_options,
initial_max_invalidation_versions,
initial_invalidation_state,
invalidation_state_tracker,
client_info))) {
NOTREACHED();
}
}
NonBlockingInvalidationNotifier::~NonBlockingInvalidationNotifier() {
DCHECK(parent_message_loop_proxy_->BelongsToCurrentThread());
if (!io_message_loop_proxy_->PostTask(
FROM_HERE,
base::Bind(&NonBlockingInvalidationNotifier::Core::Teardown,
core_.get()))) {
NOTREACHED();
}
}
void NonBlockingInvalidationNotifier::AddObserver(
SyncNotifierObserver* observer) {
DCHECK(parent_message_loop_proxy_->BelongsToCurrentThread());
observers_.AddObserver(observer);
}
void NonBlockingInvalidationNotifier::RemoveObserver(
SyncNotifierObserver* observer) {
DCHECK(parent_message_loop_proxy_->BelongsToCurrentThread());
observers_.RemoveObserver(observer);
}
void NonBlockingInvalidationNotifier::SetUniqueId(
const std::string& unique_id) {
DCHECK(parent_message_loop_proxy_->BelongsToCurrentThread());
if (!io_message_loop_proxy_->PostTask(
FROM_HERE,
base::Bind(&NonBlockingInvalidationNotifier::Core::SetUniqueId,
core_.get(), unique_id))) {
NOTREACHED();
}
}
void NonBlockingInvalidationNotifier::SetStateDeprecated(
const std::string& state) {
DCHECK(parent_message_loop_proxy_->BelongsToCurrentThread());
if (!io_message_loop_proxy_->PostTask(
FROM_HERE,
base::Bind(
&NonBlockingInvalidationNotifier::Core::SetStateDeprecated,
core_.get(), state))) {
NOTREACHED();
}
}
void NonBlockingInvalidationNotifier::UpdateCredentials(
const std::string& email, const std::string& token) {
DCHECK(parent_message_loop_proxy_->BelongsToCurrentThread());
if (!io_message_loop_proxy_->PostTask(
FROM_HERE,
base::Bind(&NonBlockingInvalidationNotifier::Core::UpdateCredentials,
core_.get(), email, token))) {
NOTREACHED();
}
}
void NonBlockingInvalidationNotifier::UpdateEnabledTypes(
syncable::ModelTypeSet enabled_types) {
DCHECK(parent_message_loop_proxy_->BelongsToCurrentThread());
if (!io_message_loop_proxy_->PostTask(
FROM_HERE,
base::Bind(&NonBlockingInvalidationNotifier::Core::UpdateEnabledTypes,
core_.get(), enabled_types))) {
NOTREACHED();
}
}
void NonBlockingInvalidationNotifier::SendNotification(
syncable::ModelTypeSet changed_types) {
DCHECK(parent_message_loop_proxy_->BelongsToCurrentThread());
// InvalidationClient doesn't implement SendNotification(), so no
// need to forward on the call.
}
void NonBlockingInvalidationNotifier::OnIncomingNotification(
const syncable::ModelTypePayloadMap& type_payloads,
IncomingNotificationSource source) {
DCHECK(parent_message_loop_proxy_->BelongsToCurrentThread());
FOR_EACH_OBSERVER(SyncNotifierObserver, observers_,
OnIncomingNotification(type_payloads, source));
}
void NonBlockingInvalidationNotifier::OnNotificationStateChange(
bool notifications_enabled) {
DCHECK(parent_message_loop_proxy_->BelongsToCurrentThread());
FOR_EACH_OBSERVER(SyncNotifierObserver, observers_,
OnNotificationStateChange(notifications_enabled));
}
} // namespace sync_notifier