| // Copyright 2020 The Chromium Authors |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "components/cast_streaming/browser/stream_consumer.h" |
| |
| #include <algorithm> |
| |
| #include "base/logging.h" |
| #include "base/task/sequenced_task_runner.h" |
| #include "components/cast_streaming/public/features.h" |
| #include "media/base/media_util.h" |
| #include "third_party/openscreen/src/platform/base/span.h" |
| |
| namespace cast_streaming { |
| |
| StreamConsumer::StreamConsumer(openscreen::cast::Receiver* receiver, |
| base::TimeDelta frame_duration, |
| mojo::ScopedDataPipeProducerHandle data_pipe, |
| FrameReceivedCB frame_received_cb, |
| base::RepeatingClosure on_new_frame) |
| : receiver_(receiver), |
| data_pipe_(std::move(data_pipe)), |
| frame_received_cb_(std::move(frame_received_cb)), |
| pipe_watcher_(FROM_HERE, |
| mojo::SimpleWatcher::ArmingPolicy::MANUAL, |
| base::SequencedTaskRunner::GetCurrentDefault()), |
| frame_duration_(frame_duration), |
| on_new_frame_(std::move(on_new_frame)) { |
| DCHECK(receiver_); |
| receiver_->SetConsumer(this); |
| MojoResult result = |
| pipe_watcher_.Watch(data_pipe_.get(), MOJO_HANDLE_SIGNAL_WRITABLE, |
| base::BindRepeating(&StreamConsumer::OnPipeWritable, |
| base::Unretained(this))); |
| if (result != MOJO_RESULT_OK) { |
| CloseDataPipeOnError(); |
| return; |
| } |
| } |
| |
| StreamConsumer::StreamConsumer(StreamConsumer&& other, |
| openscreen::cast::Receiver* receiver, |
| mojo::ScopedDataPipeProducerHandle data_pipe) |
| : StreamConsumer(receiver, |
| other.frame_duration_, |
| std::move(data_pipe), |
| std::move(other.frame_received_cb_), |
| std::move(other.on_new_frame_)) { |
| if (other.is_read_pending_) { |
| ReadFrame(std::move(other.no_frames_available_cb_)); |
| } |
| } |
| |
| // NOTE: Do NOT call into |receiver_| methods here, as the object may no longer |
| // be valid at time of this object's destruction. |
| StreamConsumer::~StreamConsumer() = default; |
| |
| void StreamConsumer::ReadFrame(base::OnceClosure no_frames_available_cb) { |
| DCHECK(!is_read_pending_); |
| DCHECK(!no_frames_available_cb_); |
| is_read_pending_ = true; |
| no_frames_available_cb_ = std::move(no_frames_available_cb); |
| MaybeSendNextFrame(); |
| } |
| |
| void StreamConsumer::CloseDataPipeOnError() { |
| DLOG(WARNING) << "[ssrc:" << receiver_->ssrc() << "] Data pipe closed."; |
| pipe_watcher_.Cancel(); |
| data_pipe_.reset(); |
| } |
| |
| void StreamConsumer::OnPipeWritable(MojoResult result) { |
| DCHECK(data_pipe_); |
| |
| if (result != MOJO_RESULT_OK) { |
| CloseDataPipeOnError(); |
| return; |
| } |
| |
| uint32_t bytes_written = pending_buffer_remaining_bytes_; |
| result = data_pipe_->WriteData(pending_buffer_ + pending_buffer_offset_, |
| &bytes_written, MOJO_WRITE_DATA_FLAG_NONE); |
| if (result != MOJO_RESULT_OK) { |
| CloseDataPipeOnError(); |
| return; |
| } |
| |
| pending_buffer_offset_ += bytes_written; |
| pending_buffer_remaining_bytes_ -= bytes_written; |
| if (pending_buffer_remaining_bytes_ != 0) { |
| pipe_watcher_.ArmOrNotify(); |
| return; |
| } |
| |
| MaybeSendNextFrame(); |
| } |
| |
| void StreamConsumer::OnFramesReady(int next_frame_buffer_size) { |
| MaybeSendNextFrame(); |
| } |
| |
| void StreamConsumer::FlushUntil(uint32_t frame_id) { |
| skip_until_frame_id_ = frame_id; |
| if (is_read_pending_) { |
| is_read_pending_ = false; |
| no_frames_available_cb_.Reset(); |
| frame_received_cb_.Run(nullptr); |
| } |
| } |
| |
| void StreamConsumer::MaybeSendNextFrame() { |
| if (!is_read_pending_ || pending_buffer_remaining_bytes_ > 0) { |
| return; |
| } |
| |
| const int current_frame_buffer_size = receiver_->AdvanceToNextFrame(); |
| if (current_frame_buffer_size == openscreen::cast::Receiver::kNoFramesReady) { |
| if (no_frames_available_cb_) { |
| std::move(no_frames_available_cb_).Run(); |
| } |
| return; |
| } |
| |
| on_new_frame_.Run(); |
| |
| void* buffer = nullptr; |
| uint32_t buffer_size = current_frame_buffer_size; |
| uint32_t mojo_buffer_size = current_frame_buffer_size; |
| |
| if (buffer_size > kMaxFrameSize) { |
| LOG(ERROR) << "[ssrc:" << receiver_->ssrc() << "] " |
| << "Frame size too big: " << buffer_size; |
| CloseDataPipeOnError(); |
| return; |
| } |
| |
| openscreen::cast::EncodedFrame encoded_frame; |
| |
| // Write to temporary storage in case we need to drop this frame. |
| pending_buffer_offset_ = 0; |
| encoded_frame = receiver_->ConsumeNextFrame( |
| openscreen::ByteBuffer(&pending_buffer_[0], buffer_size)); |
| |
| // If the frame occurs before the id we want to flush until, drop it and try |
| // again. |
| // TODO(crbug.com/1412561): Move this logic to Openscreen. |
| if (encoded_frame.frame_id < |
| openscreen::cast::FrameId(int64_t{skip_until_frame_id_})) { |
| VLOG(1) << "Skipping Frame " << encoded_frame.frame_id; |
| MaybeSendNextFrame(); |
| return; |
| } |
| |
| skip_until_frame_id_ = 0; |
| no_frames_available_cb_.Reset(); |
| |
| pending_buffer_remaining_bytes_ = buffer_size; |
| MojoResult result = data_pipe_->BeginWriteData( |
| &buffer, &mojo_buffer_size, MOJO_BEGIN_WRITE_DATA_FLAG_NONE); |
| |
| if (result == MOJO_RESULT_SHOULD_WAIT) { |
| pipe_watcher_.ArmOrNotify(); |
| return; |
| } |
| |
| if (result != MOJO_RESULT_OK) { |
| CloseDataPipeOnError(); |
| return; |
| } |
| |
| // Write as much as we can to the |data_pipe_| buffer. |
| int bytes_written; |
| if (buffer_size <= mojo_buffer_size) { |
| memcpy(buffer, pending_buffer_, buffer_size); |
| pending_buffer_offset_ = buffer_size; |
| pending_buffer_remaining_bytes_ = 0; |
| bytes_written = buffer_size; |
| } else { |
| memcpy(buffer, pending_buffer_, mojo_buffer_size); |
| pending_buffer_offset_ = mojo_buffer_size; |
| pending_buffer_remaining_bytes_ = buffer_size - mojo_buffer_size; |
| bytes_written = mojo_buffer_size; |
| } |
| |
| result = data_pipe_->EndWriteData(bytes_written); |
| if (result != MOJO_RESULT_OK) { |
| CloseDataPipeOnError(); |
| return; |
| } |
| |
| const bool is_key_frame = |
| encoded_frame.dependency == |
| openscreen::cast::EncodedFrame::Dependency::kKeyFrame; |
| |
| base::TimeDelta playout_time = |
| base::Microseconds(encoded_frame.rtp_timestamp |
| .ToTimeSinceOrigin<std::chrono::microseconds>( |
| receiver_->rtp_timebase()) |
| .count()); |
| |
| // Some senders do not send an initial playout time of 0. To work around this, |
| // a playout offset is added here. This is only required on mirroring sessions |
| // - and will in fact break remoting where timestamps by design do NOT begin |
| // at zero. |
| if (!IsCastRemotingEnabled()) { |
| if (playout_offset_ == base::TimeDelta::Max()) { |
| playout_offset_ = playout_time; |
| } |
| playout_time -= playout_offset_; |
| } |
| |
| DVLOG(3) << "[ssrc:" << receiver_->ssrc() << "] " |
| << "Received new frame. Timestamp: " << playout_time |
| << ", is_key_frame: " << is_key_frame; |
| |
| is_read_pending_ = false; |
| frame_received_cb_.Run(media::mojom::DecoderBuffer::New( |
| playout_time /* timestamp */, frame_duration_, |
| false /* is_end_of_stream */, buffer_size, is_key_frame, |
| media::EmptyExtraData(), media::mojom::DecryptConfigPtr(), |
| base::TimeDelta() /* front_discard */, |
| base::TimeDelta() /* back_discard */ |
| )); |
| |
| if (pending_buffer_remaining_bytes_ != 0) { |
| pipe_watcher_.ArmOrNotify(); |
| } |
| } |
| |
| } // namespace cast_streaming |