blob: 96445ba68b522f253f2371e3d361269df4a26037 [file] [log] [blame]
// Copyright 2020 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "components/feed/core/v2/feed_store.h"
#include <utility>
#include "base/bind.h"
#include "base/callback_helpers.h"
#include "base/containers/flat_set.h"
#include "base/files/file_path.h"
#include "base/memory/ptr_util.h"
#include "base/notreached.h"
#include "base/strings/strcat.h"
#include "base/strings/string_number_conversions.h"
#include "base/strings/string_piece.h"
#include "base/strings/string_util.h"
#include "base/task/post_task.h"
#include "base/task/thread_pool.h"
#include "components/feed/core/proto/v2/store.pb.h"
#include "components/feed/core/v2/feedstore_util.h"
#include "components/feed/core/v2/protocol_translator.h"
#include "components/feed/core/v2/public/stream_type.h"
#include "components/leveldb_proto/public/proto_database_provider.h"
namespace feed {
namespace {
// Keys are defined as:
// [Key format] -> [Record field]
// S/<stream-id> -> stream_data
// T/<stream-id>/<sequence-number> -> stream_structures
// c/<stream-id>/<content-id> -> content
// s/<stream-id>/<content-id> -> shared_state
// a/<action-id> -> action
// m -> metadata
// subs -> subscribed_web_feeds
// recommendedIndex -> recommended_web_feed_index
// R/<web_feed_id> -> recommended_web_feed
constexpr char kLocalActionPrefix[] = "a/";
constexpr char kMetadataKey[] = "m";
constexpr char kSubscribedFeedsKey[] = "subs";
constexpr char kRecommendedIndexKey[] = "recommendedIndex";
leveldb::ReadOptions CreateReadOptions() {
leveldb::ReadOptions opts;
opts.fill_cache = false;
return opts;
}
// For avoiding multiple `StrCat()` calls when generating a single string.
#define CONTENT_ID_STRING_PARTS(content_id) \
content_id.content_domain(), ",", base::NumberToString(content_id.type()), \
",", base::NumberToString(content_id.id())
std::string StreamDataKey(const base::StringPiece stream_id) {
return base::StrCat({"S/", stream_id});
}
std::string StreamDataKey(const StreamType& stream_type) {
return StreamDataKey(feedstore::StreamId(stream_type));
}
std::string ContentKey(const base::StringPiece stream_type,
const feedwire::ContentId& content_id) {
return base::StrCat(
{"c/", stream_type, "/", CONTENT_ID_STRING_PARTS(content_id)});
}
std::string ContentKey(const StreamType& stream_type,
const feedwire::ContentId& content_id) {
return ContentKey(feedstore::StreamId(stream_type), content_id);
}
std::string SharedStateKey(const base::StringPiece stream_type,
const feedwire::ContentId& content_id) {
return base::StrCat(
{"s/", stream_type, "/", CONTENT_ID_STRING_PARTS(content_id)});
}
std::string SharedStateKey(const StreamType& stream_type,
const feedwire::ContentId& content_id) {
return SharedStateKey(feedstore::StreamId(stream_type), content_id);
}
std::string LocalActionKey(int64_t id) {
return kLocalActionPrefix + base::NumberToString(id);
}
std::string LocalActionKey(const LocalActionId& id) {
return LocalActionKey(id.GetUnsafeValue());
}
// Returns true if the record key is for stream data (stream_data,
// stream_structures, content, shared_state).
bool IsAnyStreamRecordKey(const std::string& key) {
return key.size() > 1 && key[1] == '/' &&
(key[0] == 'S' || key[0] == 'T' || key[0] == 'c' || key[0] == 's');
}
// For matching keys that belong to a specific stream type.
class StreamKeyMatcher {
public:
explicit StreamKeyMatcher(const StreamType& stream_type) {
stream_id_ = std::string(feedstore::StreamId(stream_type));
stream_id_plus_slash_ = stream_id_ + '/';
}
// Returns true if `key` is a key specific to `stream_type`.
bool IsKeyForStream(base::StringPiece key) const {
if (key.size() < 2 || key[1] != '/')
return false;
const base::StringPiece key_suffix = key.substr(2);
switch (key[0]) {
case 'S':
return key_suffix == stream_id_;
case 'T':
case 'c':
case 's':
return base::StartsWith(key_suffix, stream_id_plus_slash_);
}
return false;
}
private:
std::string stream_id_;
std::string stream_id_plus_slash_;
};
bool IsLocalActionKey(const std::string& key) {
return base::StartsWith(key, kLocalActionPrefix);
}
std::string KeyForRecord(const feedstore::Record& record) {
switch (record.data_case()) {
case feedstore::Record::kStreamData: {
const std::string stream_id = record.stream_data().stream_id();
return stream_id.empty() ? StreamDataKey(kForYouStream)
: StreamDataKey(stream_id);
}
case feedstore::Record::kStreamStructures:
return base::StrCat(
{"T/", record.stream_structures().stream_id(), "/",
base::NumberToString(record.stream_structures().sequence_number())});
case feedstore::Record::kContent:
return ContentKey(record.content().stream_id(),
record.content().content_id());
case feedstore::Record::kLocalAction:
return LocalActionKey(record.local_action().id());
case feedstore::Record::kSharedState:
return SharedStateKey(record.shared_state().stream_id(),
record.shared_state().content_id());
case feedstore::Record::kMetadata:
return kMetadataKey;
case feedstore::Record::kSubscribedWebFeeds:
return kSubscribedFeedsKey;
case feedstore::Record::kRecommendedWebFeed:
return base::StrCat({"R/", record.recommended_web_feed().web_feed_id()});
case feedstore::Record::kRecommendedWebFeedIndex:
return kRecommendedIndexKey;
case feedstore::Record::DATA_NOT_SET:
break;
}
NOTREACHED() << "Invalid record case " << record.data_case();
return "";
}
bool FilterByKey(const base::flat_set<std::string>& key_set,
const std::string& key) {
return key_set.contains(key);
}
feedstore::Record MakeRecord(feedstore::Content content) {
feedstore::Record record;
*record.mutable_content() = std::move(content);
return record;
}
feedstore::Record MakeRecord(
feedstore::StreamStructureSet stream_structure_set) {
feedstore::Record record;
*record.mutable_stream_structures() = std::move(stream_structure_set);
return record;
}
feedstore::Record MakeRecord(feedstore::StreamSharedState shared_state) {
feedstore::Record record;
*record.mutable_shared_state() = std::move(shared_state);
return record;
}
feedstore::Record MakeRecord(feedstore::StreamData stream_data) {
feedstore::Record record;
*record.mutable_stream_data() = std::move(stream_data);
return record;
}
feedstore::Record MakeRecord(feedstore::StoredAction action) {
feedstore::Record record;
*record.mutable_local_action() = std::move(action);
return record;
}
feedstore::Record MakeRecord(feedstore::Metadata metadata) {
feedstore::Record record;
*record.mutable_metadata() = std::move(metadata);
return record;
}
feedstore::Record MakeRecord(feedstore::RecommendedWebFeedIndex index) {
feedstore::Record record;
*record.mutable_recommended_web_feed_index() = std::move(index);
return record;
}
feedstore::Record MakeRecord(
feedstore::SubscribedWebFeeds subscribed_web_feeds) {
feedstore::Record record;
*record.mutable_subscribed_web_feeds() = std::move(subscribed_web_feeds);
return record;
}
feedstore::Record MakeRecord(feedstore::WebFeedInfo web_feed_info) {
feedstore::Record record;
*record.mutable_recommended_web_feed() = std::move(web_feed_info);
return record;
}
template <typename T>
std::pair<std::string, feedstore::Record> MakeKeyAndRecord(T record_data) {
std::pair<std::string, feedstore::Record> result;
result.second = MakeRecord(std::move(record_data));
result.first = KeyForRecord(result.second);
return result;
}
std::unique_ptr<std::vector<std::pair<std::string, feedstore::Record>>>
MakeUpdatesForStreamModelUpdateRequest(
int32_t structure_set_sequence_number,
const StreamType& stream_type,
std::unique_ptr<StreamModelUpdateRequest> update_request) {
base::StringPiece stream_id = feedstore::StreamId(stream_type);
auto updates = std::make_unique<
std::vector<std::pair<std::string, feedstore::Record>>>();
update_request->stream_data.set_stream_id(std::string(stream_id));
updates->push_back(MakeKeyAndRecord(std::move(update_request->stream_data)));
for (feedstore::Content& content : update_request->content) {
content.set_stream_id(std::string(stream_id));
updates->push_back(MakeKeyAndRecord(std::move(content)));
}
for (feedstore::StreamSharedState& shared_state :
update_request->shared_states) {
shared_state.set_stream_id(std::string(stream_id));
updates->push_back(MakeKeyAndRecord(std::move(shared_state)));
}
feedstore::StreamStructureSet stream_structure_set;
stream_structure_set.set_stream_id(std::string(stream_id));
stream_structure_set.set_sequence_number(structure_set_sequence_number);
for (feedstore::StreamStructure& structure :
update_request->stream_structures) {
*stream_structure_set.add_structures() = std::move(structure);
}
updates->push_back(MakeKeyAndRecord(std::move(stream_structure_set)));
return updates;
}
void SortActions(std::vector<feedstore::StoredAction>* actions) {
std::sort(actions->begin(), actions->end(),
[](const feedstore::StoredAction& a,
const feedstore::StoredAction& b) { return a.id() < b.id(); });
}
base::OnceCallback<void(bool)> DropBoolParam(base::OnceClosure callback) {
return base::BindOnce(
[](base::OnceClosure callback, bool ok) { std::move(callback).Run(); },
std::move(callback));
}
} // namespace
FeedStore::LoadStreamResult::LoadStreamResult() = default;
FeedStore::LoadStreamResult::~LoadStreamResult() = default;
FeedStore::LoadStreamResult::LoadStreamResult(LoadStreamResult&&) = default;
FeedStore::LoadStreamResult& FeedStore::LoadStreamResult::operator=(
LoadStreamResult&&) = default;
FeedStore::StartupData::StartupData() = default;
FeedStore::StartupData::StartupData(StartupData&&) = default;
FeedStore::StartupData::~StartupData() = default;
FeedStore::StartupData& FeedStore::StartupData::operator=(StartupData&&) =
default;
FeedStore::FeedStore(
std::unique_ptr<leveldb_proto::ProtoDatabase<feedstore::Record>> database)
: database_status_(leveldb_proto::Enums::InitStatus::kNotInitialized),
database_(std::move(database)) {
}
FeedStore::~FeedStore() = default;
void FeedStore::Initialize(base::OnceClosure initialize_complete) {
if (IsInitialized()) {
std::move(initialize_complete).Run();
} else {
initialize_callback_ = std::move(initialize_complete);
database_->Init(
base::BindOnce(&FeedStore::OnDatabaseInitialized, GetWeakPtr()));
}
}
void FeedStore::OnDatabaseInitialized(leveldb_proto::Enums::InitStatus status) {
database_status_ = status;
if (initialize_callback_)
std::move(initialize_callback_).Run();
}
bool FeedStore::IsInitialized() const {
return database_status_ == leveldb_proto::Enums::InitStatus::kOK;
}
bool FeedStore::IsInitializedForTesting() const {
return IsInitialized();
}
void FeedStore::ReadSingle(
const std::string& key,
base::OnceCallback<void(bool, std::unique_ptr<feedstore::Record>)>
callback) {
if (!IsInitialized()) {
std::move(callback).Run(false, nullptr);
return;
}
database_->GetEntry(key, std::move(callback));
}
void FeedStore::ReadMany(
const base::flat_set<std::string>& key_set,
base::OnceCallback<
void(bool, std::unique_ptr<std::vector<feedstore::Record>>)> callback) {
if (!IsInitialized()) {
std::move(callback).Run(false, nullptr);
return;
}
database_->LoadEntriesWithFilter(
base::BindRepeating(&FilterByKey, std::move(key_set)),
CreateReadOptions(),
/*target_prefix=*/"", std::move(callback));
}
void FeedStore::ClearAll(base::OnceCallback<void(bool)> callback) {
auto filter = [](const std::string& key) { return true; };
database_->UpdateEntriesWithRemoveFilter(
std::make_unique<
std::vector<std::pair<std::string, feedstore::Record>>>(),
base::BindRepeating(filter), std::move(callback));
}
void FeedStore::LoadStream(
const StreamType& stream_type,
base::OnceCallback<void(LoadStreamResult)> callback) {
if (!IsInitialized()) {
LoadStreamResult result;
result.read_error = true;
std::move(callback).Run(std::move(result));
return;
}
auto filter = [](const std::string& stream_data_key,
const std::string& structure_key_prefix,
const std::string& key) {
return key == stream_data_key ||
base::StartsWith(key, structure_key_prefix) || IsLocalActionKey(key);
};
database_->LoadEntriesWithFilter(
base::BindRepeating(
filter, StreamDataKey(stream_type),
base::StrCat({"T/", feedstore::StreamId(stream_type), "/"})),
CreateReadOptions(),
/*target_prefix=*/"",
base::BindOnce(&FeedStore::OnLoadStreamFinished, GetWeakPtr(),
stream_type, std::move(callback)));
}
void FeedStore::OnLoadStreamFinished(
const StreamType& stream_type,
base::OnceCallback<void(LoadStreamResult)> callback,
bool success,
std::unique_ptr<std::vector<feedstore::Record>> records) {
LoadStreamResult result;
result.stream_type = stream_type;
if (!records || !success) {
result.read_error = true;
} else {
for (feedstore::Record& record : *records) {
switch (record.data_case()) {
case feedstore::Record::kStreamData:
result.stream_data = std::move(*record.mutable_stream_data());
DLOG_IF(ERROR, result.stream_data.stream_id() !=
feedstore::StreamId(stream_type))
<< "Read a record with the wrong stream_id";
break;
case feedstore::Record::kStreamStructures:
DLOG_IF(ERROR, record.stream_structures().stream_id() !=
feedstore::StreamId(stream_type))
<< "Read a record with the wrong stream_id";
result.stream_structures.push_back(
std::move(*record.mutable_stream_structures()));
break;
case feedstore::Record::kLocalAction:
result.pending_actions.push_back(
std::move(*record.mutable_local_action()));
break;
default:
break;
}
}
}
SortActions(&result.pending_actions);
std::move(callback).Run(std::move(result));
}
void FeedStore::OverwriteStream(
const StreamType& stream_type,
std::unique_ptr<StreamModelUpdateRequest> update_request,
base::OnceCallback<void(bool)> callback) {
std::unique_ptr<std::vector<std::pair<std::string, feedstore::Record>>>
updates = MakeUpdatesForStreamModelUpdateRequest(
/*structure_set_sequence_number=*/0, stream_type,
std::move(update_request));
UpdateFullStreamData(stream_type, std::move(updates), std::move(callback));
}
void FeedStore::UpdateFullStreamData(
const StreamType& stream_type,
std::unique_ptr<std::vector<std::pair<std::string, feedstore::Record>>>
updates,
base::OnceCallback<void(bool)> callback) {
// Set up a filter to delete all stream-related data.
// But we need to exclude keys being written right now.
std::vector<std::string> key_vector(updates->size());
for (size_t i = 0; i < key_vector.size(); ++i) {
key_vector[i] = (*updates)[i].first;
}
base::flat_set<std::string> updated_keys(std::move(key_vector));
StreamKeyMatcher key_matcher(stream_type);
auto filter = [](const StreamKeyMatcher& key_matcher,
const base::flat_set<std::string>& updated_keys,
const std::string& key) {
return key_matcher.IsKeyForStream(key) && !updated_keys.contains(key);
};
database_->UpdateEntriesWithRemoveFilter(
std::move(updates),
base::BindRepeating(filter, key_matcher, std::move(updated_keys)),
base::BindOnce(&FeedStore::OnSaveStreamEntriesUpdated, GetWeakPtr(),
std::move(callback)));
}
void FeedStore::SaveStreamUpdate(
const StreamType& stream_type,
int32_t structure_set_sequence_number,
std::unique_ptr<StreamModelUpdateRequest> update_request,
base::OnceCallback<void(bool)> callback) {
database_->UpdateEntries(
MakeUpdatesForStreamModelUpdateRequest(structure_set_sequence_number,
stream_type,
std::move(update_request)),
std::make_unique<leveldb_proto::KeyVector>(),
base::BindOnce(&FeedStore::OnSaveStreamEntriesUpdated, GetWeakPtr(),
std::move(callback)));
}
void FeedStore::ClearStreamData(const StreamType& stream_type,
base::OnceCallback<void(bool)> callback) {
UpdateFullStreamData(
stream_type,
std::make_unique<
std::vector<std::pair<std::string, feedstore::Record>>>(),
std::move(callback));
}
void FeedStore::OnSaveStreamEntriesUpdated(
base::OnceCallback<void(bool)> complete_callback,
bool ok) {
std::move(complete_callback).Run(ok);
}
void FeedStore::WriteOperations(
const StreamType& stream_type,
int32_t sequence_number,
std::vector<feedstore::DataOperation> operations) {
base::StringPiece stream_id = feedstore::StreamId(stream_type);
std::vector<feedstore::Record> records;
feedstore::Record structures_record;
feedstore::StreamStructureSet& structure_set =
*structures_record.mutable_stream_structures();
for (feedstore::DataOperation& operation : operations) {
*structure_set.add_structures() = std::move(*operation.mutable_structure());
if (operation.has_content()) {
feedstore::Record record;
operation.mutable_content()->set_stream_id(std::string(stream_id));
record.set_allocated_content(operation.release_content());
records.push_back(std::move(record));
}
}
structure_set.set_stream_id(std::string(feedstore::StreamId(stream_type)));
structure_set.set_sequence_number(sequence_number);
records.push_back(std::move(structures_record));
Write(std::move(records), base::DoNothing());
}
void FeedStore::ReadContent(
const StreamType& stream_type,
std::vector<feedwire::ContentId> content_ids,
std::vector<feedwire::ContentId> shared_state_ids,
base::OnceCallback<void(std::vector<feedstore::Content>,
std::vector<feedstore::StreamSharedState>)>
content_callback) {
std::vector<std::string> key_vector;
key_vector.reserve(content_ids.size() + shared_state_ids.size());
for (const auto& content_id : content_ids)
key_vector.push_back(ContentKey(stream_type, content_id));
for (const auto& content_id : shared_state_ids)
key_vector.push_back(SharedStateKey(stream_type, content_id));
for (const auto& shared_state_id : shared_state_ids)
key_vector.push_back(SharedStateKey(stream_type, shared_state_id));
ReadMany(base::flat_set<std::string>(std::move(key_vector)),
base::BindOnce(&FeedStore::OnReadContentFinished, GetWeakPtr(),
std::move(content_callback)));
}
void FeedStore::OnReadContentFinished(
base::OnceCallback<void(std::vector<feedstore::Content>,
std::vector<feedstore::StreamSharedState>)>
callback,
bool success,
std::unique_ptr<std::vector<feedstore::Record>> records) {
if (!success || !records) {
std::move(callback).Run({}, {});
return;
}
std::vector<feedstore::Content> content;
// Most of records will be content.
content.reserve(records->size());
std::vector<feedstore::StreamSharedState> shared_states;
for (auto& record : *records) {
if (record.data_case() == feedstore::Record::kContent)
content.push_back(std::move(record.content()));
else if (record.data_case() == feedstore::Record::kSharedState)
shared_states.push_back(std::move(record.shared_state()));
}
std::move(callback).Run(std::move(content), std::move(shared_states));
}
void FeedStore::ReadActions(
base::OnceCallback<void(std::vector<feedstore::StoredAction>)> callback) {
database_->LoadEntriesWithFilter(
base::BindRepeating(IsLocalActionKey),
base::BindOnce(&FeedStore::OnReadActionsFinished, GetWeakPtr(),
std::move(callback)));
}
void FeedStore::OnReadActionsFinished(
base::OnceCallback<void(std::vector<feedstore::StoredAction>)> callback,
bool success,
std::unique_ptr<std::vector<feedstore::Record>> records) {
if (!success || !records) {
std::move(callback).Run({});
return;
}
std::vector<feedstore::StoredAction> actions;
actions.reserve(records->size());
for (auto& record : *records)
actions.push_back(std::move(record.local_action()));
SortActions(&actions);
std::move(callback).Run(std::move(actions));
}
void FeedStore::WriteActions(std::vector<feedstore::StoredAction> actions,
base::OnceCallback<void(bool)> callback) {
std::vector<feedstore::Record> records;
records.reserve(actions.size());
for (auto& action : actions) {
feedstore::Record record;
*record.mutable_local_action() = std::move(action);
records.push_back(record);
}
Write(std::move(records), std::move(callback));
}
void FeedStore::UpdateActions(
std::vector<feedstore::StoredAction> actions_to_update,
std::vector<LocalActionId> ids_to_remove,
base::OnceCallback<void(bool)> callback) {
auto entries_to_save = std::make_unique<
leveldb_proto::ProtoDatabase<feedstore::Record>::KeyEntryVector>();
for (auto& action : actions_to_update)
entries_to_save->push_back(MakeKeyAndRecord(std::move(action)));
auto keys_to_remove = std::make_unique<std::vector<std::string>>();
for (LocalActionId id : ids_to_remove)
keys_to_remove->push_back(LocalActionKey(id));
database_->UpdateEntries(std::move(entries_to_save),
std::move(keys_to_remove), std::move(callback));
}
void FeedStore::RemoveActions(std::vector<LocalActionId> ids,
base::OnceCallback<void(bool)> callback) {
auto keys = std::make_unique<std::vector<std::string>>();
keys->reserve(ids.size());
for (LocalActionId id : ids)
keys->push_back(LocalActionKey(id));
database_->UpdateEntries(
/*entries_to_save=*/std::make_unique<
std::vector<std::pair<std::string, feedstore::Record>>>(),
/*keys_to_remove=*/std::move(keys), std::move(callback));
}
void FeedStore::Write(std::vector<feedstore::Record> records,
base::OnceCallback<void(bool)> callback) {
auto entries_to_save = std::make_unique<
leveldb_proto::ProtoDatabase<feedstore::Record>::KeyEntryVector>();
for (auto& record : records) {
std::string key = KeyForRecord(record);
if (!key.empty())
entries_to_save->push_back({std::move(key), std::move(record)});
}
database_->UpdateEntries(
std::move(entries_to_save),
/*keys_to_remove=*/std::make_unique<leveldb_proto::KeyVector>(),
base::BindOnce(&FeedStore::OnWriteFinished, GetWeakPtr(),
std::move(callback)));
}
void FeedStore::OnWriteFinished(base::OnceCallback<void(bool)> callback,
bool success) {
std::move(callback).Run(success);
}
void FeedStore::ReadMetadata(
base::OnceCallback<void(std::unique_ptr<feedstore::Metadata>)> callback) {
ReadSingle(kMetadataKey, base::BindOnce(&FeedStore::OnReadMetadataFinished,
GetWeakPtr(), std::move(callback)));
}
void FeedStore::ReadWebFeedStartupData(
base::OnceCallback<void(WebFeedStartupData)> callback) {
ReadMany({kSubscribedFeedsKey, kRecommendedIndexKey},
base::BindOnce(&FeedStore::OnReadWebFeedStartupDataFinished,
GetWeakPtr(), std::move(callback)));
}
void FeedStore::OnReadWebFeedStartupDataFinished(
base::OnceCallback<void(WebFeedStartupData)> callback,
bool read_ok,
std::unique_ptr<std::vector<feedstore::Record>> records) {
WebFeedStartupData result;
if (records) {
for (feedstore::Record& r : *records) {
if (r.has_recommended_web_feed_index()) {
result.recommended_feed_index =
std::move(*r.mutable_recommended_web_feed_index());
} else if (r.has_subscribed_web_feeds()) {
result.subscribed_web_feeds =
std::move(*r.mutable_subscribed_web_feeds());
} else {
DLOG(ERROR) << "OnReadWebFeedStartupDataFinished: Got record with no "
"useful data. data_case="
<< static_cast<int>(r.data_case());
}
}
}
std::move(callback).Run(std::move(result));
}
void FeedStore::ReadStartupData(
base::OnceCallback<void(StartupData)> callback) {
ReadMany({StreamDataKey(kWebFeedStream), StreamDataKey(kForYouStream),
kMetadataKey},
base::BindOnce(&FeedStore::OnReadStartupDataFinished, GetWeakPtr(),
std::move(callback)));
}
void FeedStore::OnReadStartupDataFinished(
base::OnceCallback<void(StartupData)> callback,
bool read_ok,
std::unique_ptr<std::vector<feedstore::Record>> records) {
StartupData result;
if (records) {
for (feedstore::Record& r : *records) {
if (r.has_stream_data()) {
result.stream_data.push_back(std::move(r.stream_data()));
} else if (r.has_metadata()) {
result.metadata = base::WrapUnique(r.release_metadata());
} else {
DLOG(ERROR) << "OnReadStartupDataFinished: Got record with no "
"useful data. data_case="
<< static_cast<int>(r.data_case());
}
}
}
std::move(callback).Run(std::move(result));
}
void FeedStore::WriteRecommendedFeeds(
feedstore::RecommendedWebFeedIndex index,
std::vector<feedstore::WebFeedInfo> web_feed_info,
base::OnceClosure callback) {
auto entries_to_save = std::make_unique<
leveldb_proto::ProtoDatabase<feedstore::Record>::KeyEntryVector>();
entries_to_save->push_back(MakeKeyAndRecord(std::move(index)));
for (auto& info : web_feed_info) {
entries_to_save->push_back(MakeKeyAndRecord(std::move(info)));
}
auto remove_record = [](const std::string& key) {
return key.size() > 1 && key[1] == '/' && key[0] == 'R';
};
database_->UpdateEntriesWithRemoveFilter(std::move(entries_to_save),
base::BindRepeating(remove_record),
DropBoolParam(std::move(callback)));
}
void FeedStore::WriteSubscribedFeeds(feedstore::SubscribedWebFeeds index,
base::OnceClosure callback) {
Write({MakeRecord(index)}, DropBoolParam(std::move(callback)));
}
void FeedStore::OnReadMetadataFinished(
base::OnceCallback<void(std::unique_ptr<feedstore::Metadata>)> callback,
bool read_ok,
std::unique_ptr<feedstore::Record> record) {
if (!record || !read_ok) {
std::move(callback).Run(nullptr);
return;
}
std::move(callback).Run(base::WrapUnique(record->release_metadata()));
}
void FeedStore::WriteMetadata(feedstore::Metadata metadata,
base::OnceCallback<void(bool)> callback) {
Write({MakeRecord(std::move(metadata))}, std::move(callback));
}
void FeedStore::UpgradeFromStreamSchemaV0(
feedstore::Metadata metadata,
base::OnceCallback<void(feedstore::Metadata)> callback) {
// Migration does two things:
// 1. Record a new metadata with the new schema version.
// 2. Delete all stream data.
metadata.set_stream_schema_version(kCurrentStreamSchemaVersion);
auto updates = std::make_unique<
std::vector<std::pair<std::string, feedstore::Record>>>();
updates->emplace_back(kMetadataKey, MakeRecord(metadata));
database_->UpdateEntriesWithRemoveFilter(
std::move(updates), base::BindRepeating(&IsAnyStreamRecordKey),
DropBoolParam(base::BindOnce(std::move(callback), std::move(metadata))));
}
void FeedStore::ReadRecommendedWebFeedInfo(
const std::string& web_feed_id,
base::OnceCallback<void(std::unique_ptr<feedstore::WebFeedInfo>)>
callback) {
ReadSingle("R/" + web_feed_id,
base::BindOnce(&FeedStore::ReadRecommendedWebFeedInfoFinished,
GetWeakPtr(), std::move(callback)));
}
void FeedStore::ReadRecommendedWebFeedInfoFinished(
base::OnceCallback<void(std::unique_ptr<feedstore::WebFeedInfo>)> callback,
bool read_ok,
std::unique_ptr<feedstore::Record> record) {
if (!record || !read_ok) {
std::move(callback).Run(nullptr);
return;
}
std::move(callback).Run(
base::WrapUnique(record->release_recommended_web_feed()));
}
} // namespace feed