blob: 4f59ea5746a65e0d7517c1b1ce0ea7983d3dd832 [file] [log] [blame]
// Copyright 2017 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "services/network/chunked_data_pipe_upload_data_stream.h"
#include "base/bind.h"
#include "base/callback.h"
#include "base/location.h"
#include "base/logging.h"
#include "mojo/public/c/system/types.h"
#include "net/base/io_buffer.h"
namespace network {
ChunkedDataPipeUploadDataStream::ChunkedDataPipeUploadDataStream(
scoped_refptr<ResourceRequestBody> resource_request_body,
mojom::ChunkedDataPipeGetterPtr chunked_data_pipe_getter)
: net::UploadDataStream(true /* is_chunked */,
resource_request_body->identifier()),
resource_request_body_(std::move(resource_request_body)),
chunked_data_pipe_getter_(std::move(chunked_data_pipe_getter)),
handle_watcher_(FROM_HERE,
mojo::SimpleWatcher::ArmingPolicy::MANUAL,
base::SequencedTaskRunnerHandle::Get()) {
chunked_data_pipe_getter_.set_connection_error_handler(
base::BindOnce(&ChunkedDataPipeUploadDataStream::OnDataPipeGetterClosed,
base::Unretained(this)));
chunked_data_pipe_getter_->GetSize(
base::BindOnce(&ChunkedDataPipeUploadDataStream::OnSizeReceived,
base::Unretained(this)));
}
ChunkedDataPipeUploadDataStream::~ChunkedDataPipeUploadDataStream() {}
int ChunkedDataPipeUploadDataStream::InitInternal(
const net::NetLogWithSource& net_log) {
// If there was an error either passed to the ReadCallback or as a result of
// closing the DataPipeGetter pipe, fail the read.
if (status_ != net::OK)
return status_;
// If the data pipe was closed, just fail initialization.
if (chunked_data_pipe_getter_.encountered_error())
return net::ERR_FAILED;
// Get a new data pipe and start.
mojo::DataPipe data_pipe;
chunked_data_pipe_getter_->StartReading(std::move(data_pipe.producer_handle));
data_pipe_ = std::move(data_pipe.consumer_handle);
return net::OK;
}
int ChunkedDataPipeUploadDataStream::ReadInternal(net::IOBuffer* buf,
int buf_len) {
DCHECK(!buf_);
DCHECK(buf);
DCHECK_GT(buf_len, 0);
// If there was an error either passed to the ReadCallback or as a result of
// closing the DataPipeGetter pipe, fail the read.
if (status_ != net::OK)
return status_;
// Nothing else to do, if the entire body was read.
if (size_ && bytes_read_ == *size_) {
// This shouldn't be called if the stream was already completed.
DCHECK(!IsEOF());
SetIsFinalChunk();
return net::OK;
}
// Only start watching once a read starts. This is because OnHandleReadable()
// uses |buf_| implicitly assuming that this method has already been called.
if (!handle_watcher_.IsWatching()) {
handle_watcher_.Watch(
data_pipe_.get(),
MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
base::BindRepeating(&ChunkedDataPipeUploadDataStream::OnHandleReadable,
base::Unretained(this)));
}
uint32_t num_bytes = buf_len;
if (size_ && num_bytes > *size_ - bytes_read_)
num_bytes = *size_ - bytes_read_;
MojoResult rv =
data_pipe_->ReadData(buf->data(), &num_bytes, MOJO_READ_DATA_FLAG_NONE);
if (rv == MOJO_RESULT_OK) {
bytes_read_ += num_bytes;
// Not needed for correctness, but this allows the consumer to send the
// final chunk and the end of stream message together, for protocols that
// allow it.
if (size_ && *size_ == bytes_read_)
SetIsFinalChunk();
return num_bytes;
}
if (rv == MOJO_RESULT_SHOULD_WAIT) {
handle_watcher_.ArmOrNotify();
buf_ = buf;
buf_len_ = buf_len;
return net::ERR_IO_PENDING;
}
// The pipe was closed. If the size isn't known yet, could be a success or a
// failure.
if (!size_) {
// Need to keep the buffer around because its presence is used to indicate
// that there's a pending UploadDataStream read.
buf_ = buf;
buf_len_ = buf_len;
handle_watcher_.Cancel();
data_pipe_.reset();
return net::ERR_IO_PENDING;
}
// |size_| was checked earlier, so if this point is reached, the pipe was
// closed before receiving all bytes.
DCHECK_LT(bytes_read_, *size_);
return net::ERR_FAILED;
}
void ChunkedDataPipeUploadDataStream::ResetInternal() {
// Init rewinds the stream. Throw away current state, other than |size_| and
// |status_|.
buf_ = nullptr;
buf_len_ = 0;
handle_watcher_.Cancel();
bytes_read_ = 0;
data_pipe_.reset();
}
void ChunkedDataPipeUploadDataStream::OnSizeReceived(int32_t status,
uint64_t size) {
DCHECK(!size_);
DCHECK_EQ(net::OK, status_);
status_ = status;
if (status == net::OK) {
size_ = size;
if (size == bytes_read_) {
// Only set this as a final chunk if there's a read in progress. Setting
// it asynchronously could result in confusing consumers.
if (buf_)
SetIsFinalChunk();
} else if (size < bytes_read_ || (buf_ && !data_pipe_.is_valid())) {
// If more data was received than was expected, or there's a pending read
// and data pipe was closed without passing in as many bytes as expected,
// the upload can't continue. If there's no pending read but the pipe was
// closed, the closure and size difference will be noticed on the next
// read attempt.
status_ = net::ERR_FAILED;
}
}
// If this is done, and there's a pending read, complete the pending read.
// If there's not a pending read, either |status_| will be reported on the
// next read, the file will be marked as done, so ReadInternal() won't be
// called again.
if (buf_ && (IsEOF() || status_ != net::OK)) {
// |data_pipe_| isn't needed any more, and if it's still open, a close pipe
// message would cause issues, since this class normally only watches the
// pipe when there's a pending read.
handle_watcher_.Cancel();
data_pipe_.reset();
// Clear |buf_| as well, so it's only non-null while there's a pending read.
buf_ = nullptr;
buf_len_ = 0;
OnReadCompleted(status_);
// |this| may have been deleted at this point.
}
}
void ChunkedDataPipeUploadDataStream::OnHandleReadable(MojoResult result) {
DCHECK(buf_);
// Final result of the Read() call, to be passed to the consumer.
// Swap out |buf_| and |buf_len_|
scoped_refptr<net::IOBuffer> buf(std::move(buf_));
int buf_len = buf_len_;
buf_len_ = 0;
int rv = ReadInternal(buf.get(), buf_len);
if (rv != net::ERR_IO_PENDING)
OnReadCompleted(rv);
// |this| may have been deleted at this point.
}
void ChunkedDataPipeUploadDataStream::OnDataPipeGetterClosed() {
// If the size hasn't been received yet, treat this as receiving an error.
// Otherwise, this will only be a problem if/when InitInternal() tries to
// start reading again, so do nothing.
if (!size_)
OnSizeReceived(net::ERR_FAILED, 0);
}
} // namespace network