blob: 1cb306e41fcc7a0068a888db4601c9dccecdd8c3 [file] [log] [blame]
// Copyright 2020 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "third_party/blink/renderer/core/fetch/bytes_uploader.h"
#include "base/numerics/checked_math.h"
#include "base/numerics/safe_conversions.h"
#include "base/single_thread_task_runner.h"
#include "net/base/net_errors.h"
#include "third_party/blink/public/platform/task_type.h"
#include "third_party/blink/renderer/core/execution_context/execution_context.h"
#include "third_party/blink/renderer/platform/heap/persistent.h"
#include "third_party/blink/renderer/platform/loader/fetch/bytes_consumer.h"
namespace blink {
// Creates an uploader that takes its bytes from |consumer|.
//
// |pending_receiver| is used to construct |receiver_| with this object as its
// first argument, and |task_runner| is handed to |upload_pipe_watcher_|. The
// constructor itself does not read from |consumer|; reading starts in
// StartReading().
BytesUploader::BytesUploader(
BytesConsumer* consumer,
mojo::PendingReceiver<network::mojom::blink::ChunkedDataPipeGetter>
pending_receiver,
scoped_refptr<base::SingleThreadTaskRunner> task_runner)
: consumer_(consumer),
receiver_(this, std::move(pending_receiver)),
// The watcher uses the MANUAL arming policy; in this file it is armed only
// through ArmOrNotify() in WriteDataOnPipe().
upload_pipe_watcher_(FROM_HERE,
mojo::SimpleWatcher::ArmingPolicy::MANUAL,
std::move(task_runner)) {
// The consumer must be non-null and must not already be closed or errored
// when it is handed over.
DCHECK(consumer_);
DCHECK_EQ(consumer_->GetPublicState(),
BytesConsumer::PublicState::kReadableOrWaiting);
}
// Defaulted: the destructor performs no explicit cleanup of its own.
BytesUploader::~BytesUploader() = default;
// Reports |consumer_| to |visitor| and then forwards to the
// BytesConsumer::Client base implementation.
void BytesUploader::Trace(blink::Visitor* visitor) const {
visitor->Trace(consumer_);
BytesConsumer::Client::Trace(visitor);
}
// Stores |get_size_callback| for later use. In this file the stored callback
// is run by Close() with net::OK, or by CloseOnError() with net::ERR_FAILED,
// together with |total_size_|.
//
// At most one callback may be pending at a time (enforced by the DCHECK).
void BytesUploader::GetSize(GetSizeCallback get_size_callback) {
DCHECK(!get_size_callback_);
get_size_callback_ = std::move(get_size_callback);
}
// Takes ownership of |upload_pipe| and begins streaming the consumer's bytes
// into it.
//
// Requires that GetSize() has already supplied a callback and that
// |upload_pipe| is valid. If a pipe is already held, the call is treated as a
// replay request and answered with CloseOnError().
void BytesUploader::StartReading(
    mojo::ScopedDataPipeProducerHandle upload_pipe) {
  DVLOG(3) << this << " StartReading()";
  DCHECK(get_size_callback_);
  DCHECK(upload_pipe);

  if (upload_pipe_) {
    // Replay was asked by net/ service.
    CloseOnError();
    return;
  }

  // Keep the pipe and register for writability notifications. The callback
  // holds only a weak reference to this object.
  upload_pipe_ = std::move(upload_pipe);
  upload_pipe_watcher_.Watch(
      upload_pipe_.get(), MOJO_HANDLE_SIGNAL_WRITABLE,
      WTF::BindRepeating(&BytesUploader::OnPipeWriteable,
                         WrapWeakPersistent(this)));

  // From here on the consumer reports its state changes to this object.
  consumer_->SetClient(this);

  // Only start pumping right away if the consumer has not already ended.
  const bool can_read_now = consumer_->GetPublicState() ==
                            BytesConsumer::PublicState::kReadableOrWaiting;
  if (!can_read_now)
    return;
  WriteDataOnPipe();
}
void BytesUploader::OnStateChange() {
DVLOG(3) << this << " OnStateChange(). consumer_->GetPublicState()="
<< consumer_->GetPublicState();
DCHECK(get_size_callback_);
switch (consumer_->GetPublicState()) {
case BytesConsumer::PublicState::kReadableOrWaiting:
WriteDataOnPipe();
return;
case BytesConsumer::PublicState::kClosed:
Close();
return;
case BytesConsumer::PublicState::kErrored:
CloseOnError();
return;
}
NOTREACHED();
}
// Callback registered with |upload_pipe_watcher_| in StartReading(). The
// MojoResult argument is deliberately ignored here; the function simply
// resumes pumping, and WriteDataOnPipe() acts on the result of its own
// WriteData() call.
void BytesUploader::OnPipeWriteable(MojoResult unused) {
WriteDataOnPipe();
}
// Pumps bytes from |consumer_| into |upload_pipe_| until one side cannot make
// progress or the body ends.
//
// Each iteration is a two-phase read on the consumer: BeginRead() exposes a
// buffer, as much of it as the pipe accepts is written, and EndRead() reports
// how many bytes were actually consumed. The loop exits when:
//   * the consumer returns kShouldWait (this function is called again from
//     OnStateChange() once the state is readable/waiting);
//   * the pipe returns MOJO_RESULT_SHOULD_WAIT (the watcher is armed and
//     OnPipeWriteable() calls back into this function);
//   * the consumer reports kDone, which ends the upload via Close();
//   * the consumer or the pipe fails, or |total_size_| would overflow, which
//     ends the upload via CloseOnError().
void BytesUploader::WriteDataOnPipe() {
  DVLOG(3) << this << " WriteDataOnPipe(). consumer_->GetPublicState()="
           << consumer_->GetPublicState();
  DCHECK(upload_pipe_);
  DCHECK(get_size_callback_);
  if (!upload_pipe_.is_valid())
    return;

  while (true) {
    // Initialized so that the log line below never reads an indeterminate
    // value when BeginRead() does not return kOk.
    const char* buffer = nullptr;
    size_t available = 0;
    auto consumer_result = consumer_->BeginRead(&buffer, &available);
    DVLOG(3) << " consumer_->BeginRead()=" << consumer_result
             << ", available=" << available;
    switch (consumer_result) {
      case BytesConsumer::Result::kError:
        CloseOnError();
        return;
      case BytesConsumer::Result::kShouldWait:
        return;
      case BytesConsumer::Result::kDone:
        Close();
        return;
      case BytesConsumer::Result::kOk:
        break;
    }
    DCHECK_EQ(consumer_result, BytesConsumer::Result::kOk);

    // WriteData() takes a 32-bit size; clamp larger buffers and let the next
    // iteration pick up the remainder. On return |written_bytes| holds the
    // number of bytes the pipe accepted.
    uint32_t written_bytes = base::saturated_cast<uint32_t>(available);
    const MojoResult mojo_result = upload_pipe_->WriteData(
        buffer, &written_bytes, MOJO_WRITE_DATA_FLAG_NONE);
    DVLOG(3) << " upload_pipe_->WriteData()=" << mojo_result
             << ", mojo_written=" << written_bytes;

    if (mojo_result == MOJO_RESULT_SHOULD_WAIT) {
      // Wait for the pipe to have more capacity available. Nothing was
      // consumed, so finish the read with a length of zero.
      consumer_result = consumer_->EndRead(0);
      upload_pipe_watcher_.ArmOrNotify();
      return;
    }
    if (mojo_result != MOJO_RESULT_OK) {
      // Finish the two-phase read (consuming nothing) before CloseOnError()
      // cancels the consumer.
      // NOTE(review): BytesConsumer::Cancel() is believed to be disallowed
      // while a two-phase read is open - confirm against bytes_consumer.h.
      consumer_result = consumer_->EndRead(0);
      CloseOnError();
      return;
    }

    consumer_result = consumer_->EndRead(written_bytes);
    DVLOG(3) << " consumer_->EndRead()=" << consumer_result;

    // Keep a running total for the size callback; fail the upload rather than
    // wrap around.
    if (!base::CheckAdd(total_size_, written_bytes)
             .AssignIfValid(&total_size_)) {
      CloseOnError();
      return;
    }

    switch (consumer_result) {
      case BytesConsumer::Result::kError:
        CloseOnError();
        return;
      case BytesConsumer::Result::kShouldWait:
        NOTREACHED();
        return;
      case BytesConsumer::Result::kDone:
        Close();
        return;
      case BytesConsumer::Result::kOk:
        break;
    }
  }
}
void BytesUploader::Close() {
DVLOG(3) << this << " Close(). total_size=" << total_size_;
DCHECK(get_size_callback_);
std::move(get_size_callback_).Run(net::OK, total_size_);
}
void BytesUploader::CloseOnError() {
DVLOG(3) << this << " CloseOnError(). total_size=" << total_size_;
DCHECK(consumer_);
consumer_->Cancel();
DCHECK(get_size_callback_);
std::move(get_size_callback_).Run(net::ERR_FAILED, total_size_);
}
} // namespace blink