| // Copyright 2024 The Chromium Authors |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "net/http/http_stream_pool_job.h" |
| |
| #include <memory> |
| #include <utility> |
| |
| #include "base/containers/contains.h" |
| #include "base/functional/bind.h" |
| #include "base/notreached.h" |
| #include "base/task/sequenced_task_runner.h" |
| #include "base/time/time.h" |
| #include "base/timer/timer.h" |
| #include "net/base/completion_once_callback.h" |
| #include "net/base/host_port_pair.h" |
| #include "net/base/load_states.h" |
| #include "net/base/load_timing_info.h" |
| #include "net/base/net_errors.h" |
| #include "net/base/request_priority.h" |
| #include "net/dns/host_resolver.h" |
| #include "net/http/http_network_session.h" |
| #include "net/http/http_stream_key.h" |
| #include "net/http/http_stream_pool_group.h" |
| #include "net/http/http_stream_pool_handle.h" |
| #include "net/log/net_log_with_source.h" |
| #include "net/socket/stream_attempt.h" |
| #include "net/socket/tcp_stream_attempt.h" |
| #include "net/socket/tls_stream_attempt.h" |
| #include "net/spdy/spdy_http_stream.h" |
| #include "net/spdy/spdy_session.h" |
| #include "net/ssl/ssl_cert_request_info.h" |
| |
| namespace net { |
| |
// Creates an entry associated with `job`. `job` must be non-null; the entry
// later forwards load-state queries, priority changes and completion
// notifications to it.
HttpStreamPool::Job::RequestEntry::RequestEntry(Job* job) : job_(job) {
  CHECK(job_);
}
| |
| std::unique_ptr<HttpStreamRequest> |
| HttpStreamPool::Job::RequestEntry::CreateRequest( |
| HttpStreamRequest::Delegate* delegate, |
| const NetLogWithSource& net_log) { |
| CHECK(!delegate_); |
| CHECK(delegate); |
| |
| delegate_ = delegate; |
| |
| auto request = std::make_unique<HttpStreamRequest>( |
| this, /*websocket_handshake_stream_create_helper=*/nullptr, net_log, |
| HttpStreamRequest::HTTP_STREAM); |
| |
| request_ = request.get(); |
| return request; |
| } |
| |
| LoadState HttpStreamPool::Job::RequestEntry::GetLoadState() const { |
| CHECK(request_); |
| if (request_->completed()) { |
| return LOAD_STATE_IDLE; |
| } |
| return job_->GetLoadState(); |
| } |
| |
| void HttpStreamPool::Job::RequestEntry::OnRequestComplete() { |
| CHECK(request_); |
| CHECK(delegate_); |
| request_ = nullptr; |
| delegate_ = nullptr; |
| job_->OnRequestComplete(this); |
| // `this` is deleted. |
| } |
| |
// Must never be called for entries created by this job; reaching this function
// is a fatal error.
// NOTE(review): presumably because the job only uses direct (non-proxied)
// connections — confirm.
int HttpStreamPool::Job::RequestEntry::RestartTunnelWithProxyAuth() {
  NOTREACHED_NORETURN();
}
| |
// Forwards a priority change for this entry's request to the associated job.
// The request must have been created already.
void HttpStreamPool::Job::RequestEntry::SetPriority(RequestPriority priority) {
  CHECK(request_);
  job_->SetRequestPriority(request_, priority);
}
| |
// No explicit cleanup is needed; member destruction is sufficient.
HttpStreamPool::Job::RequestEntry::~RequestEntry() = default;
| |
// Represents an in-flight stream attempt. Non-copyable.
struct HttpStreamPool::Job::InFlightAttempt {
  // Takes ownership of `attempt`.
  explicit InFlightAttempt(std::unique_ptr<StreamAttempt> attempt)
      : attempt(std::move(attempt)) {}

  InFlightAttempt(const InFlightAttempt&) = delete;
  InFlightAttempt& operator=(const InFlightAttempt&) = delete;

  ~InFlightAttempt() = default;

  // The underlying connection attempt.
  std::unique_ptr<StreamAttempt> attempt;
  // Timer to start a next attempt. When fired, `this` is treated as a slow
  // attempt but `this` is not timed out yet.
  base::OneShotTimer slow_timer;
  // Whether this attempt is currently counted in the job's
  // `slow_attempt_count_`.
  // NOTE(review): presumably set when `slow_timer` fires — confirm against
  // Job::OnInFlightAttemptSlow().
  bool is_slow = false;
};
| |
// Represents a preconnect request. Non-copyable.
struct HttpStreamPool::Job::PreconnectEntry {
  // `num_streams` is the number of streams requested; `callback` is run with
  // the final result once the preconnect finishes.
  PreconnectEntry(size_t num_streams, CompletionOnceCallback callback)
      : num_streams(num_streams), callback(std::move(callback)) {}

  PreconnectEntry(const PreconnectEntry&) = delete;
  PreconnectEntry& operator=(const PreconnectEntry&) = delete;

  ~PreconnectEntry() = default;

  // Remaining number of streams for this preconnect. Decremented each time an
  // attempt completes; the entry is finished when it reaches zero.
  size_t num_streams;
  // Completion callback; invoked asynchronously with the final result.
  CompletionOnceCallback callback;
  // Set to the latest error when errors happened.
  int result = OK;
};
| |
| HttpStreamPool::Job::Job(Group* group, NetLog* net_log) |
| : group_(group), |
| net_log_(NetLogWithSource::Make(net_log, |
| NetLogSourceType::HTTP_STREAM_POOL_JOB)), |
| requests_(NUM_PRIORITIES) { |
| net_log_.BeginEventReferencingSource( |
| NetLogEventType::HTTP_STREAM_POOL_JOB_ALIVE, group_->net_log().source()); |
| group_->net_log().AddEventReferencingSource( |
| NetLogEventType::HTTP_STREAM_POOL_GROUP_JOB_CREATED, net_log_.source()); |
| proxy_info_.UseDirect(); |
| CHECK(group_); |
| } |
| |
| HttpStreamPool::Job::~Job() { |
| net_log().EndEvent(NetLogEventType::HTTP_STREAM_POOL_JOB_ALIVE); |
| group_->net_log().AddEventReferencingSource( |
| NetLogEventType::HTTP_STREAM_POOL_GROUP_JOB_DESTROYED, net_log_.source()); |
| } |
| |
| std::unique_ptr<HttpStreamRequest> HttpStreamPool::Job::RequestStream( |
| HttpStreamRequest::Delegate* delegate, |
| RequestPriority priority, |
| const std::vector<SSLConfig::CertAndStatus>& allowed_bad_certs, |
| bool enable_ip_based_pooling, |
| const NetLogWithSource& net_log) { |
| auto entry = std::make_unique<RequestEntry>(this); |
| std::unique_ptr<HttpStreamRequest> request = |
| entry->CreateRequest(delegate, net_log); |
| requests_.Insert(std::move(entry), priority); |
| |
| if (is_failing_) { |
| // `this` is failing, notify the failure. |
| base::SequencedTaskRunner::GetCurrentDefault()->PostTask( |
| FROM_HERE, base::BindOnce(&Job::NotifyStreamRequestOfFailure, |
| weak_ptr_factory_.GetWeakPtr())); |
| return request; |
| } |
| |
| if (!enable_ip_based_pooling) { |
| enable_ip_based_pooling_ = enable_ip_based_pooling; |
| } |
| |
| MaybeChangeServiceEndpointRequestPriority(); |
| |
| // Check if we already have SPDY session. When found, notify the request that |
| // an HttpStream is ready. Use PostTask() since `delegate` doesn't expect the |
| // request finishes synchronously. |
| if (!spdy_session_) { |
| spdy_session_ = |
| http_network_session()->spdy_session_pool()->FindAvailableSession( |
| spdy_session_key(), enable_ip_based_pooling_, |
| /*is_websocket=*/false, net_log); |
| } |
| if (spdy_session_) { |
| base::SequencedTaskRunner::GetCurrentDefault()->PostTask( |
| FROM_HERE, base::BindOnce(&Job::CreateSpdyStreamAndNotify, |
| weak_ptr_factory_.GetWeakPtr())); |
| return request; |
| } |
| |
| // Check idle streams. If found, notify the request that an HttpStream is |
| // ready. Use PostTask() since `delegate` doesn't expect the request finishes |
| // synchronously. |
| std::unique_ptr<StreamSocket> stream_socket = group_->GetIdleStreamSocket(); |
| if (stream_socket) { |
| base::SequencedTaskRunner::GetCurrentDefault()->PostTask( |
| FROM_HERE, |
| base::BindOnce(&Job::CreateTextBasedStreamAndNotify, |
| weak_ptr_factory_.GetWeakPtr(), std::move(stream_socket), |
| LoadTimingInfo::ConnectTiming())); |
| return request; |
| } |
| |
| allowed_bad_certs_ = allowed_bad_certs; |
| |
| StartInternal(priority); |
| return request; |
| } |
| |
| int HttpStreamPool::Job::Preconnect(size_t num_streams, |
| CompletionOnceCallback callback) { |
| if (is_failing_) { |
| return error_to_notify_; |
| } |
| |
| if (spdy_session_pool()->HasAvailableSession(spdy_session_key(), |
| /*is_websocket=*/false)) { |
| CHECK(!RequiresHTTP11()); |
| return OK; |
| } |
| |
| if (group_->ActiveStreamSocketCount() >= num_streams) { |
| return OK; |
| } |
| |
| auto entry = |
| std::make_unique<PreconnectEntry>(num_streams, std::move(callback)); |
| preconnects_.emplace(std::move(entry)); |
| |
| StartInternal(RequestPriority::IDLE); |
| return ERR_IO_PENDING; |
| } |
| |
// Called when the service endpoint request reports updated (possibly
// intermediate) results. Re-evaluates sessions, SSL config and connection
// attempts against the new endpoints.
void HttpStreamPool::Job::OnServiceEndpointsUpdated() {
  ProcessServiceEndpointChanges();
}
| |
| void HttpStreamPool::Job::OnServiceEndpointRequestFinished(int rv) { |
| CHECK(!service_endpoint_request_finished_); |
| CHECK(service_endpoint_request_); |
| |
| service_endpoint_request_finished_ = true; |
| dns_resolution_end_time_ = base::TimeTicks::Now(); |
| resolve_error_info_ = service_endpoint_request_->GetResolveErrorInfo(); |
| |
| if (rv != OK) { |
| error_to_notify_ = rv; |
| NotifyFailure(); |
| return; |
| } |
| |
| CHECK(!service_endpoint_request_->GetEndpointResults().empty()); |
| ProcessServiceEndpointChanges(); |
| } |
| |
| int HttpStreamPool::Job::WaitForSSLConfigReady( |
| CompletionOnceCallback callback) { |
| if (ssl_config_.has_value()) { |
| return OK; |
| } |
| |
| ssl_config_waiting_callbacks_.emplace_back(std::move(callback)); |
| return ERR_IO_PENDING; |
| } |
| |
// Returns a copy of the calculated SSL config. Must only be called after the
// config is available (see WaitForSSLConfigReady()).
SSLConfig HttpStreamPool::Job::GetSSLConfig() {
  CHECK(ssl_config_.has_value());
  return *ssl_config_;
}
| |
| void HttpStreamPool::Job::ProcessPendingRequest() { |
| if (PendingRequestCount() == 0) { |
| return; |
| } |
| |
| std::unique_ptr<StreamSocket> stream_socket = group_->GetIdleStreamSocket(); |
| if (stream_socket) { |
| CreateTextBasedStreamAndNotify(std::move(stream_socket), |
| LoadTimingInfo::ConnectTiming()); |
| return; |
| } |
| |
| MaybeAttemptConnection(/*max_attempts=*/1); |
| } |
| |
| void HttpStreamPool::Job::CancelInFlightAttempts() { |
| pool()->DecrementTotalConnectingStreamCount(in_flight_attempts_.size()); |
| in_flight_attempts_.clear(); |
| slow_attempt_count_ = 0; |
| } |
| |
| void HttpStreamPool::Job::CancelRequests(int error) { |
| error_to_notify_ = error; |
| is_canceling_requests_ = true; |
| NotifyFailure(); |
| } |
| |
// Returns the number of queued requests that are not already covered by an
// in-flight attempt, as computed by PendingCountInternal().
size_t HttpStreamPool::Job::PendingRequestCount() const {
  return PendingCountInternal(requests_.size());
}
| |
| size_t HttpStreamPool::Job::PendingPreconnectCount() const { |
| size_t num_streams = 0; |
| for (const auto& entry : preconnects_) { |
| num_streams = std::max(num_streams, entry->num_streams); |
| } |
| return PendingCountInternal(num_streams); |
| } |
| |
// Returns the stream key of the group this job belongs to.
const HttpStreamKey& HttpStreamPool::Job::stream_key() const {
  return group_->stream_key();
}
| |
// Returns the SPDY session key of the group this job belongs to.
const SpdySessionKey& HttpStreamPool::Job::spdy_session_key() const {
  return group_->spdy_session_key();
}
| |
// Returns the HttpNetworkSession exposed by the group.
HttpNetworkSession* HttpStreamPool::Job::http_network_session() {
  return group_->http_network_session();
}
| |
// Returns the SpdySessionPool of the network session.
SpdySessionPool* HttpStreamPool::Job::spdy_session_pool() {
  return http_network_session()->spdy_session_pool();
}
| |
// Returns the HttpStreamPool the group belongs to.
HttpStreamPool* HttpStreamPool::Job::pool() {
  return group_->pool();
}
| |
// Const overload: returns the HttpStreamPool the group belongs to.
const HttpStreamPool* HttpStreamPool::Job::pool() const {
  return group_->pool();
}
| |
// Returns the NetLog (with source) owned by this job.
const NetLogWithSource& HttpStreamPool::Job::net_log() {
  return net_log_;
}
| |
// Returns true when the destination's scheme is cryptographic according to
// GURL::SchemeIsCryptographic(), i.e. when attempts need a TLS handshake.
bool HttpStreamPool::Job::UsingTls() const {
  return GURL::SchemeIsCryptographic(stream_key().destination().scheme());
}
| |
| bool HttpStreamPool::Job::RequiresHTTP11() { |
| return http_network_session()->http_server_properties()->RequiresHTTP11( |
| stream_key().destination(), stream_key().network_anonymization_key()); |
| } |
| |
| LoadState HttpStreamPool::Job::GetLoadState() const { |
| if (group_->ReachedMaxStreamLimit()) { |
| return LOAD_STATE_WAITING_FOR_AVAILABLE_SOCKET; |
| } |
| |
| if (pool()->ReachedMaxStreamLimit()) { |
| return LOAD_STATE_WAITING_FOR_STALLED_SOCKET_POOL; |
| } |
| |
| LoadState load_state = LOAD_STATE_IDLE; |
| |
| // When there are in-flight attempts, use most advanced one. |
| for (const auto& in_flight_attempt : in_flight_attempts_) { |
| load_state = |
| std::max(load_state, in_flight_attempt->attempt->GetLoadState()); |
| // There should not be a load state later than LOAD_STATE_SSL_HANDSHAKE. |
| if (load_state == LOAD_STATE_SSL_HANDSHAKE) { |
| break; |
| } |
| } |
| |
| if (load_state != LOAD_STATE_IDLE) { |
| return load_state; |
| } |
| |
| if (service_endpoint_request_ && !service_endpoint_request_finished_) { |
| return LOAD_STATE_RESOLVING_HOST; |
| } |
| |
| return LOAD_STATE_IDLE; |
| } |
| |
| RequestPriority HttpStreamPool::Job::GetPriority() const { |
| CHECK(!requests_.empty()); |
| return static_cast<RequestPriority>(requests_.FirstMax().priority()); |
| } |
| |
| bool HttpStreamPool::Job::IsStalledByPoolLimit() { |
| if (!GetIPEndPointToAttempt().has_value()) { |
| return false; |
| } |
| |
| switch (CanAttemptConnection()) { |
| case CanAttemptResult::kAttempt: |
| case CanAttemptResult::kReachedPoolLimit: |
| return true; |
| case CanAttemptResult::kNoPendingRequest: |
| case CanAttemptResult::kThrottledForSpdy: |
| case CanAttemptResult::kReachedGroupLimit: |
| return false; |
| } |
| } |
| |
| void HttpStreamPool::Job::StartInternal(RequestPriority priority) { |
| if (service_endpoint_request_ || service_endpoint_request_finished_) { |
| MaybeAttemptConnection(); |
| } else { |
| ResolveServiceEndpoint(priority); |
| } |
| } |
| |
| void HttpStreamPool::Job::ResolveServiceEndpoint( |
| RequestPriority initial_priority) { |
| CHECK(!service_endpoint_request_); |
| HostResolver::ResolveHostParameters parameters; |
| parameters.initial_priority = initial_priority; |
| parameters.secure_dns_policy = stream_key().secure_dns_policy(); |
| service_endpoint_request_ = |
| http_network_session()->host_resolver()->CreateServiceEndpointRequest( |
| HostResolver::Host(stream_key().destination()), |
| stream_key().network_anonymization_key(), net_log(), |
| std::move(parameters)); |
| |
| dns_resolution_start_time_ = base::TimeTicks::Now(); |
| int rv = service_endpoint_request_->Start(this); |
| if (rv != ERR_IO_PENDING) { |
| OnServiceEndpointRequestFinished(rv); |
| } |
| } |
| |
| void HttpStreamPool::Job::MaybeChangeServiceEndpointRequestPriority() { |
| if (service_endpoint_request_ && !service_endpoint_request_finished_) { |
| service_endpoint_request_->ChangeRequestPriority(GetPriority()); |
| } |
| } |
| |
| void HttpStreamPool::Job::ProcessServiceEndpointChanges() { |
| if (CanUseExistingSessionAfterEndpointChanges()) { |
| return; |
| } |
| MaybeCalculateSSLConfig(); |
| MaybeAttemptConnection(); |
| } |
| |
| bool HttpStreamPool::Job::CanUseExistingSessionAfterEndpointChanges() { |
| CHECK(service_endpoint_request_); |
| if (spdy_session_) { |
| return true; |
| } |
| |
| if (!enable_ip_based_pooling_) { |
| return false; |
| } |
| |
| for (const auto& endpoint : service_endpoint_request_->GetEndpointResults()) { |
| spdy_session_ = |
| spdy_session_pool()->FindMatchingIpSessionForServiceEndpoint( |
| spdy_session_key(), endpoint, |
| service_endpoint_request_->GetDnsAliasResults()); |
| if (spdy_session_) { |
| group_->Refresh(); |
| // Use PostTask() because we could reach here from RequestStream() |
| // synchronously when the DNS resolution finishes immediately. |
| base::SequencedTaskRunner::GetCurrentDefault()->PostTask( |
| FROM_HERE, base::BindOnce(&Job::CreateSpdyStreamAndNotify, |
| weak_ptr_factory_.GetWeakPtr())); |
| return true; |
| } |
| } |
| |
| return false; |
| } |
| |
| void HttpStreamPool::Job::MaybeCalculateSSLConfig() { |
| if (!UsingTls() || ssl_config_.has_value()) { |
| return; |
| } |
| |
| CHECK(service_endpoint_request_); |
| if (!service_endpoint_request_->EndpointsCryptoReady()) { |
| return; |
| } |
| |
| SSLConfig ssl_config; |
| |
| ssl_config.allowed_bad_certs = allowed_bad_certs_; |
| ssl_config.privacy_mode = stream_key().privacy_mode(); |
| ssl_config.disable_cert_verification_network_fetches = |
| stream_key().disable_cert_network_fetches(); |
| ssl_config.early_data_enabled = |
| http_network_session()->params().enable_early_data; |
| |
| ssl_config.alpn_protos = http_network_session()->GetAlpnProtos(); |
| ssl_config.application_settings = |
| http_network_session()->GetApplicationSettings(); |
| http_network_session()->http_server_properties()->MaybeForceHTTP11( |
| stream_key().destination(), stream_key().network_anonymization_key(), |
| &ssl_config); |
| |
| ssl_config.ignore_certificate_errors = |
| http_network_session()->params().ignore_certificate_errors; |
| ssl_config.network_anonymization_key = |
| stream_key().network_anonymization_key(); |
| |
| // TODO(crbug.com/346835898): Support ECH. |
| |
| ssl_config_.emplace(std::move(ssl_config)); |
| |
| // Restart slow timer for in-flight attempts that have already completed |
| // TCP handshakes. |
| for (auto& in_flight_attempt : in_flight_attempts_) { |
| if (!in_flight_attempt->is_slow && |
| !in_flight_attempt->slow_timer.IsRunning()) { |
| // TODO(crbug.com/346835898): Should we use a different delay other than |
| // the connection attempt delay? |
| // base::Unretained() is safe here because `this` owns the |
| // `in_flight_attempt` and `slow_timer`. |
| in_flight_attempt->slow_timer.Start( |
| FROM_HERE, kConnectionAttemptDelay, |
| base::BindOnce(&Job::OnInFlightAttemptSlow, base::Unretained(this), |
| in_flight_attempt.get())); |
| } |
| } |
| |
| for (auto& callback : ssl_config_waiting_callbacks_) { |
| std::move(callback).Run(OK); |
| } |
| ssl_config_waiting_callbacks_.clear(); |
| } |
| |
| void HttpStreamPool::Job::MaybeAttemptConnection( |
| std::optional<size_t> max_attempts) { |
| if (PendingRequestCount() == 0 && preconnects_.empty()) { |
| // There are no requests waiting for streams. |
| return; |
| } |
| |
| CHECK(!preconnects_.empty() || group_->IdleStreamSocketCount() == 0); |
| |
| // TODO(crbug.com/346835898): Ensure that we don't attempt connections when |
| // failing or creating HttpStream on top of a SPDY session. |
| CHECK(!is_failing_); |
| CHECK(!spdy_session_); |
| |
| std::optional<IPEndPoint> ip_endpoint = GetIPEndPointToAttempt(); |
| if (!ip_endpoint.has_value()) { |
| if (service_endpoint_request_finished_ && in_flight_attempts_.empty()) { |
| // Tried all endpoints. |
| NotifyFailure(); |
| } |
| return; |
| } |
| |
| // There might be multiple pending requests. Make attempts as much as needed |
| // and allowed. |
| size_t num_attempts = 0; |
| const bool using_tls = UsingTls(); |
| while (IsConnectionAttemptReady()) { |
| std::unique_ptr<StreamAttempt> attempt; |
| if (using_tls) { |
| attempt = std::make_unique<TlsStreamAttempt>( |
| pool()->stream_attempt_params(), *ip_endpoint, |
| HostPortPair::FromSchemeHostPort(stream_key().destination()), |
| /*ssl_config_provider=*/this); |
| } else { |
| attempt = std::make_unique<TcpStreamAttempt>( |
| pool()->stream_attempt_params(), *ip_endpoint); |
| } |
| net_log().AddEventReferencingSource( |
| NetLogEventType::HTTP_STREAM_POOL_JOB_ATTEMPT_START, |
| attempt->net_log().source()); |
| net_log().AddEvent( |
| NetLogEventType::HTTP_STREAM_POOL_JOB_ATTEMPT_START, [&] { |
| base::Value::Dict dict; |
| dict.Set("num_requests", static_cast<int>(requests_.size())); |
| dict.Set("num_preconnects", static_cast<int>(preconnects_.size())); |
| dict.Set("num_inflight_attempts", |
| static_cast<int>(in_flight_attempts_.size())); |
| dict.Set("num_slow_attempts", static_cast<int>(slow_attempt_count_)); |
| attempt->net_log().source().AddToEventParameters(dict); |
| return dict; |
| }); |
| |
| auto in_flight_attempt = |
| std::make_unique<InFlightAttempt>(std::move(attempt)); |
| InFlightAttempt* raw_attempt = in_flight_attempt.get(); |
| in_flight_attempts_.emplace(std::move(in_flight_attempt)); |
| pool()->IncrementTotalConnectingStreamCount(); |
| |
| int rv = raw_attempt->attempt->Start(base::BindOnce( |
| &Job::OnInFlightAttemptComplete, base::Unretained(this), raw_attempt)); |
| // Add NetLog dependency after Start() so that the first event of the |
| // attempt can have meaningful description in the NetLog viewer. |
| raw_attempt->attempt->net_log().AddEventReferencingSource( |
| NetLogEventType::STREAM_ATTEMPT_BOUND_TO_POOL, net_log().source()); |
| if (rv != ERR_IO_PENDING) { |
| base::SequencedTaskRunner::GetCurrentDefault()->PostTask( |
| FROM_HERE, base::BindOnce(&Job::OnInFlightAttemptComplete, |
| base::Unretained(this), raw_attempt, rv)); |
| } else { |
| raw_attempt->slow_timer.Start( |
| FROM_HERE, kConnectionAttemptDelay, |
| base::BindOnce(&Job::OnInFlightAttemptSlow, base::Unretained(this), |
| raw_attempt)); |
| if (using_tls) { |
| static_cast<TlsStreamAttempt*>(raw_attempt->attempt.get()) |
| ->SetTcpHandshakeCompletionCallback( |
| base::BindOnce(&Job::OnInFlightAttemptTcpHandshakeComplete, |
| base::Unretained(this), raw_attempt)); |
| } |
| } |
| |
| ++num_attempts; |
| if (max_attempts.has_value() && num_attempts >= *max_attempts) { |
| break; |
| } |
| } |
| } |
| |
| bool HttpStreamPool::Job::IsConnectionAttemptReady() { |
| switch (CanAttemptConnection()) { |
| case CanAttemptResult::kAttempt: |
| return true; |
| case CanAttemptResult::kNoPendingRequest: |
| return false; |
| case CanAttemptResult::kThrottledForSpdy: |
| // TODO(crbug.com/346835898): Consider throttling less aggressively (e.g. |
| // allow TCP handshake but throttle TLS handshake) so that endpoints we've |
| // used HTTP/2 on aren't penalised on slow or lossy connections. |
| if (!spdy_throttle_timer_.IsRunning()) { |
| spdy_throttle_timer_.Start( |
| FROM_HERE, kSpdyThrottleDelay, |
| base::BindOnce(&Job::OnSpdyThrottleDelayPassed, |
| base::Unretained(this))); |
| } |
| return false; |
| case CanAttemptResult::kReachedGroupLimit: |
| // TODO(crbug.com/346835898): Better to handle cases where we partially |
| // attempted some connections. |
| NotifyPreconnectsComplete(ERR_PRECONNECT_MAX_SOCKET_LIMIT); |
| return false; |
| case CanAttemptResult::kReachedPoolLimit: |
| // If we can't attempt connection due to the pool's limit, try to close an |
| // idle stream in the pool. |
| if (!pool()->CloseOneIdleStreamSocket()) { |
| // TODO(crbug.com/346835898): Better to handle cases where we partially |
| // attempted some connections. |
| NotifyPreconnectsComplete(ERR_PRECONNECT_MAX_SOCKET_LIMIT); |
| return false; |
| } |
| return true; |
| } |
| } |
| |
| HttpStreamPool::Job::CanAttemptResult |
| HttpStreamPool::Job::CanAttemptConnection() { |
| size_t pending_count = |
| std::max(PendingRequestCount(), PendingPreconnectCount()); |
| if (pending_count == 0) { |
| return CanAttemptResult::kNoPendingRequest; |
| } |
| |
| if (ShouldThrottleAttemptForSpdy()) { |
| return CanAttemptResult::kThrottledForSpdy; |
| } |
| |
| if (group_->ReachedMaxStreamLimit()) { |
| return CanAttemptResult::kReachedGroupLimit; |
| } |
| |
| if (pool()->ReachedMaxStreamLimit()) { |
| return CanAttemptResult::kReachedPoolLimit; |
| } |
| |
| return CanAttemptResult::kAttempt; |
| } |
| |
| bool HttpStreamPool::Job::ShouldThrottleAttemptForSpdy() { |
| if (!http_network_session()->http_server_properties()->GetSupportsSpdy( |
| stream_key().destination(), |
| stream_key().network_anonymization_key())) { |
| return false; |
| } |
| |
| CHECK(UsingTls()); |
| |
| // The first attempt should not be blocked. |
| if (in_flight_attempts_.empty()) { |
| return false; |
| } |
| |
| if (spdy_throttle_delay_passed_) { |
| return false; |
| } |
| |
| CHECK(!spdy_session_); |
| return true; |
| } |
| |
| size_t HttpStreamPool::Job::PendingCountInternal(size_t pending_count) const { |
| CHECK_GE(in_flight_attempts_.size(), slow_attempt_count_); |
| // When SPDY throttle delay passed, treat all in-flight attempts as non-slow, |
| // to avoid attempting connections more than requested. |
| // TODO(crbug.com/346835898): This behavior is tricky. Figure out a better |
| // way to handle this situation. |
| size_t slow_count = spdy_throttle_delay_passed_ ? 0 : slow_attempt_count_; |
| size_t non_slow_count = in_flight_attempts_.size() - slow_count; |
| // The number of in-flight, non-slow attempts could be larger than the number |
| // of requests (e.g. a request was cancelled in the middle of an attempt). |
| if (pending_count <= non_slow_count) { |
| return 0; |
| } |
| |
| return pending_count - non_slow_count; |
| } |
| |
| std::optional<IPEndPoint> HttpStreamPool::Job::GetIPEndPointToAttempt() { |
| if (!service_endpoint_request_ || |
| service_endpoint_request_->GetEndpointResults().empty()) { |
| return std::nullopt; |
| } |
| |
| // Look for an IPEndPoint from the preferred address family first. |
| for (auto& endpoint : service_endpoint_request_->GetEndpointResults()) { |
| std::optional<IPEndPoint> ip_endpoint = |
| prefer_ipv6_ ? FindPreferredIPEndpoint(endpoint.ipv6_endpoints) |
| : FindPreferredIPEndpoint(endpoint.ipv4_endpoints); |
| if (ip_endpoint.has_value()) { |
| return ip_endpoint; |
| } |
| } |
| |
| // If there is no IPEndPoint from the preferred address family, check the |
| // another address family. |
| for (auto& endpoint : service_endpoint_request_->GetEndpointResults()) { |
| std::optional<IPEndPoint> ip_endpoint = |
| prefer_ipv6_ ? FindPreferredIPEndpoint(endpoint.ipv4_endpoints) |
| : FindPreferredIPEndpoint(endpoint.ipv6_endpoints); |
| if (ip_endpoint.has_value()) { |
| return ip_endpoint; |
| } |
| } |
| |
| return std::nullopt; |
| } |
| |
| std::optional<IPEndPoint> HttpStreamPool::Job::FindPreferredIPEndpoint( |
| const std::vector<IPEndPoint>& ip_endpoints) { |
| // Prefer the first unattempted endpoint in `ip_endpoints`. Allow to use |
| // the first slow endpoint when SPDY throttle delay passed. |
| |
| std::optional<IPEndPoint> slow_endpoint; |
| for (const auto& ip_endpoint : ip_endpoints) { |
| if (base::Contains(failed_ip_endpoints_, ip_endpoint)) { |
| continue; |
| } |
| if (base::Contains(slow_ip_endpoints_, ip_endpoint)) { |
| if (!slow_endpoint.has_value()) { |
| slow_endpoint = ip_endpoint; |
| } |
| continue; |
| } |
| return ip_endpoint; |
| } |
| |
| if (spdy_throttle_delay_passed_) { |
| return slow_endpoint; |
| } |
| return std::nullopt; |
| } |
| |
| HttpStreamPool::Job::FailureKind HttpStreamPool::Job::DetermineFailureKind() { |
| if (is_canceling_requests_) { |
| return FailureKind::kStreamFailed; |
| } |
| |
| if (IsCertificateError(error_to_notify_)) { |
| return FailureKind::kCertifcateError; |
| } |
| |
| if (error_to_notify_ == ERR_SSL_CLIENT_AUTH_CERT_NEEDED) { |
| return FailureKind::kNeedsClientAuth; |
| } |
| |
| return FailureKind::kStreamFailed; |
| } |
| |
// Puts the job into the failing state and reports `error_to_notify_` to all
// preconnects and to stream requests. `error_to_notify_` must have been set
// by the caller beforehand.
void HttpStreamPool::Job::NotifyFailure() {
  is_failing_ = true;
  // Preconnect callbacks are posted, so they run after this function returns.
  NotifyPreconnectsComplete(error_to_notify_);
  // Notifies the first request synchronously and schedules itself for the
  // remaining ones.
  NotifyStreamRequestOfFailure();
  // `this` may be deleted.
}
| |
| void HttpStreamPool::Job::NotifyStreamRequestOfFailure() { |
| CHECK(is_failing_); |
| |
| RequestEntry* entry = ExtractFirstRequestToNotify(); |
| if (!entry) { |
| // TODO(crbug.com/346835898): Ensure that MaybeComplete() is called |
| // eventually. |
| return; |
| } |
| |
| base::SequencedTaskRunner::GetCurrentDefault()->PostTask( |
| FROM_HERE, base::BindOnce(&Job::NotifyStreamRequestOfFailure, |
| weak_ptr_factory_.GetWeakPtr())); |
| |
| FailureKind kind = DetermineFailureKind(); |
| switch (kind) { |
| case FailureKind::kStreamFailed: |
| entry->delegate()->OnStreamFailed(error_to_notify_, net_error_details_, |
| proxy_info_, resolve_error_info_); |
| break; |
| case FailureKind::kCertifcateError: |
| CHECK(cert_error_ssl_info_.has_value()); |
| entry->delegate()->OnCertificateError(error_to_notify_, |
| *cert_error_ssl_info_); |
| break; |
| case FailureKind::kNeedsClientAuth: |
| CHECK(client_auth_cert_info_.get()); |
| entry->delegate()->OnNeedsClientAuth(client_auth_cert_info_.get()); |
| break; |
| } |
| // `this` may be deleted. |
| } |
| |
| void HttpStreamPool::Job::NotifyPreconnectsComplete(int rv) { |
| while (!preconnects_.empty()) { |
| std::unique_ptr<PreconnectEntry> entry = |
| std::move(preconnects_.extract(preconnects_.begin()).value()); |
| base::SequencedTaskRunner::GetCurrentDefault()->PostTask( |
| FROM_HERE, base::BindOnce(std::move(entry->callback), rv)); |
| } |
| if (preconnects_.empty()) { |
| base::SequencedTaskRunner::GetCurrentDefault()->PostTask( |
| FROM_HERE, |
| base::BindOnce(&Job::MaybeComplete, weak_ptr_factory_.GetWeakPtr())); |
| } |
| } |
| |
| void HttpStreamPool::Job::ProcessPreconnectsAfterAttemptComplete(int rv) { |
| std::vector<PreconnectEntry*> completed; |
| for (auto& entry : preconnects_) { |
| CHECK_GT(entry->num_streams, 0u); |
| --entry->num_streams; |
| if (rv != OK) { |
| entry->result = rv; |
| } |
| if (entry->num_streams == 0) { |
| completed.emplace_back(entry.get()); |
| } |
| } |
| |
| for (auto* entry_ptr : completed) { |
| auto it = preconnects_.find(entry_ptr); |
| CHECK(it != preconnects_.end()); |
| std::unique_ptr<PreconnectEntry> entry = |
| std::move(preconnects_.extract(it).value()); |
| base::SequencedTaskRunner::GetCurrentDefault()->PostTask( |
| FROM_HERE, base::BindOnce(std::move(entry->callback), entry->result)); |
| } |
| if (preconnects_.empty()) { |
| base::SequencedTaskRunner::GetCurrentDefault()->PostTask( |
| FROM_HERE, |
| base::BindOnce(&Job::MaybeComplete, weak_ptr_factory_.GetWeakPtr())); |
| } |
| } |
| |
| void HttpStreamPool::Job::CreateTextBasedStreamAndNotify( |
| std::unique_ptr<StreamSocket> stream_socket, |
| LoadTimingInfo::ConnectTiming connect_timing) { |
| NextProto negotiated_protocol = stream_socket->GetNegotiatedProtocol(); |
| CHECK_NE(negotiated_protocol, NextProto::kProtoHTTP2); |
| |
| std::unique_ptr<HttpStream> http_stream = group_->CreateTextBasedStream( |
| std::move(stream_socket), std::move(connect_timing)); |
| NotifyStreamReady(std::move(http_stream), negotiated_protocol); |
| // `this` may be deleted. |
| } |
| |
| void HttpStreamPool::Job::CreateSpdyStreamAndNotify() { |
| CHECK(spdy_session_); |
| CHECK(!is_canceling_requests_); |
| CHECK(!is_failing_); |
| |
| // If there are more than one remaining request, post a task to create |
| // HttpStreams for these requests. |
| if (requests_.size() > 1) { |
| base::SequencedTaskRunner::GetCurrentDefault()->PostTask( |
| FROM_HERE, base::BindOnce(&Job::CreateSpdyStreamAndNotify, |
| weak_ptr_factory_.GetWeakPtr())); |
| } |
| |
| std::set<std::string> dns_aliases = |
| http_network_session()->spdy_session_pool()->GetDnsAliasesForSessionKey( |
| spdy_session_key()); |
| auto http_stream = std::make_unique<SpdyHttpStream>( |
| spdy_session_, net_log().source(), std::move(dns_aliases)); |
| NotifyStreamReady(std::move(http_stream), NextProto::kProtoHTTP2); |
| // `this` may be deleted. |
| } |
| |
| void HttpStreamPool::Job::NotifyStreamReady(std::unique_ptr<HttpStream> stream, |
| NextProto negotiated_protocol) { |
| RequestEntry* entry = ExtractFirstRequestToNotify(); |
| if (!entry) { |
| // The ownership of the stream will be moved to the group as `stream` is |
| // going to be destructed. |
| return; |
| } |
| |
| entry->request()->Complete(negotiated_protocol, |
| ALTERNATE_PROTOCOL_USAGE_UNSPECIFIED_REASON); |
| entry->delegate()->OnStreamReady(proxy_info_, std::move(stream)); |
| } |
| |
| HttpStreamPool::Job::RequestEntry* |
| HttpStreamPool::Job::ExtractFirstRequestToNotify() { |
| if (requests_.empty()) { |
| return nullptr; |
| } |
| |
| std::unique_ptr<RequestEntry> entry = requests_.Erase(requests_.FirstMax()); |
| RequestEntry* raw_entry = entry.get(); |
| notified_requests_.emplace(std::move(entry)); |
| return raw_entry; |
| } |
| |
| void HttpStreamPool::Job::SetRequestPriority(HttpStreamRequest* request, |
| RequestPriority priority) { |
| for (RequestQueue::Pointer pointer = requests_.FirstMax(); !pointer.is_null(); |
| pointer = requests_.GetNextTowardsLastMin(pointer)) { |
| if (pointer.value()->request() == request) { |
| if (pointer.priority() == priority) { |
| break; |
| } |
| |
| std::unique_ptr<RequestEntry> entry = requests_.Erase(pointer); |
| requests_.Insert(std::move(entry), priority); |
| break; |
| } |
| } |
| |
| MaybeChangeServiceEndpointRequestPriority(); |
| } |
| |
| void HttpStreamPool::Job::OnRequestComplete(RequestEntry* entry) { |
| auto notified_it = notified_requests_.find(entry); |
| if (notified_it != notified_requests_.end()) { |
| notified_requests_.erase(notified_it); |
| } else { |
| for (RequestQueue::Pointer pointer = requests_.FirstMax(); |
| !pointer.is_null(); |
| pointer = requests_.GetNextTowardsLastMin(pointer)) { |
| if (pointer.value()->request() == entry->request()) { |
| requests_.Erase(pointer); |
| break; |
| } |
| } |
| } |
| MaybeComplete(); |
| } |
| |
| // Handles the completion of the in-flight attempt `raw_attempt` with result |
| // `rv`. |
| // |
| // Logs the end of the attempt, stops its slow timer (and undoes its |
| // contribution to `slow_attempt_count_`), takes the attempt out of |
| // `in_flight_attempts_` and decrements the pool's connecting stream count. |
| // A non-OK `rv` is forwarded to HandleAttemptFailure(). On success the |
| // connected socket is turned into either a SPDY session (when HTTP/2 was |
| // negotiated) or a text-based stream, and waiting requests are notified. |
| void HttpStreamPool::Job::OnInFlightAttemptComplete( |
|     InFlightAttempt* raw_attempt, |
|     int rv) { |
|   net_log().AddEventReferencingSource( |
|       NetLogEventType::HTTP_STREAM_POOL_JOB_ATTEMPT_END, |
|       raw_attempt->attempt->net_log().source()); |
| |
|   // The attempt is done, so it must stop counting as a slow attempt. |
|   raw_attempt->slow_timer.Stop(); |
|   if (raw_attempt->is_slow) { |
|     CHECK_GT(slow_attempt_count_, 0u); |
|     --slow_attempt_count_; |
|   } |
| |
|   // Take ownership of the attempt out of the in-flight set. |
|   auto found = in_flight_attempts_.find(raw_attempt); |
|   CHECK(found != in_flight_attempts_.end()); |
|   std::unique_ptr<InFlightAttempt> finished_attempt = |
|       std::move(in_flight_attempts_.extract(found).value()); |
|   pool()->DecrementTotalConnectingStreamCount(); |
| |
|   if (rv != OK) { |
|     HandleAttemptFailure(std::move(finished_attempt), rv); |
|     return; |
|   } |
| |
|   LoadTimingInfo::ConnectTiming timing = |
|       finished_attempt->attempt->connect_timing(); |
|   timing.domain_lookup_start = dns_resolution_start_time_; |
|   // The attempt may have started before DNS resolution finished, in which |
|   // case `connect_start` precedes `dns_resolution_end_time_`. Report the |
|   // earlier of the two; with no recorded end time, fall back to |
|   // `connect_start`. |
|   if (dns_resolution_end_time_.is_null()) { |
|     timing.domain_lookup_end = timing.connect_start; |
|   } else { |
|     timing.domain_lookup_end = |
|         std::min(timing.connect_start, dns_resolution_end_time_); |
|   } |
| |
|   std::unique_ptr<StreamSocket> socket = |
|       finished_attempt->attempt->ReleaseStreamSocket(); |
|   CHECK(socket); |
| |
|   spdy_throttle_timer_.Stop(); |
| |
|   if (socket->GetNegotiatedProtocol() != NextProto::kProtoHTTP2) { |
|     // Not HTTP/2: hand the socket to a text-based stream. |
|     ProcessPreconnectsAfterAttemptComplete(rv); |
| |
|     CHECK_NE(socket->GetNegotiatedProtocol(), NextProto::kProtoHTTP2); |
|     CreateTextBasedStreamAndNotify(std::move(socket), std::move(timing)); |
|     return; |
|   } |
| |
|   // HTTP/2 was negotiated. No available session may exist for this key yet. |
|   CHECK(!spdy_session_pool()->FindAvailableSession( |
|       group_->spdy_session_key(), enable_ip_based_pooling_, |
|       /*is_websocket=*/false, net_log())); |
|   std::unique_ptr<HttpStreamPoolHandle> pool_handle = |
|       group_->CreateHandle(std::move(socket), std::move(timing)); |
|   int session_rv = spdy_session_pool()->CreateAvailableSessionFromSocketHandle( |
|       spdy_session_key(), std::move(pool_handle), net_log(), &spdy_session_); |
|   if (session_rv != OK) { |
|     HandleAttemptFailure(std::move(finished_attempt), session_rv); |
|     return; |
|   } |
|   CHECK(spdy_session_); |
| |
|   // In-flight requests and idle streams are no longer needed now that a |
|   // session exists, so cancel/close them. |
|   group_->Refresh(); |
| |
|   NotifyPreconnectsComplete(OK); |
|   CreateSpdyStreamAndNotify(); |
| } |
| |
| // Stops the slow timer of `raw_attempt` provided the attempt has not been |
| // marked slow and the timer is still running. `raw_attempt` must be in |
| // `in_flight_attempts_`. `rv` is not used. |
| void HttpStreamPool::Job::OnInFlightAttemptTcpHandshakeComplete( |
|     InFlightAttempt* raw_attempt, |
|     int rv) { |
|   auto position = in_flight_attempts_.find(raw_attempt); |
|   CHECK(position != in_flight_attempts_.end()); |
| |
|   if (!raw_attempt->is_slow && raw_attempt->slow_timer.IsRunning()) { |
|     raw_attempt->slow_timer.Stop(); |
|   } |
| } |
| |
| // Marks `raw_attempt` as slow: bumps `slow_attempt_count_`, records its IP |
| // endpoint in `slow_ip_endpoints_`, sets `prefer_ipv6_` to the opposite of |
| // the slow endpoint's address family, and then calls |
| // MaybeAttemptConnection(). `raw_attempt` must be in `in_flight_attempts_`. |
| void HttpStreamPool::Job::OnInFlightAttemptSlow(InFlightAttempt* raw_attempt) { |
|   auto position = in_flight_attempts_.find(raw_attempt); |
|   CHECK(position != in_flight_attempts_.end()); |
| |
|   raw_attempt->is_slow = true; |
|   ++slow_attempt_count_; |
| |
|   const auto& slow_endpoint = raw_attempt->attempt->ip_endpoint(); |
|   slow_ip_endpoints_.emplace(slow_endpoint); |
| |
|   // Prefer the address family that the slow endpoint does not belong to. |
|   const bool slow_endpoint_is_ipv6 = slow_endpoint.address().IsIPv6(); |
|   prefer_ipv6_ = !slow_endpoint_is_ipv6; |
| |
|   MaybeAttemptConnection(); |
| } |
| |
| // Handles a failed attempt. `rv` must not be ERR_IO_PENDING. |
| // |
| // The attempt's endpoint is recorded in `failed_ip_endpoints_` and |
| // preconnects are processed with `rv`. Unless the Job is already failing, |
| // `rv` is stored in `error_to_notify_` and then: |
| //  - ERR_SSL_CLIENT_AUTH_CERT_NEEDED: saves the certificate request info and |
| //    calls NotifyFailure(). |
| //  - certificate errors: saves the socket's SSLInfo and calls |
| //    NotifyFailure(). |
| //  - anything else: calls MaybeAttemptConnection(). |
| // In each of those three cases `in_flight_attempt` is destroyed before the |
| // final call is made. |
| void HttpStreamPool::Job::HandleAttemptFailure( |
|     std::unique_ptr<InFlightAttempt> in_flight_attempt, |
|     int rv) { |
|   CHECK_NE(rv, ERR_IO_PENDING); |
|   failed_ip_endpoints_.emplace(in_flight_attempt->attempt->ip_endpoint()); |
| |
|   ProcessPreconnectsAfterAttemptComplete(rv); |
| |
|   // `this` has already failed and is notifying requests of the failure, so |
|   // there is nothing more to do for this attempt. |
|   if (is_failing_) { |
|     return; |
|   } |
| |
|   error_to_notify_ = rv; |
| |
|   if (rv == ERR_SSL_CLIENT_AUTH_CERT_NEEDED) { |
|     CHECK(UsingTls()); |
|     client_auth_cert_info_ = in_flight_attempt->attempt->GetCertRequestInfo(); |
|     in_flight_attempt.reset(); |
|     NotifyFailure(); |
|     return; |
|   } |
| |
|   if (!IsCertificateError(rv)) { |
|     in_flight_attempt.reset(); |
|     MaybeAttemptConnection(); |
|     return; |
|   } |
| |
|   // A certificate error on one attempt is reported to all requests, together |
|   // with the SSLInfo taken from the attempt's socket. |
|   CHECK(UsingTls()); |
|   CHECK(in_flight_attempt->attempt->stream_socket()); |
|   SSLInfo cert_ssl_info; |
|   bool got_ssl_info = |
|       in_flight_attempt->attempt->stream_socket()->GetSSLInfo(&cert_ssl_info); |
|   CHECK(got_ssl_info); |
|   cert_error_ssl_info_ = cert_ssl_info; |
|   in_flight_attempt.reset(); |
|   NotifyFailure(); |
| } |
| |
| // Sets `spdy_throttle_delay_passed_` and then calls MaybeAttemptConnection(). |
| // The CHECK enforces that the flag was not already set on entry, so this |
| // must run at most once per Job. |
| // NOTE(review): presumably the callback of `spdy_throttle_timer_` - confirm |
| // against where the timer is started. |
| void HttpStreamPool::Job::OnSpdyThrottleDelayPassed() { |
| CHECK(!spdy_throttle_delay_passed_); |
| spdy_throttle_delay_passed_ = true; |
| MaybeAttemptConnection(); |
| } |
| |
| // Reports completion to the group via group_->OnJobComplete() once the Job |
| // has no pending requests, no notified requests, no preconnects and no |
| // in-flight attempts left. Does nothing otherwise. |
| void HttpStreamPool::Job::MaybeComplete() { |
|   const bool nothing_left = |
|       requests_.empty() && notified_requests_.empty() && |
|       preconnects_.empty() && in_flight_attempts_.empty(); |
|   if (nothing_left) { |
|     group_->OnJobComplete(); |
|     // `this` is deleted. No member may be touched past this point. |
|   } |
| } |
| |
| } // namespace net |