| // Copyright 2021 The Chromium Authors |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "ash/quick_pair/message_stream/message_stream.h" |
| |
| #include "ash/quick_pair/common/fast_pair/fast_pair_metrics.h" |
| #include "base/compiler_specific.h" |
| #include "base/functional/bind.h" |
| #include "base/strings/string_number_conversions.h" |
| #include "chromeos/ash/services/quick_pair/quick_pair_process.h" |
| #include "chromeos/ash/services/quick_pair/quick_pair_process_manager.h" |
| #include "components/cross_device/logging/logging.h" |
| #include "device/bluetooth/bluetooth_socket.h" |
| #include "net/base/io_buffer.h" |
| |
namespace {

// Number of bytes requested from the socket on every Receive() call.
constexpr int kMaxBufferSize = 4096;

// Receive() stops retrying and disconnects the socket once the retry counter
// reaches this value.
constexpr int kMaxRetryCount = 10;

// Upper bound on the number of stored messages; when the store is full the
// oldest message is dropped to make room for a new one.
constexpr int kMessageStorageCapacity = 1000;

}  // namespace
| |
| namespace ash { |
| namespace quick_pair { |
| |
| MessageStream::MessageStream(const std::string& device_address, |
| scoped_refptr<device::BluetoothSocket> socket) |
| : device_address_(device_address), socket_(socket) { |
| Receive(); |
| } |
| |
| MessageStream::~MessageStream() { |
| if (socket_.get()) |
| socket_->Disconnect(base::DoNothing()); |
| |
| // Notify observers for lifetime management |
| for (auto& obs : observers_) |
| obs.OnMessageStreamDestroyed(device_address_); |
| } |
| |
| void MessageStream::AddObserver(Observer* observer) { |
| observers_.AddObserver(observer); |
| } |
| |
| void MessageStream::RemoveObserver(Observer* observer) { |
| observers_.RemoveObserver(observer); |
| } |
| |
| void MessageStream::Receive() { |
| if (receive_retry_counter_ == kMaxRetryCount) { |
| CD_LOG(WARNING, Feature::FP) |
| << __func__ |
| << ": Failed to receive or parse data from socket more than " |
| << kMaxRetryCount << " times."; |
| |
| if (socket_.get()) { |
| socket_->Disconnect(base::BindOnce(&MessageStream::OnSocketDisconnected, |
| weak_ptr_factory_.GetWeakPtr())); |
| } |
| |
| return; |
| } |
| |
| // Retry receiving data. |
| receive_retry_counter_++; |
| socket_->Receive(/*buffer_size=*/kMaxBufferSize, |
| base::BindOnce(&MessageStream::ReceiveDataSuccess, |
| weak_ptr_factory_.GetWeakPtr()), |
| base::BindOnce(&MessageStream::ReceiveDataError, |
| weak_ptr_factory_.GetWeakPtr())); |
| } |
| |
| void MessageStream::ReceiveDataSuccess(int buffer_size, |
| scoped_refptr<net::IOBuffer> io_buffer) { |
| RecordMessageStreamReceiveResult(/*success=*/true); |
| receive_retry_counter_ = 0; |
| |
| if (!io_buffer->data()) { |
| Receive(); |
| return; |
| } |
| |
| std::vector<uint8_t> message_bytes(buffer_size); |
| for (int i = 0; i < buffer_size; i++) { |
| char* c = UNSAFE_TODO(io_buffer->data() + i); |
| message_bytes[i] = static_cast<uint8_t>(*c); |
| } |
| |
| quick_pair_process::ParseMessageStreamMessages( |
| std::move(message_bytes), |
| base::BindOnce(&MessageStream::ParseMessageStreamSuccess, |
| weak_ptr_factory_.GetWeakPtr()), |
| base::BindOnce(&MessageStream::OnUtilityProcessStopped, |
| weak_ptr_factory_.GetWeakPtr())); |
| } |
| |
| void MessageStream::ReceiveDataError(device::BluetoothSocket::ErrorReason error, |
| const std::string& error_message) { |
| CD_LOG(INFO, Feature::FP) << __func__ << ": Error: " << error_message; |
| RecordMessageStreamReceiveResult(/*success=*/false); |
| RecordMessageStreamReceiveError(error); |
| |
| if (error == device::BluetoothSocket::ErrorReason::kDisconnected) { |
| OnSocketDisconnected(); |
| return; |
| } |
| |
| Receive(); |
| } |
| |
| void MessageStream::Disconnect(base::OnceClosure on_disconnect_callback) { |
| CD_LOG(INFO, Feature::FP) << __func__; |
| |
| // If we already have disconnected the socket, then we can run the callback. |
| // This can happen since the socket might have disconnected previously but |
| // we kept the MessageStream instance alive to preserve messages from the |
| // corresponding device. |
| if (!socket_.get()) { |
| std::move(on_disconnect_callback).Run(); |
| return; |
| } |
| |
| socket_->Disconnect(base::BindOnce( |
| &MessageStream::OnSocketDisconnectedWithCallback, |
| weak_ptr_factory_.GetWeakPtr(), std::move(on_disconnect_callback))); |
| } |
| |
| void MessageStream::OnSocketDisconnected() { |
| for (auto& obs : observers_) |
| obs.OnDisconnected(device_address_); |
| } |
| |
| void MessageStream::OnSocketDisconnectedWithCallback( |
| base::OnceClosure on_disconnect_callback) { |
| OnSocketDisconnected(); |
| std::move(on_disconnect_callback).Run(); |
| } |
| |
| void MessageStream::ParseMessageStreamSuccess( |
| std::vector<mojom::MessageStreamMessagePtr> messages) { |
| CD_LOG(VERBOSE, Feature::FP) << __func__; |
| |
| if (messages.empty()) { |
| CD_LOG(WARNING, Feature::FP) << __func__ << ": no messages"; |
| Receive(); |
| return; |
| } |
| |
| // Store messages and notify observers. |
| for (size_t i = 0; i < messages.size(); ++i) { |
| if (messages_.size() == kMessageStorageCapacity) |
| messages_.pop_front(); |
| |
| messages_.push_back(std::move(messages[i])); |
| NotifyObservers(messages_.back()); |
| } |
| |
| // Attempt to receive new messages from socket. |
| Receive(); |
| } |
| |
| std::string MessageStream::MessageStreamMessageTypeToString( |
| const mojom::MessageStreamMessagePtr& message) { |
| if (message->is_model_id()) |
| return "Model ID"; |
| |
| if (message->is_ble_address_update()) |
| return "BLE address update"; |
| |
| if (message->is_battery_update()) |
| return "Battery Update"; |
| |
| if (message->is_remaining_battery_time()) |
| return "Remaining Battery Time"; |
| |
| if (message->is_enable_silence_mode()) |
| return "Enable Silence Mode"; |
| |
| if (message->is_companion_app_log_buffer_full()) |
| return "Companion App Log Buffer Full"; |
| |
| if (message->is_active_components_byte()) |
| return "Active Components Byte"; |
| |
| if (message->is_ring_device_event()) |
| return "Ring Device Event"; |
| |
| if (message->is_acknowledgement()) |
| return "Acknowledgement"; |
| |
| if (message->is_sdk_version()) |
| return "SDK version"; |
| |
| NOTREACHED(); |
| } |
| |
| void MessageStream::NotifyObservers( |
| const mojom::MessageStreamMessagePtr& message) { |
| CD_LOG(VERBOSE, Feature::FP) << __func__ << ": MessageStreamMessagePtr is " |
| << MessageStreamMessageTypeToString(message); |
| |
| if (message->is_model_id()) { |
| for (auto& obs : observers_) |
| obs.OnModelIdMessage(device_address_, message->get_model_id()); |
| |
| return; |
| } |
| |
| if (message->is_ble_address_update()) { |
| for (auto& obs : observers_) |
| obs.OnBleAddressUpdateMessage(device_address_, |
| message->get_ble_address_update()); |
| |
| return; |
| } |
| |
| if (message->is_battery_update()) { |
| for (auto& obs : observers_) |
| obs.OnBatteryUpdateMessage(device_address_, |
| std::move(message->get_battery_update())); |
| |
| return; |
| } |
| |
| if (message->is_remaining_battery_time()) { |
| for (auto& obs : observers_) |
| obs.OnRemainingBatteryTimeMessage(device_address_, |
| message->get_remaining_battery_time()); |
| |
| return; |
| } |
| |
| if (message->is_enable_silence_mode()) { |
| for (auto& obs : observers_) |
| obs.OnEnableSilenceModeMessage(device_address_, |
| message->get_enable_silence_mode()); |
| |
| return; |
| } |
| |
| if (message->is_companion_app_log_buffer_full()) { |
| for (auto& obs : observers_) |
| obs.OnCompanionAppLogBufferFullMessage(device_address_); |
| |
| return; |
| } |
| |
| if (message->is_active_components_byte()) { |
| for (auto& obs : observers_) |
| obs.OnActiveComponentsMessage(device_address_, |
| message->get_active_components_byte()); |
| |
| return; |
| } |
| |
| if (message->is_ring_device_event()) { |
| for (auto& obs : observers_) |
| obs.OnRingDeviceMessage(device_address_, |
| std::move(message->get_ring_device_event())); |
| |
| return; |
| } |
| |
| if (message->is_acknowledgement()) { |
| for (auto& obs : observers_) |
| obs.OnAcknowledgementMessage(device_address_, |
| std::move(message->get_acknowledgement())); |
| |
| return; |
| } |
| |
| if (message->is_sdk_version()) { |
| for (auto& obs : observers_) |
| obs.OnAndroidSdkVersionMessage(device_address_, |
| message->get_sdk_version()); |
| |
| return; |
| } |
| |
| CD_LOG(WARNING, Feature::FP) << __func__ << ": unexpected message type."; |
| NOTREACHED(); |
| } |
| |
| void MessageStream::OnUtilityProcessStopped( |
| QuickPairProcessManager::ShutdownReason shutdown_reason) { |
| CD_LOG(INFO, Feature::FP) << __func__ << ": Error: " << shutdown_reason; |
| |
| receive_retry_counter_++; |
| Receive(); |
| } |
| |
| } // namespace quick_pair |
| } // namespace ash |