blob: 8fc9e2f1781853d987e296e7eb6189ae8745b051 [file] [log] [blame]
// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "content/browser/leveldb_wrapper_impl.h"
#include "base/bind.h"
#include "base/metrics/histogram_macros.h"
#include "base/threading/thread_task_runner_handle.h"
#include "components/leveldb/public/cpp/util.h"
#include "content/public/browser/browser_thread.h"
namespace content {
void LevelDBWrapperImpl::Delegate::MigrateData(
base::OnceCallback<void(std::unique_ptr<ValueMap>)> callback) {
std::move(callback).Run(nullptr);
}
void LevelDBWrapperImpl::Delegate::OnMapLoaded(leveldb::mojom::DatabaseError) {}
bool LevelDBWrapperImpl::s_aggressive_flushing_enabled_ = false;
LevelDBWrapperImpl::RateLimiter::RateLimiter(size_t desired_rate,
base::TimeDelta time_quantum)
: rate_(desired_rate), samples_(0), time_quantum_(time_quantum) {
DCHECK_GT(desired_rate, 0ul);
}
base::TimeDelta LevelDBWrapperImpl::RateLimiter::ComputeTimeNeeded() const {
return time_quantum_ * (samples_ / rate_);
}
base::TimeDelta LevelDBWrapperImpl::RateLimiter::ComputeDelayNeeded(
const base::TimeDelta elapsed_time) const {
base::TimeDelta time_needed = ComputeTimeNeeded();
if (time_needed > elapsed_time)
return time_needed - elapsed_time;
return base::TimeDelta();
}
LevelDBWrapperImpl::CommitBatch::CommitBatch() : clear_all_first(false) {}
LevelDBWrapperImpl::CommitBatch::~CommitBatch() {}
LevelDBWrapperImpl::LevelDBWrapperImpl(
leveldb::mojom::LevelDBDatabase* database,
const std::string& prefix,
size_t max_size,
base::TimeDelta default_commit_delay,
int max_bytes_per_hour,
int max_commits_per_hour,
Delegate* delegate)
: prefix_(leveldb::StdStringToUint8Vector(prefix)),
delegate_(delegate),
database_(database),
bytes_used_(0),
max_size_(max_size),
start_time_(base::TimeTicks::Now()),
default_commit_delay_(default_commit_delay),
data_rate_limiter_(max_bytes_per_hour, base::TimeDelta::FromHours(1)),
commit_rate_limiter_(max_commits_per_hour, base::TimeDelta::FromHours(1)),
weak_ptr_factory_(this) {
bindings_.set_connection_error_handler(base::Bind(
&LevelDBWrapperImpl::OnConnectionError, base::Unretained(this)));
}
LevelDBWrapperImpl::~LevelDBWrapperImpl() {
if (commit_batch_)
CommitChanges();
}
void LevelDBWrapperImpl::Bind(mojom::LevelDBWrapperRequest request) {
bindings_.AddBinding(this, std::move(request));
}
void LevelDBWrapperImpl::EnableAggressiveCommitDelay() {
s_aggressive_flushing_enabled_ = true;
}
void LevelDBWrapperImpl::ScheduleImmediateCommit() {
if (!on_load_complete_tasks_.empty()) {
LoadMap(base::Bind(&LevelDBWrapperImpl::ScheduleImmediateCommit,
base::Unretained(this)));
return;
}
if (!database_ || !commit_batch_)
return;
CommitChanges();
}
void LevelDBWrapperImpl::PurgeMemory() {
if (!map_ || // We're not using any memory.
commit_batch_ || // We leave things alone with changes pending.
!database_) { // Don't purge anything if we're not backed by a database.
return;
}
map_.reset();
}
void LevelDBWrapperImpl::AddObserver(
mojom::LevelDBObserverAssociatedPtrInfo observer) {
mojom::LevelDBObserverAssociatedPtr observer_ptr;
observer_ptr.Bind(std::move(observer));
observers_.AddPtr(std::move(observer_ptr));
}
void LevelDBWrapperImpl::Put(const std::vector<uint8_t>& key,
const std::vector<uint8_t>& value,
const std::string& source,
PutCallback callback) {
if (!map_) {
LoadMap(base::Bind(&LevelDBWrapperImpl::Put, base::Unretained(this), key,
value, source, base::Passed(&callback)));
return;
}
bool has_old_item = false;
size_t old_item_size = 0;
auto found = map_->find(key);
if (found != map_->end()) {
if (found->second == value) {
std::move(callback).Run(true); // Key already has this value.
return;
}
old_item_size = key.size() + found->second.size();
has_old_item = true;
}
size_t new_item_size = key.size() + value.size();
size_t new_bytes_used = bytes_used_ - old_item_size + new_item_size;
// Only check quota if the size is increasing, this allows
// shrinking changes to pre-existing maps that are over budget.
if (new_item_size > old_item_size && new_bytes_used > max_size_) {
std::move(callback).Run(false);
return;
}
if (database_) {
CreateCommitBatchIfNeeded();
commit_batch_->changed_keys.insert(key);
}
std::vector<uint8_t> old_value;
if (has_old_item) {
old_value.swap((*map_)[key]);
}
(*map_)[key] = value;
bytes_used_ = new_bytes_used;
if (!has_old_item) {
// We added a new key/value pair.
observers_.ForAllPtrs(
[&key, &value, &source](mojom::LevelDBObserver* observer) {
observer->KeyAdded(key, value, source);
});
} else {
// We changed the value for an existing key.
observers_.ForAllPtrs(
[&key, &value, &source, &old_value](mojom::LevelDBObserver* observer) {
observer->KeyChanged(key, value, old_value, source);
});
}
std::move(callback).Run(true);
}
void LevelDBWrapperImpl::Delete(const std::vector<uint8_t>& key,
const std::string& source,
DeleteCallback callback) {
if (!map_) {
LoadMap(base::Bind(&LevelDBWrapperImpl::Delete, base::Unretained(this), key,
source, base::Passed(&callback)));
return;
}
auto found = map_->find(key);
if (found == map_->end()) {
std::move(callback).Run(true);
return;
}
if (database_) {
CreateCommitBatchIfNeeded();
commit_batch_->changed_keys.insert(std::move(found->first));
}
std::vector<uint8_t> old_value(std::move(found->second));
map_->erase(found);
bytes_used_ -= key.size() + old_value.size();
observers_.ForAllPtrs(
[&key, &source, &old_value](mojom::LevelDBObserver* observer) {
observer->KeyDeleted(key, old_value, source);
});
std::move(callback).Run(true);
}
void LevelDBWrapperImpl::DeleteAll(const std::string& source,
DeleteAllCallback callback) {
if (!map_) {
LoadMap(base::Bind(&LevelDBWrapperImpl::DeleteAll, base::Unretained(this),
source, base::Passed(&callback)));
return;
}
if (map_->empty()) {
std::move(callback).Run(true);
return;
}
if (database_) {
CreateCommitBatchIfNeeded();
commit_batch_->clear_all_first = true;
commit_batch_->changed_keys.clear();
}
map_->clear();
bytes_used_ = 0;
observers_.ForAllPtrs(
[&source](mojom::LevelDBObserver* observer) {
observer->AllDeleted(source);
});
std::move(callback).Run(true);
}
void LevelDBWrapperImpl::Get(const std::vector<uint8_t>& key,
GetCallback callback) {
if (!map_) {
LoadMap(base::Bind(&LevelDBWrapperImpl::Get, base::Unretained(this), key,
base::Passed(&callback)));
return;
}
auto found = map_->find(key);
if (found == map_->end()) {
std::move(callback).Run(false, std::vector<uint8_t>());
return;
}
std::move(callback).Run(true, found->second);
}
void LevelDBWrapperImpl::GetAll(
mojom::LevelDBWrapperGetAllCallbackAssociatedPtrInfo complete_callback,
GetAllCallback callback) {
if (!map_) {
LoadMap(base::Bind(&LevelDBWrapperImpl::GetAll, base::Unretained(this),
base::Passed(&complete_callback),
base::Passed(&callback)));
return;
}
std::vector<mojom::KeyValuePtr> all;
for (const auto& it : (*map_)) {
mojom::KeyValuePtr kv = mojom::KeyValue::New();
kv->key = it.first;
kv->value = it.second;
all.push_back(std::move(kv));
}
std::move(callback).Run(leveldb::mojom::DatabaseError::OK, std::move(all));
if (complete_callback.is_valid()) {
mojom::LevelDBWrapperGetAllCallbackAssociatedPtr complete_ptr;
complete_ptr.Bind(std::move(complete_callback));
complete_ptr->Complete(true);
}
}
void LevelDBWrapperImpl::OnConnectionError() {
if (!bindings_.empty())
return;
// If any tasks are waiting for load to complete, delay calling the
// no_bindings_callback_ until all those tasks have completed.
if (!on_load_complete_tasks_.empty())
return;
delegate_->OnNoBindings();
}
void LevelDBWrapperImpl::LoadMap(const base::Closure& completion_callback) {
DCHECK(!map_);
on_load_complete_tasks_.push_back(completion_callback);
if (on_load_complete_tasks_.size() > 1)
return;
if (!database_) {
OnMapLoaded(leveldb::mojom::DatabaseError::IO_ERROR,
std::vector<leveldb::mojom::KeyValuePtr>());
return;
}
database_->GetPrefixed(prefix_, base::Bind(&LevelDBWrapperImpl::OnMapLoaded,
weak_ptr_factory_.GetWeakPtr()));
}
void LevelDBWrapperImpl::OnMapLoaded(
leveldb::mojom::DatabaseError status,
std::vector<leveldb::mojom::KeyValuePtr> data) {
DCHECK(!map_);
if (data.empty() && status == leveldb::mojom::DatabaseError::OK) {
delegate_->MigrateData(
base::BindOnce(&LevelDBWrapperImpl::OnGotMigrationData,
weak_ptr_factory_.GetWeakPtr()));
return;
}
map_.reset(new ValueMap);
bytes_used_ = 0;
for (auto& it : data) {
DCHECK_GE(it->key.size(), prefix_.size());
(*map_)[std::vector<uint8_t>(it->key.begin() + prefix_.size(),
it->key.end())] = it->value;
bytes_used_ += it->key.size() - prefix_.size() + it->value.size();
}
// We proceed without using a backing store, nothing will be persisted but the
// class is functional for the lifetime of the object.
delegate_->OnMapLoaded(status);
if (status != leveldb::mojom::DatabaseError::OK)
database_ = nullptr;
OnLoadComplete();
}
void LevelDBWrapperImpl::OnGotMigrationData(std::unique_ptr<ValueMap> data) {
map_ = data ? std::move(data) : base::MakeUnique<ValueMap>();
bytes_used_ = 0;
for (const auto& it : *map_)
bytes_used_ += it.first.size() + it.second.size();
if (database_ && !empty()) {
CreateCommitBatchIfNeeded();
for (const auto& it : *map_)
commit_batch_->changed_keys.insert(it.first);
CommitChanges();
}
OnLoadComplete();
}
void LevelDBWrapperImpl::OnLoadComplete() {
std::vector<base::Closure> tasks;
on_load_complete_tasks_.swap(tasks);
for (auto& task : tasks)
task.Run();
// We might need to call the no_bindings_callback_ here if bindings became
// empty while waiting for load to complete.
if (bindings_.empty())
delegate_->OnNoBindings();
}
void LevelDBWrapperImpl::CreateCommitBatchIfNeeded() {
if (commit_batch_)
return;
commit_batch_.reset(new CommitBatch());
BrowserThread::PostAfterStartupTask(
FROM_HERE, base::ThreadTaskRunnerHandle::Get(),
base::Bind(&LevelDBWrapperImpl::StartCommitTimer,
weak_ptr_factory_.GetWeakPtr()));
}
void LevelDBWrapperImpl::StartCommitTimer() {
if (!commit_batch_)
return;
// Start a timer to commit any changes that accrue in the batch, but only if
// no commits are currently in flight. In that case the timer will be
// started after the commits have happened.
if (commit_batches_in_flight_)
return;
base::ThreadTaskRunnerHandle::Get()->PostDelayedTask(
FROM_HERE, base::Bind(&LevelDBWrapperImpl::CommitChanges,
weak_ptr_factory_.GetWeakPtr()),
ComputeCommitDelay());
}
base::TimeDelta LevelDBWrapperImpl::ComputeCommitDelay() const {
if (s_aggressive_flushing_enabled_)
return base::TimeDelta::FromSeconds(1);
base::TimeDelta elapsed_time = base::TimeTicks::Now() - start_time_;
base::TimeDelta delay = std::max(
default_commit_delay_,
std::max(commit_rate_limiter_.ComputeDelayNeeded(elapsed_time),
data_rate_limiter_.ComputeDelayNeeded(elapsed_time)));
UMA_HISTOGRAM_LONG_TIMES("LevelDBWrapper.CommitDelay", delay);
return delay;
}
void LevelDBWrapperImpl::CommitChanges() {
DCHECK(database_);
DCHECK(map_);
if (!commit_batch_)
return;
commit_rate_limiter_.add_samples(1);
// Commit all our changes in a single batch.
std::vector<leveldb::mojom::BatchedOperationPtr> operations =
delegate_->PrepareToCommit();
if (commit_batch_->clear_all_first) {
leveldb::mojom::BatchedOperationPtr item =
leveldb::mojom::BatchedOperation::New();
item->type = leveldb::mojom::BatchOperationType::DELETE_PREFIXED_KEY;
item->key = prefix_;
operations.push_back(std::move(item));
}
size_t data_size = 0;
for (const auto& key: commit_batch_->changed_keys) {
data_size += key.size();
leveldb::mojom::BatchedOperationPtr item =
leveldb::mojom::BatchedOperation::New();
item->key.reserve(prefix_.size() + key.size());
item->key.insert(item->key.end(), prefix_.begin(), prefix_.end());
item->key.insert(item->key.end(), key.begin(), key.end());
auto it = map_->find(key);
if (it == map_->end()) {
item->type = leveldb::mojom::BatchOperationType::DELETE_KEY;
} else {
item->type = leveldb::mojom::BatchOperationType::PUT_KEY;
item->value = it->second;
data_size += it->second.size();
}
operations.push_back(std::move(item));
}
commit_batch_.reset();
data_rate_limiter_.add_samples(data_size);
++commit_batches_in_flight_;
// TODO(michaeln): Currently there is no guarantee LevelDBDatabaseImp::Write
// will run during a clean shutdown. We need that to avoid dataloss.
database_->Write(std::move(operations),
base::Bind(&LevelDBWrapperImpl::OnCommitComplete,
weak_ptr_factory_.GetWeakPtr()));
}
void LevelDBWrapperImpl::OnCommitComplete(leveldb::mojom::DatabaseError error) {
--commit_batches_in_flight_;
StartCommitTimer();
delegate_->DidCommit(error);
}
} // namespace content