blob: 40ab025d54e715b2e04c32d9a63eae1e775e3f85 [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright 2022 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import binascii
import io
import logging
import os
import queue
from blueberry.facade import common_pb2 as common
from blueberry.tests.gd.cert.context import get_current_context
from blueberry.tests.gd.cert.truth import assertThat
from blueberry.tests.gd_sl4a.lib.ble_lib import disable_bluetooth
from blueberry.tests.gd_sl4a.lib.ble_lib import enable_bluetooth
from blueberry.tests.gd_sl4a.lib.bt_constants import ble_address_types
from blueberry.tests.sl4a_sl4a.lib import sl4a_sl4a_base_test
from blueberry.tests.sl4a_sl4a.lib.security import Security
from blueberry.utils.bt_gatt_constants import GattCallbackString
from blueberry.utils.bt_gatt_constants import GattTransport
class IrkRotationTest(sl4a_sl4a_base_test.Sl4aSl4aBaseTestClass):
def setup_class(self):
super().setup_class()
self.default_timeout = 10 # seconds
def setup_test(self):
assertThat(super().setup_test()).isTrue()
def teardown_test(self):
current_test_dir = get_current_context().get_full_output_path()
self.cert.adb.pull([
"/data/misc/bluetooth/logs/btsnoop_hci.log",
os.path.join(current_test_dir, "CERT_%s_btsnoop_hci.log" % self.cert.serial)
])
self.cert.adb.pull([
"/data/misc/bluetooth/logs/btsnoop_hci.log.last",
os.path.join(current_test_dir, "CERT_%s_btsnoop_hci.log.last" % self.cert.serial)
])
super().teardown_test()
self.cert.adb.shell("setprop bluetooth.core.gap.le.privacy.enabled \'\'")
def _wait_for_event(self, expected_event_name, device):
try:
event_info = device.ed.pop_event(expected_event_name, self.default_timeout)
logging.info(event_info)
except queue.Empty as error:
logging.error("Failed to find event: %s", expected_event_name)
return False
return True
def __get_cert_public_address_and_irk_from_bt_config(self):
# Pull IRK from SL4A cert side to pass in from SL4A DUT side when scanning
bt_config_file_path = os.path.join(get_current_context().get_full_output_path(),
"DUT_%s_bt_config.conf" % self.cert.serial)
try:
self.cert.adb.pull(["/data/misc/bluedroid/bt_config.conf", bt_config_file_path])
except AdbError as error:
logging.error("Failed to pull SL4A cert BT config")
return False
logging.debug("Reading SL4A cert BT config")
with io.open(bt_config_file_path) as f:
for line in f.readlines():
stripped_line = line.strip()
if (stripped_line.startswith("Address")):
address_fields = stripped_line.split(' ')
# API currently requires public address to be capitalized
address = address_fields[2].upper()
logging.debug("Found cert address: %s" % address)
continue
if (stripped_line.startswith("LE_LOCAL_KEY_IRK")):
irk_fields = stripped_line.split(' ')
irk = irk_fields[2]
logging.debug("Found cert IRK: %s" % irk)
continue
return address, irk
def __test_le_reconnect_after_irk_rotation_cert_privacy_enabled(self):
self._test_le_reconnect_after_irk_rotation(True)
def test_le_reconnect_after_irk_rotation_cert_privacy_disabled(self):
self.cert.sl4a.bluetoothDisableBLE()
disable_bluetooth(self.cert.sl4a, self.cert.ed)
self.cert.adb.shell("setprop bluetooth.core.gap.le.privacy.enabled false")
self.cert.adb.shell("device_config put bluetooth INIT_logging_debug_enabled_for_all true")
enable_bluetooth(self.cert.sl4a, self.cert.ed)
self.cert.sl4a.bluetoothDisableBLE()
self._test_le_reconnect_after_irk_rotation(False)
def _bond_remote_device(self, cert_privacy_enabled, cert_public_address):
if cert_privacy_enabled:
self.cert_advertiser_.advertise_public_extended_pdu()
else:
self.cert_advertiser_.advertise_public_extended_pdu(common.PUBLIC_DEVICE_ADDRESS)
advertising_device_name = self.cert_advertiser_.get_local_advertising_name()
connect_address = self.dut_scanner_.scan_for_name(advertising_device_name)
# Bond
logging.info("Bonding with %s", connect_address)
self.dut_security_.create_bond_numeric_comparison(connect_address)
self.dut_scanner_.stop_scanning()
self.cert_advertiser_.stop_advertising()
return connect_address
def _test_le_reconnect_after_irk_rotation(self, cert_privacy_enabled):
cert_public_address, irk = self.__get_cert_public_address_and_irk_from_bt_config()
self._bond_remote_device(cert_privacy_enabled, cert_public_address)
# Remove all bonded devices to rotate the IRK
logging.info("Unbonding all devices")
self.dut_security_.remove_all_bonded_devices()
self.cert_security_.remove_all_bonded_devices()
# Bond again
logging.info("Rebonding remote device")
connect_address = self._bond_remote_device(cert_privacy_enabled, cert_public_address)
# Connect GATT
logging.info("Connecting GATT to %s", connect_address)
gatt_callback = self.dut.sl4a.gattCreateGattCallback()
bluetooth_gatt = self.dut.sl4a.gattClientConnectGatt(gatt_callback, connect_address, False,
GattTransport.TRANSPORT_LE, False, None)
assertThat(bluetooth_gatt).isNotNone()
expected_event_name = GattCallbackString.GATT_CONN_CHANGE.format(gatt_callback)
assertThat(self._wait_for_event(expected_event_name, self.dut)).isTrue()
# Close GATT connection
logging.info("Closing GATT connection")
self.dut.sl4a.gattClientClose(bluetooth_gatt)
# Reconnect GATT
logging.info("Reconnecting GATT")
gatt_callback = self.dut.sl4a.gattCreateGattCallback()
bluetooth_gatt = self.dut.sl4a.gattClientConnectGatt(gatt_callback, connect_address, False,
GattTransport.TRANSPORT_LE, False, None)
assertThat(bluetooth_gatt).isNotNone()
expected_event_name = GattCallbackString.GATT_CONN_CHANGE.format(gatt_callback)
assertThat(self._wait_for_event(expected_event_name, self.dut)).isTrue()
# Disconnect GATT
logging.info("Disconnecting GATT")
self.dut.sl4a.gattClientDisconnect(gatt_callback)
expected_event_name = GattCallbackString.GATT_CONN_CHANGE.format(gatt_callback)
assertThat(self._wait_for_event(expected_event_name, self.dut)).isTrue()
# Reconnect GATT
logging.info("Reconnecting GATT")
self.dut.sl4a.gattClientReconnect(gatt_callback)
expected_event_name = GattCallbackString.GATT_CONN_CHANGE.format(gatt_callback)
assertThat(self._wait_for_event(expected_event_name, self.dut)).isTrue()