// blob: 79b4ba48c6371961808fe0cfc3bd471f6c0eb3d1 [file] [log] [blame]
// Copyright 2019 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "services/network/quic_transport.h"
#include "base/auto_reset.h"
#include "base/bind.h"
#include "base/threading/sequenced_task_runner_handle.h"
#include "net/base/io_buffer.h"
#include "net/quic/platform/impl/quic_mem_slice_impl.h"
#include "net/third_party/quiche/src/quic/core/quic_session.h"
#include "net/third_party/quiche/src/quic/core/quic_types.h"
#include "net/third_party/quiche/src/quic/platform/api/quic_mem_slice.h"
#include "net/third_party/quiche/src/quic/platform/api/quic_mem_slice_span.h"
#include "net/third_party/quiche/src/quic/quic_transport/quic_transport_stream.h"
#include "services/network/network_context.h"
#include "services/network/public/mojom/quic_transport.mojom.h"
namespace network {
namespace {
// Builds the net::QuicTransportClient parameters for a connection by
// converting every mojom certificate fingerprint in |fingerprints| into its
// quic::CertificateFingerprint counterpart, preserving the input order.
net::QuicTransportClient::Parameters CreateParameters(
    const std::vector<mojom::QuicTransportCertificateFingerprintPtr>&
        fingerprints) {
  net::QuicTransportClient::Parameters result;
  auto& converted_list = result.server_certificate_fingerprints;
  for (const auto& source : fingerprints) {
    // Copy the two fields over one by one; anything else stays
    // value-initialized.
    quic::CertificateFingerprint converted{};
    converted.algorithm = source->algorithm;
    converted.fingerprint = source->fingerprint;
    converted_list.push_back(std::move(converted));
  }
  return result;
}
} // namespace
// Bridges a single QUIC stream and the mojo data pipes that carry its payload.
// Bytes drained from |readable_| are written to |outgoing_| (see Send()), and
// bytes read from |incoming_| are written into |writable_| (see Receive()).
// Instances are stored in |transport_->streams_| keyed by |id_| and erase
// themselves from that container via Dispose() once neither direction is
// attached to a QUIC stream any more.
class QuicTransport::Stream final {
public:
// Visitor installed on a quic::QuicTransportStream. It relays the stream's
// notifications to the owning Stream through a weak pointer, so every entry
// point tolerates the Stream having been destroyed already.
class StreamVisitor final : public quic::QuicTransportStream::Visitor {
public:
explicit StreamVisitor(Stream* stream)
: stream_(stream->weak_factory_.GetWeakPtr()) {}
// If the Stream is still alive, detaches it from the QUIC stream: each
// direction that is still attached gets its watcher cancelled and its data
// pipe handle closed. A still-attached incoming side is reported to the
// client as closed without FIN. Finally disposal of the Stream is requested.
~StreamVisitor() override {
if (stream_) {
if (stream_->incoming_) {
stream_->writable_watcher_.Cancel();
stream_->writable_.reset();
stream_->transport_->client_->OnIncomingStreamClosed(
stream_->id_,
/*fin_received=*/false);
stream_->incoming_ = nullptr;
}
if (stream_->outgoing_) {
stream_->readable_watcher_.Cancel();
stream_->readable_.reset();
stream_->outgoing_ = nullptr;
}
stream_->MayDisposeLater();
}
}
// Visitor implementation:
// Runs Stream::Receive from a posted task (bound to the weak pointer) instead
// of calling it directly from within the QUIC callback.
void OnCanRead() override {
base::SequencedTaskRunnerHandle::Get()->PostTask(
FROM_HERE, base::BindOnce(&Stream::Receive, stream_));
}
// Forwarded synchronously, unlike OnCanRead/OnCanWrite; ignored when the
// Stream is already gone.
void OnFinRead() override {
if (stream_) {
stream_->OnFinRead();
}
}
// Runs Stream::Send from a posted task (bound to the weak pointer) instead of
// calling it directly from within the QUIC callback.
void OnCanWrite() override {
base::SequencedTaskRunnerHandle::Get()->PostTask(
FROM_HERE, base::BindOnce(&Stream::Send, stream_));
}
private:
const base::WeakPtr<Stream> stream_;
};
// Bidirectional
// |stream| serves as both |outgoing_| and |incoming_|; |readable| feeds the
// outgoing direction and |writable| receives the incoming direction.
Stream(QuicTransport* transport,
quic::QuicTransportStream* stream,
mojo::ScopedDataPipeConsumerHandle readable,
mojo::ScopedDataPipeProducerHandle writable)
: transport_(transport),
id_(stream->id()),
outgoing_(stream),
incoming_(stream),
readable_(std::move(readable)),
writable_(std::move(writable)),
readable_watcher_(FROM_HERE, ArmingPolicy::MANUAL),
writable_watcher_(FROM_HERE, ArmingPolicy::MANUAL) {
DCHECK(outgoing_);
DCHECK(incoming_);
DCHECK(readable_);
DCHECK(writable_);
Init();
}
// Unidirectional: outgoing
// Only |outgoing_| and |readable_| are set; |incoming_| stays null.
Stream(QuicTransport* transport,
quic::QuicTransportStream* outgoing,
mojo::ScopedDataPipeConsumerHandle readable)
: transport_(transport),
id_(outgoing->id()),
outgoing_(outgoing),
readable_(std::move(readable)),
readable_watcher_(FROM_HERE, ArmingPolicy::MANUAL),
writable_watcher_(FROM_HERE, ArmingPolicy::MANUAL) {
DCHECK(outgoing_);
DCHECK(readable_);
Init();
}
// Unidirectional: incoming
// Only |incoming_| and |writable_| are set; |outgoing_| stays null.
Stream(QuicTransport* transport,
quic::QuicTransportStream* incoming,
mojo::ScopedDataPipeProducerHandle writable)
: transport_(transport),
id_(incoming->id()),
incoming_(incoming),
writable_(std::move(writable)),
readable_watcher_(FROM_HERE, ArmingPolicy::MANUAL),
writable_watcher_(FROM_HERE, ArmingPolicy::MANUAL) {
DCHECK(incoming_);
DCHECK(writable_);
Init();
}
// Records that the client asked for FIN on the outgoing side. The FIN itself
// is only sent once the end of |readable_| has also been observed (see
// MaySendFin()).
void NotifyFinFromClient() {
has_received_fin_from_client_ = true;
MaySendFin();
}
// Resets the underlying QUIC stream with |code|, detaches both directions,
// closes |readable_| and requests disposal. Does nothing when neither
// direction is attached any more.
// NOTE(review): |writable_| and its watcher are not touched here; they are
// released when this object is destroyed.
void Abort(quic::QuicRstStreamErrorCode code) {
auto* stream = incoming_ ? incoming_ : outgoing_;
if (!stream) {
return;
}
stream->Reset(code);
incoming_ = nullptr;
outgoing_ = nullptr;
readable_watcher_.Cancel();
readable_.reset();
MayDisposeLater();
}
// Resets the QUIC stream with QUIC_STREAM_CANCELLED if a direction is still
// attached when this object goes away.
~Stream() {
auto* stream = incoming_ ? incoming_ : outgoing_;
if (!stream) {
return;
}
stream->Reset(quic::QuicRstStreamErrorCode::QUIC_STREAM_CANCELLED);
}
private:
using ArmingPolicy = mojo::SimpleWatcher::ArmingPolicy;
// Shared constructor tail: installs a StreamVisitor on the QUIC stream(s) and
// starts watching the data pipe of every direction that is present.
void Init() {
if (outgoing_) {
DCHECK(readable_);
outgoing_->set_visitor(std::make_unique<StreamVisitor>(this));
// OnReadable() is invoked when new data becomes readable or the peer closes
// its end of the pipe.
readable_watcher_.Watch(
readable_.get(),
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
MOJO_TRIGGER_CONDITION_SIGNALS_SATISFIED,
base::BindRepeating(&Stream::OnReadable, base::Unretained(this)));
readable_watcher_.ArmOrNotify();
}
if (incoming_) {
DCHECK(writable_);
// For a bidirectional stream both pointers refer to the same QUIC stream and
// the visitor was already installed in the branch above.
if (incoming_ != outgoing_) {
incoming_->set_visitor(std::make_unique<StreamVisitor>(this));
}
writable_watcher_.Watch(
writable_.get(), MOJO_HANDLE_SIGNAL_WRITABLE,
MOJO_TRIGGER_CONDITION_SIGNALS_SATISFIED,
base::BindRepeating(&Stream::OnWritable, base::Unretained(this)));
writable_watcher_.ArmOrNotify();
}
}
// Watcher callback for |readable_|; simply tries to send.
void OnReadable(MojoResult result, const mojo::HandleSignalsState& state) {
DCHECK_EQ(result, MOJO_RESULT_OK);
Send();
}
// Moves data from |readable_| to |outgoing_| for as long as the QUIC stream
// reports that it can be written to. Also retries a pending FIN first.
void Send() {
MaySendFin();
while (outgoing_ && outgoing_->CanWrite()) {
const void* data = nullptr;
uint32_t available = 0;
MojoResult result = readable_->BeginReadData(
&data, &available, MOJO_BEGIN_READ_DATA_FLAG_NONE);
if (result == MOJO_RESULT_SHOULD_WAIT) {
// Nothing to read at the moment; wait for the watcher to fire again.
readable_watcher_.Arm();
return;
}
if (result == MOJO_RESULT_FAILED_PRECONDITION) {
// Treated as the end of the pipe: no more data will come from the client.
has_seen_end_of_pipe_for_readable_ = true;
MaySendFin();
return;
}
DCHECK_EQ(result, MOJO_RESULT_OK);
bool send_result = outgoing_->Write(
absl::string_view(reinterpret_cast<const char*>(data), available));
if (!send_result) {
// TODO(yhirano): Handle this failure.
// End the two-phase read with zero bytes so that nothing is marked as
// consumed.
readable_->EndReadData(0);
return;
}
readable_->EndReadData(available);
}
}
// Watcher callback for |writable_|; simply tries to receive.
void OnWritable(MojoResult result, const mojo::HandleSignalsState& state) {
DCHECK_EQ(result, MOJO_RESULT_OK);
Receive();
}
// Sends FIN on |outgoing_| once both conditions hold: the end of |readable_|
// has been seen and the client has requested FIN. On success the outgoing
// direction is detached and its pipe closed.
void MaySendFin() {
if (!outgoing_) {
return;
}
if (!has_seen_end_of_pipe_for_readable_ || !has_received_fin_from_client_) {
return;
}
if (outgoing_->SendFin()) {
outgoing_ = nullptr;
readable_watcher_.Cancel();
readable_.reset();
MayDisposeLater();
}
// Otherwise, retry in Send().
}
// Moves data from |incoming_| into |writable_| for as long as the QUIC stream
// has readable bytes.
void Receive() {
while (incoming_ && incoming_->ReadableBytes() > 0) {
void* buffer = nullptr;
uint32_t available = 0;
// While this flag is set, OnFinRead() leaves |writable_| open; this function
// then closes it itself after EndWriteData() below. The flag is restored when
// |auto_reset| goes out of scope.
base::AutoReset<bool> auto_reset(&in_two_phase_write_, true);
MojoResult result = writable_->BeginWriteData(
&buffer, &available, MOJO_BEGIN_WRITE_DATA_FLAG_NONE);
if (result == MOJO_RESULT_SHOULD_WAIT) {
// No room in the pipe right now; wait for the watcher to fire again.
writable_watcher_.Arm();
return;
}
if (result == MOJO_RESULT_FAILED_PRECONDITION) {
// The client doesn't want further data.
writable_watcher_.Cancel();
writable_.reset();
incoming_ = nullptr;
MayDisposeLater();
return;
}
DCHECK_EQ(result, MOJO_RESULT_OK);
const size_t num_read_bytes =
incoming_->Read(reinterpret_cast<char*>(buffer), available);
writable_->EndWriteData(num_read_bytes);
if (!incoming_) {
// |incoming_| can be null here, because OnFinRead can be called in
// QuicTransportStream::Read.
writable_watcher_.Cancel();
writable_.reset();
MayDisposeLater();
return;
}
}
}
// Called (through StreamVisitor) when FIN has been read from the QUIC stream.
// Detaches the incoming direction and tells the client that the stream closed
// with FIN. |writable_| is closed here unless Receive() is in the middle of a
// two-phase write, in which case Receive() closes it.
void OnFinRead() {
incoming_ = nullptr;
transport_->client_->OnIncomingStreamClosed(id_, /*fin_received=*/true);
if (in_two_phase_write_) {
return;
}
writable_watcher_.Cancel();
writable_.reset();
}
// Removes this object's entry from |transport_->streams_|.
void Dispose() {
transport_->streams_.erase(id_);
// Deletes |this|.
}
// Posts Dispose() as a task (bound to a weak pointer) once neither direction
// is attached to a QUIC stream; otherwise does nothing.
void MayDisposeLater() {
if (outgoing_ || incoming_) {
return;
}
base::SequencedTaskRunnerHandle::Get()->PostTask(
FROM_HERE,
base::BindOnce(&Stream::Dispose, weak_factory_.GetWeakPtr()));
}
QuicTransport* const transport_; // outlives |this|.
const uint32_t id_;
// |outgoing_| and |incoming_| point to the same stream when this is a
// bidirectional stream. They are owned by |transport_| (via
// quic::QuicSession), and the properties will be null-set when the streams
// are gone (via StreamVisitor).
quic::QuicTransportStream* outgoing_ = nullptr;
quic::QuicTransportStream* incoming_ = nullptr;
mojo::ScopedDataPipeConsumerHandle readable_; // for |outgoing|
mojo::ScopedDataPipeProducerHandle writable_; // for |incoming|
mojo::SimpleWatcher readable_watcher_;
mojo::SimpleWatcher writable_watcher_;
// True only while Receive() is between BeginWriteData and EndWriteData.
bool in_two_phase_write_ = false;
// Set when reading from |readable_| reports MOJO_RESULT_FAILED_PRECONDITION.
bool has_seen_end_of_pipe_for_readable_ = false;
// Set by NotifyFinFromClient().
bool has_received_fin_from_client_ = false;
// This must be the last member.
base::WeakPtrFactory<Stream> weak_factory_{this};
}; // class QuicTransport::Stream
// Creates the underlying net::QuicTransportClient (with |this| passed as its
// visitor argument) and immediately starts connecting. If
// |handshake_client| disconnects, the object disposes itself.
QuicTransport::QuicTransport(
    const GURL& url,
    const url::Origin& origin,
    const net::NetworkIsolationKey& key,
    const std::vector<mojom::QuicTransportCertificateFingerprintPtr>&
        fingerprints,
    NetworkContext* context,
    mojo::PendingRemote<mojom::QuicTransportHandshakeClient> handshake_client)
    : transport_(std::make_unique<net::QuicTransportClient>(
          url,
          origin,
          this,
          key,
          context->url_request_context(),
          CreateParameters(fingerprints))),
      context_(context),
      receiver_(this),
      handshake_client_(std::move(handshake_client)) {
  // Losing the handshake client means nobody is interested in the result any
  // more, so clean up.
  auto on_handshake_client_gone =
      base::BindOnce(&QuicTransport::Dispose, base::Unretained(this));
  handshake_client_.set_disconnect_handler(std::move(on_handshake_client_gone));

  transport_->Connect();
}
// Defaulted: there is no explicit teardown here, every member is released by
// its own destructor.
QuicTransport::~QuicTransport() = default;
// Copies |data| into a freshly allocated buffer, hands it to the session's
// datagram queue, and reports through |callback| whether the queue returned
// quic::MESSAGE_STATUS_SUCCESS. |callback| is always run synchronously.
// Must not be called after the transport has been torn down.
void QuicTransport::SendDatagram(base::span<const uint8_t> data,
                                 base::OnceCallback<void(bool)> callback) {
  DCHECK(!torn_down_);

  auto buffer = base::MakeRefCounted<net::IOBuffer>(data.size());
  // An empty span may carry a null data pointer, and passing a null pointer
  // to memcpy is undefined behavior even for a zero-byte copy, so skip the
  // copy entirely in that case.
  if (!data.empty()) {
    memcpy(buffer->data(), data.data(), data.size());
  }

  quic::QuicMemSlice slice(
      quic::QuicMemSliceImpl(std::move(buffer), data.size()));
  const quic::MessageStatus status =
      transport_->session()->datagram_queue()->SendOrQueueDatagram(
          std::move(slice));
  std::move(callback).Run(status == quic::MESSAGE_STATUS_SUCCESS);
}
// Opens a new outgoing QUIC stream and wires it to the given data pipes.
// A valid |writable| requests a bidirectional stream; a null |writable|
// requests a unidirectional (outgoing-only) one. |callback| receives
// (true, stream id) on success and (false, 0) when the request is rejected.
void QuicTransport::CreateStream(
    mojo::ScopedDataPipeConsumerHandle readable,
    mojo::ScopedDataPipeProducerHandle writable,
    base::OnceCallback<void(bool, uint32_t)> callback) {
  // |readable| is non-nullable, |writable| is nullable.
  DCHECK(readable);

  if (handshake_client_) {
    // Invalid request.
    std::move(callback).Run(false, 0);
    return;
  }

  quic::QuicTransportClientSession* const session = transport_->session();

  if (!writable) {
    // Unidirectional
    if (!session->CanOpenNextOutgoingUnidirectionalStream()) {
      // TODO(crbug.com/104236): Instead of rejecting the creation request, we
      // should wait in this case.
      std::move(callback).Run(false, 0);
      return;
    }
    quic::QuicTransportStream* const outgoing_stream =
        session->OpenOutgoingUnidirectionalStream();
    DCHECK(outgoing_stream);
    auto wrapper =
        std::make_unique<Stream>(this, outgoing_stream, std::move(readable));
    streams_.insert(std::make_pair(outgoing_stream->id(), std::move(wrapper)));
    std::move(callback).Run(true, outgoing_stream->id());
    return;
  }

  // Bidirectional
  if (!session->CanOpenNextOutgoingBidirectionalStream()) {
    // TODO(crbug.com/104236): Instead of rejecting the creation request, we
    // should wait in this case.
    std::move(callback).Run(false, 0);
    return;
  }
  quic::QuicTransportStream* const bidi_stream =
      session->OpenOutgoingBidirectionalStream();
  DCHECK(bidi_stream);
  auto wrapper = std::make_unique<Stream>(this, bidi_stream,
                                          std::move(readable),
                                          std::move(writable));
  streams_.insert(std::make_pair(bidi_stream->id(), std::move(wrapper)));
  std::move(callback).Run(true, bidi_stream->id());
}
// Queues |acceptance| and then immediately tries to satisfy queued
// acceptances with any incoming bidirectional streams that are available.
void QuicTransport::AcceptBidirectionalStream(
    BidirectionalStreamAcceptanceCallback acceptance) {
  bidirectional_stream_acceptances_.emplace(std::move(acceptance));
  OnIncomingBidirectionalStreamAvailable();
}
// Queues |acceptance| and then immediately tries to satisfy queued
// acceptances with any incoming unidirectional streams that are available.
void QuicTransport::AcceptUnidirectionalStream(
    UnidirectionalStreamAcceptanceCallback acceptance) {
  unidirectional_stream_acceptances_.emplace(std::move(acceptance));
  OnIncomingUnidirectionalStreamAvailable();
}
// Tells the stream registered under id |stream| that the client has finished
// writing. Unknown ids are silently ignored.
void QuicTransport::SendFin(uint32_t stream) {
  const auto entry = streams_.find(stream);
  if (entry != streams_.end()) {
    entry->second->NotifyFinFromClient();
  }
}
void QuicTransport::AbortStream(uint32_t stream, uint64_t code) {
auto it = streams_.find(stream);
if (it == streams_.end()) {
return;
}
auto code_to_pass = quic::QuicRstStreamErrorCode::QUIC_STREAM_NO_ERROR;
if (code < quic::QuicRstStreamErrorCode::QUIC_STREAM_LAST_ERROR) {
code_to_pass = static_cast<quic::QuicRstStreamErrorCode>(code);
}
it->second->Abort(code_to_pass);
}
// Connection established: hands the handshake client a remote to |this| and
// a receiver for the client interface, then drops the handshake client. From
// here on, losing |client_| disposes the transport. Ignored after teardown.
void QuicTransport::OnConnected() {
  if (torn_down_) {
    return;
  }
  DCHECK(handshake_client_);

  auto transport_remote = receiver_.BindNewPipeAndPassRemote();
  auto client_receiver = client_.BindNewPipeAndPassReceiver();
  handshake_client_->OnConnectionEstablished(std::move(transport_remote),
                                             std::move(client_receiver));
  handshake_client_.reset();

  auto on_client_gone =
      base::BindOnce(&QuicTransport::Dispose, base::Unretained(this));
  client_.set_disconnect_handler(std::move(on_client_gone));
}
void QuicTransport::OnConnectionFailed() {
if (torn_down_) {
return;
}
DCHECK(handshake_client_);
// Here we assume that the error is not going to handed to the
// initiator renderer.
handshake_client_->OnHandshakeFailed(transport_->error());
TearDown();
}
// The connection was closed: tear down unless that already happened. The
// handshake is expected to have completed by now.
void QuicTransport::OnClosed() {
  if (!torn_down_) {
    DCHECK(!handshake_client_);
    TearDown();
  }
}
// The connection hit an error: tear down unless that already happened. The
// handshake is expected to have completed by now.
void QuicTransport::OnError() {
  if (!torn_down_) {
    DCHECK(!handshake_client_);
    TearDown();
  }
}
void QuicTransport::OnIncomingBidirectionalStreamAvailable() {
DCHECK(!handshake_client_);
DCHECK(client_);
while (!bidirectional_stream_acceptances_.empty()) {
quic::QuicTransportStream* const stream =
transport_->session()->AcceptIncomingBidirectionalStream();
if (!stream) {
return;
}
auto acceptance = std::move(bidirectional_stream_acceptances_.front());
bidirectional_stream_acceptances_.pop();
mojo::ScopedDataPipeConsumerHandle readable_for_outgoing;
mojo::ScopedDataPipeProducerHandle writable_for_outgoing;
mojo::ScopedDataPipeConsumerHandle readable_for_incoming;
mojo::ScopedDataPipeProducerHandle writable_for_incoming;
const MojoCreateDataPipeOptions options = {
sizeof(options), MOJO_CREATE_DATA_PIPE_FLAG_NONE, 1, 256 * 1024};
if (mojo::CreateDataPipe(&options, &writable_for_outgoing,
&readable_for_outgoing) != MOJO_RESULT_OK) {
stream->Reset(quic::QuicRstStreamErrorCode::QUIC_STREAM_CANCELLED);
// TODO(yhirano): Error the entire connection.
return;
}
if (mojo::CreateDataPipe(&options, &writable_for_incoming,
&readable_for_incoming) != MOJO_RESULT_OK) {
stream->Reset(quic::QuicRstStreamErrorCode::QUIC_STREAM_CANCELLED);
// TODO(yhirano): Error the entire connection.
return;
}
streams_.insert(std::make_pair(
stream->id(),
std::make_unique<Stream>(this, stream, std::move(readable_for_outgoing),
std::move(writable_for_incoming))));
std::move(acceptance)
.Run(stream->id(), std::move(readable_for_incoming),
std::move(writable_for_outgoing));
}
}
void QuicTransport::OnIncomingUnidirectionalStreamAvailable() {
DCHECK(!handshake_client_);
DCHECK(client_);
while (!unidirectional_stream_acceptances_.empty()) {
quic::QuicTransportStream* const stream =
transport_->session()->AcceptIncomingUnidirectionalStream();
if (!stream) {
return;
}
auto acceptance = std::move(unidirectional_stream_acceptances_.front());
unidirectional_stream_acceptances_.pop();
mojo::ScopedDataPipeConsumerHandle readable_for_incoming;
mojo::ScopedDataPipeProducerHandle writable_for_incoming;
const MojoCreateDataPipeOptions options = {
sizeof(options), MOJO_CREATE_DATA_PIPE_FLAG_NONE, 1, 256 * 1024};
if (mojo::CreateDataPipe(&options, &writable_for_incoming,
&readable_for_incoming) != MOJO_RESULT_OK) {
stream->Reset(quic::QuicRstStreamErrorCode::QUIC_STREAM_CANCELLED);
// TODO(yhirano): Error the entire connection.
return;
}
streams_.insert(std::make_pair(
stream->id(), std::make_unique<Stream>(
this, stream, std::move(writable_for_incoming))));
std::move(acceptance).Run(stream->id(), std::move(readable_for_incoming));
}
}
// Forwards a received datagram to the client as a span of bytes. Dropped
// once the transport has been torn down.
void QuicTransport::OnDatagramReceived(base::StringPiece datagram) {
  if (torn_down_) {
    return;
  }
  const uint8_t* const bytes =
      reinterpret_cast<const uint8_t*>(datagram.data());
  client_->OnDatagramReceived(base::make_span(bytes, datagram.size()));
}
// Currently a no-op. Presumably signals that another outgoing bidirectional
// stream may be opened (inferred from the name; confirm against the visitor
// interface).
void QuicTransport::OnCanCreateNewOutgoingBidirectionalStream() {
// TODO(yhirano): Implement this.
}
// Currently a no-op. Presumably signals that another outgoing unidirectional
// stream may be opened (inferred from the name; confirm against the visitor
// interface).
void QuicTransport::OnCanCreateNewOutgoingUnidirectionalStream() {
// TODO(yhirano): Implement this.
}
// Marks the transport as torn down, closes every mojo endpoint, and posts
// Dispose() as a task bound to a weak pointer rather than running it
// synchronously.
void QuicTransport::TearDown() {
  torn_down_ = true;

  receiver_.reset();
  handshake_client_.reset();
  client_.reset();

  auto dispose_later =
      base::BindOnce(&QuicTransport::Dispose, weak_factory_.GetWeakPtr());
  base::SequencedTaskRunnerHandle::Get()->PostTask(FROM_HERE,
                                                   std::move(dispose_later));
}
void QuicTransport::Dispose() {
receiver_.reset();
context_->Remove(this);
// |this| is deleted.
}
} // namespace network