blob: bb5f5254da163d2dcb4bfe65d43aca602c10d917 [file] [log] [blame]
// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "media/remoting/demuxer_stream_adapter.h"
#include "base/base64.h"
#include "base/bind.h"
#include "base/callback_helpers.h"
#include "base/single_thread_task_runner.h"
#include "media/base/bind_to_current_loop.h"
#include "media/base/decoder_buffer.h"
#include "media/base/timestamp_constants.h"
#include "media/remoting/proto_enum_utils.h"
#include "media/remoting/proto_utils.h"
// Convenience logging macro used throughout this file.
#define DEMUXER_VLOG(level) VLOG(level) << __func__ << "[" << name_ << "]: "
namespace media {
namespace remoting {
// static
mojo::DataPipe* DemuxerStreamAdapter::CreateDataPipe() {
// Capacity in bytes for Mojo data pipe.
constexpr int kMojoDataPipeCapacityInBytes = 512 * 1024;
return new mojo::DataPipe(kMojoDataPipeCapacityInBytes);
}
DemuxerStreamAdapter::DemuxerStreamAdapter(
scoped_refptr<base::SingleThreadTaskRunner> main_task_runner,
scoped_refptr<base::SingleThreadTaskRunner> media_task_runner,
const std::string& name,
DemuxerStream* demuxer_stream,
const base::WeakPtr<RpcBroker>& rpc_broker,
int rpc_handle,
mojom::RemotingDataStreamSenderPtrInfo stream_sender_info,
mojo::ScopedDataPipeProducerHandle producer_handle,
const ErrorCallback& error_callback)
: main_task_runner_(std::move(main_task_runner)),
media_task_runner_(std::move(media_task_runner)),
name_(name),
rpc_broker_(rpc_broker),
rpc_handle_(rpc_handle),
demuxer_stream_(demuxer_stream),
type_(demuxer_stream ? demuxer_stream->type() : DemuxerStream::UNKNOWN),
error_callback_(error_callback),
remote_callback_handle_(RpcBroker::kInvalidHandle),
read_until_callback_handle_(RpcBroker::kInvalidHandle),
read_until_count_(0),
last_count_(0),
pending_flush_(false),
current_pending_frame_offset_(0),
pending_frame_is_eos_(false),
write_watcher_(FROM_HERE, mojo::SimpleWatcher::ArmingPolicy::MANUAL),
media_status_(DemuxerStream::kOk),
producer_handle_(std::move(producer_handle)),
bytes_written_to_pipe_(0),
request_buffer_weak_factory_(this),
weak_factory_(this) {
DCHECK(main_task_runner_);
DCHECK(media_task_runner_);
DCHECK(media_task_runner_->BelongsToCurrentThread());
DCHECK(demuxer_stream);
DCHECK(!error_callback.is_null());
const RpcBroker::ReceiveMessageCallback receive_callback =
BindToCurrentLoop(base::Bind(&DemuxerStreamAdapter::OnReceivedRpc,
weak_factory_.GetWeakPtr()));
main_task_runner_->PostTask(
FROM_HERE, base::Bind(&RpcBroker::RegisterMessageReceiverCallback,
rpc_broker_, rpc_handle_, receive_callback));
stream_sender_.Bind(std::move(stream_sender_info));
stream_sender_.set_connection_error_handler(
base::Bind(&DemuxerStreamAdapter::OnFatalError,
weak_factory_.GetWeakPtr(), MOJO_PIPE_ERROR));
}
DemuxerStreamAdapter::~DemuxerStreamAdapter() {
DCHECK(media_task_runner_->BelongsToCurrentThread());
main_task_runner_->PostTask(
FROM_HERE, base::Bind(&RpcBroker::UnregisterMessageReceiverCallback,
rpc_broker_, rpc_handle_));
}
int64_t DemuxerStreamAdapter::GetBytesWrittenAndReset() {
DCHECK(media_task_runner_->BelongsToCurrentThread());
const int64_t current_count = bytes_written_to_pipe_;
bytes_written_to_pipe_ = 0;
return current_count;
}
base::Optional<uint32_t> DemuxerStreamAdapter::SignalFlush(bool flushing) {
DCHECK(media_task_runner_->BelongsToCurrentThread());
DEMUXER_VLOG(2) << "flushing=" << flushing;
// Ignores if |pending_flush_| states is same.
if (pending_flush_ == flushing)
return base::nullopt;
// Cleans up pending frame data.
pending_frame_.clear();
current_pending_frame_offset_ = 0;
pending_frame_is_eos_ = false;
// Invalidates pending Read() tasks.
request_buffer_weak_factory_.InvalidateWeakPtrs();
// Cancels in flight data in browser process.
pending_flush_ = flushing;
if (flushing) {
stream_sender_->CancelInFlightData();
} else {
// Sets callback handle invalid to abort ongoing read request.
read_until_callback_handle_ = RpcBroker::kInvalidHandle;
}
return last_count_;
}
void DemuxerStreamAdapter::OnReceivedRpc(
std::unique_ptr<pb::RpcMessage> message) {
DCHECK(media_task_runner_->BelongsToCurrentThread());
DCHECK(message);
DCHECK(rpc_handle_ == message->handle());
switch (message->proc()) {
case pb::RpcMessage::RPC_DS_INITIALIZE:
Initialize(message->integer_value());
break;
case pb::RpcMessage::RPC_DS_READUNTIL:
ReadUntil(std::move(message));
break;
case pb::RpcMessage::RPC_DS_ENABLEBITSTREAMCONVERTER:
EnableBitstreamConverter();
break;
default:
DEMUXER_VLOG(1) << "Unknown RPC: " << message->proc();
}
}
void DemuxerStreamAdapter::Initialize(int remote_callback_handle) {
DCHECK(media_task_runner_->BelongsToCurrentThread());
DCHECK(!pending_flush_);
DEMUXER_VLOG(2) << "Received RPC_DS_INITIALIZE with remote_callback_handle="
<< remote_callback_handle;
// Checks if initialization had been called or not.
if (remote_callback_handle_ != RpcBroker::kInvalidHandle) {
DEMUXER_VLOG(1) << "Duplicated initialization. Have: "
<< remote_callback_handle_
<< ", Given: " << remote_callback_handle;
// Shuts down data pipe if available if providing different remote callback
// handle for initialization. Otherwise, just silently ignore the duplicated
// request.
if (remote_callback_handle_ != remote_callback_handle) {
OnFatalError(PEERS_OUT_OF_SYNC);
}
return;
}
remote_callback_handle_ = remote_callback_handle;
// Issues RPC_DS_INITIALIZE_CALLBACK RPC message.
std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage());
rpc->set_handle(remote_callback_handle_);
rpc->set_proc(pb::RpcMessage::RPC_DS_INITIALIZE_CALLBACK);
auto* init_cb_message = rpc->mutable_demuxerstream_initializecb_rpc();
init_cb_message->set_type(type_);
switch (type_) {
case DemuxerStream::Type::AUDIO: {
audio_config_ = demuxer_stream_->audio_decoder_config();
pb::AudioDecoderConfig* audio_message =
init_cb_message->mutable_audio_decoder_config();
ConvertAudioDecoderConfigToProto(audio_config_, audio_message);
break;
}
case DemuxerStream::Type::VIDEO: {
video_config_ = demuxer_stream_->video_decoder_config();
pb::VideoDecoderConfig* video_message =
init_cb_message->mutable_video_decoder_config();
ConvertVideoDecoderConfigToProto(video_config_, video_message);
break;
}
default:
NOTREACHED();
}
DEMUXER_VLOG(2) << "Sending RPC_DS_INITIALIZE_CALLBACK to " << rpc->handle()
<< " with decoder_config={"
<< (type_ == DemuxerStream::Type::AUDIO
? audio_config_.AsHumanReadableString()
: video_config_.AsHumanReadableString())
<< '}';
main_task_runner_->PostTask(
FROM_HERE, base::Bind(&RpcBroker::SendMessageToRemote, rpc_broker_,
base::Passed(&rpc)));
// Starts Mojo watcher.
if (!write_watcher_.IsWatching()) {
DEMUXER_VLOG(2) << "Start Mojo data pipe watcher";
write_watcher_.Watch(producer_handle_.get(), MOJO_HANDLE_SIGNAL_WRITABLE,
base::Bind(&DemuxerStreamAdapter::TryWriteData,
weak_factory_.GetWeakPtr()));
write_watcher_.ArmOrNotify();
}
}
void DemuxerStreamAdapter::ReadUntil(std::unique_ptr<pb::RpcMessage> message) {
DCHECK(media_task_runner_->BelongsToCurrentThread());
DCHECK(message);
if (!message->has_demuxerstream_readuntil_rpc()) {
DEMUXER_VLOG(1) << "Missing required DemuxerStreamReadUntil struct in RPC";
OnFatalError(RPC_INVALID);
return;
}
const pb::DemuxerStreamReadUntil& rpc_message =
message->demuxerstream_readuntil_rpc();
DEMUXER_VLOG(2) << "Received RPC_DS_READUNTIL with callback_handle="
<< rpc_message.callback_handle()
<< ", count=" << rpc_message.count();
if (pending_flush_) {
DEMUXER_VLOG(2) << "Skip actions since it's in the flushing state";
return;
}
if (IsProcessingReadRequest()) {
DEMUXER_VLOG(2) << "Ignore read request while it's in the reading state.";
return;
}
if (rpc_message.count() <= last_count_) {
DEMUXER_VLOG(1) << "Request count shouldn't be smaller than or equal to "
"current frame count";
return;
}
read_until_count_ = rpc_message.count();
read_until_callback_handle_ = rpc_message.callback_handle();
RequestBuffer();
}
void DemuxerStreamAdapter::EnableBitstreamConverter() {
DCHECK(media_task_runner_->BelongsToCurrentThread());
DEMUXER_VLOG(2) << "Received RPC_DS_ENABLEBITSTREAMCONVERTER";
demuxer_stream_->EnableBitstreamConverter();
}
void DemuxerStreamAdapter::RequestBuffer() {
DCHECK(media_task_runner_->BelongsToCurrentThread());
if (!IsProcessingReadRequest() || pending_flush_) {
DEMUXER_VLOG(2) << "Skip actions since it's not in the reading state";
return;
}
demuxer_stream_->Read(base::Bind(&DemuxerStreamAdapter::OnNewBuffer,
request_buffer_weak_factory_.GetWeakPtr()));
}
void DemuxerStreamAdapter::OnNewBuffer(
DemuxerStream::Status status,
const scoped_refptr<DecoderBuffer>& input) {
DEMUXER_VLOG(3) << "status=" << status;
DCHECK(media_task_runner_->BelongsToCurrentThread());
if (!IsProcessingReadRequest() || pending_flush_) {
DEMUXER_VLOG(2) << "Skip actions since it's not in the reading state";
return;
}
switch (status) {
case DemuxerStream::kAborted:
DCHECK(!input);
SendReadAck();
return;
case DemuxerStream::kConfigChanged:
// TODO(erickung): Notify controller of new decoder config, just in case
// that will require remoting to be shutdown (due to known
// lack-of-support).
// Stores available audio/video decoder config and issues
// RPC_DS_READUNTIL_CALLBACK RPC to notify receiver.
DCHECK(!input);
media_status_ = status;
if (demuxer_stream_->type() == DemuxerStream::VIDEO)
video_config_ = demuxer_stream_->video_decoder_config();
if (demuxer_stream_->type() == DemuxerStream::AUDIO)
audio_config_ = demuxer_stream_->audio_decoder_config();
SendReadAck();
return;
case DemuxerStream::kOk: {
media_status_ = status;
DCHECK(pending_frame_.empty());
if (!producer_handle_.is_valid())
return; // Do not start sending (due to previous fatal error).
pending_frame_ = DecoderBufferToByteArray(*input);
pending_frame_is_eos_ = input->end_of_stream();
TryWriteData(MOJO_RESULT_OK);
} break;
}
}
void DemuxerStreamAdapter::TryWriteData(MojoResult result) {
DCHECK(media_task_runner_->BelongsToCurrentThread());
// The Mojo watcher will also call TryWriteData() sometimes as a notification
// that data pipe is ready. But that does not necessarily mean the data for a
// read request is ready to be written into the pipe.
if (!IsProcessingReadRequest() || pending_flush_) {
DEMUXER_VLOG(3) << "Skip actions since it's not in the reading state";
return;
}
if (pending_frame_.empty()) {
DEMUXER_VLOG(3) << "No data available, waiting for demuxer";
return;
}
if (!stream_sender_ || !producer_handle_.is_valid()) {
DEMUXER_VLOG(1) << "Ignore since data pipe stream sender is invalid";
return;
}
uint32_t num_bytes = pending_frame_.size() - current_pending_frame_offset_;
MojoResult mojo_result =
WriteDataRaw(producer_handle_.get(),
pending_frame_.data() + current_pending_frame_offset_,
&num_bytes, MOJO_WRITE_DATA_FLAG_NONE);
if (mojo_result != MOJO_RESULT_OK && mojo_result != MOJO_RESULT_SHOULD_WAIT) {
DEMUXER_VLOG(1) << "Pipe was closed unexpectedly (or a bug). result:"
<< mojo_result;
OnFatalError(MOJO_PIPE_ERROR);
return;
}
write_watcher_.ArmOrNotify();
if (mojo_result != MOJO_RESULT_OK)
return;
stream_sender_->ConsumeDataChunk(current_pending_frame_offset_, num_bytes,
pending_frame_.size());
current_pending_frame_offset_ += num_bytes;
bytes_written_to_pipe_ += num_bytes;
// Checks if all buffer was written to browser process.
if (current_pending_frame_offset_ != pending_frame_.size()) {
// Returns and wait for mojo watcher to notify to write more data.
return;
}
// Signal mojo remoting service that all frame buffer is written to data pipe.
stream_sender_->SendFrame();
// Resets frame buffer variables.
bool pending_frame_is_eos = pending_frame_is_eos_;
++last_count_;
ResetPendingFrame();
// Checks if it needs to send RPC_DS_READUNTIL_CALLBACK RPC message.
if (read_until_count_ == last_count_ || pending_frame_is_eos) {
SendReadAck();
return;
}
// Contiune to read decoder buffer until reaching |read_until_count_| or
// end of stream.
media_task_runner_->PostTask(FROM_HERE,
base::Bind(&DemuxerStreamAdapter::RequestBuffer,
weak_factory_.GetWeakPtr()));
}
void DemuxerStreamAdapter::SendReadAck() {
DCHECK(media_task_runner_->BelongsToCurrentThread());
DEMUXER_VLOG(3) << "last_count_=" << last_count_
<< ", remote_read_callback_handle="
<< read_until_callback_handle_
<< ", media_status=" << media_status_;
// Issues RPC_DS_READUNTIL_CALLBACK RPC message.
std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage());
rpc->set_handle(read_until_callback_handle_);
rpc->set_proc(pb::RpcMessage::RPC_DS_READUNTIL_CALLBACK);
auto* message = rpc->mutable_demuxerstream_readuntilcb_rpc();
message->set_count(last_count_);
message->set_status(ToProtoDemuxerStreamStatus(media_status_).value());
if (media_status_ == DemuxerStream::kConfigChanged) {
if (audio_config_.IsValidConfig()) {
pb::AudioDecoderConfig* audio_message =
message->mutable_audio_decoder_config();
ConvertAudioDecoderConfigToProto(audio_config_, audio_message);
} else if (video_config_.IsValidConfig()) {
pb::VideoDecoderConfig* video_message =
message->mutable_video_decoder_config();
ConvertVideoDecoderConfigToProto(video_config_, video_message);
} else {
NOTREACHED();
}
}
DEMUXER_VLOG(2) << "Sending RPC_DS_READUNTIL_CALLBACK to " << rpc->handle()
<< " with count=" << message->count()
<< ", status=" << message->status() << ", decoder_config={"
<< (audio_config_.IsValidConfig()
? audio_config_.AsHumanReadableString()
: video_config_.IsValidConfig()
? video_config_.AsHumanReadableString()
: "DID NOT CHANGE")
<< '}';
main_task_runner_->PostTask(
FROM_HERE, base::Bind(&RpcBroker::SendMessageToRemote, rpc_broker_,
base::Passed(&rpc)));
// Resets callback handle after completing the reading request.
read_until_callback_handle_ = RpcBroker::kInvalidHandle;
// Resets audio/video decoder config since it only sends once.
if (audio_config_.IsValidConfig())
audio_config_ = AudioDecoderConfig();
if (video_config_.IsValidConfig())
video_config_ = VideoDecoderConfig();
}
void DemuxerStreamAdapter::ResetPendingFrame() {
DCHECK(media_task_runner_->BelongsToCurrentThread());
current_pending_frame_offset_ = 0;
pending_frame_.clear();
pending_frame_is_eos_ = false;
}
void DemuxerStreamAdapter::OnFatalError(StopTrigger stop_trigger) {
DCHECK(media_task_runner_->BelongsToCurrentThread());
DEMUXER_VLOG(1) << __func__ << " with StopTrigger " << stop_trigger;
if (error_callback_.is_null())
return;
if (write_watcher_.IsWatching()) {
DEMUXER_VLOG(2) << "Cancel mojo data pipe watcher";
write_watcher_.Cancel();
}
base::ResetAndReturn(&error_callback_).Run(stop_trigger);
}
} // namespace remoting
} // namespace media