blob: 80e7c47d45bf9bf90bb67b3faff8cb4513b91a1d [file] [log] [blame]
// Copyright 2020 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "device/bluetooth/socket.h"
#include <string>
#include <utility>
#include <vector>
#include "base/bind.h"
#include "base/callback_helpers.h"
#include "base/memory/ptr_util.h"
#include "device/bluetooth/bluetooth_socket.h"
#include "mojo/public/cpp/bindings/pending_receiver.h"
#include "mojo/public/cpp/bindings/receiver.h"
#include "net/base/io_buffer.h"
namespace bluetooth {
Socket::Socket(scoped_refptr<device::BluetoothSocket> bluetooth_socket,
mojo::ScopedDataPipeProducerHandle receive_stream,
mojo::ScopedDataPipeConsumerHandle send_stream)
: bluetooth_socket_(std::move(bluetooth_socket)),
receive_stream_(std::move(receive_stream)),
send_stream_(std::move(send_stream)),
receive_stream_watcher_(FROM_HERE,
mojo::SimpleWatcher::ArmingPolicy::MANUAL),
send_stream_watcher_(FROM_HERE,
mojo::SimpleWatcher::ArmingPolicy::MANUAL) {
receive_stream_watcher_.Watch(
receive_stream_.get(),
MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
base::BindRepeating(&Socket::OnReceiveStreamWritable,
base::Unretained(this)));
send_stream_watcher_.Watch(
send_stream_.get(),
MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
base::BindRepeating(&Socket::OnSendStreamReadable,
base::Unretained(this)));
ReceiveMore();
SendMore();
}
Socket::~Socket() {
ShutdownReceive();
ShutdownSend();
bluetooth_socket_->Disconnect(base::DoNothing());
}
void Socket::Disconnect(DisconnectCallback callback) {
bluetooth_socket_->Disconnect(std::move(callback));
}
void Socket::OnReceiveStreamWritable(MojoResult result) {
DCHECK(receive_stream_.is_valid());
if (result == MOJO_RESULT_OK) {
ReceiveMore();
return;
}
ShutdownReceive();
}
void Socket::ShutdownReceive() {
receive_stream_watcher_.Cancel();
receive_stream_.reset();
}
void Socket::ReceiveMore() {
DCHECK(receive_stream_.is_valid());
// The destination to which we will write incoming bytes from
// |bluetooth_socket_|. The allocated buffer and its max available size
// (assigned to |pending_write_buffer_max_size|) will be fetched by calling
// BeginWriteData() below. This already-allocated buffer is a buffer shared
// between the 2 sides of |receive_stream_|.
void* pending_write_buffer = nullptr;
// Passing 0 as the initial value allows |pending_write_buffer_max_size| to be
// assigned the buffer's max size.
uint32_t pending_write_buffer_max_size = 0;
MojoResult result = receive_stream_->BeginWriteData(
&pending_write_buffer, &pending_write_buffer_max_size,
MOJO_WRITE_DATA_FLAG_NONE);
if (result == MOJO_RESULT_SHOULD_WAIT) {
receive_stream_watcher_.ArmOrNotify();
return;
} else if (result != MOJO_RESULT_OK) {
ShutdownReceive();
return;
}
bluetooth_socket_->Receive(
pending_write_buffer_max_size,
base::BindOnce(&Socket::OnBluetoothSocketReceive,
weak_ptr_factory_.GetWeakPtr(), pending_write_buffer),
base::BindOnce(&Socket::OnBluetoothSocketReceiveError,
weak_ptr_factory_.GetWeakPtr()));
}
void Socket::OnBluetoothSocketReceive(void* pending_write_buffer,
int num_bytes_received,
scoped_refptr<net::IOBuffer> io_buffer) {
DCHECK_GT(num_bytes_received, 0);
DCHECK(io_buffer->data());
if (!receive_stream_.is_valid())
return;
memcpy(pending_write_buffer, io_buffer->data(), num_bytes_received);
receive_stream_->EndWriteData(static_cast<uint32_t>(num_bytes_received));
ReceiveMore();
}
void Socket::OnBluetoothSocketReceiveError(
device::BluetoothSocket::ErrorReason error_reason,
const std::string& error_message) {
DLOG(ERROR) << "Failed to receive data for reason '" << error_reason << "': '"
<< error_message << "'";
if (receive_stream_.is_valid()) {
receive_stream_->EndWriteData(0);
ShutdownReceive();
}
}
void Socket::OnSendStreamReadable(MojoResult result) {
DCHECK(send_stream_.is_valid());
if (result == MOJO_RESULT_OK)
SendMore();
else
ShutdownSend();
}
void Socket::ShutdownSend() {
send_stream_watcher_.Cancel();
send_stream_.reset();
}
void Socket::SendMore() {
DCHECK(send_stream_.is_valid());
// The source from which we will write outgoing bytes to
// |bluetooth_socket_|. The allocated buffer and the number of bytes already
// written by the other side of |send_stream_| (assigned to
// |pending_read_buffer_size|) will be fetched by calling BeginReadData()
// below. This already-allocated buffer is a buffer shared between the 2 sides
// of |send_stream_|.
const void* pending_read_buffer = nullptr;
// Passing 0 as the initial value allows |pending_read_buffer_size| to be
// assigned the number of bytes that the other side of |send_stream_| has
// already written.
uint32_t pending_read_buffer_size = 0;
MojoResult result = send_stream_->BeginReadData(&pending_read_buffer,
&pending_read_buffer_size,
MOJO_WRITE_DATA_FLAG_NONE);
if (result == MOJO_RESULT_SHOULD_WAIT) {
send_stream_watcher_.ArmOrNotify();
return;
} else if (result != MOJO_RESULT_OK) {
ShutdownSend();
return;
}
bluetooth_socket_->Send(base::MakeRefCounted<net::WrappedIOBuffer>(
static_cast<const char*>(pending_read_buffer)),
pending_read_buffer_size,
base::BindOnce(&Socket::OnBluetoothSocketSend,
weak_ptr_factory_.GetWeakPtr()),
base::BindOnce(&Socket::OnBluetoothSocketSendError,
weak_ptr_factory_.GetWeakPtr()));
}
void Socket::OnBluetoothSocketSend(int num_bytes_sent) {
DCHECK_GE(num_bytes_sent, 0);
if (!send_stream_.is_valid())
return;
send_stream_->EndReadData(static_cast<uint32_t>(num_bytes_sent));
SendMore();
}
void Socket::OnBluetoothSocketSendError(const std::string& error_message) {
DLOG(ERROR) << "Failed to send data: '" << error_message << "'";
if (send_stream_.is_valid()) {
send_stream_->EndReadData(0);
ShutdownSend();
}
}
} // namespace bluetooth