| # Lint as: python2, python3 |
| # -*- coding: utf-8 -*- |
| |
| # Copyright 2019 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """This module implements the PeripheralKit instance for a bluez peripheral |
| on Raspberry Pi. |
| """ |
| |
| from __future__ import absolute_import |
| from __future__ import division |
| from __future__ import print_function |
| |
| import logging |
| import os |
| import re |
| import sys |
| import threading |
| import time |
| |
| import dbus |
| import dbus.mainloop.glib |
| import dbus.service |
| |
| # Libraries needed on raspberry pi. ImportError on |
| # Fizz can be ignored. |
| try: |
| from gi.repository import GLib |
| except ImportError: |
| pass |
| |
| |
| from chameleond.utils import pairing_agent |
| from chameleond.utils import system_tools |
| from chameleond.utils.raspi_bluez_service import BluezService |
| from .bluetooth_peripheral_kit import PeripheralKit |
| from .bluetooth_peripheral_kit import PeripheralKitException |
| from .bluez_service_consts import PERIPHERAL_DEVICE_CLASS, \ |
| PERIPHERAL_DEVICE_NAME, BLUEZ_SERVICE_NAME, BLUEZ_SERVICE_PATH, \ |
| BTD_CONF_FILE, PERIPHERAL_BTD_FLAGS |
| from .bluez_gatt_server import GATTServer |
| from .hci_cmd import HciCommands, HciTool |
| from six.moves import range |
| |
| DBUS_BLUEZ_SERVICE_IFACE = 'org.bluez' |
| DBUS_BLUEZ_ADAPTER_IFACE = DBUS_BLUEZ_SERVICE_IFACE + '.Adapter1' |
| DBUS_BLUEZ_DEVICE_IFACE = DBUS_BLUEZ_SERVICE_IFACE + '.Device1' |
| DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties' |
| DBUS_OBJECT_MANAGER_INTERFACE = 'org.freedesktop.DBus.ObjectManager' |
| |
| # Time to wait for bluetoothd to start |
| BLUETOOTHD_WAIT_TIME = 30 |
| MAX_DBUS_RETRY_ATTEMPTS = 3 |
| CONNECTION_WAIT_TIME = 10 |
| |
| # Definitions of mouse button HID encodings |
| RAW_HID_BUTTONS_RELEASED = 0x0 |
| RAW_HID_LEFT_BUTTON = 0x01 |
| RAW_HID_RIGHT_BUTTON = 0x02 |
| |
| # UART input modes |
| # raw mode |
| UART_INPUT_RAW_MODE = 0xFD |
| RAW_REPORT_START = 0xA1 |
| # Length of report format for keyboard |
| RAW_REPORT_FORMAT_KEYBOARD_LENGTH = 9 |
| RAW_REPORT_FORMAT_KEYBOARD_DESCRIPTOR = 1 |
| RAW_REPORT_FORMAT_KEYBOARD_LEN_SCAN_CODES = 6 |
| |
| # shorthand mode |
| UART_INPUT_SHORTHAND_MODE = 0xFE |
| SHORTHAND_REPORT_FORMAT_KEYBOARD_MAX_LEN_SCAN_CODES = 6 |
| # Length of report format for mouse |
| RAW_REPORT_FORMAT_MOUSE_LENGTH = 5 |
| RAW_REPORT_FORMAT_MOUSE_DESCRIPTOR = 2 |
| |
| # Time to wait about the pairing agent to exit. |
| AGENT_EXIT_TIMEOUT_SECS = 5 |
| |
| |
| class BluezPeripheralException(PeripheralKitException): |
| """A dummy exception class for Bluez class.""" |
| pass |
| |
| |
| class BluezPeripheral(PeripheralKit): |
| """This is an abstraction of a Bluez peripheral.""" |
| |
| |
| def __init__(self): |
| super(BluezPeripheral, self).__init__() |
| self._settings = {} |
| self._setup_dbus_loop() |
| |
| self.remote_address = None |
| self._address = None |
| self._bluez_service = None |
| self._service = None |
| self._device_type = None |
| self._gatt_server = None |
| |
| # Bluez DBus constants - npnext |
| self._service_iface = None |
| self._adapter_iface = None |
| self._dbus_system_bus = dbus.SystemBus() |
| self._dbus_hci_adapter_path = '/org/bluez/hci0' |
| self._dbus_hci_props = None |
| |
| # Make sure adapter is powered for tests. _set_hci_prop will connect |
| # self._dbus_hci_props, and even start bluetoothd if necessary |
| self._set_hci_prop('org.bluez.Adapter1', 'Powered', dbus.Boolean(1)) |
| |
| self._agent = None |
| |
| logging.debug('Bluetooth peripheral powered and waiting for bind to device') |
| |
| |
| def __del__(self): |
| """Quit the mainloop when done.""" |
| self._loop.quit() |
| |
| |
| def get_service_iface(self): |
| if not self._service_iface: |
| self._service = self._dbus_system_bus.get_object(BLUEZ_SERVICE_NAME, |
| BLUEZ_SERVICE_PATH) |
| self._service_iface = dbus.Interface(self._service, BLUEZ_SERVICE_NAME) |
| |
| return self._service_iface |
| |
| |
| def StartPairingAgent(self, capability): |
| """Start a pairing agent with the specified capability. |
| |
| Args: |
| capability: the capability of the device. |
| The capability parameter can have the values: |
| DisplayOnly, DisplayYesNo, KeyboardOnly, NoInputNoOutput |
| and KeyboardDisplay. |
| Refer to bluez/doc/agent-api.txt for more details. |
| """ |
| if self._agent: |
| self.StopPairingAgent() |
| |
| self._agent = threading.Thread(target=pairing_agent.SetupAgent, |
| args=(capability,)) |
| logging.info('The pairing agent is started with capability %s', capability) |
| self._agent.start() |
| |
| |
| def StopPairingAgent(self): |
| """Stop the pairing agent.""" |
| logging.info('Stop the pairing agent') |
| if self._agent: |
| pairing_agent.StopAgent() |
| self._agent.join(AGENT_EXIT_TIMEOUT_SECS) |
| logging.info('agent exited and joined') |
| self._agent = None |
| |
| |
| def _setup_dbus_loop(self): |
| dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) |
| self._loop = GLib.MainLoop() |
| self._thread = threading.Thread(target=self._loop.run) |
| self._thread.start() |
| |
| |
| def _kill_bluetoothd(self): |
| """Calls for bluetoothd to be killed, in case it is in a bad state""" |
| |
| os.system('killall -9 bluetoothd') |
| |
| |
| def _start_bluetoothd(self): |
| """Calls for bluetoothd to be started, if not started already""" |
| |
| os.system('sudo service bluetooth start') |
| |
| |
| def _check_prop_handle(self, handle): |
| """Checks if bluetoothd is running by accessing an adapter object""" |
| |
| try: |
| handle.Get('org.bluez.Adapter1', 'Powered') |
| return True |
| |
| except Exception as e: |
| logging.info('Failed to retrieve bluez adapter property: %s', str(e)) |
| return False |
| |
| |
| def _create_prop_handle(self): |
| """Establish dbus link to bluetoothd for hci properties |
| |
| Returns: dbus interface to bluetoothd properties |
| """ |
| |
| return dbus.Interface(self._dbus_system_bus.get_object(\ |
| 'org.bluez',\ |
| '/org/bluez/hci0'),\ |
| 'org.freedesktop.DBus.Properties') |
| |
| |
| def _refresh_hci_prop_handle(self): |
| """Initializes a dbus handle to hci properties""" |
| |
| # If we already have a link to bluetoothd, just return it |
| if self._dbus_hci_props is not None: |
| return |
| |
| # Next step, try to link to already-running bluetoothd |
| handle = self._create_prop_handle() |
| |
| if self._check_prop_handle(handle): |
| self._dbus_hci_props = handle |
| return |
| |
| # A pgrep return of 0 indicates that a process was found. If bluetoothd is |
| # running and we can't reach it, kill and restart to get it out of bad state |
| if os.system('pgrep bluetoothd') == 0: |
| logging.error('Could not reach bluetoothd, killing') |
| self._kill_bluetoothd() |
| |
| logging.info('Starting bluetooth service') |
| self._start_bluetoothd() |
| |
| # Since bluetoothd can be delayed by system conditions outside our control, |
| # we wait and refresh our link to its dbus properties until it is ready |
| for _ in range(0, BLUETOOTHD_WAIT_TIME): |
| handle = self._create_prop_handle() |
| if self._check_prop_handle(handle): |
| logging.info('Bluetooth ready, returning link') |
| self._dbus_hci_props = handle |
| return |
| |
| time.sleep(1) |
| |
| |
| def _set_hci_prop(self, iface_name, prop_name, prop_val): |
| """Handles HCI property set |
| |
| Sometimes resource isn't ready, so this utility will repeat commands |
| multiple times to allow more reliable operation |
| |
| Returns: |
| True if set succeeds, False otherwise |
| """ |
| |
| # Make sure we have a working handle to bluetoothd |
| self._refresh_hci_prop_handle() |
| |
| for _ in range(MAX_DBUS_RETRY_ATTEMPTS): |
| try: |
| initial_value = self._dbus_hci_props.Get(iface_name, prop_name) |
| |
| # Return if property has value we want already |
| if initial_value == prop_val: |
| return True |
| |
| # Set property to the value we want |
| logging.info('{}: {} -> {}'.format(prop_name, initial_value, prop_val)) |
| self._dbus_hci_props.Set(iface_name, prop_name, prop_val) |
| |
| time.sleep(.1) |
| |
| except dbus.exceptions.DBusException as e: |
| error_msg = 'Setting {} to {} again: {}'.format(prop_name, prop_val, |
| str(e)) |
| logging.error(error_msg) |
| time.sleep(.1) |
| |
| return False |
| |
| |
| def _get_hci_prop(self, iface_name, prop_name): |
| """Handles HCI property get |
| |
| Args: iface_name: Dbus interface to be queried |
| Args: prop_name: Property name to be queried |
| |
| Returns: property value, or None on error |
| """ |
| |
| # Make sure we have a working handle to bluetoothd |
| self._refresh_hci_prop_handle() |
| |
| for _ in range(MAX_DBUS_RETRY_ATTEMPTS): |
| try: |
| return self._dbus_hci_props.Get(iface_name, prop_name) |
| |
| except dbus.exceptions.DBusException as e: |
| error_msg = 'Will try to get {} again: {}'.format(prop_name, str(e)) |
| logging.error(error_msg) |
| time.sleep(.1) |
| |
| return None |
| |
| |
| def SetTrustedByRemoteAddress(self, remote_address, trusted=True): |
| device = self.GetDeviceWithAddress(remote_address) |
| if device is None: |
| logging.error('Failed to get device with the address %s', remote_address) |
| return False |
| |
| try: |
| properties = dbus.Interface(device, DBUS_PROP_IFACE) |
| properties.Set(DBUS_BLUEZ_DEVICE_IFACE, 'Trusted', |
| dbus.Boolean(trusted, variant_level=1)) |
| return True |
| except Exception as e: |
| logging.error('SetTrustedByRemoteAddress: %s', e) |
| except: |
| logging.error('SetTrustedByRemoteAddress: unexpected error') |
| return False |
| |
| |
| def GetCapabilities(self): |
| """What can this kit do/not do that tests need to adjust for? |
| |
| Returns: |
| A dictionary from PeripheralKit.CAP_* strings to an appropriate value. |
| See PeripheralKit for details. |
| """ |
| capabilities = {PeripheralKit.CAP_TRANSPORTS: |
| [PeripheralKit.TRANSPORT_BREDR, PeripheralKit.TRANSPORT_LE], |
| PeripheralKit.CAP_HAS_PIN: True, |
| PeripheralKit.CAP_INIT_CONNECT: True} |
| |
| return capabilities |
| |
| |
| def EnableBLE(self, use_ble): |
| """Put device into either LE or Classic mode |
| |
| The raspi is a dual device, supporting both LE and Classic BT. If we are |
| testing LE connections, I don't want the DUT to successfully be able to |
| make a classic connection, for instance, so the unwanted one is disabled |
| """ |
| |
| if use_ble: |
| state_change_cmds = ['sudo btmgmt power off', |
| 'sudo btmgmt le on', |
| 'sudo btmgmt bredr off', |
| 'sudo btmgmt sc on', |
| 'sudo btmgmt power on'] |
| |
| else: |
| state_change_cmds = ['sudo btmgmt power off', |
| 'sudo btmgmt bredr on', |
| 'sudo btmgmt le off', |
| 'sudo btmgmt ssp on', |
| 'sudo btmgmt power on'] |
| |
| for cmd in state_change_cmds: |
| os.system(cmd) |
| |
| # I add a slight delay so there is adequate time for the change to be done |
| time.sleep(.1) |
| |
| |
| def GetBaseDeviceType(self, device_type): |
| """Returns the base device type of a peripheral, i.e. BLE_MOUSE -> MOUSE""" |
| |
| if 'BLE_' in device_type: |
| device_type = device_type.replace('BLE_', '') |
| |
| return device_type |
| |
| |
| def SpecifyDeviceType(self, device_type): |
| """Instantiates one of our supported devices |
| |
| The raspi stack is designed to emulate a single device at a time. This |
| function is called by autotest and defines which device must be emulated |
| for the current test |
| |
| Args: |
| device_type: String device type, e.g. "MOUSE" |
| """ |
| |
| # Do nothing if we were already bound to this device |
| if self._device_type == device_type: |
| return True |
| |
| if self._device_type is not None: |
| error = 'Peripheral already bound to device: {}'.format(self._device_type) |
| logging.error(error) |
| raise BluezPeripheralException(error) |
| |
| self._device_type = device_type |
| |
| if 'BLE' in device_type: |
| logging.info('Binding to BLE device!') |
| |
| # Enable le only, to make sure our test device isn't connecting |
| # via BT classic |
| self.EnableBLE(True) |
| |
| # Establish and run gatt server |
| self._gatt_server = GATTServer(device_type) |
| |
| else: |
| logging.info('Binding to Classic device!') |
| |
| # Enable bt classic only, to make sure our test device isn't connecting |
| # via LE |
| self.EnableBLE(False) |
| |
| # Set device type and initiate service based on it |
| self._bluez_service = BluezService(self._device_type, |
| self.GetLocalBluetoothAddress()) |
| |
| self.Init() |
| |
| |
| self.SetAdvertisedName(PERIPHERAL_DEVICE_NAME[self.GetBaseDeviceType(\ |
| self._device_type)]) |
| |
| logging.info('Bluetooth peripheral now bound to %s', device_type) |
| |
| # Give the service a moment to initialize |
| time.sleep(1) |
| |
| return True |
| |
| |
| def SetBtdFlags(self, device_type): |
| """Allows us to set bluetoothd config execution flags |
| |
| In some cases, we wish to apply specific bluetoothd execution flags |
| depending on the device type we are emulating. This function sets the |
| bluetoothd flags, and is intended to be run before the service is restarted |
| |
| @param device_type: String device type, e.g. "MOUSE" |
| """ |
| # Define regex to separate executable from flags. Example contents: |
| # ExecStart=/usr/libexec/bluetooth/bluetoothd -d -P input |
| PLUGIN_RE = re.compile(r'^(?P<prefix>\s*ExecStart=.*?)' # pre-flags |
| r'(?P<flags>\s*-.*)?$') # optional flags block |
| |
| desired_btd_flags = PERIPHERAL_BTD_FLAGS[self.GetBaseDeviceType( |
| device_type)] |
| |
| file_contents = [] |
| with open(BTD_CONF_FILE, 'r') as mf: |
| file_contents = mf.readlines() |
| |
| exec_line = 0 |
| m = None |
| for idx, line in enumerate(file_contents): |
| m = PLUGIN_RE.search(line) |
| if m: |
| exec_line = idx |
| break |
| |
| if not m: |
| logging.warn('Failed to locate ExecStart block in bluetoothd config') |
| return |
| |
| # Update the exec line with desired bluetoothd flags |
| old_line = file_contents[exec_line] |
| file_contents[exec_line] = '{} {}\n'.format(m.group('prefix'), |
| desired_btd_flags) |
| |
| # Store the original as a comment above the new one, if it doesn't exist yet |
| if m.group('prefix') not in file_contents[exec_line-1]: |
| file_contents.insert(exec_line, '# {}'.format(old_line)) |
| |
| logging.info('Applying bluetoothd flags: %s', desired_btd_flags) |
| |
| # Write new contents back to the file |
| with open(BTD_CONF_FILE, 'w') as mf: |
| mf.write(''.join(file_contents)) |
| |
| def ResetStack(self, next_device_type=None): |
| """Restores BT stack to pristine state by restarting running services""" |
| reset_cmds = [ |
| 'systemctl daemon-reload', 'service bluetooth restart', |
| '/etc/init.d/chameleond restart' |
| ] |
| |
| # If we know what device type is used next, we can specify which plugins |
| # bluetoothd should be loaded with before the service is restarted |
| if next_device_type: |
| self.SetBtdFlags(next_device_type) |
| |
| self.AdapterPowerOff() |
| |
| # Stop Bluetooth audio related daemons which may interfere |
| # with HID tests. |
| # Since these commands may return non zero value if they are not |
| # running, we should not put them in the reset_cmds below. |
| os.system('sudo killall pulseaudio') |
| os.system('sudo service ofono stop') |
| |
| # Restart chameleon and bluetooth service |
| os.system(' && '.join(reset_cmds)) |
| |
| |
| # Power cycle the device |
| def PowerCycle(self): |
| return self.Reboot() |
| |
| |
| # Override BluetoothHID's implementation of init |
| def Init(self, factory_reset=True): |
| """Ensures our chip is in the correct state for the tests to be run""" |
| |
| # Make sure device is powered up and discoverable |
| if not self._get_hci_prop('org.bluez.Adapter1', 'Powered'): |
| logging.info('Init: Adapter not powered') |
| return False |
| self._set_hci_prop('org.bluez.Adapter1', 'Discoverable', dbus.Boolean(1)) |
| |
| # Set class based on device we're emulating |
| if self._device_type and 'BLE' not in self._device_type: |
| self.SetClassOfService(PERIPHERAL_DEVICE_CLASS[self._device_type]) |
| |
| return True |
| |
| |
| def CreateSerialDevice(self): |
| """Device setup and recovery |
| |
| In the current test framework, CreateSerialDevice is called for |
| device initialization or to try and correct an error. While we |
| don't have a serial connection, this function is used in the same |
| way - device setup and recovery |
| """ |
| |
| return self.Init() |
| |
| |
| # Check the serial device we aren't using |
| def CheckSerialConnection(self): |
| return True |
| |
| |
| # Close the serial device we aren't using |
| def Close(self): |
| return True |
| |
| |
| def EnterCommandMode(self): |
| return True |
| |
| |
| def GetPort(self): |
| return '/dev/fakedev' |
| |
| def AdapterPowerOff(self): |
| return self._set_hci_prop('org.bluez.Adapter1', 'Powered', dbus.Boolean(0)) |
| |
| def AdapterPowerOn(self): |
| return self._set_hci_prop('org.bluez.Adapter1', 'Powered', dbus.Boolean(1)) |
| |
| def Reboot(self): |
| logging.info('REBOOTING') |
| |
| self.AdapterPowerOff() |
| self.AdapterPowerOn() |
| |
| # Put ourselves back into correct state for discovery |
| return self.Init() |
| |
| |
| def SetAdvertisedName(self, name): |
| self._set_hci_prop('org.bluez.Adapter1', 'Alias', dbus.String(name)) |
| |
| def SetDiscoverable(self, discoverable): |
| self._set_hci_prop('org.bluez.Adapter1', 'Discoverable', |
| dbus.Boolean(discoverable)) |
| |
| # Also set discoverable in gatt server if relevant |
| if self._gatt_server: |
| self._gatt_server.SetDiscoverable(discoverable) |
| |
| |
| def GetAuthenticationMode(self): |
| return PeripheralKit.OPEN_MODE |
| |
| |
| def GetPinCode(self): |
| return '0000' |
| |
| |
| def GetLocalBluetoothAddress(self): |
| """Get the builtin Bluetooth MAC address. |
| |
| If the HCI device doesn't exist, Get() will throw an exception |
| (dbus.exceptions.DBus.Error.UnknownObject) |
| """ |
| |
| # Return stored value if we have one |
| if self._address is not None: |
| return self._address |
| |
| try: |
| addr = str(self._get_hci_prop('org.bluez.Adapter1', 'Address')) |
| self._address = addr |
| except Exception as e: |
| logging.error('Failed to get local addr: {}'.format(e)) |
| addr = None |
| return addr |
| |
| |
| def GetConnectionStatus(self): |
| """Determine whether the device has an active connection |
| |
| Returns: |
| True if a connection is active, False otherwise |
| """ |
| |
| # If we can't find a connected object, no active connections |
| if self.GetRemoteConnectedBluetoothAddress() is not None: |
| return True |
| |
| return False |
| |
| |
| def GetDeviceWithAddress(self, addr): |
| manager = dbus.Interface( |
| self._dbus_system_bus.get_object(DBUS_BLUEZ_SERVICE_IFACE, '/'), |
| DBUS_OBJECT_MANAGER_INTERFACE) |
| |
| objects = manager.GetManagedObjects() |
| |
| # Go through each object in org.bluez.Device1 until |
| # we find the one that matches our desired address |
| for path, ifaces in list(objects.items()): |
| device = ifaces.get(DBUS_BLUEZ_DEVICE_IFACE) |
| |
| if device is not None and device['Address'] == addr: |
| obj = self._dbus_system_bus.get_object(DBUS_BLUEZ_SERVICE_IFACE, path) |
| return dbus.Interface(obj, DBUS_BLUEZ_DEVICE_IFACE) |
| return None |
| |
| |
| def GetRemoteConnectedBluetoothAddress(self): |
| """Get the address of the current connected device, if applicable |
| |
| Returns: |
| None if no connection can be found |
| Mac address of connected device i |
| """ |
| |
| # Grab ObjectManager and its objects |
| manager = dbus.Interface(self._dbus_system_bus.get_object('org.bluez', '/'), |
| DBUS_OBJECT_MANAGER_INTERFACE) |
| |
| objects = manager.GetManagedObjects() |
| |
| # Go through each obj in org.bluez.Device1 and check "Connected" attribute |
| for _, ifaces in list(objects.items()): |
| device = ifaces.get(DBUS_BLUEZ_DEVICE_IFACE) |
| if device is None: |
| continue |
| if device['Address'] is not None and device['Connected']: |
| return device['Address'] |
| |
| return None |
| |
| |
| def SetClassOfService(self, class_of_service): |
| # Class is a read-only DBus property, so needs to be set using system calls. |
| cmd = 'sudo hciconfig hci0 class {}'.format(hex(class_of_service)) |
| os.system(cmd) |
| |
| |
| def SetRemoteAddress(self, remote_address): |
| """Sets address later used for connect |
| |
| Args: |
| remote_address: string denoting remote address |
| """ |
| self.remote_address = remote_address |
| return True |
| |
| def Connect(self): |
| """Attempts to connect to the address specified with SetRemoteAddress() |
| |
| Returns: |
| True if connection to remote address succeeds, False otherwise |
| """ |
| |
| if self.remote_address is None: |
| logging.error('Connect called with no remote address supplied') |
| return False |
| |
| logging.info('Attempting to connect to %s', self.remote_address) |
| device = self.GetDeviceWithAddress(self.remote_address) |
| |
| if 'BLE' in self._device_type: |
| # Call connect directly through bluez device object |
| |
| if not device: |
| logging.error('Device {} is None, can not connect'.format( |
| self.remote_address)) |
| return False |
| |
| try: |
| device.Connect() |
| |
| except Exception as e: |
| logging.error('Failed to connect to {}: {}'.format(self.remote_address, |
| e)) |
| return False |
| |
| else: |
| # Request Bluez service to connect to address |
| self.get_service_iface().Connect( |
| self.remote_address, |
| reply_handler=self.ConnectDoneHandler, |
| error_handler=self.ConnectErrorHandler) |
| |
| device_iface = dbus.Interface(device, DBUS_PROP_IFACE) |
| is_connected = bool(device_iface.Get(DBUS_BLUEZ_DEVICE_IFACE, 'Connected')) |
| start_time = time.time() |
| |
| while not is_connected: |
| if time.time() > start_time + CONNECTION_WAIT_TIME: |
| break |
| |
| time.sleep(1) |
| is_connected = bool(device_iface.Get(DBUS_BLUEZ_DEVICE_IFACE, |
| 'Connected')) |
| |
| return is_connected |
| |
| |
| def ConnectDoneHandler(self): |
| """Called when connect completes""" |
| pass |
| |
| |
| def ConnectErrorHandler(self, err): |
| """Called in case of error on connect""" |
| logging.error('ConnectErrorHandler: %s', err) |
| |
| |
| def _GetAdapterIface(self): |
| """Returns handle to bluez adapter interface |
| |
| Returns: |
| Handle to bluez adapter interface |
| """ |
| |
| if self._adapter_iface is None: |
| self._adapter_iface = dbus.Interface( |
| self._dbus_system_bus.get_object( |
| DBUS_BLUEZ_SERVICE_IFACE, |
| self._dbus_hci_adapter_path), |
| DBUS_BLUEZ_ADAPTER_IFACE) |
| |
| return self._adapter_iface |
| |
| |
| def RemoveDevice(self, remote_address): |
| """Removes a remote device from bluez |
| |
| Args: |
| remote_address: string address of BT device |
| """ |
| |
| try: |
| logging.info('Trying to remove {}'.format(remote_address)) |
| device = self.GetDeviceWithAddress(remote_address) |
| |
| if device: |
| self._GetAdapterIface().RemoveDevice(device) |
| |
| except Exception as e: |
| logging.info('Failed to remove device {}: {}'.format(remote_address, e)) |
| |
| |
| def StartDiscovery(self): |
| """Tries to start discovery on the adapter |
| |
| Returns: |
| True if discovery started without error, False otherwise |
| """ |
| |
| try: |
| self._GetAdapterIface().StartDiscovery() |
| |
| except Exception as e: |
| logging.error('Failed to start discovery: {}'.format(e)) |
| |
| return False |
| |
| return True |
| |
| |
| def StopDiscovery(self): |
| """Tries to stop discovery on the adapter |
| |
| Returns: |
| True if discovery stopped without error, False otherwise |
| """ |
| |
| try: |
| self._GetAdapterIface().StopDiscovery() |
| |
| except Exception as e: |
| logging.error('Failed to stop discovery: {}'.format(e)) |
| |
| return False |
| |
| return True |
| |
| |
| def StartUnfilteredDiscovery(self): |
| """Starts unfiltered discovery session for DUT advertisement testing |
| |
| Since there is no way to initiate an LE scan through bluez without |
| filtering enabled, submit request directly over HCI layer. This is useful |
| for LE advertising tests, as we will discover advertisements much faster. |
| """ |
| |
| # Submit LE Set Scan Enable command with filter_duplicates disabled |
| cmds = [ |
| HciCommands.le_set_scan_params(), |
| HciCommands.le_set_scan_enable(enable=True, filter_duplicates=False) |
| ] |
| |
| tool = HciTool(sudo=True, wait_time=0.1) |
| |
| for cmd in cmds: |
| tool.add_command(cmd) |
| |
| tool.run_commands() |
| |
| return True |
| |
| |
| def StopUnfilteredDiscovery(self): |
| """Stops unfiltered discovery session for DUT advertisement testing |
| |
| Disables a scan started via self.StartUnfilteredDiscovery() |
| """ |
| |
| tool = HciTool(sudo=True, wait_time=0.1) |
| |
| # Submit LE Set Scan Enable command with LE_Scan_enable set to false |
| tool.add_command(HciCommands.le_set_scan_enable(enable=False)) |
| |
| tool.run_commands() |
| |
| return True |
| |
| |
| def FindAdvertisementWithAttributes(self, attrs=[], timeout=10): |
| """Locate an advertisement containing the requested attributes from btmon |
| |
| Args: |
| attrs: List of strings to be located within an advertising event |
| timeout: Seconds to discover before returning failure |
| |
| Returns: |
| String containing matching advertising event if found, None otherwise |
| """ |
| |
| def _IsMatchedEvent(ev): |
| """Determines whether an event matches the requested criteria |
| |
| Args: |
| ev: String containing btmon event |
| |
| Returns: True if event matches criteria, False otherwise |
| """ |
| content = ''.join(ev) |
| |
| if 'LE Advertising Report' not in content: |
| return False |
| |
| # Search for all requested attributes. Only return true if all are found |
| for attr in attrs: |
| if attr not in content: |
| return False |
| |
| return True |
| |
| current_event = [] |
| |
| # Start searching for devices |
| if not self.StartUnfilteredDiscovery(): |
| return None |
| |
| end_time = time.time() + timeout |
| process = system_tools.SystemTools.RunInSubprocess('btmon') |
| |
| # If no btmon traffic is occurring, the readline call will block forever. |
| # Here, configure a timer to terminate the process after our selected |
| # timeout. |
| timer = threading.Timer(timeout, process.terminate) |
| timer.start() |
| |
| desired_event = None |
| while time.time() < end_time: |
| output = process.stdout.readline().decode('utf8') |
| if output == '' and process.poll() is not None: |
| break |
| |
| # output will contain a single line of btmon, i.e. |
| # '> HCI Event: LE Meta Event (0x3e) plen 43' |
| if output: |
| # Event headers are labeled with their data length. If we encounter a |
| # header, we check the last event to see if it matches our search |
| if ' plen ' in output: |
| if current_event and _IsMatchedEvent(current_event): |
| desired_event = ''.join(current_event) |
| break |
| |
| # If we found an event header and the previous wasn't a match, start |
| # recording this event |
| current_event = [] |
| |
| # Don't add header to this event, as the header timestamp has a non-zero |
| # chance to match a numeric search attribute |
| else: |
| current_event.append(output) |
| |
| # Terminate listening process and stop discovery |
| process.terminate() |
| timer.cancel() |
| self.StopUnfilteredDiscovery() |
| return desired_event |
| |
| |
| def _DiscoverDevice(self, remote_address, wait_time=10, |
| allow_early_discovery_by_path=False): |
| """This method determines if a device has been discovered via dbus path |
| |
| Here, we perform the same operation as test_discover_device in autotest, |
| where we consider a remote device "discovered" when bluez creates a dbus |
| object for it. This allows us to avoid installing and importing extra |
| libraries for scanning, particularly for LE. |
| |
| Args: |
| remote_address: string address of desired BT device |
| wait_time: seconds we should wait before returning failure to discover |
| allow_early_discovery_by_path: boolean denoting whether we can return |
| immediately if we already have a dbus object for this address. |
| Otherwise, a fresh advertisement is required for a device to be |
| discovered |
| |
| Returns: |
| True if device is found, False otherwise |
| """ |
| |
| if allow_early_discovery_by_path: |
| # Return early if we already have this device |
| if self.GetDeviceWithAddress(remote_address): |
| logging.info('Device {} found without discovery'.format(remote_address)) |
| return True |
| |
| self.discovered_devices = [] |
| # Register callback on PropertiesChanged signal |
| receiver = self._dbus_system_bus.add_signal_receiver( |
| self._DeviceFound, |
| signal_name='PropertiesChanged', |
| bus_name='org.bluez', |
| path_keyword='device_path') |
| |
| # Start searching for devices |
| self.StartDiscovery() |
| |
| # Wait until our device has been added to discovered_devices by listening |
| # thread, or timeout |
| start_time = time.time() |
| while remote_address not in self.discovered_devices: |
| if time.time() > start_time + wait_time: |
| break |
| |
| time.sleep(1) |
| |
| # Clean up |
| receiver.remove() |
| self.StopDiscovery() |
| |
| return remote_address in self.discovered_devices |
| |
| |
| def _DeviceFound(self, *args, **kwargs): |
| """Called when a property changes on the bluez d-bus interface |
| |
| Useful for tracking reception of advertisements, as they update the bluez |
| device |
| |
| Args: |
| args: list of form [caller, property_dict] |
| kwargs: dict containing keyword arguments requested in the |
| add_signal_receiver call i.e. device_path of calling object |
| """ |
| |
| # Renaming to be more human readable while satisfying pylint |
| changed_prop = args |
| caller_details = kwargs |
| |
| caller = str(changed_prop[0]) |
| |
| if 'Device1' in caller: |
| device_path = caller_details.get('device_path') |
| device_address = device_path[-17:].replace('_', ':') |
| |
| # Store new devices. list append is thread safe |
| if device_address not in self.discovered_devices: |
| logging.info('New device %s found', device_address) |
| self.discovered_devices.append(device_address) |
| |
| |
| def Discover(self, remote_address): |
| """Try to discover the remote device |
| |
| Returns: |
| True if remote address is discovered. |
| """ |
| |
| return self._DiscoverDevice(remote_address) |
| |
| |
| def Disconnect(self): |
| """Requests a disconnect from the remote device |
| |
| Returns: |
| True if connected device exists, False otherwise |
| """ |
| |
| logging.debug('Disconnecting from adapter') |
| |
| # Can't do anything if we're not connected |
| if not self.GetConnectionStatus(): |
| return False |
| |
| device = self.GetDeviceWithAddress(self.remote_address) |
| if device is None: |
| logging.error('Failed to get device with the address %s', |
| self.remote_address) |
| return False |
| |
| device.Disconnect() |
| |
| return True |
| |
| |
| def SendHIDReport(self, report): |
| """Sends a hid report to our bluez service |
| |
| Args: |
| report: scan code representing state of HID device |
| """ |
| # Passing with empty handlers allows operation to run async |
| self.get_service_iface().SendHIDReport(report, |
| reply_handler=self.KeysSentHandler, |
| error_handler=self.KeysErrorHandler) |
| |
| |
| def KeysSentHandler(self): |
| """Called when hid report send completes""" |
| pass |
| |
| |
| def KeysErrorHandler(self, err): |
| """Called in case of error on hid report send""" |
| logging.error('KeysErrorHandler: %s', err) |
| |
| |
| def _CheckValidModifiers(self, modifiers): |
| invalid_modifiers = [m for m in modifiers if m not in self.MODIFIERS] |
| if invalid_modifiers: |
| logging.error('Modifiers not valid: "%s".', str(invalid_modifiers)) |
| return False |
| return True |
| |
| |
| def _IsValidScanCode(self, code): |
| """Check if the code is a valid scan code. |
| |
| Args: |
| code: the code to check |
| |
| Returns: |
| True: if the code is a valid scan code. |
| """ |
| return (self.SCAN_NO_EVENT <= code <= self.SCAN_PAUSE or |
| self.SCAN_SYSTEM_POWER <= code <= self.SCAN_SYSTEM_WAKE) |
| |
| |
| def _CheckValidScanCodes(self, keys): |
| invalid_keys = [k for k in keys if not self._IsValidScanCode(k)] |
| if invalid_keys: |
| logging.error('Keys not valid: "%s".', str(invalid_keys)) |
| return False |
| return True |
| |
| |
| def RawKeyCodes(self, modifiers=None, keys=None): |
| """Generate the codes in raw keyboard report format. |
| |
| This method sends data in the raw report mode. The first start |
| byte chr(UART_INPUT_RAW_MODE) is stripped and the following bytes |
| are sent without interpretation. |
| |
| For example, generate the codes of 'shift-alt-i' by |
| codes = RawKeyCodes(modifiers=[RasPi.LEFT_SHIFT, RasPi.LEFT_ALT], |
| keys=[RasPi.SCAN_I]) |
| |
| Args: |
| modifiers: a list of modifiers |
| keys: a list of scan codes of keys |
| |
| Returns: |
| a raw code string if both modifiers and keys are valid, or |
| None otherwise. |
| """ |
| modifiers = modifiers or [] |
| keys = keys or [] |
| |
| if not (self._CheckValidModifiers(modifiers) and |
| self._CheckValidScanCodes(keys)): |
| return None |
| |
| real_scan_codes = [key for key in keys] |
| padding_0s = (0) * (RAW_REPORT_FORMAT_KEYBOARD_LEN_SCAN_CODES - |
| len(real_scan_codes)) |
| |
| return bytearray((UART_INPUT_RAW_MODE, |
| RAW_REPORT_FORMAT_KEYBOARD_LENGTH, |
| RAW_REPORT_FORMAT_KEYBOARD_DESCRIPTOR, |
| sum(modifiers), |
| 0x0) + |
| tuple(keys) + |
| padding_0s) |
| |
| |
| def _MouseButtonsRawHidValues(self): |
| """Gives the raw HID values for whatever buttons are pressed.""" |
| currently_pressed = 0x0 |
| for button in self._buttons_pressed: |
| if button == PeripheralKit.MOUSE_BUTTON_LEFT: |
| currently_pressed |= RAW_HID_LEFT_BUTTON |
| elif button == PeripheralKit.MOUSE_BUTTON_RIGHT: |
| currently_pressed |= RAW_HID_RIGHT_BUTTON |
| else: |
| error = 'Unknown mouse button in state: %s' % button |
| logging.error(error) |
| raise BluezPeripheralException(error) |
| return currently_pressed |
| |
| |
| def MouseMove(self, delta_x, delta_y): |
| """Move the mouse (delta_x, delta_y) steps. |
| |
| If buttons are being pressed, they will stay pressed during this operation. |
| This move is relative to the current position by the HID standard. |
| Valid step values must be in the range [-127,127]. |
| |
| Args: |
| delta_x: The number of steps to move horizontally. |
| Negative values move left, positive values move right. |
| delta_y: The number of steps to move vertically. |
| Negative values move up, positive values move down. |
| """ |
| raw_buttons = self._MouseButtonsRawHidValues() |
| if delta_x or delta_y: |
| mouse_codes = self._RawMouseCodes(buttons=raw_buttons, x_stop=delta_x, |
| y_stop=delta_y) |
| self.SendHIDReport(mouse_codes) |
| |
| |
| def MouseScroll(self, steps): |
| """Scroll the mouse wheel steps number of steps. |
| |
| Buttons currently pressed will stay pressed during this operation. |
| Valid step values must be in the range [-127,127]. |
| |
| Args: |
| steps: The number of steps to scroll the wheel. |
| With traditional scrolling: |
| Negative values scroll down, positive values scroll up. |
| With reversed (formerly "Australian") scrolling this is reversed. |
| """ |
| raw_buttons = self._MouseButtonsRawHidValues() |
| if steps: |
| mouse_codes = self._RawMouseCodes(buttons=raw_buttons, wheel=steps) |
| self.SendHIDReport(mouse_codes) |
| |
| |
| def MousePressButtons(self, buttons): |
| """Press the specified mouse buttons. |
| |
| The kit will continue to press these buttons until otherwise instructed, or |
| until its state has been reset. |
| |
| Args: |
| buttons: A set of buttons, as PeripheralKit MOUSE_BUTTON_* values, that |
| will be pressed (and held down). |
| """ |
| self._MouseButtonStateUnion(buttons) |
| raw_buttons = self._MouseButtonsRawHidValues() |
| if raw_buttons: |
| mouse_codes = self._RawMouseCodes(buttons=raw_buttons) |
| self.SendHIDReport(mouse_codes) |
| |
| |
| def MouseReleaseAllButtons(self): |
| """Release all mouse buttons.""" |
| self._MouseButtonStateClear() |
| mouse_codes = self._RawMouseCodes(buttons=RAW_HID_BUTTONS_RELEASED) |
| self.SendHIDReport(mouse_codes) |
| |
| |
| def _RawMouseCodes(self, buttons=0, x_stop=0, y_stop=0, wheel=0): |
| """Generate the codes in mouse raw report format. |
| |
| This method sends data in the raw report mode. The first start |
| byte chr(UART_INPUT_RAW_MODE) is stripped and the following bytes |
| are sent without interpretation. |
| |
| For example, generate the codes of moving cursor 100 pixels left |
| and 50 pixels down: |
| codes = _RawMouseCodes(x_stop=-100, y_stop=50) |
| |
| Args: |
| buttons: the buttons to press and release |
| x_stop: the pixels to move horizontally |
| y_stop: the pixels to move vertically |
| wheel: the steps to scroll |
| |
| Returns: |
| a raw code string. |
| """ |
| def SignedChar(value): |
| """Converted the value to a legitimate signed character value. |
| |
| Given value must be in [-127,127], or odd things will happen. |
| |
| Args: |
| value: a signed integer |
| |
| Returns: |
| a signed character value |
| """ |
| if value < 0: |
| # Perform two's complement. |
| return value + 256 |
| return value |
| |
| return bytearray((RAW_REPORT_START, |
| RAW_REPORT_FORMAT_MOUSE_DESCRIPTOR, |
| SignedChar(buttons), |
| SignedChar(x_stop), |
| SignedChar(y_stop), |
| SignedChar(wheel))) |
| |
| |
| def PressShorthandCodes(self, modifiers=None, keys=None): |
| """Generate key press codes in shorthand report format. |
| |
| Only key press is sent. The shorthand mode is useful in separating the |
| key press and key release events. |
| |
| For example, generate the codes of 'shift-alt-i' by |
| codes = PressShorthandCodes(modifiers=[RasPi.LEFT_SHIFT, RasPi.LEFT_ALT], |
| keys=[RasPi_I]) |
| |
| Args: |
| modifiers: a list of modifiers |
| keys: a list of scan codes of keys |
| |
| Returns: |
| a shorthand code string if both modifiers and keys are valid, or |
| None otherwise. |
| """ |
| modifiers = modifiers or [] |
| keys = keys or [] |
| |
| if not (self._CheckValidModifiers(modifiers) and |
| self._CheckValidScanCodes(keys)): |
| return None |
| |
| if len(keys) > SHORTHAND_REPORT_FORMAT_KEYBOARD_MAX_LEN_SCAN_CODES: |
| return None |
| |
| return (chr(UART_INPUT_SHORTHAND_MODE) + |
| chr(len(keys) + 1) + |
| chr(sum(modifiers)) + |
| ''.join([chr(key) for key in keys])) |
| |
| |
| def ReleaseShorthandCodes(self): |
| """Generate the shorthand report format code for key release. |
| |
| Key release is sent. |
| |
| Returns: |
| a special shorthand code string to release any pressed keys. |
| """ |
| return chr(UART_INPUT_SHORTHAND_MODE) + chr(0x0) |
| |
| |
| def GetKitInfo(self): |
| """A simple demo of getting kit information.""" |
| logging.info('advertised name: %s', self.GetAdvertisedName()) |
| logging.info('local bluetooth address: %s', self.GetLocalBluetoothAddress()) |
| class_of_service = self.GetClassOfService() |
| try: |
| class_of_service = hex(class_of_service) |
| except TypeError: |
| pass |
| logging.info('Class of service: %s', class_of_service) |
| class_of_device = self.GetClassOfDevice() |
| try: |
| class_of_device = hex(class_of_device) |
| except TypeError: |
| pass |
| logging.info('Class of device: %s', class_of_device) |
| |
| |
| if __name__ == '__main__': |
| # To allow basic printing when run from command line |
| logging.basicConfig(stream=sys.stdout, level=logging.DEBUG) |
| |
| kit_instance = BluezPeripheral() |
| kit_instance.GetKitInfo() |