blob: fe4909e8745afcfb5fa4e6247c0111803314fce2 [file] [log] [blame]
# Lint as: python2, python3
# Copyright (c) 2020 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import array
import btsocket
import fcntl
import logging
import socket
import struct
# Constants from lib/mgmt.h in BlueZ source
# Controller index passed with commands that are not addressed to a specific
# controller; read_version(), read_supported_commands() and read_index_list()
# in this file use it.
MGMT_INDEX_NONE = 0xFFFF
# Size in bytes of the packet header. recv_event() reads this many bytes and
# unpacks them as three little-endian 16-bit fields (event, index, length).
MGMT_HDR_SIZE = 6
# Status values carried in the third byte of the command complete and command
# status event data. The methods in this file only compare against
# MGMT_STATUS_SUCCESS and MGMT_STATUS_NOT_SUPPORTED.
MGMT_STATUS_SUCCESS = 0x00
MGMT_STATUS_UNKNOWN_COMMAND = 0x01
MGMT_STATUS_NOT_CONNECTED = 0x02
MGMT_STATUS_FAILED = 0x03
MGMT_STATUS_CONNECT_FAILED = 0x04
MGMT_STATUS_AUTH_FAILED = 0x05
MGMT_STATUS_NOT_PAIRED = 0x06
MGMT_STATUS_NO_RESOURCES = 0x07
MGMT_STATUS_TIMEOUT = 0x08
MGMT_STATUS_ALREADY_CONNECTED = 0x09
MGMT_STATUS_BUSY = 0x0a
MGMT_STATUS_REJECTED = 0x0b
MGMT_STATUS_NOT_SUPPORTED = 0x0c
MGMT_STATUS_INVALID_PARAMS = 0x0d
MGMT_STATUS_DISCONNECTED = 0x0e
MGMT_STATUS_NOT_POWERED = 0x0f
MGMT_STATUS_CANCELLED = 0x10
MGMT_STATUS_INVALID_INDEX = 0x11
MGMT_STATUS_RFKILLED = 0x12
# Command opcodes. These are the values passed as the command code to
# BluetoothSocket.send_command() / send_command_and_wait(), and echoed back in
# the first two bytes of the command complete / command status event data.
# Only some of them have a wrapper method on BluetoothControlSocket.
MGMT_OP_READ_VERSION = 0x0001
MGMT_OP_READ_COMMANDS = 0x0002
MGMT_OP_READ_INDEX_LIST = 0x0003
MGMT_OP_READ_INFO = 0x0004
MGMT_OP_SET_POWERED = 0x0005
MGMT_OP_SET_DISCOVERABLE = 0x0006
MGMT_OP_SET_CONNECTABLE = 0x0007
MGMT_OP_SET_FAST_CONNECTABLE = 0x0008
MGMT_OP_SET_PAIRABLE = 0x0009
MGMT_OP_SET_LINK_SECURITY = 0x000A
MGMT_OP_SET_SSP = 0x000B
MGMT_OP_SET_HS = 0x000C
MGMT_OP_SET_LE = 0x000D
MGMT_OP_SET_DEV_CLASS = 0x000E
MGMT_OP_SET_LOCAL_NAME = 0x000F
MGMT_OP_ADD_UUID = 0x0010
MGMT_OP_REMOVE_UUID = 0x0011
MGMT_OP_LOAD_LINK_KEYS = 0x0012
MGMT_OP_LOAD_LONG_TERM_KEYS = 0x0013
MGMT_OP_DISCONNECT = 0x0014
MGMT_OP_GET_CONNECTIONS = 0x0015
MGMT_OP_PIN_CODE_REPLY = 0x0016
MGMT_OP_PIN_CODE_NEG_REPLY = 0x0017
MGMT_OP_SET_IO_CAPABILITY = 0x0018
MGMT_OP_PAIR_DEVICE = 0x0019
MGMT_OP_CANCEL_PAIR_DEVICE = 0x001A
MGMT_OP_UNPAIR_DEVICE = 0x001B
MGMT_OP_USER_CONFIRM_REPLY = 0x001C
MGMT_OP_USER_CONFIRM_NEG_REPLY = 0x001D
MGMT_OP_USER_PASSKEY_REPLY = 0x001E
MGMT_OP_USER_PASSKEY_NEG_REPLY = 0x001F
MGMT_OP_READ_LOCAL_OOB_DATA = 0x0020
MGMT_OP_ADD_REMOTE_OOB_DATA = 0x0021
MGMT_OP_REMOVE_REMOTE_OOB_DATA = 0x0022
MGMT_OP_START_DISCOVERY = 0x0023
MGMT_OP_STOP_DISCOVERY = 0x0024
MGMT_OP_CONFIRM_NAME = 0x0025
MGMT_OP_BLOCK_DEVICE = 0x0026
MGMT_OP_UNBLOCK_DEVICE = 0x0027
MGMT_OP_SET_DEVICE_ID = 0x0028
MGMT_OP_SET_ADVERTISING = 0x0029
MGMT_OP_SET_BREDR = 0x002A
MGMT_OP_SET_STATIC_ADDRESS = 0x002B
MGMT_OP_SET_SCAN_PARAMS = 0x002C
MGMT_OP_SET_SECURE_CONN = 0x002D
MGMT_OP_SET_DEBUG_KEYS = 0x002E
MGMT_OP_SET_PRIVACY = 0x002F
MGMT_OP_LOAD_IRKS = 0x0030
MGMT_OP_GET_CONN_INFO = 0x0031
MGMT_OP_GET_CLOCK_INFO = 0x0032
MGMT_OP_ADD_DEVICE = 0x0033
MGMT_OP_REMOVE_DEVICE = 0x0034
MGMT_OP_LOAD_CONN_PARAM = 0x0035
MGMT_OP_READ_UNCONF_INDEX_LIST = 0x0036
MGMT_OP_READ_CONFIG_INFO = 0x0037
MGMT_OP_SET_EXTERNAL_CONFIG = 0x0038
MGMT_OP_SET_PUBLIC_ADDRESS = 0x0039
# Event codes, i.e. the first element of the tuple returned by
# BluetoothSocket.recv_event(). MGMT_EV_CMD_COMPLETE, MGMT_EV_CMD_STATUS and
# MGMT_EV_CONTROLLER_ERROR are handled specially by send_command_and_wait()
# and wait_for_events(); the others are matched against the caller's list or
# queued on the socket's events list.
MGMT_EV_CMD_COMPLETE = 0x0001
MGMT_EV_CMD_STATUS = 0x0002
MGMT_EV_CONTROLLER_ERROR = 0x0003
MGMT_EV_INDEX_ADDED = 0x0004
MGMT_EV_INDEX_REMOVED = 0x0005
MGMT_EV_NEW_SETTINGS = 0x0006
MGMT_EV_CLASS_OF_DEV_CHANGED = 0x0007
MGMT_EV_LOCAL_NAME_CHANGED = 0x0008
MGMT_EV_NEW_LINK_KEY = 0x0009
MGMT_EV_NEW_LONG_TERM_KEY = 0x000A
MGMT_EV_DEVICE_CONNECTED = 0x000B
MGMT_EV_DEVICE_DISCONNECTED = 0x000C
MGMT_EV_CONNECT_FAILED = 0x000D
MGMT_EV_PIN_CODE_REQUEST = 0x000E
MGMT_EV_USER_CONFIRM_REQUEST = 0x000F
MGMT_EV_USER_PASSKEY_REQUEST = 0x0010
MGMT_EV_AUTH_FAILED = 0x0011
MGMT_EV_DEVICE_FOUND = 0x0012
MGMT_EV_DISCOVERING = 0x0013
MGMT_EV_DEVICE_BLOCKED = 0x0014
MGMT_EV_DEVICE_UNBLOCKED = 0x0015
MGMT_EV_DEVICE_UNPAIRED = 0x0016
MGMT_EV_PASSKEY_NOTIFY = 0x0017
MGMT_EV_NEW_IRK = 0x0018
MGMT_EV_NEW_CSRK = 0x0019
MGMT_EV_DEVICE_ADDED = 0x001a
MGMT_EV_DEVICE_REMOVED = 0x001b
MGMT_EV_NEW_CONN_PARAM = 0x001c
MGMT_EV_UNCONF_INDEX_ADDED = 0x001d
MGMT_EV_UNCONF_INDEX_REMOVED = 0x001e
MGMT_EV_NEW_CONFIG_OPTIONS = 0x001f
# Settings returned by MGMT_OP_READ_INFO
# Each value is a distinct single bit. Presumably they are meant to be tested
# with '&' against the supported_settings / current_settings words returned by
# read_info() and the set_*() methods -- confirm against doc/mgmt-api.txt.
MGMT_SETTING_POWERED = 0x00000001
MGMT_SETTING_CONNECTABLE = 0x00000002
MGMT_SETTING_FAST_CONNECTABLE = 0x00000004
MGMT_SETTING_DISCOVERABLE = 0x00000008
MGMT_SETTING_PAIRABLE = 0x00000010
MGMT_SETTING_LINK_SECURITY = 0x00000020
MGMT_SETTING_SSP = 0x00000040
MGMT_SETTING_BREDR = 0x00000080
MGMT_SETTING_HS = 0x00000100
MGMT_SETTING_LE = 0x00000200
MGMT_SETTING_ADVERTISING = 0x00000400
MGMT_SETTING_SECURE_CONNECTIONS = 0x00000800
MGMT_SETTING_DEBUG_KEYS = 0x00001000
MGMT_SETTING_PRIVACY = 0x00002000
MGMT_SETTING_CONTROLLER_CONFIG = 0x00004000
# Options returned by MGMT_OP_READ_CONFIG_INFO
# (No method in this file issues MGMT_OP_READ_CONFIG_INFO.)
MGMT_OPTION_EXTERNAL_CONFIG = 0x00000001
MGMT_OPTION_PUBLIC_ADDRESS = 0x00000002
# Disconnect reason returned in MGMT_EV_DEVICE_DISCONNECTED
MGMT_DEV_DISCONN_UNKNOWN = 0x00
MGMT_DEV_DISCONN_TIMEOUT = 0x01
MGMT_DEV_DISCONN_LOCAL_HOST = 0x02
MGMT_DEV_DISCONN_REMOTE = 0x03
# Flags returned in MGMT_EV_DEVICE_FOUND
# get_discovered_devices() tests MGMT_DEV_FOUND_CONFIRM_NAME against the flags
# word of each event and answers with MGMT_OP_CONFIRM_NAME when it is set.
MGMT_DEV_FOUND_CONFIRM_NAME = 0x01
MGMT_DEV_FOUND_LEGACY_PAIRING = 0x02
# EIR Data field types
# These are the values of the type byte of each EIR field and therefore the
# keys of the dictionary returned by parse_eir(). parse_eir() strips trailing
# NUL bytes from the EIR_NAME_SHORT and EIR_NAME_COMPLETE values and stores
# the data of every other type unmodified.
EIR_FLAGS = 0x01
EIR_UUID16_SOME = 0x02
EIR_UUID16_ALL = 0x03
EIR_UUID32_SOME = 0x04
EIR_UUID32_ALL = 0x05
EIR_UUID128_SOME = 0x06
EIR_UUID128_ALL = 0x07
EIR_NAME_SHORT = 0x08
EIR_NAME_COMPLETE = 0x09
EIR_TX_POWER = 0x0A
EIR_CLASS_OF_DEV = 0x0D
EIR_SSP_HASH = 0x0E
EIR_SSP_RANDOMIZER = 0x0F
EIR_DEVICE_ID = 0x10
EIR_GAP_APPEARANCE = 0x19
# Derived from lib/hci.h
# ioctl request numbers. HCIGETDEVINFO is the request issued by
# BluetoothRawSocket.get_dev_info(); HCIGETDEVLIST is not used in this file.
HCIGETDEVLIST = 0x800448d2
HCIGETDEVINFO = 0x800448d3
# Single-bit flag values. Presumably these describe the 'flags' word returned
# by get_dev_info() -- confirm against lib/hci.h; nothing in this file tests
# them.
HCI_UP = 1 << 0
HCI_INIT = 1 << 1
HCI_RUNNING = 1 << 2
HCI_PSCAN = 1 << 3
HCI_ISCAN = 1 << 4
HCI_AUTH = 1 << 5
HCI_ENCRYPT = 1 << 6
HCI_INQUIRY = 1 << 7
HCI_RAW = 1 << 8
def parse_eir(eirdata):
    """Parse a Bluetooth Extended Inquiry Response (EIR) data structure.

    The structure is a sequence of fields, each made of a one-byte length, a
    one-byte type and (length - 1) bytes of data. Parsing stops at the end of
    the input or at the first field whose length byte is zero.

    @param eirdata: Encoded EIR data structure as a bytes-like object
            (bytes or bytearray).

    @return Dictionary keyed by field type (see the EIR_* constants) whose
            values are the raw field data. Trailing NUL bytes are stripped
            from EIR_NAME_SHORT and EIR_NAME_COMPLETE values; they are not
            decoded. If a type occurs more than once the last one wins.

    """
    fields = {}
    view = memoryview(eirdata)
    pos = 0
    while pos < len(eirdata):
        # Byte at the current position is the field length, which should be
        # zero at the end of the structure.
        (field_len,) = struct.unpack('B', view[pos:pos + 1])
        if field_len == 0:
            break

        # Next byte is the field type, and the rest of the field is the data.
        # Note that the length field doesn't include itself so that's why the
        # offsets and lengths look a little odd.
        (field_type,) = struct.unpack('B', view[pos + 1:pos + 2])
        data = eirdata[pos + 2:pos + field_len + 1]
        pos += field_len + 1

        # Parse the individual fields to make the data meaningful.
        if field_type in (EIR_NAME_SHORT, EIR_NAME_COMPLETE):
            # The data is a byte string, so the padding to strip has to be
            # given as bytes too; a text '\0' raises TypeError on Python 3.
            data = data.rstrip(b'\0')

        # Place in the dictionary keyed by type.
        fields[field_type] = data

    return fields
class BluetoothSocketError(Exception):
    """General failure while using a BluetoothSocket.

    Raised in this file for a short write in send_command() and for a
    receive timeout in recv_event().

    """
class BluetoothInvalidPacketError(Exception):
    """A packet read from the socket was malformed or unexpected.

    Raised in this file when a packet or its event data has the wrong
    length, or when a reply names a different controller index or command
    code than the command that was sent.

    """
class BluetoothControllerError(Exception):
    """A Controller Error event arrived while waiting for a command reply."""
class BluetoothSocket(btsocket.socket):
    """Bluetooth Socket.

    BluetoothSocket wraps the btsocket.socket() class, and thus the system
    socket.socket() class, to implement the necessary send and receive methods
    for the HCI Control and Monitor protocols (aka mgmt_ops) of the
    Linux Kernel.

    Instantiate either BluetoothControlSocket or BluetoothRawSocket rather
    than this class directly.

    See bluez/doc/mgmt_api.txt for details.

    """

    def __init__(self):
        super(BluetoothSocket, self).__init__(family=btsocket.AF_BLUETOOTH,
                                              type=socket.SOCK_RAW,
                                              proto=btsocket.BTPROTO_HCI)
        # Events read from the socket while waiting for something else, kept
        # as (event, index, data) tuples until wait_for_events() claims them.
        self.events = []

    def send_command(self, code, index, data=b''):
        """Send a command to the socket.

        To send a command, wait for the reply event, and parse it use
        send_command_and_wait() instead.

        @param code: Command Code.
        @param index: Controller index, may be MGMT_INDEX_NONE.
        @param data: Parameters as bytes or bytearray (optional).

        @raise BluetoothSocketError: if the message was only partly written.

        """
        # The header is three little-endian 16-bit fields: command code,
        # controller index and parameter length. The default for data has to
        # be a byte string so it can be appended to the packed header.
        msg = struct.pack('<HHH', code, index, len(data)) + data

        length = self.send(msg)
        if length != len(msg):
            raise BluetoothSocketError('Short write on socket')

    def recv_event(self):
        """Receive a single event from the socket.

        The event data is not parsed; in the case of command complete events
        this means it includes both the data for the event and the response
        for the command.

        Use settimeout() to set whether this method will block if there is no
        data, return immediately or wait for a specific length of time before
        timing out.

        @return tuple of (event, index, data), data being a bytearray.

        @raise BluetoothSocketError: if btsocket.timeout is raised while
                receiving.
        @raise BluetoothInvalidPacketError: if fewer bytes were received than
                the header, or than the header announces.

        """
        # Read the message from the socket; the first buffer receives the
        # header and the second whatever follows it.
        hdr = bytearray(MGMT_HDR_SIZE)
        data = bytearray(512)
        try:
            nbytes = self.recvmsg_into((hdr, data))[0]
        except btsocket.timeout as e:
            raise BluetoothSocketError('Error receiving event: %s' % e)
        if nbytes < MGMT_HDR_SIZE:
            raise BluetoothInvalidPacketError('Packet shorter than header')

        # Parse the header
        (event, index, length) = struct.unpack_from('<HHH', memoryview(hdr))
        if nbytes < MGMT_HDR_SIZE + length:
            raise BluetoothInvalidPacketError('Packet shorter than length')

        return (event, index, data[:length])

    @staticmethod
    def _unpack_cmd_complete(data):
        """Validate command complete event data and return (code, status)."""
        if len(data) < 3:
            raise BluetoothInvalidPacketError(
                    'Incorrect command complete event data length: '
                    '%d (expected at least 3)' % len(data))
        return struct.unpack_from('<HB', memoryview(data)[0:3])

    @staticmethod
    def _unpack_cmd_status(data):
        """Validate command status event data and return (code, status)."""
        if len(data) != 3:
            raise BluetoothInvalidPacketError(
                    'Incorrect command status event data length: '
                    '%d (expected 3)' % len(data))
        return struct.unpack_from('<HB', memoryview(data)[0:3])

    @staticmethod
    def _unpack_controller_error(data):
        """Validate controller error event data and return the error code."""
        if len(data) != 1:
            raise BluetoothInvalidPacketError(
                    'Incorrect controller error event data length: '
                    '%d (expected 1)' % len(data))
        # Trailing comma matters: unpack_from() returns a 1-tuple and callers
        # want the integer inside it.
        (error_code,) = struct.unpack_from('<B', memoryview(data)[0:1])
        return error_code

    @staticmethod
    def _check_reply(index, cmd_index, code=None, cmd_code=None):
        """Raise unless a reply's index (and code, if given) match."""
        if code is None:
            if index != cmd_index:
                raise BluetoothInvalidPacketError(
                        'Response for wrong controller index received: '
                        '0x%04x (expected 0x%04x)' % (index, cmd_index))
        elif code != cmd_code:
            raise BluetoothInvalidPacketError(
                    'Response for wrong command code received: '
                    '0x%04x (expected 0x%04x)' % (code, cmd_code))

    def send_command_and_wait(self, cmd_code, cmd_index, cmd_data=b'',
                              expected_length=None):
        """Send a command to the socket and wait for the reply.

        Events other than command complete, command status and controller
        error that arrive first are appended to the events list of the socket
        object.

        @param cmd_code: Command Code.
        @param cmd_index: Controller index, may be MGMT_INDEX_NONE.
        @param cmd_data: Parameters as bytes or bytearray.
        @param expected_length: May be set to verify the length of the data
                following the command code and status in a command complete
                event.

        Use settimeout() to set whether this method will block if there is no
        reply, return immediately or wait for a specific length of time before
        timing out and raising BluetoothSocketError.

        @return tuple of (status, data); data is None when the reply was a
                command status event.

        @raise BluetoothInvalidPacketError: if the reply is malformed or is
                for another controller index or command code.
        @raise BluetoothControllerError: if a controller error event arrives.

        """
        self.send_command(cmd_code, cmd_index, cmd_data)

        while True:
            (event, index, data) = self.recv_event()

            if event == MGMT_EV_CMD_COMPLETE:
                self._check_reply(index, cmd_index)
                (code, status) = self._unpack_cmd_complete(data)
                logging.debug('[0x%04x] command 0x%04x complete: 0x%02x',
                              index, code, status)
                self._check_reply(index, cmd_index, code, cmd_code)

                # The first three bytes are the code and status just parsed;
                # the command's own return parameters follow.
                response_length = len(data) - 3
                if (expected_length is not None and
                        response_length != expected_length):
                    raise BluetoothInvalidPacketError(
                            'Incorrect length of data for response: '
                            '%d (expected %d)' % (response_length,
                                                  expected_length))

                return (status, data[3:])

            elif event == MGMT_EV_CMD_STATUS:
                self._check_reply(index, cmd_index)
                (code, status) = self._unpack_cmd_status(data)
                logging.debug('[0x%04x] command 0x%02x status: 0x%02x',
                              index, code, status)
                self._check_reply(index, cmd_index, code, cmd_code)

                return (status, None)

            elif event == MGMT_EV_CONTROLLER_ERROR:
                error_code = self._unpack_controller_error(data)
                raise BluetoothControllerError('Controller error: %d' %
                                               error_code)

            else:
                logging.debug('[0x%04x] event 0x%02x length: %d',
                              index, event, len(data))
                self.events.append((event, index, data))

    def wait_for_events(self, index, events):
        """Wait for and return the first of a set of events specified.

        Events already queued on the socket object are searched first, then
        new events are read from the socket. While reading, command complete,
        command status and controller error events that were not asked for
        are logged and dropped; any other unmatched event is queued.

        @param index: Controller index of event, may be MGMT_INDEX_NONE.
        @param events: List of event codes to wait for.

        Use settimeout() to set whether this method will block if there is no
        event received, return immediately or wait for a specific length of
        time before timing out and raising BluetoothSocketError.

        @return Tuple of (event, data)

        """
        while True:
            for idx, (event, event_index, data) in enumerate(self.events):
                if event_index == index and event in events:
                    self.events.pop(idx)
                    return (event, data)

            (event, event_index, data) = self.recv_event()
            if event_index == index and event in events:
                return (event, data)
            elif event == MGMT_EV_CMD_COMPLETE:
                (code, status) = self._unpack_cmd_complete(data)
                logging.debug('[0x%04x] command 0x%04x complete: 0x%02x '
                              '(Ignored)', event_index, code, status)

            elif event == MGMT_EV_CMD_STATUS:
                (code, status) = self._unpack_cmd_status(data)
                logging.debug('[0x%04x] command 0x%02x status: 0x%02x '
                              '(Ignored)', event_index, code, status)

            elif event == MGMT_EV_CONTROLLER_ERROR:
                error_code = self._unpack_controller_error(data)
                logging.debug('[0x%04x] controller error: %d (Ignored)',
                              event_index, error_code)

            else:
                # Queue the event under the index it arrived with, not the
                # index being waited for, so that it can only be claimed
                # later by a wait on the controller it belongs to.
                self.events.append((event, event_index, data))
class BluetoothControlSocket(BluetoothSocket):
    """Bluetooth Control Socket.

    BluetoothControlSocket provides convenient methods mapping to each mgmt_ops
    command that send an appropriately formatted command and parse the response.

    """

    # Socket timeout, in seconds, applied by __init__().
    DEFAULT_TIMEOUT = 15

    def __init__(self):
        """Bind to the control channel and read the management version.

        @raise BluetoothSocketError: if the version cannot be read.

        """
        super(BluetoothControlSocket, self).__init__()
        self.bind((btsocket.HCI_DEV_NONE, btsocket.HCI_CHANNEL_CONTROL))
        self.settimeout(self.DEFAULT_TIMEOUT)

        # Certain features will depend on the management version and revision,
        # so check those now.
        version_info = self.read_version()
        if version_info is None:
            # read_version() reports failure as None; fail with a clear error
            # rather than a TypeError from unpacking it, and do not leave the
            # bound socket open behind us.
            self.close()
            raise BluetoothSocketError(
                    'Unable to read the management interface version')
        (version, revision) = version_info
        logging.debug('MGMT API %d.%d', version, revision)
        # From 1.5 onwards get_discovered_devices() waits for the reply to
        # MGMT_OP_CONFIRM_NAME; before that it only sends the command.
        self._kernel_confirms_name = (version, revision) >= (1, 5)

    @staticmethod
    def _format_address(address):
        """Return 6 address bytes as upper-case hex, last byte first."""
        return ':'.join('%02X' % octet
                        for octet in reversed(struct.unpack('6B', address)))

    @staticmethod
    def _encode_name(name, max_bytes):
        """Encode name, keeping at most max_bytes bytes of it.

        The limit is applied to the encoded form so that the fixed-size field
        the result is packed into always keeps at least one trailing NUL.

        """
        encoded = name.encode()
        if len(encoded) > max_bytes:
            # Cut on the byte limit, then drop any partial multi-byte
            # character the cut may have left at the end.
            encoded = encoded[:max_bytes].decode('utf-8', 'ignore')
            encoded = encoded.encode('utf-8')
        return encoded

    def _change_setting(self, opcode, index, msg_data, zero_if_unsupported):
        """Send a settings command and return the new settings word.

        @param opcode: Command code of the setting to change.
        @param index: Controller index.
        @param msg_data: Packed command parameters.
        @param zero_if_unsupported: Whether a "not supported" status should be
                reported as 0 instead of None.

        @return New controller settings on success, 0 for an accepted
                "not supported" status, None otherwise.

        """
        (status, data) = self.send_command_and_wait(
                opcode,
                index,
                msg_data,
                expected_length=4)
        if zero_if_unsupported and status == MGMT_STATUS_NOT_SUPPORTED:
            return 0
        if status != MGMT_STATUS_SUCCESS:
            return None

        (current_settings,) = struct.unpack_from('<L', memoryview(data))
        return current_settings

    def _discovery_command(self, opcode, index, address_type):
        """Send a start/stop discovery command and return its address types.

        @return Address types echoed in the reply on success, None on failure.

        """
        msg_data = struct.pack('<B', address_type)
        (status, data) = self.send_command_and_wait(
                opcode,
                index,
                msg_data,
                expected_length=1)
        if status != MGMT_STATUS_SUCCESS:
            return None

        (address_type,) = struct.unpack_from('<B', memoryview(data))
        return address_type

    def read_version(self):
        """Read the version of the management interface.

        @return tuple (version, revision) on success, None on failure.

        """
        (status, data) = self.send_command_and_wait(
                MGMT_OP_READ_VERSION,
                MGMT_INDEX_NONE,
                expected_length=3)

        if status != MGMT_STATUS_SUCCESS:
            return None

        (version, revision) = struct.unpack_from('<BH', memoryview(data))
        return (version, revision)

    def read_supported_commands(self):
        """Read the supported management commands and events.

        @return tuple (commands, events) of lists of codes on success,
                None on failure.

        @raise BluetoothInvalidPacketError: if the reply length does not
                match the counts it announces.

        """
        (status, data) = self.send_command_and_wait(
                MGMT_OP_READ_COMMANDS,
                MGMT_INDEX_NONE)
        if status != MGMT_STATUS_SUCCESS:
            return None
        if len(data) < 4:
            raise BluetoothInvalidPacketError(
                    'Incorrect length of data for response: '
                    '%d (expected at least 4)' % len(data))

        view = memoryview(data)
        (ncommands, nevents) = struct.unpack_from('<HH', view[0:4])
        offset = 4
        expected_length = offset + (ncommands * 2) + (nevents * 2)
        if len(data) != expected_length:
            raise BluetoothInvalidPacketError(
                    'Incorrect length of data for response: '
                    '%d (expected %d)' % (len(data), expected_length))

        # The two counts are followed by that many 16-bit command codes and
        # then that many 16-bit event codes.
        commands = list(struct.unpack_from('<%dH' % ncommands, view, offset))
        offset += ncommands * 2
        events = list(struct.unpack_from('<%dH' % nevents, view, offset))

        return (commands, events)

    def read_index_list(self):
        """Read the list of currently known controllers.

        @return list of controller indexes on success, None on failure.

        @raise BluetoothInvalidPacketError: if the reply length does not
                match the count it announces.

        """
        (status, data) = self.send_command_and_wait(
                MGMT_OP_READ_INDEX_LIST,
                MGMT_INDEX_NONE)
        if status != MGMT_STATUS_SUCCESS:
            return None
        if len(data) < 2:
            raise BluetoothInvalidPacketError(
                    'Incorrect length of data for response: '
                    '%d (expected at least 2)' % len(data))

        view = memoryview(data)
        (nindexes,) = struct.unpack_from('<H', view[0:2])
        offset = 2
        expected_length = offset + (nindexes * 2)
        if len(data) != expected_length:
            raise BluetoothInvalidPacketError(
                    'Incorrect length of data for response: '
                    '%d (expected %d)' % (len(data), expected_length))

        return list(struct.unpack_from('<%dH' % nindexes, view, offset))

    def read_info(self, index):
        """Read the state and basic information of a controller.

        Address is returned as a string in upper-case hex to match the
        BlueZ property.

        @param index: Controller index.

        @return tuple (address, bluetooth_version, manufacturer,
                       supported_settings, current_settings,
                       class_of_device, name, short_name) on success,
                None on failure.

        """
        (status, data) = self.send_command_and_wait(
                MGMT_OP_READ_INFO,
                index,
                expected_length=280)
        if status != MGMT_STATUS_SUCCESS:
            return None

        (address, bluetooth_version, manufacturer,
         supported_settings, current_settings,
         class_of_device_lo, class_of_device_mid, class_of_device_hi,
         name, short_name) = struct.unpack_from(
                '<6sBHLL3B249s11s', memoryview(data))

        return (
                self._format_address(address),
                bluetooth_version,
                manufacturer,
                supported_settings,
                current_settings,
                # The class arrives as three bytes, lowest first.
                (class_of_device_lo | (class_of_device_mid << 8) |
                 (class_of_device_hi << 16)),
                name.decode().rstrip('\0'),
                short_name.decode().rstrip('\0'))

    def set_powered(self, index, powered):
        """Set the powered state of a controller.

        @param index: Controller index.
        @param powered: Whether controller radio should be powered.

        @return New controller settings on success, None on failure.

        """
        return self._change_setting(
                MGMT_OP_SET_POWERED,
                index,
                struct.pack('<B', bool(powered)),
                zero_if_unsupported=False)

    def set_discoverable(self, index, discoverable, timeout=0):
        """Set the discoverable state of a controller.

        @param index: Controller index.
        @param discoverable: Whether controller should be discoverable.
        @param timeout: Timeout in seconds before disabling discovery again,
                ignored when discoverable is False, must not be zero when
                discoverable is True.

        @return New controller settings on success, 0 if the feature is not
                supported and the parameter was False, None otherwise.

        """
        return self._change_setting(
                MGMT_OP_SET_DISCOVERABLE,
                index,
                struct.pack('<BH', bool(discoverable), timeout),
                zero_if_unsupported=not discoverable)

    def set_connectable(self, index, connectable):
        """Set the connectable state of a controller.

        @param index: Controller index.
        @param connectable: Whether controller should be connectable.

        @return New controller settings on success, 0 if the feature is not
                supported and the parameter was False, None otherwise.

        """
        return self._change_setting(
                MGMT_OP_SET_CONNECTABLE,
                index,
                struct.pack('<B', bool(connectable)),
                zero_if_unsupported=not connectable)

    def set_fast_connectable(self, index, connectable):
        """Set the fast connectable state of a controller.

        Fast Connectable is a state where page scan parameters are set to favor
        faster connect times at the expense of higher power consumption.

        Unlike most other set_* commands, this may only be used when the
        controller is powered.

        @param index: Controller index.
        @param connectable: Whether controller should be fast connectable.

        @return New controller settings on success, 0 if the feature is not
                supported and the parameter was False or if the reply carried
                no settings, None otherwise.

        @raise BluetoothInvalidPacketError: if the reply carries data that is
                not 4 bytes long.

        """
        msg_data = struct.pack('<B', bool(connectable))
        (status, data) = self.send_command_and_wait(
                MGMT_OP_SET_FAST_CONNECTABLE,
                index,
                msg_data)
        if status == MGMT_STATUS_NOT_SUPPORTED and not connectable:
            return 0
        elif status != MGMT_STATUS_SUCCESS:
            return None

        # This is documented as returning current settings, but doesn't in
        # our kernel version (probably a bug), so if no data is returned,
        # pretend that was success. A command status reply has no data at
        # all (None), which is treated the same way.
        if not data:
            return 0
        elif len(data) != 4:
            raise BluetoothInvalidPacketError(
                    'Incorrect length of data for response: '
                    '%d (expected 4)' % len(data))

        (current_settings,) = struct.unpack_from('<L', memoryview(data))
        return current_settings

    def set_pairable(self, index, pairable):
        """Set the pairable state of a controller.

        @param index: Controller index.
        @param pairable: Whether controller should be pairable.

        @return New controller settings on success, None on failure.

        """
        return self._change_setting(
                MGMT_OP_SET_PAIRABLE,
                index,
                struct.pack('<B', bool(pairable)),
                zero_if_unsupported=False)

    def set_link_security(self, index, link_security):
        """Set the link security state of a controller.

        Toggles the use of link level security (aka Security Mode 3) for a
        controller.

        @param index: Controller index.
        @param link_security: Whether controller should be link_security.

        @return New controller settings on success, 0 if the feature is not
                supported and the parameter was False, None otherwise.

        """
        return self._change_setting(
                MGMT_OP_SET_LINK_SECURITY,
                index,
                struct.pack('<B', bool(link_security)),
                zero_if_unsupported=not link_security)

    def set_ssp(self, index, ssp):
        """Set whether a controller supports Simple Secure Pairing.

        @param index: Controller index.
        @param ssp: Whether controller should support SSP.

        @return New controller settings on success, 0 if the feature is not
                supported and the parameter was False, None otherwise.

        """
        return self._change_setting(
                MGMT_OP_SET_SSP,
                index,
                struct.pack('<B', bool(ssp)),
                zero_if_unsupported=not ssp)

    def set_hs(self, index, hs):
        """Set whether a controller supports Bluetooth High Speed.

        @param index: Controller index.
        @param hs: Whether controller should support High Speed.

        @return New controller settings on success, 0 if the feature is not
                supported and the parameter was False, None otherwise.

        """
        return self._change_setting(
                MGMT_OP_SET_HS,
                index,
                struct.pack('<B', bool(hs)),
                zero_if_unsupported=not hs)

    def set_le(self, index, le):
        """Set whether a controller supports Bluetooth Low Energy.

        @param index: Controller index.
        @param le: Whether controller should support Low Energy.

        @return New controller settings on success, 0 if the feature is not
                supported and the parameter was False, None otherwise.

        """
        return self._change_setting(
                MGMT_OP_SET_LE,
                index,
                struct.pack('<B', bool(le)),
                zero_if_unsupported=not le)

    def set_device_class(self, index, major, minor):
        """Set the device class of the controller.

        Consult the Bluetooth Baseband Assigned Numbers specification for valid
        values, in general both values are bit fields defined by that
        specification.

        If the device class is set while the controller is powered off, 0 will
        be returned, but the new class will be set by the host subsystem after
        the controller is powered on.

        @param index: Controller index.
        @param major: Major device class.
        @param minor: Minor device class.

        @return New three-octet device class on success, None on failure.

        """
        msg_data = struct.pack('<BB', major, minor)
        (status, data) = self.send_command_and_wait(
                MGMT_OP_SET_DEV_CLASS,
                index,
                msg_data,
                expected_length=3)
        if status != MGMT_STATUS_SUCCESS:
            return None

        (class_of_device_lo, class_of_device_mid,
         class_of_device_hi) = struct.unpack_from('<3B', memoryview(data))
        return (class_of_device_lo | (class_of_device_mid << 8) |
                (class_of_device_hi << 16))

    def set_local_name(self, index, name, short_name):
        """Set the local name of the controller.

        @param index: Controller index.
        @param name: Full length name, up to 248 bytes once encoded.
        @param short_name: Short name, up to 10 bytes once encoded.

        @return Tuple of (name, short_name) on success, None on failure.

        """
        # Truncate the provided parameters and then zero-pad using struct
        # so we pass a fixed-length null-terminated string to the kernel.
        # The limits are applied to the encoded bytes: a name with multi-byte
        # characters could otherwise fill the field and lose its terminator.
        msg_data = struct.pack('<249s11s',
                               self._encode_name(name, 248),
                               self._encode_name(short_name, 10))
        (status, data) = self.send_command_and_wait(
                MGMT_OP_SET_LOCAL_NAME,
                index,
                msg_data,
                expected_length=260)
        if status != MGMT_STATUS_SUCCESS:
            return None

        (name, short_name) = struct.unpack_from('<249s11s', memoryview(data))
        return (name.decode().rstrip('\0'), short_name.decode().rstrip('\0'))

    def start_discovery(self, index, address_type):
        """Start discovering remote devices.

        Call get_discovered_devices() to retrieve the list of devices found.

        @param index: Controller index.
        @param address_type: Address types to discover.

        @return Address types discovery was started for on success,
                None on failure.

        """
        return self._discovery_command(MGMT_OP_START_DISCOVERY,
                                       index, address_type)

    def stop_discovery(self, index, address_type):
        """Stop discovering remote devices.

        There is usually no need to call this method explicitly as discovery
        is automatically stopped once it has iterated through the necessary
        channels.

        @param index: Controller index.
        @param address_type: Address types to stop discovering.

        @return Address types discovery was stopped for on success,
                None on failure.

        """
        return self._discovery_command(MGMT_OP_STOP_DISCOVERY,
                                       index, address_type)

    def get_discovered_devices(self, index):
        """Return list of discovered remote devices.

        This method may be called any time after start_discovery() and will
        wait until the full list of devices has been returned, there is usually
        no need to call stop_discovery() explicitly.

        Use settimeout() to set whether this method will block if there are no
        events, return immediately or wait for a specific length of time before
        timing out and raising BluetoothSocketError.

        @param index: Controller index.

        @return List of devices found as tuples with the format
                (address, address_type, rssi, flags, eirdata)

        @raise BluetoothInvalidPacketError: if an event has the wrong length.

        """
        devices = []
        discovering = True
        while discovering:
            (event, data) = self.wait_for_events(
                    index,
                    (MGMT_EV_DISCOVERING, MGMT_EV_DEVICE_FOUND))

            if event == MGMT_EV_DISCOVERING:
                if len(data) != 2:
                    raise BluetoothInvalidPacketError(
                            'Incorrect discovering event data length: '
                            '%d (expected 2)' % len(data))

                # The second byte is the new discovering state; the loop ends
                # once it reads as zero.
                (address_type,
                 discovering) = struct.unpack_from('<BB', memoryview(data))

            elif event == MGMT_EV_DEVICE_FOUND:
                if len(data) < 14:
                    raise BluetoothInvalidPacketError(
                            'Incorrect device found event data length: '
                            '%d (expected at least 14)' % len(data))

                (address, address_type, rssi,
                 flags, eir_len) = struct.unpack_from(
                        '<6sBbLH', memoryview(data)[0:14])

                if len(data) != 14 + eir_len:
                    raise BluetoothInvalidPacketError(
                            'Incorrect device found event data length: '
                            '%d (expected %d)' % (len(data), 14 + eir_len))

                devices.append((
                        self._format_address(address),
                        address_type,
                        rssi,
                        flags,
                        bytes(data[14:])
                ))

                # The kernel might want us to confirm whether or not we
                # know the name of the device. We don't really care whether
                # or not this works, we just have to do it to get the EIR
                # Request.
                if flags & MGMT_DEV_FOUND_CONFIRM_NAME:
                    msg_data = struct.pack('<6sBB',
                                           address, address_type, False)
                    if self._kernel_confirms_name:
                        self.send_command_and_wait(
                                MGMT_OP_CONFIRM_NAME,
                                index,
                                msg_data)
                    else:
                        self.send_command(
                                MGMT_OP_CONFIRM_NAME,
                                index,
                                msg_data)

        return devices

    def set_advertising(self, index, advertising):
        """Set whether a controller is advertising via LE.

        @param index: Controller index.
        @param advertising: Whether controller should advertise via LE.

        @return New controller settings on success, 0 if the feature is not
                supported and the parameter was False, None otherwise.

        """
        return self._change_setting(
                MGMT_OP_SET_ADVERTISING,
                index,
                struct.pack('<B', bool(advertising)),
                zero_if_unsupported=not advertising)

    def set_bredr(self, index, bredr):
        """Set whether a controller supports Bluetooth BR/EDR (classic).

        @param index: Controller index.
        @param bredr: Whether controller should support BR/EDR.

        @return New controller settings on success, 0 if the feature is not
                supported and the parameter was False, None otherwise.

        """
        return self._change_setting(
                MGMT_OP_SET_BREDR,
                index,
                struct.pack('<B', bool(bredr)),
                zero_if_unsupported=not bredr)

    def set_secure_connections(self, index, secure_conn):
        """Set whether a controller supports secure connections.

        @param index: Controller index.
        @param secure_conn: Whether controller should support secure
                connections.

        @return New controller settings on success, 0 if the feature is not
                supported and the parameter was False, None otherwise.

        """
        return self._change_setting(
                MGMT_OP_SET_SECURE_CONN,
                index,
                struct.pack('<B', bool(secure_conn)),
                zero_if_unsupported=not secure_conn)

    def add_device(self, index, address, address_type, action):
        """Add a device to the action list.

        @param index: Controller index.
        @param address: Address of the device to add, as 6 raw bytes.
        @param address_type: Type of device in @address.
        @param action: Action to take.

        @return Tuple of ( address, address_type ) on success, the address
                being 6 raw bytes, None on failure.

        """
        msg_data = struct.pack('<6sBB', address, address_type, action)
        (status, data) = self.send_command_and_wait(
                MGMT_OP_ADD_DEVICE,
                index,
                msg_data,
                expected_length=7)
        if status != MGMT_STATUS_SUCCESS:
            return None

        (address, address_type) = struct.unpack_from('<6sB', memoryview(data))
        return (address, address_type)

    def remove_device(self, index, address, address_type):
        """Remove a device from the action list.

        @param index: Controller index.
        @param address: Address of the device to remove, as 6 raw bytes.
        @param address_type: Type of device in @address.

        @return Tuple of ( address, address_type ) on success, the address
                being 6 raw bytes, None on failure.

        """
        msg_data = struct.pack('<6sB', address, address_type)
        (status, data) = self.send_command_and_wait(
                MGMT_OP_REMOVE_DEVICE,
                index,
                msg_data,
                expected_length=7)
        if status != MGMT_STATUS_SUCCESS:
            return None

        (address, address_type) = struct.unpack_from('<6sB', memoryview(data))
        return (address, address_type)
class BluetoothRawSocket(BluetoothSocket):
    """Bluetooth Raw HCI Socket.

    BluetoothRawSocket is a subclass of BluetoothSocket representing raw access
    to the HCI controller and providing commands corresponding to ioctls that
    can be made on that kind of socket.

    """

    def get_dev_info(self, index):
        """Read HCI device information.

        This method uses the same underlying ioctl as the hciconfig tool.

        Address is returned as a string in upper-case hex to match the
        BlueZ property.

        @param index: Device index.

        @return tuple (index, name, address, flags, device_type, bus_type,
                       features, pkt_type, link_policy, link_mode,
                       acl_mtu, acl_pkts, sco_mtu, sco_pkts,
                       err_rx, err_tx, cmd_tx, evt_rx, acl_tx, acl_rx,
                       sco_tx, sco_rx, byte_rx, byte_tx). A failing ioctl
                raises the error from fcntl.ioctl() rather than returning.

        """
        buf = array.array('B', [0] * 96)
        # The request is answered for the device whose id is stored in the
        # leading 16-bit field of the buffer, so the index has to be written
        # there before the call; a zeroed buffer always asks for device 0.
        struct.pack_into('@H', buf, 0, index)
        fcntl.ioctl(self.fileno(), HCIGETDEVINFO, buf, 1)

        # 'features' is an array of 8 bytes in the C structure and so has no
        # alignment of its own. It is read as '8s' because a native 'Q' would
        # be aligned and read from the wrong offset.
        (dev_id, name, address, flags, dev_type, features, pkt_type,
         link_policy, link_mode, acl_mtu, acl_pkts, sco_mtu, sco_pkts,
         err_rx, err_tx, cmd_tx, evt_rx, acl_tx, acl_rx, sco_tx, sco_rx,
         byte_rx, byte_tx) = struct.unpack_from(
                '@H8s6sIB8sIIIHHHHIIIIIIIIII', buf)
        # Callers still get the features as one 64-bit integer in native
        # byte order.
        (features,) = struct.unpack('=Q', features)

        return (
                dev_id,
                # The name is a NUL padded byte string; decode it as
                # BluetoothControlSocket.read_info() does for its names.
                name.decode().rstrip('\0'),
                ':'.join('%02X' % x
                         for x in reversed(struct.unpack('6B', address))),
                flags,
                # dev_type packs two values: bits 4-5 and the low four bits.
                (dev_type & 0x30) >> 4,
                dev_type & 0x0f,
                features,
                pkt_type,
                link_policy,
                link_mode,
                acl_mtu,
                acl_pkts,
                sco_mtu,
                sco_pkts,
                err_rx,
                err_tx,
                cmd_tx,
                evt_rx,
                acl_tx,
                acl_rx,
                sco_tx,
                sco_rx,
                byte_rx,
                byte_tx)