// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "media/remoting/courier_renderer.h"
#include <algorithm>
#include <limits>
#include <utility>
#include "base/bind.h"
#include "base/bind_helpers.h"
#include "base/callback_helpers.h"
#include "base/memory/ptr_util.h"
#include "base/message_loop/message_loop.h"
#include "base/numerics/safe_math.h"
#include "base/threading/thread_task_runner_handle.h"
#include "base/time/default_tick_clock.h"
#include "base/time/time.h"
#include "media/base/bind_to_current_loop.h"
#include "media/base/buffering_state.h"
#include "media/base/media_resource.h"
#include "media/base/renderer_client.h"
#include "media/base/video_renderer_sink.h"
#include "media/remoting/demuxer_stream_adapter.h"
#include "media/remoting/proto_enum_utils.h"
#include "media/remoting/proto_utils.h"
#include "media/remoting/renderer_controller.h"
namespace media {
namespace remoting {
namespace {
// The moving time window for tracking media time and statistics updates.
constexpr base::TimeDelta kTrackingWindow = base::TimeDelta::FromSeconds(5);
// The allowed delay for remoting playback. When the delay exceeds this limit
// |kPlaybackDelayCountThreshold| times in a row, the user experience is likely
// poor and the controller is notified.
constexpr base::TimeDelta kMediaPlaybackDelayThreshold =
base::TimeDelta::FromMilliseconds(750);
constexpr int kPlaybackDelayCountThreshold = 10;
// The allowed percentage of video frames dropped vs. video frames decoded.
// When this limit is exceeded, the user experience is likely poor and the
// controller is notified.
constexpr int kMaxNumVideoFramesDroppedPercentage = 3;
// The time period allowed for the receiver to stabilize after a playback rate
// change or Flush().
constexpr base::TimeDelta kStabilizationPeriod =
base::TimeDelta::FromSeconds(2);
// The amount of time between polling the DemuxerStreamAdapters to measure their
// data flow rates for metrics.
constexpr base::TimeDelta kDataFlowPollPeriod =
base::TimeDelta::FromSeconds(10);
} // namespace
CourierRenderer::CourierRenderer(
scoped_refptr<base::SingleThreadTaskRunner> media_task_runner,
const base::WeakPtr<RendererController>& controller,
VideoRendererSink* video_renderer_sink)
: state_(STATE_UNINITIALIZED),
main_task_runner_(base::ThreadTaskRunnerHandle::Get()),
media_task_runner_(std::move(media_task_runner)),
media_resource_(nullptr),
client_(nullptr),
controller_(controller),
rpc_broker_(controller_->GetRpcBroker()),
rpc_handle_(rpc_broker_->GetUniqueHandle()),
remote_renderer_handle_(RpcBroker::kInvalidHandle),
video_renderer_sink_(video_renderer_sink),
clock_(new base::DefaultTickClock()),
weak_factory_(this) {
VLOG(2) << __func__;
  // Note: The constructor runs on the main thread, but the object will be
  // destroyed on the media thread. Therefore, all weak pointers must be
  // dereferenced on the media thread.
const RpcBroker::ReceiveMessageCallback receive_callback =
base::Bind(&CourierRenderer::OnMessageReceivedOnMainThread,
media_task_runner_, weak_factory_.GetWeakPtr());
rpc_broker_->RegisterMessageReceiverCallback(rpc_handle_, receive_callback);
}
CourierRenderer::~CourierRenderer() {
VLOG(2) << __func__;
DCHECK(media_task_runner_->BelongsToCurrentThread());
// Post task on main thread to unregister message receiver.
main_task_runner_->PostTask(
FROM_HERE, base::Bind(&RpcBroker::UnregisterMessageReceiverCallback,
rpc_broker_, rpc_handle_));
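  // Clear whatever the sink last displayed by painting a single black frame.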
if (video_renderer_sink_) {
video_renderer_sink_->PaintSingleFrame(
VideoFrame::CreateBlackFrame(gfx::Size(1280, 720)));
}
}
void CourierRenderer::Initialize(MediaResource* media_resource,
RendererClient* client,
const PipelineStatusCB& init_cb) {
VLOG(2) << __func__;
DCHECK(media_task_runner_->BelongsToCurrentThread());
DCHECK(media_resource);
DCHECK(client);
if (state_ != STATE_UNINITIALIZED) {
media_task_runner_->PostTask(
FROM_HERE, base::Bind(init_cb, PIPELINE_ERROR_INVALID_STATE));
return;
}
media_resource_ = media_resource;
client_ = client;
init_workflow_done_callback_ = init_cb;
state_ = STATE_CREATE_PIPE;
// TODO(servolk): Add support for multiple streams. For now use the first
// enabled audio and video streams to preserve the existing behavior.
::media::DemuxerStream* audio_demuxer_stream =
media_resource_->GetFirstStream(DemuxerStream::AUDIO);
::media::DemuxerStream* video_demuxer_stream =
media_resource_->GetFirstStream(DemuxerStream::VIDEO);
// Create audio mojo data pipe handles if audio is available.
std::unique_ptr<mojo::DataPipe> audio_data_pipe;
if (audio_demuxer_stream) {
audio_data_pipe = base::WrapUnique(DemuxerStreamAdapter::CreateDataPipe());
}
// Create video mojo data pipe handles if video is available.
std::unique_ptr<mojo::DataPipe> video_data_pipe;
if (video_demuxer_stream) {
video_data_pipe = base::WrapUnique(DemuxerStreamAdapter::CreateDataPipe());
}
  // Establish the remoting data pipe connection via the main thread.
const SharedSession::DataPipeStartCallback data_pipe_callback =
base::Bind(&CourierRenderer::OnDataPipeCreatedOnMainThread,
media_task_runner_, weak_factory_.GetWeakPtr(), rpc_broker_);
main_task_runner_->PostTask(
FROM_HERE,
base::Bind(&RendererController::StartDataPipe, controller_,
base::Passed(&audio_data_pipe), base::Passed(&video_data_pipe),
data_pipe_callback));
}
void CourierRenderer::SetCdm(CdmContext* cdm_context,
const CdmAttachedCB& cdm_attached_cb) {
VLOG(2) << __func__ << " cdm_id:" << cdm_context->GetCdmId();
DCHECK(media_task_runner_->BelongsToCurrentThread());
// TODO(erickung): add implementation once Remote CDM implementation is done.
  // Right now it simply runs the callback immediately with false.
if (!cdm_attached_cb.is_null()) {
cdm_attached_cb.Run(false);
}
}
void CourierRenderer::Flush(const base::Closure& flush_cb) {
VLOG(2) << __func__;
DCHECK(media_task_runner_->BelongsToCurrentThread());
DCHECK(flush_cb_.is_null());
if (state_ != STATE_PLAYING) {
DCHECK_EQ(state_, STATE_ERROR);
// In the error state, this renderer will be shut down shortly. To prevent
// breaking the pipeline impl, just run the done callback (interface
// requirement).
media_task_runner_->PostTask(FROM_HERE, flush_cb);
return;
}
state_ = STATE_FLUSHING;
base::Optional<uint32_t> flush_audio_count;
if (audio_demuxer_stream_adapter_)
flush_audio_count = audio_demuxer_stream_adapter_->SignalFlush(true);
base::Optional<uint32_t> flush_video_count;
if (video_demuxer_stream_adapter_)
flush_video_count = video_demuxer_stream_adapter_->SignalFlush(true);
  // Make sure the flush counts are valid for the available streams and that
  // audio and video agree on the same flushing state.
if ((audio_demuxer_stream_adapter_ && !flush_audio_count.has_value()) ||
(video_demuxer_stream_adapter_ && !flush_video_count.has_value()) ||
(audio_demuxer_stream_adapter_ && video_demuxer_stream_adapter_ &&
flush_audio_count.has_value() != flush_video_count.has_value())) {
VLOG(1) << "Ignoring flush request while under flushing operation";
return;
}
flush_cb_ = flush_cb;
// Issues RPC_R_FLUSHUNTIL RPC message.
std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage());
rpc->set_handle(remote_renderer_handle_);
rpc->set_proc(pb::RpcMessage::RPC_R_FLUSHUNTIL);
pb::RendererFlushUntil* message = rpc->mutable_renderer_flushuntil_rpc();
if (flush_audio_count.has_value())
message->set_audio_count(*flush_audio_count);
if (flush_video_count.has_value())
message->set_video_count(*flush_video_count);
message->set_callback_handle(rpc_handle_);
VLOG(2) << __func__ << ": Sending RPC_R_FLUSHUNTIL to " << rpc->handle()
<< " with audio_count=" << message->audio_count()
<< ", video_count=" << message->video_count()
<< ", callback_handle=" << message->callback_handle();
SendRpcToRemote(std::move(rpc));
}
void CourierRenderer::StartPlayingFrom(base::TimeDelta time) {
VLOG(2) << __func__ << ": " << time.InMicroseconds();
DCHECK(media_task_runner_->BelongsToCurrentThread());
if (state_ != STATE_PLAYING) {
DCHECK_EQ(state_, STATE_ERROR);
return;
}
// Issues RPC_R_STARTPLAYINGFROM RPC message.
std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage());
rpc->set_handle(remote_renderer_handle_);
rpc->set_proc(pb::RpcMessage::RPC_R_STARTPLAYINGFROM);
rpc->set_integer64_value(time.InMicroseconds());
VLOG(2) << __func__ << ": Sending RPC_R_STARTPLAYINGFROM to " << rpc->handle()
<< " with time_usec=" << rpc->integer64_value();
SendRpcToRemote(std::move(rpc));
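  // |time_lock_| guards |current_media_time_|, which GetMediaTime() may read
  // from other threads.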
{
base::AutoLock auto_lock(time_lock_);
current_media_time_ = time;
}
ResetMeasurements();
}
void CourierRenderer::SetPlaybackRate(double playback_rate) {
VLOG(2) << __func__ << ": " << playback_rate;
DCHECK(media_task_runner_->BelongsToCurrentThread());
if (state_ != STATE_FLUSHING && state_ != STATE_PLAYING) {
DCHECK_EQ(state_, STATE_ERROR);
return;
}
// Issues RPC_R_SETPLAYBACKRATE RPC message.
std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage());
rpc->set_handle(remote_renderer_handle_);
rpc->set_proc(pb::RpcMessage::RPC_R_SETPLAYBACKRATE);
rpc->set_double_value(playback_rate);
VLOG(2) << __func__ << ": Sending RPC_R_SETPLAYBACKRATE to " << rpc->handle()
<< " with rate=" << rpc->double_value();
SendRpcToRemote(std::move(rpc));
playback_rate_ = playback_rate;
ResetMeasurements();
}
void CourierRenderer::SetVolume(float volume) {
VLOG(2) << __func__ << ": " << volume;
DCHECK(media_task_runner_->BelongsToCurrentThread());
if (state_ != STATE_FLUSHING && state_ != STATE_PLAYING) {
DCHECK_EQ(state_, STATE_ERROR);
return;
}
// Issues RPC_R_SETVOLUME RPC message.
std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage());
rpc->set_handle(remote_renderer_handle_);
rpc->set_proc(pb::RpcMessage::RPC_R_SETVOLUME);
rpc->set_double_value(volume);
VLOG(2) << __func__ << ": Sending RPC_R_SETVOLUME to " << rpc->handle()
<< " with volume=" << rpc->double_value();
SendRpcToRemote(std::move(rpc));
}
base::TimeDelta CourierRenderer::GetMediaTime() {
// No BelongsToCurrentThread() checking because this can be called from other
// threads.
// TODO(erickung): Interpolate current media time using local system time.
  // The receiver currently updates |current_media_time_| every 250ms, but the
  // update frequency needs to be lowered to reduce network usage. Hence,
  // interpolation will be needed once the receiver implementation changes.
base::AutoLock auto_lock(time_lock_);
return current_media_time_;
}
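// Static trampoline: runs on the main thread and must not dereference |self|;
// it simply re-posts the data pipe results to the media thread.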
// static
void CourierRenderer::OnDataPipeCreatedOnMainThread(
scoped_refptr<base::SingleThreadTaskRunner> media_task_runner,
base::WeakPtr<CourierRenderer> self,
base::WeakPtr<RpcBroker> rpc_broker,
mojom::RemotingDataStreamSenderPtrInfo audio,
mojom::RemotingDataStreamSenderPtrInfo video,
mojo::ScopedDataPipeProducerHandle audio_handle,
mojo::ScopedDataPipeProducerHandle video_handle) {
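  // The RpcBroker lives on the main thread, so generate the unique RPC handles
  // for the stream adapters here before hopping back to the media thread.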
media_task_runner->PostTask(
FROM_HERE,
base::Bind(&CourierRenderer::OnDataPipeCreated, self,
base::Passed(&audio), base::Passed(&video),
base::Passed(&audio_handle), base::Passed(&video_handle),
rpc_broker ? rpc_broker->GetUniqueHandle()
: RpcBroker::kInvalidHandle,
rpc_broker ? rpc_broker->GetUniqueHandle()
: RpcBroker::kInvalidHandle));
}
void CourierRenderer::OnDataPipeCreated(
mojom::RemotingDataStreamSenderPtrInfo audio,
mojom::RemotingDataStreamSenderPtrInfo video,
mojo::ScopedDataPipeProducerHandle audio_handle,
mojo::ScopedDataPipeProducerHandle video_handle,
int audio_rpc_handle,
int video_rpc_handle) {
VLOG(2) << __func__;
DCHECK(media_task_runner_->BelongsToCurrentThread());
if (state_ == STATE_ERROR)
return; // Abort because something went wrong in the meantime.
DCHECK_EQ(state_, STATE_CREATE_PIPE);
DCHECK(!init_workflow_done_callback_.is_null());
// TODO(servolk): Add support for multiple streams. For now use the first
// enabled audio and video streams to preserve the existing behavior.
::media::DemuxerStream* audio_demuxer_stream =
media_resource_->GetFirstStream(DemuxerStream::AUDIO);
::media::DemuxerStream* video_demuxer_stream =
media_resource_->GetFirstStream(DemuxerStream::VIDEO);
// Create audio demuxer stream adapter if audio is available.
if (audio_demuxer_stream && audio.is_valid() && audio_handle.is_valid() &&
audio_rpc_handle != RpcBroker::kInvalidHandle) {
VLOG(2) << "Initialize audio";
audio_demuxer_stream_adapter_.reset(new DemuxerStreamAdapter(
main_task_runner_, media_task_runner_, "audio", audio_demuxer_stream,
rpc_broker_, audio_rpc_handle, std::move(audio),
std::move(audio_handle),
base::Bind(&CourierRenderer::OnFatalError, base::Unretained(this))));
}
// Create video demuxer stream adapter if video is available.
if (video_demuxer_stream && video.is_valid() && video_handle.is_valid() &&
video_rpc_handle != RpcBroker::kInvalidHandle) {
VLOG(2) << "Initialize video";
video_demuxer_stream_adapter_.reset(new DemuxerStreamAdapter(
main_task_runner_, media_task_runner_, "video", video_demuxer_stream,
rpc_broker_, video_rpc_handle, std::move(video),
std::move(video_handle),
base::Bind(&CourierRenderer::OnFatalError, base::Unretained(this))));
}
  // Fail if neither data pipe was created successfully.
if (!audio_demuxer_stream_adapter_ && !video_demuxer_stream_adapter_) {
OnFatalError(DATA_PIPE_CREATE_ERROR);
return;
}
state_ = STATE_ACQUIRING;
// Issues RPC_ACQUIRE_RENDERER RPC message.
std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage());
rpc->set_handle(RpcBroker::kAcquireHandle);
rpc->set_proc(pb::RpcMessage::RPC_ACQUIRE_RENDERER);
rpc->set_integer_value(rpc_handle_);
VLOG(2) << __func__ << ": Sending RPC_ACQUIRE_RENDERER to " << rpc->handle()
<< " with rpc_handle=" << rpc->integer_value();
SendRpcToRemote(std::move(rpc));
}
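// Static trampoline: invoked on the main thread by the RpcBroker; forwards the
// message to the media thread, where |self| may be safely dereferenced.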
// static
void CourierRenderer::OnMessageReceivedOnMainThread(
scoped_refptr<base::SingleThreadTaskRunner> media_task_runner,
base::WeakPtr<CourierRenderer> self,
std::unique_ptr<pb::RpcMessage> message) {
media_task_runner->PostTask(FROM_HERE,
base::Bind(&CourierRenderer::OnReceivedRpc, self,
base::Passed(&message)));
}
void CourierRenderer::OnReceivedRpc(std::unique_ptr<pb::RpcMessage> message) {
DCHECK(media_task_runner_->BelongsToCurrentThread());
DCHECK(message);
switch (message->proc()) {
case pb::RpcMessage::RPC_ACQUIRE_RENDERER_DONE:
AcquireRendererDone(std::move(message));
break;
case pb::RpcMessage::RPC_R_INITIALIZE_CALLBACK:
InitializeCallback(std::move(message));
break;
case pb::RpcMessage::RPC_R_FLUSHUNTIL_CALLBACK:
FlushUntilCallback();
break;
case pb::RpcMessage::RPC_R_SETCDM_CALLBACK:
SetCdmCallback(std::move(message));
break;
case pb::RpcMessage::RPC_RC_ONTIMEUPDATE:
OnTimeUpdate(std::move(message));
break;
case pb::RpcMessage::RPC_RC_ONBUFFERINGSTATECHANGE:
OnBufferingStateChange(std::move(message));
break;
case pb::RpcMessage::RPC_RC_ONENDED:
VLOG(2) << __func__ << ": Received RPC_RC_ONENDED.";
client_->OnEnded();
break;
case pb::RpcMessage::RPC_RC_ONERROR:
VLOG(2) << __func__ << ": Received RPC_RC_ONERROR.";
OnFatalError(RECEIVER_PIPELINE_ERROR);
break;
case pb::RpcMessage::RPC_RC_ONAUDIOCONFIGCHANGE:
OnAudioConfigChange(std::move(message));
break;
case pb::RpcMessage::RPC_RC_ONVIDEOCONFIGCHANGE:
OnVideoConfigChange(std::move(message));
break;
case pb::RpcMessage::RPC_RC_ONVIDEONATURALSIZECHANGE:
OnVideoNaturalSizeChange(std::move(message));
break;
case pb::RpcMessage::RPC_RC_ONVIDEOOPACITYCHANGE:
OnVideoOpacityChange(std::move(message));
break;
case pb::RpcMessage::RPC_RC_ONSTATISTICSUPDATE:
OnStatisticsUpdate(std::move(message));
break;
case pb::RpcMessage::RPC_RC_ONWAITINGFORDECRYPTIONKEY:
VLOG(2) << __func__ << ": Received RPC_RC_ONWAITINGFORDECRYPTIONKEY.";
client_->OnWaitingForDecryptionKey();
break;
case pb::RpcMessage::RPC_RC_ONDURATIONCHANGE:
OnDurationChange(std::move(message));
break;
default:
VLOG(1) << "Unknown RPC: " << message->proc();
}
}
void CourierRenderer::SendRpcToRemote(std::unique_ptr<pb::RpcMessage> message) {
DCHECK(media_task_runner_->BelongsToCurrentThread());
DCHECK(main_task_runner_);
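  // The RpcBroker must be used on the main thread, so forward the message
  // there.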
main_task_runner_->PostTask(
FROM_HERE, base::Bind(&RpcBroker::SendMessageToRemote, rpc_broker_,
base::Passed(&message)));
}
void CourierRenderer::AcquireRendererDone(
std::unique_ptr<pb::RpcMessage> message) {
DCHECK(media_task_runner_->BelongsToCurrentThread());
DCHECK(message);
remote_renderer_handle_ = message->integer_value();
VLOG(2) << __func__
<< ": Received RPC_ACQUIRE_RENDERER_DONE with remote_renderer_handle="
<< remote_renderer_handle_;
if (state_ != STATE_ACQUIRING || init_workflow_done_callback_.is_null()) {
LOG(WARNING) << "Unexpected acquire renderer done RPC.";
OnFatalError(PEERS_OUT_OF_SYNC);
return;
}
state_ = STATE_INITIALIZING;
// Issues RPC_R_INITIALIZE RPC message to initialize renderer.
std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage());
rpc->set_handle(remote_renderer_handle_);
rpc->set_proc(pb::RpcMessage::RPC_R_INITIALIZE);
pb::RendererInitialize* init = rpc->mutable_renderer_initialize_rpc();
init->set_client_handle(rpc_handle_);
init->set_audio_demuxer_handle(
audio_demuxer_stream_adapter_
? audio_demuxer_stream_adapter_->rpc_handle()
: RpcBroker::kInvalidHandle);
init->set_video_demuxer_handle(
video_demuxer_stream_adapter_
? video_demuxer_stream_adapter_->rpc_handle()
: RpcBroker::kInvalidHandle);
init->set_callback_handle(rpc_handle_);
VLOG(2) << __func__ << ": Sending RPC_R_INITIALIZE to " << rpc->handle()
<< " with client_handle=" << init->client_handle()
<< ", audio_demuxer_handle=" << init->audio_demuxer_handle()
<< ", video_demuxer_handle=" << init->video_demuxer_handle()
<< ", callback_handle=" << init->callback_handle();
SendRpcToRemote(std::move(rpc));
}
void CourierRenderer::InitializeCallback(
std::unique_ptr<pb::RpcMessage> message) {
DCHECK(media_task_runner_->BelongsToCurrentThread());
DCHECK(message);
const bool success = message->boolean_value();
VLOG(2) << __func__
<< ": Received RPC_R_INITIALIZE_CALLBACK with success=" << success;
if (state_ != STATE_INITIALIZING || init_workflow_done_callback_.is_null()) {
LOG(WARNING) << "Unexpected initialize callback RPC.";
OnFatalError(PEERS_OUT_OF_SYNC);
return;
}
if (!success) {
OnFatalError(RECEIVER_INITIALIZE_FAILED);
return;
}
metrics_recorder_.OnRendererInitialized();
state_ = STATE_PLAYING;
base::ResetAndReturn(&init_workflow_done_callback_).Run(PIPELINE_OK);
}
void CourierRenderer::FlushUntilCallback() {
DCHECK(media_task_runner_->BelongsToCurrentThread());
VLOG(2) << __func__ << ": Received RPC_R_FLUSHUNTIL_CALLBACK";
if (state_ != STATE_FLUSHING || flush_cb_.is_null()) {
LOG(WARNING) << "Unexpected flushuntil callback RPC.";
OnFatalError(PEERS_OUT_OF_SYNC);
return;
}
state_ = STATE_PLAYING;
if (audio_demuxer_stream_adapter_)
audio_demuxer_stream_adapter_->SignalFlush(false);
if (video_demuxer_stream_adapter_)
video_demuxer_stream_adapter_->SignalFlush(false);
base::ResetAndReturn(&flush_cb_).Run();
ResetMeasurements();
}
void CourierRenderer::SetCdmCallback(std::unique_ptr<pb::RpcMessage> message) {
DCHECK(media_task_runner_->BelongsToCurrentThread());
DCHECK(message);
VLOG(2) << __func__ << ": Received RPC_R_SETCDM_CALLBACK with cdm_id="
<< message->renderer_set_cdm_rpc().cdm_id() << ", callback_handle="
<< message->renderer_set_cdm_rpc().callback_handle();
// TODO(erickung): add implementation once Remote CDM implementation is done.
NOTIMPLEMENTED();
}
void CourierRenderer::OnTimeUpdate(std::unique_ptr<pb::RpcMessage> message) {
DCHECK(media_task_runner_->BelongsToCurrentThread());
DCHECK(message);
  // Shut down the remoting session if a malformed RPC message is received.
if (!message->has_rendererclient_ontimeupdate_rpc()) {
VLOG(1) << __func__ << " missing required RPC message";
OnFatalError(RPC_INVALID);
return;
}
const int64_t time_usec =
message->rendererclient_ontimeupdate_rpc().time_usec();
const int64_t max_time_usec =
message->rendererclient_ontimeupdate_rpc().max_time_usec();
VLOG(2) << __func__
<< ": Received RPC_RC_ONTIMEUPDATE with time_usec=" << time_usec
<< ", max_time_usec=" << max_time_usec;
  // Ignore invalid times, such as negative values or times larger than the max
  // value (usually the timestamp up to which all streams have been pushed into
  // the AV pipeline).
if (time_usec < 0 || max_time_usec < 0 || time_usec > max_time_usec)
return;
{
// Updates current time information.
base::AutoLock auto_lock(time_lock_);
current_media_time_ = base::TimeDelta::FromMicroseconds(time_usec);
current_max_time_ = base::TimeDelta::FromMicroseconds(max_time_usec);
}
metrics_recorder_.OnEvidenceOfPlayoutAtReceiver();
OnMediaTimeUpdated();
}
void CourierRenderer::OnBufferingStateChange(
std::unique_ptr<pb::RpcMessage> message) {
DCHECK(media_task_runner_->BelongsToCurrentThread());
DCHECK(message);
if (!message->has_rendererclient_onbufferingstatechange_rpc()) {
VLOG(1) << __func__ << " missing required RPC message";
OnFatalError(RPC_INVALID);
return;
}
VLOG(2) << __func__ << ": Received RPC_RC_ONBUFFERINGSTATECHANGE with state="
<< message->rendererclient_onbufferingstatechange_rpc().state();
base::Optional<BufferingState> state = ToMediaBufferingState(
message->rendererclient_onbufferingstatechange_rpc().state());
if (!state.has_value())
return;
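  // If the receiver ran out of data while this side is still waiting on the
  // local demuxers, the stall is attributable to the local source; pause the
  // irregular-playback checks until buffering recovers.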
if (state == BufferingState::BUFFERING_HAVE_NOTHING) {
receiver_is_blocked_on_local_demuxers_ = IsWaitingForDataFromDemuxers();
} else if (receiver_is_blocked_on_local_demuxers_) {
receiver_is_blocked_on_local_demuxers_ = false;
ResetMeasurements();
}
client_->OnBufferingStateChange(state.value());
}
void CourierRenderer::OnAudioConfigChange(
std::unique_ptr<pb::RpcMessage> message) {
DCHECK(media_task_runner_->BelongsToCurrentThread());
DCHECK(message);
  // Shut down the remoting session if a malformed RPC message is received.
if (!message->has_rendererclient_onaudioconfigchange_rpc()) {
VLOG(1) << __func__ << " missing required RPC message";
OnFatalError(RPC_INVALID);
return;
}
const auto* audio_config_message =
message->mutable_rendererclient_onaudioconfigchange_rpc();
const pb::AudioDecoderConfig pb_audio_config =
audio_config_message->audio_decoder_config();
AudioDecoderConfig out_audio_config;
ConvertProtoToAudioDecoderConfig(pb_audio_config, &out_audio_config);
DCHECK(out_audio_config.IsValidConfig());
VLOG(2) << __func__ << ": Received RPC_RC_ONAUDIOCONFIGCHANGE with config:"
<< out_audio_config.AsHumanReadableString();
client_->OnAudioConfigChange(out_audio_config);
}
void CourierRenderer::OnVideoConfigChange(
std::unique_ptr<pb::RpcMessage> message) {
DCHECK(media_task_runner_->BelongsToCurrentThread());
DCHECK(message);
  // Shut down the remoting session if a malformed RPC message is received.
if (!message->has_rendererclient_onvideoconfigchange_rpc()) {
VLOG(1) << __func__ << " missing required RPC message";
OnFatalError(RPC_INVALID);
return;
}
const auto* video_config_message =
message->mutable_rendererclient_onvideoconfigchange_rpc();
const pb::VideoDecoderConfig pb_video_config =
video_config_message->video_decoder_config();
VideoDecoderConfig out_video_config;
ConvertProtoToVideoDecoderConfig(pb_video_config, &out_video_config);
DCHECK(out_video_config.IsValidConfig());
VLOG(2) << __func__ << ": Received RPC_RC_ONVIDEOCONFIGCHANGE with config:"
<< out_video_config.AsHumanReadableString();
client_->OnVideoConfigChange(out_video_config);
}
void CourierRenderer::OnVideoNaturalSizeChange(
std::unique_ptr<pb::RpcMessage> message) {
DCHECK(media_task_runner_->BelongsToCurrentThread());
DCHECK(message);
  // Shut down the remoting session if a malformed RPC message is received.
if (!message->has_rendererclient_onvideonatualsizechange_rpc()) {
VLOG(1) << __func__ << " missing required RPC message";
OnFatalError(RPC_INVALID);
return;
}
const auto& size_change =
message->rendererclient_onvideonatualsizechange_rpc();
VLOG(2) << __func__ << ": Received RPC_RC_ONVIDEONATURALSIZECHANGE with size="
<< size_change.width() << 'x' << size_change.height();
if (size_change.width() <= 0 || size_change.height() <= 0)
return;
client_->OnVideoNaturalSizeChange(
gfx::Size(size_change.width(), size_change.height()));
}
void CourierRenderer::OnVideoOpacityChange(
std::unique_ptr<pb::RpcMessage> message) {
DCHECK(media_task_runner_->BelongsToCurrentThread());
DCHECK(message);
const bool opaque = message->boolean_value();
VLOG(2) << __func__
<< ": Received RPC_RC_ONVIDEOOPACITYCHANGE with opaque=" << opaque;
client_->OnVideoOpacityChange(opaque);
}
void CourierRenderer::OnStatisticsUpdate(
std::unique_ptr<pb::RpcMessage> message) {
DCHECK(media_task_runner_->BelongsToCurrentThread());
DCHECK(message);
  // Shut down the remoting session if a malformed RPC message is received.
if (!message->has_rendererclient_onstatisticsupdate_rpc()) {
VLOG(1) << __func__ << " missing required RPC message";
OnFatalError(RPC_INVALID);
return;
}
PipelineStatistics stats;
ConvertProtoToPipelineStatistics(
message->rendererclient_onstatisticsupdate_rpc(), &stats);
// Note: Each field in |stats| is a delta, not the aggregate amount.
VLOG(2) << __func__
<< ": Received RPC_RC_ONSTATISTICSUPDATE with audio_bytes_decoded="
<< stats.audio_bytes_decoded
<< ", video_bytes_decoded=" << stats.video_bytes_decoded
<< ", video_frames_decoded=" << stats.video_frames_decoded
<< ", video_frames_dropped=" << stats.video_frames_dropped
<< ", audio_memory_usage=" << stats.audio_memory_usage
<< ", video_memory_usage=" << stats.video_memory_usage;
if (stats.audio_bytes_decoded > 0 || stats.video_frames_decoded > 0 ||
stats.video_frames_dropped > 0) {
metrics_recorder_.OnEvidenceOfPlayoutAtReceiver();
}
UpdateVideoStatsQueue(stats.video_frames_decoded, stats.video_frames_dropped);
client_->OnStatisticsUpdate(stats);
}
void CourierRenderer::OnDurationChange(
std::unique_ptr<pb::RpcMessage> message) {
DCHECK(media_task_runner_->BelongsToCurrentThread());
DCHECK(message);
VLOG(2) << __func__ << ": Received RPC_RC_ONDURATIONCHANGE with usec="
<< message->integer64_value();
if (message->integer64_value() < 0)
return;
client_->OnDurationChange(
base::TimeDelta::FromMicroseconds(message->integer64_value()));
}
void CourierRenderer::OnFatalError(StopTrigger stop_trigger) {
DCHECK(media_task_runner_->BelongsToCurrentThread());
DCHECK_NE(UNKNOWN_STOP_TRIGGER, stop_trigger);
VLOG(2) << __func__ << " with StopTrigger " << stop_trigger;
// If this is the first error, notify the controller. It is expected the
// controller will cause this renderer to shut down shortly.
if (state_ != STATE_ERROR) {
state_ = STATE_ERROR;
main_task_runner_->PostTask(
FROM_HERE, base::Bind(&RendererController::OnRendererFatalError,
controller_, stop_trigger));
}
data_flow_poll_timer_.Stop();
// This renderer will be shut down shortly. To prevent breaking the pipeline,
// just run the callback without reporting error.
if (!init_workflow_done_callback_.is_null()) {
base::ResetAndReturn(&init_workflow_done_callback_).Run(PIPELINE_OK);
return;
}
if (!flush_cb_.is_null())
base::ResetAndReturn(&flush_cb_).Run();
}
void CourierRenderer::OnMediaTimeUpdated() {
DCHECK(media_task_runner_->BelongsToCurrentThread());
  if (!flush_cb_.is_null())
    return;  // Don't manage or check the queue while a Flush() is ongoing.
  if (receiver_is_blocked_on_local_demuxers_)
    return;  // Don't manage or check the queue while buffering is ongoing.
base::TimeTicks current_time = clock_->NowTicks();
if (current_time < ignore_updates_until_time_)
return; // Not stable yet.
media_time_queue_.push_back(
std::make_pair(current_time, current_media_time_));
base::TimeDelta window_duration =
current_time - media_time_queue_.front().first;
if (window_duration < kTrackingWindow)
return; // Not enough data to make a reliable decision.
base::TimeDelta media_duration =
media_time_queue_.back().second - media_time_queue_.front().second;
base::TimeDelta update_duration =
(media_time_queue_.back().first - media_time_queue_.front().first) *
playback_rate_;
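  // |media_duration| is how far the receiver's media time actually advanced
  // over the window, while |update_duration| is how far it should have
  // advanced given the elapsed wall-clock time and the playback rate (e.g. at
  // rate 1.0, 5 seconds of wall-clock time should yield ~5 seconds of media).
  // A persistent gap indicates delayed playback.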
if ((media_duration - update_duration).magnitude() >=
kMediaPlaybackDelayThreshold) {
VLOG(1) << "Irregular playback detected: Media playback delayed."
<< " media_duration = " << media_duration
<< " update_duration = " << update_duration;
++times_playback_delayed_;
if (times_playback_delayed_ == kPlaybackDelayCountThreshold)
OnFatalError(PACING_TOO_SLOWLY);
} else {
times_playback_delayed_ = 0;
}
// Prune |media_time_queue_|.
while (media_time_queue_.back().first - media_time_queue_.front().first >=
kTrackingWindow)
media_time_queue_.pop_front();
}
void CourierRenderer::UpdateVideoStatsQueue(int video_frames_decoded,
int video_frames_dropped) {
DCHECK(media_task_runner_->BelongsToCurrentThread());
  if (!flush_cb_.is_null())
    return;  // Don't manage or check the queue while a Flush() is ongoing.
if (!stats_updated_) {
if (video_frames_decoded)
stats_updated_ = true;
    // Ignore the first stats update since it may include information gathered
    // during the unstable period.
return;
}
base::TimeTicks current_time = clock_->NowTicks();
if (current_time < ignore_updates_until_time_)
return; // Not stable yet.
video_stats_queue_.push_back(std::make_tuple(
current_time, video_frames_decoded, video_frames_dropped));
sum_video_frames_decoded_ += video_frames_decoded;
sum_video_frames_dropped_ += video_frames_dropped;
base::TimeDelta window_duration =
current_time - std::get<0>(video_stats_queue_.front());
if (window_duration < kTrackingWindow)
return; // Not enough data to make a reliable decision.
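  // Integer comparison of the drop percentage: e.g. with a 3% threshold,
  // dropping 4 of 100 decoded frames within the window triggers a fatal error.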
if (sum_video_frames_decoded_ &&
sum_video_frames_dropped_ * 100 >
sum_video_frames_decoded_ * kMaxNumVideoFramesDroppedPercentage) {
VLOG(1) << "Irregular playback detected: Too many video frames dropped."
<< " video_frames_decoded= " << sum_video_frames_decoded_
<< " video_frames_dropped= " << sum_video_frames_dropped_;
OnFatalError(FRAME_DROP_RATE_HIGH);
}
// Prune |video_stats_queue_|.
while (std::get<0>(video_stats_queue_.back()) -
std::get<0>(video_stats_queue_.front()) >=
kTrackingWindow) {
sum_video_frames_decoded_ -= std::get<1>(video_stats_queue_.front());
sum_video_frames_dropped_ -= std::get<2>(video_stats_queue_.front());
video_stats_queue_.pop_front();
}
}
void CourierRenderer::ResetMeasurements() {
DCHECK(media_task_runner_->BelongsToCurrentThread());
media_time_queue_.clear();
video_stats_queue_.clear();
sum_video_frames_dropped_ = 0;
sum_video_frames_decoded_ = 0;
stats_updated_ = false;
ignore_updates_until_time_ = clock_->NowTicks() + kStabilizationPeriod;
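  // (Re)start periodic data-flow polling; the timer is stopped again if a
  // fatal error occurs.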
if (state_ != STATE_ERROR &&
(audio_demuxer_stream_adapter_ || video_demuxer_stream_adapter_)) {
data_flow_poll_timer_.Start(FROM_HERE, kDataFlowPollPeriod, this,
&CourierRenderer::MeasureAndRecordDataRates);
}
}
void CourierRenderer::MeasureAndRecordDataRates() {
DCHECK(media_task_runner_->BelongsToCurrentThread());
  // Whenever media is first started or flushed/seeked, there is a "burst
  // buffering" period as the remote device rapidly fills its buffer before
  // resuming playback. Since the goal here is to measure the sustained content
  // bitrates, ignore the byte counts from the first polling period after the
  // last ResetMeasurements() call.
const base::TimeTicks current_time = clock_->NowTicks();
if (current_time < ignore_updates_until_time_ + kDataFlowPollPeriod) {
if (audio_demuxer_stream_adapter_)
audio_demuxer_stream_adapter_->GetBytesWrittenAndReset();
if (video_demuxer_stream_adapter_)
video_demuxer_stream_adapter_->GetBytesWrittenAndReset();
return;
}
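  // Convert the byte counts accumulated since the last poll into kilobits per
  // second, e.g. 1,280,000 bytes over 10 seconds -> 128,000 bytes/sec ->
  // 1000 kbps (1 kilobit == 128 bytes here).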
const int kBytesPerKilobit = 1024 / 8;
if (audio_demuxer_stream_adapter_) {
const double kilobits_per_second =
(audio_demuxer_stream_adapter_->GetBytesWrittenAndReset() /
kDataFlowPollPeriod.InSecondsF()) /
kBytesPerKilobit;
DCHECK_GE(kilobits_per_second, 0);
const base::CheckedNumeric<int> checked_kbps = kilobits_per_second;
metrics_recorder_.OnAudioRateEstimate(
checked_kbps.ValueOrDefault(std::numeric_limits<int>::max()));
}
if (video_demuxer_stream_adapter_) {
const double kilobits_per_second =
(video_demuxer_stream_adapter_->GetBytesWrittenAndReset() /
kDataFlowPollPeriod.InSecondsF()) /
kBytesPerKilobit;
DCHECK_GE(kilobits_per_second, 0);
const base::CheckedNumeric<int> checked_kbps = kilobits_per_second;
metrics_recorder_.OnVideoRateEstimate(
checked_kbps.ValueOrDefault(std::numeric_limits<int>::max()));
}
}
bool CourierRenderer::IsWaitingForDataFromDemuxers() const {
DCHECK(media_task_runner_->BelongsToCurrentThread());
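  // True if either adapter still has an outstanding read request to its local
  // DemuxerStream with no data received yet.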
return ((video_demuxer_stream_adapter_ &&
video_demuxer_stream_adapter_->is_processing_read_request() &&
!video_demuxer_stream_adapter_->is_data_pending()) ||
(audio_demuxer_stream_adapter_ &&
audio_demuxer_stream_adapter_->is_processing_read_request() &&
!audio_demuxer_stream_adapter_->is_data_pending()));
}
} // namespace remoting
} // namespace media