| // Copyright 2015 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "content/child/shared_memory_data_consumer_handle.h" |
| |
| #include <algorithm> |
| #include <deque> |
| #include <utility> |
| |
| #include "base/bind.h" |
| #include "base/macros.h" |
| #include "base/memory/ptr_util.h" |
| #include "base/message_loop/message_loop.h" |
| #include "base/single_thread_task_runner.h" |
| #include "base/synchronization/lock.h" |
| #include "base/threading/thread_task_runner_handle.h" |
| #include "content/public/child/fixed_received_data.h" |
| |
| namespace content { |
| |
| namespace { |
| |
| class DelegateThreadSafeReceivedData final |
| : public RequestPeer::ThreadSafeReceivedData { |
| public: |
| explicit DelegateThreadSafeReceivedData( |
| std::unique_ptr<RequestPeer::ReceivedData> data) |
| : data_(std::move(data)), |
| task_runner_(base::ThreadTaskRunnerHandle::Get()) {} |
| ~DelegateThreadSafeReceivedData() override { |
| if (!task_runner_->BelongsToCurrentThread()) { |
| // Delete the data on the original thread. |
| task_runner_->DeleteSoon(FROM_HERE, data_.release()); |
| } |
| } |
| |
| const char* payload() const override { return data_->payload(); } |
| int length() const override { return data_->length(); } |
| int encoded_data_length() const override { |
| return data_->encoded_data_length(); |
| } |
| |
| private: |
| std::unique_ptr<RequestPeer::ReceivedData> data_; |
| scoped_refptr<base::SingleThreadTaskRunner> task_runner_; |
| |
| DISALLOW_COPY_AND_ASSIGN(DelegateThreadSafeReceivedData); |
| }; |
| |
| } // namespace |
| |
| using Result = blink::WebDataConsumerHandle::Result; |
| |
| // All methods (except for ctor/dtor) must be called with |lock_| aquired |
| // unless otherwise stated. |
| class SharedMemoryDataConsumerHandle::Context final |
| : public base::RefCountedThreadSafe<Context> { |
| public: |
| explicit Context(const base::Closure& on_reader_detached) |
| : result_(Ok), |
| first_offset_(0), |
| client_(nullptr), |
| writer_task_runner_(base::ThreadTaskRunnerHandle::Get()), |
| on_reader_detached_(on_reader_detached), |
| is_on_reader_detached_valid_(!on_reader_detached_.is_null()), |
| is_handle_active_(true), |
| is_two_phase_read_in_progress_(false) {} |
| |
| bool IsEmpty() const { |
| lock_.AssertAcquired(); |
| return queue_.empty(); |
| } |
| void ClearIfNecessary() { |
| lock_.AssertAcquired(); |
| if (!is_handle_locked() && !is_handle_active()) { |
| // No one is interested in the contents. |
| if (is_on_reader_detached_valid_) { |
| // We post a task even in the writer thread in order to avoid a |
| // reentrance problem as calling |on_reader_detached_| may manipulate |
| // the context synchronously. |
| writer_task_runner_->PostTask(FROM_HERE, on_reader_detached_); |
| } |
| Clear(); |
| } |
| } |
| void ClearQueue() { |
| lock_.AssertAcquired(); |
| queue_.clear(); |
| first_offset_ = 0; |
| } |
| RequestPeer::ThreadSafeReceivedData* Top() { |
| lock_.AssertAcquired(); |
| return queue_.front().get(); |
| } |
| void Push(std::unique_ptr<RequestPeer::ThreadSafeReceivedData> data) { |
| lock_.AssertAcquired(); |
| queue_.push_back(std::move(data)); |
| } |
| size_t first_offset() const { |
| lock_.AssertAcquired(); |
| return first_offset_; |
| } |
| Result result() const { |
| lock_.AssertAcquired(); |
| return result_; |
| } |
| void set_result(Result r) { |
| lock_.AssertAcquired(); |
| result_ = r; |
| } |
| void AcquireReaderLock(Client* client) { |
| lock_.AssertAcquired(); |
| DCHECK(!notification_task_runner_); |
| DCHECK(!client_); |
| notification_task_runner_ = base::ThreadTaskRunnerHandle::Get(); |
| client_ = client; |
| if (client && !(IsEmpty() && result() == Ok)) { |
| // We cannot notify synchronously because the user doesn't have the reader |
| // yet. |
| notification_task_runner_->PostTask( |
| FROM_HERE, base::Bind(&Context::NotifyInternal, this, false)); |
| } |
| } |
| void ReleaseReaderLock() { |
| lock_.AssertAcquired(); |
| DCHECK(notification_task_runner_); |
| notification_task_runner_ = nullptr; |
| client_ = nullptr; |
| } |
| void PostNotify() { |
| lock_.AssertAcquired(); |
| auto runner = notification_task_runner_; |
| if (!runner) |
| return; |
| // We don't re-post the task when the runner changes while waiting for |
| // this task because in this case a new reader is obtained and |
| // notification is already done at the reader creation time if necessary. |
| runner->PostTask(FROM_HERE, |
| base::Bind(&Context::NotifyInternal, this, false)); |
| } |
| // Must be called with |lock_| not aquired. |
| void Notify() { NotifyInternal(true); } |
| // This function doesn't work in the destructor if |on_reader_detached_| is |
| // not null. |
| void ResetOnReaderDetached() { |
| lock_.AssertAcquired(); |
| if (on_reader_detached_.is_null()) { |
| DCHECK(!is_on_reader_detached_valid_); |
| return; |
| } |
| is_on_reader_detached_valid_ = false; |
| if (writer_task_runner_->BelongsToCurrentThread()) { |
| // We can reset the closure immediately. |
| on_reader_detached_.Reset(); |
| } else { |
| // We need to reset |on_reader_detached_| on the right thread because it |
| // might lead to the object destruction. |
| writer_task_runner_->PostTask( |
| FROM_HERE, base::Bind(&Context::ResetOnReaderDetachedWithLock, this)); |
| } |
| } |
| bool is_handle_locked() const { |
| lock_.AssertAcquired(); |
| return static_cast<bool>(notification_task_runner_); |
| } |
| bool IsReaderBoundToCurrentThread() const { |
| lock_.AssertAcquired(); |
| return notification_task_runner_ && |
| notification_task_runner_->BelongsToCurrentThread(); |
| } |
| bool is_handle_active() const { |
| lock_.AssertAcquired(); |
| return is_handle_active_; |
| } |
| void set_is_handle_active(bool b) { |
| lock_.AssertAcquired(); |
| is_handle_active_ = b; |
| } |
| void Consume(size_t s) { |
| lock_.AssertAcquired(); |
| first_offset_ += s; |
| auto* top = Top(); |
| if (static_cast<size_t>(top->length()) <= first_offset_) { |
| queue_.pop_front(); |
| first_offset_ = 0; |
| } |
| } |
| bool is_two_phase_read_in_progress() const { |
| lock_.AssertAcquired(); |
| return is_two_phase_read_in_progress_; |
| } |
| void set_is_two_phase_read_in_progress(bool b) { |
| lock_.AssertAcquired(); |
| is_two_phase_read_in_progress_ = b; |
| } |
| // Can be called with |lock_| not aquired. |
| base::Lock& lock() { return lock_; } |
| |
| private: |
| // Must be called with |lock_| not aquired. |
| void NotifyInternal(bool repost) { |
| scoped_refptr<base::SingleThreadTaskRunner> runner; |
| { |
| base::AutoLock lock(lock_); |
| runner = notification_task_runner_; |
| } |
| if (!runner) |
| return; |
| |
| if (runner->BelongsToCurrentThread()) { |
| // It is safe to access member variables without lock because |client_| |
| // is bound to the current thread. |
| if (client_) |
| client_->didGetReadable(); |
| return; |
| } |
| if (repost) { |
| // We don't re-post the task when the runner changes while waiting for |
| // this task because in this case a new reader is obtained and |
| // notification is already done at the reader creation time if necessary. |
| runner->PostTask(FROM_HERE, |
| base::Bind(&Context::NotifyInternal, this, false)); |
| } |
| } |
| void Clear() { |
| lock_.AssertAcquired(); |
| ClearQueue(); |
| client_ = nullptr; |
| ResetOnReaderDetached(); |
| } |
| // Must be called with |lock_| not aquired. |
| void ResetOnReaderDetachedWithLock() { |
| base::AutoLock lock(lock_); |
| ResetOnReaderDetached(); |
| } |
| |
| friend class base::RefCountedThreadSafe<Context>; |
| ~Context() = default; |
| |
| base::Lock lock_; |
| // |result_| stores the ultimate state of this handle if it has. Otherwise, |
| // |Ok| is set. |
| Result result_; |
| std::deque<std::unique_ptr<RequestPeer::ThreadSafeReceivedData>> queue_; |
| size_t first_offset_; |
| Client* client_; |
| scoped_refptr<base::SingleThreadTaskRunner> notification_task_runner_; |
| scoped_refptr<base::SingleThreadTaskRunner> writer_task_runner_; |
| base::Closure on_reader_detached_; |
| // We need this boolean variable to remember if |on_reader_detached_| is |
| // callable because we need to reset |on_reader_detached_| only on the writer |
| // thread and hence |on_reader_detached_.is_null()| is untrustworthy on |
| // other threads. |
| bool is_on_reader_detached_valid_; |
| bool is_handle_active_; |
| bool is_two_phase_read_in_progress_; |
| |
| DISALLOW_COPY_AND_ASSIGN(Context); |
| }; |
| |
| SharedMemoryDataConsumerHandle::Writer::Writer( |
| const scoped_refptr<Context>& context, |
| BackpressureMode mode) |
| : context_(context), mode_(mode) { |
| } |
| |
| SharedMemoryDataConsumerHandle::Writer::~Writer() { |
| Close(); |
| base::AutoLock lock(context_->lock()); |
| context_->ResetOnReaderDetached(); |
| } |
| |
| void SharedMemoryDataConsumerHandle::Writer::AddData( |
| std::unique_ptr<RequestPeer::ReceivedData> data) { |
| if (!data->length()) { |
| // We omit empty data. |
| return; |
| } |
| |
| bool needs_notification = false; |
| { |
| base::AutoLock lock(context_->lock()); |
| if (!context_->is_handle_active() && !context_->is_handle_locked()) { |
| // No one is interested in the data. |
| return; |
| } |
| |
| needs_notification = context_->IsEmpty(); |
| std::unique_ptr<RequestPeer::ThreadSafeReceivedData> data_to_pass; |
| if (mode_ == kApplyBackpressure) { |
| data_to_pass = |
| base::MakeUnique<DelegateThreadSafeReceivedData>(std::move(data)); |
| } else { |
| data_to_pass = base::MakeUnique<FixedReceivedData>(data.get()); |
| } |
| context_->Push(std::move(data_to_pass)); |
| } |
| |
| if (needs_notification) { |
| // We CAN issue the notification synchronously if the associated reader |
| // lives in this thread, because this function cannot be called in the |
| // client's callback. |
| context_->Notify(); |
| } |
| } |
| |
| void SharedMemoryDataConsumerHandle::Writer::Close() { |
| base::AutoLock lock(context_->lock()); |
| if (context_->result() == Ok) { |
| context_->set_result(Done); |
| context_->ResetOnReaderDetached(); |
| if (context_->IsEmpty()) { |
| // We cannot issue the notification synchronously because this function |
| // can be called in the client's callback. |
| context_->PostNotify(); |
| } |
| } |
| } |
| |
| void SharedMemoryDataConsumerHandle::Writer::Fail() { |
| base::AutoLock lock(context_->lock()); |
| if (context_->result() == Ok) { |
| // TODO(yhirano): Use an appropriate error code other than |
| // UnexpectedError. |
| context_->set_result(UnexpectedError); |
| |
| if (context_->is_two_phase_read_in_progress()) { |
| // If we are in two-phase read session, we cannot discard the data. We |
| // will clear the queue at the end of the session. |
| } else { |
| context_->ClearQueue(); |
| } |
| |
| context_->ResetOnReaderDetached(); |
| // We cannot issue the notification synchronously because this function can |
| // be called in the client's callback. |
| context_->PostNotify(); |
| } |
| } |
| |
| SharedMemoryDataConsumerHandle::ReaderImpl::ReaderImpl( |
| scoped_refptr<Context> context, |
| Client* client) |
| : context_(context) { |
| base::AutoLock lock(context_->lock()); |
| DCHECK(!context_->is_handle_locked()); |
| context_->AcquireReaderLock(client); |
| } |
| |
| SharedMemoryDataConsumerHandle::ReaderImpl::~ReaderImpl() { |
| base::AutoLock lock(context_->lock()); |
| context_->ReleaseReaderLock(); |
| context_->ClearIfNecessary(); |
| } |
| |
| Result SharedMemoryDataConsumerHandle::ReaderImpl::read( |
| void* data, |
| size_t size, |
| Flags flags, |
| size_t* read_size_to_return) { |
| base::AutoLock lock(context_->lock()); |
| |
| size_t total_read_size = 0; |
| *read_size_to_return = 0; |
| |
| if (context_->result() == Ok && context_->is_two_phase_read_in_progress()) |
| context_->set_result(UnexpectedError); |
| |
| if (context_->result() != Ok && context_->result() != Done) |
| return context_->result(); |
| |
| while (!context_->IsEmpty() && total_read_size < size) { |
| auto* top = context_->Top(); |
| size_t readable = top->length() - context_->first_offset(); |
| size_t writable = size - total_read_size; |
| size_t read_size = std::min(readable, writable); |
| const char* begin = top->payload() + context_->first_offset(); |
| std::copy(begin, begin + read_size, |
| static_cast<char*>(data) + total_read_size); |
| total_read_size += read_size; |
| context_->Consume(read_size); |
| } |
| *read_size_to_return = total_read_size; |
| if (total_read_size || !context_->IsEmpty()) |
| return Ok; |
| if (context_->result() == Done) |
| return Done; |
| return ShouldWait; |
| } |
| |
| Result SharedMemoryDataConsumerHandle::ReaderImpl::beginRead( |
| const void** buffer, |
| Flags flags, |
| size_t* available) { |
| *buffer = nullptr; |
| *available = 0; |
| |
| base::AutoLock lock(context_->lock()); |
| |
| if (context_->result() == Ok && context_->is_two_phase_read_in_progress()) |
| context_->set_result(UnexpectedError); |
| |
| if (context_->result() != Ok && context_->result() != Done) |
| return context_->result(); |
| |
| if (context_->IsEmpty()) |
| return context_->result() == Done ? Done : ShouldWait; |
| |
| context_->set_is_two_phase_read_in_progress(true); |
| auto* top = context_->Top(); |
| *buffer = top->payload() + context_->first_offset(); |
| *available = top->length() - context_->first_offset(); |
| |
| return Ok; |
| } |
| |
| Result SharedMemoryDataConsumerHandle::ReaderImpl::endRead(size_t read_size) { |
| base::AutoLock lock(context_->lock()); |
| |
| if (!context_->is_two_phase_read_in_progress()) |
| return UnexpectedError; |
| |
| context_->set_is_two_phase_read_in_progress(false); |
| if (context_->result() != Ok && context_->result() != Done) { |
| // We have an error, so we can discard the stored data. |
| context_->ClearQueue(); |
| } else { |
| context_->Consume(read_size); |
| } |
| |
| return Ok; |
| } |
| |
| SharedMemoryDataConsumerHandle::SharedMemoryDataConsumerHandle( |
| BackpressureMode mode, |
| std::unique_ptr<Writer>* writer) |
| : SharedMemoryDataConsumerHandle(mode, base::Closure(), writer) {} |
| |
| SharedMemoryDataConsumerHandle::SharedMemoryDataConsumerHandle( |
| BackpressureMode mode, |
| const base::Closure& on_reader_detached, |
| std::unique_ptr<Writer>* writer) |
| : context_(new Context(on_reader_detached)) { |
| writer->reset(new Writer(context_, mode)); |
| } |
| |
| SharedMemoryDataConsumerHandle::~SharedMemoryDataConsumerHandle() { |
| base::AutoLock lock(context_->lock()); |
| context_->set_is_handle_active(false); |
| context_->ClearIfNecessary(); |
| } |
| |
| std::unique_ptr<blink::WebDataConsumerHandle::Reader> |
| SharedMemoryDataConsumerHandle::obtainReader(Client* client) { |
| return base::WrapUnique(new ReaderImpl(context_, client)); |
| } |
| |
| const char* SharedMemoryDataConsumerHandle::debugName() const { |
| return "SharedMemoryDataConsumerHandle"; |
| } |
| |
| } // namespace content |