blob: 3dab3a2dae31ce9e201c4a3516de8600b06557d1 [file] [log] [blame]
// Copyright 2018 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "content/browser/loader/data_pipe_to_source_stream.h"
#include <utility>
#include "base/auto_reset.h"
#include "base/bind.h"
#include "net/base/io_buffer.h"
namespace content {
DataPipeToSourceStream::DataPipeToSourceStream(
mojo::ScopedDataPipeConsumerHandle body)
: net::SourceStream(net::SourceStream::TYPE_NONE),
body_(std::move(body)),
handle_watcher_(FROM_HERE,
mojo::SimpleWatcher::ArmingPolicy::MANUAL,
base::SequencedTaskRunnerHandle::Get()) {
handle_watcher_.Watch(
body_.get(), MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
base::BindRepeating(&DataPipeToSourceStream::OnReadable,
base::Unretained(this)));
}
DataPipeToSourceStream::~DataPipeToSourceStream() = default;
std::string DataPipeToSourceStream::Description() const {
return "DataPipe";
}
int DataPipeToSourceStream::Read(net::IOBuffer* buf,
int buf_size,
net::CompletionOnceCallback callback) {
base::AutoReset<bool> inside_read_checker(&inside_read_, true);
if (!body_.get()) {
// We have finished reading the pipe.
return 0;
}
const void* buffer = nullptr;
uint32_t available = 0;
MojoResult result =
body_->BeginReadData(&buffer, &available, MOJO_READ_DATA_FLAG_NONE);
switch (result) {
case MOJO_RESULT_OK: {
uint32_t consume =
std::min(base::checked_cast<uint32_t>(buf_size), available);
memcpy(buf->data(), buffer, consume);
body_->EndReadData(consume);
return base::checked_cast<int>(consume);
}
case MOJO_RESULT_FAILED_PRECONDITION:
// Finished reading.
FinishReading();
return 0;
case MOJO_RESULT_SHOULD_WAIT:
// Data is not available yet.
pending_callback_ = std::move(callback);
output_buf_ = buf;
output_buf_size_ = buf_size;
handle_watcher_.ArmOrNotify();
return net::ERR_IO_PENDING;
}
NOTREACHED() << static_cast<int>(result);
return net::ERR_UNEXPECTED;
}
void DataPipeToSourceStream::OnReadable(MojoResult unused) {
// It's not expected that we call this synchronously inside Read.
DCHECK(!inside_read_);
DCHECK(pending_callback_);
DCHECK(output_buf_);
const void* buffer = nullptr;
uint32_t available = 0;
MojoResult result =
body_->BeginReadData(&buffer, &available, MOJO_READ_DATA_FLAG_NONE);
switch (result) {
case MOJO_RESULT_OK: {
uint32_t consume =
std::min(base::checked_cast<uint32_t>(output_buf_size_), available);
memcpy(output_buf_->data(), buffer, consume);
body_->EndReadData(consume);
std::move(pending_callback_).Run(consume);
return;
}
case MOJO_RESULT_FAILED_PRECONDITION:
FinishReading();
std::move(pending_callback_).Run(0);
return;
case MOJO_RESULT_SHOULD_WAIT:
handle_watcher_.ArmOrNotify();
return;
}
NOTREACHED() << static_cast<int>(result);
}
void DataPipeToSourceStream::FinishReading() {
handle_watcher_.Cancel();
body_.reset();
}
} // namespace content