blob: b638290a01786613c9cb7b7b32a5ec4d05218ac9 [file] [log] [blame] [edit]
#!/usr/bin/env python3
#
# Copyright 2013 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import argparse
import logging
import os
import re
import threading
import uuid
import yaml
from cros.factory.device.bluetooth import AbstractBluetoothManager
from cros.factory.device.bluetooth import BluetoothManagerException
from cros.factory.test.utils import bluetooth_utils
from cros.factory.utils.sync_utils import PollForCondition
from cros.factory.utils.sync_utils import RetryDecorator
from cros.factory.external.py_lib import dbus
# yapf: enable
# yapf: disable
# pylint: disable=no-name-in-module,import-error
# yapf: disable
from cros.factory.external.py_lib.dbus import DBusException # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
# yapf: disable
from cros.factory.external.py_lib.dbus.mainloop.glib import DBusGMainLoop # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
from cros.factory.external.py_lib.dbus import service # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
# yapf: disable
from cros.factory.external.py_lib.gi.repository import GLib as gobject # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
# Name used to look up BlueZ objects on the system bus.
BUS_NAME = 'org.bluez'
# Prefix shared by all of the BlueZ interface names below.
SERVICE_NAME = 'org.bluez'
ADAPTER_INTERFACE = f'{SERVICE_NAME}.Adapter1'
DEVICE_INTERFACE = f'{SERVICE_NAME}.Device1'
AGENT_INTERFACE = f'{SERVICE_NAME}.Agent1'
# Extracts the names of the child nodes from D-Bus introspection XML.
_RE_NODE_NAME = re.compile(r'<node name="(.*?)"/>')
class AuthenticationAgent(service.Object):
  """Pairing agent that relays passkeys from the Bluetooth stack to callbacks.

  This class exports a subset of the org.bluez.Agent1 D-Bus interface
  (DisplayPasskey, RequestConfirmation and Cancel), which lets a Bluetooth
  device authenticate itself against the host (the computer running this
  script). The interface is not fully implemented; for example the legacy
  PIN code mechanism used by pre-2.1 Bluetooth keyboards is not supported.

  Properties:
    _display_passkey_callback: A function with signature (string) that takes
        a 6 digit passkey to display to the user.
    _cancel_callback: A function that takes no parameters, used to indicate
        cancellation from the device.
  """

  def __init__(self, bus, path, display_passkey_callback, cancel_callback):
    """Exports the agent object on the bus.

    Args:
      bus: The bus to export the agent on.
      path: The object path of the agent.
      display_passkey_callback: See the class docstring.
      cancel_callback: See the class docstring.
    """
    super().__init__(bus, path)
    self._cancel_callback = cancel_callback
    self._display_passkey_callback = display_passkey_callback

  def _ShowPasskey(self, passkey):
    """Hands the passkey, left-padded with zeros to 6 digits, to the callback."""
    self._display_passkey_callback(str(passkey).zfill(6))

  # The names and the in/out signatures of the exported methods below must
  # match the corresponding methods of the BlueZ 5 Agent1 DBus interface,
  # including the parameter types. For instance 'ouq' declares three
  # parameters (not counting self) of type Object, uint32 and uint16 (so 'q'
  # is NOT a quadword). Python only sees arbitrary precision integers, but
  # the method is not dispatched properly unless the signature matches.

  @service.method(AGENT_INTERFACE, in_signature='ouq', out_signature='')
  def DisplayPasskey(self, device, passkey, entered):
    """Forwards the passkey that the user has to type on the device."""
    logging.info('DisplayPasskey (%s, %06u entered %u)',
                 device, passkey, entered)
    self._ShowPasskey(passkey)

  @service.method(AGENT_INTERFACE, in_signature='', out_signature='')
  def Cancel(self):
    """Forwards a cancellation to the cancel callback."""
    logging.info('Cancel')
    self._cancel_callback()

  @service.method(AGENT_INTERFACE, in_signature='ou', out_signature='')
  def RequestConfirmation(self, device, passkey):
    """Forwards the passkey to display; returning normally is the reply."""
    logging.info('RequestConfirmation (%s, %06d)', device, passkey)
    self._ShowPasskey(passkey)
# TODO(cychiang) Add unittest for this class.
class ChromeOSBluetoothManager(AbstractBluetoothManager):
  """The class to handle bluetooth adapter and device through dbus interface.

  Properties:
    _main_loop: The GLib main loop. The main loop should be running while
        waiting for callbacks (replies, errors or signals) delivered through
        the dbus interface.
    _manager: The proxy for the org.freedesktop.DBus.ObjectManager interface
        on object path / on bus org.bluez.

  Raises:
    Raises BluetoothManagerException if the ObjectManager interface is not
    available through dbus interface.
  """

  # yapf: disable
  Error = BluetoothManagerException  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
  # yapf: enable

  def __init__(self, dut):
    """Sets up the GLib main loop and the ObjectManager proxy.

    Args:
      dut: Passed through to the parent class constructor.

    Raises:
      BluetoothManagerException if the ObjectManager proxy can not be created.
    """
    super().__init__(dut)
    # The default main loop must be set before any bus connection is made so
    # that asynchronous replies and signals are dispatched by the GLib loop.
    DBusGMainLoop(set_as_default=True)
    self._main_loop = gobject.MainLoop()
    self._manager = None
    # yapf: disable
    bus = dbus.SystemBus()  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
    # yapf: enable
    try:
      # yapf: disable
      self._manager = dbus.Interface(bus.get_object(BUS_NAME, '/'),  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
      # yapf: enable
                                     'org.freedesktop.DBus.ObjectManager')
    except DBusException as e:
      raise BluetoothManagerException(
          f'DBus Exception in getting Managerdbus Interface: {e}.') from None

  def _FindDeviceInterface(self, mac_addr, adapter):
    """Given a MAC address, returns the corresponding device dbus object.

    Args:
      mac_addr: The MAC address of the remote device.
      adapter: The bluetooth adapter dbus interface object.

    Returns:
      A proxy providing the 'org.bluez.Device1' interface of the device, or
      None if no device under the adapter has the given address.
    """
    # Remote devices belonging to the given adapter
    # have their path prefixed by the adapter's object path
    path_prefix = adapter.object_path
    # yapf: disable
    bus = dbus.SystemBus()  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
    # yapf: enable
    # yapf: disable
    remote_objects = self._manager.GetManagedObjects()  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
    # yapf: enable
    for path, ifaces in remote_objects.items():
      if not path.startswith(path_prefix):
        continue
      device = ifaces.get(DEVICE_INTERFACE)
      if device and str(device['Address']) == mac_addr:
        matching_device = bus.get_object(SERVICE_NAME, path)
        # yapf: disable
        return dbus.Interface(matching_device, DEVICE_INTERFACE)  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
        # yapf: enable
    return None

  def SetDeviceConnected(self, adapter, device_address, connect):
    """Switches the device connection.

    Args:
      adapter: The adapter interface to control.
      device_address: The mac address of input device to control.
      connect: True/False to turn on/off the connection.

    Returns:
      Return True if succeed to turn on/off connection. Disconnecting a
      device that can not be found also returns True.

    Raises:
      Raises BluetoothManagerException if fail to find device or fail to switch
      connection.
    """
    try:
      device = self._FindDeviceInterface(device_address, adapter)
    except DBusException as e:
      raise BluetoothManagerException(
          f'SetDeviceConnected: fail to find device {device_address}: {e}'
      ) from None

    if device is None:
      if connect:
        # Without this check the Connect() call below would fail with an
        # AttributeError on None instead of the documented exception.
        raise BluetoothManagerException(
            f'SetDeviceConnected: fail to find device {device_address}')
      # If we could not find the device, then we are not connected to it
      return True

    try:
      if connect:
        device.Connect()
      else:
        device.Disconnect()
    except DBusException as e:
      raise BluetoothManagerException(
          f'SetDeviceConnected: fail to switchconnection: {e}') from None
    return True

  def RemovePairedDevice(self, adapter, device_address):
    """Removes the paired device.

    Note that a removed device may not be found in subsequent scans
    for a period of time.

    Args:
      adapter: The adapter interface to control.
      device_address: The mac address of input device to control.

    Returns:
      Return True if succeed to remove paired device. A device that can not
      be found is treated as already removed.

    Raises:
      Raises BluetoothManagerException if fail to remove paired device.
    """
    try:
      device = self._FindDeviceInterface(device_address, adapter)
      if device:
        adapter.RemoveDevice(device)
    except DBusException as e:
      raise BluetoothManagerException(
          f'RemovePairedDevice: fail to remove device: {e}.') from None
    logging.info('succesfully removed device.')
    return True

  def DisconnectAndUnpairDevice(self, adapter, device_address):
    """Disconnects and unpairs from the device, even if not currently paired.

    This is intended to restore Bluetooth connection status to a known state.
    DBus raises exceptions if not currently paired, so we swallow those.

    Args:
      adapter: The adapter interface to control.
      device_address: The mac address of input device to control.

    Returns: Nothing
    """
    device = self._FindDeviceInterface(device_address, adapter)
    if not device:
      return
    # Both calls are best effort; failures are only recorded for debugging.
    try:
      device.Disconnect()
    except DBusException as e:
      logging.debug('Ignored error from Disconnect: %s', e)
    try:
      device.CancelPairing()
    except DBusException as e:
      logging.debug('Ignored error from CancelPairing: %s', e)

  def CreatePairedDevice(self, adapter, device_address,
                         display_passkey_callback=None,
                         cancel_callback=None):
    """Create paired device.

    Attempt to pair with a Bluetooth device, making the computer running this
    script the host. If a callback is specified for displaying a passkey, this
    host will report KeyboardDisplay capabilities, allowing the remote device
    (which must be a keyboard) to respond with a passkey which must be typed on
    it to authenticate. If the callback is not specified, the host reports no
    interactive capabilities, forcing a "Just Works" pairing model (that's the
    actual name in the Bluetooth spec) that pairs with no authentication.

    Args:
      adapter: The adapter interface to control.
      device_address: The mac address of input device to control.
      display_passkey_callback: None, or a function with signature (string)
          that is passed a passkey for authentication
      cancel_callback: None, or a function that accepts no arguments that is
          invoked if the remote device cancels authentication

    Returns:
      True if the device was paired.

    Raises:
      Raises BluetoothManagerException if fails to create service agent or
      fails to create paired device.
    """
    matching_device = self._FindDeviceInterface(device_address, adapter)
    if not matching_device:
      raise BluetoothManagerException(
          f'CreatePairedDevice: Address was not found in scan: {device_address}'
      )

    success = threading.Event()

    def _ReplyHandler():
      """The callback to handle success of device.Pair."""
      logging.info('Paired with device %s.', matching_device)
      success.set()
      self._main_loop.quit()

    def _ErrorHandler(error):
      """The callback to handle error of device.Pair."""
      logging.error('Pairing with device failed: %s.', error)
      if cancel_callback:
        cancel_callback()
      self._main_loop.quit()

    # yapf: disable
    bus = dbus.SystemBus()  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
    # yapf: enable
    # Exposes a service agent object at a unique path for this test.
    agent_id = str(uuid.uuid4()).replace('-', '')
    agent_path = os.path.join('/BluetoothTest', 'agent', agent_id)
    obj = bus.get_object(BUS_NAME, '/org/bluez')
    # yapf: disable
    agent_manager = dbus.Interface(obj, 'org.bluez.AgentManager1')  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
    # yapf: enable
    logging.info('CreatePairedDevice: Set agent path at %s.', agent_path)
    try:
      if display_passkey_callback is None:
        capability = 'NoInputNoOutput'
        service.Object(bus, agent_path)
      else:
        capability = 'KeyboardDisplay'
        AuthenticationAgent(bus, agent_path,
                            display_passkey_callback=display_passkey_callback,
                            cancel_callback=cancel_callback)
      # yapf: disable
      agent_manager.RegisterAgent(agent_path, dbus.String(capability))  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
      # yapf: enable
    except DBusException as e:
      # Use a membership test here: str.find() returns -1 (truthy) when the
      # text is absent and 0 (falsy) when it is at the start of the message.
      if 'there is already a handler.' in str(e):
        logging.info('There is already an agent there, that is OK: %s.', e)
      else:
        logging.exception('Fail to create agent.')
        raise BluetoothManagerException('CreatePairedDevice:'
                                        'Fail to create agent.') from None

    # Pair() is asynchronous; one of the two handlers quits the main loop.
    matching_device.Pair(reply_handler=_ReplyHandler,
                         error_handler=_ErrorHandler)
    self._main_loop.run()
    if success.is_set():
      return True
    raise BluetoothManagerException('Pair: reply_handler'
                                    ' did not get called.')

  def _GetAdapters(self, mac_addr=None):
    """Gets a list of available bluetooth adapters.

    Args:
      mac_addr: The MAC address that adapter should match. None to match any.

    Returns:
      Returns a list of adapters. An adapter is a proxy object which provides
      the interface of 'org.bluez.Adapter1'.

    Raises:
      Raises BluetoothManagerException if fail to get adapter interface.
    """
    # yapf: disable
    objects = self._manager.GetManagedObjects()  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
    # yapf: enable
    # yapf: disable
    bus = dbus.SystemBus()  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
    # yapf: enable
    adapters = []
    for path, interfaces in objects.items():
      adapter = interfaces.get(ADAPTER_INTERFACE)
      if adapter is None:
        continue
      if mac_addr and adapter.get('Address') != mac_addr:
        continue
      obj = bus.get_object(BUS_NAME, path)
      # yapf: disable
      adapters.append(dbus.Interface(obj, ADAPTER_INTERFACE))  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
      # yapf: enable
    return adapters

  def GetAdapters(self, max_retry_times=10, interval=2, mac_addr=None):
    """Gets a list of available bluetooth adapters.

    Args:
      max_retry_times: The maximum retry times to find adapters.
      interval: The sleep interval between two trials in seconds.
      mac_addr: The MAC address that adapter should match. None to match any.

    Returns:
      A list of adapters found. Each adapter is a proxy object which provides
      the interface of 'org.bluez.Adapter1'. Returns None if the retried call
      yields None.
    """
    retry_wrapper = RetryDecorator(max_attempt_count=max_retry_times,
                                   interval_sec=interval,
                                   timeout_sec=float('inf'))
    adapters = retry_wrapper(self._GetAdapters)(mac_addr=mac_addr)
    # NOTE(review): _GetAdapters itself never returns None (an empty list
    # means no adapter), so this branch presumably relies on RetryDecorator
    # returning None after the retries are exhausted - confirm.
    if adapters is None:
      logging.error('AbstractBluetoothManager: Fail to get any adapter.')
      return None
    logging.info('GetAdapters (mac_addr=%s): %s', mac_addr, adapters)
    return adapters

  def _GetAdapterProperties(self, adapter):
    """Returns the org.freedesktop.DBus.Properties proxy of the adapter.

    Args:
      adapter: The adapter proxy object.
    """
    # yapf: disable
    bus = dbus.SystemBus()  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
    # yapf: enable
    # yapf: disable
    return dbus.Interface(bus.get_object(BUS_NAME, adapter.object_path),  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
    # yapf: enable
                          'org.freedesktop.DBus.Properties')

  def _SwitchAdapterPower(self, adapter, on):
    """Powers on adapter by setting the Powered property.

    This will bring up the adapter like 'hciconfig <DEV> up' does.

    Args:
      adapter: The adapter proxy object.
      on: True/False for power on/off.
    """
    device_prop = self._GetAdapterProperties(adapter)
    device_prop.Set(ADAPTER_INTERFACE, 'Powered', on)

  def _WaitUntilStartDiscovery(self, adapter, timeout_secs):
    """Waits until adapter starts discovery mode.

    After calling adapter.StartDiscovery(), there is a delay before the adapter
    actually start scanning. This function blocks until it sees adapter property
    "Discovering" is True with a timeout timeout_secs.

    Args:
      adapter: The adapter proxy object.
      timeout_secs: The timeout, in seconds, passed to PollForCondition.
    """
    device_prop = self._GetAdapterProperties(adapter)
    PollForCondition(
        poll_method=lambda: device_prop.Get(ADAPTER_INTERFACE, 'Discovering'),
        condition_method=lambda ret: ret == 1,
        timeout_secs=timeout_secs,
        condition_name='Wait for Discovering==1')

  def RemoveDevices(self, adapter, paths):
    """Lets adapter to remove devices in paths.

    Removal is best effort: DBus errors are logged and the remaining paths
    are still processed.

    Args:
      adapter: The adapter proxy object.
      paths: A list of device paths to be removed.
    """
    logging.info('Removing devices...')
    for path in paths:
      try:
        adapter.RemoveDevice(path)
      except DBusException as e:
        # Use a membership test here: str.find() returns -1 (truthy) when
        # the text is absent, so it can not be used as a boolean.
        if 'Does Not Exist' in str(e):
          logging.warning('Can not remove device %s because it is not present',
                          path)
        else:
          logging.warning('Can not remove device %s: %s', path, e)

  def GetAllDevicePaths(self, adapter):
    """Gets all device paths under the adapter.

    The paths are built from the child node names found in the introspection
    data of the adapter.

    Args:
      adapter: The adapter proxy object.

    Returns:
      A list of object paths.
    """
    # yapf: disable
    introspect = dbus.Interface(adapter, 'org.freedesktop.DBus.Introspectable')  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
    # yapf: enable
    node_names = _RE_NODE_NAME.findall(introspect.Introspect())
    logging.info('node names: %s', node_names)
    paths = [os.path.join(adapter.object_path, x) for x in node_names]
    logging.info('paths: %s', paths)
    return paths

  def GetAllDevices(self, adapter):
    """Gets all device properties of scanned devices under the adapter

    The returned value is a dict containing the properties of scanned
    devices. Keys are device mac addresses and values are device
    properties.

    Args:
      adapter: The adapter interface to query.

    Returns:
      A dict containing the information of scanned devices. The dict maps
      devices mac addresses to device properties.
    """
    result = {}
    path_prefix = adapter.object_path
    # yapf: disable
    remote_objects = self._manager.GetManagedObjects()  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
    # yapf: enable
    for path, ifaces in remote_objects.items():
      if path.startswith(path_prefix):
        device = ifaces.get(DEVICE_INTERFACE)
        if device and "Address" in device:
          result[device["Address"]] = device
    return result

  def ScanDevices(self, adapter, timeout_secs=10, match_address=None,
                  remove_before_scan=True):
    """Scans device around using adapter for timeout_secs.

    The returned value devices is a dict containing the properties of
    scanned devices. Keys are device mac addresses and values are device
    properties.
    For example: devices = {
        dbus.String('08:3E:8E:2A:90:24'): dbus.Dictionary(
            {dbus.String('Paired'): dbus.Boolean(False, variant_level=1),
             dbus.String('LegacyPairing'): dbus.Boolean(False,
                                                        variant_level=1),
             dbus.String('Alias'): dbus.String('08-3E-8E-2A-90-24',
                                               variant_level=1),
             dbus.String('Address'): dbus.String('08:3E:8E:2A:90:24',
                                                 variant_level=1),
             dbus.String('RSSI'): dbus.Int16(-79, variant_level=1),
             dbus.String('Class'): dbus.UInt32(0, variant_level=1),
             dbus.String('Trusted'): dbus.Boolean(False, variant_level=1)},
            signature=dbus.Signature('sv')),
        dbus.String('00:07:61:FC:0B:E8'): dbus.Dictionary(
            {dbus.String('Name'):
                 dbus.String('Logitech Bluetooth Mouse M555b',
                             variant_level=1),
             dbus.String('Paired'): dbus.Boolean(False, variant_level=1),
             dbus.String('LegacyPairing'): dbus.Boolean(False,
                                                        variant_level=1),
             dbus.String('Alias'):
                 dbus.String('Logitech Bluetooth Mouse M555b',
                             variant_level=1),
             dbus.String('Address'): dbus.String('00:07:61:FC:0B:E8',
                                                 variant_level=1),
             dbus.String('RSSI'): dbus.Int16(-56, variant_level=1),
             dbus.String('Class'): dbus.UInt32(9600, variant_level=1),
             dbus.String('Trusted'): dbus.Boolean(False, variant_level=1),
             dbus.String('Icon'): dbus.String('input-mouse',
                                              variant_level=1)},
            signature=dbus.Signature('sv'))}

    Args:
      adapter: The adapter interface to control.
      timeout_secs: The duration of scan.
      match_address: return the device immediately that matches the MAC address.
                     The purpose is to speed up the scanning.
      remove_before_scan: Remove devices under adapter before scanning.

    Returns:
      A dict containing the information of scanned devices. The dict maps
      devices mac addresses to device properties.
    """
    logging.info('Controlling adapter %s', adapter)
    if remove_before_scan:
      logging.info('Remove old devices before scanning...')
      old_device_paths = self.GetAllDevicePaths(adapter)
      self.RemoveDevices(adapter, old_device_paths)

    # devices is a mapping from device path to device properties.
    devices = {}
    self._SwitchAdapterPower(adapter, True)
    logging.info('Powered on adapter')

    def _QuitScan(reason):
      """Quit the scan with the given reason.

      Possible reasons
        - timeout occurs
        - the match_address has been found

      Returns:
        False since we want this to be called at most once.
      """
      logging.info(reason)
      adapter.StopDiscovery()
      logging.info('Discovery is stopped.')
      self._main_loop.quit()
      return False

    def _CallbackInterfacesAdded(path, interfaces):
      """The callback when an interface is found.

      When the adapter finds a device, it will assign the device a path and add
      that device interface to dbus.
      Reads the properties of device through interfaces and add the mapping
      from device path to device properties into 'devices'.

      Args:
        path: The device path.
        interfaces: The interface types.
      """
      logging.info('InterfacesAdded')
      if DEVICE_INTERFACE not in interfaces:
        return
      properties = interfaces[DEVICE_INTERFACE]
      for key, value in properties.items():
        logging.debug('%s : %s', key, value)

      if path in devices:
        logging.info('replace old device properties with new device properties')
      devices[path] = properties

      address = (properties['Address'] if 'Address' in properties
                 else '<unknown>')
      logging.info('Bluetooth Device Found: %s.', address)

      if match_address and address == match_address:
        _QuitScan(f'Device {match_address} found.')

      if 'RSSI' in properties:
        logging.info('Address: %s, RSSI: %s', address, properties['RSSI'])

    # pylint: disable=unused-argument
    def _CallbackDevicePropertiesChanged(interface, changed, invalidated, path):
      """The callback when device properties changed.

      This is mainly for debug usage since device is scanned when callback for
      InterfaceAdded is called.

      Args:
        interface: Interface name.
        changed: A dict of changed properties.
        invalidated: A list of properties changed but the value is
          not provided.
        path: The path of signal emitter.
      """
      logging.debug('Device properties changed %s: %s at path %s',
                    interface, changed, path)
      if interface != DEVICE_INTERFACE:
        logging.error('should not get called with interface %s', interface)
        return

      address = (devices[path]['Address']
                 if path in devices and 'Address' in devices[path]
                 else '<unknown>')
      if 'RSSI' in changed:
        logging.info('Address: %s, new RSSI: %s', address, changed['RSSI'])

    # yapf: disable
    bus = dbus.SystemBus()  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
    # yapf: enable

    bus.add_signal_receiver(_CallbackInterfacesAdded,
                            dbus_interface='org.freedesktop.DBus.ObjectManager',
                            signal_name='InterfacesAdded')

    bus.add_signal_receiver(_CallbackDevicePropertiesChanged,
                            dbus_interface='org.freedesktop.DBus.Properties',
                            signal_name='PropertiesChanged',
                            arg0=DEVICE_INTERFACE,
                            path_keyword='path')

    # The receivers are removed in the finally clause so that a failure to
    # start discovery does not leave them registered for later scans.
    try:
      adapter.StartDiscovery()
      logging.info('Start discovery')
      # Normally it takes less than a second to start discovery.
      # Raises TimeoutError if it fails to start discovery within timeout.
      self._WaitUntilStartDiscovery(adapter, 3)
      logging.info('Device scan started.')

      # Scan for timeout_secs. The interval is converted to an integer number
      # of milliseconds so that a float timeout_secs is accepted as well.
      source_id = gobject.timeout_add(int(timeout_secs * 1000), _QuitScan,
                                      'Device scan timed out')
      self._main_loop.run()
      gobject.source_remove(source_id)
    finally:
      bus.remove_signal_receiver(
          _CallbackInterfacesAdded,
          dbus_interface='org.freedesktop.DBus.ObjectManager',
          signal_name='InterfacesAdded')

      bus.remove_signal_receiver(
          _CallbackDevicePropertiesChanged,
          dbus_interface='org.freedesktop.DBus.Properties',
          signal_name='PropertiesChanged',
          arg0=DEVICE_INTERFACE,
          path_keyword='path')

    logging.info('Transform the key from path to address...')
    # Devices that did not report an address are dropped from the result.
    devices_address_properties = {
        value['Address']: value
        for value in devices.values()
        if 'Address' in value
    }
    return devices_address_properties
# Description text shown in the command line help of this script.
USAGE = '\nControls btmgmt tool to scan remote devices.\n'
class BluetoothTest:
  """Command line front end that scans for remote devices with btmgmt."""

  # The parsed command line arguments; set by ParseArgs().
  args = None
  # The bluetooth_utils.BtMgmt object; set by Run().
  btmgmt = None

  def Main(self):
    """Parses the command line and then runs the scan."""
    self.ParseArgs()
    self.Run()

  def ParseArgs(self):
    """Parses the command line into self.args and sets up logging."""
    parser = argparse.ArgumentParser(
        description=USAGE,
        formatter_class=argparse.RawDescriptionHelpFormatter)
    # The boolean switches only differ in their name and help text.
    switches = (
        ('properties', 'Shows properties in the scanned results'),
        ('forever', 'Scans forever'),
    )
    for dest, help_text in switches:
      parser.add_argument(
          f'--{dest}', dest=dest, action='store_true', help=help_text)
    parser.add_argument(
        '--manufacturer_id', default=None, type=int,
        help='Set manufacturer id to specify the interface. Get it from '
        '`btmgmt info`.')
    self.args = parser.parse_args()
    logging.basicConfig(level=logging.INFO)

  def Run(self):
    """Controls btmgmt tool to scan remote devices."""
    # yapf: disable
    self.btmgmt = bluetooth_utils.BtMgmt(self.args.manufacturer_id)  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
    # yapf: enable
    # yapf: disable
    scan_forever = self.args.forever  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
    # yapf: enable
    # Always scan once; with --forever keep scanning until interrupted.
    self._RunOnce()
    while scan_forever:
      self._RunOnce()

  def _RunOnce(self):
    """Scans once and logs the result as YAML if --properties is given."""
    # yapf: disable
    found = self.btmgmt.FindDevices()  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
    # yapf: enable
    # yapf: disable
    if not self.args.properties:  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
    # yapf: enable
      return
    logging.info(yaml.safe_dump(found, default_flow_style=False))
# Only run the command line tool when this file is executed as a script.
if __name__ == '__main__':
  bluetooth_test = BluetoothTest()
  bluetooth_test.Main()