blob: b36a9b8559490362e9cb63300a5fd44b3186b256 [file] [log] [blame]
// Copyright (c) 2009 The Chromium Authors. All rights reserved. Use of this
// source code is governed by a BSD-style license that can be found in the
// LICENSE file.
// A base class that provides the plumbing for a decoder filters.
#ifndef MEDIA_FILTERS_DECODER_BASE_H_
#define MEDIA_FILTERS_DECODER_BASE_H_
#include <deque>
#include "base/lock.h"
#include "base/stl_util-inl.h"
#include "base/task.h"
#include "base/thread.h"
#include "media/base/buffers.h"
#include "media/base/filters.h"
#include "media/base/filter_host.h"
namespace media {
template <class Decoder, class Output>
class DecoderBase : public Decoder {
public:
typedef CallbackRunner< Tuple1<Output*> > ReadCallback;
// MediaFilter implementation.
virtual void Stop() {
message_loop()->PostTask(FROM_HERE,
NewRunnableMethod(this, &DecoderBase::StopTask));
}
virtual void Seek(base::TimeDelta time) {
message_loop()->PostTask(FROM_HERE,
NewRunnableMethod(this, &DecoderBase::SeekTask, time));
}
// Decoder implementation.
virtual bool Initialize(DemuxerStream* demuxer_stream) {
message_loop()->PostTask(FROM_HERE,
NewRunnableMethod(this, &DecoderBase::InitializeTask, demuxer_stream));
return true;
}
virtual const MediaFormat& media_format() { return media_format_; }
// Audio or video decoder.
virtual void Read(ReadCallback* read_callback) {
message_loop()->PostTask(FROM_HERE,
NewRunnableMethod(this, &DecoderBase::ReadTask, read_callback));
}
void OnReadComplete(Buffer* buffer) {
// Little bit of magic here to get NewRunnableMethod() to generate a Task
// that holds onto a reference via scoped_refptr<>.
//
// TODO(scherkus): change the callback format to pass a scoped_refptr<> or
// better yet see if we can get away with not using reference counting.
scoped_refptr<Buffer> buffer_ref = buffer;
message_loop()->PostTask(FROM_HERE,
NewRunnableMethod(this, &DecoderBase::ReadCompleteTask, buffer_ref));
}
protected:
DecoderBase()
: pending_reads_(0),
seeking_(false),
state_(UNINITIALIZED),
thread_id_(NULL) {
}
virtual ~DecoderBase() {
DCHECK(state_ == UNINITIALIZED || state_ == STOPPED);
DCHECK(result_queue_.empty());
DCHECK(read_queue_.empty());
}
// This method is called by the derived class from within the OnDecode method.
// It places an output buffer in the result queue. It must be called from
// within the OnDecode method.
void EnqueueResult(Output* output) {
DCHECK_EQ(PlatformThread::CurrentId(), thread_id_);
if (!IsStopped()) {
result_queue_.push_back(output);
}
}
// Method that must be implemented by the derived class. Called from within
// the DecoderBase::Initialize() method before any reads are submitted to
// the demuxer stream. Returns true if successful, otherwise false indicates
// a fatal error. The derived class should NOT call the filter host's
// InitializationComplete() method. If this method returns true, then the
// base class will call the host to complete initialization. During this
// call, the derived class must fill in the media_format_ member.
virtual bool OnInitialize(DemuxerStream* demuxer_stream) = 0;
// Method that may be implemented by the derived class if desired. It will
// be called from within the MediaFilter::Stop() method prior to stopping the
// base class.
virtual void OnStop() {}
// Derived class can implement this method and perform seeking logic prior
// to the base class.
virtual void OnSeek(base::TimeDelta time) {}
// Method that must be implemented by the derived class. If the decode
// operation produces one or more outputs, the derived class should call
// the EnequeueResult() method from within this method.
virtual void OnDecode(Buffer* input) = 0;
// Used for subclasses who friend unit tests and need to set the thread id.
virtual void set_thread_id(PlatformThreadId thread_id) {
thread_id_ = thread_id;
}
MediaFormat media_format_;
private:
// GCC doesn't let us access superclass member variables directly, so use
// a helper to get around the situation.
//
// TODO(scherkus): another reason to add protected accessors to MediaFilter.
FilterHost* host() const { return Decoder::host_; }
MessageLoop* message_loop() const { return Decoder::message_loop_; }
bool IsStopped() { return state_ == STOPPED; }
void StopTask() {
DCHECK_EQ(PlatformThread::CurrentId(), thread_id_);
// Delegate to the subclass first.
OnStop();
// Throw away all buffers in all queues.
result_queue_.clear();
STLDeleteElements(&read_queue_);
state_ = STOPPED;
}
void SeekTask(base::TimeDelta time) {
DCHECK_EQ(PlatformThread::CurrentId(), thread_id_);
// Delegate to the subclass first.
OnSeek(time);
// Flush the result queue.
result_queue_.clear();
// Turn on the seeking flag so that we can discard buffers until a
// discontinuous buffer is received.
seeking_ = true;
}
void InitializeTask(DemuxerStream* demuxer_stream) {
DCHECK(state_ == UNINITIALIZED);
DCHECK(!demuxer_stream_);
DCHECK(!thread_id_ || thread_id_ == PlatformThread::CurrentId());
demuxer_stream_ = demuxer_stream;
// Grab the thread id for debugging.
thread_id_ = PlatformThread::CurrentId();
// Delegate to subclass first.
if (!OnInitialize(demuxer_stream_)) {
host()->Error(PIPELINE_ERROR_DECODE);
return;
}
// TODO(scherkus): subclass shouldn't mutate superclass media format.
DCHECK(!media_format_.empty()) << "Subclass did not set media_format_";
state_ = INITIALIZED;
host()->InitializationComplete();
}
void ReadTask(ReadCallback* read_callback) {
DCHECK_EQ(PlatformThread::CurrentId(), thread_id_);
// TODO(scherkus): should reply with a null operation (empty buffer).
if (IsStopped()) {
delete read_callback;
return;
}
// Enqueue the callback and attempt to fulfill it immediately.
read_queue_.push_back(read_callback);
FulfillPendingRead();
// Issue reads as necessary.
while (pending_reads_ < read_queue_.size()) {
demuxer_stream_->Read(NewCallback(this, &DecoderBase::OnReadComplete));
++pending_reads_;
}
}
void ReadCompleteTask(scoped_refptr<Buffer> buffer) {
DCHECK_EQ(PlatformThread::CurrentId(), thread_id_);
DCHECK_GT(pending_reads_, 0u);
--pending_reads_;
if (IsStopped()) {
return;
}
// Once the |seeking_| flag is set we ignore every buffers here
// until we receive a discontinuous buffer and we will turn off the
// |seeking_| flag.
if (buffer->IsDiscontinuous()) {
// TODO(hclam): put a DCHECK here to assert |seeking_| being true.
// I cannot do this now because seek operation is not fully
// asynchronous. There may be pending seek requests even before the
// previous was finished.
seeking_ = false;
}
if (seeking_) {
return;
}
// Decode the frame right away.
OnDecode(buffer);
// Attempt to fulfill a pending read callback and schedule additional reads
// if necessary.
FulfillPendingRead();
// Issue reads as necessary.
//
// Note that it's possible for us to decode but not produce a frame, in
// which case |pending_reads_| will remain less than |read_queue_| so we
// need to schedule an additional read.
DCHECK_LE(pending_reads_, read_queue_.size());
while (pending_reads_ < read_queue_.size()) {
demuxer_stream_->Read(NewCallback(this, &DecoderBase::OnReadComplete));
++pending_reads_;
}
}
// Attempts to fulfill a single pending read by dequeuing a buffer and read
// callback pair and executing the callback.
void FulfillPendingRead() {
DCHECK_EQ(PlatformThread::CurrentId(), thread_id_);
if (read_queue_.empty() || result_queue_.empty()) {
return;
}
// Dequeue a frame and read callback pair.
scoped_refptr<Output> output = result_queue_.front();
scoped_ptr<ReadCallback> read_callback(read_queue_.front());
result_queue_.pop_front();
read_queue_.pop_front();
// Execute the callback!
read_callback->Run(output);
}
// Tracks the number of asynchronous reads issued to |demuxer_stream_|.
// Using size_t since it is always compared against deque::size().
size_t pending_reads_;
// An internal state of the decoder that indicates that are waiting for seek
// to complete. We expect to receive a discontinuous frame/packet from the
// demuxer to signal that seeking is completed.
bool seeking_;
// Pointer to the demuxer stream that will feed us compressed buffers.
scoped_refptr<DemuxerStream> demuxer_stream_;
// Queue of decoded samples produced in the OnDecode() method of the decoder.
// Any samples placed in this queue will be assigned to the OutputQueue
// buffers once the OnDecode() method returns.
//
// TODO(ralphl): Eventually we want to have decoders get their destination
// buffer from the OutputQueue and write to it directly. Until we change
// from the Assignable buffer to callbacks and renderer-allocated buffers,
// we need this extra queue.
typedef std::deque<scoped_refptr<Output> > ResultQueue;
ResultQueue result_queue_;
// Queue of callbacks supplied by the renderer through the Read() method.
typedef std::deque<ReadCallback*> ReadQueue;
ReadQueue read_queue_;
// Simple state tracking variable.
enum State {
UNINITIALIZED,
INITIALIZED,
STOPPED,
};
State state_;
// Used for debugging.
PlatformThreadId thread_id_;
DISALLOW_COPY_AND_ASSIGN(DecoderBase);
};
} // namespace media
#endif // MEDIA_FILTERS_DECODER_BASE_H_