blob: 9ba228f1e6d7ae6f8eb17bd7086ffeb250bacfc5 [file] [log] [blame]
// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "media/mojo/common/mojo_decoder_buffer_converter.h"
#include <memory>
#include "base/logging.h"
#include "base/memory/ptr_util.h"
#include "base/single_thread_task_runner.h"
#include "base/threading/thread_task_runner_handle.h"
#include "media/base/audio_buffer.h"
#include "media/base/cdm_context.h"
#include "media/base/decoder_buffer.h"
#include "media/mojo/common/media_type_converters.h"
#include "media/mojo/common/mojo_pipe_read_write_util.h"
using media::mojo_pipe_read_write_util::IsPipeReadWriteError;
namespace media {
uint32_t GetDefaultDecoderBufferConverterCapacity(DemuxerStream::Type type) {
uint32_t capacity = 0;
if (type == DemuxerStream::AUDIO) {
// TODO(timav): Consider capacity calculation based on AudioDecoderConfig.
capacity = 512 * 1024;
} else if (type == DemuxerStream::VIDEO) {
// Video can get quite large; at 4K, VP9 delivers packets which are ~1MB in
// size; so allow for some head room.
// TODO(xhwang, sandersd): Provide a better way to customize this value.
capacity = 2 * (1024 * 1024);
} else {
NOTREACHED() << "Unsupported type: " << type;
// Choose an arbitrary size.
capacity = 512 * 1024;
}
return capacity;
}
// MojoDecoderBufferReader
// static
std::unique_ptr<MojoDecoderBufferReader> MojoDecoderBufferReader::Create(
uint32_t capacity,
mojo::ScopedDataPipeProducerHandle* producer_handle) {
DVLOG(1) << __func__;
DCHECK_GT(capacity, 0u);
auto data_pipe = std::make_unique<mojo::DataPipe>(capacity);
*producer_handle = std::move(data_pipe->producer_handle);
return std::make_unique<MojoDecoderBufferReader>(
std::move(data_pipe->consumer_handle));
}
MojoDecoderBufferReader::MojoDecoderBufferReader(
mojo::ScopedDataPipeConsumerHandle consumer_handle)
: consumer_handle_(std::move(consumer_handle)),
pipe_watcher_(FROM_HERE,
mojo::SimpleWatcher::ArmingPolicy::MANUAL,
base::SequencedTaskRunnerHandle::Get()),
armed_(false),
bytes_read_(0) {
DVLOG(1) << __func__;
MojoResult result =
pipe_watcher_.Watch(consumer_handle_.get(), MOJO_HANDLE_SIGNAL_READABLE,
MOJO_WATCH_CONDITION_SATISFIED,
base::Bind(&MojoDecoderBufferReader::OnPipeReadable,
base::Unretained(this)));
if (result != MOJO_RESULT_OK) {
DVLOG(1) << __func__
<< ": Failed to start watching the pipe. result=" << result;
consumer_handle_.reset();
}
}
MojoDecoderBufferReader::~MojoDecoderBufferReader() {
DVLOG(1) << __func__;
CancelAllPendingReadCBs();
if (flush_cb_)
std::move(flush_cb_).Run();
}
void MojoDecoderBufferReader::CancelReadCB(ReadCB read_cb) {
DVLOG(1) << "Failed to read DecoderBuffer because the pipe is already closed";
std::move(read_cb).Run(nullptr);
}
void MojoDecoderBufferReader::CancelAllPendingReadCBs() {
while (!pending_read_cbs_.empty()) {
ReadCB read_cb = std::move(pending_read_cbs_.front());
pending_read_cbs_.pop_front();
// TODO(sandersd): Make sure there are no possible re-entrancy issues
// here. Perhaps these should be posted, or merged into a single error
// callback?
CancelReadCB(std::move(read_cb));
}
}
void MojoDecoderBufferReader::CompleteCurrentRead() {
DVLOG(4) << __func__;
DCHECK(!pending_read_cbs_.empty());
DCHECK_EQ(pending_read_cbs_.size(), pending_buffers_.size());
ReadCB read_cb = std::move(pending_read_cbs_.front());
pending_read_cbs_.pop_front();
scoped_refptr<DecoderBuffer> buffer = std::move(pending_buffers_.front());
pending_buffers_.pop_front();
DCHECK(buffer->end_of_stream() || buffer->data_size() == bytes_read_);
bytes_read_ = 0;
std::move(read_cb).Run(std::move(buffer));
if (pending_read_cbs_.empty() && flush_cb_)
std::move(flush_cb_).Run();
}
void MojoDecoderBufferReader::ScheduleNextRead() {
DVLOG(4) << __func__;
DCHECK(!armed_);
DCHECK(!pending_buffers_.empty());
armed_ = true;
pipe_watcher_.ArmOrNotify();
}
// TODO(xhwang): Move this up to match declaration order.
void MojoDecoderBufferReader::ReadDecoderBuffer(
mojom::DecoderBufferPtr mojo_buffer,
ReadCB read_cb) {
DVLOG(3) << __func__;
DCHECK(!flush_cb_);
if (!consumer_handle_.is_valid()) {
DCHECK(pending_read_cbs_.empty());
CancelReadCB(std::move(read_cb));
return;
}
scoped_refptr<DecoderBuffer> media_buffer(
mojo_buffer.To<scoped_refptr<DecoderBuffer>>());
DCHECK(media_buffer);
// We don't want reads to complete out of order, so we queue them even if they
// are zero-sized.
pending_read_cbs_.push_back(std::move(read_cb));
pending_buffers_.push_back(std::move(media_buffer));
// Do nothing if a read is already scheduled.
if (armed_)
return;
// To reduce latency, always process pending reads immediately.
ProcessPendingReads();
}
void MojoDecoderBufferReader::Flush(base::OnceClosure flush_cb) {
DVLOG(2) << __func__;
DCHECK(!flush_cb_);
if (pending_read_cbs_.empty()) {
std::move(flush_cb).Run();
return;
}
flush_cb_ = std::move(flush_cb);
}
bool MojoDecoderBufferReader::HasPendingReads() const {
return !pending_read_cbs_.empty();
}
void MojoDecoderBufferReader::OnPipeReadable(
MojoResult result,
const mojo::HandleSignalsState& state) {
DVLOG(4) << __func__ << "(" << result << ", " << state.readable() << ")";
// |MOJO_RESULT_CANCELLED| may be dispatched even while the SimpleWatcher
// is disarmed, and no further notifications will be dispatched after that.
DCHECK(armed_ || result == MOJO_RESULT_CANCELLED);
armed_ = false;
if (result != MOJO_RESULT_OK) {
OnPipeError(result);
return;
}
DCHECK(state.readable());
ProcessPendingReads();
}
void MojoDecoderBufferReader::ProcessPendingReads() {
DVLOG(4) << __func__;
DCHECK(!armed_);
DCHECK(!pending_buffers_.empty());
while (!pending_buffers_.empty()) {
DecoderBuffer* buffer = pending_buffers_.front().get();
uint32_t buffer_size = 0u;
if (!pending_buffers_.front()->end_of_stream())
buffer_size = base::checked_cast<uint32_t>(buffer->data_size());
// Immediately complete empty reads.
// A non-EOS buffer can have zero size. See http://crbug.com/663438
if (buffer_size == 0) {
// TODO(sandersd): Make sure there are no possible re-entrancy issues
// here. Perhaps read callbacks should be posted?
CompleteCurrentRead();
continue;
}
// We may be starting to read a new buffer (|bytes_read_| == 0), or
// recovering from a previous partial read (|bytes_read_| > 0).
DCHECK_GT(buffer_size, bytes_read_);
uint32_t num_bytes = buffer_size - bytes_read_;
MojoResult result =
consumer_handle_->ReadData(buffer->writable_data() + bytes_read_,
&num_bytes, MOJO_WRITE_DATA_FLAG_NONE);
if (IsPipeReadWriteError(result)) {
OnPipeError(result);
return;
}
if (result == MOJO_RESULT_SHOULD_WAIT) {
ScheduleNextRead();
return;
}
DCHECK_EQ(result, MOJO_RESULT_OK);
DVLOG(4) << __func__ << ": " << num_bytes << " bytes read.";
DCHECK_GT(num_bytes, 0u);
bytes_read_ += num_bytes;
// TODO(sandersd): Make sure there are no possible re-entrancy issues
// here.
if (bytes_read_ == buffer_size)
CompleteCurrentRead();
// Since we can still read, try to read more.
}
}
void MojoDecoderBufferReader::OnPipeError(MojoResult result) {
DVLOG(1) << __func__ << "(" << result << ")";
DCHECK(IsPipeReadWriteError(result));
consumer_handle_.reset();
if (!pending_buffers_.empty()) {
DVLOG(1) << __func__ << ": reading from data pipe failed. result=" << result
<< ", buffer size=" << pending_buffers_.front()->data_size()
<< ", num_bytes(read)=" << bytes_read_;
bytes_read_ = 0;
pending_buffers_.clear();
CancelAllPendingReadCBs();
}
}
// MojoDecoderBufferWriter
// static
std::unique_ptr<MojoDecoderBufferWriter> MojoDecoderBufferWriter::Create(
uint32_t capacity,
mojo::ScopedDataPipeConsumerHandle* consumer_handle) {
DVLOG(1) << __func__;
DCHECK_GT(capacity, 0u);
auto data_pipe = std::make_unique<mojo::DataPipe>(capacity);
*consumer_handle = std::move(data_pipe->consumer_handle);
return std::make_unique<MojoDecoderBufferWriter>(
std::move(data_pipe->producer_handle));
}
MojoDecoderBufferWriter::MojoDecoderBufferWriter(
mojo::ScopedDataPipeProducerHandle producer_handle)
: producer_handle_(std::move(producer_handle)),
pipe_watcher_(FROM_HERE,
mojo::SimpleWatcher::ArmingPolicy::MANUAL,
base::SequencedTaskRunnerHandle::Get()),
armed_(false),
bytes_written_(0) {
DVLOG(1) << __func__;
MojoResult result =
pipe_watcher_.Watch(producer_handle_.get(), MOJO_HANDLE_SIGNAL_WRITABLE,
MOJO_WATCH_CONDITION_SATISFIED,
base::Bind(&MojoDecoderBufferWriter::OnPipeWritable,
base::Unretained(this)));
if (result != MOJO_RESULT_OK) {
DVLOG(1) << __func__
<< ": Failed to start watching the pipe. result=" << result;
producer_handle_.reset();
}
}
MojoDecoderBufferWriter::~MojoDecoderBufferWriter() {
DVLOG(1) << __func__;
}
void MojoDecoderBufferWriter::ScheduleNextWrite() {
DVLOG(4) << __func__;
DCHECK(!armed_);
DCHECK(!pending_buffers_.empty());
armed_ = true;
pipe_watcher_.ArmOrNotify();
}
mojom::DecoderBufferPtr MojoDecoderBufferWriter::WriteDecoderBuffer(
scoped_refptr<DecoderBuffer> media_buffer) {
DVLOG(3) << __func__;
// DecoderBuffer cannot be written if the pipe is already closed.
if (!producer_handle_.is_valid()) {
DVLOG(1)
<< __func__
<< ": Failed to write DecoderBuffer because the pipe is already closed";
return nullptr;
}
mojom::DecoderBufferPtr mojo_buffer =
mojom::DecoderBuffer::From(*media_buffer);
// A non-EOS buffer can have zero size. See http://crbug.com/663438
if (media_buffer->end_of_stream() || media_buffer->data_size() == 0)
return mojo_buffer;
// Queue writing the buffer's data into our DataPipe.
pending_buffers_.push_back(std::move(media_buffer));
// Do nothing if a write is already scheduled. Otherwise, to reduce latency,
// always try to write data to the pipe first.
if (!armed_)
ProcessPendingWrites();
return mojo_buffer;
}
void MojoDecoderBufferWriter::OnPipeWritable(
MojoResult result,
const mojo::HandleSignalsState& state) {
DVLOG(4) << __func__ << "(" << result << ", " << state.writable() << ")";
// |MOJO_RESULT_CANCELLED| may be dispatched even while the SimpleWatcher
// is disarmed, and no further notifications will be dispatched after that.
DCHECK(armed_ || result == MOJO_RESULT_CANCELLED);
armed_ = false;
if (result != MOJO_RESULT_OK) {
OnPipeError(result);
return;
}
DCHECK(state.writable());
ProcessPendingWrites();
}
void MojoDecoderBufferWriter::ProcessPendingWrites() {
DVLOG(4) << __func__;
DCHECK(!armed_);
DCHECK(!pending_buffers_.empty());
while (!pending_buffers_.empty()) {
DecoderBuffer* buffer = pending_buffers_.front().get();
uint32_t buffer_size = base::checked_cast<uint32_t>(buffer->data_size());
DCHECK_GT(buffer_size, 0u) << "Unexpected EOS or empty buffer";
// We may be starting to write a new buffer (|bytes_written_| == 0), or
// recovering from a previous partial write (|bytes_written_| > 0).
uint32_t num_bytes = buffer_size - bytes_written_;
DCHECK_GT(num_bytes, 0u);
MojoResult result = producer_handle_->WriteData(
buffer->data() + bytes_written_, &num_bytes, MOJO_WRITE_DATA_FLAG_NONE);
if (IsPipeReadWriteError(result)) {
OnPipeError(result);
return;
}
if (result == MOJO_RESULT_SHOULD_WAIT) {
ScheduleNextWrite();
return;
}
DCHECK_EQ(MOJO_RESULT_OK, result);
DVLOG(4) << __func__ << ": " << num_bytes << " bytes written.";
DCHECK_GT(num_bytes, 0u);
bytes_written_ += num_bytes;
if (bytes_written_ == buffer_size) {
pending_buffers_.pop_front();
bytes_written_ = 0;
}
// Since we can still write, try to write more.
}
}
void MojoDecoderBufferWriter::OnPipeError(MojoResult result) {
DVLOG(1) << __func__ << "(" << result << ")";
DCHECK(IsPipeReadWriteError(result));
producer_handle_.reset();
if (!pending_buffers_.empty()) {
DVLOG(1) << __func__ << ": writing to data pipe failed. result=" << result
<< ", buffer size=" << pending_buffers_.front()->data_size()
<< ", num_bytes(written)=" << bytes_written_;
pending_buffers_.clear();
bytes_written_ = 0;
}
}
} // namespace media