blob: 9c2f4bcc4ae0534305f42c09c8e03430f2fdec25 [file] [log] [blame]
// Copyright 2024 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "net/http/http_stream_pool_job_controller.h"
#include <memory>
#include <optional>
#include <vector>
#include "base/memory/raw_ptr.h"
#include "base/notreached.h"
#include "base/task/sequenced_task_runner.h"
#include "base/time/time.h"
#include "base/values.h"
#include "net/base/load_flags.h"
#include "net/base/load_states.h"
#include "net/base/net_error_details.h"
#include "net/base/net_errors.h"
#include "net/base/port_util.h"
#include "net/base/request_priority.h"
#include "net/dns/public/resolve_error_info.h"
#include "net/http/alternative_service.h"
#include "net/http/http_network_session.h"
#include "net/http/http_stream_key.h"
#include "net/http/http_stream_pool.h"
#include "net/http/http_stream_pool_group.h"
#include "net/http/http_stream_pool_job.h"
#include "net/http/http_stream_pool_request_info.h"
#include "net/http/http_stream_request.h"
#include "net/log/net_log_event_type.h"
#include "net/log/net_log_with_source.h"
#include "net/quic/quic_chromium_client_session.h"
#include "net/quic/quic_http_stream.h"
#include "net/socket/next_proto.h"
#include "net/spdy/spdy_http_stream.h"
#include "net/ssl/ssl_cert_request_info.h"
#include "net/ssl/ssl_config.h"
#include "net/third_party/quiche/src/quiche/quic/core/quic_versions.h"
#include "url/scheme_host_port.h"
#include "url/url_constants.h"
namespace net {
// static
// Works out whether `request_info` calls for a separate alternative job.
//
// Returns std::nullopt when alternative services are disabled, when no
// alternative protocol is advertised, or when the alternative endpoint has the
// same destination as the request. Otherwise returns the alternative's stream
// key and protocol; for QUIC the QUIC version and session alias key are filled
// in as well.
std::optional<HttpStreamPool::JobController::Alternative>
HttpStreamPool::JobController::CalculateAlternative(
    HttpStreamPool* pool,
    const HttpStreamKey& origin_stream_key,
    const HttpStreamPoolRequestInfo& request_info,
    bool enable_alternative_services) {
  const auto& alt_service_info = request_info.alternative_service_info;
  const NextProto protocol = alt_service_info.protocol();

  if (!enable_alternative_services) {
    return std::nullopt;
  }
  if (protocol == NextProto::kProtoUnknown) {
    return std::nullopt;
  }

  // Only HTTP/2 and QUIC are valid alternative protocols.
  CHECK(protocol == NextProto::kProtoHTTP2 ||
        protocol == NextProto::kProtoQUIC);

  const auto alt_host_port = alt_service_info.GetHostPortPair();
  url::SchemeHostPort destination(url::kHttpsScheme, alt_host_port.host(),
                                  alt_host_port.port());

  // When the alternative endpoint shares the origin's destination, the origin
  // job already covers every protocol for it, so no alternative job is needed.
  if (destination == request_info.destination) {
    return std::nullopt;
  }

  Alternative alternative = {
      .stream_key = HttpStreamKey(
          destination, request_info.privacy_mode, request_info.socket_tag,
          request_info.network_anonymization_key,
          request_info.secure_dns_policy,
          request_info.disable_cert_network_fetches),
      .protocol = protocol,
      .quic_version = quic::ParsedQuicVersion::Unsupported()};

  if (protocol != NextProto::kProtoQUIC) {
    return alternative;
  }

  // QUIC-only fields. `destination` is no longer needed afterwards, so it is
  // moved into the alias key.
  alternative.quic_version = pool->SelectQuicVersion(alt_service_info);
  alternative.quic_key =
      origin_stream_key.CalculateQuicSessionAliasKey(std::move(destination));
  return alternative;
}
// Creates a controller for one stream request or preconnect.
//
// The origin stream key and its QUIC session alias key are derived from
// `request_info`, and the optional alternative endpoint is computed via
// CalculateAlternative(). The controller's lifetime is recorded on the NetLog
// taken from `request_info.factory_job_controller_net_log`. Only direct
// (non-proxied) connections are accepted; this is CHECKed.
HttpStreamPool::JobController::JobController(
    HttpStreamPool* pool,
    HttpStreamPoolRequestInfo request_info,
    RequestPriority priority,
    std::vector<SSLConfig::CertAndStatus> allowed_bad_certs,
    bool enable_ip_based_pooling,
    bool enable_alternative_services)
    : pool_(pool),
      priority_(priority),
      allowed_bad_certs_(std::move(allowed_bad_certs)),
      enable_ip_based_pooling_(enable_ip_based_pooling),
      enable_alternative_services_(enable_alternative_services),
      respect_limits_(request_info.load_flags & LOAD_IGNORE_LIMITS
                          ? RespectLimits::kIgnore
                          : RespectLimits::kRespect),
      is_http1_allowed_(request_info.is_http1_allowed),
      proxy_info_(request_info.proxy_info),
      alternative_service_info_(request_info.alternative_service_info),
      origin_stream_key_(request_info.destination,
                         request_info.privacy_mode,
                         request_info.socket_tag,
                         request_info.network_anonymization_key,
                         request_info.secure_dns_policy,
                         request_info.disable_cert_network_fetches),
      origin_quic_key_(origin_stream_key_.CalculateQuicSessionAliasKey()),
      alternative_(CalculateAlternative(pool,
                                        origin_stream_key_,
                                        request_info,
                                        enable_alternative_services_)),
      net_log_(request_info.factory_job_controller_net_log),
      created_time_(base::TimeTicks::Now()) {
  // Paired with the EndEvent() in the destructor.
  net_log_.BeginEvent(
      NetLogEventType::HTTP_STREAM_POOL_JOB_CONTROLLER_ALIVE, [this] {
        base::Value::Dict params;
        params.Set("origin_destination",
                   origin_stream_key_.destination().Serialize());
        if (alternative_) {
          params.Set("alternative_destination",
                     alternative_->stream_key.destination().Serialize());
        }
        params.Set("enable_ip_based_pooling", enable_ip_based_pooling_);
        params.Set("enable_alternative_services",
                   enable_alternative_services_);
        params.Set("respect_limits",
                   respect_limits_ == RespectLimits::kRespect);
        return params;
      });

  CHECK(proxy_info_.is_direct());

  // If QUIC is advertised but no separate alternative was computed, the QUIC
  // version is selected for the origin job instead.
  const bool quic_without_alternative =
      !alternative_.has_value() &&
      alternative_service_info_.protocol() == NextProto::kProtoQUIC;
  if (quic_without_alternative) {
    origin_quic_version_ = pool_->SelectQuicVersion(alternative_service_info_);
  }
}
// Closes the HTTP_STREAM_POOL_JOB_CONTROLLER_ALIVE NetLog event that the
// constructor opened.
HttpStreamPool::JobController::~JobController() {
  net_log_.EndEvent(NetLogEventType::HTTP_STREAM_POOL_JOB_CONTROLLER_ALIVE);
}
// Begins serving `stream_request` on behalf of `delegate`. Must be called at
// most once per controller (CHECKed).
//
// The following are tried in order; the first that applies wins:
//   1. Fail with ERR_UNSAFE_PORT if the origin port is not allowed for the
//      origin scheme.
//   2. Create a stream on an existing QUIC session.
//   3. Create a stream on an available SPDY session.
//   4. Start the alternative job (when an alternative endpoint exists) and,
//      unless that job has already reported success, the origin job.
// Outcomes of steps 1-3 are delivered to `delegate` through posted tasks bound
// to a weak pointer, never synchronously from this call.
void HttpStreamPool::JobController::HandleStreamRequest(
    HttpStreamRequest* stream_request,
    HttpStreamRequest::Delegate* delegate) {
  CHECK(stream_request);
  CHECK(!delegate_);
  CHECK(!stream_request_);

  stream_request->SetHelperForSwitchingToPool(this);
  delegate_ = delegate;
  stream_request_ = stream_request;

  if (pool_->delegate_for_testing_) {
    pool_->delegate_for_testing_->OnRequestStream(origin_stream_key_);
  }

  // Step 1: refuse unsafe ports.
  const auto& origin_destination = origin_stream_key_.destination();
  if (!IsPortAllowedForScheme(origin_destination.port(),
                              origin_destination.scheme())) {
    base::SequencedTaskRunner::GetCurrentDefault()->PostTask(
        FROM_HERE,
        base::BindOnce(&HttpStreamPool::JobController::CallOnStreamFailed,
                       weak_ptr_factory_.GetWeakPtr(), ERR_UNSAFE_PORT,
                       NetErrorDetails(), ResolveErrorInfo()));
    return;
  }

  // Step 2: reuse an existing QUIC session if there is one.
  if (std::unique_ptr<HttpStream> existing_quic_stream =
          MaybeCreateStreamFromExistingQuicSession()) {
    net_log_.AddEvent(
        NetLogEventType::
            HTTP_STREAM_POOL_JOB_CONTROLLER_FOUND_EXISTING_QUIC_SESSION);
    base::SequencedTaskRunner::GetCurrentDefault()->PostTask(
        FROM_HERE,
        base::BindOnce(
            &HttpStreamPool::JobController::CallRequestCompleteAndStreamReady,
            weak_ptr_factory_.GetWeakPtr(), std::move(existing_quic_stream),
            NextProto::kProtoQUIC));
    return;
  }

  // Step 3: reuse an available SPDY session if there is one.
  SpdySessionKey session_key = origin_stream_key_.CalculateSpdySessionKey();
  if (base::WeakPtr<SpdySession> available_session =
          pool_->FindAvailableSpdySession(origin_stream_key_, session_key,
                                          enable_ip_based_pooling_,
                                          stream_request_->net_log())) {
    net_log_.AddEvent(
        NetLogEventType::
            HTTP_STREAM_POOL_JOB_CONTROLLER_FOUND_EXISTING_SPDY_SESSION);
    auto spdy_stream = std::make_unique<SpdyHttpStream>(
        available_session, stream_request_->net_log().source(),
        spdy_session_pool()->GetDnsAliasesForSessionKey(session_key));
    base::SequencedTaskRunner::GetCurrentDefault()->PostTask(
        FROM_HERE,
        base::BindOnce(
            &HttpStreamPool::JobController::CallRequestCompleteAndStreamReady,
            weak_ptr_factory_.GetWeakPtr(), std::move(spdy_stream),
            NextProto::kProtoHTTP2));
    return;
  }

  // Step 4: start jobs.
  if (!alternative_.has_value()) {
    // With no alternative endpoint, record the alternative side as already
    // finished so that AllJobsFinished() depends only on the origin job.
    alternative_job_result_ = OK;
  } else {
    alternative_job_ =
        pool_
            ->GetOrCreateGroup(alternative_->stream_key, alternative_->quic_key)
            .CreateJob(this, alternative_->quic_version, alternative_->protocol,
                       stream_request_->net_log());
    alternative_job_->Start();
  }

  // Skip the origin job only when an alternative job exists and has already
  // reported success.
  if (alternative_job_ && alternative_job_result_.has_value() &&
      *alternative_job_result_ == OK) {
    return;
  }

  origin_job_ =
      pool_->GetOrCreateGroup(origin_stream_key_, origin_quic_key_)
          .CreateJob(this, origin_quic_version_, NextProto::kProtoUnknown,
                     stream_request_->net_log());
  origin_job_->Start();
}
int HttpStreamPool::JobController::Preconnect(
size_t num_streams,
CompletionOnceCallback callback) {
num_streams = std::min(kDefaultMaxStreamSocketsPerGroup, num_streams);
if (!IsPortAllowedForScheme(origin_stream_key_.destination().port(),
origin_stream_key_.destination().scheme())) {
return ERR_UNSAFE_PORT;
}
if (CanUseExistingQuicSession()) {
net_log_.AddEvent(
NetLogEventType::
HTTP_STREAM_POOL_JOB_CONTROLLER_FOUND_EXISTING_QUIC_SESSION);
return OK;
}
SpdySessionKey spdy_session_key =
origin_stream_key_.CalculateSpdySessionKey();
if (pool_->FindAvailableSpdySession(origin_stream_key_, spdy_session_key,
/*enable_ip_based_pooling=*/true)) {
net_log_.AddEvent(
NetLogEventType::
HTTP_STREAM_POOL_JOB_CONTROLLER_FOUND_EXISTING_SPDY_SESSION);
return OK;
}
Group& group = pool_->GetOrCreateGroup(origin_stream_key_, origin_quic_key_);
if (group.ActiveStreamSocketCount() >= num_streams) {
return OK;
}
if (pool_->delegate_for_testing_) {
// Some tests expect OnPreconnect() is called after checking existing
// sessions.
std::optional<int> result = pool_->delegate_for_testing_->OnPreconnect(
origin_stream_key_, num_streams);
if (result.has_value()) {
return *result;
}
}
preconnect_callback_ = std::move(callback);
origin_job_ =
std::make_unique<Job>(this, &group, origin_quic_version_,
NextProto::kProtoUnknown, net_log_, num_streams);
origin_job_->Start();
return ERR_IO_PENDING;
}
// Returns the priority currently recorded for this controller (the value
// given at construction or the latest one passed to SetPriority()).
RequestPriority HttpStreamPool::JobController::priority() const {
  return priority_;
}
// Returns whether limits are respected or ignored. The value is fixed at
// construction from the LOAD_IGNORE_LIMITS load flag.
HttpStreamPool::RespectLimits HttpStreamPool::JobController::respect_limits()
    const {
  return respect_limits_;
}
// Returns the list of allowed bad certificates supplied at construction.
const std::vector<SSLConfig::CertAndStatus>&
HttpStreamPool::JobController::allowed_bad_certs() const {
  return allowed_bad_certs_;
}
// Returns the IP-based pooling flag supplied at construction.
bool HttpStreamPool::JobController::enable_ip_based_pooling() const {
  return enable_ip_based_pooling_;
}
// Returns the alternative-services flag supplied at construction.
bool HttpStreamPool::JobController::enable_alternative_services() const {
  return enable_alternative_services_;
}
// Returns the `is_http1_allowed` value copied from the request info at
// construction.
bool HttpStreamPool::JobController::is_http1_allowed() const {
  return is_http1_allowed_;
}
// Returns the proxy info copied from the request info at construction. The
// constructor CHECKs that it is direct.
const ProxyInfo& HttpStreamPool::JobController::proxy_info() const {
  return proxy_info_;
}
// Returns the NetLog this controller records its events on.
const NetLogWithSource& HttpStreamPool::JobController::net_log() const {
  return net_log_;
}
// Called when `job` has produced `stream`. Records OK as the job's result and
// posts a task that completes the request and hands `stream` to the delegate.
void HttpStreamPool::JobController::OnStreamReady(
    Job* job,
    std::unique_ptr<HttpStream> stream,
    NextProto negotiated_protocol) {
  SetJobResult(job, OK);

  // Use PostTask to align the behavior with HttpStreamFactory::Job, see
  // https://crrev.com/2827533002.
  // TODO(crbug.com/346835898): Avoid using PostTask here if possible.
  base::SequencedTaskRunner::GetCurrentDefault()->PostTask(
      FROM_HERE,
      base::BindOnce(&JobController::CallRequestCompleteAndStreamReady,
                     weak_ptr_factory_.GetWeakPtr(), std::move(stream),
                     negotiated_protocol));
}
// Called when `job` has failed with `status`. Copies the job's connection
// attempts onto the stream request and records the result. The delegate is
// notified (via a posted task) only once both the origin and the alternative
// results are known; otherwise nothing is reported yet.
void HttpStreamPool::JobController::OnStreamFailed(
    Job* job,
    int status,
    const NetErrorDetails& net_error_details,
    ResolveErrorInfo resolve_error_info) {
  stream_request_->AddConnectionAttempts(job->connection_attempts());
  SetJobResult(job, status);

  if (!AllJobsFinished()) {
    return;
  }

  // Use PostTask to align the behavior with HttpStreamFactory::Job, see
  // https://crrev.com/2827533002.
  // TODO(crbug.com/346835898): Avoid using PostTask here if possible.
  base::SequencedTaskRunner::GetCurrentDefault()->PostTask(
      FROM_HERE,
      base::BindOnce(&JobController::CallOnStreamFailed,
                     weak_ptr_factory_.GetWeakPtr(), status,
                     net_error_details, std::move(resolve_error_info)));
}
// Called when `job` hit a certificate error. Copies the job's connection
// attempts onto the stream request, destroys the other job, and posts a task
// that forwards `status` and `ssl_info` to the delegate.
void HttpStreamPool::JobController::OnCertificateError(
    Job* job,
    int status,
    const SSLInfo& ssl_info) {
  stream_request_->AddConnectionAttempts(job->connection_attempts());
  CancelOtherJob(job);

  // Use PostTask to align the behavior with HttpStreamFactory::Job, see
  // https://crrev.com/2827533002.
  // TODO(crbug.com/346835898): Avoid using PostTask here if possible.
  base::SequencedTaskRunner::GetCurrentDefault()->PostTask(
      FROM_HERE,
      base::BindOnce(&JobController::CallOnCertificateError,
                     weak_ptr_factory_.GetWeakPtr(), status, ssl_info));
}
// Called when `job` needs a client certificate. Copies the job's connection
// attempts onto the stream request, destroys the other job, and posts a task
// that forwards `cert_info` to the delegate. `cert_info` is bound with
// base::RetainedRef so the posted task holds a reference to it.
void HttpStreamPool::JobController::OnNeedsClientAuth(
    Job* job,
    SSLCertRequestInfo* cert_info) {
  stream_request_->AddConnectionAttempts(job->connection_attempts());
  CancelOtherJob(job);

  // Use PostTask to align the behavior with HttpStreamFactory::Job, see
  // https://crrev.com/2827533002.
  // TODO(crbug.com/346835898): Avoid using PostTask here if possible.
  base::SequencedTaskRunner::GetCurrentDefault()->PostTask(
      FROM_HERE, base::BindOnce(&JobController::CallOnNeedsClientAuth,
                                weak_ptr_factory_.GetWeakPtr(),
                                base::RetainedRef(cert_info)));
}
// Called when the preconnect `job` finishes with `status`. Defers to
// ResetJobAndInvokePreconnectCallback() through a posted task rather than
// running the callback synchronously.
void HttpStreamPool::JobController::OnPreconnectComplete(Job* job, int status) {
  base::SequencedTaskRunner::GetCurrentDefault()->PostTask(
      FROM_HERE,
      base::BindOnce(&JobController::ResetJobAndInvokePreconnectCallback,
                     weak_ptr_factory_.GetWeakPtr(), job, status));
}
// Reports the load state of the in-flight request. Requires a stream request
// to be attached (CHECKed). While the request is incomplete, the origin job's
// state takes precedence over the alternative job's; in every other case the
// state is LOAD_STATE_IDLE.
LoadState HttpStreamPool::JobController::GetLoadState() const {
  CHECK(stream_request_);

  if (!stream_request_->completed()) {
    if (origin_job_) {
      return origin_job_->GetLoadState();
    }
    if (alternative_job_) {
      return alternative_job_->GetLoadState();
    }
  }
  return LOAD_STATE_IDLE;
}
// Tears the controller down once the request is finished: drops the delegate
// and request pointers, destroys both jobs, reports alternative service
// brokenness if applicable, and finally notifies the pool.
void HttpStreamPool::JobController::OnRequestComplete() {
  delegate_ = nullptr;
  stream_request_ = nullptr;

  origin_job_.reset();
  alternative_job_.reset();

  MaybeMarkAlternativeServiceBroken();

  pool_->OnJobControllerComplete(this);
  // `this` is deleted.
}
// Must never be called on this controller; the constructor CHECKs that the
// connection is direct.
int HttpStreamPool::JobController::RestartTunnelWithProxyAuth() {
  NOTREACHED();
}
// Records `priority` and propagates it to whichever jobs currently exist
// (origin first, then alternative).
void HttpStreamPool::JobController::SetPriority(RequestPriority priority) {
  priority_ = priority;

  for (Job* job : {origin_job_.get(), alternative_job_.get()}) {
    if (job) {
      job->SetPriority(priority);
    }
  }
}
// Returns a dictionary describing this controller: the origin stream key, the
// alternative stream key (only when an alternative exists), and the
// milliseconds elapsed since construction (stored as an int).
base::Value::Dict HttpStreamPool::JobController::GetInfoAsValue() const {
  base::Value::Dict info;
  info.Set("origin_stream_key", origin_stream_key_.ToValue());
  if (alternative_) {
    info.Set("alternative_stream_key", alternative_->stream_key.ToValue());
  }

  const base::TimeDelta age = base::TimeTicks::Now() - created_time_;
  info.Set("elapsed_ms", static_cast<int>(age.InMilliseconds()));
  return info;
}
// Convenience accessor for the QUIC session pool of the network session that
// owns `pool_`.
QuicSessionPool* HttpStreamPool::JobController::quic_session_pool() {
  return pool_->http_network_session()->quic_session_pool();
}
// Convenience accessor for the SPDY session pool of the network session that
// owns `pool_`.
SpdySessionPool* HttpStreamPool::JobController::spdy_session_pool() {
  return pool_->http_network_session()->spdy_session_pool();
}
std::unique_ptr<HttpStream>
HttpStreamPool::JobController::MaybeCreateStreamFromExistingQuicSession() {
std::unique_ptr<HttpStream> stream =
MaybeCreateStreamFromExistingQuicSessionInternal(origin_quic_key_);
if (stream) {
return stream;
}
if (alternative_.has_value()) {
stream = MaybeCreateStreamFromExistingQuicSessionInternal(
alternative_->quic_key);
}
return stream;
}
// Tries to create a QuicHttpStream on an existing QUIC session matching `key`.
//
// Returns nullptr when `key` has no valid destination, when the pool reports
// that QUIC cannot be used for the destination, or when the QUIC session pool
// has no existing session for `key`.
std::unique_ptr<HttpStream>
HttpStreamPool::JobController::MaybeCreateStreamFromExistingQuicSessionInternal(
    const QuicSessionAliasKey& key) {
  if (!key.destination().IsValid() ||
      !pool_->CanUseQuic(
          key.destination(), key.session_key().network_anonymization_key(),
          enable_ip_based_pooling_, enable_alternative_services_)) {
    return nullptr;
  }

  QuicChromiumClientSession* quic_session =
      quic_session_pool()->FindExistingSession(key.session_key(),
                                               key.destination());
  if (!quic_session) {
    return nullptr;
  }

  return std::make_unique<QuicHttpStream>(
      quic_session->CreateHandle(key.destination()),
      quic_session->GetDnsAliasesForSessionKey(key.session_key()));
}
// Asks the pool whether an existing QUIC session can be used for the origin's
// QUIC key under this controller's pooling and alternative-service settings.
bool HttpStreamPool::JobController::CanUseExistingQuicSession() {
  return pool_->CanUseExistingQuicSession(
      origin_quic_key_, enable_ip_based_pooling_, enable_alternative_services_);
}
// Posted-task target: marks the stream request complete with
// `negotiated_protocol` and then hands `stream` to the delegate. Both the
// request and the delegate must still be attached (CHECKed).
void HttpStreamPool::JobController::CallRequestCompleteAndStreamReady(
    std::unique_ptr<HttpStream> stream,
    NextProto negotiated_protocol) {
  CHECK(stream_request_);
  CHECK(delegate_);

  stream_request_->Complete(negotiated_protocol,
                            ALTERNATE_PROTOCOL_USAGE_UNSPECIFIED_REASON);
  delegate_->OnStreamReady(proxy_info_, std::move(stream));
}
// Posted-task target: forwards a stream failure, together with this
// controller's proxy info, to the delegate.
void HttpStreamPool::JobController::CallOnStreamFailed(
    int status,
    const NetErrorDetails& net_error_details,
    ResolveErrorInfo resolve_error_info) {
  delegate_->OnStreamFailed(status, net_error_details, proxy_info_,
                            std::move(resolve_error_info));
}
// Posted-task target: forwards a certificate error to the delegate.
void HttpStreamPool::JobController::CallOnCertificateError(
    int status,
    const SSLInfo& ssl_info) {
  delegate_->OnCertificateError(status, ssl_info);
}
// Posted-task target: forwards a client certificate request to the delegate.
void HttpStreamPool::JobController::CallOnNeedsClientAuth(
    SSLCertRequestInfo* cert_info) {
  delegate_->OnNeedsClientAuth(cert_info);
}
// Posted-task target for preconnect completion. Verifies that `job` is the
// origin job, that no alternative job exists and that a callback is pending
// (all CHECKed), then destroys the job and runs the callback with `status`.
void HttpStreamPool::JobController::ResetJobAndInvokePreconnectCallback(
    Job* job,
    int status) {
  CHECK(!alternative_job_);
  CHECK_EQ(origin_job_.get(), job);
  CHECK(preconnect_callback_);

  origin_job_.reset();
  std::move(preconnect_callback_).Run(status);
}
// Stores `status` as the result of `job`, which must be either the origin job
// or the alternative job.
void HttpStreamPool::JobController::SetJobResult(Job* job, int status) {
  if (job == origin_job_.get()) {
    origin_job_result_ = status;
    return;
  }
  if (job == alternative_job_.get()) {
    alternative_job_result_ = status;
    return;
  }
  NOTREACHED();
}
// Destroys the job that is not `job`. `job` must be either the origin job or
// the alternative job.
void HttpStreamPool::JobController::CancelOtherJob(Job* job) {
  if (job == origin_job_.get()) {
    alternative_job_.reset();
    return;
  }
  if (job == alternative_job_.get()) {
    origin_job_.reset();
    return;
  }
  NOTREACHED();
}
// Returns true once a result has been recorded for both the origin job and the
// alternative job.
bool HttpStreamPool::JobController::AllJobsFinished() {
  if (!origin_job_result_.has_value()) {
    return false;
  }
  return alternative_job_result_.has_value();
}
void HttpStreamPool::JobController::MaybeMarkAlternativeServiceBroken() {
// If alternative job succeeds or not completed, no brokenness to report.
if (!alternative_job_result_.has_value() || *alternative_job_result_ == OK) {
return;
}
// No brokenness to report if the origin job fails.
if (origin_job_result_.has_value() && *origin_job_result_ != OK) {
return;
}
CHECK(alternative_.has_value());
pool_->http_network_session()
->http_server_properties()
->MarkAlternativeServiceBroken(
alternative_service_info_.alternative_service(),
alternative_->stream_key.network_anonymization_key());
}
} // namespace net