blob: 86a5b4e913abcc70729820992a730722600746e8 [file] [log] [blame]
// Copyright 2018 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "components/cast_channel/mojo_data_pump.h"
#include <utility>
#include "base/bind.h"
#include "base/memory/ptr_util.h"
#include "net/base/completion_once_callback.h"
#include "net/base/io_buffer.h"
#include "net/base/net_errors.h"
namespace cast_channel {
MojoDataPump::MojoDataPump(mojo::ScopedDataPipeConsumerHandle receive_stream,
mojo::ScopedDataPipeProducerHandle send_stream)
: receive_stream_(std::move(receive_stream)),
receive_stream_watcher_(FROM_HERE,
mojo::SimpleWatcher::ArmingPolicy::MANUAL),
send_stream_(std::move(send_stream)),
send_stream_watcher_(FROM_HERE,
mojo::SimpleWatcher::ArmingPolicy::MANUAL) {
DCHECK(receive_stream_.is_valid());
DCHECK(send_stream_.is_valid());
receive_stream_watcher_.Watch(
receive_stream_.get(),
MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
MOJO_TRIGGER_CONDITION_SIGNALS_SATISFIED,
base::BindRepeating(&MojoDataPump::ReceiveMore, base::Unretained(this)));
send_stream_watcher_.Watch(
send_stream_.get(),
MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
MOJO_TRIGGER_CONDITION_SIGNALS_SATISFIED,
base::BindRepeating(&MojoDataPump::SendMore, base::Unretained(this)));
}
MojoDataPump::~MojoDataPump() {}
void MojoDataPump::Read(net::IOBuffer* io_buffer,
int count,
net::CompletionOnceCallback callback) {
DCHECK(callback);
DCHECK(!read_callback_);
if (count <= 0) {
std::move(callback).Run(net::ERR_INVALID_ARGUMENT);
return;
}
pending_read_buffer_ = io_buffer;
read_size_ = count;
read_callback_ = std::move(callback);
receive_stream_watcher_.ArmOrNotify();
}
void MojoDataPump::Write(net::IOBuffer* io_buffer,
int io_buffer_size,
net::CompletionOnceCallback callback) {
DCHECK(callback);
DCHECK(!write_callback_);
write_callback_ = std::move(callback);
pending_write_buffer_ = io_buffer;
pending_write_buffer_size_ = io_buffer_size;
send_stream_watcher_.ArmOrNotify();
}
void MojoDataPump::ReceiveMore(MojoResult result,
const mojo::HandleSignalsState& state) {
DCHECK(read_callback_);
DCHECK_NE(0u, read_size_);
uint32_t num_bytes = read_size_;
if (result == MOJO_RESULT_OK) {
result = receive_stream_->ReadData(pending_read_buffer_->data(), &num_bytes,
MOJO_READ_DATA_FLAG_NONE);
}
if (result == MOJO_RESULT_SHOULD_WAIT) {
receive_stream_watcher_.ArmOrNotify();
return;
}
read_size_ = 0;
if (result != MOJO_RESULT_OK) {
std::move(read_callback_).Run(net::ERR_FAILED);
return;
}
std::move(read_callback_).Run(num_bytes);
}
void MojoDataPump::SendMore(MojoResult result,
const mojo::HandleSignalsState& state) {
DCHECK(write_callback_);
uint32_t num_bytes = pending_write_buffer_size_;
if (result == MOJO_RESULT_OK) {
result = send_stream_->WriteData(pending_write_buffer_->data(), &num_bytes,
MOJO_WRITE_DATA_FLAG_NONE);
}
if (result == MOJO_RESULT_SHOULD_WAIT) {
send_stream_watcher_.ArmOrNotify();
return;
}
pending_write_buffer_ = nullptr;
pending_write_buffer_size_ = 0;
if (result != MOJO_RESULT_OK) {
std::move(write_callback_).Run(net::ERR_FAILED);
return;
}
std::move(write_callback_).Run(num_bytes);
}
} // namespace cast_channel