blob: 3cccbe0eb1e07616ac43e856cec3b4f9db4feaff [file] [log] [blame]
// Copyright 2020 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "chrome/browser/prefetch/search_prefetch/streaming_search_prefetch_url_loader.h"
#include <string>
#include <utility>
#include "base/bind.h"
#include "base/location.h"
#include "base/memory/ptr_util.h"
#include "base/task/post_task.h"
#include "base/task/task_traits.h"
#include "base/threading/thread_task_runner_handle.h"
#include "base/time/time.h"
#include "chrome/browser/profiles/profile.h"
#include "content/public/browser/storage_partition.h"
#include "mojo/public/c/system/data_pipe.h"
#include "net/http/http_response_headers.h"
#include "net/http/http_util.h"
#include "net/traffic_annotation/network_traffic_annotation.h"
#include "services/network/public/cpp/constants.h"
#include "services/network/public/cpp/resource_request.h"
#include "services/network/public/cpp/shared_url_loader_factory.h"
#include "url/gurl.h"
// Constructs the loader and immediately starts the network request described
// by |resource_request|. |streaming_prefetch_request| must be non-null; it is
// stored as a raw pointer and notified from the response callbacks below.
StreamingSearchPrefetchURLLoader::StreamingSearchPrefetchURLLoader(
    StreamingSearchPrefetchRequest* streaming_prefetch_request,
    Profile* profile,
    std::unique_ptr<network::ResourceRequest> resource_request,
    const net::NetworkTrafficAnnotationTag& network_traffic_annotation)
    : resource_request_(std::move(resource_request)),
      streaming_prefetch_request_(streaming_prefetch_request) {
  DCHECK(streaming_prefetch_request_);

  // Use the browser-process loader factory of the profile's default storage
  // partition.
  content::StoragePartition* storage_partition =
      content::BrowserContext::GetDefaultStoragePartition(profile);
  auto browser_loader_factory =
      storage_partition->GetURLLoaderFactoryForBrowserProcess();

  // Create both mojo endpoints up front: the receiver for the network-side
  // loader and the client remote through which this object gets callbacks.
  auto loader_receiver = network_url_loader_.BindNewPipeAndPassReceiver();
  auto client_remote = url_loader_receiver_.BindNewPipeAndPassRemote(
      base::ThreadTaskRunnerHandle::Get());

  // Create a network service URL loader with passed in params.
  // NOTE(review): the two zeros are presumably routing_id and request_id --
  // confirm against the URLLoaderFactory interface.
  browser_loader_factory->CreateLoaderAndStart(
      std::move(loader_receiver), 0, 0, network::mojom::kURLLoadOptionNone,
      *resource_request_, std::move(client_remote),
      net::MutableNetworkTrafficAnnotationTag(network_traffic_annotation));

  // The handler is bound with an unretained |this|; presumably safe because
  // |url_loader_receiver_| is owned by this object -- verify in the header.
  auto on_network_disconnect = base::BindOnce(
      &StreamingSearchPrefetchURLLoader::OnURLLoaderMojoDisconnect,
      base::Unretained(this));
  url_loader_receiver_.set_disconnect_handler(std::move(on_network_disconnect));
}
// Defaulted destructor: no explicit teardown is performed here beyond
// destroying the members. Several methods in this file reach it through
// `delete this`.
StreamingSearchPrefetchURLLoader::~StreamingSearchPrefetchURLLoader() = default;
// Returns a one-shot handler that, when run, invokes SetUpForwardingClient()
// on this object through a weak pointer. |loader| is moved into the handler
// and handed to SetUpForwardingClient() as its first argument. The owning
// prefetch request pointer must already have been cleared.
SearchPrefetchURLLoader::RequestHandler
StreamingSearchPrefetchURLLoader::ServingResponseHandler(
    std::unique_ptr<SearchPrefetchURLLoader> loader) {
  DCHECK(!streaming_prefetch_request_);

  auto weak_self = weak_factory_.GetWeakPtr();
  return base::BindOnce(
      &StreamingSearchPrefetchURLLoader::SetUpForwardingClient,
      std::move(weak_self), std::move(loader));
}
// Connects this loader to the navigation stack: binds |receiver| and
// |forwarding_client|, forwards the stored response head, and then replays
// every event that was queued while no client was attached.
//
// |loader| points to |this| (see the comment at the release() call); its
// ownership is intentionally dropped once the mojo receiver is bound.
void StreamingSearchPrefetchURLLoader::SetUpForwardingClient(
    std::unique_ptr<SearchPrefetchURLLoader> loader,
    const network::ResourceRequest& resource_request,
    mojo::PendingReceiver<network::mojom::URLLoader> receiver,
    mojo::PendingRemote<network::mojom::URLLoaderClient> forwarding_client) {
  DCHECK(!streaming_prefetch_request_);
  // The navigation-side receiver must not have been bound before.
  DCHECK(!receiver_.is_bound());

  // Apply the priority of the request being served to the in-flight network
  // request, if that request is still alive.
  if (network_url_loader_) {
    network_url_loader_->SetPriority(resource_request.priority, -1);
  }

  // Bind to the content/ navigation code. Once bound to the mojo receiver,
  // |loader| (which points to |this|) can be released; from here on the
  // object deletes itself from its disconnect and error paths.
  receiver_.Bind(std::move(receiver));
  loader.release();
  receiver_.set_disconnect_handler(base::BindOnce(
      &StreamingSearchPrefetchURLLoader::OnURLLoaderClientMojoDisconnect,
      weak_factory_.GetWeakPtr()));

  forwarding_client_.Bind(std::move(forwarding_client));

  // Drop the raw header info unless the serving request asked for it.
  const bool strip_raw_headers = !resource_request.report_raw_headers;
  if (strip_raw_headers)
    resource_response_->raw_request_response_info = nullptr;

  forwarding_client_->OnReceiveResponse(std::move(resource_response_));
  RunEventQueue();
}
// Called with the response head of the prefetch request. The head is stored
// for later forwarding, the body buffer is pre-sized from the advertised
// content length, and the owning prefetch request is told whether the
// response can be served.
void StreamingSearchPrefetchURLLoader::OnReceiveResponse(
    network::mojom::URLResponseHeadPtr head) {
  DCHECK(!forwarding_client_);
  DCHECK(streaming_prefetch_request_);

  // Store head and pause new messages until the forwarding client is set up.
  resource_response_ = std::move(head);

  // A negative content length is treated as "no estimate".
  const auto advertised_length = resource_response_->content_length;
  if (advertised_length < 0) {
    estimated_length_ = 0;
  } else {
    estimated_length_ = advertised_length;
  }
  if (estimated_length_ > 0) {
    body_content_.reserve(estimated_length_);
  }

  const bool servable = streaming_prefetch_request_->CanServePrefetchRequest(
      resource_response_->headers);
  if (servable) {
    streaming_prefetch_request_->MarkPrefetchAsServable();
    return;
  }

  // Not safe to do anything after this point
  streaming_prefetch_request_->ErrorEncountered();
}
// Redirects are never followed. While the prefetch request still owns this
// loader the redirect is reported to it as an error; otherwise the loader
// destroys itself.
void StreamingSearchPrefetchURLLoader::OnReceiveRedirect(
    const net::RedirectInfo& redirect_info,
    network::mojom::URLResponseHeadPtr head) {
  if (!streaming_prefetch_request_) {
    delete this;
    return;
  }
  streaming_prefetch_request_->ErrorEncountered();
}
// Upload-progress notification. This is not expected to be called; all
// three arguments are ignored and the body only asserts unreachability.
void StreamingSearchPrefetchURLLoader::OnUploadProgress(
int64_t current_position,
int64_t total_size,
OnUploadProgressCallback callback) {
// We only handle GETs, so there is no request body whose upload could be
// reported.
NOTREACHED();
}
// Cached-metadata notification. |data| is intentionally discarded: it is
// neither stored nor forwarded to the forwarding client.
void StreamingSearchPrefetchURLLoader::OnReceiveCachedMetadata(
mojo_base::BigBuffer data) {
// Do nothing. This is not supported for navigation loader.
}
// Transfer-size notification. With a forwarding client attached the update is
// passed straight through. Otherwise it is folded into the body-size estimate
// and queued so it can be replayed once a client is attached.
void StreamingSearchPrefetchURLLoader::OnTransferSizeUpdated(
    int32_t transfer_size_diff) {
  if (!forwarding_client_) {
    // Use the update to refine the buffer reservation.
    estimated_length_ += transfer_size_diff;
    if (estimated_length_ > 0) {
      body_content_.reserve(estimated_length_);
    }

    // Re-enter this method later, when RunEventQueue() replays the queue.
    auto replay = base::BindOnce(
        &StreamingSearchPrefetchURLLoader::OnTransferSizeUpdated,
        base::Unretained(this), transfer_size_diff);
    event_queue_.push_back(std::move(replay));
    return;
  }

  forwarding_client_->OnTransferSizeUpdated(transfer_size_diff);
}
// Called when the network body pipe becomes available. With a forwarding
// client attached the pipe is handed over untouched. Otherwise the body is
// drained into memory through a DataPipeDrainer, and a deferred call to
// OnStartLoadingResponseBodyFromData() is queued for when a client attaches.
void StreamingSearchPrefetchURLLoader::OnStartLoadingResponseBody(
    mojo::ScopedDataPipeConsumerHandle body) {
  if (!forwarding_client_) {
    // From here on the response is served out of the in-memory buffer.
    serving_from_data_ = true;
    pipe_drainer_ =
        std::make_unique<mojo::DataPipeDrainer>(this, std::move(body));

    auto serve_from_buffer = base::BindOnce(
        &StreamingSearchPrefetchURLLoader::OnStartLoadingResponseBodyFromData,
        base::Unretained(this));
    event_queue_.push_back(std::move(serve_from_buffer));
    return;
  }

  DCHECK(!streaming_prefetch_request_);
  forwarding_client_->OnStartLoadingResponseBody(std::move(body));
}
// Receives a chunk of body data (presumably from the DataPipeDrainer created
// in OnStartLoadingResponseBody()). The |num_bytes| bytes at |data| are
// appended to the in-memory body buffer and, if a forwarding client is
// already attached, as much buffered data as possible is pushed to it.
void StreamingSearchPrefetchURLLoader::OnDataAvailable(const void* data,
                                                       size_t num_bytes) {
  // Append straight from the source buffer; building a temporary std::string
  // first would copy every chunk twice.
  body_content_.append(static_cast<const char*>(data), num_bytes);
  bytes_of_raw_data_to_transfer_ += num_bytes;

  if (forwarding_client_)
    PushData();
}
void StreamingSearchPrefetchURLLoader::OnDataComplete() {
drain_complete_ = true;
}
void StreamingSearchPrefetchURLLoader::OnStartLoadingResponseBodyFromData() {
mojo::ScopedDataPipeConsumerHandle consumer_handle;
MojoCreateDataPipeOptions options;
options.struct_size = sizeof(MojoCreateDataPipeOptions);
options.flags = MOJO_CREATE_DATA_PIPE_FLAG_NONE;
options.element_num_bytes = 1;
options.capacity_num_bytes = network::kDataPipeDefaultAllocationSize;
MojoResult rv =
mojo::CreateDataPipe(&options, &producer_handle_, &consumer_handle);
if (rv != MOJO_RESULT_OK) {
delete this;
return;
}
handle_watcher_ = std::make_unique<mojo::SimpleWatcher>(
FROM_HERE, mojo::SimpleWatcher::ArmingPolicy::MANUAL,
base::SequencedTaskRunnerHandle::Get());
handle_watcher_->Watch(
producer_handle_.get(), MOJO_HANDLE_SIGNAL_WRITABLE,
MOJO_WATCH_CONDITION_SATISFIED,
base::BindRepeating(&StreamingSearchPrefetchURLLoader::OnHandleReady,
weak_factory_.GetWeakPtr()));
forwarding_client_->OnStartLoadingResponseBody(std::move(consumer_handle));
PushData();
}
// Watcher callback for the producer end of the forwarding data pipe. A
// successful notification resumes pushing buffered data; any other result
// destroys the loader. |state| is unused.
void StreamingSearchPrefetchURLLoader::OnHandleReady(
    MojoResult result,
    const mojo::HandleSignalsState& state) {
  if (result == MOJO_RESULT_OK) {
    PushData();
    return;
  }
  delete this;
}
void StreamingSearchPrefetchURLLoader::PushData() {
while (true) {
DCHECK_GE(bytes_of_raw_data_to_transfer_, write_position_);
uint32_t write_size =
static_cast<uint32_t>(bytes_of_raw_data_to_transfer_ - write_position_);
if (write_size == 0) {
if (drain_complete_)
Finish();
return;
}
MojoResult result =
producer_handle_->WriteData(body_content_.data() + write_position_,
&write_size, MOJO_WRITE_DATA_FLAG_NONE);
if (result == MOJO_RESULT_SHOULD_WAIT) {
handle_watcher_->ArmOrNotify();
return;
}
if (result != MOJO_RESULT_OK) {
delete this;
return;
}
// |write_position_| should only be updated when the mojo pipe has
// successfully been written to.
write_position_ += write_size;
}
}
// Ends serving from the in-memory buffer: stops watching and closes the
// producer end of the pipe, then forwards the completion status if one has
// already been recorded by OnComplete().
void StreamingSearchPrefetchURLLoader::Finish() {
  serving_from_data_ = false;

  handle_watcher_.reset();
  producer_handle_.reset();

  // With no status stored yet, OnComplete() forwards it itself later, since
  // |serving_from_data_| is now false.
  if (!status_) {
    return;
  }
  forwarding_client_->OnComplete(status_.value());
}
// Completion notification for the network request. Depending on state the
// status is forwarded directly, reported to the owning prefetch request, or
// stored so that Finish() can forward it after the buffered body is sent.
void StreamingSearchPrefetchURLLoader::OnComplete(
    const network::URLLoaderCompletionStatus& status) {
  // The network-side loader is no longer needed. Resetting it also makes
  // OnURLLoaderMojoDisconnect() return early from now on.
  network_url_loader_.reset();

  const bool passthrough = forwarding_client_ && !serving_from_data_;
  if (passthrough) {
    DCHECK(!streaming_prefetch_request_);
    forwarding_client_->OnComplete(status);
    return;
  }

  if (streaming_prefetch_request_) {
    DCHECK(!forwarding_client_);
    if (status.error_code != net::OK) {
      // Nothing further is done after reporting the error.
      streaming_prefetch_request_->ErrorEncountered();
      return;
    }
    streaming_prefetch_request_->MarkPrefetchAsComplete();
  }

  // Keep the status for Finish().
  status_ = status;
}
void StreamingSearchPrefetchURLLoader::RunEventQueue() {
for (auto& event : event_queue_) {
std::move(event).Run();
}
event_queue_.clear();
}
// Request to follow a redirect. This is not expected to be called (redirects
// are rejected in OnReceiveRedirect()); all arguments are ignored and the
// body only asserts unreachability.
void StreamingSearchPrefetchURLLoader::FollowRedirect(
const std::vector<std::string>& removed_headers,
const net::HttpRequestHeaders& modified_headers,
const net::HttpRequestHeaders& modified_cors_exempt_headers,
const base::Optional<GURL>& new_url) {
// This should never be called for a non-network service URLLoader.
NOTREACHED();
}
// Forwards a priority change to the network-side loader. Does nothing once
// that loader has been reset.
void StreamingSearchPrefetchURLLoader::SetPriority(
    net::RequestPriority priority,
    int32_t intra_priority_value) {
  if (!network_url_loader_) {
    return;
  }
  // Pass through.
  network_url_loader_->SetPriority(priority, intra_priority_value);
}
// Forwards a pause request to the network-side loader. Does nothing once that
// loader has been reset.
void StreamingSearchPrefetchURLLoader::PauseReadingBodyFromNet() {
  if (!network_url_loader_) {
    return;
  }
  // Pass through.
  network_url_loader_->PauseReadingBodyFromNet();
}
// Forwards a resume request to the network-side loader. Does nothing once
// that loader has been reset.
void StreamingSearchPrefetchURLLoader::ResumeReadingBodyFromNet() {
  if (!network_url_loader_) {
    return;
  }
  // Pass through.
  network_url_loader_->ResumeReadingBodyFromNet();
}
// Disconnect handler for the client pipe that was handed to the network
// service in the constructor.
void StreamingSearchPrefetchURLLoader::OnURLLoaderMojoDisconnect() {
  // OnComplete() resets |network_url_loader_|, so an unset loader means the
  // request already finished. The connection should close after complete.
  if (!network_url_loader_) {
    return;
  }

  // Disconnected mid-request while already serving a navigation: this object
  // is self-owned at that point, so it destroys itself.
  if (forwarding_client_) {
    delete this;
    return;
  }

  // Still owned by the prefetch request: report the failure to it.
  DCHECK(streaming_prefetch_request_);
  streaming_prefetch_request_->ErrorEncountered();
}
// Disconnect handler for |receiver_|, installed in SetUpForwardingClient().
// By then the forwarding client is bound and the owning prefetch request
// pointer has been cleared (both are asserted), so the loader simply
// destroys itself.
void StreamingSearchPrefetchURLLoader::OnURLLoaderClientMojoDisconnect() {
DCHECK(forwarding_client_);
DCHECK(!streaming_prefetch_request_);
// |loader| was released in SetUpForwardingClient(); nothing else deletes
// this object on this path.
delete this;
}
// Drops the raw pointer to the owning prefetch request. The serving-side
// methods in this file (ServingResponseHandler(), SetUpForwardingClient())
// DCHECK that this pointer is null, so this is presumably called before the
// loader is handed off for serving -- confirm against the caller.
void StreamingSearchPrefetchURLLoader::ClearOwnerPointer() {
streaming_prefetch_request_ = nullptr;
}