|  | // Copyright 2021 The Chromium Authors | 
|  | // Use of this source code is governed by a BSD-style license that can be | 
|  | // found in the LICENSE file. | 
|  |  | 
|  | #include "ash/quick_pair/message_stream/message_stream.h" | 
|  |  | 
|  | #include "ash/quick_pair/common/fast_pair/fast_pair_metrics.h" | 
|  | #include "base/compiler_specific.h" | 
|  | #include "base/functional/bind.h" | 
|  | #include "base/strings/string_number_conversions.h" | 
|  | #include "chromeos/ash/services/quick_pair/quick_pair_process.h" | 
|  | #include "chromeos/ash/services/quick_pair/quick_pair_process_manager.h" | 
|  | #include "components/cross_device/logging/logging.h" | 
|  | #include "device/bluetooth/bluetooth_socket.h" | 
|  | #include "net/base/io_buffer.h" | 
|  |  | 
namespace {

// Buffer size passed to BluetoothSocket::Receive() for every read request.
constexpr int kMaxBufferSize = 4096;

// Limit checked against |receive_retry_counter_| in Receive(); once it is
// reached, Receive() stops reading and disconnects the socket.
constexpr int kMaxRetryCount = 10;

// Maximum number of parsed messages retained in |messages_|. When the store
// is at capacity the oldest message is dropped before a new one is appended.
constexpr int kMessageStorageCapacity = 1000;

}  // namespace
|  |  | 
|  | namespace ash { | 
|  | namespace quick_pair { | 
|  |  | 
|  | MessageStream::MessageStream(const std::string& device_address, | 
|  | scoped_refptr<device::BluetoothSocket> socket) | 
|  | : device_address_(device_address), socket_(socket) { | 
|  | Receive(); | 
|  | } | 
|  |  | 
|  | MessageStream::~MessageStream() { | 
|  | if (socket_.get()) | 
|  | socket_->Disconnect(base::DoNothing()); | 
|  |  | 
|  | // Notify observers for lifetime management | 
|  | for (auto& obs : observers_) | 
|  | obs.OnMessageStreamDestroyed(device_address_); | 
|  | } | 
|  |  | 
|  | void MessageStream::AddObserver(Observer* observer) { | 
|  | observers_.AddObserver(observer); | 
|  | } | 
|  |  | 
|  | void MessageStream::RemoveObserver(Observer* observer) { | 
|  | observers_.RemoveObserver(observer); | 
|  | } | 
|  |  | 
|  | void MessageStream::Receive() { | 
|  | if (receive_retry_counter_ == kMaxRetryCount) { | 
|  | CD_LOG(WARNING, Feature::FP) | 
|  | << __func__ | 
|  | << ": Failed to receive or parse data from socket more than " | 
|  | << kMaxRetryCount << " times."; | 
|  |  | 
|  | if (socket_.get()) { | 
|  | socket_->Disconnect(base::BindOnce(&MessageStream::OnSocketDisconnected, | 
|  | weak_ptr_factory_.GetWeakPtr())); | 
|  | } | 
|  |  | 
|  | return; | 
|  | } | 
|  |  | 
|  | // Retry receiving data. | 
|  | receive_retry_counter_++; | 
|  | socket_->Receive(/*buffer_size=*/kMaxBufferSize, | 
|  | base::BindOnce(&MessageStream::ReceiveDataSuccess, | 
|  | weak_ptr_factory_.GetWeakPtr()), | 
|  | base::BindOnce(&MessageStream::ReceiveDataError, | 
|  | weak_ptr_factory_.GetWeakPtr())); | 
|  | } | 
|  |  | 
|  | void MessageStream::ReceiveDataSuccess(int buffer_size, | 
|  | scoped_refptr<net::IOBuffer> io_buffer) { | 
|  | RecordMessageStreamReceiveResult(/*success=*/true); | 
|  | receive_retry_counter_ = 0; | 
|  |  | 
|  | if (!io_buffer->data()) { | 
|  | Receive(); | 
|  | return; | 
|  | } | 
|  |  | 
|  | std::vector<uint8_t> message_bytes(buffer_size); | 
|  | for (int i = 0; i < buffer_size; i++) { | 
|  | char* c = UNSAFE_TODO(io_buffer->data() + i); | 
|  | message_bytes[i] = static_cast<uint8_t>(*c); | 
|  | } | 
|  |  | 
|  | quick_pair_process::ParseMessageStreamMessages( | 
|  | std::move(message_bytes), | 
|  | base::BindOnce(&MessageStream::ParseMessageStreamSuccess, | 
|  | weak_ptr_factory_.GetWeakPtr()), | 
|  | base::BindOnce(&MessageStream::OnUtilityProcessStopped, | 
|  | weak_ptr_factory_.GetWeakPtr())); | 
|  | } | 
|  |  | 
|  | void MessageStream::ReceiveDataError(device::BluetoothSocket::ErrorReason error, | 
|  | const std::string& error_message) { | 
|  | CD_LOG(INFO, Feature::FP) << __func__ << ": Error: " << error_message; | 
|  | RecordMessageStreamReceiveResult(/*success=*/false); | 
|  | RecordMessageStreamReceiveError(error); | 
|  |  | 
|  | if (error == device::BluetoothSocket::ErrorReason::kDisconnected) { | 
|  | OnSocketDisconnected(); | 
|  | return; | 
|  | } | 
|  |  | 
|  | Receive(); | 
|  | } | 
|  |  | 
|  | void MessageStream::Disconnect(base::OnceClosure on_disconnect_callback) { | 
|  | CD_LOG(INFO, Feature::FP) << __func__; | 
|  |  | 
|  | // If we already have disconnected the socket, then we can run the callback. | 
|  | // This can happen since the socket might have disconnected previously but | 
|  | // we kept the MessageStream instance alive to preserve messages from the | 
|  | // corresponding device. | 
|  | if (!socket_.get()) { | 
|  | std::move(on_disconnect_callback).Run(); | 
|  | return; | 
|  | } | 
|  |  | 
|  | socket_->Disconnect(base::BindOnce( | 
|  | &MessageStream::OnSocketDisconnectedWithCallback, | 
|  | weak_ptr_factory_.GetWeakPtr(), std::move(on_disconnect_callback))); | 
|  | } | 
|  |  | 
|  | void MessageStream::OnSocketDisconnected() { | 
|  | for (auto& obs : observers_) | 
|  | obs.OnDisconnected(device_address_); | 
|  | } | 
|  |  | 
|  | void MessageStream::OnSocketDisconnectedWithCallback( | 
|  | base::OnceClosure on_disconnect_callback) { | 
|  | OnSocketDisconnected(); | 
|  | std::move(on_disconnect_callback).Run(); | 
|  | } | 
|  |  | 
|  | void MessageStream::ParseMessageStreamSuccess( | 
|  | std::vector<mojom::MessageStreamMessagePtr> messages) { | 
|  | CD_LOG(VERBOSE, Feature::FP) << __func__; | 
|  |  | 
|  | if (messages.empty()) { | 
|  | CD_LOG(WARNING, Feature::FP) << __func__ << ": no messages"; | 
|  | Receive(); | 
|  | return; | 
|  | } | 
|  |  | 
|  | // Store messages and notify observers. | 
|  | for (size_t i = 0; i < messages.size(); ++i) { | 
|  | if (messages_.size() == kMessageStorageCapacity) | 
|  | messages_.pop_front(); | 
|  |  | 
|  | messages_.push_back(std::move(messages[i])); | 
|  | NotifyObservers(messages_.back()); | 
|  | } | 
|  |  | 
|  | // Attempt to receive new messages from socket. | 
|  | Receive(); | 
|  | } | 
|  |  | 
|  | std::string MessageStream::MessageStreamMessageTypeToString( | 
|  | const mojom::MessageStreamMessagePtr& message) { | 
|  | if (message->is_model_id()) | 
|  | return "Model ID"; | 
|  |  | 
|  | if (message->is_ble_address_update()) | 
|  | return "BLE address update"; | 
|  |  | 
|  | if (message->is_battery_update()) | 
|  | return "Battery Update"; | 
|  |  | 
|  | if (message->is_remaining_battery_time()) | 
|  | return "Remaining Battery Time"; | 
|  |  | 
|  | if (message->is_enable_silence_mode()) | 
|  | return "Enable Silence Mode"; | 
|  |  | 
|  | if (message->is_companion_app_log_buffer_full()) | 
|  | return "Companion App Log Buffer Full"; | 
|  |  | 
|  | if (message->is_active_components_byte()) | 
|  | return "Active Components Byte"; | 
|  |  | 
|  | if (message->is_ring_device_event()) | 
|  | return "Ring Device Event"; | 
|  |  | 
|  | if (message->is_acknowledgement()) | 
|  | return "Acknowledgement"; | 
|  |  | 
|  | if (message->is_sdk_version()) | 
|  | return "SDK version"; | 
|  |  | 
|  | NOTREACHED(); | 
|  | } | 
|  |  | 
|  | void MessageStream::NotifyObservers( | 
|  | const mojom::MessageStreamMessagePtr& message) { | 
|  | CD_LOG(VERBOSE, Feature::FP) << __func__ << ": MessageStreamMessagePtr is " | 
|  | << MessageStreamMessageTypeToString(message); | 
|  |  | 
|  | if (message->is_model_id()) { | 
|  | for (auto& obs : observers_) | 
|  | obs.OnModelIdMessage(device_address_, message->get_model_id()); | 
|  |  | 
|  | return; | 
|  | } | 
|  |  | 
|  | if (message->is_ble_address_update()) { | 
|  | for (auto& obs : observers_) | 
|  | obs.OnBleAddressUpdateMessage(device_address_, | 
|  | message->get_ble_address_update()); | 
|  |  | 
|  | return; | 
|  | } | 
|  |  | 
|  | if (message->is_battery_update()) { | 
|  | for (auto& obs : observers_) | 
|  | obs.OnBatteryUpdateMessage(device_address_, | 
|  | std::move(message->get_battery_update())); | 
|  |  | 
|  | return; | 
|  | } | 
|  |  | 
|  | if (message->is_remaining_battery_time()) { | 
|  | for (auto& obs : observers_) | 
|  | obs.OnRemainingBatteryTimeMessage(device_address_, | 
|  | message->get_remaining_battery_time()); | 
|  |  | 
|  | return; | 
|  | } | 
|  |  | 
|  | if (message->is_enable_silence_mode()) { | 
|  | for (auto& obs : observers_) | 
|  | obs.OnEnableSilenceModeMessage(device_address_, | 
|  | message->get_enable_silence_mode()); | 
|  |  | 
|  | return; | 
|  | } | 
|  |  | 
|  | if (message->is_companion_app_log_buffer_full()) { | 
|  | for (auto& obs : observers_) | 
|  | obs.OnCompanionAppLogBufferFullMessage(device_address_); | 
|  |  | 
|  | return; | 
|  | } | 
|  |  | 
|  | if (message->is_active_components_byte()) { | 
|  | for (auto& obs : observers_) | 
|  | obs.OnActiveComponentsMessage(device_address_, | 
|  | message->get_active_components_byte()); | 
|  |  | 
|  | return; | 
|  | } | 
|  |  | 
|  | if (message->is_ring_device_event()) { | 
|  | for (auto& obs : observers_) | 
|  | obs.OnRingDeviceMessage(device_address_, | 
|  | std::move(message->get_ring_device_event())); | 
|  |  | 
|  | return; | 
|  | } | 
|  |  | 
|  | if (message->is_acknowledgement()) { | 
|  | for (auto& obs : observers_) | 
|  | obs.OnAcknowledgementMessage(device_address_, | 
|  | std::move(message->get_acknowledgement())); | 
|  |  | 
|  | return; | 
|  | } | 
|  |  | 
|  | if (message->is_sdk_version()) { | 
|  | for (auto& obs : observers_) | 
|  | obs.OnAndroidSdkVersionMessage(device_address_, | 
|  | message->get_sdk_version()); | 
|  |  | 
|  | return; | 
|  | } | 
|  |  | 
|  | CD_LOG(WARNING, Feature::FP) << __func__ << ": unexpected message type."; | 
|  | NOTREACHED(); | 
|  | } | 
|  |  | 
|  | void MessageStream::OnUtilityProcessStopped( | 
|  | QuickPairProcessManager::ShutdownReason shutdown_reason) { | 
|  | CD_LOG(INFO, Feature::FP) << __func__ << ": Error: " << shutdown_reason; | 
|  |  | 
|  | receive_retry_counter_++; | 
|  | Receive(); | 
|  | } | 
|  |  | 
|  | }  // namespace quick_pair | 
|  | }  // namespace ash |