| # Copyright 2022 Google LLC |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # https://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import asyncio |
| import enum |
| import logging |
| import random |
| |
| from avatar import PandoraDevice, PandoraDevices, asynchronous, parameterized |
| from mobly import base_test, test_runner |
| from mobly.asserts import assert_equal # type: ignore |
| from mobly.asserts import assert_false # type: ignore |
| from mobly.asserts import assert_true # type: ignore |
| from pandora.host_pb2 import PUBLIC, DataTypes |
| from typing import Any, Dict, Optional |
| |
| |
| class AdvertisingEventProperties(enum.IntEnum): |
| ADV_IND = 0x13 |
| ADV_DIRECT_IND = 0x15 |
| ADV_SCAN_IND = 0x12 |
| ADV_NONCONN_IND = 0x10 |
| |
| CONNECTABLE = 0x01 |
| SCANNABLE = 0x02 |
| DIRECTED = 0x04 |
| LEGACY = 0x10 |
| ANONYMOUS = 0x20 |
| |
| |
| class LeAdvertisingTest(base_test.BaseTestClass): # type: ignore[misc] |
| """Suite of tests designed to validate that Android correctly reports |
| all kinds of advertising events to the user application.""" |
| |
| devices: Optional[PandoraDevices] = None |
| dut: PandoraDevice |
| ref: PandoraDevice |
| |
| def setup_class(self) -> None: |
| self.devices = PandoraDevices(self) |
| dut, ref, *_ = self.devices |
| self.dut, self.ref = dut, ref |
| |
| def teardown_class(self) -> None: |
| if self.devices: |
| self.devices.stop_all() |
| |
| @asynchronous |
| async def setup_test(self) -> None: |
| await asyncio.gather(self.dut.reset(), self.ref.reset()) |
| |
| @parameterized( |
| (AdvertisingEventProperties.ADV_IND, 0), |
| (AdvertisingEventProperties.ADV_IND, 31), |
| (AdvertisingEventProperties.ADV_DIRECT_IND, 0), |
| (AdvertisingEventProperties.ADV_SCAN_IND, 0), |
| (AdvertisingEventProperties.ADV_SCAN_IND, 31), |
| (AdvertisingEventProperties.ADV_NONCONN_IND, 0), |
| (AdvertisingEventProperties.ADV_NONCONN_IND, 31), |
| ) # type: ignore[misc] |
| def test_legacy_advertising_parameters( |
| self, advertising_event_properties: AdvertisingEventProperties, advertising_data_length: int |
| ) -> None: |
| # Advertise from the Ref device with the specified legacy advertising |
| # event properties. Use the manufacturer specific data to pad the advertising data to the |
| # desired length. The scan response data must always be provided when |
| # scannable but it is defaulted. |
| connectable = (advertising_event_properties & AdvertisingEventProperties.CONNECTABLE) != 0 |
| scannable = (advertising_event_properties & AdvertisingEventProperties.SCANNABLE) != 0 |
| directed = (advertising_event_properties & AdvertisingEventProperties.DIRECTED) != 0 |
| |
| manufacturer_specific_data_length = max(0, advertising_data_length - 5) # Flags (3) + LV (2) |
| manufacturer_specific_data = bytes([random.randint(1, 255) for _ in range(manufacturer_specific_data_length)]) |
| advertising_data = ( |
| DataTypes(manufacturer_specific_data=manufacturer_specific_data) if advertising_data_length > 0 else None |
| ) |
| |
| scan_response_data = DataTypes() if scannable else None |
| target = self.dut.address if directed else None |
| |
| advertiser = self.ref.host.Advertise( |
| legacy=True, |
| connectable=connectable, |
| data=advertising_data, # type: ignore[arg-type] |
| scan_response_data=scan_response_data, # type: ignore[arg-type] |
| public=target, |
| own_address_type=PUBLIC, |
| ) |
| scanner = self.dut.host.Scan(legacy=False, passive=False) |
| |
| report = next((x for x in scanner if x.public == self.ref.address)) |
| |
| scanner.cancel() |
| advertiser.cancel() |
| |
| assert_true(report.legacy, msg='expected legacy advertising report') |
| assert_equal(report.connectable, connectable) |
| # TODO: scannable is not set by the android server |
| # assert_equal(report.scannable, scannable) |
| # TODO: direct_address is not set by the android server |
| assert_equal(report.data.manufacturer_specific_data, manufacturer_specific_data) |
| assert_false(report.truncated, msg='expected non-truncated advertising report') |
| |
| @parameterized( |
| (dict(incomplete_service_class_uuids16=["183A", "181F"]),), |
| # (dict(complete_service_class_uuids16=["183A", "181F"]),), |
| (dict(incomplete_service_class_uuids32=["FFFF183A", "FFFF181F"]),), |
| # (dict(complete_service_class_uuids32=["FFFF183A", "FFFF181F"]),), |
| (dict(incomplete_service_class_uuids128=["FFFF181F-FFFF-1000-8000-00805F9B34FB"]),), |
| # (dict(complete_service_class_uuids128=["FFFF183A-FFFF-1000-8000-00805F9B34FB"]),), |
| (dict(shortened_local_name="avatar"),), |
| (dict(complete_local_name="avatar_the_last_test_blender"),), |
| (dict(tx_power_level=20),), |
| (dict(class_of_device=0x40680),), |
| (dict(peripheral_connection_interval_min=0x0006, peripheral_connection_interval_max=0x0C80),), |
| (dict(service_solicitation_uuids16=["183A", "181F"]),), |
| (dict(service_solicitation_uuids32=["FFFF183A", "FFFF181F"]),), |
| (dict(service_solicitation_uuids128=["FFFF183A-FFFF-1000-8000-00805F9B34FB"]),), |
| (dict(service_data_uuid16={"183A": bytes([1, 2, 3, 4])}),), |
| (dict(service_data_uuid32={"FFFF183A": bytes([1, 2, 3, 4])}),), |
| (dict(service_data_uuid128={"FFFF181F-FFFF-1000-8000-00805F9B34FB": bytes([1, 2, 3, 4])}),), |
| # (dict(public_target_addresses=[bytes([1, 2, 3, 4, 5, 6]), |
| # bytes([6, 5, 2, 4, 3, 1])]),), |
| # (dict(random_target_addresses=[bytes([1, 2, 3, 4, 5, 6]), |
| # bytes([6, 5, 2, 4, 3, 1])]),), |
| (dict(appearance=0x0591),), |
| (dict(advertising_interval=0x1000),), |
| # (dict(advertising_interval=0x100000),), |
| (dict(uri="https://www.google.com"),), |
| (dict(le_supported_features=bytes([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0x10, 0x9F])),), |
| (dict(manufacturer_specific_data=bytes([0, 1, 2, 3, 4])),), |
| # (dict(le_discoverability_mode=DISCOVERABLE_GENERAL),), |
| ) # type: ignore[misc] |
| def test_advertising_data_types(self, advertising_data: Dict[str, Any]) -> None: |
| # Advertise from the Ref device with the specified advertising data. |
| # Validate that the Ref generates the correct advertising data, |
| # and that the dut presents the correct advertising data in the scan |
| # result. |
| advertiser = self.ref.host.Advertise( |
| legacy=True, |
| connectable=True, |
| data=DataTypes(**advertising_data), |
| own_address_type=PUBLIC, |
| ) |
| scanner = self.dut.host.Scan(legacy=False, passive=False) |
| |
| report = next((x for x in scanner if x.public == self.ref.address)) |
| |
| scanner.cancel() |
| advertiser.cancel() |
| |
| assert_true(report.legacy, msg='expected legacy advertising report') |
| assert_equal(report.connectable, True) |
| for (key, value) in advertising_data.items(): # type: ignore [misc] |
| assert_equal(getattr(report.data, key), value) # type: ignore [misc] |
| assert_false(report.truncated, msg='expected non-truncated advertising report') |
| |
| |
| if __name__ == '__main__': |
| logging.basicConfig(level=logging.DEBUG) |
| test_runner.main() # type: ignore |