blob: c3e5b18a3d9e9d3a2a56846d507234159e7ae9af [file] [log] [blame]
#!/usr/bin/python
#
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import argparse
import dbus
import factory_common # pylint: disable=W0611
import gobject
import logging
import os
import re
import threading
import uuid
import yaml
from cros.factory.test.utils import Retry
from cros.factory.utils.net_utils import PollForCondition
from dbus.mainloop.glib import DBusGMainLoop
from dbus import service # pylint: disable=W0611
from dbus import DBusException
# Bus name handed to bus.get_object() whenever a bluez proxy object is built.
BUS_NAME = 'org.bluez'
# Common prefix of the bluez D-Bus interface names composed below.
SERVICE_NAME = 'org.bluez'
ADAPTER_INTERFACE = SERVICE_NAME + '.Adapter1'
DEVICE_INTERFACE = SERVICE_NAME + '.Device1'
# Captures the name attribute of every self-closing <node name="..."/> element;
# applied to D-Bus introspection XML to list the child nodes of an object.
_RE_NODE_NAME = re.compile(r'<node name="(.*?)"/>')
class BluetoothManagerException(Exception):
  """Error raised by BluetoothManager when a bluetooth operation fails."""
#TODO(cychiang) Add unittest for this class.
class BluetoothManager(object):
"""The class to handle bluetooth adapter and device through dbus interface.
Properties:
_main_loop: The object representing the main event loop of a PyGTK
or PyGObject application. The main loop should be running
when calling function with callback through dbus interface.
_manager: The proxy for the org.freedesktop.DBus.ObjectManager interface
on object path / on bus org.bluez.
Raises:
Raises BluetoothManagerException if org.bluez.Manager object is not
available through dbus interface.
"""
def __init__(self):
  """Creates the glib main loop and the bluez ObjectManager proxy.

  Raises:
    BluetoothManagerException: If the ObjectManager proxy at path / on bus
      org.bluez can not be obtained through dbus.
  """
  # Make the glib main loop the default for dbus connections so that
  # asynchronous replies and signals get dispatched while it is running.
  DBusGMainLoop(set_as_default=True)
  self._main_loop = gobject.MainLoop()
  self._manager = None
  bus = dbus.SystemBus()
  try:
    self._manager = dbus.Interface(bus.get_object(BUS_NAME, '/'),
                                   'org.freedesktop.DBus.ObjectManager')
  except DBusException as e:
    # The adjacent literals are concatenated; keep the separating space.
    raise BluetoothManagerException('DBus Exception in getting Manager '
                                    'dbus Interface: %s.' % e)
#TODO(cychiang). Migrate to bluez5.x api. Check crosbug.com/p/19276.
def SetDeviceConnected(self, adapter, device_address, connect):
  """Connects or disconnects an input device known to the adapter.

  The device is looked up with adapter.FindDevice() and then driven through
  its 'org.bluez.Input' interface.

  Args:
    adapter: The adapter interface to control.
    device_address: The mac address of input device to control.
    connect: True/False to turn on/off the connection.

  Returns:
    True if the connection was switched successfully.

  Raises:
    BluetoothManagerException: If the device can not be found or the
      connection can not be switched.
  """
  try:
    device = adapter.FindDevice(device_address)
  except DBusException as e:
    raise BluetoothManagerException('SetDeviceConnected: fail to find device'
                                    ' %s: %s' % (device_address, e))
  bus = dbus.SystemBus()
  try:
    # Building the proxy can itself fail with DBusException, so it lives in
    # the same try block as the call it is needed for.
    input_interface = dbus.Interface(bus.get_object(BUS_NAME, device),
                                     'org.bluez.Input')
    if connect:
      input_interface.Connect()
    else:
      input_interface.Disconnect()
  except DBusException as e:
    # The adjacent literals are concatenated; keep the separating space.
    raise BluetoothManagerException('SetDeviceConnected: fail to switch '
                                    'connection: %s' % e)
  return True
#TODO(cychiang). Migrate to bluez5.x api. Check crosbug.com/p/19276.
def RemovePairedDevice(self, adapter, device_address):
  """Looks up a device by address and removes it from the adapter.

  Args:
    adapter: The adapter interface to control.
    device_address: The mac address of input device to control.

  Returns:
    True once the device has been removed.

  Raises:
    BluetoothManagerException: If the lookup or the removal fails with a
      DBusException.
  """
  try:
    adapter.RemoveDevice(adapter.FindDevice(device_address))
  except DBusException as e:
    message = 'RemovePairedDevice: fail to remove device: %s.' % e
    raise BluetoothManagerException(message)
  logging.info('succefully removed device.')
  return True
#TODO(cychiang). Migrate to bluez5.x api. Check crosbug.com/p/19276.
def CreatePairedDevice(self, adapter, device_address):
  """Pairs with a device and blocks until the adapter reports the outcome.

  An agent object is exported at a unique path, then
  adapter.CreatePairedDevice() is issued asynchronously and the main loop is
  run until either the reply handler or the error handler quits it.

  Args:
    adapter: The adapter interface to control.
    device_address: The mac address of input device to control.

  Returns:
    True if the reply handler was called, i.e. the device was created.

  Raises:
    BluetoothManagerException: If the service agent can not be created or
      the paired device can not be created.
  """
  success = threading.Event()

  def _ReplyHandler(device):
    """The callback to handle success of adapter.CreatePairedDevice."""
    logging.info('Created device %s.', device)
    success.set()
    self._main_loop.quit()

  def _ErrorHandler(error):
    """The callback to handle error of adapter.CreatePairedDevice."""
    logging.error('Creating device failed: %s.', error)
    self._main_loop.quit()

  bus = dbus.SystemBus()
  # Exposes a service agent object at a unique path for this test.
  agent_path = '/BluetoothTest/agent/%s' % uuid.uuid4().hex
  logging.info('CreatePairedDevice: Set agent path at %s.', agent_path)
  try:
    dbus.service.Object(bus, agent_path)
  except DBusException as e:
    # str.find() returns -1 (truthy) when the text is absent and 0 (falsy)
    # when it is found at the start, so membership must be tested with 'in'.
    if 'there is already a handler.' in str(e):
      logging.info('There is already an agent there, that is OK: %s.', e)
    else:
      logging.exception('Fail to create agent.')
      raise BluetoothManagerException('CreatePairedDevice: '
                                      'Fail to create agent.')
  adapter.CreatePairedDevice(device_address, agent_path, 'DisplayYesNo',
                             reply_handler=_ReplyHandler,
                             error_handler=_ErrorHandler)
  # Blocks here until one of the two handlers above quits the loop.
  self._main_loop.run()
  if not success.isSet():
    raise BluetoothManagerException('CreatePairedDevice: reply_handler'
                                    ' did not get called.')
  return True
def GetFirstAdapter(self):
  """Returns the first adapter object found by bluetooth manager.

  An adapter is a proxy object which provides the interface of
  'org.bluez.Adapter1'.

  Returns:
    The first element of the list returned by self.GetAdapters().

  Raises:
    BluetoothManagerException: If no adapter is available.
  """
  adapters = self.GetAdapters()
  # GetAdapters() returns None when nothing is found, so len() can not be
  # used here; a truthiness test covers both None and an empty list.
  if not adapters:
    raise BluetoothManagerException('Fail to find any adapter.')
  return adapters[0]
def _GetAdapters(self):
  """Builds a proxy for every managed object exposing the adapter interface.

  Returns:
    A list of adapters. An adapter is a proxy object which provides the
    interface of 'org.bluez.Adapter1'. The list is empty when no managed
    object has that interface.
  """
  managed = self._manager.GetManagedObjects()
  bus = dbus.SystemBus()
  # Keep only the object paths whose interface dict carries the adapter
  # interface, then wrap each of them in an Adapter1 proxy.
  adapter_paths = [path for path, interfaces in managed.iteritems()
                   if interfaces.get(ADAPTER_INTERFACE) is not None]
  return [dbus.Interface(bus.get_object(BUS_NAME, path), ADAPTER_INTERFACE)
          for path in adapter_paths]
def GetAdapters(self, max_retry_times=10, interval=2):
  """Gets the available bluetooth adapters through Retry().

  The lookup itself is done by self._GetAdapters, which is handed to Retry()
  together with the retry budget.

  Args:
    max_retry_times: The maximum retry times to find adapters.
    interval: The sleep interval between two trials in seconds.

  Returns:
    Whatever Retry() returns: a list of adapter proxy objects providing the
    interface of 'org.bluez.Adapter1', or None (after logging an error) if
    there is no available adapter.
  """
  found = Retry(max_retry_times, interval, None, self._GetAdapters)
  if found is None:
    logging.error('BluetoothManager: Fail to get any adapter.')
  return found
def _SwitchAdapterPower(self, adapter, on):
  """Sets the Powered property of the adapter.

  This will bring up the adapter like 'hciconfig <DEV> up' does.

  Args:
    adapter: The adapter proxy object.
    on: True/False for power on/off.
  """
  adapter_object = dbus.SystemBus().get_object(BUS_NAME, adapter.object_path)
  # Properties are written through the generic D-Bus Properties interface of
  # the same object rather than through the adapter interface itself.
  properties = dbus.Interface(adapter_object,
                              'org.freedesktop.DBus.Properties')
  properties.Set(ADAPTER_INTERFACE, 'Powered', on)
def _WaitUntilStartDiscovery(self, adapter, timeout_secs):
  """Blocks until the adapter reports that it is discovering.

  After calling adapter.StartDiscovery(), there is a delay before the adapter
  actually starts scanning. The adapter property "Discovering" is polled
  through PollForCondition until it equals 1.

  Args:
    adapter: The adapter proxy object.
    timeout_secs: The timeout handed to PollForCondition.
  """
  adapter_object = dbus.SystemBus().get_object(BUS_NAME, adapter.object_path)
  properties = dbus.Interface(adapter_object,
                              'org.freedesktop.DBus.Properties')

  def _IsDiscovering():
    """Returns True once the adapter's Discovering property equals 1."""
    return properties.Get(ADAPTER_INTERFACE, 'Discovering') == 1

  PollForCondition(_IsDiscovering, timeout_secs,
                   condition_name="Wait for Discovering==1")
def RemoveDevices(self, adapter, paths):
  """Asks the adapter to remove every device in paths, best effort.

  A DBusException raised for one device is logged as a warning and does not
  stop the removal of the remaining devices.

  Args:
    adapter: The adapter proxy object.
    paths: A list of device paths to be removed.
  """
  logging.info('Removing devices...')
  for path in paths:
    try:
      adapter.RemoveDevice(path)
    except DBusException as e:
      # str.find() returns -1 (truthy) when the text is absent, so the
      # membership test has to be spelled with 'in'.
      if 'Does Not Exist' in str(e):
        logging.warning('Can not remove device %s because it is not present',
                        path)
      else:
        # Still best effort, but do not hide the real reason.
        logging.warning('Can not remove device %s: %s', path, e)
def GetAllDevicePaths(self, adapter):
  """Lists the object paths of all child nodes under the adapter.

  The child node names are extracted from the adapter's introspection XML
  and joined onto the adapter's own object path.

  Args:
    adapter: The adapter proxy object.

  Returns:
    A list of object paths, one per child node found.
  """
  introspectable = dbus.Interface(adapter,
                                  'org.freedesktop.DBus.Introspectable')
  xml = introspectable.Introspect()
  node_names = _RE_NODE_NAME.findall(xml)
  logging.info('node names: %s', node_names)
  paths = []
  for name in node_names:
    paths.append(os.path.join(adapter.object_path, name))
  logging.info('paths: %s', paths)
  return paths
def ScanDevices(self, adapter, timeout_secs=10, remove_before_scan=True):
  """Scans devices around using adapter for timeout_secs.

  The adapter is powered on, discovery is started and the main loop is run
  until a timer of timeout_secs fires. Devices are collected from the
  InterfacesAdded signals received in the meantime.

  The returned value devices is a dict containing the properties of
  scanned devices. Keys are device mac addresses and values are device
  properties.
  For example: devices = {
      dbus.String(u'08:3E:8E:2A:90:24'): dbus.Dictionary(
          {dbus.String(u'Paired'): dbus.Boolean(False, variant_level=1),
           dbus.String(u'LegacyPairing'): dbus.Boolean(False,
                                                       variant_level=1),
           dbus.String(u'Alias'): dbus.String(u'08-3E-8E-2A-90-24',
                                              variant_level=1),
           dbus.String(u'Address'): dbus.String(u'08:3E:8E:2A:90:24',
                                                variant_level=1),
           dbus.String(u'RSSI'): dbus.Int16(-79, variant_level=1),
           dbus.String(u'Class'): dbus.UInt32(0L, variant_level=1),
           dbus.String(u'Trusted'): dbus.Boolean(False, variant_level=1)},
          signature=dbus.Signature('sv'))
      dbus.String(u'00:07:61:FC:0B:E8'): dbus.Dictionary(
          {dbus.String(u'Name'):
               dbus.String(u'Logitech Bluetooth Mouse M555b',
                           variant_level=1),
           dbus.String(u'Paired'): dbus.Boolean(False, variant_level=1),
           dbus.String(u'LegacyPairing'): dbus.Boolean(False,
                                                       variant_level=1),
           dbus.String(u'Alias'):
               dbus.String(u'Logitech Bluetooth Mouse M555b',
                           variant_level=1),
           dbus.String(u'Address'): dbus.String(u'00:07:61:FC:0B:E8',
                                                variant_level=1),
           dbus.String(u'RSSI'): dbus.Int16(-56, variant_level=1),
           dbus.String(u'Class'): dbus.UInt32(9600L, variant_level=1),
           dbus.String(u'Trusted'): dbus.Boolean(False, variant_level=1),
           dbus.String(u'Icon'): dbus.String(u'input-mouse',
                                             variant_level=1)},
          signature=dbus.Signature('sv'))}

  Args:
    adapter: The adapter interface to control.
    timeout_secs: The duration of scan in seconds.
    remove_before_scan: Remove devices under adapter before scanning.

  Returns:
    A dict containing the information of scanned devices. The dict maps
    devices mac addresses to device properties. Devices whose properties
    carry no 'Address' are left out.
  """
  logging.info('Controlling adapter %s', adapter)
  if remove_before_scan:
    logging.info('Remove old devices before scanning...')
    old_device_paths = self.GetAllDevicePaths(adapter)
    self.RemoveDevices(adapter, old_device_paths)

  # devices is a mapping from device path to device properties.
  devices = dict()
  self._SwitchAdapterPower(adapter, True)
  logging.info('Powered on adapter')

  def _ScanTimeout():
    """The callback when scan duration is over.

    Stops discovery and quits self._main_loop. The loop is quit even if
    stopping discovery fails; otherwise ScanDevices would block forever.

    Returns:
      False since we want this to be called at most once.
    """
    logging.info('Device scan timed out.')
    try:
      adapter.StopDiscovery()
      logging.info('Stop Discovery')
    except DBusException:
      logging.exception('Fail to stop discovery.')
    finally:
      logging.info('Quit main loop without waiting for Discovery=1.')
      self._main_loop.quit()
    return False

  def _CallbackInterfacesAdded(path, interfaces):
    """The callback when an interface is found.

    When the adapter finds a device, it will assign the device a path and add
    that device interface to dbus.
    Reads the properties of device through interfaces and add the mapping
    from device path to device properties into 'devices'.

    Args:
      path: The device path.
      interfaces: A mapping from interface name to its properties.
    """
    logging.info('InterfacesAdded')
    if DEVICE_INTERFACE not in interfaces:
      return
    properties = interfaces[DEVICE_INTERFACE]
    for key, value in properties.iteritems():
      logging.debug('%s : %s', key, value)
    if path in devices:
      logging.info('replace old device properties with new device properties')
    devices[path] = properties
    address = properties.get('Address', '<unknown>')
    logging.info('Bluetooth Device Found: %s.', address)
    if 'RSSI' in properties:
      logging.info('Address: %s, RSSI: %s', address, properties['RSSI'])

  # pylint: disable=W0613
  def _CallbackDevicePropertiesChanged(interface, changed, invalidated, path):
    """The callback when device properties changed.

    This is mainly for debug usage since device is scanned when callback for
    InterfaceAdded is called.

    Args:
      interface: Interface name.
      changed: A dict of changed properties.
      invalidated: A list of properties changed but the value is
        not provided.
      path: The path of signal emitter.
    """
    logging.debug('Device properties changed %s: %s at path %s',
                  interface, changed, path)
    if interface != DEVICE_INTERFACE:
      logging.error('should not get called with interface %s', interface)
      return
    address = devices.get(path, {}).get('Address', '<unknown>')
    if 'RSSI' in changed:
      logging.info('Address: %s, new RSSI: %s', address, changed['RSSI'])

  bus = dbus.SystemBus()
  bus.add_signal_receiver(_CallbackInterfacesAdded,
                          dbus_interface='org.freedesktop.DBus.ObjectManager',
                          signal_name='InterfacesAdded')
  bus.add_signal_receiver(_CallbackDevicePropertiesChanged,
                          dbus_interface='org.freedesktop.DBus.Properties',
                          signal_name='PropertiesChanged',
                          arg0=DEVICE_INTERFACE,
                          path_keyword="path")
  try:
    adapter.StartDiscovery()
    logging.info('Start discovery')
    # Normally it takes less than a second to start discovery.
    # The wait is expected to raise if discovery does not start in time.
    self._WaitUntilStartDiscovery(adapter, 3)
    logging.info('Device scan started.')
    # Scan for timeout_secs. The interval must be an integer number of
    # milliseconds, so a fractional timeout_secs is truncated.
    gobject.timeout_add(int(timeout_secs * 1000), _ScanTimeout)
    self._main_loop.run()
  finally:
    # Always unregister the callbacks, even when discovery failed to start;
    # otherwise stale receivers pile up across calls.
    bus.remove_signal_receiver(
        _CallbackInterfacesAdded,
        dbus_interface='org.freedesktop.DBus.ObjectManager',
        signal_name='InterfacesAdded')
    bus.remove_signal_receiver(
        _CallbackDevicePropertiesChanged,
        dbus_interface='org.freedesktop.DBus.Properties',
        signal_name='PropertiesChanged',
        arg0=DEVICE_INTERFACE,
        path_keyword="path")

  logging.info('Transform the key from path to address...')
  devices_address_properties = dict(
      (properties['Address'], properties)
      for properties in devices.values() if 'Address' in properties)
  return devices_address_properties
# Description text handed to argparse.ArgumentParser by BluetoothTest.
USAGE = """
Controls bluetooth adapter to scan remote devices.
"""
class BluetoothTest(object):
  """Command line front end that scans for bluetooth devices."""
  # Parsed command line namespace; filled in by ParseArgs().
  args = None
  # BluetoothManager instance; created in Run().
  manager = None
  # Adapter proxy used for scanning; picked in Run().
  adapter = None

  def Main(self):
    """Parses the command line, then runs the scan."""
    self.ParseArgs()
    self.Run()

  def ParseArgs(self):
    """Parses sys.argv into self.args and sets logging to INFO level."""
    parser = argparse.ArgumentParser(
        description=USAGE,
        formatter_class=argparse.RawDescriptionHelpFormatter)
    switches = (
        ('--properties', 'properties',
         "Shows properties in the scanned results"),
        ('--forever', 'forever', "Scans forever"),
    )
    for flag, dest, help_text in switches:
      parser.add_argument(flag, dest=dest, action='store_true',
                          help=help_text)
    self.args = parser.parse_args()
    logging.basicConfig(level=logging.INFO)

  def Run(self):
    """Controls the adapter to scan remote devices."""
    self.manager = BluetoothManager()
    self.adapter = self.manager.GetFirstAdapter()
    logging.info('Using adapter: %s', self.adapter)
    # Always scan once; keep scanning for as long as --forever is set.
    self._RunOnce()
    while self.args.forever:
      self._RunOnce()

  def _RunOnce(self):
    """Scans once and logs the result as yaml when --properties is set."""
    scanned = self.manager.ScanDevices(self.adapter)
    if not self.args.properties:
      return
    logging.info(yaml.dump(scanned, default_flow_style=False))
# Run the command line scanner only when this file is executed as a script.
if __name__ == '__main__':
  BluetoothTest().Main()