| // Copyright 2018 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "components/mirroring/service/remoting_sender.h" |
| |
| #include <algorithm> |
| |
| #include "base/bind.h" |
| #include "base/bind_helpers.h" |
| #include "base/logging.h" |
| #include "base/time/default_tick_clock.h" |
| #include "media/cast/constants.h" |
| #include "media/cast/sender/sender_encoded_frame.h" |
| #include "media/mojo/common/mojo_data_pipe_read_write.h" |
| |
| namespace mirroring { |
| |
| RemotingSender::RemotingSender( |
| scoped_refptr<media::cast::CastEnvironment> cast_environment, |
| media::cast::CastTransport* transport, |
| const media::cast::FrameSenderConfig& config, |
| mojo::ScopedDataPipeConsumerHandle pipe, |
| media::mojom::RemotingDataStreamSenderRequest request, |
| base::OnceClosure error_callback) |
| : FrameSender(cast_environment, |
| transport, |
| config, |
| media::cast::NewFixedCongestionControl(config.max_bitrate)), |
| clock_(cast_environment->Clock()), |
| error_callback_(std::move(error_callback)), |
| data_pipe_reader_(new media::MojoDataPipeReader(std::move(pipe))), |
| binding_(this, std::move(request)), |
| input_queue_discards_remaining_(0), |
| is_reading_(false), |
| flow_restart_pending_(true), |
| weak_factory_(this) { |
| binding_.set_connection_error_handler(base::BindOnce( |
| &RemotingSender::OnRemotingDataStreamError, base::Unretained(this))); |
| } |
| |
| RemotingSender::~RemotingSender() {} |
| |
| void RemotingSender::SendFrame(uint32_t frame_size) { |
| DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); |
| const bool need_to_start_processing = input_queue_.empty(); |
| input_queue_.push(base::BindRepeating(&RemotingSender::ReadFrame, |
| base::Unretained(this), frame_size)); |
| input_queue_.push(base::BindRepeating(&RemotingSender::TrySendFrame, |
| base::Unretained(this))); |
| if (need_to_start_processing) |
| ProcessNextInputTask(); |
| } |
| |
| void RemotingSender::CancelInFlightData() { |
| DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); |
| |
| // TODO(miu): The following code is something we want to do as an |
| // optimization. However, as-is, it's not quite correct. We can only cancel |
| // frames where no packets have actually hit the network yet. Said another |
| // way, we can only cancel frames the receiver has definitely not seen any |
| // part of (including kickstarting!). http://crbug.com/647423 |
| #if 0 |
| if (latest_acked_frame_id_ < last_sent_frame_id_) { |
| std::vector<media::cast::FrameId> frames_to_cancel; |
| do { |
| ++latest_acked_frame_id_; |
| frames_to_cancel.push_back(latest_acked_frame_id_); |
| } while (latest_acked_frame_id_ < last_sent_frame_id_); |
| transport_->CancelSendingFrames(ssrc_, frames_to_cancel); |
| } |
| #endif |
| |
| // Flag that all pending input operations should discard data. |
| input_queue_discards_remaining_ = input_queue_.size(); |
| |
| flow_restart_pending_ = true; |
| VLOG(1) << "Now restarting because in-flight data was just canceled."; |
| } |
| |
| int RemotingSender::GetNumberOfFramesInEncoder() const { |
| NOTREACHED(); |
| return 0; |
| } |
| |
| base::TimeDelta RemotingSender::GetInFlightMediaDuration() const { |
| NOTREACHED(); |
| return base::TimeDelta(); |
| } |
| |
| void RemotingSender::OnCancelSendingFrames() { |
| // One or more frames were canceled. This may allow pending input operations |
| // to complete. |
| ProcessNextInputTask(); |
| } |
| |
| void RemotingSender::ProcessNextInputTask() { |
| DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); |
| if (input_queue_.empty() || is_reading_) |
| return; |
| |
| input_queue_.front().Run(); |
| } |
| |
| void RemotingSender::ReadFrame(uint32_t size) { |
| DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); |
| DCHECK(!is_reading_); |
| if (!data_pipe_reader_->IsPipeValid()) { |
| VLOG(1) << "Data pipe handle no longer valid."; |
| OnRemotingDataStreamError(); |
| return; |
| } |
| |
| is_reading_ = true; |
| if (input_queue_discards_remaining_ > 0) { |
| data_pipe_reader_->Read( |
| nullptr, size, |
| base::BindOnce(&RemotingSender::OnFrameRead, base::Unretained(this))); |
| } else { |
| next_frame_data_.resize(size); |
| data_pipe_reader_->Read( |
| reinterpret_cast<uint8_t*>(base::data(next_frame_data_)), size, |
| base::BindOnce(&RemotingSender::OnFrameRead, base::Unretained(this))); |
| } |
| } |
| |
| void RemotingSender::TrySendFrame() { |
| DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); |
| DCHECK(!is_reading_); |
| if (input_queue_discards_remaining_ > 0) { |
| OnInputTaskComplete(); |
| return; |
| } |
| |
| // If there would be too many frames in-flight, do not proceed. |
| if (GetUnacknowledgedFrameCount() >= media::cast::kMaxUnackedFrames) { |
| VLOG(1) << "Cannot send frame now because too many frames are in flight."; |
| return; |
| } |
| |
| const bool is_first_frame_to_be_sent = last_send_time_.is_null(); |
| const media::cast::FrameId frame_id = is_first_frame_to_be_sent |
| ? media::cast::FrameId::first() |
| : (last_sent_frame_id_ + 1); |
| |
| base::TimeTicks last_frame_reference_time = last_send_time_; |
| auto remoting_frame = std::make_unique<media::cast::SenderEncodedFrame>(); |
| remoting_frame->frame_id = frame_id; |
| if (flow_restart_pending_) { |
| remoting_frame->dependency = media::cast::EncodedFrame::KEY; |
| flow_restart_pending_ = false; |
| } else { |
| DCHECK(!is_first_frame_to_be_sent); |
| remoting_frame->dependency = media::cast::EncodedFrame::DEPENDENT; |
| } |
| remoting_frame->referenced_frame_id = |
| remoting_frame->dependency == media::cast::EncodedFrame::KEY |
| ? frame_id |
| : frame_id - 1; |
| remoting_frame->reference_time = clock_->NowTicks(); |
| remoting_frame->encode_completion_time = remoting_frame->reference_time; |
| media::cast::RtpTimeTicks last_frame_rtp_timestamp; |
| if (is_first_frame_to_be_sent) { |
| last_frame_reference_time = remoting_frame->reference_time; |
| last_frame_rtp_timestamp = |
| media::cast::RtpTimeTicks() - media::cast::RtpTimeDelta::FromTicks(1); |
| } else { |
| last_frame_rtp_timestamp = GetRecordedRtpTimestamp(frame_id - 1); |
| } |
| // Ensure each successive frame's RTP timestamp is unique, but otherwise just |
| // base it on the reference time. |
| remoting_frame->rtp_timestamp = |
| last_frame_rtp_timestamp + |
| std::max(media::cast::RtpTimeDelta::FromTicks(1), |
| media::cast::RtpTimeDelta::FromTimeDelta( |
| remoting_frame->reference_time - last_frame_reference_time, |
| media::cast::kRemotingRtpTimebase)); |
| remoting_frame->data.swap(next_frame_data_); |
| |
| SendEncodedFrame(0, std::move(remoting_frame)); |
| |
| OnInputTaskComplete(); |
| } |
| |
| void RemotingSender::OnFrameRead(bool success) { |
| DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); |
| DCHECK(is_reading_); |
| is_reading_ = false; |
| if (!success) { |
| OnRemotingDataStreamError(); |
| return; |
| } |
| OnInputTaskComplete(); |
| } |
| |
| void RemotingSender::OnInputTaskComplete() { |
| DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); |
| DCHECK(!input_queue_.empty()); |
| input_queue_.pop(); |
| if (input_queue_discards_remaining_ > 0) |
| --input_queue_discards_remaining_; |
| |
| // Always force a post task to prevent the stack from growing too deep. |
| base::ThreadTaskRunnerHandle::Get()->PostTask( |
| FROM_HERE, base::BindOnce(&RemotingSender::ProcessNextInputTask, |
| weak_factory_.GetWeakPtr())); |
| } |
| |
| void RemotingSender::OnRemotingDataStreamError() { |
| data_pipe_reader_.reset(); |
| binding_.Close(); |
| if (!error_callback_.is_null()) |
| std::move(error_callback_).Run(); |
| } |
| |
| } // namespace mirroring |