// blob: bafebfd97020549e7d4f5a8b2ce7b21462ea7613 [file] [log] [blame]
// Copyright 2025 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "services/network/partial_decoder.h"
#include "base/check_op.h"
#include "net/base/io_buffer.h"
#include "net/filter/filter_source_stream.h"
namespace network {
// Builds a result from the recorded raw buffers and the optional completion
// status.
//
// `raw_buffers` is drained front-to-back. Every entry must be non-empty
// (enforced with a CHECK) and is re-wrapped in a `net::DrainableIOBuffer` so
// that `ConsumeRawData` can hand out a buffer piecemeal across several calls.
// `completion_status` is stored unchanged.
PartialDecoderResult::PartialDecoderResult(
    base::queue<scoped_refptr<net::IOBufferWithSize>> raw_buffers,
    const std::optional<net::Error>& completion_status)
    : completion_status_(completion_status) {
  for (; !raw_buffers.empty(); raw_buffers.pop()) {
    const auto& source = raw_buffers.front();
    const auto source_size = source->size();
    // Zero-length buffers are never expected in the incoming queue.
    CHECK_NE(source_size, 0);
    raw_buffers_.push(
        base::MakeRefCounted<net::DrainableIOBuffer>(source, source_size));
  }
}
// Move constructor: compiler-generated member-wise move, defined out of line.
PartialDecoderResult::PartialDecoderResult(PartialDecoderResult&& other) =
default;
// Move assignment: compiler-generated member-wise move, defined out of line.
PartialDecoderResult& PartialDecoderResult::operator=(PartialDecoderResult&&) =
default;
// Destructor: compiler-generated; members release whatever they hold.
PartialDecoderResult::~PartialDecoderResult() = default;
// Returns true while at least one raw buffer is still queued, i.e. while
// `ConsumeRawData` can still produce bytes.
bool PartialDecoderResult::HasRawData() const {
  const bool fully_drained = raw_buffers_.empty();
  return !fully_drained;
}
// Copies queued raw bytes into `out`, oldest buffer first, and returns the
// number of bytes written.
//
// Copying stops as soon as either `out` is full or no raw buffers remain.
// A buffer that is only partially copied stays at the front of the queue with
// its consumed prefix skipped, so the next call resumes where this one left
// off; a buffer that is fully copied is removed from the queue.
size_t PartialDecoderResult::ConsumeRawData(base::span<uint8_t> out) {
  base::span<uint8_t> unfilled = out;
  while (!unfilled.empty() && !raw_buffers_.empty()) {
    const auto& head = raw_buffers_.front();

    // Copy as much as both sides allow: bounded by the free space left in
    // `out` and by the unread bytes left in the front buffer.
    const size_t available =
        base::checked_cast<size_t>(head->BytesRemaining());
    const size_t chunk = std::min(unfilled.size(), available);

    auto [target, leftover] = unfilled.split_at(chunk);
    target.copy_from_nonoverlapping(head->first(chunk));
    unfilled = leftover;

    // Advance the buffer's read position past what was just copied.
    head->DidConsume(chunk);

    // Drop the buffer once nothing is left in it. `head` must not be used
    // after this point because it refers to the popped queue slot.
    if (head->BytesRemaining() == 0) {
      raw_buffers_.pop();
    }
  }
  return out.size() - unfilled.size();
}
// Creates a source stream of type `kNone` whose reads are served by
// `read_callback`. The callback receives the destination buffer and its size
// and returns an int result, which `Read` interprets.
PartialDecoder::RecordingStream::RecordingStream(
    base::RepeatingCallback<int(net::IOBuffer*, int)> read_callback)
    : SourceStream(net::SourceStreamType::kNone),
      read_callback_(std::move(read_callback)) {
}
// Destructor: compiler-generated; no explicit teardown is performed here.
PartialDecoder::RecordingStream::~RecordingStream() = default;
// Reads up to `buffer_size` bytes into `dest_buffer` by invoking the read
// callback supplied at construction, and returns that callback's result.
//
// If the callback completes synchronously, the outcome is recorded via
// `HandleReadCompleted` before returning. If it returns
// `net::ERR_IO_PENDING`, `dest_buffer` and `callback` are stashed so that
// `OnReadCompleted` can finish the read later.
int PartialDecoder::RecordingStream::Read(
    net::IOBuffer* dest_buffer,
    int buffer_size,
    net::CompletionOnceCallback callback) {
  const int rv = read_callback_.Run(dest_buffer, buffer_size);

  if (rv != net::ERR_IO_PENDING) {
    // Synchronous completion: record the bytes (or the final status) now.
    HandleReadCompleted(rv, dest_buffer);
    return rv;
  }

  // Asynchronous completion: remember where the data will land and whom to
  // notify once `OnReadCompleted` is called.
  pending_callback_ = std::move(callback);
  pending_dest_buffer_ = dest_buffer;
  return rv;
}
// This stream contributes no description text; always returns an empty
// string.
std::string PartialDecoder::RecordingStream::Description() const {
  return {};
}
// Unconditionally reports that more bytes may be available; this stream does
// not track end-of-input itself.
bool PartialDecoder::RecordingStream::MayHaveMoreBytes() const {
  constexpr bool kMayHaveMore = true;
  return kMayHaveMore;
}
void PartialDecoder::RecordingStream::OnReadCompleted(int result) {
CHECK_NE(result, net::ERR_IO_PENDING);
CHECK(pending_dest_buffer_);
HandleReadCompleted(result, pending_dest_buffer_.get());
// Clear the pending buffer and callback, then invoke the callback to signal
// completion to the `decoding_stream_`.
pending_dest_buffer_ = nullptr;
std::move(pending_callback_).Run(result);
}
// Transfers ownership of every raw buffer recorded so far to the caller by
// moving the internal queue out of this stream.
base::queue<scoped_refptr<net::IOBufferWithSize>>
PartialDecoder::RecordingStream::TakeRawBuffers() {
  base::queue<scoped_refptr<net::IOBufferWithSize>> recorded =
      std::move(raw_buffers_);
  return recorded;
}
// Records the outcome of one finished raw read.
//
// `result` must not be `net::ERR_IO_PENDING`.
//  - result > 0: that many bytes at the start of `dest_buffer` are copied
//    into a freshly allocated buffer, which is appended to the recorded
//    queue.
//  - result <= 0: no data is recorded; `result` is stored as the completion
//    status instead.
void PartialDecoder::RecordingStream::HandleReadCompleted(
    int result,
    net::IOBuffer* dest_buffer) {
  CHECK_NE(result, net::ERR_IO_PENDING);

  if (result <= 0) {
    // Nothing was read: remember how the read ended.
    completion_status_ = static_cast<net::Error>(result);
    return;
  }

  // Keep a private copy of the bytes; the recorded data must not alias
  // `dest_buffer`.
  auto snapshot = base::MakeRefCounted<net::IOBufferWithSize>(result);
  const size_t byte_count = base::checked_cast<size_t>(result);
  snapshot->span().copy_from(dest_buffer->first(byte_count));
  raw_buffers_.push(std::move(snapshot));
}
// Sets up the decoding pipeline.
//
// `read_raw_data_callback` supplies raw bytes, `types` lists the decoders to
// apply, and `decoded_buffer_size` is the capacity given to the buffer that
// accumulates decoded output.
//
// The pipeline is: read callback -> `RecordingStream` (records every raw
// byte it passes along) -> decoding stream created by
// `net::FilterSourceStream::CreateDecodingSourceStream`.
PartialDecoder::PartialDecoder(
    base::RepeatingCallback<int(net::IOBuffer*, int)> read_raw_data_callback,
    const std::vector<net::SourceStreamType>& types,
    size_t decoded_buffer_size)
    : decoded_buffer_(base::MakeRefCounted<net::GrowableIOBuffer>()) {
  decoded_buffer_->SetCapacity(base::checked_cast<int>(decoded_buffer_size));

  auto recorder =
      std::make_unique<RecordingStream>(std::move(read_raw_data_callback));
  // Keep a non-owning pointer: ownership of the recorder moves into the
  // decoding stream below, but it is still needed to forward async read
  // completions and to collect the recorded buffers.
  recording_stream_ = recorder.get();

  decoding_stream_ = net::FilterSourceStream::CreateDecodingSourceStream(
      std::move(recorder), types);
}
// Destructor: compiler-generated; no explicit teardown is performed here.
PartialDecoder::~PartialDecoder() = default;
// Tries to decode more data into the unused tail of the decoded buffer.
//
// Requires that the decoded buffer still has free capacity. Returns the
// decoding stream's result:
//  - > 0: that many bytes were decoded synchronously and the buffer offset
//    has been advanced past them.
//  - `net::ERR_IO_PENDING`: the read will finish later; `callback` is stored
//    and run from `OnReadDecodedDataAsyncComplete`.
//  - anything else: returned to the caller untouched; `callback` is not run.
int PartialDecoder::ReadDecodedDataMore(
    base::OnceCallback<void(int)> callback) {
  CHECK(HasRemainingBuffer());

  // NOTE(review): `base::Unretained(this)` presumably relies on this object
  // owning `decoding_stream_` and therefore outliving the read - confirm.
  const int rv = decoding_stream_->Read(
      decoded_buffer_.get(), decoded_buffer_->RemainingCapacity(),
      base::BindOnce(&PartialDecoder::OnReadDecodedDataAsyncComplete,
                     base::Unretained(this)));

  if (rv == net::ERR_IO_PENDING) {
    pending_read_decoded_data_more_callback_ = std::move(callback);
    return rv;
  }

  if (rv > 0) {
    // Account for the freshly decoded bytes.
    decoded_buffer_->set_offset(decoded_buffer_->offset() + rv);
  }
  return rv;
}
// Completion handler for a decoded read that was pending.
//
// On a positive `result`, advances the decoded buffer's offset past the new
// bytes. In every case, the callback stored by `ReadDecodedDataMore` is then
// run with `result`; it must be set.
void PartialDecoder::OnReadDecodedDataAsyncComplete(int result) {
  if (result > 0) {
    const int advanced_offset = decoded_buffer_->offset() + result;
    decoded_buffer_->set_offset(advanced_offset);
  }

  CHECK(pending_read_decoded_data_more_callback_);
  auto notify_caller = std::move(pending_read_decoded_data_more_callback_);
  std::move(notify_caller).Run(result);
}
// Forwards the completion of an asynchronous raw read to the recording
// stream.
//
// May only be called while a decoded read is in progress. The recording
// stream records the outcome and then runs the completion callback it stored
// when the read went pending.
void PartialDecoder::OnReadRawDataCompleted(int bytes_read) {
  CHECK(read_in_progress());
  CHECK(recording_stream_);

  auto& recorder = *recording_stream_;
  recorder.OnReadCompleted(bytes_read);
}
// A decoded read is in progress exactly while a caller's completion callback
// is being held.
bool PartialDecoder::read_in_progress() const {
  return static_cast<bool>(pending_read_decoded_data_more_callback_);
}
// Returns true while the decoded buffer still has unused capacity.
bool PartialDecoder::HasRemainingBuffer() const {
  const auto free_capacity = decoded_buffer_->RemainingCapacity();
  return free_capacity > 0;
}
// Returns a read-only view of the decoded buffer's contents that precede its
// current offset, i.e. the bytes decoded so far.
base::span<const uint8_t> PartialDecoder::decoded_data() const {
  base::span<const uint8_t> decoded_so_far =
      decoded_buffer_->span_before_offset();
  return decoded_so_far;
}
// Consumes this decoder (rvalue-qualified) and packages what the recording
// stream captured: the recorded raw buffers, moved out of the stream, plus
// its completion status.
PartialDecoderResult PartialDecoder::TakeResult() && {
  auto recorded_buffers = recording_stream_->TakeRawBuffers();
  const auto& final_status = recording_stream_->completion_status();
  return PartialDecoderResult(std::move(recorded_buffers), final_status);
}
} // namespace network