blob: 7d7e0ac450bc974ed730d52038bcaf34c4a4b275 [file] [log] [blame]
# -*- coding: utf-8 -*-
# Lint as: python3
# pylint: disable=import-error,logging-not-lazy
# Copyright 2019 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""This module implements gatt server functionality on bluez"""
from __future__ import print_function
import sys
import logging
import dbus
import dbus.mainloop.glib
try:
from gi.repository import GObject
except ImportError:
import gobject as GObject
from .bluez_le_hid_service import HIDApplication
from .bluez_le_better_together_service import PhoneLEBetterTogetherApplication
from .bluez_le_fast_pair_service import FastPairLEApplication
from .bluetooth_peripheral_kit import PeripheralKit
from .example_gatt_server import Application, BLUEZ_SERVICE_NAME, \
DBUS_OM_IFACE, GATT_MANAGER_IFACE
from .bluez_service_consts import PERIPHERAL_DEVICE_APPEARANCE, \
PERIPHERAL_DEVICE_NAME
from .hci_cmd import HciCommands, HciTool
mainloop = None
class GATTServer(object):
"""Bluez gatt server implementation"""
def __init__(self, device=None):
bus = dbus.SystemBus()
adapter = self._FindAdapter(bus)
if not adapter:
logging.error('GattManager1 interface not found')
return
service_manager = dbus.Interface(
bus.get_object(BLUEZ_SERVICE_NAME, adapter), GATT_MANAGER_IFACE)
self.device_type = 'generic' if not device else device.device_type
self.peripheral_type = None
if 'MOUSE' in self.device_type:
self.peripheral_type = PeripheralKit.MOUSE
elif 'KEYBOARD' in self.device_type:
self.peripheral_type = PeripheralKit.KEYBOARD
elif 'PHONE' in self.device_type:
self.peripheral_type = PeripheralKit.PHONE
elif 'FAST_PAIR' in self.device_type:
self.peripheral_type = PeripheralKit.FAST_PAIR
# Choose application using args
if 'MOUSE' in self.device_type or 'KEYBOARD' in self.device_type:
self.app = HIDApplication(bus, self.device_type)
logging.info('HID %s application starting...', self.device_type)
elif 'PHONE' in self.device_type:
self.app = PhoneLEBetterTogetherApplication(bus, self.device_type)
logging.info('%s LE application starting...', self.device_type)
elif 'FAST_PAIR' in self.device_type:
self.app = FastPairLEApplication(bus, device)
logging.info('Fast Pair application starting...')
else:
# Default to generic application provided in example
self.app = Application(bus)
logging.info('Generic application starting...')
# We register application with empty callbacks to run it async
service_manager.RegisterApplication(
self.app.GetPath(), {},
reply_handler=self._RegisterAppCB,
error_handler=self._RegisterAppErrCB)
# Start with discoverable enabled
self._discoverable = True
self._ConfigureAdvertisements()
# Add a handler for changes in state, specifically requesting caller's
# device_path in the callback
bus.add_signal_receiver(
self._PropertyChanged,
signal_name='PropertiesChanged',
bus_name='org.bluez',
path_keyword='device_path')
def _RegisterAppCB(self):
"""Callback for app registration"""
pass
def _RegisterAppErrCB(self, error):
"""Callback for app registration error"""
logging.error('Failed to register application: ' + str(error))
mainloop.quit()
def _FindAdapter(self, bus):
"""Finds adapter within dbus tree"""
remote_om = dbus.Interface(
bus.get_object(BLUEZ_SERVICE_NAME, '/'), DBUS_OM_IFACE)
# TODO b:142131418 occasionally this stalls and times out, causing 25s delay
objects = remote_om.GetManagedObjects()
for o, props in list(objects.items()):
if GATT_MANAGER_IFACE in list(props.keys()):
return o
return None
def _FormatDataField(self, attr_id, data):
"""Puts each data block into the correct structure
Returns:
data in structure [attr len, attr id, attr data]
"""
formatted_data = [len(data) + 1, attr_id]
formatted_data.extend(data)
return formatted_data
def _ConfigureAdvertisements(self):
"""Configures and registers LE advertisement"""
FULL_LOCAL_NAME_ID = 0x09
FLAGS_ID = 0x01
APPEARANCE_ID = 0x19
ADV_SERVICE_ID = 0x03
ADV_SERVICE_DATA_ID = 0x16
# Assign local name based on peripheral type
local_name = PERIPHERAL_DEVICE_NAME[self.peripheral_type]
local_name_data = [ord(ch) for ch in local_name]
# LE only, general discoverable
flag_data = [0x06]
# Appearance - based on peripheral
appearance_data = PERIPHERAL_DEVICE_APPEARANCE[self.peripheral_type]
# Data structure is as follows
# total len | attr 1 len, attr 1 id, attr 1 data | attr2 ...
# Create data structure
data = []
if 'FAST_PAIR' in self.device_type:
data.extend(self._FormatDataField(FLAGS_ID, flag_data))
data.extend(
self._FormatDataField(ADV_SERVICE_DATA_ID,
[0x2c, 0xfe] + self.app.GetServiceData()))
else:
data.extend(self._FormatDataField(FULL_LOCAL_NAME_ID, local_name_data))
data.extend(self._FormatDataField(FLAGS_ID, flag_data))
data.extend(self._FormatDataField(APPEARANCE_ID, appearance_data))
data.extend(self._FormatDataField(ADV_SERVICE_ID, [0x12, 0x18]))
# Pad the packet with zeros to make bluez happy
data.extend([0] * (31 - len(data)))
# hci commands can be found in Version 5.1 | Vol 2, Part E Sec 7.8
# Advertisement structure
set_adv_data_cmd = HciCommands.le_set_advertising_data(
data_length=len(data), data=data)
# (TODO b/158217517) we have found that a race condition exists in two-peer
# tests where the adv and scan response of a peer is interrupted, which
# causes the dev info to be incorrect on the DUT. We make the scan response
# data identical to adv data as a work-around to ameliorate these failures
# while the underlying issue is fixed
# Also apply adv data to scan response
set_adv_resp_cmd = HciCommands.le_set_scan_response_data(
data_length=len(data), data=data)
# Stop advertising
stop_adv = HciCommands.le_set_advertising_enable(enable=False)
# Start advertising
start_adv = HciCommands.le_set_advertising_enable(enable=True)
# Advertise with low duty-cycle (30-35ms) to make discovery faster
rapid_adv = HciCommands.le_set_advertising_params(
interval_min=0x30, interval_max=0x38)
cmds = [stop_adv, set_adv_data_cmd, set_adv_resp_cmd, rapid_adv, start_adv
] if self._discoverable else [stop_adv]
# Add commands and run them via hcitool
tool = HciTool(sudo=True, wait_time=0.1)
for cmd in cmds:
tool.add_command(cmd)
tool.run_commands()
def SetDiscoverable(self, discoverable):
"""Change whether device is discoverable and configure advertisements.
Only configures advertisements if the discoverable value has changed.
Args:
discoverable: Whether the device should advertise.
"""
prev_discoverable = self._discoverable
self._discoverable = discoverable
if discoverable != prev_discoverable:
self._ConfigureAdvertisements()
def _PropertyChanged(self, *args, **kwargs):
"""Called when a property changes on the bluez d-bus interface
Useful for tracking the peer's connection and discovery status
Args:
args: list of form [caller, property_dict]
kwargs: dict containing keyword arguments requested in the
add_signal_receiver call i.e. device_path of calling object
"""
# Renaming to be more human readable while satisfying pylint
changed_prop = args
caller_info = kwargs
caller = str(changed_prop[0])
prop_dict = changed_prop[1]
if 'Device1' in caller:
if dbus.String('Connected') in prop_dict:
remote_addr = str(caller_info['device_path']).split('dev_')[-1]
conn_status = bool(prop_dict[dbus.String('Connected')])
info_msg = 'Connection change to {}: {}'.format(remote_addr,
conn_status)
logging.info(info_msg)
# Re-enable advertising on disconnect so server can continue
if not conn_status:
self._ConfigureAdvertisements()
if 'Adapter1' in caller:
if dbus.String('Powered') in prop_dict:
power_status = bool(prop_dict[dbus.String('Powered')])
# Re-enable advertising on power up so server can continue
if power_status:
self._ConfigureAdvertisements()
def main():
# Allow logging to work like print when server is run standalone
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
global mainloop # pylint: disable=global-statement
# Set up main loop for incoming activity on the bus
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
mainloop = GObject.MainLoop()
# Establish and run server with example HoG application
server = GATTServer('MOUSE') # pylint: disable=unused-variable
mainloop.run()
if __name__ == '__main__':
main()