blob: 2d2473ac792210016c04be345ad3ba905b7f895f [file] [log] [blame]
// Copyright 2024 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "net/http/http_stream_pool_job.h"
#include <memory>
#include <utility>
#include "base/containers/contains.h"
#include "base/functional/bind.h"
#include "base/notreached.h"
#include "base/task/sequenced_task_runner.h"
#include "base/time/time.h"
#include "base/timer/timer.h"
#include "net/base/completion_once_callback.h"
#include "net/base/host_port_pair.h"
#include "net/base/load_states.h"
#include "net/base/load_timing_info.h"
#include "net/base/net_errors.h"
#include "net/base/request_priority.h"
#include "net/dns/host_resolver.h"
#include "net/http/http_network_session.h"
#include "net/http/http_stream_key.h"
#include "net/http/http_stream_pool_group.h"
#include "net/http/http_stream_pool_handle.h"
#include "net/log/net_log_with_source.h"
#include "net/socket/stream_attempt.h"
#include "net/socket/tcp_stream_attempt.h"
#include "net/socket/tls_stream_attempt.h"
#include "net/spdy/spdy_http_stream.h"
#include "net/spdy/spdy_session.h"
#include "net/ssl/ssl_cert_request_info.h"
namespace net {
HttpStreamPool::Job::RequestEntry::RequestEntry(Job* job) : job_(job) {
CHECK(job_);
}
std::unique_ptr<HttpStreamRequest>
HttpStreamPool::Job::RequestEntry::CreateRequest(
HttpStreamRequest::Delegate* delegate,
const NetLogWithSource& net_log) {
CHECK(!delegate_);
CHECK(delegate);
delegate_ = delegate;
auto request = std::make_unique<HttpStreamRequest>(
this, /*websocket_handshake_stream_create_helper=*/nullptr, net_log,
HttpStreamRequest::HTTP_STREAM);
request_ = request.get();
return request;
}
LoadState HttpStreamPool::Job::RequestEntry::GetLoadState() const {
CHECK(request_);
if (request_->completed()) {
return LOAD_STATE_IDLE;
}
return job_->GetLoadState();
}
void HttpStreamPool::Job::RequestEntry::OnRequestComplete() {
CHECK(request_);
CHECK(delegate_);
request_ = nullptr;
delegate_ = nullptr;
job_->OnRequestComplete(this);
// `this` is deleted.
}
int HttpStreamPool::Job::RequestEntry::RestartTunnelWithProxyAuth() {
NOTREACHED_NORETURN();
}
void HttpStreamPool::Job::RequestEntry::SetPriority(RequestPriority priority) {
CHECK(request_);
job_->SetRequestPriority(request_, priority);
}
HttpStreamPool::Job::RequestEntry::~RequestEntry() = default;
// Represents an in-flight stream attempt.
struct HttpStreamPool::Job::InFlightAttempt {
explicit InFlightAttempt(std::unique_ptr<StreamAttempt> attempt)
: attempt(std::move(attempt)) {}
InFlightAttempt(const InFlightAttempt&) = delete;
InFlightAttempt& operator=(const InFlightAttempt&) = delete;
~InFlightAttempt() = default;
std::unique_ptr<StreamAttempt> attempt;
// Timer to start a next attempt. When fired, `this` is treated as a slow
// attempt but `this` is not timed out yet.
base::OneShotTimer slow_timer;
bool is_slow = false;
};
// Represents a preconnect request.
struct HttpStreamPool::Job::PreconnectEntry {
PreconnectEntry(size_t num_streams, CompletionOnceCallback callback)
: num_streams(num_streams), callback(std::move(callback)) {}
PreconnectEntry(const PreconnectEntry&) = delete;
PreconnectEntry& operator=(const PreconnectEntry&) = delete;
~PreconnectEntry() = default;
size_t num_streams;
CompletionOnceCallback callback;
// Set to the latest error when errors happened.
int result = OK;
};
HttpStreamPool::Job::Job(Group* group, NetLog* net_log)
: group_(group),
net_log_(NetLogWithSource::Make(net_log,
NetLogSourceType::HTTP_STREAM_POOL_JOB)),
requests_(NUM_PRIORITIES) {
net_log_.BeginEventReferencingSource(
NetLogEventType::HTTP_STREAM_POOL_JOB_ALIVE, group_->net_log().source());
group_->net_log().AddEventReferencingSource(
NetLogEventType::HTTP_STREAM_POOL_GROUP_JOB_CREATED, net_log_.source());
proxy_info_.UseDirect();
CHECK(group_);
}
HttpStreamPool::Job::~Job() {
net_log().EndEvent(NetLogEventType::HTTP_STREAM_POOL_JOB_ALIVE);
group_->net_log().AddEventReferencingSource(
NetLogEventType::HTTP_STREAM_POOL_GROUP_JOB_DESTROYED, net_log_.source());
}
std::unique_ptr<HttpStreamRequest> HttpStreamPool::Job::RequestStream(
HttpStreamRequest::Delegate* delegate,
RequestPriority priority,
const std::vector<SSLConfig::CertAndStatus>& allowed_bad_certs,
bool enable_ip_based_pooling,
const NetLogWithSource& net_log) {
auto entry = std::make_unique<RequestEntry>(this);
std::unique_ptr<HttpStreamRequest> request =
entry->CreateRequest(delegate, net_log);
requests_.Insert(std::move(entry), priority);
if (is_failing_) {
// `this` is failing, notify the failure.
base::SequencedTaskRunner::GetCurrentDefault()->PostTask(
FROM_HERE, base::BindOnce(&Job::NotifyStreamRequestOfFailure,
weak_ptr_factory_.GetWeakPtr()));
return request;
}
if (!enable_ip_based_pooling) {
enable_ip_based_pooling_ = enable_ip_based_pooling;
}
MaybeChangeServiceEndpointRequestPriority();
// Check if we already have SPDY session. When found, notify the request that
// an HttpStream is ready. Use PostTask() since `delegate` doesn't expect the
// request finishes synchronously.
if (!spdy_session_) {
spdy_session_ =
http_network_session()->spdy_session_pool()->FindAvailableSession(
spdy_session_key(), enable_ip_based_pooling_,
/*is_websocket=*/false, net_log);
}
if (spdy_session_) {
base::SequencedTaskRunner::GetCurrentDefault()->PostTask(
FROM_HERE, base::BindOnce(&Job::CreateSpdyStreamAndNotify,
weak_ptr_factory_.GetWeakPtr()));
return request;
}
// Check idle streams. If found, notify the request that an HttpStream is
// ready. Use PostTask() since `delegate` doesn't expect the request finishes
// synchronously.
std::unique_ptr<StreamSocket> stream_socket = group_->GetIdleStreamSocket();
if (stream_socket) {
base::SequencedTaskRunner::GetCurrentDefault()->PostTask(
FROM_HERE,
base::BindOnce(&Job::CreateTextBasedStreamAndNotify,
weak_ptr_factory_.GetWeakPtr(), std::move(stream_socket),
LoadTimingInfo::ConnectTiming()));
return request;
}
allowed_bad_certs_ = allowed_bad_certs;
StartInternal(priority);
return request;
}
int HttpStreamPool::Job::Preconnect(size_t num_streams,
CompletionOnceCallback callback) {
if (is_failing_) {
return error_to_notify_;
}
if (spdy_session_pool()->HasAvailableSession(spdy_session_key(),
/*is_websocket=*/false)) {
CHECK(!RequiresHTTP11());
return OK;
}
if (group_->ActiveStreamSocketCount() >= num_streams) {
return OK;
}
auto entry =
std::make_unique<PreconnectEntry>(num_streams, std::move(callback));
preconnects_.emplace(std::move(entry));
StartInternal(RequestPriority::IDLE);
return ERR_IO_PENDING;
}
void HttpStreamPool::Job::OnServiceEndpointsUpdated() {
ProcessServiceEndpointChanges();
}
void HttpStreamPool::Job::OnServiceEndpointRequestFinished(int rv) {
CHECK(!service_endpoint_request_finished_);
CHECK(service_endpoint_request_);
service_endpoint_request_finished_ = true;
dns_resolution_end_time_ = base::TimeTicks::Now();
resolve_error_info_ = service_endpoint_request_->GetResolveErrorInfo();
if (rv != OK) {
error_to_notify_ = rv;
NotifyFailure();
return;
}
CHECK(!service_endpoint_request_->GetEndpointResults().empty());
ProcessServiceEndpointChanges();
}
int HttpStreamPool::Job::WaitForSSLConfigReady(
CompletionOnceCallback callback) {
if (ssl_config_.has_value()) {
return OK;
}
ssl_config_waiting_callbacks_.emplace_back(std::move(callback));
return ERR_IO_PENDING;
}
SSLConfig HttpStreamPool::Job::GetSSLConfig() {
CHECK(ssl_config_.has_value());
return *ssl_config_;
}
void HttpStreamPool::Job::ProcessPendingRequest() {
if (PendingRequestCount() == 0) {
return;
}
std::unique_ptr<StreamSocket> stream_socket = group_->GetIdleStreamSocket();
if (stream_socket) {
CreateTextBasedStreamAndNotify(std::move(stream_socket),
LoadTimingInfo::ConnectTiming());
return;
}
MaybeAttemptConnection(/*max_attempts=*/1);
}
void HttpStreamPool::Job::CancelInFlightAttempts() {
pool()->DecrementTotalConnectingStreamCount(in_flight_attempts_.size());
in_flight_attempts_.clear();
slow_attempt_count_ = 0;
}
void HttpStreamPool::Job::CancelRequests(int error) {
error_to_notify_ = error;
is_canceling_requests_ = true;
NotifyFailure();
}
size_t HttpStreamPool::Job::PendingRequestCount() const {
return PendingCountInternal(requests_.size());
}
size_t HttpStreamPool::Job::PendingPreconnectCount() const {
size_t num_streams = 0;
for (const auto& entry : preconnects_) {
num_streams = std::max(num_streams, entry->num_streams);
}
return PendingCountInternal(num_streams);
}
const HttpStreamKey& HttpStreamPool::Job::stream_key() const {
return group_->stream_key();
}
const SpdySessionKey& HttpStreamPool::Job::spdy_session_key() const {
return group_->spdy_session_key();
}
HttpNetworkSession* HttpStreamPool::Job::http_network_session() {
return group_->http_network_session();
}
SpdySessionPool* HttpStreamPool::Job::spdy_session_pool() {
return http_network_session()->spdy_session_pool();
}
HttpStreamPool* HttpStreamPool::Job::pool() {
return group_->pool();
}
const HttpStreamPool* HttpStreamPool::Job::pool() const {
return group_->pool();
}
const NetLogWithSource& HttpStreamPool::Job::net_log() {
return net_log_;
}
bool HttpStreamPool::Job::UsingTls() const {
return GURL::SchemeIsCryptographic(stream_key().destination().scheme());
}
bool HttpStreamPool::Job::RequiresHTTP11() {
return http_network_session()->http_server_properties()->RequiresHTTP11(
stream_key().destination(), stream_key().network_anonymization_key());
}
LoadState HttpStreamPool::Job::GetLoadState() const {
if (group_->ReachedMaxStreamLimit()) {
return LOAD_STATE_WAITING_FOR_AVAILABLE_SOCKET;
}
if (pool()->ReachedMaxStreamLimit()) {
return LOAD_STATE_WAITING_FOR_STALLED_SOCKET_POOL;
}
LoadState load_state = LOAD_STATE_IDLE;
// When there are in-flight attempts, use most advanced one.
for (const auto& in_flight_attempt : in_flight_attempts_) {
load_state =
std::max(load_state, in_flight_attempt->attempt->GetLoadState());
// There should not be a load state later than LOAD_STATE_SSL_HANDSHAKE.
if (load_state == LOAD_STATE_SSL_HANDSHAKE) {
break;
}
}
if (load_state != LOAD_STATE_IDLE) {
return load_state;
}
if (service_endpoint_request_ && !service_endpoint_request_finished_) {
return LOAD_STATE_RESOLVING_HOST;
}
return LOAD_STATE_IDLE;
}
RequestPriority HttpStreamPool::Job::GetPriority() const {
CHECK(!requests_.empty());
return static_cast<RequestPriority>(requests_.FirstMax().priority());
}
bool HttpStreamPool::Job::IsStalledByPoolLimit() {
if (!GetIPEndPointToAttempt().has_value()) {
return false;
}
switch (CanAttemptConnection()) {
case CanAttemptResult::kAttempt:
case CanAttemptResult::kReachedPoolLimit:
return true;
case CanAttemptResult::kNoPendingRequest:
case CanAttemptResult::kThrottledForSpdy:
case CanAttemptResult::kReachedGroupLimit:
return false;
}
}
void HttpStreamPool::Job::StartInternal(RequestPriority priority) {
if (service_endpoint_request_ || service_endpoint_request_finished_) {
MaybeAttemptConnection();
} else {
ResolveServiceEndpoint(priority);
}
}
void HttpStreamPool::Job::ResolveServiceEndpoint(
RequestPriority initial_priority) {
CHECK(!service_endpoint_request_);
HostResolver::ResolveHostParameters parameters;
parameters.initial_priority = initial_priority;
parameters.secure_dns_policy = stream_key().secure_dns_policy();
service_endpoint_request_ =
http_network_session()->host_resolver()->CreateServiceEndpointRequest(
HostResolver::Host(stream_key().destination()),
stream_key().network_anonymization_key(), net_log(),
std::move(parameters));
dns_resolution_start_time_ = base::TimeTicks::Now();
int rv = service_endpoint_request_->Start(this);
if (rv != ERR_IO_PENDING) {
OnServiceEndpointRequestFinished(rv);
}
}
void HttpStreamPool::Job::MaybeChangeServiceEndpointRequestPriority() {
if (service_endpoint_request_ && !service_endpoint_request_finished_) {
service_endpoint_request_->ChangeRequestPriority(GetPriority());
}
}
void HttpStreamPool::Job::ProcessServiceEndpointChanges() {
if (CanUseExistingSessionAfterEndpointChanges()) {
return;
}
MaybeCalculateSSLConfig();
MaybeAttemptConnection();
}
bool HttpStreamPool::Job::CanUseExistingSessionAfterEndpointChanges() {
CHECK(service_endpoint_request_);
if (spdy_session_) {
return true;
}
if (!enable_ip_based_pooling_) {
return false;
}
for (const auto& endpoint : service_endpoint_request_->GetEndpointResults()) {
spdy_session_ =
spdy_session_pool()->FindMatchingIpSessionForServiceEndpoint(
spdy_session_key(), endpoint,
service_endpoint_request_->GetDnsAliasResults());
if (spdy_session_) {
group_->Refresh();
// Use PostTask() because we could reach here from RequestStream()
// synchronously when the DNS resolution finishes immediately.
base::SequencedTaskRunner::GetCurrentDefault()->PostTask(
FROM_HERE, base::BindOnce(&Job::CreateSpdyStreamAndNotify,
weak_ptr_factory_.GetWeakPtr()));
return true;
}
}
return false;
}
void HttpStreamPool::Job::MaybeCalculateSSLConfig() {
if (!UsingTls() || ssl_config_.has_value()) {
return;
}
CHECK(service_endpoint_request_);
if (!service_endpoint_request_->EndpointsCryptoReady()) {
return;
}
SSLConfig ssl_config;
ssl_config.allowed_bad_certs = allowed_bad_certs_;
ssl_config.privacy_mode = stream_key().privacy_mode();
ssl_config.disable_cert_verification_network_fetches =
stream_key().disable_cert_network_fetches();
ssl_config.early_data_enabled =
http_network_session()->params().enable_early_data;
ssl_config.alpn_protos = http_network_session()->GetAlpnProtos();
ssl_config.application_settings =
http_network_session()->GetApplicationSettings();
http_network_session()->http_server_properties()->MaybeForceHTTP11(
stream_key().destination(), stream_key().network_anonymization_key(),
&ssl_config);
ssl_config.ignore_certificate_errors =
http_network_session()->params().ignore_certificate_errors;
ssl_config.network_anonymization_key =
stream_key().network_anonymization_key();
// TODO(crbug.com/346835898): Support ECH.
ssl_config_.emplace(std::move(ssl_config));
// Restart slow timer for in-flight attempts that have already completed
// TCP handshakes.
for (auto& in_flight_attempt : in_flight_attempts_) {
if (!in_flight_attempt->is_slow &&
!in_flight_attempt->slow_timer.IsRunning()) {
// TODO(crbug.com/346835898): Should we use a different delay other than
// the connection attempt delay?
// base::Unretained() is safe here because `this` owns the
// `in_flight_attempt` and `slow_timer`.
in_flight_attempt->slow_timer.Start(
FROM_HERE, kConnectionAttemptDelay,
base::BindOnce(&Job::OnInFlightAttemptSlow, base::Unretained(this),
in_flight_attempt.get()));
}
}
for (auto& callback : ssl_config_waiting_callbacks_) {
std::move(callback).Run(OK);
}
ssl_config_waiting_callbacks_.clear();
}
void HttpStreamPool::Job::MaybeAttemptConnection(
std::optional<size_t> max_attempts) {
if (PendingRequestCount() == 0 && preconnects_.empty()) {
// There are no requests waiting for streams.
return;
}
CHECK(!preconnects_.empty() || group_->IdleStreamSocketCount() == 0);
// TODO(crbug.com/346835898): Ensure that we don't attempt connections when
// failing or creating HttpStream on top of a SPDY session.
CHECK(!is_failing_);
CHECK(!spdy_session_);
std::optional<IPEndPoint> ip_endpoint = GetIPEndPointToAttempt();
if (!ip_endpoint.has_value()) {
if (service_endpoint_request_finished_ && in_flight_attempts_.empty()) {
// Tried all endpoints.
NotifyFailure();
}
return;
}
// There might be multiple pending requests. Make attempts as much as needed
// and allowed.
size_t num_attempts = 0;
const bool using_tls = UsingTls();
while (IsConnectionAttemptReady()) {
std::unique_ptr<StreamAttempt> attempt;
if (using_tls) {
attempt = std::make_unique<TlsStreamAttempt>(
pool()->stream_attempt_params(), *ip_endpoint,
HostPortPair::FromSchemeHostPort(stream_key().destination()),
/*ssl_config_provider=*/this);
} else {
attempt = std::make_unique<TcpStreamAttempt>(
pool()->stream_attempt_params(), *ip_endpoint);
}
net_log().AddEventReferencingSource(
NetLogEventType::HTTP_STREAM_POOL_JOB_ATTEMPT_START,
attempt->net_log().source());
net_log().AddEvent(
NetLogEventType::HTTP_STREAM_POOL_JOB_ATTEMPT_START, [&] {
base::Value::Dict dict;
dict.Set("num_requests", static_cast<int>(requests_.size()));
dict.Set("num_preconnects", static_cast<int>(preconnects_.size()));
dict.Set("num_inflight_attempts",
static_cast<int>(in_flight_attempts_.size()));
dict.Set("num_slow_attempts", static_cast<int>(slow_attempt_count_));
attempt->net_log().source().AddToEventParameters(dict);
return dict;
});
auto in_flight_attempt =
std::make_unique<InFlightAttempt>(std::move(attempt));
InFlightAttempt* raw_attempt = in_flight_attempt.get();
in_flight_attempts_.emplace(std::move(in_flight_attempt));
pool()->IncrementTotalConnectingStreamCount();
int rv = raw_attempt->attempt->Start(base::BindOnce(
&Job::OnInFlightAttemptComplete, base::Unretained(this), raw_attempt));
// Add NetLog dependency after Start() so that the first event of the
// attempt can have meaningful description in the NetLog viewer.
raw_attempt->attempt->net_log().AddEventReferencingSource(
NetLogEventType::STREAM_ATTEMPT_BOUND_TO_POOL, net_log().source());
if (rv != ERR_IO_PENDING) {
base::SequencedTaskRunner::GetCurrentDefault()->PostTask(
FROM_HERE, base::BindOnce(&Job::OnInFlightAttemptComplete,
base::Unretained(this), raw_attempt, rv));
} else {
raw_attempt->slow_timer.Start(
FROM_HERE, kConnectionAttemptDelay,
base::BindOnce(&Job::OnInFlightAttemptSlow, base::Unretained(this),
raw_attempt));
if (using_tls) {
static_cast<TlsStreamAttempt*>(raw_attempt->attempt.get())
->SetTcpHandshakeCompletionCallback(
base::BindOnce(&Job::OnInFlightAttemptTcpHandshakeComplete,
base::Unretained(this), raw_attempt));
}
}
++num_attempts;
if (max_attempts.has_value() && num_attempts >= *max_attempts) {
break;
}
}
}
bool HttpStreamPool::Job::IsConnectionAttemptReady() {
switch (CanAttemptConnection()) {
case CanAttemptResult::kAttempt:
return true;
case CanAttemptResult::kNoPendingRequest:
return false;
case CanAttemptResult::kThrottledForSpdy:
// TODO(crbug.com/346835898): Consider throttling less aggressively (e.g.
// allow TCP handshake but throttle TLS handshake) so that endpoints we've
// used HTTP/2 on aren't penalised on slow or lossy connections.
if (!spdy_throttle_timer_.IsRunning()) {
spdy_throttle_timer_.Start(
FROM_HERE, kSpdyThrottleDelay,
base::BindOnce(&Job::OnSpdyThrottleDelayPassed,
base::Unretained(this)));
}
return false;
case CanAttemptResult::kReachedGroupLimit:
// TODO(crbug.com/346835898): Better to handle cases where we partially
// attempted some connections.
NotifyPreconnectsComplete(ERR_PRECONNECT_MAX_SOCKET_LIMIT);
return false;
case CanAttemptResult::kReachedPoolLimit:
// If we can't attempt connection due to the pool's limit, try to close an
// idle stream in the pool.
if (!pool()->CloseOneIdleStreamSocket()) {
// TODO(crbug.com/346835898): Better to handle cases where we partially
// attempted some connections.
NotifyPreconnectsComplete(ERR_PRECONNECT_MAX_SOCKET_LIMIT);
return false;
}
return true;
}
}
HttpStreamPool::Job::CanAttemptResult
HttpStreamPool::Job::CanAttemptConnection() {
size_t pending_count =
std::max(PendingRequestCount(), PendingPreconnectCount());
if (pending_count == 0) {
return CanAttemptResult::kNoPendingRequest;
}
if (ShouldThrottleAttemptForSpdy()) {
return CanAttemptResult::kThrottledForSpdy;
}
if (group_->ReachedMaxStreamLimit()) {
return CanAttemptResult::kReachedGroupLimit;
}
if (pool()->ReachedMaxStreamLimit()) {
return CanAttemptResult::kReachedPoolLimit;
}
return CanAttemptResult::kAttempt;
}
bool HttpStreamPool::Job::ShouldThrottleAttemptForSpdy() {
if (!http_network_session()->http_server_properties()->GetSupportsSpdy(
stream_key().destination(),
stream_key().network_anonymization_key())) {
return false;
}
CHECK(UsingTls());
// The first attempt should not be blocked.
if (in_flight_attempts_.empty()) {
return false;
}
if (spdy_throttle_delay_passed_) {
return false;
}
CHECK(!spdy_session_);
return true;
}
size_t HttpStreamPool::Job::PendingCountInternal(size_t pending_count) const {
CHECK_GE(in_flight_attempts_.size(), slow_attempt_count_);
// When SPDY throttle delay passed, treat all in-flight attempts as non-slow,
// to avoid attempting connections more than requested.
// TODO(crbug.com/346835898): This behavior is tricky. Figure out a better
// way to handle this situation.
size_t slow_count = spdy_throttle_delay_passed_ ? 0 : slow_attempt_count_;
size_t non_slow_count = in_flight_attempts_.size() - slow_count;
// The number of in-flight, non-slow attempts could be larger than the number
// of requests (e.g. a request was cancelled in the middle of an attempt).
if (pending_count <= non_slow_count) {
return 0;
}
return pending_count - non_slow_count;
}
std::optional<IPEndPoint> HttpStreamPool::Job::GetIPEndPointToAttempt() {
if (!service_endpoint_request_ ||
service_endpoint_request_->GetEndpointResults().empty()) {
return std::nullopt;
}
// Look for an IPEndPoint from the preferred address family first.
for (auto& endpoint : service_endpoint_request_->GetEndpointResults()) {
std::optional<IPEndPoint> ip_endpoint =
prefer_ipv6_ ? FindPreferredIPEndpoint(endpoint.ipv6_endpoints)
: FindPreferredIPEndpoint(endpoint.ipv4_endpoints);
if (ip_endpoint.has_value()) {
return ip_endpoint;
}
}
// If there is no IPEndPoint from the preferred address family, check the
// another address family.
for (auto& endpoint : service_endpoint_request_->GetEndpointResults()) {
std::optional<IPEndPoint> ip_endpoint =
prefer_ipv6_ ? FindPreferredIPEndpoint(endpoint.ipv4_endpoints)
: FindPreferredIPEndpoint(endpoint.ipv6_endpoints);
if (ip_endpoint.has_value()) {
return ip_endpoint;
}
}
return std::nullopt;
}
std::optional<IPEndPoint> HttpStreamPool::Job::FindPreferredIPEndpoint(
const std::vector<IPEndPoint>& ip_endpoints) {
// Prefer the first unattempted endpoint in `ip_endpoints`. Allow to use
// the first slow endpoint when SPDY throttle delay passed.
std::optional<IPEndPoint> slow_endpoint;
for (const auto& ip_endpoint : ip_endpoints) {
if (base::Contains(failed_ip_endpoints_, ip_endpoint)) {
continue;
}
if (base::Contains(slow_ip_endpoints_, ip_endpoint)) {
if (!slow_endpoint.has_value()) {
slow_endpoint = ip_endpoint;
}
continue;
}
return ip_endpoint;
}
if (spdy_throttle_delay_passed_) {
return slow_endpoint;
}
return std::nullopt;
}
HttpStreamPool::Job::FailureKind HttpStreamPool::Job::DetermineFailureKind() {
if (is_canceling_requests_) {
return FailureKind::kStreamFailed;
}
if (IsCertificateError(error_to_notify_)) {
return FailureKind::kCertifcateError;
}
if (error_to_notify_ == ERR_SSL_CLIENT_AUTH_CERT_NEEDED) {
return FailureKind::kNeedsClientAuth;
}
return FailureKind::kStreamFailed;
}
void HttpStreamPool::Job::NotifyFailure() {
is_failing_ = true;
NotifyPreconnectsComplete(error_to_notify_);
NotifyStreamRequestOfFailure();
// `this` may be deleted.
}
void HttpStreamPool::Job::NotifyStreamRequestOfFailure() {
CHECK(is_failing_);
RequestEntry* entry = ExtractFirstRequestToNotify();
if (!entry) {
// TODO(crbug.com/346835898): Ensure that MaybeComplete() is called
// eventually.
return;
}
base::SequencedTaskRunner::GetCurrentDefault()->PostTask(
FROM_HERE, base::BindOnce(&Job::NotifyStreamRequestOfFailure,
weak_ptr_factory_.GetWeakPtr()));
FailureKind kind = DetermineFailureKind();
switch (kind) {
case FailureKind::kStreamFailed:
entry->delegate()->OnStreamFailed(error_to_notify_, net_error_details_,
proxy_info_, resolve_error_info_);
break;
case FailureKind::kCertifcateError:
CHECK(cert_error_ssl_info_.has_value());
entry->delegate()->OnCertificateError(error_to_notify_,
*cert_error_ssl_info_);
break;
case FailureKind::kNeedsClientAuth:
CHECK(client_auth_cert_info_.get());
entry->delegate()->OnNeedsClientAuth(client_auth_cert_info_.get());
break;
}
// `this` may be deleted.
}
void HttpStreamPool::Job::NotifyPreconnectsComplete(int rv) {
while (!preconnects_.empty()) {
std::unique_ptr<PreconnectEntry> entry =
std::move(preconnects_.extract(preconnects_.begin()).value());
base::SequencedTaskRunner::GetCurrentDefault()->PostTask(
FROM_HERE, base::BindOnce(std::move(entry->callback), rv));
}
if (preconnects_.empty()) {
base::SequencedTaskRunner::GetCurrentDefault()->PostTask(
FROM_HERE,
base::BindOnce(&Job::MaybeComplete, weak_ptr_factory_.GetWeakPtr()));
}
}
void HttpStreamPool::Job::ProcessPreconnectsAfterAttemptComplete(int rv) {
std::vector<PreconnectEntry*> completed;
for (auto& entry : preconnects_) {
CHECK_GT(entry->num_streams, 0u);
--entry->num_streams;
if (rv != OK) {
entry->result = rv;
}
if (entry->num_streams == 0) {
completed.emplace_back(entry.get());
}
}
for (auto* entry_ptr : completed) {
auto it = preconnects_.find(entry_ptr);
CHECK(it != preconnects_.end());
std::unique_ptr<PreconnectEntry> entry =
std::move(preconnects_.extract(it).value());
base::SequencedTaskRunner::GetCurrentDefault()->PostTask(
FROM_HERE, base::BindOnce(std::move(entry->callback), entry->result));
}
if (preconnects_.empty()) {
base::SequencedTaskRunner::GetCurrentDefault()->PostTask(
FROM_HERE,
base::BindOnce(&Job::MaybeComplete, weak_ptr_factory_.GetWeakPtr()));
}
}
void HttpStreamPool::Job::CreateTextBasedStreamAndNotify(
std::unique_ptr<StreamSocket> stream_socket,
LoadTimingInfo::ConnectTiming connect_timing) {
NextProto negotiated_protocol = stream_socket->GetNegotiatedProtocol();
CHECK_NE(negotiated_protocol, NextProto::kProtoHTTP2);
std::unique_ptr<HttpStream> http_stream = group_->CreateTextBasedStream(
std::move(stream_socket), std::move(connect_timing));
NotifyStreamReady(std::move(http_stream), negotiated_protocol);
// `this` may be deleted.
}
void HttpStreamPool::Job::CreateSpdyStreamAndNotify() {
CHECK(spdy_session_);
CHECK(!is_canceling_requests_);
CHECK(!is_failing_);
// If there are more than one remaining request, post a task to create
// HttpStreams for these requests.
if (requests_.size() > 1) {
base::SequencedTaskRunner::GetCurrentDefault()->PostTask(
FROM_HERE, base::BindOnce(&Job::CreateSpdyStreamAndNotify,
weak_ptr_factory_.GetWeakPtr()));
}
std::set<std::string> dns_aliases =
http_network_session()->spdy_session_pool()->GetDnsAliasesForSessionKey(
spdy_session_key());
auto http_stream = std::make_unique<SpdyHttpStream>(
spdy_session_, net_log().source(), std::move(dns_aliases));
NotifyStreamReady(std::move(http_stream), NextProto::kProtoHTTP2);
// `this` may be deleted.
}
void HttpStreamPool::Job::NotifyStreamReady(std::unique_ptr<HttpStream> stream,
NextProto negotiated_protocol) {
RequestEntry* entry = ExtractFirstRequestToNotify();
if (!entry) {
// The ownership of the stream will be moved to the group as `stream` is
// going to be destructed.
return;
}
entry->request()->Complete(negotiated_protocol,
ALTERNATE_PROTOCOL_USAGE_UNSPECIFIED_REASON);
entry->delegate()->OnStreamReady(proxy_info_, std::move(stream));
}
HttpStreamPool::Job::RequestEntry*
HttpStreamPool::Job::ExtractFirstRequestToNotify() {
if (requests_.empty()) {
return nullptr;
}
std::unique_ptr<RequestEntry> entry = requests_.Erase(requests_.FirstMax());
RequestEntry* raw_entry = entry.get();
notified_requests_.emplace(std::move(entry));
return raw_entry;
}
void HttpStreamPool::Job::SetRequestPriority(HttpStreamRequest* request,
RequestPriority priority) {
for (RequestQueue::Pointer pointer = requests_.FirstMax(); !pointer.is_null();
pointer = requests_.GetNextTowardsLastMin(pointer)) {
if (pointer.value()->request() == request) {
if (pointer.priority() == priority) {
break;
}
std::unique_ptr<RequestEntry> entry = requests_.Erase(pointer);
requests_.Insert(std::move(entry), priority);
break;
}
}
MaybeChangeServiceEndpointRequestPriority();
}
void HttpStreamPool::Job::OnRequestComplete(RequestEntry* entry) {
auto notified_it = notified_requests_.find(entry);
if (notified_it != notified_requests_.end()) {
notified_requests_.erase(notified_it);
} else {
for (RequestQueue::Pointer pointer = requests_.FirstMax();
!pointer.is_null();
pointer = requests_.GetNextTowardsLastMin(pointer)) {
if (pointer.value()->request() == entry->request()) {
requests_.Erase(pointer);
break;
}
}
}
MaybeComplete();
}
void HttpStreamPool::Job::OnInFlightAttemptComplete(
InFlightAttempt* raw_attempt,
int rv) {
net_log().AddEventReferencingSource(
NetLogEventType::HTTP_STREAM_POOL_JOB_ATTEMPT_END,
raw_attempt->attempt->net_log().source());
raw_attempt->slow_timer.Stop();
if (raw_attempt->is_slow) {
CHECK_GT(slow_attempt_count_, 0u);
--slow_attempt_count_;
}
auto it = in_flight_attempts_.find(raw_attempt);
CHECK(it != in_flight_attempts_.end());
std::unique_ptr<InFlightAttempt> in_flight_attempt =
std::move(in_flight_attempts_.extract(it).value());
pool()->DecrementTotalConnectingStreamCount();
if (rv != OK) {
HandleAttemptFailure(std::move(in_flight_attempt), rv);
return;
}
LoadTimingInfo::ConnectTiming connect_timing =
in_flight_attempt->attempt->connect_timing();
connect_timing.domain_lookup_start = dns_resolution_start_time_;
// If the attempt started before DNS resolution completion, `connect_start`
// could be smaller than `dns_resolution_end_time_`. Use the smallest one.
connect_timing.domain_lookup_end =
dns_resolution_end_time_.is_null()
? connect_timing.connect_start
: std::min(connect_timing.connect_start, dns_resolution_end_time_);
std::unique_ptr<StreamSocket> stream_socket =
in_flight_attempt->attempt->ReleaseStreamSocket();
CHECK(stream_socket);
spdy_throttle_timer_.Stop();
if (stream_socket->GetNegotiatedProtocol() == NextProto::kProtoHTTP2) {
CHECK(!spdy_session_pool()->FindAvailableSession(
group_->spdy_session_key(), enable_ip_based_pooling_,
/*is_websocket=*/false, net_log()));
std::unique_ptr<HttpStreamPoolHandle> handle = group_->CreateHandle(
std::move(stream_socket), std::move(connect_timing));
int create_result =
spdy_session_pool()->CreateAvailableSessionFromSocketHandle(
spdy_session_key(), std::move(handle), net_log(), &spdy_session_);
if (create_result != OK) {
HandleAttemptFailure(std::move(in_flight_attempt), create_result);
return;
}
CHECK(spdy_session_);
// Cancel in-flight requests and close idle streams as we don't need them
// anymore.
group_->Refresh();
NotifyPreconnectsComplete(OK);
CreateSpdyStreamAndNotify();
return;
}
ProcessPreconnectsAfterAttemptComplete(rv);
CHECK_NE(stream_socket->GetNegotiatedProtocol(), NextProto::kProtoHTTP2);
CreateTextBasedStreamAndNotify(std::move(stream_socket),
std::move(connect_timing));
}
void HttpStreamPool::Job::OnInFlightAttemptTcpHandshakeComplete(
InFlightAttempt* raw_attempt,
int rv) {
auto it = in_flight_attempts_.find(raw_attempt);
CHECK(it != in_flight_attempts_.end());
if (raw_attempt->is_slow || !raw_attempt->slow_timer.IsRunning()) {
return;
}
raw_attempt->slow_timer.Stop();
}
void HttpStreamPool::Job::OnInFlightAttemptSlow(InFlightAttempt* raw_attempt) {
auto it = in_flight_attempts_.find(raw_attempt);
CHECK(it != in_flight_attempts_.end());
raw_attempt->is_slow = true;
++slow_attempt_count_;
slow_ip_endpoints_.emplace(raw_attempt->attempt->ip_endpoint());
prefer_ipv6_ = !raw_attempt->attempt->ip_endpoint().address().IsIPv6();
MaybeAttemptConnection();
}
void HttpStreamPool::Job::HandleAttemptFailure(
std::unique_ptr<InFlightAttempt> in_flight_attempt,
int rv) {
CHECK_NE(rv, ERR_IO_PENDING);
failed_ip_endpoints_.emplace(in_flight_attempt->attempt->ip_endpoint());
ProcessPreconnectsAfterAttemptComplete(rv);
if (is_failing_) {
// `this` has already failed and is notifying requests to the failure.
return;
}
error_to_notify_ = rv;
if (rv == ERR_SSL_CLIENT_AUTH_CERT_NEEDED) {
CHECK(UsingTls());
client_auth_cert_info_ = in_flight_attempt->attempt->GetCertRequestInfo();
in_flight_attempt.reset();
NotifyFailure();
return;
}
if (IsCertificateError(rv)) {
// When a certificate error happened for an attempt, notifies all requests
// of the error.
CHECK(UsingTls());
CHECK(in_flight_attempt->attempt->stream_socket());
SSLInfo ssl_info;
bool has_ssl_info =
in_flight_attempt->attempt->stream_socket()->GetSSLInfo(&ssl_info);
CHECK(has_ssl_info);
cert_error_ssl_info_ = ssl_info;
in_flight_attempt.reset();
NotifyFailure();
} else {
in_flight_attempt.reset();
MaybeAttemptConnection();
}
}
void HttpStreamPool::Job::OnSpdyThrottleDelayPassed() {
CHECK(!spdy_throttle_delay_passed_);
spdy_throttle_delay_passed_ = true;
MaybeAttemptConnection();
}
void HttpStreamPool::Job::MaybeComplete() {
if (!requests_.empty() || !notified_requests_.empty() ||
!preconnects_.empty() || !in_flight_attempts_.empty()) {
return;
}
group_->OnJobComplete();
// `this` is deleted.
}
} // namespace net