blob: 18c982818b6b9dffbe808a2480483939335fc2e1 [file] [log] [blame]
// Copyright 2018 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "components/cronet/native/upload_data_sink.h"
#include <inttypes.h>
#include <utility>
#include "base/check_op.h"
#include "base/functional/bind.h"
#include "base/memory/raw_ptr.h"
#include "base/strings/strcat.h"
#include "base/strings/stringprintf.h"
#include "base/task/single_thread_task_runner.h"
#include "components/cronet/cronet_upload_data_stream.h"
#include "components/cronet/native/engine.h"
#include "components/cronet/native/generated/cronet.idl_impl_struct.h"
#include "components/cronet/native/include/cronet_c.h"
#include "components/cronet/native/io_buffer_with_cronet_buffer.h"
#include "components/cronet/native/runnables.h"
#include "components/cronet/native/url_request.h"
#include "net/base/io_buffer.h"
namespace cronet {
// This class is called by Cronet's network stack as an implementation of
// CronetUploadDataStream::Delegate, and forwards the calls along to
// Cronet_UploadDataSinkImpl on the embedder's executor.
// This class is always called on the network thread and is destroyed in
// OnUploadDataStreamDestroyed() callback.
class Cronet_UploadDataSinkImpl::NetworkTasks
: public CronetUploadDataStream::Delegate {
public:
NetworkTasks(Cronet_UploadDataSinkImpl* upload_data_sink,
Cronet_Executor* upload_data_provider_executor);
NetworkTasks(const NetworkTasks&) = delete;
NetworkTasks& operator=(const NetworkTasks&) = delete;
~NetworkTasks() override;
private:
// CronetUploadDataStream::Delegate implementation:
void InitializeOnNetworkThread(
base::WeakPtr<CronetUploadDataStream> upload_data_stream) override;
void Read(scoped_refptr<net::IOBuffer> buffer, int buf_len) override;
void Rewind() override;
void OnUploadDataStreamDestroyed() override;
// Post |task| to client executor.
void PostTaskToExecutor(base::OnceClosure task);
// The upload data sink that is owned by url request and always accessed on
// the client thread. It always outlives |this| callback.
const raw_ptr<Cronet_UploadDataSinkImpl, AcrossTasksDanglingUntriaged>
upload_data_sink_ = nullptr;
// Executor for provider callback, used, but not owned, by |this|. Always
// outlives |this| callback.
Cronet_ExecutorPtr const upload_data_provider_executor_ = nullptr;
THREAD_CHECKER(network_thread_checker_);
};
Cronet_UploadDataSinkImpl::NetworkTasks::NetworkTasks(
Cronet_UploadDataSinkImpl* upload_data_sink,
Cronet_Executor* upload_data_provider_executor)
: upload_data_sink_(upload_data_sink),
upload_data_provider_executor_(upload_data_provider_executor) {
DETACH_FROM_THREAD(network_thread_checker_);
}
Cronet_UploadDataSinkImpl::NetworkTasks::~NetworkTasks() = default;
Cronet_UploadDataSinkImpl::Cronet_UploadDataSinkImpl(
Cronet_UrlRequestImpl* url_request,
Cronet_UploadDataProvider* upload_data_provider,
Cronet_Executor* upload_data_provider_executor)
: url_request_(url_request),
upload_data_provider_executor_(upload_data_provider_executor),
upload_data_provider_(upload_data_provider) {}
Cronet_UploadDataSinkImpl::~Cronet_UploadDataSinkImpl() = default;
void Cronet_UploadDataSinkImpl::InitRequest(CronetURLRequest* request) {
int64_t length = upload_data_provider_->GetLength();
if (length == -1) {
is_chunked_ = true;
} else {
CHECK_GE(length, 0);
length_ = static_cast<uint64_t>(length);
remaining_length_ = length_;
}
request->SetUpload(std::make_unique<CronetUploadDataStream>(
new NetworkTasks(this, upload_data_provider_executor_), length));
}
void Cronet_UploadDataSinkImpl::OnReadSucceeded(uint64_t bytes_read,
bool final_chunk) {
{
base::AutoLock lock(lock_);
CheckState(READ);
in_which_user_callback_ = NOT_IN_CALLBACK;
if (!upload_data_provider_)
return;
}
if (url_request_->IsDone())
return;
if (close_when_not_in_callback_) {
PostCloseToExecutor();
return;
}
CHECK(bytes_read > 0 || (final_chunk && bytes_read == 0));
// Bytes read exceeds buffer length.
CHECK_LE(static_cast<size_t>(bytes_read), buffer_->io_buffer_len());
if (!is_chunked_) {
// Only chunked upload can have the final chunk.
CHECK(!final_chunk);
// Read upload data length exceeds specified length.
if (bytes_read > remaining_length_) {
PostCloseToExecutor();
std::string error_message =
base::StringPrintf("Read upload data length %" PRIu64
" exceeds expected length %" PRIu64,
length_ - remaining_length_ + bytes_read, length_);
url_request_->OnUploadDataProviderError(error_message.c_str());
return;
}
remaining_length_ -= bytes_read;
}
network_task_runner_->PostTask(
FROM_HERE, base::BindOnce(&CronetUploadDataStream::OnReadSuccess,
upload_data_stream_, bytes_read, final_chunk));
}
void Cronet_UploadDataSinkImpl::OnReadError(Cronet_String error_message) {
{
base::AutoLock lock(lock_);
CheckState(READ);
in_which_user_callback_ = NOT_IN_CALLBACK;
if (!upload_data_provider_)
return;
}
if (url_request_->IsDone())
return;
PostCloseToExecutor();
url_request_->OnUploadDataProviderError(error_message);
}
void Cronet_UploadDataSinkImpl::OnRewindSucceeded() {
{
base::AutoLock lock(lock_);
CheckState(REWIND);
in_which_user_callback_ = NOT_IN_CALLBACK;
if (!upload_data_provider_)
return;
}
remaining_length_ = length_;
if (url_request_->IsDone())
return;
if (close_when_not_in_callback_) {
PostCloseToExecutor();
return;
}
network_task_runner_->PostTask(
FROM_HERE, base::BindOnce(&CronetUploadDataStream::OnRewindSuccess,
upload_data_stream_));
}
void Cronet_UploadDataSinkImpl::OnRewindError(Cronet_String error_message) {
{
base::AutoLock lock(lock_);
CheckState(REWIND);
in_which_user_callback_ = NOT_IN_CALLBACK;
if (!upload_data_provider_)
return;
}
if (url_request_->IsDone())
return;
PostCloseToExecutor();
url_request_->OnUploadDataProviderError(error_message);
}
void Cronet_UploadDataSinkImpl::InitializeUploadDataStream(
base::WeakPtr<CronetUploadDataStream> upload_data_stream,
scoped_refptr<base::SingleThreadTaskRunner> network_task_runner) {
DCHECK(!upload_data_stream_);
DCHECK(!network_task_runner_.get());
upload_data_stream_ = upload_data_stream;
network_task_runner_ = network_task_runner;
}
void Cronet_UploadDataSinkImpl::PostCloseToExecutor() {
Cronet_RunnablePtr runnable = new cronet::OnceClosureRunnable(base::BindOnce(
&Cronet_UploadDataSinkImpl::Close, base::Unretained(this)));
// |runnable| is passed to executor, which destroys it after execution.
Cronet_Executor_Execute(upload_data_provider_executor_, runnable);
}
void Cronet_UploadDataSinkImpl::Read(scoped_refptr<net::IOBuffer> buffer,
int buf_len) {
if (url_request_->IsDone())
return;
Cronet_UploadDataProviderPtr upload_data_provider = nullptr;
{
base::AutoLock lock(lock_);
if (!upload_data_provider_)
return;
CheckState(NOT_IN_CALLBACK);
in_which_user_callback_ = READ;
upload_data_provider = upload_data_provider_;
}
buffer_ =
std::make_unique<Cronet_BufferWithIOBuffer>(std::move(buffer), buf_len);
Cronet_UploadDataProvider_Read(upload_data_provider, this,
buffer_->cronet_buffer());
}
void Cronet_UploadDataSinkImpl::Rewind() {
if (url_request_->IsDone())
return;
Cronet_UploadDataProviderPtr upload_data_provider = nullptr;
{
base::AutoLock lock(lock_);
if (!upload_data_provider_)
return;
CheckState(NOT_IN_CALLBACK);
in_which_user_callback_ = REWIND;
upload_data_provider = upload_data_provider_;
}
Cronet_UploadDataProvider_Rewind(upload_data_provider, this);
}
void Cronet_UploadDataSinkImpl::Close() {
Cronet_UploadDataProviderPtr upload_data_provider = nullptr;
{
base::AutoLock lock(lock_);
// If |upload_data_provider_| is already closed from OnResponseStarted(),
// don't close it again from OnError() or OnCanceled().
if (!upload_data_provider_)
return;
if (in_which_user_callback_ != NOT_IN_CALLBACK) {
// If currently in the callback, then wait until return from callback
// before closing.
close_when_not_in_callback_ = true;
return;
}
upload_data_provider = upload_data_provider_;
upload_data_provider_ = nullptr;
}
Cronet_UploadDataProvider_Close(upload_data_provider);
}
void Cronet_UploadDataSinkImpl::CheckState(UserCallback expected_state) {
lock_.AssertAcquired();
CHECK(in_which_user_callback_ == expected_state);
}
void Cronet_UploadDataSinkImpl::NetworkTasks::InitializeOnNetworkThread(
base::WeakPtr<CronetUploadDataStream> upload_data_stream) {
DCHECK_CALLED_ON_VALID_THREAD(network_thread_checker_);
PostTaskToExecutor(
base::BindOnce(&Cronet_UploadDataSinkImpl::InitializeUploadDataStream,
base::Unretained(upload_data_sink_), upload_data_stream,
base::SingleThreadTaskRunner::GetCurrentDefault()));
}
void Cronet_UploadDataSinkImpl::NetworkTasks::Read(
scoped_refptr<net::IOBuffer> buffer,
int buf_len) {
DCHECK_CALLED_ON_VALID_THREAD(network_thread_checker_);
PostTaskToExecutor(base::BindOnce(&Cronet_UploadDataSinkImpl::Read,
base::Unretained(upload_data_sink_),
std::move(buffer), buf_len));
}
void Cronet_UploadDataSinkImpl::NetworkTasks::Rewind() {
DCHECK_CALLED_ON_VALID_THREAD(network_thread_checker_);
PostTaskToExecutor(base::BindOnce(&Cronet_UploadDataSinkImpl::Rewind,
base::Unretained(upload_data_sink_)));
}
void Cronet_UploadDataSinkImpl::NetworkTasks::OnUploadDataStreamDestroyed() {
DCHECK_CALLED_ON_VALID_THREAD(network_thread_checker_);
delete this;
}
void Cronet_UploadDataSinkImpl::NetworkTasks::PostTaskToExecutor(
base::OnceClosure task) {
Cronet_RunnablePtr runnable =
new cronet::OnceClosureRunnable(std::move(task));
// |runnable| is passed to executor, which destroys it after execution.
Cronet_Executor_Execute(upload_data_provider_executor_, runnable);
}
} // namespace cronet