| // Copyright (c) 2010 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "media/base/composite_filter.h" |
| |
| #include "base/stl_util-inl.h" |
| #include "media/base/callback.h" |
| |
| namespace media { |
| |
| class CompositeFilter::FilterHostImpl : public FilterHost { |
| public: |
| FilterHostImpl(CompositeFilter* parent, FilterHost* host); |
| |
| FilterHost* host(); |
| |
| // media::FilterHost methods. |
| virtual void SetError(PipelineError error); |
| virtual base::TimeDelta GetTime() const; |
| virtual base::TimeDelta GetDuration() const; |
| virtual void SetTime(base::TimeDelta time); |
| virtual void SetDuration(base::TimeDelta duration); |
| virtual void SetBufferedTime(base::TimeDelta buffered_time); |
| virtual void SetTotalBytes(int64 total_bytes); |
| virtual void SetBufferedBytes(int64 buffered_bytes); |
| virtual void SetVideoSize(size_t width, size_t height); |
| virtual void SetStreaming(bool streaming); |
| virtual void NotifyEnded(); |
| virtual void SetLoaded(bool loaded); |
| virtual void SetNetworkActivity(bool network_activity); |
| virtual void DisableAudioRenderer(); |
| virtual void SetCurrentReadPosition(int64 offset); |
| virtual int64 GetCurrentReadPosition(); |
| |
| private: |
| CompositeFilter* parent_; |
| FilterHost* host_; |
| |
| DISALLOW_COPY_AND_ASSIGN(FilterHostImpl); |
| }; |
| |
| CompositeFilter::CompositeFilter(MessageLoop* message_loop) { |
| Init(message_loop, NULL); |
| } |
| |
| CompositeFilter::CompositeFilter(MessageLoop* message_loop, |
| ThreadFactoryFunction thread_factory) { |
| DCHECK(thread_factory); |
| Init(message_loop, thread_factory); |
| } |
| |
| void CompositeFilter::Init(MessageLoop* message_loop, |
| ThreadFactoryFunction thread_factory) { |
| DCHECK(message_loop); |
| message_loop_ = message_loop; |
| thread_factory_ = thread_factory; |
| runnable_factory_.reset( |
| new ScopedRunnableMethodFactory<CompositeFilter>(this)); |
| |
| if (!thread_factory_) { |
| thread_factory_ = &CompositeFilter::DefaultThreadFactory; |
| } |
| |
| state_ = kCreated; |
| sequence_index_ = 0; |
| error_ = PIPELINE_OK; |
| } |
| |
| CompositeFilter::~CompositeFilter() { |
| DCHECK_EQ(message_loop_, MessageLoop::current()); |
| DCHECK(state_ == kCreated || state_ == kStopped); |
| |
| // Stop every running filter thread. |
| for (FilterThreadVector::iterator iter = filter_threads_.begin(); |
| iter != filter_threads_.end(); |
| ++iter) { |
| (*iter)->Stop(); |
| } |
| |
| filters_.clear(); |
| STLDeleteElements(&filter_threads_); |
| } |
| |
| bool CompositeFilter::AddFilter(scoped_refptr<Filter> filter) { |
| DCHECK_EQ(message_loop_, MessageLoop::current()); |
| if (!filter.get() || state_ != kCreated || !host()) |
| return false; |
| |
| // Create a dedicated thread for this filter if applicable. |
| if (filter->requires_message_loop()) { |
| scoped_ptr<base::Thread> thread( |
| thread_factory_(filter->message_loop_name())); |
| |
| if (!thread.get() || !thread->Start()) { |
| return false; |
| } |
| |
| filter->set_message_loop(thread->message_loop()); |
| filter_threads_.push_back(thread.release()); |
| } |
| |
| // Register ourselves as the filter's host. |
| filter->set_host(host_impl_.get()); |
| filters_.push_back(make_scoped_refptr(filter.get())); |
| return true; |
| } |
| |
| const char* CompositeFilter::major_mime_type() const { |
| return ""; |
| } |
| |
| void CompositeFilter::set_host(FilterHost* host) { |
| DCHECK_EQ(message_loop_, MessageLoop::current()); |
| DCHECK(host); |
| DCHECK(!host_impl_.get()); |
| host_impl_.reset(new FilterHostImpl(this, host)); |
| } |
| |
| FilterHost* CompositeFilter::host() { |
| return host_impl_.get() ? host_impl_->host() : NULL; |
| } |
| |
| bool CompositeFilter::requires_message_loop() const { |
| return false; |
| } |
| |
| const char* CompositeFilter::message_loop_name() const { |
| return "CompositeFilter"; |
| } |
| |
| void CompositeFilter::set_message_loop(MessageLoop* message_loop) { |
| NOTREACHED() << "Message loop should not be set."; |
| } |
| |
| MessageLoop* CompositeFilter::message_loop() { |
| return NULL; |
| } |
| |
| void CompositeFilter::Play(FilterCallback* play_callback) { |
| DCHECK_EQ(message_loop_, MessageLoop::current()); |
| scoped_ptr<FilterCallback> callback(play_callback); |
| if (callback_.get()) { |
| SendErrorToHost(PIPELINE_ERROR_OPERATION_PENDING); |
| callback->Run(); |
| return; |
| } else if (state_ == kPlaying) { |
| callback->Run(); |
| return; |
| } else if (!host() || (state_ != kPaused && state_ != kCreated)) { |
| SendErrorToHost(PIPELINE_ERROR_INVALID_STATE); |
| callback->Run(); |
| return; |
| } |
| |
| ChangeState(kPlayPending); |
| callback_.reset(callback.release()); |
| StartSerialCallSequence(); |
| } |
| |
| void CompositeFilter::Pause(FilterCallback* pause_callback) { |
| DCHECK_EQ(message_loop_, MessageLoop::current()); |
| scoped_ptr<FilterCallback> callback(pause_callback); |
| if (callback_.get()) { |
| SendErrorToHost(PIPELINE_ERROR_OPERATION_PENDING); |
| callback->Run(); |
| return; |
| } else if (state_ == kPaused) { |
| callback->Run(); |
| return; |
| } else if (!host() || state_ != kPlaying) { |
| SendErrorToHost(PIPELINE_ERROR_INVALID_STATE); |
| callback->Run(); |
| return; |
| } |
| |
| ChangeState(kPausePending); |
| callback_.reset(callback.release()); |
| StartSerialCallSequence(); |
| } |
| |
| void CompositeFilter::Flush(FilterCallback* flush_callback) { |
| DCHECK_EQ(message_loop_, MessageLoop::current()); |
| scoped_ptr<FilterCallback> callback(flush_callback); |
| if (callback_.get()) { |
| SendErrorToHost(PIPELINE_ERROR_OPERATION_PENDING); |
| callback->Run(); |
| return; |
| } else if (!host() || (state_ != kCreated && state_ != kPaused)) { |
| SendErrorToHost(PIPELINE_ERROR_INVALID_STATE); |
| callback->Run(); |
| return; |
| } |
| |
| ChangeState(kFlushPending); |
| callback_.reset(callback.release()); |
| StartParallelCallSequence(); |
| } |
| |
| void CompositeFilter::Stop(FilterCallback* stop_callback) { |
| DCHECK_EQ(message_loop_, MessageLoop::current()); |
| scoped_ptr<FilterCallback> callback(stop_callback); |
| if (!host()) { |
| SendErrorToHost(PIPELINE_ERROR_INVALID_STATE); |
| callback->Run(); |
| return; |
| } else if (state_ == kStopped) { |
| callback->Run(); |
| return; |
| } |
| |
| switch(state_) { |
| case kError: |
| case kCreated: |
| case kPaused: |
| case kPlaying: |
| ChangeState(kStopPending); |
| break; |
| case kPlayPending: |
| ChangeState(kStopWhilePlayPending); |
| break; |
| case kPausePending: |
| ChangeState(kStopWhilePausePending); |
| break; |
| case kFlushPending: |
| ChangeState(kStopWhileFlushPending); |
| break; |
| case kSeekPending: |
| ChangeState(kStopWhileSeekPending); |
| break; |
| default: |
| SendErrorToHost(PIPELINE_ERROR_INVALID_STATE); |
| callback->Run(); |
| return; |
| } |
| |
| callback_.reset(callback.release()); |
| if (state_ == kStopPending) { |
| StartSerialCallSequence(); |
| } |
| } |
| |
| void CompositeFilter::SetPlaybackRate(float playback_rate) { |
| DCHECK_EQ(message_loop_, MessageLoop::current()); |
| for (FilterVector::iterator iter = filters_.begin(); |
| iter != filters_.end(); |
| ++iter) { |
| (*iter)->SetPlaybackRate(playback_rate); |
| } |
| } |
| |
| void CompositeFilter::Seek(base::TimeDelta time, |
| FilterCallback* seek_callback) { |
| DCHECK_EQ(message_loop_, MessageLoop::current()); |
| scoped_ptr<FilterCallback> callback(seek_callback); |
| if (callback_.get()) { |
| SendErrorToHost(PIPELINE_ERROR_OPERATION_PENDING); |
| callback->Run(); |
| return; |
| } else if (!host() || (state_ != kPaused && state_ != kCreated)) { |
| SendErrorToHost(PIPELINE_ERROR_INVALID_STATE); |
| callback->Run(); |
| return; |
| } |
| |
| ChangeState(kSeekPending); |
| callback_.reset(callback.release()); |
| pending_seek_time_ = time; |
| StartSerialCallSequence(); |
| } |
| |
| void CompositeFilter::OnAudioRendererDisabled() { |
| DCHECK_EQ(message_loop_, MessageLoop::current()); |
| for (FilterVector::iterator iter = filters_.begin(); |
| iter != filters_.end(); |
| ++iter) { |
| (*iter)->OnAudioRendererDisabled(); |
| } |
| } |
| |
| base::Thread* CompositeFilter::DefaultThreadFactory( |
| const char* thread_name) { |
| return new base::Thread(thread_name); |
| } |
| |
| void CompositeFilter::ChangeState(State new_state) { |
| DCHECK_EQ(message_loop_, MessageLoop::current()); |
| state_ = new_state; |
| } |
| |
| void CompositeFilter::StartSerialCallSequence() { |
| DCHECK_EQ(message_loop_, MessageLoop::current()); |
| error_ = PIPELINE_OK; |
| |
| if (filters_.size() > 0) { |
| sequence_index_ = 0; |
| CallFilter(filters_[sequence_index_], |
| NewThreadSafeCallback(&CompositeFilter::SerialCallback)); |
| } else { |
| sequence_index_ = 0; |
| SerialCallback(); |
| } |
| } |
| |
| void CompositeFilter::StartParallelCallSequence() { |
| DCHECK_EQ(message_loop_, MessageLoop::current()); |
| error_ = PIPELINE_OK; |
| |
| if (filters_.size() > 0) { |
| sequence_index_ = 0; |
| for (size_t i = 0; i < filters_.size(); i++) { |
| CallFilter(filters_[i], |
| NewThreadSafeCallback(&CompositeFilter::ParallelCallback)); |
| } |
| } else { |
| sequence_index_ = 0; |
| ParallelCallback(); |
| } |
| } |
| |
| void CompositeFilter::CallFilter(scoped_refptr<Filter>& filter, |
| FilterCallback* callback) { |
| switch(state_) { |
| case kPlayPending: |
| filter->Play(callback); |
| break; |
| case kPausePending: |
| filter->Pause(callback); |
| break; |
| case kFlushPending: |
| filter->Flush(callback); |
| break; |
| case kStopPending: |
| filter->Stop(callback); |
| break; |
| case kSeekPending: |
| filter->Seek(pending_seek_time_, callback); |
| break; |
| default: |
| delete callback; |
| ChangeState(kError); |
| HandleError(PIPELINE_ERROR_INVALID_STATE); |
| } |
| } |
| |
| void CompositeFilter::DispatchPendingCallback() { |
| if (callback_.get()) { |
| scoped_ptr<FilterCallback> callback(callback_.release()); |
| callback->Run(); |
| } |
| } |
| |
| CompositeFilter::State CompositeFilter::GetNextState(State state) const { |
| State ret = kInvalid; |
| switch (state) { |
| case kPlayPending: |
| ret = kPlaying; |
| break; |
| case kPausePending: |
| ret = kPaused; |
| case kFlushPending: |
| ret = kPaused; |
| break; |
| case kStopPending: |
| ret = kStopped; |
| break; |
| case kSeekPending: |
| ret = kPaused; |
| break; |
| case kStopWhilePlayPending: |
| case kStopWhilePausePending: |
| case kStopWhileFlushPending: |
| case kStopWhileSeekPending: |
| ret = kStopPending; |
| break; |
| |
| case kInvalid: |
| case kCreated: |
| case kPlaying: |
| case kPaused: |
| case kStopped: |
| case kError: |
| ret = kInvalid; |
| break; |
| |
| // default: intentionally left out to catch missing states. |
| } |
| |
| return ret; |
| } |
| |
| void CompositeFilter::SerialCallback() { |
| DCHECK_EQ(message_loop_, MessageLoop::current()); |
| if (error_ != PIPELINE_OK) { |
| // We encountered an error. Terminate the sequence now. |
| ChangeState(kError); |
| HandleError(error_); |
| return; |
| } |
| |
| if (filters_.size() > 0) |
| sequence_index_++; |
| |
| if (sequence_index_ == filters_.size()) { |
| // All filters have been successfully called without error. |
| OnCallSequenceDone(); |
| } else if (GetNextState(state_) == kStopPending) { |
| // Abort sequence early and start issuing Stop() calls. |
| ChangeState(kStopPending); |
| StartSerialCallSequence(); |
| } else { |
| // We aren't done with the sequence. Call the next filter. |
| CallFilter(filters_[sequence_index_], |
| NewThreadSafeCallback(&CompositeFilter::SerialCallback)); |
| } |
| } |
| |
| void CompositeFilter::ParallelCallback() { |
| DCHECK_EQ(message_loop_, MessageLoop::current()); |
| |
| if (filters_.size() > 0) |
| sequence_index_++; |
| |
| if (sequence_index_ == filters_.size()) { |
| if (error_ != PIPELINE_OK) { |
| // We encountered an error. |
| ChangeState(kError); |
| HandleError(error_); |
| return; |
| } |
| |
| OnCallSequenceDone(); |
| } |
| } |
| |
| void CompositeFilter::OnCallSequenceDone() { |
| State next_state = GetNextState(state_); |
| |
| if (next_state == kInvalid) { |
| // We somehow got into an unexpected state. |
| ChangeState(kError); |
| HandleError(PIPELINE_ERROR_INVALID_STATE); |
| } |
| |
| ChangeState(next_state); |
| |
| if (state_ == kStopPending) { |
| // Handle a deferred Stop(). |
| StartSerialCallSequence(); |
| } else { |
| // Call the callback to indicate that the operation has completed. |
| DispatchPendingCallback(); |
| } |
| } |
| |
| void CompositeFilter::SendErrorToHost(PipelineError error) { |
| if (host_impl_.get()) |
| host_impl_.get()->host()->SetError(error); |
| } |
| |
| void CompositeFilter::HandleError(PipelineError error) { |
| if (error != PIPELINE_OK) { |
| SendErrorToHost(error); |
| } |
| |
| DispatchPendingCallback(); |
| } |
| |
| FilterCallback* CompositeFilter::NewThreadSafeCallback( |
| void (CompositeFilter::*method)()) { |
| return TaskToCallbackAdapter::NewCallback( |
| NewRunnableFunction(&CompositeFilter::OnCallback, |
| message_loop_, |
| runnable_factory_->NewRunnableMethod(method))); |
| } |
| |
| // This method is intentionally static so that no reference to the composite |
| // is needed to call it. This method may be called by other threads and we |
| // don't want those threads to gain ownership of this composite by having a |
| // reference to it. |task| will contain a weak reference to the composite |
| // so that the reference can be cleared if the composite is destroyed before |
| // the callback is called. |
| // static |
| void CompositeFilter::OnCallback(MessageLoop* message_loop, |
| CancelableTask* task) { |
| if (MessageLoop::current() != message_loop) { |
| // Posting callback to the proper thread. |
| message_loop->PostTask(FROM_HERE, task); |
| return; |
| } |
| |
| task->Run(); |
| delete task; |
| } |
| |
| bool CompositeFilter::CanForwardError() { |
| return (state_ == kCreated) || (state_ == kPlaying) || (state_ == kPaused); |
| } |
| |
| void CompositeFilter::SetError(PipelineError error) { |
| // TODO(acolwell): Temporary hack to handle errors that occur |
| // during filter initialization. In this case we just forward |
| // the error to the host even if it is on the wrong thread. We |
| // have to do this because if we defer the call, we can't be |
| // sure the host will get the error before the "init done" callback |
| // is executed. This will be cleaned up when filter init is refactored. |
| if (state_ == kCreated) { |
| SendErrorToHost(error); |
| return; |
| } |
| |
| if (message_loop_ != MessageLoop::current()) { |
| message_loop_->PostTask(FROM_HERE, |
| NewRunnableMethod(this, &CompositeFilter::SetError, error)); |
| return; |
| } |
| |
| DCHECK_EQ(message_loop_, MessageLoop::current()); |
| |
| // Drop errors recieved while stopping or stopped. |
| // This shields the owner of this object from having |
| // to deal with errors it can't do anything about. |
| if (state_ == kStopPending || state_ == kStopped) |
| return; |
| |
| error_ = error; |
| if (CanForwardError()) |
| SendErrorToHost(error); |
| } |
| |
| CompositeFilter::FilterHostImpl::FilterHostImpl(CompositeFilter* parent, |
| FilterHost* host) : |
| parent_(parent), |
| host_(host) { |
| } |
| |
| FilterHost* CompositeFilter::FilterHostImpl::host() { |
| return host_; |
| } |
| |
| // media::FilterHost methods. |
| void CompositeFilter::FilterHostImpl::SetError(PipelineError error) { |
| parent_->SetError(error); |
| } |
| |
| base::TimeDelta CompositeFilter::FilterHostImpl::GetTime() const { |
| return host_->GetTime(); |
| } |
| |
| base::TimeDelta CompositeFilter::FilterHostImpl::GetDuration() const { |
| return host_->GetDuration(); |
| } |
| |
| void CompositeFilter::FilterHostImpl::SetTime(base::TimeDelta time) { |
| host_->SetTime(time); |
| } |
| |
| void CompositeFilter::FilterHostImpl::SetDuration(base::TimeDelta duration) { |
| host_->SetDuration(duration); |
| } |
| |
| void CompositeFilter::FilterHostImpl::SetBufferedTime( |
| base::TimeDelta buffered_time) { |
| host_->SetBufferedTime(buffered_time); |
| } |
| |
| void CompositeFilter::FilterHostImpl::SetTotalBytes(int64 total_bytes) { |
| host_->SetTotalBytes(total_bytes); |
| } |
| |
| void CompositeFilter::FilterHostImpl::SetBufferedBytes(int64 buffered_bytes) { |
| host_->SetBufferedBytes(buffered_bytes); |
| } |
| |
| void CompositeFilter::FilterHostImpl::SetVideoSize(size_t width, |
| size_t height) { |
| host_->SetVideoSize(width, height); |
| } |
| |
| void CompositeFilter::FilterHostImpl::SetStreaming(bool streaming) { |
| host_->SetStreaming(streaming); |
| } |
| |
| void CompositeFilter::FilterHostImpl::NotifyEnded() { |
| host_->NotifyEnded(); |
| } |
| |
| void CompositeFilter::FilterHostImpl::SetLoaded(bool loaded) { |
| host_->SetLoaded(loaded); |
| } |
| |
| void CompositeFilter::FilterHostImpl::SetNetworkActivity( |
| bool network_activity) { |
| host_->SetNetworkActivity(network_activity); |
| } |
| |
| void CompositeFilter::FilterHostImpl::DisableAudioRenderer() { |
| host_->DisableAudioRenderer(); |
| } |
| |
| void CompositeFilter::FilterHostImpl::SetCurrentReadPosition(int64 offset) { |
| host_->SetCurrentReadPosition(offset); |
| } |
| |
| int64 CompositeFilter::FilterHostImpl::GetCurrentReadPosition() { |
| return host_->GetCurrentReadPosition(); |
| } |
| |
| } // namespace media |