blob: 2ce7086b6f60f3dcfc28a38807241a5574149cbc [file] [log] [blame]
// Copyright 2024 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "net/http/http_stream_pool_group.h"
#include "base/task/sequenced_task_runner.h"
#include "base/types/expected.h"
#include "net/base/completion_once_callback.h"
#include "net/base/load_timing_info.h"
#include "net/base/net_errors.h"
#include "net/http/http_basic_stream.h"
#include "net/http/http_network_session.h"
#include "net/http/http_stream.h"
#include "net/http/http_stream_key.h"
#include "net/http/http_stream_pool_attempt_manager.h"
#include "net/http/http_stream_pool_handle.h"
#include "net/log/net_log_event_type.h"
#include "net/log/net_log_with_source.h"
#include "net/socket/next_proto.h"
#include "net/socket/stream_socket.h"
#include "net/socket/stream_socket_close_reason.h"
#include "net/third_party/quiche/src/quiche/quic/core/quic_versions.h"
namespace net {
namespace {
bool IsNegotiatedProtocolTextBased(NextProto next_proto) {
return next_proto == NextProto::kProtoUnknown ||
next_proto == NextProto::kProtoHTTP11;
}
void RecordNetLogClosingSocket(const StreamSocket& stream_socket,
std::string_view reason) {
stream_socket.NetLog().AddEventWithStringParams(
NetLogEventType::HTTP_STREAM_POOL_CLOSING_SOCKET, "reason", reason);
}
} // namespace
// static
base::expected<void, std::string_view>
HttpStreamPool::Group::IsIdleStreamSocketUsable(const IdleStreamSocket& idle) {
base::TimeDelta timeout = idle.stream_socket->WasEverUsed()
? kUsedIdleStreamSocketTimeout
: kUnusedIdleStreamSocketTimeout;
if (base::TimeTicks::Now() - idle.time_became_idle >= timeout) {
return base::unexpected(kIdleTimeLimitExpired);
}
if (idle.stream_socket->WasEverUsed()) {
if (idle.stream_socket->IsConnectedAndIdle()) {
return base::ok();
}
if (idle.stream_socket->IsConnected()) {
return base::unexpected(kDataReceivedUnexpectedly);
} else {
return base::unexpected(kRemoteSideClosedConnection);
}
}
if (idle.stream_socket->IsConnected()) {
return base::ok();
}
return base::unexpected(kRemoteSideClosedConnection);
}
HttpStreamPool::Group::IdleStreamSocket::IdleStreamSocket(
std::unique_ptr<StreamSocket> stream_socket,
base::TimeTicks time_became_idle)
: stream_socket(std::move(stream_socket)),
time_became_idle(time_became_idle) {}
HttpStreamPool::Group::IdleStreamSocket::~IdleStreamSocket() = default;
HttpStreamPool::Group::Group(
HttpStreamPool* pool,
HttpStreamKey stream_key,
std::optional<QuicSessionAliasKey> quic_session_alias_key)
: pool_(pool),
stream_key_(std::move(stream_key)),
spdy_session_key_(stream_key_.CalculateSpdySessionKey()),
quic_session_alias_key_(quic_session_alias_key.has_value()
? std::move(*quic_session_alias_key)
: stream_key_.CalculateQuicSessionAliasKey()),
net_log_(
NetLogWithSource::Make(http_network_session()->net_log(),
NetLogSourceType::HTTP_STREAM_POOL_GROUP)),
force_quic_(
http_network_session()->ShouldForceQuic(stream_key_.destination(),
ProxyInfo::Direct(),
/*is_websocket=*/false)) {
net_log_.BeginEvent(NetLogEventType::HTTP_STREAM_POOL_GROUP_ALIVE, [&] {
base::Value::Dict dict;
dict.Set("stream_key", stream_key_.ToValue());
dict.Set("force_quic", force_quic_);
return dict;
});
}
HttpStreamPool::Group::~Group() {
// TODO(crbug.com/346835898): Ensure `pool_`'s total active stream counts
// are consistent.
net_log_.EndEvent(NetLogEventType::HTTP_STREAM_POOL_GROUP_ALIVE);
}
std::unique_ptr<HttpStreamPool::Job> HttpStreamPool::Group::CreateJob(
Job::Delegate* delegate,
quic::ParsedQuicVersion quic_version,
NextProto expected_protocol,
const NetLogWithSource& request_net_log) {
return std::make_unique<Job>(delegate, JobType::kRequest, this, quic_version,
expected_protocol, request_net_log);
}
void HttpStreamPool::Group::OnJobComplete(Job* job) {
if (attempt_manager_) {
attempt_manager_->OnJobComplete(job);
// `this` may be deleted.
} else {
MaybeComplete();
}
}
std::unique_ptr<HttpStreamPoolHandle> HttpStreamPool::Group::CreateHandle(
std::unique_ptr<StreamSocket> socket,
StreamSocketHandle::SocketReuseType reuse_type,
LoadTimingInfo::ConnectTiming connect_timing) {
++handed_out_stream_count_;
pool_->IncrementTotalHandedOutStreamCount();
net_log_.AddEvent(NetLogEventType::HTTP_STREAM_POOL_GROUP_HANDLE_CREATED,
[&] {
base::Value::Dict dict;
socket->NetLog().source().AddToEventParameters(dict);
dict.Set("reuse_type", static_cast<int>(reuse_type));
return dict;
});
auto handle = std::make_unique<HttpStreamPoolHandle>(
weak_ptr_factory_.GetWeakPtr(), std::move(socket), generation_);
handle->set_connect_timing(connect_timing);
handle->set_reuse_type(reuse_type);
return handle;
}
std::unique_ptr<HttpStream> HttpStreamPool::Group::CreateTextBasedStream(
std::unique_ptr<StreamSocket> socket,
StreamSocketHandle::SocketReuseType reuse_type,
LoadTimingInfo::ConnectTiming connect_timing) {
CHECK(IsNegotiatedProtocolTextBased(socket->GetNegotiatedProtocol()));
return std::make_unique<HttpBasicStream>(
CreateHandle(std::move(socket), reuse_type, std::move(connect_timing)),
/*is_for_get_to_http_proxy=*/false);
}
void HttpStreamPool::Group::ReleaseStreamSocket(
std::unique_ptr<StreamSocket> socket,
int64_t generation) {
CHECK_GT(handed_out_stream_count_, 0u);
--handed_out_stream_count_;
pool_->DecrementTotalHandedOutStreamCount();
bool reusable = false;
std::string_view not_reusable_reason;
if (!socket->IsConnectedAndIdle()) {
not_reusable_reason = socket->IsConnected()
? kDataReceivedUnexpectedly
: kClosedConnectionReturnedToPool;
} else if (generation != generation_) {
not_reusable_reason = kSocketGenerationOutOfDate;
} else if (ReachedMaxStreamLimit() || pool_->ReachedMaxStreamLimit()) {
not_reusable_reason = kExceededSocketLimits;
} else {
reusable = true;
}
if (reusable) {
AddIdleStreamSocket(std::move(socket));
} else {
RecordNetLogClosingSocket(*socket, not_reusable_reason);
socket.reset();
}
pool_->ProcessPendingRequestsInGroups();
MaybeComplete();
}
void HttpStreamPool::Group::AddIdleStreamSocket(
std::unique_ptr<StreamSocket> socket) {
CHECK(IsNegotiatedProtocolTextBased(socket->GetNegotiatedProtocol()));
CHECK_LE(ActiveStreamSocketCount(), pool_->max_stream_sockets_per_group());
idle_stream_sockets_.emplace_back(std::move(socket), base::TimeTicks::Now());
pool_->IncrementTotalIdleStreamCount();
CleanupIdleStreamSockets(CleanupMode::kTimeoutOnly, kIdleTimeLimitExpired);
ProcessPendingRequest();
}
std::unique_ptr<StreamSocket> HttpStreamPool::Group::GetIdleStreamSocket() {
// Iterate through the idle streams from oldtest to newest and try to find a
// used idle stream. Prefer the newest used idle stream.
auto idle_it = idle_stream_sockets_.end();
for (auto it = idle_stream_sockets_.begin();
it != idle_stream_sockets_.end();) {
const base::expected<void, std::string_view> usable_result =
IsIdleStreamSocketUsable(*it);
if (!usable_result.has_value()) {
RecordNetLogClosingSocket(*it->stream_socket, usable_result.error());
it = idle_stream_sockets_.erase(it);
pool_->DecrementTotalIdleStreamCount();
continue;
}
if (it->stream_socket->WasEverUsed()) {
idle_it = it;
}
++it;
}
if (idle_stream_sockets_.empty()) {
return nullptr;
}
if (idle_it == idle_stream_sockets_.end()) {
// There are no used idle streams. Pick the oldest (first) idle streams
// (FIFO).
idle_it = idle_stream_sockets_.begin();
}
CHECK(idle_it != idle_stream_sockets_.end());
std::unique_ptr<StreamSocket> stream_socket =
std::move(idle_it->stream_socket);
idle_stream_sockets_.erase(idle_it);
pool_->DecrementTotalIdleStreamCount();
return stream_socket;
}
void HttpStreamPool::Group::ProcessPendingRequest() {
// TODO(crbug.com/381742472): Ensure what we should do when failing.
if (!attempt_manager_) {
return;
}
attempt_manager_->ProcessPendingJob();
}
bool HttpStreamPool::Group::CloseOneIdleStreamSocket() {
if (idle_stream_sockets_.empty()) {
return false;
}
RecordNetLogClosingSocket(*idle_stream_sockets_.front().stream_socket,
kExceededSocketLimits);
idle_stream_sockets_.pop_front();
pool_->DecrementTotalIdleStreamCount();
// Use MaybeCompleteLater since MaybeComplete() may delete `this`, and this
// method could be called while iterating all groups.
MaybeCompleteLater();
return true;
}
size_t HttpStreamPool::Group::ConnectingStreamSocketCount() const {
return attempt_manager_ ? attempt_manager_->TcpBasedAttemptCount() : 0;
}
size_t HttpStreamPool::Group::ActiveStreamSocketCount() const {
return handed_out_stream_count_ + idle_stream_sockets_.size() +
ConnectingStreamSocketCount();
}
bool HttpStreamPool::Group::ReachedMaxStreamLimit() const {
return ActiveStreamSocketCount() >= pool_->max_stream_sockets_per_group();
}
std::optional<RequestPriority>
HttpStreamPool::Group::GetPriorityIfStalledByPoolLimit() const {
if (!attempt_manager_) {
return std::nullopt;
}
return attempt_manager_->IsStalledByPoolLimit()
? std::make_optional(attempt_manager_->GetPriority())
: std::nullopt;
}
void HttpStreamPool::Group::FlushWithError(
int error,
StreamSocketCloseReason attempt_cancel_reason,
std::string_view net_log_close_reason_utf8) {
Refresh(net_log_close_reason_utf8, attempt_cancel_reason);
CancelJobs(error);
}
void HttpStreamPool::Group::Refresh(std::string_view net_log_close_reason_utf8,
StreamSocketCloseReason cancel_reason) {
++generation_;
if (attempt_manager_) {
attempt_manager_->CancelTcpBasedAttempts(cancel_reason);
}
CleanupIdleStreamSockets(CleanupMode::kForce, net_log_close_reason_utf8);
}
void HttpStreamPool::Group::CloseIdleStreams(
std::string_view net_log_close_reason_utf8) {
CleanupIdleStreamSockets(CleanupMode::kForce, net_log_close_reason_utf8);
}
void HttpStreamPool::Group::CancelJobs(int error) {
if (attempt_manager_) {
attempt_manager_->CancelJobs(error);
}
}
HttpStreamPool::AttemptManager* HttpStreamPool::Group::GetAttemptManagerForJob(
Job* job) {
if (job->type() == JobType::kAltSvcQuicPreconnect) {
return GetAttemptManagerForAltSvcQuicPreconnect();
}
if (!attempt_manager_) {
attempt_manager_ = std::make_unique<AttemptManager>(
this, http_network_session()->net_log());
}
return attempt_manager_.get();
}
void HttpStreamPool::Group::OnAttemptManagerShuttingDown(
AttemptManager* attempt_manager) {
if (attempt_manager == attempt_manager_.get()) {
shutting_down_attempt_managers_.emplace(std::move(attempt_manager_));
CHECK(!attempt_manager_.get());
} else if (attempt_manager ==
alt_svc_quic_preconnect_attempt_manager_.get()) {
shutting_down_attempt_managers_.emplace(
std::move(alt_svc_quic_preconnect_attempt_manager_));
CHECK(!alt_svc_quic_preconnect_attempt_manager_.get());
} else {
NOTREACHED();
}
}
void HttpStreamPool::Group::OnAttemptManagerComplete(
AttemptManager* attempt_manager) {
auto it = shutting_down_attempt_managers_.find(attempt_manager);
if (it != shutting_down_attempt_managers_.end()) {
CHECK_NE(attempt_manager_.get(), attempt_manager);
CHECK_NE(alt_svc_quic_preconnect_attempt_manager_.get(), attempt_manager);
shutting_down_attempt_managers_.erase(it);
} else {
if (attempt_manager == attempt_manager_.get()) {
attempt_manager_.reset();
} else if (attempt_manager ==
alt_svc_quic_preconnect_attempt_manager_.get()) {
alt_svc_quic_preconnect_attempt_manager_.reset();
} else {
NOTREACHED();
}
}
MaybeComplete();
}
base::Value::Dict HttpStreamPool::Group::GetInfoAsValue() const {
base::Value::Dict dict;
dict.Set("active_socket_count", static_cast<int>(ActiveStreamSocketCount()));
dict.Set("idle_socket_count", static_cast<int>(IdleStreamSocketCount()));
dict.Set("handed_out_socket_count",
static_cast<int>(HandedOutStreamSocketCount()));
dict.Set("attempt_manager_alive", !!attempt_manager_);
if (attempt_manager_) {
dict.Set("attempt_state", attempt_manager_->GetInfoAsValue());
}
return dict;
}
void HttpStreamPool::Group::CleanupTimedoutIdleStreamSocketsForTesting() {
CleanupIdleStreamSockets(CleanupMode::kTimeoutOnly, "For testing");
}
void HttpStreamPool::Group::CleanupIdleStreamSockets(
CleanupMode mode,
std::string_view net_log_close_reason_utf8) {
// Iterate though the idle sockets to delete any disconnected ones.
for (auto it = idle_stream_sockets_.begin();
it != idle_stream_sockets_.end();) {
bool should_delete = mode == CleanupMode::kForce;
const base::expected<void, std::string_view> usable_result =
IsIdleStreamSocketUsable(*it);
if (!usable_result.has_value()) {
should_delete = true;
}
if (should_delete) {
RecordNetLogClosingSocket(*it->stream_socket, net_log_close_reason_utf8);
it = idle_stream_sockets_.erase(it);
pool_->DecrementTotalIdleStreamCount();
} else {
++it;
}
}
// Use MaybeCompleteLater since MaybeComplete() may delete `this`, and this
// method could be called while iterating all groups.
MaybeCompleteLater();
}
HttpStreamPool::AttemptManager*
HttpStreamPool::Group::GetAttemptManagerForAltSvcQuicPreconnect() {
if (!alt_svc_quic_preconnect_attempt_manager_) {
alt_svc_quic_preconnect_attempt_manager_ = std::make_unique<AttemptManager>(
this, http_network_session()->net_log());
}
return alt_svc_quic_preconnect_attempt_manager_.get();
}
bool HttpStreamPool::Group::CanComplete() const {
return ActiveStreamSocketCount() == 0 && !attempt_manager_ &&
!alt_svc_quic_preconnect_attempt_manager_ &&
shutting_down_attempt_managers_.empty();
}
void HttpStreamPool::Group::MaybeComplete() {
if (!CanComplete()) {
return;
}
pool_->OnGroupComplete(this);
// `this` is deleted.
}
void HttpStreamPool::Group::MaybeCompleteLater() {
if (CanComplete()) {
base::SequencedTaskRunner::GetCurrentDefault()->PostTask(
FROM_HERE,
base::BindOnce(&Group::MaybeComplete, weak_ptr_factory_.GetWeakPtr()));
}
}
} // namespace net