blob: 623c24a14eaf122527e9c23a90305a05f645bb17 [file] [log] [blame]
// Copyright 2023 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "content/browser/preloading/prefetch/prefetch_data_pipe_tee.h"
#include "base/containers/span.h"
#include "base/metrics/histogram_functions.h"
#include "base/notreached.h"
#include "base/strings/string_view_util.h"
#include "mojo/public/cpp/system/string_data_source.h"
#include "services/network/public/cpp/loading_params.h"
namespace content {
namespace {
// Creates the data pipe used to serve data to a target. On return the two ends
// of the pipe are stored in `producer_handle` and `consumer_handle`; the
// returned value is whatever `mojo::CreateDataPipe()` reports.
MojoResult CreateDataPipeForServingData(
    mojo::ScopedDataPipeProducerHandle& producer_handle,
    mojo::ScopedDataPipeConsumerHandle& consumer_handle) {
  // Ask the network service helpers for the pipe capacity, preferring the
  // larger allocation size when it is available.
  const auto capacity_in_bytes = network::GetDataPipeDefaultAllocationSize(
      network::DataPipeAllocationSize::kLargerSizeIfPossible);

  MojoCreateDataPipeOptions pipe_options;
  pipe_options.struct_size = sizeof(pipe_options);
  pipe_options.flags = MOJO_CREATE_DATA_PIPE_FLAG_NONE;
  // The pipe carries plain bytes.
  pipe_options.element_num_bytes = 1;
  pipe_options.capacity_num_bytes = capacity_in_bytes;

  return mojo::CreateDataPipe(&pipe_options, producer_handle, consumer_handle);
}
} // namespace
// Takes ownership of `source` and starts watching it for readability.
// `buffer_limit` is stored in `buffer_limit_` and bounds how many bytes
// `OnReadable()` keeps in `buffer_`.
PrefetchDataPipeTee::PrefetchDataPipeTee(
    mojo::ScopedDataPipeConsumerHandle source,
    size_t buffer_limit)
    : source_(std::move(source)),
      source_watcher_(FROM_HERE,
                      mojo::SimpleWatcher::ArmingPolicy::MANUAL,
                      base::SequencedTaskRunner::GetCurrentDefault()),
      buffer_limit_(buffer_limit),
      target_watcher_(FROM_HERE,
                      mojo::SimpleWatcher::ArmingPolicy::AUTOMATIC,
                      base::SequencedTaskRunner::GetCurrentDefault()) {
  // The callback is bound to a weak pointer rather than to `this` directly.
  auto on_source_readable = base::BindRepeating(
      &PrefetchDataPipeTee::OnReadable, weak_factory_.GetWeakPtr());
  source_watcher_.Watch(source_.get(), MOJO_HANDLE_SIGNAL_READABLE,
                        MOJO_TRIGGER_CONDITION_SIGNALS_SATISFIED,
                        std::move(on_source_readable));

  // `source_watcher_` uses manual arming, so it has to be armed explicitly
  // before the first notification can arrive.
  source_watcher_.ArmOrNotify();
}
// Checks that no target is attached anymore and records the final `state_` to
// UMA.
PrefetchDataPipeTee::~PrefetchDataPipeTee() {
// `target_` must be empty here.
// NOTE(review): a non-empty `target_` carries a `scoped_refptr` to `this` (see
// the `CHECK_EQ` in `ResetTarget()`), so presumably destruction cannot start
// while a target is attached - confirm.
CHECK(!target_.first);
// Record in which state the tee was when it was destroyed.
base::UmaHistogramEnumeration(
"Preloading.Prefetch.PrefetchDataPipeTeeDtorState", state_);
}
// Adds a new target: creates a data pipe, starts writing `buffer_` (== the
// whole data read so far) to its producer end and returns its consumer end.
//
// Returns an invalid (default-constructed) handle when no target can be added:
// - in `kLoading`, when a target is already attached or a write is pending;
// - in `kSizeExceeded`, always;
// - in any state, when the data pipe cannot be created.
//
// A failed data pipe creation leaves `state_` untouched, so a later call can
// still succeed.
mojo::ScopedDataPipeConsumerHandle PrefetchDataPipeTee::Clone() {
  ++count_clone_called_;

  switch (state_) {
    case State::kLoading:
      if (target_.first || pending_writes_) {
        base::UmaHistogramCounts100(
            "Preloading.Prefetch.PrefetchDataPipeTeeCloneFailed.Loading",
            count_clone_called_);
        return {};
      }
      break;

    case State::kSizeExceededNoTarget:
      CHECK(!target_.first);
      CHECK_EQ(pending_writes_, 0u);
      // The transition to `kSizeExceeded` is performed below, only after the
      // data pipe for the new target was created successfully.
      break;

    case State::kSizeExceeded:
      base::UmaHistogramCounts100(
          "Preloading.Prefetch.PrefetchDataPipeTeeCloneFailed.SizeExceeded",
          count_clone_called_);
      return {};

    case State::kLoaded:
      break;
  }

  mojo::ScopedDataPipeConsumerHandle consumer_handle;
  mojo::ScopedDataPipeProducerHandle producer_handle;
  MojoResult rv =
      CreateDataPipeForServingData(producer_handle, consumer_handle);
  if (rv != MOJO_RESULT_OK) {
    // No target was added. In particular, `kSizeExceededNoTarget` is kept
    // as-is: switching to `kSizeExceeded` here would leave a non-empty
    // `buffer_` that is never written to anyone, and would make every later
    // `Clone()` fail.
    return {};
  }

  if (state_ == State::kSizeExceededNoTarget) {
    // The first (and only) target after the limit was exceeded is now
    // guaranteed to exist.
    state_ = State::kSizeExceeded;
  }

  auto producer =
      std::make_unique<mojo::DataPipeProducer>(std::move(producer_handle));

  // Send `buffer_` (== the whole data read so far) to the new target. The pair
  // holds a reference to `this` together with the producer.
  WriteData(std::make_pair(std::move(producer), base::WrapRefCounted(this)),
            buffer_);

  return consumer_handle;
}
// Callback of `source_watcher_` (registered in the constructor). Reads the
// currently available data from `source_` and, depending on `state_`, stores
// it in `buffer_` and/or forwards it to the current target.
//
// `result` and `state` are not inspected; the outcome of `BeginReadData()`
// decides what happens.
void PrefetchDataPipeTee::OnReadable(MojoResult result,
const mojo::HandleSignalsState& state) {
if (pending_writes_) {
// Reading is blocked while writing to a target is ongoing.
// `OnDataWritten()` re-arms `source_watcher_` once no write is pending.
return;
}
switch (state_) {
case State::kLoading:
case State::kSizeExceeded:
break;
case State::kSizeExceededNoTarget:
// Reading is blocked until the first target is added, because there is
// no buffer space to store the read data until the target is added.
return;
case State::kLoaded:
// No further read is needed.
return;
}
base::span<const uint8_t> read_data;
MojoResult rv = source_->BeginReadData(MOJO_READ_DATA_FLAG_NONE, read_data);
if (rv == MOJO_RESULT_OK) {
switch (state_) {
case State::kLoading:
CHECK_LE(buffer_.size(), buffer_limit_);
if (buffer_.size() + read_data.size() <= buffer_limit_) {
// The read data still fits: keep it in `buffer_` and, if a target is
// attached, forward only the newly appended tail of `buffer_`.
buffer_.append(base::as_string_view(read_data));
if (target_.first) {
WriteData(ResetTarget({}), //
std::string_view(buffer_).substr(buffer_.size() -
read_data.size()));
}
break;
}
// If there are no targets yet, stop reading and keep `buffer_` (== the
// whole data read so far) until the first target is added.
if (!target_.first) {
// Narrow `read_data` to the part that still fits, so that `buffer_` ends
// up exactly `buffer_limit_` bytes long and `EndReadData()` below consumes
// only the stored part.
read_data = read_data.first(buffer_limit_ - buffer_.size());
buffer_.append(base::as_string_view(read_data));
state_ = State::kSizeExceededNoTarget;
break;
}
// If there is a target, clear the current `buffer_` because it was
// already written to the target. The current read data is written to
// the target below.
buffer_.clear();
state_ = State::kSizeExceeded;
[[fallthrough]];
case State::kSizeExceeded:
CHECK(buffer_.empty());
if (!target_.first) {
// The target was already removed. Discard the current read data,
// because anyway no targets can be added to `this`.
break;
}
// `buffer_` holds the chunk while it is being written; it is cleared again
// in `OnDataWritten()`.
buffer_.append(base::as_string_view(read_data));
WriteData(ResetTarget({}), buffer_);
break;
case State::kSizeExceededNoTarget:
case State::kLoaded:
// Both states returned early above.
NOTREACHED();
}
// Consume what was handled above and wait for the next notification.
source_->EndReadData(read_data.size());
source_watcher_.ArmOrNotify();
} else if (rv == MOJO_RESULT_FAILED_PRECONDITION) {
// Handled as the end of the source data.
// NOTE(review): presumably Mojo reports this once the producer side is closed
// and everything was read - confirm against the Mojo data pipe documentation.
switch (state_) {
case State::kLoading:
state_ = State::kLoaded;
// Closes the producer handle, if any.
ResetTarget({});
break;
case State::kSizeExceeded:
// Closes the producer handle, if any.
ResetTarget({});
break;
case State::kSizeExceededNoTarget:
case State::kLoaded:
// Both states returned early above.
NOTREACHED();
}
} else if (rv != MOJO_RESULT_SHOULD_WAIT) {
NOTREACHED() << "Unhandled MojoResult: " << rv;
}
// For `MOJO_RESULT_SHOULD_WAIT` nothing is done; in particular
// `source_watcher_` is not re-armed by this call.
}
// Replaces `target_` with `target` and returns the previous value of
// `target_`. Passing an empty pair just detaches the current target. The
// peer-closed watch is moved over to the new target, if there is one.
PrefetchDataPipeTee::ProducerPair PrefetchDataPipeTee::ResetTarget(
    ProducerPair target) {
  ProducerPair previous = std::exchange(target_, std::move(target));

  // Stop watching whatever producer handle was watched so far.
  target_watcher_.Cancel();

  if (!target_.first) {
    // Nothing attached anymore, so there is nothing to watch.
    return previous;
  }

  CHECK_EQ(target_.second.get(), this);

  // Detect disconnection while `target_` is set and no write is ongoing for
  // it. Disconnection during an ongoing write is detected and handled by
  // `OnDataWritten()`.
  auto on_peer_closed =
      base::BindRepeating(&PrefetchDataPipeTee::OnWriteDataPipeClosed,
                          weak_factory_.GetWeakPtr());
  target_watcher_.Watch(target_.first->GetProducerHandle(),
                        MOJO_HANDLE_SIGNAL_PEER_CLOSED,
                        MOJO_TRIGGER_CONDITION_SIGNALS_SATISFIED,
                        std::move(on_peer_closed));
  return previous;
}
// Callback of `target_watcher_` (registered in `ResetTarget()`). Detaches the
// current target when `state` reports that its peer was closed. `result` is
// not inspected.
void PrefetchDataPipeTee::OnWriteDataPipeClosed(
    MojoResult result,
    const mojo::HandleSignalsState& state) {
  // The watch only exists while a target is attached.
  CHECK(target_.first);

  if (!state.peer_closed()) {
    return;
  }

  // The returned old target is dropped here, which destroys it.
  ResetTarget({});
}
// Starts writing `data` to the producer in `target` and counts the write in
// `pending_writes_`. `target` is handed over to the completion callback, so
// `OnDataWritten()` receives it back together with the write result.
//
// `data` is not copied (`STRING_STAYS_VALID_UNTIL_COMPLETION`), so the caller
// must keep the underlying bytes alive and unchanged until `OnDataWritten()`
// runs.
void PrefetchDataPipeTee::WriteData(ProducerPair target,
                                    base::span<const char> data) {
  CHECK_EQ(target.second.get(), this);
  ++pending_writes_;

  // Grab the producer before `target` is moved into the callback below.
  mojo::DataPipeProducer* const producer = target.first.get();

  auto data_source = std::make_unique<mojo::StringDataSource>(
      data, mojo::StringDataSource::AsyncWritingMode::
                STRING_STAYS_VALID_UNTIL_COMPLETION);

  // `base::Unretained(this)` is paired with `target.second`, which was checked
  // above to point to `this` and travels inside the same callback.
  auto on_written =
      base::BindOnce(&PrefetchDataPipeTee::OnDataWritten,
                     base::Unretained(this), std::move(target));

  producer->Write(std::move(data_source), std::move(on_written));
}
// Completion callback of a write started by `WriteData()`. `target` is the
// pair that was written to and `result` the outcome of that write.
void PrefetchDataPipeTee::OnDataWritten(ProducerPair target,
                                        MojoResult result) {
  CHECK_GT(pending_writes_, 0u);
  --pending_writes_;

  switch (state_) {
    case State::kLoaded:
      // All data (== `buffer_`) is written to `target`, so `target` is simply
      // destructed on return.
      return;

    case State::kSizeExceeded:
      // Data are streamed and thus cleared after being written. `buffer_` had
      // to be kept until here because `STRING_STAYS_VALID_UNTIL_COMPLETION`
      // is used.
      buffer_.clear();
      break;

    case State::kLoading:
      break;

    case State::kSizeExceededNoTarget:
      NOTREACHED();
  }

  // Shared by `kLoading` and `kSizeExceeded`: keep writing to `target` by
  // making it `target_` again after a successful write; after a failed write
  // `target` is destructed on return instead.
  const bool write_succeeded = (result == MOJO_RESULT_OK);
  if (write_succeeded) {
    ResetTarget(std::move(target));
  }

  // In either case continue loading, because `buffer_` and upcoming data can
  // still be used for future targets. Reading resumes only once no write is
  // pending anymore.
  if (pending_writes_ == 0) {
    source_watcher_.ArmOrNotify();
  }
}
} // namespace content