// Copyright 2020 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "components/cast_streaming/browser/cast_streaming_session.h"

#include "base/command_line.h"
#include "base/functional/bind.h"
#include "base/functional/callback.h"
#include "base/task/sequenced_task_runner.h"
#include "base/time/time.h"
#include "components/cast_streaming/browser/cast_message_port_converter.h"
#include "components/cast_streaming/browser/cast_streaming_switches.h"
#include "components/cast_streaming/browser/common/decoder_buffer_factory.h"
#include "components/cast_streaming/browser/control/remoting/remoting_decoder_buffer_factory.h"
#include "components/cast_streaming/browser/frame/mirroring_decoder_buffer_factory.h"
#include "components/cast_streaming/browser/frame/stream_consumer.h"
#include "components/cast_streaming/browser/receiver_config_conversions.h"
#include "components/cast_streaming/common/public/features.h"
#include "media/base/demuxer_stream.h"
#include "media/base/media_switches.h"
#include "media/base/timestamp_constants.h"
#include "media/cast/openscreen/config_conversions.h"
#include "media/mojo/common/mojo_decoder_buffer_converter.h"
#include "mojo/public/cpp/system/data_pipe.h"

namespace cast_streaming {
namespace {

// Timeout to stop the Session when no data is received.
constexpr base::TimeDelta kNoDataTimeout = base::Seconds(15);

// Returns the UDP endpoint used to receive the Cast stream. The port may be
// overridden via switches::kCastStreamingReceiverPort; otherwise, the default
// openscreen::IPEndpoint::kAnyV4() endpoint is used.
openscreen::IPEndpoint GetReceiverStreamingEndpoint() {
  const std::string port =
      base::CommandLine::ForCurrentProcess()->GetSwitchValueASCII(
          switches::kCastStreamingReceiverPort);
  if (!port.empty()) {
    const auto ipv4_ep = openscreen::IPEndpoint::Parse("0.0.0.0:" + port);
    if (ipv4_ep.is_value()) {
      LOG(INFO) << "Using UDP port " << port << " for Cast streaming.";
      return ipv4_ep.value();
    }
    LOG(ERROR) << "Cannot parse value (" << port << ") from --"
               << switches::kCastStreamingReceiverPort;
  }
  return openscreen::IPEndpoint::kAnyV4();
}

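// Creates a Mojo data pipe for decoder buffers of the given stream |type|,
// sized to the default decoder buffer converter capacity for that type.
// Returns false if the pipe could not be created.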
bool CreateDataPipeForStreamType(media::DemuxerStream::Type type,
                                 mojo::ScopedDataPipeProducerHandle* producer,
                                 mojo::ScopedDataPipeConsumerHandle* consumer) {
  const MojoCreateDataPipeOptions data_pipe_options{
      sizeof(MojoCreateDataPipeOptions), MOJO_CREATE_DATA_PIPE_FLAG_NONE,
      1u /* element_num_bytes */,
      media::GetDefaultDecoderBufferConverterCapacity(type)};
  MojoResult result =
      mojo::CreateDataPipe(&data_pipe_options, *producer, *consumer);
  return result == MOJO_RESULT_OK;
}

// Timeout to end the Session when no offer message is received.
constexpr base::TimeDelta kInitTimeout = base::Seconds(5);

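// Builds the StreamingInitializationInfo for a mirroring (non-remoting)
// session from the receivers and stream configs negotiated by Open Screen.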
StreamingInitializationInfo CreateMirroringInitializationInfo(
    const openscreen::cast::ReceiverSession* session,
    openscreen::cast::ReceiverSession::ConfiguredReceivers receivers) {
  std::optional<StreamingInitializationInfo::AudioStreamInfo> audio_stream_info;
  if (receivers.audio_receiver) {
    audio_stream_info.emplace(
        media::cast::ToAudioDecoderConfig(receivers.audio_config),
        receivers.audio_receiver);
  }

  std::optional<StreamingInitializationInfo::VideoStreamInfo> video_stream_info;
  if (receivers.video_receiver) {
    video_stream_info.emplace(
        media::cast::ToVideoDecoderConfig(receivers.video_config),
        receivers.video_receiver);
  }

  return {session, std::move(audio_stream_info), std::move(video_stream_info),
          /* is_remoting = */ false};
}

}  // namespace

CastStreamingSession::ReceiverSessionClient::ReceiverSessionClient(
    CastStreamingSession::Client* client,
    std::optional<RendererControllerConfig> renderer_controls,
    ReceiverConfig av_constraints,
    ReceiverSession::MessagePortProvider message_port_provider,
    scoped_refptr<base::SequencedTaskRunner> task_runner)
    : task_runner_(task_runner),
      environment_(&openscreen::Clock::now,
                   task_runner_,
                   GetReceiverStreamingEndpoint()),
      cast_message_port_converter_(CastMessagePortConverter::Create(
          std::move(message_port_provider),
          base::BindOnce(
              &CastStreamingSession::ReceiverSessionClient::OnCastChannelClosed,
              base::Unretained(this)))),
      client_(client),
      weak_factory_(this) {
  DCHECK(task_runner);
  DCHECK(client_);

  // This will fail if the "trivial" implementation of
  // CastMessagePortConverter::Create is linked.
  DCHECK(cast_message_port_converter_);

  receiver_session_ = std::make_unique<openscreen::cast::ReceiverSession>(
      *this, environment_, cast_message_port_converter_->GetMessagePort(),
      ToOpenscreenConstraints(av_constraints));

  if (renderer_controls) {
    playback_command_dispatcher_ = std::make_unique<PlaybackCommandDispatcher>(
        task_runner, std::move(renderer_controls.value().control_configuration),
        base::BindRepeating(
            &CastStreamingSession::ReceiverSessionClient::OnFlushUntil,
            weak_factory_.GetWeakPtr()),
        std::move(av_constraints.remoting));
    playback_command_dispatcher_->RegisterCommandSource(
        std::move(renderer_controls.value().external_renderer_controls));
  }

  init_timeout_timer_.Start(
      FROM_HERE, kInitTimeout,
      base::BindOnce(
          &CastStreamingSession::ReceiverSessionClient::OnInitializationTimeout,
          base::Unretained(this)));
}

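// Sends the next audio buffer to |client_|: a preloaded buffer is delivered
// immediately if present; otherwise a frame read is issued on
// |audio_consumer_|, passing |no_frames_available_cb| through to it.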
void CastStreamingSession::ReceiverSessionClient::GetAudioBuffer(
    base::OnceClosure no_frames_available_cb) {
  if (preloaded_audio_buffer_) {
    DCHECK(preloaded_audio_buffer_.value());
    if (client_) {
      client_->OnAudioBufferReceived(
          std::move(preloaded_audio_buffer_.value()));
    }
    preloaded_audio_buffer_ = std::nullopt;
    return;
  }

  DCHECK(audio_consumer_);
  audio_consumer_->ReadFrame(std::move(no_frames_available_cb));
}

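// As GetAudioBuffer(), but for the video stream.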
void CastStreamingSession::ReceiverSessionClient::GetVideoBuffer(
    base::OnceClosure no_frames_available_cb) {
  if (preloaded_video_buffer_) {
    DCHECK(preloaded_video_buffer_.value());
    if (client_) {
      client_->OnVideoBufferReceived(
          std::move(preloaded_video_buffer_.value()));
    }
    preloaded_video_buffer_ = std::nullopt;
    return;
  }

  DCHECK(video_consumer_);
  video_consumer_->ReadFrame(std::move(no_frames_available_cb));
}

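// Stores |buffer| for delivery on the next GetAudioBuffer() call. For
// sessions without a video stream, this is also where playback start is
// attempted, using the preloaded buffer's timestamp.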
void CastStreamingSession::ReceiverSessionClient::PreloadAudioBuffer(
    media::mojom::DecoderBufferPtr buffer) {
  DCHECK(!preloaded_audio_buffer_);
  DCHECK(buffer);

  DVLOG(1) << "Audio buffer preloaded!";

  preloaded_audio_buffer_ = std::move(buffer);
  if (playback_command_dispatcher_ && !ongoing_session_has_video()) {
    playback_command_dispatcher_->TryStartPlayback(
        (*preloaded_audio_buffer_)->get_data()->timestamp);
  }
}

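// As PreloadAudioBuffer(), but for video. When the session includes a video
// stream, playback start is attempted from the video buffer's timestamp.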
void CastStreamingSession::ReceiverSessionClient::PreloadVideoBuffer(
    media::mojom::DecoderBufferPtr buffer) {
  DCHECK(!preloaded_video_buffer_);
  DCHECK(buffer);

  DVLOG(1) << "Video buffer preloaded!";

  preloaded_video_buffer_ = std::move(buffer);
  if (playback_command_dispatcher_ && ongoing_session_has_video()) {
    playback_command_dispatcher_->TryStartPlayback(
        (*preloaded_video_buffer_)->get_data()->timestamp);
  }
}

CastStreamingSession::ReceiverSessionClient::~ReceiverSessionClient() {
  // Teardown of `receiver_session_` may trigger callbacks into `this`, so
  // destroy it explicitly here to ensure those callbacks execute while all
  // other members are still valid.
  receiver_session_.reset();
}

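// Ends the session when no offer message was received within kInitTimeout.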
void CastStreamingSession::ReceiverSessionClient::OnInitializationTimeout() {
  DVLOG(1) << __func__;
  DCHECK(!is_initialized_);
  EndSession();
  is_initialized_ = true;
}

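// Creates the audio data pipe and the StreamConsumer that feeds it. Returns
// the consumer end of the pipe, or std::nullopt if pipe creation failed.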
std::optional<mojo::ScopedDataPipeConsumerHandle>
CastStreamingSession::ReceiverSessionClient::InitializeAudioConsumer(
    const StreamingInitializationInfo& initialization_info) {
  DCHECK(initialization_info.audio_stream_info);

  // Create the audio data pipe.
  mojo::ScopedDataPipeProducerHandle data_pipe_producer;
  mojo::ScopedDataPipeConsumerHandle data_pipe_consumer;
  if (!CreateDataPipeForStreamType(media::DemuxerStream::Type::AUDIO,
                                   &data_pipe_producer, &data_pipe_consumer)) {
    return std::nullopt;
  }

  std::unique_ptr<DecoderBufferFactory> decoder_buffer_factory;
  if (initialization_info.is_remoting) {
    decoder_buffer_factory = std::make_unique<RemotingDecoderBufferFactory>();
  } else {
    // The duration is set to kNoTimestamp so the audio renderer does not
    // block. Audio frame duration is not known ahead of time in mirroring.
    decoder_buffer_factory = std::make_unique<MirroringDecoderBufferFactory>(
        initialization_info.audio_stream_info->receiver->rtp_timebase(),
        media::kNoTimestamp);
  }

  // We can use unretained pointers here because StreamConsumer is owned by
  // this object and |client_| is guaranteed to outlive this object.
  // |data_timeout_timer_| is also owned by this object and will outlive both
  // StreamConsumers.
  audio_consumer_ = std::make_unique<StreamConsumer>(
      initialization_info.audio_stream_info->receiver,
      std::move(data_pipe_producer),
      base::BindRepeating(&CastStreamingSession::Client::OnAudioBufferReceived,
                          base::Unretained(client_)),
      base::BindRepeating(&base::OneShotTimer::Reset,
                          base::Unretained(&data_timeout_timer_)),
      std::move(decoder_buffer_factory));

  return data_pipe_consumer;
}

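// Creates the video data pipe and the StreamConsumer that feeds it. Returns
// the consumer end of the pipe, or std::nullopt if pipe creation failed.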
std::optional<mojo::ScopedDataPipeConsumerHandle>
CastStreamingSession::ReceiverSessionClient::InitializeVideoConsumer(
    const StreamingInitializationInfo& initialization_info) {
  DCHECK(initialization_info.video_stream_info);

  // Create the video data pipe.
  mojo::ScopedDataPipeProducerHandle data_pipe_producer;
  mojo::ScopedDataPipeConsumerHandle data_pipe_consumer;
  if (!CreateDataPipeForStreamType(media::DemuxerStream::Type::VIDEO,
                                   &data_pipe_producer, &data_pipe_consumer)) {
    return std::nullopt;
  }

  std::unique_ptr<DecoderBufferFactory> decoder_buffer_factory;
  if (initialization_info.is_remoting) {
    decoder_buffer_factory = std::make_unique<RemotingDecoderBufferFactory>();
  } else {
    // The frame duration is set to 10 minutes to work around cases where
    // senders do not send data for a long period of time. This can produce
    // overlapping video frames, but that is fine since the media pipeline
    // mostly considers the playout time when deciding which frame to present.
    decoder_buffer_factory = std::make_unique<MirroringDecoderBufferFactory>(
        initialization_info.video_stream_info->receiver->rtp_timebase(),
        base::Minutes(10));
  }

  // We can use unretained pointers here because StreamConsumer is owned by
  // this object and |client_| is guaranteed to outlive this object.
  // |data_timeout_timer_| is also owned by this object and will outlive both
  // StreamConsumers.
  video_consumer_ = std::make_unique<StreamConsumer>(
      initialization_info.video_stream_info->receiver,
      std::move(data_pipe_producer),
      base::BindRepeating(&CastStreamingSession::Client::OnVideoBufferReceived,
                          base::Unretained(client_)),
      base::BindRepeating(&base::OneShotTimer::Reset,
                          base::Unretained(&data_timeout_timer_)),
      std::move(decoder_buffer_factory));

  return data_pipe_consumer;
}

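// Sets up the audio and/or video consumers for a newly negotiated (or
// renegotiated) session and hands the resulting data pipe consumer handles to
// |client_|. If a Flush() is pending, the call is deferred until it completes.
// Any failure along the way ends the session.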
void CastStreamingSession::ReceiverSessionClient::StartStreamingSession(
    StreamingInitializationInfo initialization_info) {
  DVLOG(1) << __func__;
  DCHECK_EQ(initialization_info.session, receiver_session_.get());
  DCHECK(!initialization_info.is_remoting || IsCastRemotingEnabled());

  // If a Flush() call is ongoing, it is unsafe to begin streaming data, so
  // instead stall this call until the Flush() call has completed.
  DCHECK(!start_session_cb_);
  if (is_flush_pending_) {
    start_session_cb_ = base::BindOnce(
        &CastStreamingSession::ReceiverSessionClient::StartStreamingSession,
        weak_factory_.GetWeakPtr(), std::move(initialization_info));
    return;
  }

  // If audio is not supported on this receiver, disable it to avoid AV-sync
  // issues arising from waiting for audio frames before starting playback.
  if (base::CommandLine::ForCurrentProcess()->HasSwitch(
          switches::kDisableAudioOutput)) {
    LOG(WARNING) << "Disabling audio for this session because the hosting "
                    "product instance does not support audio output.";
    initialization_info.audio_stream_info = std::nullopt;
  }

  // End the session if the offer message contained neither an audio nor a
  // video stream.
  if (!initialization_info.audio_stream_info &&
      !initialization_info.video_stream_info) {
    EndSession();
    return;
  }

  init_timeout_timer_.Stop();

  bool is_new_offer = is_initialized_;
  if (is_new_offer) {
    // This is a second offer message; reinitialize the streams.
    const bool new_offer_has_audio = !!initialization_info.audio_stream_info;
    const bool new_offer_has_video = !!initialization_info.video_stream_info;

    if (new_offer_has_audio != ongoing_session_has_audio() ||
        new_offer_has_video != ongoing_session_has_video()) {
      // The set of audio and video streams offered by this call to
      // StartStreamingSession() does not match the set provided during a
      // prior call to this method. Return early here.
      DLOG(ERROR) << "New streaming session offers audio or video streams "
                     "which do not match those provided during a prior "
                     "streaming initialization.";
      EndSession();
      return;
    }
  }

  // Set |is_initialized_| now so we can return early on failure.
  is_initialized_ = true;

  std::optional<mojo::ScopedDataPipeConsumerHandle> audio_pipe_consumer_handle;
  if (initialization_info.audio_stream_info) {
    audio_pipe_consumer_handle = InitializeAudioConsumer(initialization_info);
    if (audio_pipe_consumer_handle) {
      DVLOG(1) << "Initialized audio stream. "
               << initialization_info.audio_stream_info->config
                      .AsHumanReadableString();
    } else {
      EndSession();
      return;
    }
  }

  std::optional<mojo::ScopedDataPipeConsumerHandle> video_pipe_consumer_handle;
  if (initialization_info.video_stream_info) {
    video_pipe_consumer_handle = InitializeVideoConsumer(initialization_info);
    if (video_pipe_consumer_handle) {
      DVLOG(1) << "Initialized video stream. "
               << initialization_info.video_stream_info->config
                      .AsHumanReadableString();
    } else {
      audio_consumer_.reset();
      EndSession();
      return;
    }
  }

  if (is_new_offer) {
    if (client_) {
      client_->OnSessionReinitialization(std::move(initialization_info),
                                         std::move(audio_pipe_consumer_handle),
                                         std::move(video_pipe_consumer_handle));
    }
  } else {
    if (client_) {
      client_->OnSessionInitialization(std::move(initialization_info),
                                       std::move(audio_pipe_consumer_handle),
                                       std::move(video_pipe_consumer_handle));
    }
    data_timeout_timer_.Start(
        FROM_HERE, kNoDataTimeout,
        base::BindOnce(
            &CastStreamingSession::ReceiverSessionClient::OnDataTimeout,
            base::Unretained(this)));
  }
}

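// Invoked by the Open Screen ReceiverSession once streams have been
// negotiated with the sender; starts a mirroring streaming session.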
void CastStreamingSession::ReceiverSessionClient::OnNegotiated(
    const openscreen::cast::ReceiverSession* session,
    openscreen::cast::ReceiverSession::ConfiguredReceivers receivers) {
  StartStreamingSession(
      CreateMirroringInitializationInfo(session, std::move(receivers)));
}

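// Invoked by the Open Screen ReceiverSession once a remoting session has been
// negotiated; hands the remoting messenger and receivers off to the
// PlaybackCommandDispatcher for asynchronous configuration.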
void CastStreamingSession::ReceiverSessionClient::OnRemotingNegotiated(
    const openscreen::cast::ReceiverSession* session,
    openscreen::cast::ReceiverSession::RemotingNegotiation negotiation) {
  DCHECK(playback_command_dispatcher_);
  playback_command_dispatcher_->OnRemotingSessionNegotiated(
      negotiation.messenger);
  playback_command_dispatcher_->ConfigureRemotingAsync(
      this, session, std::move(negotiation.receivers));
}

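// Invoked by the Open Screen ReceiverSession when its Receivers are about to
// be destroyed, either because the session is ending or because the streams
// are being renegotiated.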
void CastStreamingSession::ReceiverSessionClient::OnReceiversDestroying(
    const openscreen::cast::ReceiverSession* session,
    ReceiversDestroyingReason reason) {
  // This can be called when |receiver_session_| is being destroyed, so we
  // do not sanity-check |session| here.
  DVLOG(1) << __func__;
  if (playback_command_dispatcher_) {
    playback_command_dispatcher_->OnRemotingSessionEnded();
  }

  preloaded_audio_buffer_ = std::nullopt;
  preloaded_video_buffer_ = std::nullopt;

  switch (reason) {
    case ReceiversDestroyingReason::kEndOfSession:
      EndSession();
      break;
    case ReceiversDestroyingReason::kRenegotiated:
      if (playback_command_dispatcher_) {
        if (is_flush_pending_) {
          DLOG(WARNING)
              << "Skipping call to Flush() because one is already in progress";
        } else {
          DVLOG(1) << "Calling Flush()";
          is_flush_pending_ = true;
          playback_command_dispatcher_->Flush(base::BindOnce(
              &CastStreamingSession::ReceiverSessionClient::OnFlushComplete,
              weak_factory_.GetWeakPtr()));
        }
      }
      if (client_) {
        client_->OnSessionReinitializationPending();
      }
      break;
  }
}

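// Called once the Flush() requested in OnReceiversDestroying() has completed;
// resumes any StartStreamingSession() call deferred while the flush was
// pending.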
void CastStreamingSession::ReceiverSessionClient::OnFlushComplete() {
  DCHECK(is_flush_pending_);

  DVLOG(1) << "Flush() Complete!";
  is_flush_pending_ = false;
  if (start_session_cb_) {
    std::move(start_session_cb_).Run();
  }
}

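// Forwards a flush request from the PlaybackCommandDispatcher to the audio
// and video StreamConsumers.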
void CastStreamingSession::ReceiverSessionClient::OnFlushUntil(
    uint32_t audio_count,
    uint32_t video_count) {
  DVLOG(1) << "OnFlushUntil called: (audio_count=" << audio_count
           << ", video_count=" << video_count << ")";
  if (audio_consumer_) {
    audio_consumer_->FlushUntil(audio_count);
  }
  if (video_consumer_) {
    video_consumer_->FlushUntil(video_count);
  }
}

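// Invoked by the Open Screen ReceiverSession when an error occurs. The
// session is only ended here if it was never successfully initialized.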
void CastStreamingSession::ReceiverSessionClient::OnError(
    const openscreen::cast::ReceiverSession* session,
    const openscreen::Error& error) {
  DCHECK_EQ(session, receiver_session_.get());
  LOG(ERROR) << error;
  if (!is_initialized_) {
    EndSession();
    is_initialized_ = true;
  }
}

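// Ends the session when no data has been received for kNoDataTimeout.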
void CastStreamingSession::ReceiverSessionClient::OnDataTimeout() {
  DLOG(ERROR) << __func__ << ": Session ended due to timeout";
  receiver_session_.reset();
  EndSession();
}

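// Ends the session when the underlying Cast channel is closed.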
void CastStreamingSession::ReceiverSessionClient::OnCastChannelClosed() {
  DLOG(ERROR) << __func__ << ": Session ended due to cast channel closure";
  receiver_session_.reset();
  EndSession();
}

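// Notifies |client_| that the session has ended, at most once.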
void CastStreamingSession::ReceiverSessionClient::EndSession() {
  if (client_) {
    client_->OnSessionEnded();
    client_ = nullptr;
  }
}

base::WeakPtr<CastStreamingSession::ReceiverSessionClient>
CastStreamingSession::ReceiverSessionClient::GetWeakPtr() {
  return weak_factory_.GetWeakPtr();
}

CastStreamingSession::Client::~Client() = default;
CastStreamingSession::CastStreamingSession() = default;
CastStreamingSession::~CastStreamingSession() = default;

void CastStreamingSession::Start(
    Client* client,
    std::optional<RendererControllerConfig> renderer_controls,
    ReceiverConfig av_constraints,
    ReceiverSession::MessagePortProvider message_port_provider,
    scoped_refptr<base::SequencedTaskRunner> task_runner) {
  DVLOG(1) << __func__;
  DCHECK(client);
  DCHECK(!receiver_session_);
  receiver_session_ = std::make_unique<ReceiverSessionClient>(
      client, std::move(renderer_controls), std::move(av_constraints),
      std::move(message_port_provider), task_runner);
}

void CastStreamingSession::Stop() {
  DVLOG(1) << __func__;
  DCHECK(receiver_session_);
  receiver_session_.reset();
}

AudioDemuxerStreamDataProvider::RequestBufferCB
CastStreamingSession::GetAudioBufferRequester() {
  DCHECK(receiver_session_);
  return base::BindRepeating(
      &CastStreamingSession::ReceiverSessionClient::GetAudioBuffer,
      receiver_session_->GetWeakPtr());
}

VideoDemuxerStreamDataProvider::RequestBufferCB
CastStreamingSession::GetVideoBufferRequester() {
  DCHECK(receiver_session_);
  return base::BindRepeating(
      &CastStreamingSession::ReceiverSessionClient::GetVideoBuffer,
      receiver_session_->GetWeakPtr());
}

CastStreamingSession::PreloadBufferCB
CastStreamingSession::GetAudioBufferPreloader() {
  DCHECK(receiver_session_);
  return base::BindRepeating(
      &CastStreamingSession::ReceiverSessionClient::PreloadAudioBuffer,
      receiver_session_->GetWeakPtr());
}

CastStreamingSession::PreloadBufferCB
CastStreamingSession::GetVideoBufferPreloader() {
  DCHECK(receiver_session_);
  return base::BindRepeating(
      &CastStreamingSession::ReceiverSessionClient::PreloadVideoBuffer,
      receiver_session_->GetWeakPtr());
}

}  // namespace cast_streaming