blob: af2ddcb40cf6e37ad2e1b90c07027c562593aaf6 [file] [log] [blame]
// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "device/serial/data_source_sender.h"
#include <algorithm>
#include <limits>
#include <utility>
#include "base/bind.h"
#include "base/message_loop/message_loop.h"
namespace device {
// Represents a send that is not yet fulfilled.
class DataSourceSender::PendingSend {
public:
PendingSend(DataSourceSender* sender, const ReadyCallback& callback);
// Asynchronously fills |data_| with up to |num_bytes| of data. Following
// this, one of Done() and DoneWithError() will be called with the result.
void GetData(uint32_t num_bytes);
private:
class Buffer;
// Reports a successful write of |bytes_written|.
void Done(uint32_t bytes_written);
// Reports a partially successful or unsuccessful write of |bytes_written|
// with an error of |error|.
void DoneWithError(uint32_t bytes_written, int32_t error);
// The DataSourceSender that owns this.
DataSourceSender* sender_;
// The callback to call to get data.
ReadyCallback callback_;
// Whether the buffer specified by GetData() has been passed to |callback_|,
// but has not yet called Done() or DoneWithError().
bool buffer_in_use_;
// The data obtained using |callback_| to be dispatched to the client.
std::vector<char> data_;
};
// A Writable implementation that provides a view of a buffer owned by a
// DataSourceSender.
class DataSourceSender::PendingSend::Buffer : public WritableBuffer {
public:
Buffer(scoped_refptr<DataSourceSender> sender,
PendingSend* send,
char* buffer,
uint32_t buffer_size);
~Buffer() override;
// WritableBuffer overrides.
char* GetData() override;
uint32_t GetSize() override;
void Done(uint32_t bytes_written) override;
void DoneWithError(uint32_t bytes_written, int32_t error) override;
private:
// The DataSourceSender of whose buffer we are providing a view.
scoped_refptr<DataSourceSender> sender_;
// The PendingSend to which this buffer has been created in response.
PendingSend* pending_send_;
char* buffer_;
uint32_t buffer_size_;
};
DataSourceSender::DataSourceSender(
mojo::InterfaceRequest<serial::DataSource> source,
mojo::InterfacePtr<serial::DataSourceClient> client,
const ReadyCallback& ready_callback,
const ErrorCallback& error_callback)
: binding_(this, std::move(source)),
client_(std::move(client)),
ready_callback_(ready_callback),
error_callback_(error_callback),
available_buffer_capacity_(0),
paused_(false),
shut_down_(false),
weak_factory_(this) {
DCHECK(!ready_callback.is_null() && !error_callback.is_null());
binding_.set_connection_error_handler(
base::Bind(&DataSourceSender::OnConnectionError, base::Unretained(this)));
client_.set_connection_error_handler(
base::Bind(&DataSourceSender::OnConnectionError, base::Unretained(this)));
}
void DataSourceSender::ShutDown() {
shut_down_ = true;
ready_callback_.Reset();
error_callback_.Reset();
}
DataSourceSender::~DataSourceSender() {
DCHECK(shut_down_);
}
void DataSourceSender::Init(uint32_t buffer_size) {
available_buffer_capacity_ = buffer_size;
GetMoreData();
}
void DataSourceSender::Resume() {
if (pending_send_) {
DispatchFatalError();
return;
}
paused_ = false;
GetMoreData();
}
void DataSourceSender::ReportBytesReceived(uint32_t bytes_sent) {
available_buffer_capacity_ += bytes_sent;
if (!pending_send_ && !paused_)
GetMoreData();
}
void DataSourceSender::OnConnectionError() {
DispatchFatalError();
}
void DataSourceSender::GetMoreData() {
if (shut_down_ || paused_ || pending_send_ || !available_buffer_capacity_)
return;
pending_send_.reset(new PendingSend(this, ready_callback_));
pending_send_->GetData(available_buffer_capacity_);
}
void DataSourceSender::Done(const std::vector<char>& data) {
DoneInternal(data);
if (!shut_down_ && available_buffer_capacity_) {
base::MessageLoop::current()->PostTask(
FROM_HERE,
base::Bind(&DataSourceSender::GetMoreData, weak_factory_.GetWeakPtr()));
}
}
void DataSourceSender::DoneWithError(const std::vector<char>& data,
int32_t error) {
DoneInternal(data);
if (!shut_down_)
client_->OnError(error);
paused_ = true;
// We don't call GetMoreData here so we don't send any additional data until
// Resume() is called.
}
void DataSourceSender::DoneInternal(const std::vector<char>& data) {
DCHECK(pending_send_);
if (shut_down_)
return;
available_buffer_capacity_ -= static_cast<uint32_t>(data.size());
if (!data.empty()) {
mojo::Array<uint8_t> data_to_send(data.size());
std::copy(data.begin(), data.end(), &data_to_send[0]);
client_->OnData(std::move(data_to_send));
}
pending_send_.reset();
}
void DataSourceSender::DispatchFatalError() {
if (shut_down_)
return;
error_callback_.Run();
ShutDown();
}
DataSourceSender::PendingSend::PendingSend(DataSourceSender* sender,
const ReadyCallback& callback)
: sender_(sender), callback_(callback), buffer_in_use_(false) {
}
void DataSourceSender::PendingSend::GetData(uint32_t num_bytes) {
DCHECK(num_bytes);
DCHECK(!buffer_in_use_);
buffer_in_use_ = true;
data_.resize(num_bytes);
callback_.Run(scoped_ptr<WritableBuffer>(
new Buffer(sender_, this, &data_[0], num_bytes)));
}
void DataSourceSender::PendingSend::Done(uint32_t bytes_written) {
DCHECK(buffer_in_use_);
DCHECK_LE(bytes_written, data_.size());
buffer_in_use_ = false;
data_.resize(bytes_written);
sender_->Done(data_);
}
void DataSourceSender::PendingSend::DoneWithError(uint32_t bytes_written,
int32_t error) {
DCHECK(buffer_in_use_);
DCHECK_LE(bytes_written, data_.size());
buffer_in_use_ = false;
data_.resize(bytes_written);
sender_->DoneWithError(data_, error);
}
DataSourceSender::PendingSend::Buffer::Buffer(
scoped_refptr<DataSourceSender> sender,
PendingSend* send,
char* buffer,
uint32_t buffer_size)
: sender_(sender),
pending_send_(send),
buffer_(buffer),
buffer_size_(buffer_size) {
}
DataSourceSender::PendingSend::Buffer::~Buffer() {
if (pending_send_)
pending_send_->Done(0);
}
char* DataSourceSender::PendingSend::Buffer::GetData() {
return buffer_;
}
uint32_t DataSourceSender::PendingSend::Buffer::GetSize() {
return buffer_size_;
}
void DataSourceSender::PendingSend::Buffer::Done(uint32_t bytes_written) {
DCHECK(sender_.get());
PendingSend* send = pending_send_;
pending_send_ = nullptr;
send->Done(bytes_written);
sender_ = nullptr;
}
void DataSourceSender::PendingSend::Buffer::DoneWithError(
uint32_t bytes_written,
int32_t error) {
DCHECK(sender_.get());
PendingSend* send = pending_send_;
pending_send_ = nullptr;
send->DoneWithError(bytes_written, error);
sender_ = nullptr;
}
} // namespace device