blob: 6f74c6c4285a92767b8cb4a21b404335b35b198a [file] [log] [blame] [edit]
// Copyright 2017 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
// Safe Observer/Observable implementation.
//
// When using ObserverListThreadSafe, we were running into issues since there
// was no synchronization between getting the existing value, and registering
// as an observer. See go/cast-platform-design-synchronized-value for more
// details.
//
// To fix this issue, and to make observing values safer and simpler in general,
// use the Observable/Observer pattern in this file. When you have a value that
// other code wants to observe (ie, get the value of and receive any changes),
// wrap that value in an Observable<T>. The value type T must be copyable and
// copy-assignable. The Observable must be constructed with the initial observed
// value, and the value may be updated at any time from any thread by calling
// SetValue(). You can also get the value using GetValue(); but note that this
// is not threadsafe (the value is returned without locking), so the caller must
// ensure safety by other means. Calling GetValueThreadSafe() is threadsafe but
// involves a mutex lock.
//
// Code that wants to observe the value calls Observe() on it at any point when
// the value is alive. Note that Observe() may be called safely from any thread.
// Observe() returns an Observer<T> instance, which MUST be used and destroyed
// only on the thread that called Observe(). The Observer initially contains the
// value that the Observable had when Observe() was called, and that value will
// be updated asynchronously whenever the Observable's SetValue() method is
// is called. NOTE: the initial value of the Observer is the value known to the
// thread that created the Observer at the time; there may be an updated value
// from the Observable that hasn't been handled by the Observer's thread yet.
//
// The Observer's view of the observed value is returned by GetValue(); this is
// a low-cost call since there is no locking (the value is updated on the thread
// that constructed the Observer). Note that Observers are always updated
// asynchronously with PostTask(), even if they belong to the same thread that
// calls SetValue(). All Observers on the same thread have the same consistent
// view of the observed value.
//
// Observers may be copied freely; the copy also observes the original
// Observable, and belongs to the thread that created the copy. Copying is safe
// even when the original Observable has been destroyed.
//
// Code may register a callback that is called whenever an Observer's value is
// updated, by calling SetOnUpdateCallback(). If you get an Observer by calling
// Observe() and then immediately call SetOnUpdateCallback() to register a
// a callback, you are guaranteed to get every value of the Observable starting
// from when you called Observe() - you get the initial value by calling
// GetValue() on the returned Observer, and any subsequent updates will trigger
// the callback so you can call GetValue() to get the new value. You will not
// receive any extra callbacks (exactly one callback per value update).
//
// Note that Observers are not default-constructible, since there is no way to
// construct it in a default state. In cases where you need to instantiate an
// Observer after your constructor, you can use a std::unique_ptr<Observer>
// instead, and initialize it when needed.
//
// Example usage:
//
// class MediaManager {
// public:
// MediaManager() : volume_(0.0f) {}
//
// Observer<float> ObserveVolume() { return volume_.Observe(); }
//
// // ... other methods ...
//
// private:
// // Assume this is called from some other internal code when the volume is
// // updated.
// void OnUpdateVolume(float new_volume) {
// volume_.SetValue(new_volume); // All observers will get the new value.
// }
//
// Observable<float> volume_;
// }
//
// class VolumeFeedbackManager {
// public:
// VolumeFeedbackManager(MediaManager* media_manager)
// : volume_observer_(media_manager->ObserveVolume()) {
// volume_observer_.SetOnUpdateCallback(
// base::BindRepeating(&VolumeFeedbackManager::OnVolumeChanged,
// base::Unretained(this)));
// }
//
// private:
// void OnVolumeChanged() {
// ShowVolumeFeedback(volume_observer_.GetValue());
// }
//
// void ShowVolumeFeedback(float volume) {
// // ... some implementation ...
// }
// };
//
#ifndef CHROMECAST_BASE_OBSERVER_H_
#define CHROMECAST_BASE_OBSERVER_H_
#include <stddef.h>
#include <stdint.h>
#include <memory>
#include <utility>
#include <vector>
#include "base/check_op.h"
#include "base/containers/contains.h"
#include "base/functional/bind.h"
#include "base/functional/callback.h"
#include "base/location.h"
#include "base/memory/ref_counted.h"
#include "base/notreached.h"
#include "base/sequence_checker.h"
#include "base/synchronization/lock.h"
#include "base/task/sequenced_task_runner.h"
namespace chromecast {
namespace subtle {
template <typename T>
class ObservableInternals;
} // namespace subtle
template <typename T>
class Observable;
template <typename T>
class Observer {
public:
Observer(const Observer& other);
Observer& operator=(const Observer&) = delete;
~Observer();
void SetOnUpdateCallback(base::RepeatingClosure callback) {
on_update_callback_ = std::move(callback);
}
const T& GetValue() const {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
return value_;
}
private:
friend class subtle::ObservableInternals<T>;
friend class Observable<T>;
explicit Observer(scoped_refptr<subtle::ObservableInternals<T>> internals);
void OnUpdate();
const scoped_refptr<subtle::ObservableInternals<T>> internals_;
// Note: value_ is a const ref to the value copy for this sequence, stored in
// SequenceOwnedInfo.
const T& value_;
base::RepeatingClosure on_update_callback_;
SEQUENCE_CHECKER(sequence_checker_);
};
template <typename T>
class Observable {
static_assert(std::is_copy_constructible<T>::value,
"Observable values must be copyable");
static_assert(std::is_copy_assignable<T>::value,
"Observable values must be copy-assignable");
public:
explicit Observable(const T& initial_value);
Observable(const Observable&) = delete;
Observable& operator=(const Observable&) = delete;
Observer<T> Observe();
void SetValue(const T& new_value);
const T& GetValue() const; // NOT threadsafe!
T GetValueThreadSafe() const;
private:
// By using a refcounted object to store the value and observer list, we can
// avoid tying the lifetime of Observable to its Observers or vice versa.
const scoped_refptr<subtle::ObservableInternals<T>> internals_;
};
namespace subtle {
template <typename T>
class ObservableInternals
: public base::RefCountedThreadSafe<ObservableInternals<T>> {
public:
explicit ObservableInternals(const T& initial_value)
: value_(initial_value) {}
ObservableInternals(const ObservableInternals&) = delete;
ObservableInternals& operator=(const ObservableInternals&) = delete;
void SetValue(const T& new_value) {
base::AutoLock lock(lock_);
value_ = new_value;
for (auto& item : per_sequence_) {
item.SetValue(new_value);
}
}
const T& GetValue() const { return value_; }
T GetValueThreadSafe() const {
base::AutoLock lock(lock_);
return value_;
}
const T& AddObserver(Observer<T>* observer) {
DCHECK(observer);
DCHECK(base::SequencedTaskRunner::HasCurrentDefault());
auto task_runner = base::SequencedTaskRunner::GetCurrentDefault();
base::AutoLock lock(lock_);
auto it = per_sequence_.begin();
while (it != per_sequence_.end() && it->task_runner() != task_runner) {
++it;
}
if (it == per_sequence_.end()) {
per_sequence_.emplace_back(std::move(task_runner), value_);
it = --per_sequence_.end();
}
it->AddObserver(observer);
return it->value();
}
void RemoveObserver(Observer<T>* observer) {
DCHECK(observer);
DCHECK(base::SequencedTaskRunner::HasCurrentDefault());
auto task_runner = base::SequencedTaskRunner::GetCurrentDefault();
base::AutoLock lock(lock_);
for (size_t i = 0; i < per_sequence_.size(); ++i) {
if (per_sequence_[i].task_runner() == task_runner) {
per_sequence_[i].RemoveObserver(observer);
if (per_sequence_[i].Empty()) {
per_sequence_[i].Swap(per_sequence_.back());
per_sequence_.pop_back();
}
return;
}
}
NOTREACHED_IN_MIGRATION()
<< "Tried to remove observer from unknown task runner";
}
private:
// Information owned by a particular sequence. Must be only accessed on that
// sequence, and must be deleted by posting a task to that sequence.
// This class MUST NOT contain a scoped_refptr to the task_runner, since if it
// did, there would be a reference cycle during cleanup, when the task to
// Destroy() is posted.
class SequenceOwnedInfo {
public:
explicit SequenceOwnedInfo(const T& value) : value_(value) {}
SequenceOwnedInfo(const SequenceOwnedInfo&) = delete;
SequenceOwnedInfo& operator=(const SequenceOwnedInfo&) = delete;
const T& value() const {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
return value_;
}
void AddObserver(Observer<T>* observer) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
DCHECK(observer);
DCHECK(!base::Contains(observers_, observer));
observers_.push_back(observer);
}
void RemoveObserver(Observer<T>* observer) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
DCHECK(observer);
DCHECK(base::Contains(observers_, observer));
observers_.erase(
std::remove(observers_.begin(), observers_.end(), observer),
observers_.end());
}
bool Empty() const {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
return observers_.empty();
}
void SetValue(const T& value) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
value_ = value;
for (auto* obs : observers_) {
obs->OnUpdate();
}
}
static void Destroy(std::unique_ptr<SequenceOwnedInfo> self) {
// The unique_ptr deletes automatically.
}
private:
std::vector<Observer<T>*> observers_;
T value_;
SEQUENCE_CHECKER(sequence_checker_);
};
class PerSequenceInfo {
public:
PerSequenceInfo(scoped_refptr<base::SequencedTaskRunner> task_runner,
const T& value)
: task_runner_(std::move(task_runner)),
owned_info_(std::make_unique<SequenceOwnedInfo>(value)) {}
PerSequenceInfo(PerSequenceInfo&& other) = default;
~PerSequenceInfo() {
if (!owned_info_) {
// Members have been moved out via move constructor.
return;
}
DCHECK(Empty());
// Must post a task to delete the owned info, since there may still be a
// pending task to call SequenceOwnedInfo::SetValue().
// Use manual PostNonNestableTask(), since DeleteSoon() does not
// guarantee deletion.
task_runner_->PostNonNestableTask(
FROM_HERE,
base::BindOnce(&SequenceOwnedInfo::Destroy, std::move(owned_info_)));
}
const T& value() const { return owned_info_->value(); }
const base::SequencedTaskRunner* task_runner() const {
return task_runner_.get();
}
void AddObserver(Observer<T>* observer) {
owned_info_->AddObserver(observer);
}
void RemoveObserver(Observer<T>* observer) {
owned_info_->RemoveObserver(observer);
}
bool Empty() const { return owned_info_->Empty(); }
void Swap(PerSequenceInfo& other) {
std::swap(task_runner_, other.task_runner_);
std::swap(owned_info_, other.owned_info_);
}
void SetValue(const T& value) {
task_runner_->PostTask(
FROM_HERE,
base::BindOnce(&SequenceOwnedInfo::SetValue,
base::Unretained(owned_info_.get()), value));
}
private:
// Operations on |owned_info| do not need to be synchronized with a lock,
// since all operations must occur on |task_runner|.
scoped_refptr<base::SequencedTaskRunner> task_runner_;
std::unique_ptr<SequenceOwnedInfo> owned_info_;
};
friend class base::RefCountedThreadSafe<ObservableInternals>;
~ObservableInternals() {}
mutable base::Lock lock_;
T value_;
std::vector<PerSequenceInfo> per_sequence_;
};
} // namespace subtle
template <typename T>
Observer<T>::Observer(scoped_refptr<subtle::ObservableInternals<T>> internals)
: internals_(std::move(internals)), value_(internals_->AddObserver(this)) {}
template <typename T>
Observer<T>::Observer(const Observer& other) : Observer(other.internals_) {}
template <typename T>
Observer<T>::~Observer() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
internals_->RemoveObserver(this);
}
template <typename T>
void Observer<T>::OnUpdate() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (on_update_callback_) {
on_update_callback_.Run();
}
}
template <typename T>
Observable<T>::Observable(const T& initial_value)
: internals_(base::WrapRefCounted(
new subtle::ObservableInternals<T>(initial_value))) {}
template <typename T>
Observer<T> Observable<T>::Observe() {
return Observer<T>(internals_);
}
template <typename T>
void Observable<T>::SetValue(const T& new_value) {
internals_->SetValue(new_value);
}
template <typename T>
const T& Observable<T>::GetValue() const {
return internals_->GetValue();
}
template <typename T>
T Observable<T>::GetValueThreadSafe() const {
return internals_->GetValueThreadSafe();
}
} // namespace chromecast
#endif // CHROMECAST_BASE_OBSERVER_H_