| # Lint as: python2, python3 |
| # -*- coding: utf-8 -*- |
| |
| # Copyright 2019 The ChromiumOS Authors |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """This module implements the PeripheralKit instance for a bluez peripheral |
| on Raspberry Pi. |
| """ |
| |
| from __future__ import absolute_import |
| from __future__ import division |
| from __future__ import print_function |
| |
| import logging |
| import os |
| import re |
| import sys |
| import threading |
| import time |
| |
| import dbus |
| import dbus.mainloop.glib |
| import dbus.service |
| |
| |
| # Libraries needed on raspberry pi. ImportError on |
| # Fizz can be ignored. |
| try: |
| from gi.repository import GLib |
| except ImportError: |
| pass |
| |
| |
| from chameleond.utils import pairing_agent |
| from chameleond.utils import system_tools |
| from chameleond.utils.bluez_observer import BlueZObserver |
| from chameleond.utils.raspi_bluez_service import BluezHIDService |
| from chameleond.utils.raspi_bluez_service import BluezRfcommEchoService |
| from six.moves import range |
| |
| from . import bluetooth_socket |
| from .bluetooth_peripheral_kit import PeripheralKit |
| from .bluetooth_peripheral_kit import PeripheralKitException |
| from .bluez_gatt_server import GATTServer |
| from .bluez_gatt_server import PUBLIC_DEVICE_ADDRESS |
| from .bluez_gatt_server import RANDOM_DEVICE_ADDRESS |
| from .bluez_service_consts import BLUEZ_HID_SERVICE_NAME |
| from .bluez_service_consts import BLUEZ_HID_SERVICE_PATH |
| from .bluez_service_consts import BTD_CONF_FILE |
| from .bluez_service_consts import PERIPHERAL_BTD_FLAGS |
| from .bluez_service_consts import PERIPHERAL_DEVICE_CLASS |
| from .bluez_service_consts import PERIPHERAL_DEVICE_NAME |
| from .hci_cmd import HciCommands |
| from .hci_cmd import HciTool |
| |
| |
# D-Bus names used to reach BlueZ and the generic D-Bus helper interfaces.
DBUS_BLUEZ_SERVICE_IFACE = "org.bluez"
DBUS_BLUEZ_ADAPTER_IFACE = "{}.Adapter1".format(DBUS_BLUEZ_SERVICE_IFACE)
DBUS_BLUEZ_DEVICE_IFACE = "{}.Device1".format(DBUS_BLUEZ_SERVICE_IFACE)
DBUS_PROP_IFACE = "org.freedesktop.DBus.Properties"
DBUS_OBJECT_MANAGER_INTERFACE = "org.freedesktop.DBus.ObjectManager"

# Time to wait for bluetoothd to start (used as a count of one-second polls).
BLUETOOTHD_WAIT_TIME = 30
# Number of attempts made for a D-Bus property get/set before giving up.
MAX_DBUS_RETRY_ATTEMPTS = 3
CONNECTION_WAIT_TIME = 10

# Definitions of mouse button HID encodings
RAW_HID_BUTTONS_RELEASED = 0x00
RAW_HID_LEFT_BUTTON = 0x01
RAW_HID_RIGHT_BUTTON = 0x02

# Definitions of gamepad button HID encodings. Several values repeat on
# purpose: GAMEPAD_BUTTONS_REPORT_INDEX assigns those buttons different
# report indices.
RAW_HID_GAMEPAD_BTN_START = 0x08
RAW_HID_GAMEPAD_BTN_A = 0x01
RAW_HID_GAMEPAD_BTN_B = 0x02
RAW_HID_GAMEPAD_BTN_X = 0x08
RAW_HID_GAMEPAD_BTN_Y = 0x10
RAW_HID_GAMEPAD_BTN_LEFT = 0x07
RAW_HID_GAMEPAD_BTN_RIGHT = 0x03
RAW_HID_GAMEPAD_BTN_UP = 0x01
RAW_HID_GAMEPAD_BTN_DOWN = 0x05
RAW_HID_GAMEPAD_LB = 0x40
RAW_HID_GAMEPAD_RB = 0x80
RAW_HID_GAMEPAD_LS = 0x20
RAW_HID_GAMEPAD_RS = 0x40

# UART input modes
# Raw mode
UART_INPUT_RAW_MODE = 0xFD
RAW_REPORT_START = 0xA1
# Length of report format for keyboard
RAW_REPORT_FORMAT_KEYBOARD_LENGTH = 9
RAW_REPORT_FORMAT_KEYBOARD_DESCRIPTOR = 1
RAW_REPORT_FORMAT_KEYBOARD_LEN_SCAN_CODES = 6

# Shorthand mode
UART_INPUT_SHORTHAND_MODE = 0xFE
SHORTHAND_REPORT_FORMAT_KEYBOARD_MAX_LEN_SCAN_CODES = 6

# Length of report format for mouse
RAW_REPORT_FORMAT_MOUSE_LENGTH = 5
RAW_REPORT_FORMAT_MOUSE_DESCRIPTOR = 2

# Length of report format for gamepad
RAW_REPORT_FORMAT_GAMEPAD_LENGTH = 1
RAW_REPORT_FORMAT_GAMEPAD_DESCRIPTOR = 4

# Time to wait for the pairing agent to exit.
AGENT_EXIT_TIMEOUT_SECS = 5
| |
# Raw HID encoding for every PeripheralKit gamepad button identifier.
GAMEPAD_BUTTONS_MAP = {
    PeripheralKit.GAMEPAD_BUTTON_START: RAW_HID_GAMEPAD_BTN_START,
    # Face buttons
    PeripheralKit.GAMEPAD_BUTTON_A: RAW_HID_GAMEPAD_BTN_A,
    PeripheralKit.GAMEPAD_BUTTON_B: RAW_HID_GAMEPAD_BTN_B,
    PeripheralKit.GAMEPAD_BUTTON_X: RAW_HID_GAMEPAD_BTN_X,
    PeripheralKit.GAMEPAD_BUTTON_Y: RAW_HID_GAMEPAD_BTN_Y,
    # Directional buttons
    PeripheralKit.GAMEPAD_BUTTON_LEFT: RAW_HID_GAMEPAD_BTN_LEFT,
    PeripheralKit.GAMEPAD_BUTTON_RIGHT: RAW_HID_GAMEPAD_BTN_RIGHT,
    PeripheralKit.GAMEPAD_BUTTON_UP: RAW_HID_GAMEPAD_BTN_UP,
    PeripheralKit.GAMEPAD_BUTTON_DOWN: RAW_HID_GAMEPAD_BTN_DOWN,
    # Bumpers and thumbsticks
    PeripheralKit.GAMEPAD_BUTTON_LEFT_BUMPER: RAW_HID_GAMEPAD_LB,
    PeripheralKit.GAMEPAD_BUTTON_RIGHT_BUMPER: RAW_HID_GAMEPAD_RB,
    PeripheralKit.GAMEPAD_LEFT_THUMBSTICK: RAW_HID_GAMEPAD_LS,
    PeripheralKit.GAMEPAD_RIGHT_THUMBSTICK: RAW_HID_GAMEPAD_RS,
}

# Report index associated with each gamepad control name.
# NOTE(review): presumably an offset into the 16-entry gamepad report kept
# by BluezPeripheral - confirm against the code that consumes this table.
GAMEPAD_BUTTONS_REPORT_INDEX = {
    # Analog controls
    "LEFT_STICK": 0,
    "RIGHT_STICK": 4,
    "LEFT_TRIGGER": 8,
    "RIGHT_TRIGGER": 10,
    # Directional buttons share index 12
    "GAMEPAD_BUTTON_LEFT": 12,
    "GAMEPAD_BUTTON_RIGHT": 12,
    "GAMEPAD_BUTTON_UP": 12,
    "GAMEPAD_BUTTON_DOWN": 12,
    # Face buttons and bumpers share index 13
    "GAMEPAD_BUTTON_A": 13,
    "GAMEPAD_BUTTON_B": 13,
    "GAMEPAD_BUTTON_X": 13,
    "GAMEPAD_BUTTON_Y": 13,
    "GAMEPAD_BUTTON_LEFT_BUMPER": 13,
    "GAMEPAD_BUTTON_RIGHT_BUMPER": 13,
    # Thumbstick presses and start share index 14
    "GAMEPAD_LEFT_THUMBSTICK": 14,
    "GAMEPAD_RIGHT_THUMBSTICK": 14,
    "GAMEPAD_BUTTON_START": 14,
}
| |
| |
class BluezPeripheralException(PeripheralKitException):
    """Exception raised by the Bluez peripheral implementation.

    It adds no behavior of its own; it only gives Bluez-specific failures
    a distinct type derived from PeripheralKitException.
    """
| |
| |
| class BluezPeripheral(PeripheralKit): |
| """This is an abstraction of a Bluez peripheral.""" |
| |
def __init__(self):
    """Start the D-Bus main loop and power on the local adapter.

    No device is emulated yet after construction; the device type and the
    matching service objects stay None until they are set elsewhere.
    """
    super(BluezPeripheral, self).__init__()
    self._settings = {}
    self._setup_dbus_loop()

    # Identity of the peer and of the emulated device; unset for now.
    self.remote_address = None
    self.device_type = None
    self._address = None

    # Service objects, created once a device type is specified.
    self._bluez_service = None
    self._service = None
    self._gatt_server = None

    # D-Bus handles. The HCI property proxy is created on first use.
    self._service_iface = None
    self._adapter_iface = None
    self._dbus_system_bus = dbus.SystemBus()
    self._dbus_hci_adapter_path = "/org/bluez/hci0"
    self._dbus_hci_props = None

    # Power the adapter for tests. _set_hci_prop connects
    # self._dbus_hci_props and starts bluetoothd if that is necessary.
    powered_on = dbus.Boolean(1)
    self._set_hci_prop("org.bluez.Adapter1", "Powered", powered_on)

    self._agent = None

    # 16 is the length of gamepad report bytes that changed according to
    # pressed buttons.
    self.gamepad_report = [0x00] * 16

    self.bluez_observer = BlueZObserver(self._dbus_system_bus)

    logging.debug(
        "Bluetooth peripheral powered and waiting for bind to device"
    )
| |
def __del__(self):
    """Quit the mainloop when done.

    The loop is looked up defensively: the finalizer also runs for objects
    whose __init__ raised before _setup_dbus_loop() assigned self._loop,
    and an AttributeError here would only add noise to the real failure.
    """
    loop = getattr(self, "_loop", None)
    if loop is not None:
        loop.quit()
| |
def cleanup(self):
    """Release everything this peripheral started.

    Cleans up the GATT server and the bluez service (clearing both
    references), stops the pairing agent, cleans up the BlueZ observer and
    finally asks the main loop to quit. Absent pieces are skipped.
    """
    logging.info("BluezPeripheral: cleanup")

    gatt_server = self._gatt_server
    if gatt_server:
        gatt_server.cleanup()
        self._gatt_server = None

    bluez_service = self._bluez_service
    if bluez_service:
        bluez_service.cleanup()
        self._bluez_service = None

    if self._agent:
        self.StopPairingAgent()

    observer = self.bluez_observer
    if observer:
        observer.cleanup()

    self._loop.quit()
| |
def get_service_iface(self):
    """Return the D-Bus interface of the HID service, creating it once.

    The proxy object and the interface are cached on the instance, so
    later calls return the same interface.
    """
    if self._service_iface:
        return self._service_iface

    service = self._dbus_system_bus.get_object(
        BLUEZ_HID_SERVICE_NAME, BLUEZ_HID_SERVICE_PATH
    )
    self._service = service
    self._service_iface = dbus.Interface(service, BLUEZ_HID_SERVICE_NAME)
    return self._service_iface
| |
def StartPairingAgent(self, capability):
    """Start a pairing agent with the specified capability.

    A running agent is stopped first. The call blocks until the event
    handed to the agent thread is set.

    Args:
        capability: the capability of the device.
            The capability parameter can have the values:
            DisplayOnly, DisplayYesNo, KeyboardOnly, NoInputNoOutput
            and KeyboardDisplay.
            Refer to bluez/doc/agent-api.txt for more details.
    """
    if self._agent:
        self.StopPairingAgent()

    registered = threading.Event()
    agent_thread = threading.Thread(
        target=pairing_agent.SetupAgent,
        args=(capability, registered),
    )
    self._agent = agent_thread
    agent_thread.start()

    # NOTE(review): presumably SetupAgent sets the event once the agent is
    # registered; there is no timeout on this wait.
    registered.wait()
    logging.info(
        "The pairing agent is started with capability %s", capability
    )
| |
def StopPairingAgent(self):
    """Stop the pairing agent.

    Asks the agent to stop and waits up to AGENT_EXIT_TIMEOUT_SECS for its
    thread to finish. Thread.join() returns silently on timeout, so the
    thread is checked afterwards and a warning is logged if it is still
    running. The agent reference is cleared either way, as before.
    """
    logging.info("Stop the pairing agent")
    agent_thread = self._agent
    if not agent_thread:
        return

    pairing_agent.StopAgent()
    agent_thread.join(AGENT_EXIT_TIMEOUT_SECS)
    if agent_thread.is_alive():
        logging.warning(
            "agent did not exit within %s seconds",
            AGENT_EXIT_TIMEOUT_SECS,
        )
    else:
        logging.info("agent exited and joined")
    self._agent = None
| |
def _setup_dbus_loop(self):
    """Run a GLib main loop in a background thread.

    The GLib main loop is first registered as the default D-Bus main loop.
    The loop and its thread are stored in self._loop and self._thread.
    """
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    main_loop = GLib.MainLoop()
    loop_thread = threading.Thread(target=main_loop.run)
    self._loop = main_loop
    self._thread = loop_thread
    loop_thread.start()
| |
def _kill_bluetoothd(self):
    """Kill bluetoothd with SIGKILL, in case it is in a bad state.

    The exit status of the command is not checked.
    """
    kill_cmd = "killall -9 bluetoothd"
    os.system(kill_cmd)
| |
def _start_bluetoothd(self):
    """Start the bluetooth service, if not started already.

    The exit status of the command is not checked.
    """
    start_cmd = "sudo service bluetooth start"
    os.system(start_cmd)
| |
def unpack_if_variant(self, value):
    """Return the unpacked content of a GLib.Variant.

    Args:
        value: any object.

    Returns:
        value.unpack() when value is a GLib.Variant, otherwise value
        itself, unchanged.
    """
    return value.unpack() if isinstance(value, GLib.Variant) else value
| |
def _check_prop_handle(self, handle):
    """Check whether bluetoothd answers through the given handle.

    Args:
        handle: properties interface of the adapter object.

    Returns:
        True if reading the adapter's "Powered" property succeeds, False
        if the read raises any exception (the failure is logged).
    """
    try:
        handle.Get("org.bluez.Adapter1", "Powered")
    except Exception as e:
        logging.info("Failed to retrieve bluez adapter property: %s", str(e))
        return False
    return True
| |
def _create_prop_handle(self):
    """Establish dbus link to bluetoothd for hci properties

    Returns:
        A org.freedesktop.DBus.Properties interface on the
        /org/bluez/hci0 object of the org.bluez service.
    """
    adapter_object = self._dbus_system_bus.get_object(
        "org.bluez", "/org/bluez/hci0"
    )
    return dbus.Interface(adapter_object, "org.freedesktop.DBus.Properties")
| |
def _refresh_hci_prop_handle(self):
    """Initialize self._dbus_hci_props, starting bluetoothd if necessary.

    Does nothing when a handle is already stored. Otherwise it tries to
    reach a running bluetoothd; if that fails, a running but unreachable
    bluetoothd is killed, the service is started, and the link is polled
    once per second for up to BLUETOOTHD_WAIT_TIME seconds.

    If bluetoothd never becomes reachable, self._dbus_hci_props stays None
    and an error is logged, so the failure is visible instead of surfacing
    later as an unrelated error in a caller.
    """
    # If we already have a link to bluetoothd, just keep it
    if self._dbus_hci_props is not None:
        return

    # Next step, try to link to already-running bluetoothd
    handle = self._create_prop_handle()
    if self._check_prop_handle(handle):
        self._dbus_hci_props = handle
        return

    # A pgrep return of 0 indicates that a process was found. If bluetoothd
    # is running and we can't reach it, kill and restart to get it out of
    # bad state
    if os.system("pgrep bluetoothd") == 0:
        logging.error("Could not reach bluetoothd, killing")
        self._kill_bluetoothd()

    logging.info("Starting bluetooth service")
    self._start_bluetoothd()

    # Since bluetoothd can be delayed by system conditions outside our
    # control, we wait and refresh our link to its dbus properties until it
    # is ready
    for _ in range(BLUETOOTHD_WAIT_TIME):
        handle = self._create_prop_handle()
        if self._check_prop_handle(handle):
            logging.info("Bluetooth ready, returning link")
            self._dbus_hci_props = handle
            return

        time.sleep(1)

    logging.error(
        "bluetoothd did not become reachable within %s seconds",
        BLUETOOTHD_WAIT_TIME,
    )
| |
def _set_hci_prop(self, iface_name, prop_name, prop_val):
    """Handles HCI property set

    Sometimes resource isn't ready, so this utility will repeat commands
    multiple times to allow more reliable operation. Each attempt reads
    the property, returns if it already has the wanted value, and
    otherwise writes it; the next attempt then verifies the write.

    Args:
        iface_name: D-Bus interface owning the property.
        prop_name: name of the property to set.
        prop_val: value the property should have.

    Returns:
        True if set succeeds, False otherwise
    """
    # Make sure we have a working handle to bluetoothd
    self._refresh_hci_prop_handle()
    props = self._dbus_hci_props
    if props is None:
        # The refresh could not reach bluetoothd. Fail cleanly instead of
        # raising AttributeError on None, which the retry loop below
        # would not catch.
        logging.error(
            "Cannot set %s: no connection to bluetoothd", prop_name
        )
        return False

    for _ in range(MAX_DBUS_RETRY_ATTEMPTS):
        try:
            initial_value = props.Get(iface_name, prop_name)

            # Return if property has value we want already
            if initial_value == prop_val:
                return True

            # Set property to the value we want
            logging.info("%s: %s -> %s", prop_name, initial_value, prop_val)
            props.Set(iface_name, prop_name, prop_val)

            time.sleep(0.1)

        except dbus.exceptions.DBusException as e:
            logging.error(
                "Setting %s to %s again: %s", prop_name, prop_val, str(e)
            )
            time.sleep(0.1)

    # The last attempt may have written the value without anything reading
    # it back, so verify once more before reporting failure.
    try:
        return bool(props.Get(iface_name, prop_name) == prop_val)
    except dbus.exceptions.DBusException as e:
        logging.error("Failed to verify %s: %s", prop_name, str(e))
        return False
| |
def _get_hci_prop(self, iface_name, prop_name):
    """Handles HCI property get

    The read is retried up to MAX_DBUS_RETRY_ATTEMPTS times when D-Bus
    reports an error.

    Args:
        iface_name: Dbus interface to be queried
        prop_name: Property name to be queried

    Returns:
        property value, or None on error
    """
    # Make sure we have a working handle to bluetoothd
    self._refresh_hci_prop_handle()
    props = self._dbus_hci_props
    if props is None:
        # The refresh could not reach bluetoothd. Honor the documented
        # "None on error" contract instead of raising AttributeError.
        logging.error(
            "Cannot get %s: no connection to bluetoothd", prop_name
        )
        return None

    for _ in range(MAX_DBUS_RETRY_ATTEMPTS):
        try:
            return props.Get(iface_name, prop_name)

        except dbus.exceptions.DBusException as e:
            logging.error(
                "Will try to get %s again: %s", prop_name, str(e)
            )
            time.sleep(0.1)

    return None
| |
def get_device_property(self, address, prop_name):
    """Reads a property of remote bluetooth device.

    Args:
        address: string address of remote bluetooth device.
        prop_name: name of the org.bluez.Device1 property to be queried.

    Returns:
        The property value, or None when no device with that address is
        found.
    """
    device = self.GetDeviceWithAddress(address)
    if not device:
        logging.error(
            "get_device_property: Failed to get device with the address %s",
            address,
        )
        return None

    # GetDeviceWithAddress returns a dbus.Interface, which cannot be
    # subscripted by interface name. Build the properties interface
    # explicitly, the same way SetTrustedByRemoteAddress does.
    properties = dbus.Interface(device, DBUS_PROP_IFACE)
    return self.unpack_if_variant(
        properties.Get(DBUS_BLUEZ_DEVICE_IFACE, prop_name)
    )
| |
def SetTrustedByRemoteAddress(self, remote_address, trusted=True):
    """Set the "Trusted" property of a remote device.

    Args:
        remote_address: string address of the remote bluetooth device.
        trusted: value to store in the device's Trusted property.

    Returns:
        True if the property was set; False if the device is unknown or
        setting the property raised an exception (which is logged).
    """
    device = self.GetDeviceWithAddress(remote_address)
    if device is None:
        logging.error(
            "Failed to get device with the address %s", remote_address
        )
        return False

    # Only Exception is caught: the former bare "except:" also swallowed
    # KeyboardInterrupt and SystemExit, which must be allowed to propagate.
    try:
        properties = dbus.Interface(device, DBUS_PROP_IFACE)
        properties.Set(
            DBUS_BLUEZ_DEVICE_IFACE,
            "Trusted",
            dbus.Boolean(trusted, variant_level=1),
        )
    except Exception as e:
        logging.error("SetTrustedByRemoteAddress: %s", e)
        return False
    return True
| |
def GetCapabilities(self):
    """What can this kit do/not do that tests need to adjust for?

    This kit reports both the BR/EDR and the LE transport, and reports
    True for the CAP_HAS_PIN and CAP_INIT_CONNECT capabilities.

    Returns:
        A dictionary from PeripheralKit.CAP_* strings to an appropriate
        value. See PeripheralKit for details.
    """
    supported_transports = [
        PeripheralKit.TRANSPORT_BREDR,
        PeripheralKit.TRANSPORT_LE,
    ]
    return {
        PeripheralKit.CAP_TRANSPORTS: supported_transports,
        PeripheralKit.CAP_HAS_PIN: True,
        PeripheralKit.CAP_INIT_CONNECT: True,
    }
| |
def EnableBLE(self, use_ble):
    """Put device into either LE or Classic mode

    The raspi is a dual device, supporting both LE and Classic BT. While
    one transport is under test the other one is disabled, so the DUT
    cannot connect over the unwanted transport.

    Args:
        use_ble: True to enable LE and disable BR/EDR, False for the
            opposite.
    """
    if use_ble:
        settings = ("le on", "bredr off", "sc on")
    else:
        settings = ("bredr on", "le off", "ssp on")

    # The settings are applied with the adapter powered off, then the
    # adapter is powered back on.
    for setting in ("power off",) + settings + ("power on",):
        os.system("sudo btmgmt " + setting)

        # A slight delay gives each change adequate time to be done
        time.sleep(0.1)
| |
def GetBaseDeviceType(self, device_type):
    """Returns the base device type of a peripheral, i.e. BLE_MOUSE -> MOUSE

    Args:
        device_type: String device type, e.g. "BLE_MOUSE".

    Returns:
        device_type with every "BLE_" occurrence removed.
    """
    # str.replace leaves the string as-is when "BLE_" does not occur.
    return device_type.replace("BLE_", "")
| |
def SpecifyDeviceType(self, device_type):
    """Instantiates one of our supported devices

    The raspi stack is designed to emulate a single device at a time. This
    function is called by autotest and defines which device must be
    emulated for the current test.

    Args:
        device_type: String device type, e.g. "MOUSE"

    Returns:
        True once the peripheral is bound to device_type (also when it
        already was).

    Raises:
        BluezPeripheralException: the peripheral is already bound to a
            different device type.
    """
    # Do nothing if we were already bound to this device
    if self.device_type == device_type:
        return True

    if self.device_type is not None:
        error = "Peripheral already bound to device: {}".format(
            self.device_type
        )
        logging.error(error)
        raise BluezPeripheralException(error)

    self.device_type = device_type

    if "BLE" in device_type:
        logging.info("Binding to BLE device!")

        # LE only, so the test device can't connect via BT classic
        self.EnableBLE(True)

        # Establish and run gatt server
        self._gatt_server = GATTServer(self)
    else:
        logging.info("Binding to Classic device!")

        # BT classic only, so the test device can't connect via LE
        self.EnableBLE(False)

        # Pick the service implementation from the device type
        if "DEV_KIT" in device_type:
            service_cls = BluezRfcommEchoService
        else:
            service_cls = BluezHIDService
        self._bluez_service = service_cls(
            self.device_type, self.GetLocalBluetoothAddress()
        )

    self.Init()

    base_type = self.GetBaseDeviceType(self.device_type)
    self.SetAdapterAlias(PERIPHERAL_DEVICE_NAME[base_type])

    logging.info("Bluetooth peripheral now bound to %s", device_type)

    # Give the service a moment to initialize
    time.sleep(1)

    return True
| |
def SetBtdFlags(self, device_type):
    """Allows us to set bluetoothd config execution flags

    In some cases, we wish to apply specific bluetoothd execution flags
    depending on the device type we are emulating. This function rewrites
    the first ExecStart line of BTD_CONF_FILE with the flags for that
    device type, and is intended to be run before the service is restarted.
    The previous ExecStart line is kept as a comment above the new one
    unless such a comment is already there.

    Args:
        device_type: String device type, e.g. "MOUSE"
    """
    # Define regex to separate executable from flags. Example contents:
    # ExecStart=/usr/libexec/bluetooth/bluetoothd -d -P input
    plugin_re = re.compile(
        r"^(?P<prefix>\s*ExecStart=.*?)"  # everything before the flags
        r"(?P<flags>\s*-.*)?$"  # optional flags block
    )

    desired_btd_flags = PERIPHERAL_BTD_FLAGS[
        self.GetBaseDeviceType(device_type)
    ]

    with open(BTD_CONF_FILE, "r") as mf:
        file_contents = mf.readlines()

    exec_line = None
    match = None
    for idx, line in enumerate(file_contents):
        match = plugin_re.search(line)
        if match:
            exec_line = idx
            break

    if match is None:
        # logging.warn is a deprecated alias; use logging.warning.
        logging.warning(
            "Failed to locate ExecStart block in bluetoothd config"
        )
        return

    prefix = match.group("prefix")

    # Update the exec line with desired bluetoothd flags
    old_line = file_contents[exec_line]
    file_contents[exec_line] = "{} {}\n".format(prefix, desired_btd_flags)

    # Store the original as a comment above the new one, if it doesn't
    # exist yet. When ExecStart is the very first line there is no line
    # above it; index -1 would wrongly inspect the last line of the file.
    has_backup = exec_line > 0 and prefix in file_contents[exec_line - 1]
    if not has_backup:
        # A final line without a trailing newline must not run into the
        # new ExecStart line that follows the comment.
        if not old_line.endswith("\n"):
            old_line += "\n"
        file_contents.insert(exec_line, "# {}".format(old_line))

    logging.info("Applying bluetoothd flags: %s", desired_btd_flags)

    # Write new contents back to the file
    with open(BTD_CONF_FILE, "w") as mf:
        mf.write("".join(file_contents))
| |
def ResetStack(self, next_device_type=None, restart_chameleond=True):
    """Restores BT stack to pristine state by restarting running services

    Args:
        next_device_type: if given, the bluetoothd flags for this device
            type are written before the bluetooth service is restarted.
        restart_chameleond: whether the chameleond service is restarted as
            well.
    """
    logging.info("ResetStack")

    reset_cmds = ["systemctl daemon-reload", "service bluetooth restart"]
    if restart_chameleond:
        reset_cmds.append("service chameleond restart")

    # If we know what device type is used next, we can specify which plugins
    # bluetoothd should be loaded with before the service is restarted
    if next_device_type:
        self.SetBtdFlags(next_device_type)

    self.CleanCachedFiles()
    self.AdapterPowerOff()

    # bluetoothd is about to be restarted, so drop the dbus proxy to force
    # the creation of a new one afterwards.
    self._dbus_hci_props = None

    # Stop Bluetooth audio related daemons which may interfere with HID
    # tests. These commands may return non zero when the daemons are not
    # running, so they must stay out of the "&&" chain below.
    for audio_cmd in ("sudo killall pulseaudio", "sudo service ofono stop"):
        os.system(audio_cmd)

    # Restart chameleon and bluetooth service
    os.system(" && ".join(reset_cmds))
| |
def PowerCycle(self):
    """Power cycle the device.

    Returns:
        The result of Reboot().
    """
    return self.Reboot()
| |
# Override BluetoothHID's implementation of init
def Init(self, factory_reset=True):
    """Ensures our chip is in the correct state for the tests to be run

    Args:
        factory_reset: not used by this implementation.

    Returns:
        False if the adapter is not powered, True otherwise.
    """
    logging.info("Init")

    # The adapter must be powered before it can be made discoverable
    powered = self._get_hci_prop("org.bluez.Adapter1", "Powered")
    if not powered:
        logging.info("Init: Adapter not powered")
        return False
    self.SetDiscoverable(True)

    # Set class based on device we're emulating (classic devices only)
    emulated_type = self.device_type
    if emulated_type and "BLE" not in emulated_type:
        self.SetClassOfService(PERIPHERAL_DEVICE_CLASS[emulated_type])

    return True
| |
def CleanCachedFiles(self):
    """Clean up files that bluetoothd loads when it starts.

    This puts bluetoothd in a clean state by cleaning up the files in
    /var/lib/bluetooth: every adapter's "settings" file is emptied, every
    file in an adapter's "cache" directory is removed, and every other
    directory of an adapter is removed together with its files.

    The shell commands are collected during the directory walk and run
    only afterwards, so the walk never sees a partially cleaned tree.

    Returns:
        True, always.
    """
    bluez_storage_path = "/var/lib/bluetooth/"
    adapter_storage_paths = []
    cache_storage_paths = []
    peer_storage_paths = []
    cleanup_syscmds = []

    # os.walk is top-down, so each path list is filled in before the
    # directories it names are visited.
    for dirpath, dirlist, filelist in os.walk(bluez_storage_path):
        if dirpath == bluez_storage_path:
            adapter_storage_paths = [
                os.path.join(dirpath, adapter) for adapter in dirlist
            ]
        elif dirpath in adapter_storage_paths:
            if "settings" in filelist:
                # Empty adapter persistent settings. e.g. Alias, Powered
                settings_file = os.path.join(dirpath, "settings")
                cleanup_syscmds.append("truncate -s 0 " + settings_file)

            for dirname in dirlist:
                if dirname == "cache":
                    bucket = cache_storage_paths
                else:
                    bucket = peer_storage_paths
                bucket.append(os.path.join(dirpath, dirname))
        elif dirpath in cache_storage_paths:
            cleanup_syscmds.extend(
                "rm " + os.path.join(dirpath, cached_dev)
                for cached_dev in filelist
            )
        elif dirpath in peer_storage_paths:
            cleanup_syscmds.extend(
                "rm " + os.path.join(dirpath, info) for info in filelist
            )
            cleanup_syscmds.append("rmdir " + dirpath)

    for cmd in cleanup_syscmds:
        logging.info('Executing command "%s" for cleanup', cmd)
        os.system(cmd)

    return True
| |
def CreateSerialDevice(self):
    """Device setup and recovery

    In the current test framework, CreateSerialDevice is called for device
    initialization or to try and correct an error. There is no serial
    connection here, but the function serves the same purpose: device
    setup and recovery.

    Returns:
        The result of Init().
    """
    init_result = self.Init()
    return init_result
| |
def CheckSerialConnection(self):
    """Check the serial device we aren't using.

    Returns:
        True, always.
    """
    return True
| |
def Close(self):
    """Close the serial device we aren't using.

    Returns:
        True, always.
    """
    return True
| |
def EnterCommandMode(self):
    """Do nothing and report success.

    Returns:
        True, always.
    """
    return True
| |
def GetPort(self):
    """Return the fixed placeholder port name "/dev/fakedev"."""
    port_name = "/dev/fakedev"
    return port_name
| |
def AdapterPowerOff(self):
    """Set the adapter's "Powered" property to false.

    Returns:
        The result of _set_hci_prop: True on success, False otherwise.
    """
    powered = dbus.Boolean(0)
    return self._set_hci_prop("org.bluez.Adapter1", "Powered", powered)
| |
def AdapterPowerOn(self):
    """Set the adapter's "Powered" property to true.

    Returns:
        The result of _set_hci_prop: True on success, False otherwise.
    """
    powered = dbus.Boolean(1)
    return self._set_hci_prop("org.bluez.Adapter1", "Powered", powered)
| |
def Reboot(self):
    """Power the adapter off and on again, then re-initialize it.

    Returns:
        The result of Init().
    """
    logging.info("REBOOTING")

    for toggle_power in (self.AdapterPowerOff, self.AdapterPowerOn):
        toggle_power()

    # Put ourselves back into correct state for discovery
    return self.Init()
| |
def SetAdapterAlias(self, name):
    """Set the adapter's "Alias" property.

    Args:
        name: the alias to set.
    """
    alias = dbus.String(name)
    self._set_hci_prop("org.bluez.Adapter1", "Alias", alias)
| |
def SetBatteryLevel(self, level):
    """Sets device battery level.

    The level is forwarded to the GATT server; nothing happens when no
    GATT server exists.

    Args:
        level: battery level passed on to the GATT server.
    """
    logging.info("bluetooth_raspi.SetBatteryLevel")
    gatt_server = self._gatt_server
    if not gatt_server:
        return
    gatt_server.SetBatteryLevel(level)
| |
def SetDiscoverable(self, discoverable):
    """Set the adapter's discoverable state.

    The same state is forwarded to the GATT server when one exists.

    Args:
        discoverable: whether the adapter should be discoverable.
    """
    adapter_iface = "org.bluez.Adapter1"
    self._set_hci_prop(
        adapter_iface, "Discoverable", dbus.Boolean(discoverable)
    )

    # Set discoverable timeout to 0 so that it won't expire.
    if discoverable:
        self._set_hci_prop(
            adapter_iface, "DiscoverableTimeout", dbus.UInt32(0)
        )

    # Also set discoverable in gatt server if relevant
    gatt_server = self._gatt_server
    if gatt_server:
        gatt_server.SetDiscoverable(discoverable)
| |
def SetDiscoverableNoConfigAdv(self, discoverable):
    """Set discoverable without configuring advertisements.

    Same adapter handling as SetDiscoverable, but the GATT server is
    updated through its SetDiscoverableNoConfigAdv method.

    Args:
        discoverable: whether the adapter should be discoverable.
    """
    adapter_iface = "org.bluez.Adapter1"
    self._set_hci_prop(
        adapter_iface, "Discoverable", dbus.Boolean(discoverable)
    )

    # Set discoverable timeout to 0 so that it won't expire.
    if discoverable:
        self._set_hci_prop(
            adapter_iface, "DiscoverableTimeout", dbus.UInt32(0)
        )

    # Also set discoverable in gatt server if relevant
    gatt_server = self._gatt_server
    if gatt_server:
        gatt_server.SetDiscoverableNoConfigAdv(discoverable)
| |
def GetAuthenticationMode(self):
    """Return the authentication mode, which is always OPEN_MODE."""
    mode = PeripheralKit.OPEN_MODE
    return mode
| |
def GetPinCode(self):
    """Return the fixed pin code "0000"."""
    pin_code = "0000"
    return pin_code
| |
def AdvertiseWithNamesAndAddresses(self, name_and_addr, duration):
    """Advertises local names and addresses for duration time one by one.

    When this function returns, the local name and address have been reset
    to the values they had on entry and discoverable is turned off.

    Args:
        name_and_addr: A list of (name, addr). Each tuple describes the
            local name and the address for the device to advertise.
        duration: The time (in seconds) for a (name, addr) to be
            advertised.

    Raises:
        BluezPeripheralException: there is no GATT server, i.e. the
            peripheral is not bound to a BLE device.
        ValueError: an address is rejected by
            HciCommands.set_local_bluetooth_address.
    """
    # The advertised name lives in the GATT server. Fail with a clear
    # message instead of an AttributeError on None when it is missing.
    gatt_server = self._gatt_server
    if gatt_server is None:
        error = "AdvertiseWithNamesAndAddresses requires a GATT server"
        logging.error(error)
        raise BluezPeripheralException(error)

    def set_addr(addr):
        tool = HciTool(sudo=True)
        cmd = HciCommands.set_local_bluetooth_address(addr)
        if cmd is None:
            raise ValueError("invalid BT address: {}".format(addr))
        tool.add_command(cmd)
        tool.run_commands()

    old_name = gatt_server.GetAdvLocalName()
    old_addr = self.GetLocalBluetoothAddress()
    self.SetDiscoverable(False)

    try:
        for name, addr in name_and_addr:
            gatt_server.SetAdvLocalName(name)
            set_addr(addr)

            self.SetDiscoverable(True)
            time.sleep(duration)
            self.SetDiscoverable(False)
    finally:
        # Restore the original identity even when a step above failed.
        gatt_server.SetAdvLocalName(old_name)
        set_addr(old_addr)
        self.SetDiscoverable(False)
| |
def GetLocalBluetoothAddress(self):
    """Get the builtin Bluetooth MAC address.

    The address is read from the adapter's "Address" property once and
    cached on the instance. A failed read is not cached, so a later call
    tries again.

    Returns:
        The address as a string, or None if it could not be read.
    """
    # Return stored value if we have one
    if self._address is not None:
        return self._address

    try:
        raw_addr = self._get_hci_prop("org.bluez.Adapter1", "Address")
    except Exception as e:
        logging.error("Failed to get local addr: {}".format(e))
        return None

    # _get_hci_prop reports failure by returning None. str(None) would
    # cache the literal string "None" as the adapter address.
    if raw_addr is None:
        logging.error("Failed to get local addr: no address returned")
        return None

    self._address = str(raw_addr)
    return self._address
| |
def GetConnectionStatus(self):
    """Determine whether the device has an active connection

    Returns:
        True if a connection is active, False otherwise
    """
    # No connected remote address means no active connection
    connected_address = self.GetRemoteConnectedBluetoothAddress()
    return connected_address is not None
| |
def _GetAddressFromPath(self, device_path):
    """Return the address of the BlueZ device at a D-Bus object path.

    Args:
        device_path: D-Bus object path of the device.

    Returns:
        The device's "Address" property, or None if no managed object
        exists at that path or the object has no org.bluez.Device1
        interface.
    """
    manager = dbus.Interface(
        self._dbus_system_bus.get_object(DBUS_BLUEZ_SERVICE_IFACE, "/"),
        DBUS_OBJECT_MANAGER_INTERFACE,
    )
    objects = manager.GetManagedObjects()

    # The managed objects are keyed by path, so look the path up directly
    # instead of scanning every object for a matching key.
    ifaces = objects.get(device_path)
    if ifaces is None:
        return None

    device = ifaces.get(DBUS_BLUEZ_DEVICE_IFACE)
    if device is None:
        return None
    return device["Address"]
| |
def GetDeviceWithAddress(self, addr):
    """Find the BlueZ device object that has the given address.

    Args:
        addr: string address of the remote bluetooth device.

    Returns:
        An org.bluez.Device1 interface on the first managed object whose
        "Address" equals addr, or None if there is no such object.
    """
    root_object = self._dbus_system_bus.get_object(
        DBUS_BLUEZ_SERVICE_IFACE, "/"
    )
    manager = dbus.Interface(root_object, DBUS_OBJECT_MANAGER_INTERFACE)
    managed_objects = manager.GetManagedObjects()

    # Go through each object in org.bluez.Device1 until
    # we find the one that matches our desired address
    for path, ifaces in managed_objects.items():
        device = ifaces.get(DBUS_BLUEZ_DEVICE_IFACE)
        if device is None:
            continue
        if device["Address"] == addr:
            device_object = self._dbus_system_bus.get_object(
                DBUS_BLUEZ_SERVICE_IFACE, path
            )
            return dbus.Interface(device_object, DBUS_BLUEZ_DEVICE_IFACE)

    return None
| |
def GetRandomAddress(self):
    """Read the random address of BT

    The value is read straight from the hci0 debugfs entry. Reading the
    file directly, rather than through a "cat" pipe, closes the handle
    deterministically and avoids spawning a shell.

    Returns:
        The random address, stripped and upper-cased, or an empty string
        when the entry cannot be read.
    """
    path = "/sys/kernel/debug/bluetooth/hci0/random_address"
    try:
        with open(path, "r") as address_file:
            return address_file.read().strip().upper()
    except OSError as e:
        logging.error("Unable to read random address: %s", e)
        return ""
| |
def SetPrivacy(self, enable):
    """Set the LE privacy state to ON/OFF

    The adapter is powered off, the privacy setting is changed, and the
    adapter is powered on again. On success the GATT server, if any, is
    switched to the random or the public own-address type accordingly.

    Args:
        enable: Whether to set LE privacy to True/False.

    Returns:
        True if able to set privacy, False otherwise.
    """
    _control = bluetooth_socket.BluetoothControlSocket()
    if not self.AdapterPowerOff():
        logging.error("Unable to power off adapter")
        return False

    if not _control.set_privacy(0, enable):
        logging.error("Unable to set privacy state")
        # The adapter was powered off above. Bring it back up so a failed
        # privacy change does not leave it powered down.
        if not self.AdapterPowerOn():
            logging.error("Unable to power on adapter")
        return False

    if not self.AdapterPowerOn():
        logging.error("Unable to power on adapter")
        return False

    if self._gatt_server:
        self._gatt_server.SetOwnAddressType(
            RANDOM_DEVICE_ADDRESS if enable else PUBLIC_DEVICE_ADDRESS
        )

    return True
| |
def SetAdvertising(self, enable):
    """Set LE advertising to OFF/ON and configure advertisement
    parameters if LE advertising is enabled.

    Args:
        enable: Whether controller should advertise via LE.

    Returns:
        True if the new setting was applied, False otherwise.
    """
    control = bluetooth_socket.BluetoothControlSocket()
    applied = control.set_advertising(0, enable)
    if not applied:
        logging.error("Unable to set LE advertising {}".format(enable))
        return False

    gatt_server = self._gatt_server
    if enable and gatt_server:
        gatt_server._ConfigureAdvertisements()
    return True
| |
def SetIOCapability(self, io_capability):
    """Set IO Capability in kernel

    The setting is applied to controller index 0.

    Args:
        io_capability: IO Capability, one of:
            'DisplayOnly'
            'DisplayYesNo'
            'KeyboardOnly'
            'NoInputNoOutput'
            'KeyboardDisplay'

    Returns:
        True on success, False on failure.
    """
    logging.info("Setting IO Capability to %s", io_capability)
    control = bluetooth_socket.BluetoothControlSocket()
    if control.set_io_capability(0, io_capability):
        return True
    logging.error("Failed to set IO Capability")
    return False
| |
def SetSecureConnections(self, secure_conn):
    """Sets whether a controller supports secure connections.

    The setting is applied to controller index 0.

    Args:
        secure_conn: "off" or "on" or "only".

    Returns:
        True on success, False otherwise.
    """
    logging.info("Setting Secure Connection to %s", secure_conn)
    control = bluetooth_socket.BluetoothControlSocket()
    if control.set_secure_connections(0, secure_conn):
        return True
    logging.error("Failed to set Secure Connection")
    return False
| |
def SetSecureSimplePairing(self, enable):
    """Sets Secure Simple Pairing to enable / disable.

    The setting is applied to controller index 0.

    Args:
        enable: Whether controller should support SSP.

    Returns:
        True on success, False otherwise.
    """
    logging.info("Setting Secure Simple Pairing to %s", enable)
    control = bluetooth_socket.BluetoothControlSocket()
    # Only a None result counts as failure here.
    result = control.set_ssp(0, enable)
    if result is not None:
        return True
    logging.error("Failed to set Secure Simple Pairing")
    return False
| |
def SetFastConnectable(self, enable):
    """Sets the controller page scan mode to be fast connectable or not.

    The setting is applied to controller index 0.

    Args:
        enable: Whether to enable fast connectable mode.

    Returns:
        True if the mode was set, False otherwise.
    """
    control = bluetooth_socket.BluetoothControlSocket()
    if control.set_fast_connectable(0, enable):
        return True
    logging.error("Failed to set fast connectable mode.")
    return False
| |
def SetRPATimeout(self, timeout):
    """Set the timeout value for random private address

    Args:
        timeout: Number of seconds after which BT random private address
            will change. Must be a non-negative integer, or a string of
            ASCII digits, in the range [30, 3600].

    Returns:
        True if the new timeout was written, False if the value is invalid
        or the write failed.
    """
    # str.isdigit() also accepts characters such as superscript digits,
    # which int() rejects with ValueError. Accept ASCII digits only.
    timeout_text = str(timeout)
    if re.fullmatch(r"[0-9]+", timeout_text) is None:
        logging.error("Timeout must be a number")
        return False

    seconds = int(timeout_text)
    if not 30 <= seconds <= 3600:
        logging.error("Timeout must be in range [30, 3600]s")
        return False

    cmd = "echo {} > /sys/kernel/debug/bluetooth/hci0/rpa_timeout".format(
        seconds
    )
    err = os.system(cmd)
    if err != 0:
        logging.error("Unable to update RPA timeout. Err {}".format(err))
        return False
    return True
| |
def GetRPATimeout(self):
    """Reads the timeout value for the random private address (RPA).

    @return: timeout value as integer, or 0 if the value cannot be read
             or is not a number.
    """
    rpa_timeout_path = "/sys/kernel/debug/bluetooth/hci0/rpa_timeout"
    # Read the file directly so no shell is spawned and the handle is
    # always closed.
    try:
        with open(rpa_timeout_path) as rpa_file:
            raw_timeout = rpa_file.read().strip()
    except (IOError, OSError) as e:
        logging.error("Unable to read RPA timeout: %s", e)
        return 0

    if raw_timeout == "":
        logging.error("File does not exist or timeout not defined.")
        return 0

    try:
        return int(raw_timeout)
    except ValueError:
        logging.error("Unexpected RPA timeout value: %r", raw_timeout)
        return 0
| |
def GetRemoteConnectedBluetoothAddress(self):
    """Looks up the address of a currently connected remote device

    Returns:
      Address of the first managed device whose "Connected" property is
      set, or None if no such device is found
    """
    # Ask the bluez ObjectManager for every object it exports
    bluez_root = self._dbus_system_bus.get_object("org.bluez", "/")
    object_manager = dbus.Interface(bluez_root, DBUS_OBJECT_MANAGER_INTERFACE)

    for interfaces in object_manager.GetManagedObjects().values():
        device_props = interfaces.get(DBUS_BLUEZ_DEVICE_IFACE)
        # Objects without the Device1 interface are not remote devices
        if device_props is None:
            continue
        if device_props["Address"] is None:
            continue
        if device_props["Connected"]:
            return device_props["Address"]

    return None
| |
def SetClassOfService(self, class_of_service):
    """Sets the class value of hci0 through hciconfig.

    @param class_of_service: Integer class value; passed to hciconfig in
                             hexadecimal form.
    """
    logging.info("SetClassOfService class_of_service: %s", class_of_service)

    # Class is a read-only DBus property, so it is set with a system call.
    hci_class = hex(class_of_service)
    os.system("sudo hciconfig hci0 class {}".format(hci_class))
| |
def SetDeviceClass(self, major, minor):
    """Applies the given major/minor device class to the controller.

    @param major: Major device class.
    @param minor: Minor device class.

    @return True when set_device_class returned a result, False when it
            returned None.
    """
    logging.info("SetDeviceClass major: %s minor: %s", major, minor)
    control_socket = bluetooth_socket.BluetoothControlSocket()
    if control_socket.set_device_class(0, major, minor) is not None:
        return True
    logging.error("Failed to set device class.")
    return False
| |
def SetLinkMode(self, mode):
    """Puts the controller into the requested link mode via hciconfig.

    Args:
      mode: string in 'NONE', 'ACCEPT', 'MASTER', 'AUTH', 'ENCRYPT',
          'TRUSTED', 'RELIABLE', 'SECURE'.
    """
    logging.info("Setting link mode to %s", mode)

    link_mode_cmd = "sudo hciconfig hci0 lm {}".format(mode)

    if "BLE" in self.device_type:
        # Link mode can only change while bredr is enabled, so BLE device
        # types get bredr switched on around the hciconfig call.
        command_sequence = [
            "sudo btmgmt power off",
            "sudo btmgmt bredr on",
            link_mode_cmd,
            "sudo btmgmt bredr off",
            "sudo btmgmt power on",
        ]
    else:
        command_sequence = [link_mode_cmd]

    for shell_cmd in command_sequence:
        os.system(shell_cmd)
        time.sleep(0.1)
| |
def SetRemoteAddress(self, remote_address):
    """Stores the remote address for later use (e.g. by Pair and Connect)

    Args:
      remote_address: string denoting remote address

    Returns:
      Always True
    """
    setattr(self, "remote_address", remote_address)
    return True
| |
| def _PairSuccessHandler(self): |
| """Called when pairing request completes""" |
| |
| def _PairErrorHandler(self, err): |
| """Called in case of error on pairing request""" |
| logging.error("Failed to pair: %s", err) |
| |
def Pair(self):
    """Attempts to pair to the address specified with SetRemoteAddress()

    The pairing request is issued asynchronously; its outcome is reported
    to _PairSuccessHandler / _PairErrorHandler.

    Returns:
      True if success to issue the pairing request, False otherwise
      (no remote address, unknown device, or device already paired)
    """
    if self.remote_address is None:
        logging.error("Pair called with no remote address supplied")
        return False

    device = self.GetDeviceWithAddress(self.remote_address)

    # Validate the device before it is used to read the "Paired" property;
    # otherwise a missing device fails on the property read instead of
    # returning False.
    if not device:
        logging.error("Device %s is None, can not pair", self.remote_address)
        return False

    device_iface = dbus.Interface(device, DBUS_PROP_IFACE)
    is_paired = bool(device_iface.Get(DBUS_BLUEZ_DEVICE_IFACE, "Paired"))

    if is_paired:
        logging.warning(
            "Device %s has already paired. Skip to pair",
            self.remote_address,
        )
        return False

    logging.info("Attempting to pair to %s", self.remote_address)

    device.Pair(
        reply_handler=self._PairSuccessHandler,
        error_handler=self._PairErrorHandler,
    )
    return True
| |
def Connect(self):
    """Attempts to connect to the address specified with SetRemoteAddress()

    HID device types ("MOUSE", "KEYBOARD", "GAMEPAD") connect through
    ConnectHID(); every other type calls Connect() on the bluez device
    object. Afterwards the "Connected" property is polled once per second
    for up to CONNECTION_WAIT_TIME seconds.

    Returns:
      True if connection to remote address succeeds (or already exists),
      False otherwise
    """
    if self.remote_address is None:
        logging.error("Connect called with no remote address supplied")
        return False

    device = self.GetDeviceWithAddress(self.remote_address)

    # The device object is required to read the "Connected" property for
    # every device type, so reject a missing device before it is used.
    if not device:
        logging.error(
            "Device {} is None, can not connect".format(self.remote_address)
        )
        return False

    device_iface = dbus.Interface(device, DBUS_PROP_IFACE)
    is_connected = bool(device_iface.Get(DBUS_BLUEZ_DEVICE_IFACE, "Connected"))

    if is_connected:
        logging.warning(
            "Device %s has already connected. Skip to connect",
            self.remote_address,
        )
        return True

    logging.info("Attempting to connect to %s", self.remote_address)

    if self.device_type in (
        "MOUSE",
        "KEYBOARD",
        "GAMEPAD",
    ):
        self.ConnectHID()
    else:
        # Call connect directly through bluez device object
        try:
            device.Connect()
        except Exception as e:
            logging.error(
                "Failed to connect to {}: {}".format(self.remote_address, e)
            )
            return False

    start_time = time.time()

    # Wait for the connection to show up on the device object, or time out
    while not is_connected:
        if time.time() > start_time + CONNECTION_WAIT_TIME:
            break

        time.sleep(1)
        is_connected = bool(
            device_iface.Get(DBUS_BLUEZ_DEVICE_IFACE, "Connected")
        )

    return is_connected
| |
def ConnectHIDDoneHandler(self):
    """Reply handler for the HID connect request; logs the success"""
    logging.info("Connect HID succeeded")
    return None
| |
def ConnectHIDErrorHandler(self, err):
    """Error handler for the HID connect request; logs the failure

    Args:
      err: error passed in by the caller of this handler
    """
    logging.error("Connect HID failed: %s", err)
    return None
| |
def ConnectHID(self):
    """Asks the HID service to connect to the stored remote address.

    Completion and failure are reported to ConnectHIDDoneHandler and
    ConnectHIDErrorHandler respectively.
    """
    hid_service = self.get_service_iface()
    hid_service.Connect(
        self.remote_address,
        reply_handler=self.ConnectHIDDoneHandler,
        error_handler=self.ConnectHIDErrorHandler,
    )
| |
| def _GetAdapterIface(self): |
| """Returns handle to bluez adapter interface |
| |
| Returns: |
| Handle to bluez adapter interface |
| """ |
| |
| if self._adapter_iface is None: |
| self._adapter_iface = dbus.Interface( |
| self._dbus_system_bus.get_object( |
| DBUS_BLUEZ_SERVICE_IFACE, self._dbus_hci_adapter_path |
| ), |
| DBUS_BLUEZ_ADAPTER_IFACE, |
| ) |
| |
| return self._adapter_iface |
| |
def RemoveDevice(self, remote_address):
    """Asks the adapter to remove a remote device from bluez

    Best effort: any exception raised along the way is logged and not
    propagated.

    Args:
      remote_address: string address of BT device
    """
    try:
        logging.info("Trying to remove {}".format(remote_address))
        target_device = self.GetDeviceWithAddress(remote_address)

        # Nothing to remove when no device object is returned
        if target_device:
            adapter = self._GetAdapterIface()
            adapter.RemoveDevice(target_device)

    except Exception as err:
        failure_msg = "Failed to remove device {}: {}".format(
            remote_address, err
        )
        logging.info(failure_msg)
| |
def SetDiscoveryFilter(self, discovery_filter):
    """Passes a discovery filter to the adapter.

    Args:
      discovery_filter: filter contents, sent with dbus signature "a{sv}"

    Returns:
      True if the request was sent without error, False otherwise.
    """
    try:
        adapter = self._GetAdapterIface()
        adapter.SetDiscoveryFilter(discovery_filter, signature="a{sv}")
    except Exception as err:
        logging.error("Failed to set discovery filter: %s", err)
        return False
    else:
        return True
| |
def StartDiscovery(self):
    """Requests that the adapter start discovery

    Returns:
      True if discovery started without error, False otherwise
    """
    try:
        adapter = self._GetAdapterIface()
        adapter.StartDiscovery()
    except Exception as err:
        logging.error("Failed to start discovery: {}".format(err))
        return False
    else:
        return True
| |
def StopDiscovery(self):
    """Requests that the adapter stop discovery

    Returns:
      True if discovery stopped without error, False otherwise
    """
    try:
        adapter = self._GetAdapterIface()
        adapter.StopDiscovery()
    except Exception as err:
        logging.error("Failed to stop discovery: {}".format(err))
        return False
    else:
        return True
| |
def StartUnfilteredDiscovery(self):
    """Begins an unfiltered LE scan for DUT advertisement testing

    bluez offers no way to start an LE scan without filtering, so the
    request is submitted directly over the HCI layer. This helps LE
    advertising tests, where advertisements are then discovered much faster.

    Returns:
      Always True
    """
    # LE Set Scan Enable is issued with filter_duplicates disabled
    scan_params_cmd = HciCommands.le_set_scan_params()
    scan_enable_cmd = HciCommands.le_set_scan_enable(
        enable=True, filter_duplicates=False
    )

    hci_tool = HciTool(sudo=True, wait_time=0.1)
    hci_tool.add_command(scan_params_cmd)
    hci_tool.add_command(scan_enable_cmd)
    hci_tool.run_commands()

    return True
| |
def StopUnfilteredDiscovery(self):
    """Ends an unfiltered LE scan for DUT advertisement testing

    Counterpart of self.StartUnfilteredDiscovery().

    Returns:
      Always True
    """
    hci_tool = HciTool(sudo=True, wait_time=0.1)

    # LE Set Scan Enable with enable=False turns the scan off
    scan_disable_cmd = HciCommands.le_set_scan_enable(enable=False)
    hci_tool.add_command(scan_disable_cmd)
    hci_tool.run_commands()

    return True
| |
def FindAdvertisementWithAttributes(self, attrs=None, timeout=10):
    """Locate an advertisement containing the requested attributes from btmon

    Args:
      attrs: List of strings to be located within an advertising event.
          None (the default) or an empty list matches any LE advertising
          report
      timeout: Seconds to discover before returning failure

    Returns:
      String containing matching advertising event if found, None otherwise
    """
    # A None default avoids sharing one mutable list across calls
    required_attrs = [] if attrs is None else list(attrs)

    def _IsMatchedEvent(ev):
        """Determines whether an event matches the requested criteria

        Args:
          ev: List of btmon output lines making up one event

        Returns: True if event matches criteria, False otherwise
        """
        content = "".join(ev)

        if "LE Advertising Report" not in content:
            return False

        # Only a match if every requested attribute is present
        return all(attr in content for attr in required_attrs)

    # Start searching for devices
    if not self.StartUnfilteredDiscovery():
        return None

    desired_event = None
    process = None
    timer = None
    try:
        end_time = time.time() + timeout
        process = system_tools.SystemTools.RunInSubprocess("btmon")

        # If no btmon traffic is occurring, the readline call will block
        # forever. Here, configure a timer to terminate the process after
        # our selected timeout.
        timer = threading.Timer(timeout, process.terminate)
        timer.start()

        current_event = []
        while time.time() < end_time:
            output = process.stdout.readline().decode("utf8")
            if output == "" and process.poll() is not None:
                break

            # output will contain a single line of btmon, i.e.
            # '> HCI Event: LE Meta Event (0x3e) plen 43'
            if not output:
                continue

            # Event headers are labeled with their data length. If we
            # encounter a header, we check the last event to see if it
            # matches our search
            if " plen " in output:
                if current_event and _IsMatchedEvent(current_event):
                    desired_event = "".join(current_event)
                    break

                # The previous event wasn't a match, start recording anew
                current_event = []

            # Don't add the header to the event, as the header timestamp
            # has a non-zero chance to match a numeric search attribute
            else:
                current_event.append(output)
    finally:
        # Always terminate the listening process and stop discovery, even
        # if reading or decoding btmon output raised.
        if process is not None:
            process.terminate()
        if timer is not None:
            timer.cancel()
        self.StopUnfilteredDiscovery()

    return desired_event
| |
| def _DiscoverDevice( |
| self, remote_address, wait_time=10, allow_early_discovery_by_path=False |
| ): |
| """This method determines if a device has been discovered via dbus path |
| |
| Here, we perform the same operation as test_discover_device in autotest, |
| where we consider a remote device "discovered" when bluez creates a dbus |
| object for it. This allows us to avoid installing and importing extra |
| libraries for scanning, particularly for LE. |
| |
| Args: |
| remote_address: string address of desired BT device |
| wait_time: seconds we should wait before returning failure to discover |
| allow_early_discovery_by_path: boolean denoting whether we can return |
| immediately if we already have a dbus object for this address. |
| Otherwise, a fresh advertisement is required for a device to be |
| discovered |
| |
| Returns: |
| True if device is found, False otherwise |
| """ |
| |
| if allow_early_discovery_by_path: |
| # Return early if we already have this device |
| if self.GetDeviceWithAddress(remote_address): |
| logging.info( |
| "Device {} found without discovery".format(remote_address) |
| ) |
| return True |
| |
| self.discovered_devices = [] |
| # Register callback on PropertiesChanged signal |
| # TODO: Replace with CallbackManager |
| receiver = self._dbus_system_bus.add_signal_receiver( |
| self._DeviceFound, |
| signal_name="PropertiesChanged", |
| bus_name="org.bluez", |
| path_keyword="device_path", |
| ) |
| |
| # Start searching for devices |
| self.StartDiscovery() |
| |
| # Wait until our device has been added to discovered_devices by listening |
| # thread, or timeout |
| start_time = time.time() |
| while remote_address not in self.discovered_devices: |
| if time.time() > start_time + wait_time: |
| break |
| |
| time.sleep(1) |
| |
| # Clean up |
| receiver.remove() |
| self.StopDiscovery() |
| |
| return remote_address in self.discovered_devices |
| |
| def _DeviceFound(self, *args, **kwargs): |
| """Called when a property changes on the bluez d-bus interface |
| |
| Useful for tracking reception of advertisements, as they update the bluez |
| device |
| |
| Args: |
| args: list of form [caller, property_dict] |
| kwargs: dict containing keyword arguments requested in the |
| add_signal_receiver call i.e. device_path of calling object |
| """ |
| |
| # Renaming to be more human readable while satisfying pylint |
| changed_prop = args |
| caller_details = kwargs |
| |
| caller = str(changed_prop[0]) |
| |
| if "Device1" in caller: |
| device_path = caller_details.get("device_path") |
| device_address = device_path[-17:].replace("_", ":") |
| |
| # Store new devices. list append is thread safe |
| if device_address not in self.discovered_devices: |
| logging.info("New device %s found", device_address) |
| self.discovered_devices.append(device_address) |
| resolved_address = self._GetAddressFromPath(device_path) |
| if ( |
| resolved_address is not None |
| and resolved_address not in self.discovered_devices |
| ): |
| logging.info( |
| "New device with resolved address %s found", |
| resolved_address, |
| ) |
| self.discovered_devices.append(resolved_address) |
| |
def Discover(self, remote_address):
    """Runs device discovery for the given remote address

    Args:
      remote_address: string address of desired BT device

    Returns:
      True if remote address is discovered.
    """
    was_found = self._DiscoverDevice(remote_address)
    return was_found
| |
def Disconnect(self):
    """Asks the stored remote device to disconnect

    Returns:
      True if a disconnect was requested on the device, False if there is
      no connection or the device object cannot be found
    """
    logging.debug("Disconnecting from adapter")

    # Can't do anything if we're not connected
    if not self.GetConnectionStatus():
        return False

    remote_device = self.GetDeviceWithAddress(self.remote_address)
    if remote_device is not None:
        remote_device.Disconnect()
        return True

    logging.error(
        "Failed to get device with the address %s", self.remote_address
    )
    return False
| |
def SendHIDReport(self, report):
    """Hands a hid report to our bluez service

    Args:
      report: scan code representing state of HID device
    """
    hid_service = self.get_service_iface()
    # Supplying reply/error handlers lets the operation run async
    hid_service.SendHIDReport(
        report,
        reply_handler=self.KeysSentHandler,
        error_handler=self.KeysErrorHandler,
    )
| |
def KeysSentHandler(self):
    """Reply handler for a sent hid report; intentionally does nothing"""
    return None
| |
def KeysErrorHandler(self, err):
    """Error handler for a hid report send; logs the failure

    Args:
      err: error passed in by the caller of this handler
    """
    logging.error("KeysErrorHandler: %s", err)
    return None
| |
| def _CheckValidModifiers(self, modifiers): |
| invalid_modifiers = [m for m in modifiers if m not in self.MODIFIERS] |
| if invalid_modifiers: |
| logging.error('Modifiers not valid: "%s".', str(invalid_modifiers)) |
| return False |
| return True |
| |
| def _IsValidScanCode(self, code): |
| """Check if the code is a valid scan code. |
| |
| Args: |
| code: the code to check |
| |
| Returns: |
| True: if the code is a valid scan code. |
| """ |
| return ( |
| self.SCAN_NO_EVENT <= code <= self.SCAN_PAUSE |
| or self.SCAN_SYSTEM_POWER <= code <= self.SCAN_SYSTEM_WAKE |
| ) |
| |
| def _CheckValidScanCodes(self, keys): |
| invalid_keys = [k for k in keys if not self._IsValidScanCode(k)] |
| if invalid_keys: |
| logging.error('Keys not valid: "%s".', str(invalid_keys)) |
| return False |
| return True |
| |
def RawKeyCodes(self, modifiers=None, keys=None):
    """Generate the codes in raw keyboard report format.

    This method sends data in the raw report mode. The first start
    byte chr(UART_INPUT_RAW_MODE) is stripped and the following bytes
    are sent without interpretation.

    For example, generate the codes of 'shift-alt-i' by
      codes = RawKeyCodes(modifiers=[RasPi.LEFT_SHIFT, RasPi.LEFT_ALT],
                          keys=[RasPi.SCAN_I])

    Args:
      modifiers: a list of modifiers
      keys: a list of scan codes of keys

    Returns:
      a raw report bytearray if both modifiers and keys are valid and the
      keys fit into the report, or None otherwise.
    """
    modifiers = modifiers or []
    keys = keys or []

    if not (
        self._CheckValidModifiers(modifiers)
        and self._CheckValidScanCodes(keys)
    ):
        return None

    # More keys than scan code slots cannot be expressed in a fixed-size
    # report; reject them instead of emitting an over-long report.
    if len(keys) > RAW_REPORT_FORMAT_KEYBOARD_LEN_SCAN_CODES:
        logging.error(
            "Too many keys for a raw keyboard report: %d", len(keys)
        )
        return None

    # Note the trailing comma: (0,) is a tuple, whereas (0) is the integer
    # 0 and cannot be concatenated to the report tuple below.
    padding_0s = (0,) * (RAW_REPORT_FORMAT_KEYBOARD_LEN_SCAN_CODES - len(keys))

    return bytearray(
        (
            UART_INPUT_RAW_MODE,
            RAW_REPORT_FORMAT_KEYBOARD_LENGTH,
            RAW_REPORT_FORMAT_KEYBOARD_DESCRIPTOR,
            sum(modifiers),
            0x0,
        )
        + tuple(keys)
        + padding_0s
    )
| |
| def _MouseButtonsRawHidValues(self): |
| """Gives the raw HID values for whatever buttons are pressed.""" |
| currently_pressed = 0x0 |
| for button in self._buttons_pressed: |
| if button == PeripheralKit.MOUSE_BUTTON_LEFT: |
| currently_pressed |= RAW_HID_LEFT_BUTTON |
| elif button == PeripheralKit.MOUSE_BUTTON_RIGHT: |
| currently_pressed |= RAW_HID_RIGHT_BUTTON |
| else: |
| error = "Unknown mouse button in state: %s" % button |
| logging.error(error) |
| raise BluezPeripheralException(error) |
| return currently_pressed |
| |
def MouseMove(self, delta_x, delta_y):
    """Send a relative mouse movement of (delta_x, delta_y) steps.

    Buttons that are currently pressed stay pressed during the move. The
    move is relative to the current position by the HID standard.
    Valid step values must be in the range [-127,127]. No report is sent
    when both deltas are zero.

    Args:
      delta_x: The number of steps to move horizontally.
               Negative values move left, positive values move right.
      delta_y: The number of steps to move vertically.
               Negative values move up, positive values move down.
    """
    held_buttons = self._MouseButtonsRawHidValues()
    if not (delta_x or delta_y):
        return

    report = self._RawMouseCodes(
        buttons=held_buttons, x_stop=delta_x, y_stop=delta_y
    )
    self.SendHIDReport(report)
| |
def MouseScroll(self, steps):
    """Send a mouse wheel scroll of the given number of steps.

    Buttons currently pressed will stay pressed during this operation.
    Valid step values must be in the range [-127,127]. No report is sent
    when steps is zero.

    Args:
      steps: The number of steps to scroll the wheel.
             With traditional scrolling:
               Negative values scroll down, positive values scroll up.
             With reversed (formerly "Australian") scrolling this is reversed.
    """
    held_buttons = self._MouseButtonsRawHidValues()
    if not steps:
        return

    report = self._RawMouseCodes(buttons=held_buttons, wheel=steps)
    self.SendHIDReport(report)
| |
def MousePressButtons(self, buttons):
    """Add the given buttons to the pressed state and report it.

    The kit will continue to press these buttons until otherwise instructed,
    or until its state has been reset. No report is sent when the resulting
    raw button value is zero.

    Args:
      buttons: A set of buttons, as PeripheralKit MOUSE_BUTTON_* values, that
               will be pressed (and held down).
    """
    self._MouseButtonStateUnion(buttons)
    held_buttons = self._MouseButtonsRawHidValues()
    if not held_buttons:
        return

    report = self._RawMouseCodes(buttons=held_buttons)
    self.SendHIDReport(report)
| |
def MouseReleaseAllButtons(self):
    """Clear the pressed-button state and report all buttons released."""
    self._MouseButtonStateClear()
    released_report = self._RawMouseCodes(buttons=RAW_HID_BUTTONS_RELEASED)
    self.SendHIDReport(released_report)
| |
def _RawMouseCodes(self, buttons=0, x_stop=0, y_stop=0, wheel=0):
    """Build a mouse report in raw report format.

    The report consists of RAW_REPORT_START, the mouse descriptor, and
    then the buttons, x, y and wheel values, one byte each.

    For example, generate the codes of moving cursor 100 pixels left
    and 50 pixels down:
      codes = _RawMouseCodes(x_stop=-100, y_stop=50)

    Args:
      buttons: the buttons to press and release
      x_stop: the pixels to move horizontally
      y_stop: the pixels to move vertically
      wheel: the steps to scroll

    Returns:
      a bytearray holding the raw report.
    """

    def _AsByte(value):
        """Map a signed value onto its two's complement byte value.

        Given value must be in [-127,127], or odd things will happen.

        Args:
          value: a signed integer

        Returns:
          value + 256 for negative values, value itself otherwise
        """
        return value + 256 if value < 0 else value

    report = [RAW_REPORT_START, RAW_REPORT_FORMAT_MOUSE_DESCRIPTOR]
    for field in (buttons, x_stop, y_stop, wheel):
        report.append(_AsByte(field))
    return bytearray(report)
| |
def SendGamepadHIDReport(self):
    """Wraps the current gamepad report in raw format and sends it."""
    self.SendHIDReport(self._RawGamepadCodes(self.gamepad_report))
| |
def GamepadTriggerButtonsPress(self, trigger, value):
    """Presses gamepad trigger buttons (LT, RT).

    The value is stored in the gamepad report as two little-endian bytes
    at the trigger's report index, then the report is sent. Nothing
    happens when value is zero.

    Args:
      trigger: A trigger button, as GAMEPAD_BUTTON_LEFT_TRIGGER or
               GAMEPAD_BUTTON_RIGHT_TRIGGER value, that will be pressed.
      value: A value between 0-1023 to press the trigger.
    """
    if not value:
        return

    trigger_bytes = bytearray(value.to_bytes(2, byteorder="little"))

    if trigger == PeripheralKit.GAMEPAD_BUTTON_LEFT_TRIGGER:
        index = GAMEPAD_BUTTONS_REPORT_INDEX["LEFT_TRIGGER"]
    elif trigger == PeripheralKit.GAMEPAD_BUTTON_RIGHT_TRIGGER:
        index = GAMEPAD_BUTTONS_REPORT_INDEX["RIGHT_TRIGGER"]
    else:
        index = None

    if index is not None:
        # The slice must be as wide as the two bytes written; a narrower
        # slice would lengthen the report and shift every later field.
        self.gamepad_report[index : index + 2] = trigger_bytes

    self.SendGamepadHIDReport()
| |
def GamepadThumbstickMove(self, stick, delta_x=4095, delta_y=4095):
    """Writes a thumbstick position into the gamepad report and sends it.

    Each axis is stored as two little-endian bytes: x at the stick's
    report index, y in the two bytes after it. An unrecognized stick
    leaves the report untouched, but the report is still sent.

    Args:
      stick: The thumbstick type, left or right as PeripheralKit
             GAMEPAD_LEFT_THUMBSTICK or GAMEPAD_RIGHT_THUMBSTICK value.
      delta_x: A value between 4095-65535 to move the thumbstick horizontally.
      delta_y: A value between 4095-65535 to move the thumbstick vertically.
    """
    x_bytes = bytearray(delta_x.to_bytes(2, byteorder="little"))
    y_bytes = bytearray(delta_y.to_bytes(2, byteorder="little"))

    if stick == PeripheralKit.GAMEPAD_LEFT_THUMBSTICK:
        stick_key = "LEFT_STICK"
    elif stick == PeripheralKit.GAMEPAD_RIGHT_THUMBSTICK:
        stick_key = "RIGHT_STICK"
    else:
        stick_key = None

    if stick_key is not None:
        base = GAMEPAD_BUTTONS_REPORT_INDEX[stick_key]
        self.gamepad_report[base : base + 2] = x_bytes
        self.gamepad_report[base + 2 : base + 4] = y_bytes

    self.SendGamepadHIDReport()
| |
def GamepadPressButtons(self, button):
    """Marks the given gamepad button as pressed and sends the report.

    Args:
      button: A gamepad button, as GAMEPAD_BUTTON_* value, that will be
              pressed.
    """
    report_index = GAMEPAD_BUTTONS_REPORT_INDEX[button]
    button_value = GAMEPAD_BUTTONS_MAP.get(button)
    self.gamepad_report[report_index] = button_value
    self.SendGamepadHIDReport()
| |
def GamepadReleaseAllButtons(self):
    """Zeroes the whole gamepad report in place and sends it."""
    report_length = len(self.gamepad_report)
    self.gamepad_report[:] = [0x00] * report_length
    self.SendGamepadHIDReport()
| |
def _RawGamepadCodes(self, gamepad_report):
    """Builds a gamepad report in raw report format.

    Args:
      gamepad_report: Gamepad report to send with HID report.

    Returns:
      a bytearray made of RAW_REPORT_START, the gamepad report length
      constant and the bytes of gamepad_report.
    """
    raw_codes = [RAW_REPORT_START, RAW_REPORT_FORMAT_GAMEPAD_LENGTH]
    raw_codes.extend(gamepad_report)
    return bytearray(raw_codes)
| |
def PressShorthandCodes(self, modifiers=None, keys=None):
    """Build key press codes in shorthand report format.

    Only the key press is encoded. The shorthand mode is useful in
    separating the key press and key release events.

    For example, generate the codes of 'shift-alt-i' by
      codes = PressShorthandCodes(modifiers=[RasPi.LEFT_SHIFT, RasPi.LEFT_ALT],
                                  keys=[RasPi_I])

    Args:
      modifiers: a list of modifiers
      keys: a list of scan codes of keys

    Returns:
      a shorthand code string if both modifiers and keys are valid and
      the keys do not exceed the shorthand maximum, or None otherwise.
    """
    modifiers = modifiers or []
    keys = keys or []

    if not self._CheckValidModifiers(modifiers):
        return None
    if not self._CheckValidScanCodes(keys):
        return None
    if len(keys) > SHORTHAND_REPORT_FORMAT_KEYBOARD_MAX_LEN_SCAN_CODES:
        return None

    # Header: mode byte, payload length (modifier byte + keys), modifiers
    header = chr(UART_INPUT_SHORTHAND_MODE)
    header += chr(len(keys) + 1)
    header += chr(sum(modifiers))
    return header + "".join(chr(scan_code) for scan_code in keys)
| |
def ReleaseShorthandCodes(self):
    """Build the shorthand report format code for key release.

    Returns:
      a two character shorthand code string (mode byte followed by a zero
      byte) to release any pressed keys.
    """
    mode_char = chr(UART_INPUT_SHORTHAND_MODE)
    return mode_char + chr(0x0)
| |
def GetKitInfo(self):
    """Logs basic kit information: name, address and class values."""

    def _HexIfPossible(value):
        """Returns hex(value), or value unchanged if hex() rejects it."""
        try:
            return hex(value)
        except TypeError:
            return value

    logging.info("advertised name: %s", self.GetAdvertisedName())
    logging.info("local bluetooth address: %s", self.GetLocalBluetoothAddress())

    service_class = _HexIfPossible(self.GetClassOfService())
    logging.info("Class of service: %s", service_class)

    device_class = _HexIfPossible(self.GetClassOfDevice())
    logging.info("Class of device: %s", device_class)
| |
| |
if __name__ == "__main__":
    # Route debug-level logging to stdout so a command line run shows output
    logging.basicConfig(level=logging.DEBUG, stream=sys.stdout)

    BluezPeripheral().GetKitInfo()