| // Copyright 2014 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "device/serial/data_sink_receiver.h" |
| |
| #include <limits> |
| |
| #include "base/bind.h" |
| #include "base/message_loop/message_loop.h" |
| |
| namespace device { |
| |
| // A ReadOnlyBuffer implementation that provides a view of a buffer owned by a |
| // DataSinkReceiver. |
| class DataSinkReceiver::Buffer : public ReadOnlyBuffer { |
| public: |
| Buffer(scoped_refptr<DataSinkReceiver> receiver, |
| const char* buffer, |
| uint32_t buffer_size); |
| ~Buffer() override; |
| |
| void Cancel(int32_t error); |
| |
| // ReadOnlyBuffer overrides. |
| const char* GetData() override; |
| uint32_t GetSize() override; |
| void Done(uint32_t bytes_read) override; |
| void DoneWithError(uint32_t bytes_read, int32_t error) override; |
| |
| private: |
| // The DataSinkReceiver of whose buffer we are providing a view. |
| scoped_refptr<DataSinkReceiver> receiver_; |
| |
| const char* buffer_; |
| uint32_t buffer_size_; |
| |
| // Whether this receive has been cancelled. |
| bool cancelled_; |
| |
| // If |cancelled_|, contains the cancellation error to report. |
| int32_t cancellation_error_; |
| }; |
| |
| // A frame of data received from the client. |
| class DataSinkReceiver::DataFrame { |
| public: |
| explicit DataFrame(mojo::Array<uint8_t> data, |
| const mojo::Callback<void(uint32_t, int32_t)>& callback); |
| |
| // Returns the number of unconsumed bytes remaining of this data frame. |
| uint32_t GetRemainingBytes(); |
| |
| // Returns a pointer to the remaining data to be consumed. |
| const char* GetData(); |
| |
| // Reports that |bytes_read| bytes have been consumed. |
| void OnDataConsumed(uint32_t bytes_read); |
| |
| // Reports that an error occurred. |
| void ReportError(uint32_t bytes_read, int32_t error); |
| |
| private: |
| mojo::Array<uint8_t> data_; |
| uint32_t offset_; |
| const mojo::Callback<void(uint32_t, int32_t)> callback_; |
| }; |
| |
| DataSinkReceiver::DataSinkReceiver( |
| mojo::InterfaceRequest<serial::DataSink> request, |
| const ReadyCallback& ready_callback, |
| const CancelCallback& cancel_callback, |
| const ErrorCallback& error_callback) |
| : binding_(this, request.Pass()), |
| ready_callback_(ready_callback), |
| cancel_callback_(cancel_callback), |
| error_callback_(error_callback), |
| current_error_(0), |
| buffer_in_use_(NULL), |
| shut_down_(false), |
| weak_factory_(this) { |
| binding_.set_error_handler(this); |
| } |
| |
| void DataSinkReceiver::ShutDown() { |
| shut_down_ = true; |
| } |
| |
| DataSinkReceiver::~DataSinkReceiver() { |
| } |
| |
| void DataSinkReceiver::Cancel(int32_t error) { |
| // If we have sent a ReportBytesSentAndError but have not received the |
| // response, that ReportBytesSentAndError message will appear to the |
| // DataSinkClient to be caused by this Cancel message. In that case, we ignore |
| // the cancel. |
| if (current_error_) |
| return; |
| |
| // If there is a buffer is in use, mark the buffer as cancelled and notify the |
| // client by calling |cancel_callback_|. The sink implementation may or may |
| // not take the cancellation into account when deciding what error (if any) to |
| // return. If the sink returns an error, we ignore the cancellation error. |
| // Otherwise, if the sink does not report an error, we override that with the |
| // cancellation error. Once a cancellation has been received, the next report |
| // sent to the client will always contain an error; the error returned by the |
| // sink or the cancellation error if the sink does not return an error. |
| if (buffer_in_use_) { |
| buffer_in_use_->Cancel(error); |
| if (!cancel_callback_.is_null()) |
| cancel_callback_.Run(error); |
| return; |
| } |
| ReportError(0, error); |
| } |
| |
| void DataSinkReceiver::OnData( |
| mojo::Array<uint8_t> data, |
| const mojo::Callback<void(uint32_t, int32_t)>& callback) { |
| if (current_error_) { |
| callback.Run(0, current_error_); |
| return; |
| } |
| pending_data_buffers_.push( |
| linked_ptr<DataFrame>(new DataFrame(data.Pass(), callback))); |
| if (!buffer_in_use_) |
| RunReadyCallback(); |
| } |
| |
| void DataSinkReceiver::OnConnectionError() { |
| DispatchFatalError(); |
| } |
| |
| void DataSinkReceiver::RunReadyCallback() { |
| DCHECK(!shut_down_ && !current_error_); |
| // If data arrives while a call to RunReadyCallback() is posted, we can be |
| // called with buffer_in_use_ already set. |
| if (buffer_in_use_) |
| return; |
| buffer_in_use_ = |
| new Buffer(this, |
| pending_data_buffers_.front()->GetData(), |
| pending_data_buffers_.front()->GetRemainingBytes()); |
| ready_callback_.Run(scoped_ptr<ReadOnlyBuffer>(buffer_in_use_)); |
| } |
| |
| void DataSinkReceiver::Done(uint32_t bytes_read) { |
| if (!DoneInternal(bytes_read)) |
| return; |
| pending_data_buffers_.front()->OnDataConsumed(bytes_read); |
| if (pending_data_buffers_.front()->GetRemainingBytes() == 0) |
| pending_data_buffers_.pop(); |
| if (!pending_data_buffers_.empty()) { |
| base::MessageLoop::current()->PostTask( |
| FROM_HERE, |
| base::Bind(&DataSinkReceiver::RunReadyCallback, |
| weak_factory_.GetWeakPtr())); |
| } |
| } |
| |
| void DataSinkReceiver::DoneWithError(uint32_t bytes_read, int32_t error) { |
| if (!DoneInternal(bytes_read)) |
| return; |
| ReportError(bytes_read, error); |
| } |
| |
| bool DataSinkReceiver::DoneInternal(uint32_t bytes_read) { |
| if (shut_down_) |
| return false; |
| |
| DCHECK(buffer_in_use_); |
| buffer_in_use_ = NULL; |
| return true; |
| } |
| |
| void DataSinkReceiver::ReportError(uint32_t bytes_read, int32_t error) { |
| // When we encounter an error, we must discard the data from any send buffers |
| // transmitted by the DataSink client before it receives this error. |
| DCHECK(error); |
| current_error_ = error; |
| while (!pending_data_buffers_.empty()) { |
| pending_data_buffers_.front()->ReportError(bytes_read, error); |
| pending_data_buffers_.pop(); |
| bytes_read = 0; |
| } |
| } |
| |
| void DataSinkReceiver::ClearError() { |
| current_error_ = 0; |
| } |
| |
| void DataSinkReceiver::DispatchFatalError() { |
| if (shut_down_) |
| return; |
| |
| ShutDown(); |
| if (!error_callback_.is_null()) |
| error_callback_.Run(); |
| } |
| |
| DataSinkReceiver::Buffer::Buffer(scoped_refptr<DataSinkReceiver> receiver, |
| const char* buffer, |
| uint32_t buffer_size) |
| : receiver_(receiver), |
| buffer_(buffer), |
| buffer_size_(buffer_size), |
| cancelled_(false), |
| cancellation_error_(0) { |
| } |
| |
| DataSinkReceiver::Buffer::~Buffer() { |
| if (!receiver_.get()) |
| return; |
| if (cancelled_) |
| receiver_->DoneWithError(0, cancellation_error_); |
| else |
| receiver_->Done(0); |
| } |
| |
| void DataSinkReceiver::Buffer::Cancel(int32_t error) { |
| cancelled_ = true; |
| cancellation_error_ = error; |
| } |
| |
| const char* DataSinkReceiver::Buffer::GetData() { |
| return buffer_; |
| } |
| |
| uint32_t DataSinkReceiver::Buffer::GetSize() { |
| return buffer_size_; |
| } |
| |
| void DataSinkReceiver::Buffer::Done(uint32_t bytes_read) { |
| scoped_refptr<DataSinkReceiver> receiver = receiver_; |
| receiver_ = nullptr; |
| if (cancelled_) |
| receiver->DoneWithError(bytes_read, cancellation_error_); |
| else |
| receiver->Done(bytes_read); |
| buffer_ = NULL; |
| buffer_size_ = 0; |
| } |
| |
| void DataSinkReceiver::Buffer::DoneWithError(uint32_t bytes_read, |
| int32_t error) { |
| scoped_refptr<DataSinkReceiver> receiver = receiver_; |
| receiver_ = nullptr; |
| receiver->DoneWithError(bytes_read, error); |
| buffer_ = NULL; |
| buffer_size_ = 0; |
| } |
| |
| DataSinkReceiver::DataFrame::DataFrame( |
| mojo::Array<uint8_t> data, |
| const mojo::Callback<void(uint32_t, int32_t)>& callback) |
| : data_(data.Pass()), offset_(0), callback_(callback) { |
| DCHECK_LT(0u, data_.size()); |
| } |
| |
| // Returns the number of uncomsumed bytes remaining of this data frame. |
| uint32_t DataSinkReceiver::DataFrame::GetRemainingBytes() { |
| return static_cast<uint32_t>(data_.size() - offset_); |
| } |
| |
| // Returns a pointer to the remaining data to be consumed. |
| const char* DataSinkReceiver::DataFrame::GetData() { |
| DCHECK_LT(offset_, data_.size()); |
| return reinterpret_cast<const char*>(&data_[0]) + offset_; |
| } |
| |
| void DataSinkReceiver::DataFrame::OnDataConsumed(uint32_t bytes_read) { |
| offset_ += bytes_read; |
| DCHECK_LE(offset_, data_.size()); |
| if (offset_ == data_.size()) |
| callback_.Run(offset_, 0); |
| } |
| void DataSinkReceiver::DataFrame::ReportError(uint32_t bytes_read, |
| int32_t error) { |
| offset_ += bytes_read; |
| DCHECK_LE(offset_, data_.size()); |
| callback_.Run(offset_, error); |
| } |
| |
| } // namespace device |