blob: dea0b68d987ccc06c2c35a2c36e334de7143c56b [file] [log] [blame]
// Copyright 2017 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "mojo/public/cpp/system/data_pipe_producer.h"
#include <algorithm>
#include <limits>
#include <memory>
#include <utility>
#include "base/containers/span.h"
#include "base/functional/bind.h"
#include "base/functional/callback.h"
#include "base/location.h"
#include "base/memory/ref_counted_delete_on_sequence.h"
#include "base/numerics/safe_conversions.h"
#include "base/synchronization/lock.h"
#include "base/task/sequenced_task_runner.h"
#include "base/task/thread_pool.h"
#include "base/thread_annotations.h"
#include "mojo/public/cpp/system/simple_watcher.h"
namespace mojo {
namespace {
// No good reason not to attempt very large pipe transactions in case the data
// pipe in use has a very large capacity available, so we default to trying
// 64 MB chunks whenever a producer is writable.
constexpr size_t kDefaultMaxReadSize = 64 * 1024 * 1024;
} // namespace
class DataPipeProducer::SequenceState
: public base::RefCountedDeleteOnSequence<SequenceState> {
public:
using CompletionCallback =
base::OnceCallback<void(ScopedDataPipeProducerHandle producer,
MojoResult result)>;
SequenceState(ScopedDataPipeProducerHandle producer_handle,
scoped_refptr<base::SequencedTaskRunner> file_task_runner,
CompletionCallback callback,
scoped_refptr<base::SequencedTaskRunner> callback_task_runner)
: base::RefCountedDeleteOnSequence<SequenceState>(
std::move(file_task_runner)),
callback_task_runner_(std::move(callback_task_runner)),
producer_handle_(std::move(producer_handle)),
callback_(std::move(callback)) {}
SequenceState(const SequenceState&) = delete;
SequenceState& operator=(const SequenceState&) = delete;
void Cancel() {
base::AutoLock lock(lock_);
is_cancelled_ = true;
owning_task_runner()->PostTask(
FROM_HERE, base::BindOnce(&SequenceState::CancelOnSequence, this));
}
void Start(std::unique_ptr<DataSource> data_source) {
owning_task_runner()->PostTask(
FROM_HERE, base::BindOnce(&SequenceState::StartOnSequence, this,
std::move(data_source)));
}
private:
friend class base::DeleteHelper<SequenceState>;
friend class base::RefCountedDeleteOnSequence<SequenceState>;
~SequenceState() = default;
void StartOnSequence(std::unique_ptr<DataSource> data_source) {
data_source_ = std::move(data_source);
TransferSomeBytes();
if (producer_handle_.is_valid()) {
// If we didn't nail it all on the first transaction attempt, setup a
// watcher and complete the read asynchronously.
watcher_ = std::make_unique<SimpleWatcher>(
FROM_HERE, SimpleWatcher::ArmingPolicy::AUTOMATIC,
base::SequencedTaskRunner::GetCurrentDefault());
watcher_->Watch(producer_handle_.get(), MOJO_HANDLE_SIGNAL_WRITABLE,
MOJO_WATCH_CONDITION_SATISFIED,
base::BindRepeating(&SequenceState::OnHandleReady, this));
}
}
void OnHandleReady(MojoResult result, const HandleSignalsState& state) {
{
// Stop ourselves from doing redundant work if we've been cancelled from
// another thread. Note that we do not rely on this for any kind of thread
// safety concerns.
base::AutoLock lock(lock_);
if (is_cancelled_)
return;
}
if (result != MOJO_RESULT_OK) {
// Either the consumer pipe has been closed or something terrible
// happened. In any case, we'll never be able to write more data.
data_source_->Abort();
Finish(result);
return;
}
TransferSomeBytes();
}
void TransferSomeBytes() {
while (true) {
DCHECK_LE(bytes_transferred_, data_source_->GetLength());
const uint64_t max_data_size =
data_source_->GetLength() - bytes_transferred_;
if (max_data_size == 0) {
// There's no more data to transfer.
Finish(MOJO_RESULT_OK);
return;
}
size_t size_hint = kDefaultMaxReadSize;
if (static_cast<uint64_t>(size_hint) > max_data_size) {
size_hint = static_cast<size_t>(max_data_size);
}
base::span<uint8_t> pipe_buffer;
MojoResult mojo_result = producer_handle_->BeginWriteData(
size_hint, MOJO_WRITE_DATA_FLAG_NONE, pipe_buffer);
if (mojo_result == MOJO_RESULT_SHOULD_WAIT)
return;
if (mojo_result != MOJO_RESULT_OK) {
data_source_->Abort();
Finish(mojo_result);
return;
}
DataSource::ReadResult result = data_source_->Read(
bytes_transferred_, base::as_writable_chars(pipe_buffer));
producer_handle_->EndWriteData(result.bytes_read);
// result.bytes_read == 0 is used to determine if the read operation did
// not retrieve any bytes, which typically occurs when reaching the end of
// the file (EOF).
if (result.result != MOJO_RESULT_OK || result.bytes_read == 0) {
Finish(result.result);
return;
}
bytes_transferred_ += result.bytes_read;
}
}
void Finish(MojoResult result) {
watcher_.reset();
data_source_.reset();
callback_task_runner_->PostTask(
FROM_HERE, base::BindOnce(std::move(callback_),
std::move(producer_handle_), result));
}
void CancelOnSequence() {
if (!data_source_)
return;
data_source_->Abort();
Finish(MOJO_RESULT_CANCELLED);
}
const scoped_refptr<base::SequencedTaskRunner> callback_task_runner_;
// State which is effectively owned and used only on the file sequence.
ScopedDataPipeProducerHandle producer_handle_;
std::unique_ptr<DataPipeProducer::DataSource> data_source_;
size_t bytes_transferred_ = 0;
CompletionCallback callback_;
std::unique_ptr<SimpleWatcher> watcher_;
base::Lock lock_;
bool is_cancelled_ GUARDED_BY(lock_) = false;
};
DataPipeProducer::DataPipeProducer(ScopedDataPipeProducerHandle producer)
: producer_(std::move(producer)) {}
DataPipeProducer::~DataPipeProducer() {
if (sequence_state_)
sequence_state_->Cancel();
}
void DataPipeProducer::Write(std::unique_ptr<DataSource> data_source,
CompletionCallback callback) {
InitializeNewRequest(std::move(callback));
sequence_state_->Start(std::move(data_source));
}
void DataPipeProducer::InitializeNewRequest(CompletionCallback callback) {
DCHECK(!sequence_state_);
// TODO(crbug.com/41436919): Re-evaluate how TaskPriority is set here and in
// other file URL-loading-related code. Some callers require USER_VISIBLE
// (i.e., BEST_EFFORT is not enough).
auto file_task_runner = base::ThreadPool::CreateSequencedTaskRunner(
{base::MayBlock(), base::TaskPriority::USER_VISIBLE});
sequence_state_ = new SequenceState(
std::move(producer_), file_task_runner,
base::BindOnce(&DataPipeProducer::OnWriteComplete,
weak_factory_.GetWeakPtr(), std::move(callback)),
base::SequencedTaskRunner::GetCurrentDefault());
}
void DataPipeProducer::OnWriteComplete(CompletionCallback callback,
ScopedDataPipeProducerHandle producer,
MojoResult ready_result) {
producer_ = std::move(producer);
sequence_state_ = nullptr;
std::move(callback).Run(ready_result);
}
const DataPipeProducerHandle& DataPipeProducer::GetProducerHandle() const {
return producer_.get();
}
} // namespace mojo