blob: 8e0f906fbd59fc1f3ae7ee1e3cadeb10cf57fb3a [file] [log] [blame]
// Copyright 2025 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "content/browser/indexed_db/instance/sqlite/active_blob_streamer.h"
#include <limits>
#include <memory>
#include <utility>
#include <vector>
#include "base/functional/bind.h"
#include "base/numerics/clamped_math.h"
#include "base/strings/utf_string_conversions.h"
#include "base/uuid.h"
#include "content/browser/indexed_db/blob_reader.h"
#include "mojo/public/cpp/bindings/remote.h"
#include "net/base/net_errors.h"
#include "services/network/public/cpp/net_adapters.h"
namespace content::indexed_db::sqlite {
namespace {
// Each time a blob is read via mojo, a DataPipe is opened to pass the bytes
// through. This object is created once for each such read and feeds blob bytes
// into the pipe. The object is self owned.
class SqliteBlobToDataPipe {
public:
// This version feeds the bytes into `dest` and notifies `client` of important
// events. It's used when the BlobStorageContext is reading the blob.
SqliteBlobToDataPipe(
base::RepeatingCallback<sql::StreamingBlobHandle*(void)>
readable_blob_handle,
uint64_t blob_size,
uint64_t offset,
uint64_t read_length,
mojo::ScopedDataPipeProducerHandle dest,
mojo::PendingRemote<blink::mojom::BlobReaderClient> client)
: readable_blob_handle_(std::move(readable_blob_handle)),
dest_(std::move(dest)),
client_(std::move(client)),
offset_(offset),
read_length_(read_length) {
client_->OnCalculatedSize(blob_size, read_length_);
}
// This version feeds the bytes into `dest` and invokes `completion_callback`
// when done. It's used when a renderer is reading the blob.
SqliteBlobToDataPipe(base::RepeatingCallback<sql::StreamingBlobHandle*(void)>
readable_blob_handle,
uint64_t offset,
uint64_t read_length,
mojo::ScopedDataPipeProducerHandle dest,
base::OnceCallback<void(int)> completion_callback)
: readable_blob_handle_(std::move(readable_blob_handle)),
dest_(std::move(dest)),
completion_callback_(std::move(completion_callback)),
offset_(offset),
read_length_(read_length) {}
~SqliteBlobToDataPipe() = default;
void Start() {
if (read_length_ == 0) {
OnComplete(net::OK);
return;
}
writable_handle_watcher_.emplace(FROM_HERE,
mojo::SimpleWatcher::ArmingPolicy::MANUAL);
writable_handle_watcher_->Watch(
dest_.get(), MOJO_HANDLE_SIGNAL_WRITABLE,
base::BindRepeating(&SqliteBlobToDataPipe::OnDataPipeWritable,
base::Unretained(this)));
OnDataPipeWritable(MOJO_RESULT_OK);
}
private:
void OnDataPipeWritable(MojoResult result) {
if (result == MOJO_RESULT_FAILED_PRECONDITION) {
OnComplete(net::ERR_ABORTED);
return;
}
DCHECK_EQ(result, MOJO_RESULT_OK) << result;
sql::StreamingBlobHandle* readable_blob_handle =
readable_blob_handle_.Run();
if (!readable_blob_handle) {
OnComplete(net::ERR_ABORTED);
return;
}
// This loop shouldn't block the thread for *too* long as the mojo pipe has
// a capacity of 2MB (i.e. `BeginWrite()` will return
// MOJO_RESULT_SHOULD_WAIT at some point when streaming a large enough
// blob).
while (true) {
scoped_refptr<network::NetToMojoPendingBuffer> pending_write;
MojoResult mojo_result =
network::NetToMojoPendingBuffer::BeginWrite(&dest_, &pending_write);
switch (mojo_result) {
case MOJO_RESULT_OK:
break;
case MOJO_RESULT_SHOULD_WAIT:
// The pipe is full. We need to wait for it to have more space.
writable_handle_watcher_->ArmOrNotify();
return;
case MOJO_RESULT_FAILED_PRECONDITION:
// The data pipe consumer handle has been closed.
OnComplete(net::ERR_ABORTED);
return;
default:
// The body stream is in a bad state. Bail out.
OnComplete(net::ERR_UNEXPECTED);
return;
}
size_t read_bytes = base::checked_cast<size_t>(
std::min(static_cast<uint64_t>(pending_write->size()),
read_length_ - transferred_bytes_));
base::span<uint8_t> buffer =
base::as_writable_byte_span(*pending_write).first(read_bytes);
bool success =
readable_blob_handle->Read(offset_ + transferred_bytes_, buffer);
if (!success) {
// Read error.
dest_ = pending_write->Complete(0);
OnComplete(net::ERR_FAILED);
return;
}
dest_ = pending_write->Complete(read_bytes);
transferred_bytes_ += read_bytes;
if (transferred_bytes_ == read_length_) {
OnComplete(net::OK);
return;
}
CHECK_LT(transferred_bytes_, read_length_);
}
}
void OnComplete(int result) {
// Resets the watchers, pipes and the exchange handler, so that
// we will never be called back.
if (writable_handle_watcher_) {
writable_handle_watcher_->Cancel();
}
dest_.reset();
if (client_) {
client_->OnComplete(result, transferred_bytes_);
} else {
std::move(completion_callback_).Run(result);
}
delete this;
}
// If the database goes away, this getter will return null, and `this` must
// abort.
base::RepeatingCallback<sql::StreamingBlobHandle*(void)>
readable_blob_handle_;
// `this` is the producer, and `dest_` is its handle to the pipe.
mojo::ScopedDataPipeProducerHandle dest_;
// Exactly one of these two members will be non-null.
mojo::Remote<blink::mojom::BlobReaderClient> client_;
base::OnceCallback<void(int)> completion_callback_;
// The number of bytes successfully transferred so far.
uint64_t transferred_bytes_ = 0;
// Each read has a certain offset and length specified by the consumer.
uint64_t offset_;
uint64_t read_length_ = 0;
// Optional so that its construction can be deferred.
std::optional<mojo::SimpleWatcher> writable_handle_watcher_;
};
} // namespace
void ActiveBlobStreamer::Clone(
mojo::PendingReceiver<blink::mojom::Blob> receiver) {
receivers_.Add(this, std::move(receiver));
}
void ActiveBlobStreamer::AsDataPipeGetter(
mojo::PendingReceiver<network::mojom::DataPipeGetter> receiver) {
data_pipe_getter_receivers_.Add(this, std::move(receiver));
}
void ActiveBlobStreamer::ReadRange(
uint64_t offset,
uint64_t length,
mojo::ScopedDataPipeProducerHandle handle,
mojo::PendingRemote<blink::mojom::BlobReaderClient> client) {
(new SqliteBlobToDataPipe(base::BindRepeating(
[](base::WeakPtr<ActiveBlobStreamer> streamer)
-> sql::StreamingBlobHandle* {
if (!streamer) {
return nullptr;
}
return &streamer->readable_blob_handle_;
},
weak_factory_.GetWeakPtr()),
blob_length_, offset,
ClampReadLength(offset, length), std::move(handle),
std::move(client)))
->Start();
}
void ActiveBlobStreamer::ReadAll(
mojo::ScopedDataPipeProducerHandle handle,
mojo::PendingRemote<blink::mojom::BlobReaderClient> client) {
ReadRange(0, std::numeric_limits<uint64_t>::max(), std::move(handle),
std::move(client));
}
void ActiveBlobStreamer::Load(
mojo::PendingReceiver<network::mojom::URLLoader> loader,
const std::string& method,
const net::HttpRequestHeaders& headers,
mojo::PendingRemote<network::mojom::URLLoaderClient> client) {
// Bounce back to the registry so that we can avoid reimplementing
// `BlobUrlLoader`. This is used for Object URLs. It's not clear how often
// this is used or how important it is to make it super efficient.
registry_blob_->Load(std::move(loader), method, headers, std::move(client));
}
void ActiveBlobStreamer::ReadSideData(
blink::mojom::Blob::ReadSideDataCallback callback) {
std::move(callback).Run(std::nullopt);
}
void ActiveBlobStreamer::CaptureSnapshot(CaptureSnapshotCallback callback) {
// This method is used for the File API. Technically IDB can store Files, but
// when it does so, the size and last modification date should always be known
// and propagated to the renderer through IndexedDBExternalObject's metadata.
// This path is likely only reached when the file modification date and/or
// size is somehow unknown, but reproducing this scenario has proven
// difficult. See crbug.com/390586616
std::move(callback).Run(blob_length_, std::nullopt);
}
void ActiveBlobStreamer::GetInternalUUID(GetInternalUUIDCallback callback) {
std::move(callback).Run(uuid_);
}
void ActiveBlobStreamer::Clone(
mojo::PendingReceiver<network::mojom::DataPipeGetter> receiver) {
data_pipe_getter_receivers_.Add(this, std::move(receiver));
}
void ActiveBlobStreamer::Read(
mojo::ScopedDataPipeProducerHandle pipe,
network::mojom::DataPipeGetter::ReadCallback on_size_known) {
std::move(on_size_known).Run(net::OK, blob_length_);
Read(0, std::numeric_limits<uint64_t>::max(), std::move(pipe),
base::DoNothing());
}
void ActiveBlobStreamer::Read(
uint64_t offset,
uint64_t length,
mojo::ScopedDataPipeProducerHandle pipe,
storage::mojom::BlobDataItemReader::ReadCallback callback) {
(new SqliteBlobToDataPipe(base::BindRepeating(
[](base::WeakPtr<ActiveBlobStreamer> streamer)
-> sql::StreamingBlobHandle* {
if (!streamer) {
return nullptr;
}
return &streamer->readable_blob_handle_;
},
weak_factory_.GetWeakPtr()),
offset, ClampReadLength(offset, length),
std::move(pipe), std::move(callback)))
->Start();
}
void ActiveBlobStreamer::ReadSideData(
storage::mojom::BlobDataItemReader::ReadSideDataCallback callback) {
// This type should never have side data.
std::move(callback).Run(net::ERR_NOT_IMPLEMENTED, mojo_base::BigBuffer());
}
ActiveBlobStreamer::ActiveBlobStreamer(
const IndexedDBExternalObject& blob_info,
sql::StreamingBlobHandle readable_blob_handle,
base::OnceClosure on_became_inactive)
: uuid_(base::Uuid::GenerateRandomV4().AsLowercaseString()),
blob_length_(blob_info.size()),
content_type_(base::UTF16ToUTF8(blob_info.type())),
readable_blob_handle_(std::move(readable_blob_handle)),
on_became_inactive_(std::move(on_became_inactive)) {
receivers_.set_disconnect_handler(base::BindRepeating(
&ActiveBlobStreamer::OnMojoDisconnect, base::Unretained(this)));
data_pipe_getter_receivers_.set_disconnect_handler(base::BindRepeating(
&ActiveBlobStreamer::OnMojoDisconnect, base::Unretained(this)));
readers_.set_disconnect_handler(base::BindRepeating(
&ActiveBlobStreamer::OnMojoDisconnect, base::Unretained(this)));
}
ActiveBlobStreamer::~ActiveBlobStreamer() = default;
void ActiveBlobStreamer::BindRegistryBlob(
storage::mojom::BlobStorageContext& blob_registry) {
CHECK(!registry_blob_.is_bound());
auto element = storage::mojom::BlobDataItem::New();
element->size = blob_length_;
element->side_data_size = 0;
element->content_type = content_type_;
element->type = storage::mojom::BlobDataItemType::kIndexedDB;
readers_.Add(this, element->reader.InitWithNewPipeAndPassReceiver());
blob_registry.RegisterFromDataItem(
registry_blob_.BindNewPipeAndPassReceiver(), uuid_, std::move(element));
}
void ActiveBlobStreamer::AddReceiver(
mojo::PendingReceiver<blink::mojom::Blob> receiver,
storage::mojom::BlobStorageContext& blob_registry) {
if (!registry_blob_.is_bound()) {
CHECK(receivers_.empty());
BindRegistryBlob(blob_registry);
}
Clone(std::move(receiver));
}
void ActiveBlobStreamer::OnMojoDisconnect() {
if (!receivers_.empty() || !data_pipe_getter_receivers_.empty()) {
return;
}
// Unregistering the blob will drop its reference to the `BlobDataItem`
// associated with `this` as a `BlobDataItemReader`, which will often lead to
// `readers_` receiving a disconnect. But there may still be other references
// to the `BlobDataItem`, such as another blob, which means that `this` can go
// on living indefinitely. See crbug.com/392376370
registry_blob_.reset();
if (readers_.empty()) {
std::move(on_became_inactive_).Run();
// `this` is deleted.
}
}
uint64_t ActiveBlobStreamer::ClampReadLength(uint64_t offset,
uint64_t length) const {
return base::ClampMin(length, base::ClampSub(blob_length_, offset));
}
} // namespace content::indexed_db::sqlite