blob: 037ac54488c09bd0d220355bc119b0cdb76afc02 [file] [log] [blame]
// Copyright 2019 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "cast/streaming/receiver_session.h"
#include <algorithm>
#include <chrono>
#include <string>
#include <utility>
#include "absl/strings/numbers.h"
#include "cast/common/channel/message_util.h"
#include "cast/common/public/message_port.h"
#include "cast/streaming/answer_messages.h"
#include "cast/streaming/environment.h"
#include "cast/streaming/message_fields.h"
#include "cast/streaming/offer_messages.h"
#include "cast/streaming/receiver.h"
#include "cast/streaming/sender_message.h"
#include "util/json/json_helpers.h"
#include "util/osp_logging.h"
#include "util/std_util.h"
namespace openscreen::cast {
namespace {
template <typename Stream, typename Codec>
std::unique_ptr<Stream> SelectStream(
const std::vector<Codec>& preferred_codecs,
ReceiverSession::Client& client,
const std::vector<Stream>& offered_streams) {
for (auto codec : preferred_codecs) {
for (const Stream& offered_stream : offered_streams) {
if (offered_stream.codec == codec &&
(offered_stream.stream.codec_parameter.empty() ||
client.SupportsCodecParameter(
offered_stream.stream.codec_parameter))) {
OSP_VLOG << "Selected " << CodecToString(codec)
<< " as codec for streaming";
return std::make_unique<Stream>(offered_stream);
}
}
}
return nullptr;
}
MediaCapability ToCapability(AudioCodec codec) {
switch (codec) {
case AudioCodec::kAac:
return MediaCapability::kAac;
case AudioCodec::kOpus:
return MediaCapability::kOpus;
default:
OSP_DLOG_FATAL << "Invalid audio codec: " << static_cast<int>(codec);
OSP_NOTREACHED();
}
}
MediaCapability ToCapability(VideoCodec codec) {
switch (codec) {
case VideoCodec::kVp8:
return MediaCapability::kVp8;
case VideoCodec::kVp9:
return MediaCapability::kVp9;
case VideoCodec::kH264:
return MediaCapability::kH264;
case VideoCodec::kHevc:
return MediaCapability::kHevc;
case VideoCodec::kAv1:
return MediaCapability::kAv1;
default:
OSP_DLOG_FATAL << "Invalid video codec: " << static_cast<int>(codec);
OSP_NOTREACHED();
}
}
} // namespace
ReceiverSession::Client::~Client() = default;
ReceiverSession::ReceiverSession(Client& client,
Environment& environment,
MessagePort& message_port,
ReceiverConstraints constraints)
: client_(client),
environment_(environment),
constraints_(std::move(constraints)),
session_id_(MakeUniqueSessionId("streaming_receiver")),
messenger_(message_port,
session_id_,
[this](Error error) {
OSP_DLOG_WARN << "Got a session messenger error: " << error;
client_.OnError(this, error);
}),
packet_router_(environment_) {
OSP_CHECK(std::none_of(
constraints_.video_codecs.begin(), constraints_.video_codecs.end(),
[](VideoCodec c) { return c == VideoCodec::kNotSpecified; }));
OSP_CHECK(std::none_of(
constraints_.audio_codecs.begin(), constraints_.audio_codecs.end(),
[](AudioCodec c) { return c == AudioCodec::kNotSpecified; }));
messenger_.SetHandler(
SenderMessage::Type::kOffer,
[this](const std::string& sender_id, SenderMessage message) {
OnOffer(sender_id, std::move(message));
});
messenger_.SetHandler(
SenderMessage::Type::kGetCapabilities,
[this](const std::string& sender_id, SenderMessage message) {
OnCapabilitiesRequest(sender_id, std::move(message));
});
messenger_.SetHandler(
SenderMessage::Type::kRpc,
[this](const std::string& sender_id, SenderMessage message) {
this->OnRpcMessage(sender_id, std::move(message));
});
environment_.SetSocketSubscriber(this);
}
ReceiverSession::~ReceiverSession() {
ResetReceivers(Client::kEndOfSession);
}
void ReceiverSession::OnSocketReady() {
if (pending_offer_) {
InitializeSession(*pending_offer_);
pending_offer_.reset();
}
}
void ReceiverSession::OnSocketInvalid(const Error& error) {
if (pending_offer_) {
SendErrorAnswerReply(pending_offer_->sender_id,
pending_offer_->sequence_number, error);
pending_offer_.reset();
}
client_.OnError(this,
Error(Error::Code::kSocketFailure,
"The environment is invalid and should be replaced."));
}
bool ReceiverSession::PendingOffer::IsValid() const {
return (selected_audio || selected_video) && !sender_id.empty() &&
sequence_number >= 0;
}
void ReceiverSession::OnOffer(const std::string& sender_id,
SenderMessage message) {
// We just drop offers we can't respond to. Note that libcast senders will
// always send a strictly positive sequence numbers, but zero is permitted
// by the spec.
if (message.sequence_number < 0) {
OSP_DLOG_WARN
<< "Dropping offer with missing sequence number, can't respond";
return;
}
if (!message.valid) {
const Error error(Error::Code::kParameterInvalid,
"Failed to parse malformed OFFER");
SendErrorAnswerReply(sender_id, message.sequence_number, error);
client_.OnError(this, error);
return;
}
auto properties = std::make_unique<PendingOffer>();
properties->sender_id = sender_id;
properties->sequence_number = message.sequence_number;
const Offer& offer = absl::get<Offer>(message.body);
if (offer.cast_mode == CastMode::kRemoting) {
if (!constraints_.remoting) {
SendErrorAnswerReply(
sender_id, message.sequence_number,
Error(Error::Code::kRemotingNotSupported,
"This receiver does not have remoting enabled."));
return;
}
}
properties->mode = offer.cast_mode;
SelectStreams(offer, properties.get());
if (!properties->IsValid()) {
SendErrorAnswerReply(sender_id, message.sequence_number,
Error(Error::Code::kNoStreamSelected,
"Failed to select any streams from OFFER"));
return;
}
// Finally, before we update the pending offer let's make sure we don't
// already have one.
if (pending_offer_) {
SendErrorAnswerReply(
pending_offer_->sender_id, pending_offer_->sequence_number,
Error(Error::Code::kInterrupted,
"Received a new OFFER before negotiation could complete."));
pending_offer_.reset();
}
switch (environment_.socket_state()) {
// If the environment is ready or in a bad state, we can respond
// immediately.
case Environment::SocketState::kInvalid:
SendErrorAnswerReply(
sender_id, message.sequence_number,
Error(Error::Code::kSocketClosedFailure,
"UDP socket is closed, likely due to a bind error."));
break;
case Environment::SocketState::kReady:
InitializeSession(*properties);
break;
// Else we need to store the properties we just created until we get a
// ready or error event.
case Environment::SocketState::kStarting:
pending_offer_ = std::move(properties);
break;
}
}
void ReceiverSession::OnCapabilitiesRequest(const std::string& sender_id,
SenderMessage message) {
if (message.sequence_number < 0) {
OSP_DLOG_WARN
<< "Dropping offer with missing sequence number, can't respond";
return;
}
ReceiverMessage response{
ReceiverMessage::Type::kCapabilitiesResponse, message.sequence_number,
true /* valid */
};
if (constraints_.remoting) {
response.body = CreateRemotingCapabilityV2();
} else {
response.valid = false;
response.body = ReceiverError(Error::Code::kRemotingNotSupported,
"Remoting is not supported");
}
// NOTE: we respond to any arbitrary sender here, to allow sender to get
// capabilities before making an OFFER.
const Error result = messenger_.SendMessage(sender_id, std::move(response));
if (!result.ok()) {
client_.OnError(this, result);
}
}
void ReceiverSession::OnRpcMessage(const std::string& sender_id,
SenderMessage message) {
if (!message.valid) {
OSP_DLOG_WARN
<< "Bad RPC message. This may or may not represent a serious problem.";
return;
}
if (sender_id != negotiated_sender_id_) {
OSP_DLOG_INFO << "Received an RPC message from sender " << sender_id
<< "--which we haven't negotiated with, dropping.";
return;
}
const auto& body = absl::get<std::vector<uint8_t>>(message.body);
if (!rpc_messenger_) {
OSP_DLOG_INFO << "Received an RPC message without having a messenger.";
return;
}
rpc_messenger_->ProcessMessageFromRemote(body.data(), body.size());
}
void ReceiverSession::SendRpcMessage(std::vector<uint8_t> message) {
if (negotiated_sender_id_.empty()) {
OSP_DLOG_WARN
<< "Can't send an RPC message without a currently negotiated session.";
return;
}
const Error error = messenger_.SendMessage(
negotiated_sender_id_,
ReceiverMessage{ReceiverMessage::Type::kRpc, -1, true /* valid */,
std::move(message)});
if (!error.ok()) {
OSP_LOG_WARN << "Failed to send RPC message: " << error;
}
}
void ReceiverSession::SelectStreams(const Offer& offer,
PendingOffer* properties) {
if (offer.cast_mode == CastMode::kMirroring) {
if (!offer.audio_streams.empty() && !constraints_.audio_codecs.empty()) {
properties->selected_audio =
SelectStream(constraints_.audio_codecs, client_, offer.audio_streams);
}
if (!offer.video_streams.empty() && !constraints_.video_codecs.empty()) {
properties->selected_video =
SelectStream(constraints_.video_codecs, client_, offer.video_streams);
}
} else {
OSP_CHECK(offer.cast_mode == CastMode::kRemoting);
if (offer.audio_streams.size() == 1) {
properties->selected_audio =
std::make_unique<AudioStream>(offer.audio_streams[0]);
}
if (offer.video_streams.size() == 1) {
properties->selected_video =
std::make_unique<VideoStream>(offer.video_streams[0]);
}
}
}
void ReceiverSession::InitializeSession(const PendingOffer& properties) {
Answer answer = ConstructAnswer(properties);
if (!answer.IsValid()) {
// If the answer message is invalid, there is no point in setting up a
// negotiation because the sender won't be able to connect to it.
SendErrorAnswerReply(properties.sender_id, properties.sequence_number,
Error(Error::Code::kParameterInvalid,
"Failed to construct an ANSWER message"));
return;
}
// Only spawn receivers if we know we have a valid answer message.
ConfiguredReceivers receivers = SpawnReceivers(properties);
negotiated_sender_id_ = properties.sender_id;
if (properties.mode == CastMode::kMirroring) {
client_.OnNegotiated(this, std::move(receivers));
} else {
rpc_messenger_ =
std::make_unique<RpcMessenger>([this](std::vector<uint8_t> message) {
this->SendRpcMessage(std::move(message));
});
client_.OnRemotingNegotiated(
this, RemotingNegotiation{std::move(receivers), rpc_messenger_.get()});
}
const Error result = messenger_.SendMessage(
negotiated_sender_id_,
ReceiverMessage{ReceiverMessage::Type::kAnswer,
properties.sequence_number, true /* valid */,
std::move(answer)});
if (!result.ok()) {
client_.OnError(this, result);
}
}
std::unique_ptr<Receiver> ReceiverSession::ConstructReceiver(
const Stream& stream) {
// Session config is currently only for mirroring.
SessionConfig config = {stream.ssrc, stream.ssrc + 1,
stream.rtp_timebase, stream.channels,
stream.target_delay, stream.aes_key,
stream.aes_iv_mask, /* is_pli_enabled */ true};
if (!config.IsValid()) {
return nullptr;
}
return std::make_unique<Receiver>(environment_, packet_router_,
std::move(config));
}
ReceiverSession::ConfiguredReceivers ReceiverSession::SpawnReceivers(
const PendingOffer& properties) {
OSP_CHECK(properties.IsValid());
ResetReceivers(Client::kRenegotiated);
AudioCaptureConfig audio_config;
if (properties.selected_audio) {
current_audio_receiver_ =
ConstructReceiver(properties.selected_audio->stream);
audio_config =
AudioCaptureConfig{properties.selected_audio->codec,
properties.selected_audio->stream.channels,
properties.selected_audio->bit_rate,
properties.selected_audio->stream.rtp_timebase,
properties.selected_audio->stream.target_delay,
properties.selected_audio->stream.codec_parameter};
}
VideoCaptureConfig video_config;
if (properties.selected_video) {
current_video_receiver_ =
ConstructReceiver(properties.selected_video->stream);
video_config =
VideoCaptureConfig{properties.selected_video->codec,
properties.selected_video->max_frame_rate,
properties.selected_video->max_bit_rate,
properties.selected_video->resolutions,
properties.selected_video->stream.target_delay,
properties.selected_video->stream.codec_parameter};
}
return ConfiguredReceivers{current_audio_receiver_.get(),
std::move(audio_config),
current_video_receiver_.get(),
std::move(video_config), properties.sender_id};
}
void ReceiverSession::ResetReceivers(Client::ReceiversDestroyingReason reason) {
if (current_video_receiver_ || current_audio_receiver_) {
client_.OnReceiversDestroying(this, reason);
current_audio_receiver_.reset();
current_video_receiver_.reset();
rpc_messenger_.reset();
}
}
Answer ReceiverSession::ConstructAnswer(const PendingOffer& properties) {
OSP_CHECK(properties.IsValid());
std::vector<int> stream_indexes;
std::vector<Ssrc> stream_ssrcs;
Constraints constraints;
if (properties.selected_audio) {
stream_indexes.push_back(properties.selected_audio->stream.index);
stream_ssrcs.push_back(properties.selected_audio->stream.ssrc + 1);
for (const auto& limit : constraints_.audio_limits) {
if (limit.codec == properties.selected_audio->codec ||
limit.applies_to_all_codecs) {
constraints.audio = AudioConstraints{
limit.max_sample_rate, limit.max_channels, limit.min_bit_rate,
limit.max_bit_rate, limit.max_delay,
};
break;
}
}
}
if (properties.selected_video) {
stream_indexes.push_back(properties.selected_video->stream.index);
stream_ssrcs.push_back(properties.selected_video->stream.ssrc + 1);
for (const auto& limit : constraints_.video_limits) {
if (limit.codec == properties.selected_video->codec ||
limit.applies_to_all_codecs) {
constraints.video = VideoConstraints{
limit.max_pixels_per_second, std::nullopt, /* min dimensions */
limit.max_dimensions, limit.min_bit_rate,
limit.max_bit_rate, limit.max_delay,
};
break;
}
}
}
std::optional<DisplayDescription> display;
if (constraints_.display_description) {
const auto* d = constraints_.display_description.get();
display = DisplayDescription{d->dimensions, std::nullopt,
d->can_scale_content
? AspectRatioConstraint::kVariable
: AspectRatioConstraint::kFixed};
}
// Only set the constraints in the answer if they are valid (meaning we
// successfully found limits above).
std::optional<Constraints> answer_constraints;
if (constraints.IsValid()) {
answer_constraints = std::move(constraints);
}
return Answer{environment_.GetBoundLocalEndpoint().port,
std::move(stream_indexes), std::move(stream_ssrcs),
answer_constraints, std::move(display)};
}
ReceiverCapability ReceiverSession::CreateRemotingCapabilityV2() {
// If we don't support remoting, there is no reason to respond to
// capability requests--they are not used for mirroring.
OSP_CHECK(constraints_.remoting);
ReceiverCapability capability;
capability.remoting_version = kSupportedRemotingVersion;
for (const AudioCodec& codec : constraints_.audio_codecs) {
capability.media_capabilities.push_back(ToCapability(codec));
}
for (const VideoCodec& codec : constraints_.video_codecs) {
capability.media_capabilities.push_back(ToCapability(codec));
}
if (constraints_.remoting->supports_chrome_audio_codecs) {
capability.media_capabilities.push_back(MediaCapability::kAudio);
}
if (constraints_.remoting->supports_4k) {
capability.media_capabilities.push_back(MediaCapability::k4k);
}
return capability;
}
void ReceiverSession::SendErrorAnswerReply(const std::string& sender_id,
int sequence_number,
const Error& error) {
OSP_DLOG_WARN << error;
const Error result = messenger_.SendMessage(
sender_id,
ReceiverMessage{ReceiverMessage::Type::kAnswer, sequence_number,
false /* valid */, ReceiverError(error)});
if (!result.ok()) {
client_.OnError(this, result);
}
}
} // namespace openscreen::cast