| # Copyright 2013 The ChromiumOS Authors |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """A factory test for basic Wifi. |
| |
| Description |
| ----------- |
| This test checks if the signal strength of the antennas satisfies the input spec. |
| |
| This test can accept a list of wireless services but only the strongest one is |
| used as the test AP and the other APs are ignored. |
| |
| This test can test signal strength via ``iw dev {device} scan`` or radiotap. |
| |
| Be sure to set AP correctly. |
| 1. Select a fixed channel instead of auto. |
| 2. Disable the power control in AP. |
| 3. Make sure SSID of AP is unique. |
| |
| Test Procedure |
| -------------- |
| 1. Accepts a dict of antenna:strength and a list of (ssid, frequency). |
| 2. For each (antenna, AP=(ssid, frequency)), we test the signal strength of it. |
| 3. Chooses AP with maximum strength as the test AP. |
| 4. Checks if (antenna, test AP) is greater than the spec for all antennas. |
| |
| Dependency |
| ---------- |
| - `iw` utility |
| - `ifconfig` utility |
| - `ip` utility (radiotap) |
| - `tcpdump` utility (radiotap) |
| - ``iw phy {phy_name} set antenna 1 1`` (switch_antenna) |
| |
| Examples |
| -------- |
| To run this test on DUT, add a test item in the test list: |
| |
| .. test_list:: |
| |
| generic_wireless_examples:WirelessAntenna |
| |
| Set the 2nd element in a service if you only want to use a specific frequency: |
| |
| .. test_list:: |
| |
| generic_wireless_examples:WirelessAntennaSpecificFrequency |
| |
| """ |
| |
| import abc |
| import collections |
| import logging |
| import re |
| import struct |
| import subprocess |
| import sys |
| from typing import Dict, List, Literal, Optional, Set, Tuple |
| |
| from cros.factory.device import device_types |
| from cros.factory.device import device_utils |
| from cros.factory.device import wifi |
| from cros.factory.test import event_log # TODO(chuntsen): Deprecate event log. |
| from cros.factory.test.i18n import _ |
| from cros.factory.test import session |
| from cros.factory.test import test_case |
| from cros.factory.test import test_ui |
| from cros.factory.testlog import testlog |
| from cros.factory.utils.arg_utils import Arg |
| from cros.factory.utils import net_utils |
| from cros.factory.utils.schema import JSONSchemaDict |
| from cros.factory.utils import sync_utils |
| from cros.factory.utils import type_utils |
| |
| |
# Schema of the `services` argument: a list of
# [ssid, frequency-or-null, password-or-null] triples.
_ARG_SERVICES_SCHEMA = JSONSchemaDict(
    'services schema object',
    {
        'type': 'array',
        'items': {
            'type': 'array',
            'items': [
                {'type': 'string'},
                {'type': ['integer', 'null']},
                {'type': ['string', 'null']},
            ],
            'minItems': 3,
            'maxItems': 3,
        },
    })

# Schema of the `switch_antenna_config` argument: every antenna config
# ('main', 'aux', 'all') is required and maps to a pair of integers, which
# SwitchAntennaWiFiChip.SwitchAntenna unpacks as (tx_bitmap, rx_bitmap).
_ARG_SWITCH_ANTENNA_CONFIG_SCHEMA = JSONSchemaDict(
    'switch antenna config schema object',
    {
        'type': 'object',
        'properties': {
            antenna_name: {
                'type': 'array',
                'items': {'type': 'integer'},
                'minItems': 2,
                'maxItems': 2,
            }
            for antenna_name in ('main', 'aux', 'all')
        },
        'additionalProperties': False,
        'required': ['main', 'aux', 'all'],
    })

# Schema of the `strength` argument: antenna config name -> minimal strength.
_ARG_STRENGTH_SCHEMA = JSONSchemaDict(
    'strength schema object',
    {
        'type': 'object',
        'additionalProperties': {'type': 'number'},
    })

# Default [tx, rx] antenna bitmaps passed to `iw phy <phy> set antenna`.
_DEFAULT_SWITCH_ANTENNA_CONFIG = {
    'main': [1, 1],
    'aux': [2, 2],
    'all': [3, 3],
}

# Names of the antenna configurations this test knows about.
Antenna = Literal['main', 'aux', 'all']
| |
class SwitchAntennaWiFiChip(wifi.AbstractWiFiChip):
  """WiFi chip whose antenna is selected with ``iw phy ... set antenna``.

  Signal strengths are gathered by switching to the requested antenna config
  and then scanning the access points on the frequency of the service.
  """

  # Scan results whose last_seen value (in ms) exceeds this threshold are
  # considered stale and are ignored.
  _THRESHOLD_LAST_SEEN_MS = 1000

  def __init__(self, device, interface, phy_name, services,
               switch_antenna_config, switch_antenna_sleep_secs, scan_timeout):
    """Initializes the chip.

    Args:
      device: The device interface, forwarded to wifi.AbstractWiFiChip.
      interface: The wireless interface name.
      phy_name: The wireless phy name.
      services: Service specs; only their `ssid` and `freq` are recorded.
      switch_antenna_config: Maps an antenna config name to (tx, rx) bitmaps.
      switch_antenna_sleep_secs: Passed to BringsUpInterface after a switch.
      scan_timeout: Timeout passed to the access point scan.
    """
    super().__init__(device, interface, phy_name)
    self._services = [(spec.ssid, spec.freq) for spec in services]
    self._switch_antenna_config = switch_antenna_config
    # antenna config name -> (ssid, freq) -> list of measured strengths.
    # yapf: disable
    self._signal_table = {  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
        # yapf: enable
        antenna: {known_service: [] for known_service in self._services}
        for antenna in self._switch_antenna_config
    }
    # The antenna config currently selected; None until the first switch.
    self._antenna = None
    self._switch_antenna_sleep_secs = switch_antenna_sleep_secs
    self._scan_timeout = scan_timeout

  def ScanSignal(self, service, antenna, scan_count):
    """Scans until `scan_count` strengths are recorded or tries run out.

    At most `scan_count` scans are performed. Each scan records one strength
    for every configured service that shares the frequency of `service` and
    was seen recently enough.

    Args:
      service: The service spec (provides `ssid` and `freq`).
      antenna: The antenna config name to measure with.
      scan_count: Both the number of samples wanted and the maximum number of
        scans performed.
    """
    target = (service.ssid, service.freq)
    samples = self._signal_table[antenna][target]
    for unused_attempt in range(scan_count):
      if len(samples) >= scan_count:
        break
      self.SwitchAntenna(antenna)
      scanned_aps = self._device.wifi.FilterAccessPoints(
          interface=self._interface, frequency=service.freq,
          scan_timeout=self._scan_timeout)

      # Group the fresh scan results by configured service on this frequency.
      # yapf: disable
      candidates = {  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
          # yapf: enable
          known: [] for known in self._services if known[1] == service.freq
      }
      for ap in scanned_aps:
        if ap.last_seen > self._THRESHOLD_LAST_SEEN_MS:
          continue
        key = (ap.ssid, ap.frequency)
        if key not in candidates:
          continue
        candidates[key].append(ap)

      for key, found in candidates.items():
        if not found:
          session.console.warning('Can not scan service %s %d.', key[0],
                                  key[1])
          continue
        if len(found) > 1:
          session.console.warning(
              'There are more than one result for service %s %d.', key[0],
              key[1])
          for ap in found:
            session.console.warning(
                'mac: %s, ssid: %s, freq: %d, signal %f, last_seen %d ms',
                ap.bssid, ap.ssid, ap.frequency, ap.strength, ap.last_seen)
        # When several results match, keep the strongest one.
        strongest = max(found, key=lambda candidate: candidate.strength)
        session.console.info(
            'scan : %s %s %d %f %d ms.', strongest.ssid, strongest.bssid,
            strongest.frequency, strongest.strength, strongest.last_seen)
        self._signal_table[antenna][key].append(strongest.strength)

  def GetAverageSignal(self, service, antenna):
    """Returns the mean recorded strength of (service, antenna), or None."""
    samples = self._signal_table[antenna][(service.ssid, service.freq)]
    if not samples:
      return None
    return sum(samples) / len(samples)

  def Destroy(self):
    """Switches back to antenna config 'all' unless it is already in use."""
    if self._antenna != 'all':
      logging.info('Restore antenna.')
      self.SwitchAntenna('all')
      return
    logging.info('Already using antenna "all".')

  def _GetConfiguredAntenna(self) -> Tuple[int, int]:
    """Reads the configured antenna bitmaps from ``iw phy <phy> info``.

    Returns:
      An integer tuple (TX, RX) parsed from the hexadecimal values reported
      on the `Configured Antennas` line.

    Raises:
      wifi.WiFiError: When `Configured Antennas` does not appear exactly once
        in the command output.
      NOTE(review): the original documentation also lists
        process_utils.CalledProcessError for a failing `iw`; that depends on
        CheckOutput, which is not visible here.
    """
    output = self._device.CheckOutput(['iw', 'phy', self._phy_name, 'info'])
    pattern = r'Configured Antennas: TX (0x[0-9a-fA-F]+) RX (0x[0-9a-fA-F]+)'
    matches = re.findall(pattern, output)

    # Zero matches or more than one match means the output is unexpected.
    if len(matches) != 1:
      raise wifi.WiFiError('Failed to get configured antennas.')

    tx_hex, rx_hex = matches[0]
    return int(tx_hex, 16), int(rx_hex, 16)

  def SwitchAntenna(self, antenna, max_retries=10):
    """Selects an antenna config using the `iw` command.

    Does nothing when `antenna` is already the selected config. Otherwise the
    interface is brought down, the antenna bitmaps are set and verified, and
    the interface is brought up again (also when switching fails).

    Args:
      antenna: The antenna config name, a key of the switch antenna config.
      max_retries: The maximum number of attempts to set the antenna.

    Raises:
      wifi.WiFiError: When `iw` fails, when the configured bitmaps differ
        from the requested ones, or when all attempts are used up.
    """
    if self._antenna == antenna:
      return

    tx_bitmap, rx_bitmap = self._switch_antenna_config[antenna]

    @sync_utils.RetryDecorator(max_attempt_count=max_retries, interval_sec=0,
                               target_condition=bool, exceptions_to_catch=[])
    def _SwitchAntenna():
      self._device.wifi.BringsDownInterface(self._interface)

      set_antenna_cmd = [
          'iw', 'phy', self._phy_name, 'set', 'antenna',
          str(tx_bitmap),
          str(rx_bitmap)
      ]
      process = self._device.Popen(set_antenna_cmd, stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE, log=True)
      unused_stdout, stderr = process.communicate()
      retcode = process.returncode

      # (-95) EOPNOTSUPP Operation not supported on transport endpoint
      # Doing ifconfig down again may solve this problem, so ask for a retry.
      if retcode == 161:
        logging.info('Retry...')
        return False

      if retcode != 0:
        raise wifi.WiFiError(
            f'Failed to set antenna. ret code: {int(retcode)}. '
            f'stderr: {stderr}')

      # Some WiFi chips do not support switching to certain antenna but
      # return zero on `iw` command. We have to explicitly check if antenna
      # is switched. See b/273440001.
      configured = self._GetConfiguredAntenna()
      if configured == (tx_bitmap, rx_bitmap):
        return True

      configured_tx, configured_rx = configured
      raise wifi.WiFiError(
          'Failed to switch antenna. '
          f'Expected TX {hex(tx_bitmap)} RX {hex(rx_bitmap)}, '
          f'but got TX {hex(configured_tx)} RX {hex(configured_rx)}.')

    try:
      _SwitchAntenna()
    except type_utils.MaxRetryError as e:
      raise wifi.WiFiError(
          f'Failed to set antenna for {max_retries} tries') from e
    finally:
      # The interface was brought down above; always bring it back up.
      self._device.wifi.BringsUpInterface(self._interface,
                                          self._switch_antenna_sleep_secs)
    self._antenna = antenna
| |
| |
class DisableSwitchWiFiChip(SwitchAntennaWiFiChip):
  """Variant of SwitchAntennaWiFiChip that never changes the antenna."""

  def SwitchAntenna(self, antenna, unused_max_retries=10):
    """Records `antenna` as selected and brings the interface up.

    No `iw ... set antenna` command is issued. Nothing happens when `antenna`
    is already recorded as the selected config.

    Args:
      antenna: The antenna config name to record.
      unused_max_retries: Ignored.
    """
    if self._antenna != antenna:
      logging.info('Switching antenna is disabled. Skipping setting antenna to'
                   ' %s. Just bring up the interface.', antenna)
      # SwitchAntennaWiFiChip.SwitchAntenna brings the interface up once the
      # antenna is switched, so do the same here.
      self._device.wifi.BringsUpInterface(self._interface,
                                          self._switch_antenna_sleep_secs)
      self._antenna = antenna
| |
| |
# Matches a beacon summary line in the tcpdump output read by
# Capture.GetSignal. Group 1 is the number in front of "MHz" (used as the
# frequency) and group 2 is the text inside the parentheses following
# "Beacon" (used as the SSID).
_RE_BEACON = re.compile(r'(\d+) MHz.*Beacon \((.+)\)')
| |
| |
class RadiotapPacket:
  """Extracts antenna signal information from a radiotap header.

  Only the `Antenna Signal` and `Antenna Index` fields are decoded; every
  other known field is used solely to compute offsets.

  NOTE(review): all struct formats use native byte order while radiotap data
  is little-endian, so decoding presumably assumes a little-endian host.
  """
  # Description of one radiotap field: display name, struct used to decode
  # it, and required alignment in bytes (0 means no alignment needed).
  # yapf: disable
  FIELD = collections.namedtuple('Field', ['name', 'struct', 'align'])  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
  # yapf: enable
  ANTENNA_SIGNAL_FIELD = FIELD('Antenna Signal', struct.Struct('b'), 0)
  ANTENNA_INDEX_FIELD = FIELD('Antenna Index', struct.Struct('B'), 0)
  # Bit of a "present" word telling that another "present" word follows.
  EXTENDED_BIT = 31
  # Indexed by bit position inside a "present" word. None marks a bit whose
  # field layout is unknown to this decoder.
  FIELDS = [
      FIELD('TSFT', struct.Struct('Q'), 8),
      FIELD('Flags', struct.Struct('B'), 0),
      FIELD('Rate', struct.Struct('B'), 0),
      FIELD('Channel', struct.Struct('HH'), 2),
      FIELD('FHSS', struct.Struct('BB'), 0),
      ANTENNA_SIGNAL_FIELD,
      FIELD('Antenna Noise', struct.Struct('b'), 0),
      FIELD('Lock Quality', struct.Struct('H'), 2),
      FIELD('TX Attenuation', struct.Struct('H'), 2),
      FIELD('dB TX Attenuation', struct.Struct('H'), 2),
      FIELD('dBm TX Power', struct.Struct('b'), 1),
      ANTENNA_INDEX_FIELD,
      FIELD('dB Antenna Signal', struct.Struct('B'), 0),
      FIELD('dB Antenna Noise', struct.Struct('B'), 0),
      FIELD('RX Flags', struct.Struct('H'), 2),
      FIELD('TX Flags', struct.Struct('H'), 2),
      FIELD('RTS Retries', struct.Struct('B'), 0),
      FIELD('Data Retries', struct.Struct('B'), 0),
      None,
      FIELD('MCS', struct.Struct('BBB'), 1),
      FIELD('AMPDU status', struct.Struct('IHBB'), 4),
      FIELD('VHT', struct.Struct('HBBBBBBBBH'), 2),
      FIELD('Timestamp', struct.Struct('QHBB'), 8),
      None,
      None,
      None,
      None,
      None,
      None]
  # Fixed part of the header, including the first "present" word.
  MAIN_HEADER_FORMAT = struct.Struct('BBhI')
  # yapf: disable
  PARSE_INFO = collections.namedtuple('AntennaData', ['header_size',  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
                                                      # yapf: enable
                                                      'data_bytes',
                                                      'antenna_offsets'])

  # This is a variable-length header, but this is what we want to see.
  # It is formed by the format string 'BBhI' from `MAIN_HEADER_FORMAT` and 'II'.
  EXPECTED_HEADER_FORMAT = struct.Struct('BBhIII')

  @classmethod
  def Decode(cls, packet_bytes):
    """Returns the antenna signal data found in a radiotap packet.

    Args:
      packet_bytes: The bytes collected so far, starting at the radiotap
        header. It may be incomplete.

    Returns:
      None if `packet_bytes` is shorter than the expected header or than the
      header plus the data the header declares. Otherwise a list with one
      entry for each "present" word that carries an `Antenna Signal` field:
      `(antenna_index, antenna_signal)` if the word also carries an
      `Antenna Index` field, else just `antenna_signal`.

    Raises:
      NotImplementedError: Propagated from ParseHeader.
    """
    if len(packet_bytes) < cls.EXPECTED_HEADER_FORMAT.size:
      return None
    parts = cls.EXPECTED_HEADER_FORMAT.unpack_from(packet_bytes)
    # parts[3:] are the three "present" words.
    parse_info = cls.ParseHeader(list(parts[3:]))
    required_bytes = parse_info.header_size + parse_info.data_bytes
    if len(packet_bytes) < required_bytes:
      return None

    antenna_data = []
    for offsets in parse_info.antenna_offsets:
      if cls.ANTENNA_SIGNAL_FIELD not in offsets:
        continue
      # Offsets are relative to the end of the header.
      signal, = cls.ANTENNA_SIGNAL_FIELD.struct.unpack_from(
          packet_bytes,
          parse_info.header_size + offsets[cls.ANTENNA_SIGNAL_FIELD])
      if cls.ANTENNA_INDEX_FIELD not in offsets:
        antenna_data.append(signal)
        continue
      # The index is an unsigned byte, so it must be decoded with its own
      # struct rather than the signed one of the signal field.
      index, = cls.ANTENNA_INDEX_FIELD.struct.unpack_from(
          packet_bytes,
          parse_info.header_size + offsets[cls.ANTENNA_INDEX_FIELD])
      antenna_data.append((index, signal))
    return antenna_data

  @classmethod
  def ParseHeader(cls, field_list):
    """Computes the layout that the given "present" words describe.

    Args:
      field_list: The "present" bitmasks in packet order. Words following one
        without the extended bit are ignored.

    Returns:
      A PARSE_INFO of
        header_size: size in bytes of the header including all used
          "present" words.
        data_bytes: size in bytes of the fields following the header.
        antenna_offsets: one dict for each used "present" word, mapping
          ANTENNA_SIGNAL_FIELD / ANTENNA_INDEX_FIELD to the offset of that
          field relative to the end of the header.

    Raises:
      NotImplementedError: If every word in `field_list` has the extended
        bit set, i.e. the packet has more "present" words than were given.
    """
    header_size = cls.MAIN_HEADER_FORMAT.size
    data_bytes = 0
    # yapf: disable
    antenna_offsets = []  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
    # yapf: enable

    for bitmask in field_list:
      antenna_offsets.append({})
      for bit, field in enumerate(cls.FIELDS):
        if not bitmask & (1 << bit):
          continue
        if field is None:
          # The size of an unknown field cannot be accounted for, so the
          # offsets of all following fields may be off.
          session.console.warning(
              'Unknown field at bit %d is given in radiotap packet, the '
              'result would probably be wrong...', bit)
          continue
        # Pad up to the alignment the field requires.
        if field.align and (data_bytes % field.align):
          data_bytes += field.align - (data_bytes % field.align)
        if field in (cls.ANTENNA_SIGNAL_FIELD, cls.ANTENNA_INDEX_FIELD):
          antenna_offsets[-1][field] = data_bytes
        data_bytes += field.struct.size

      if not bitmask & (1 << cls.EXTENDED_BIT):
        break
      # Another "present" word follows and enlarges the header.
      header_size += 4
    else:
      raise NotImplementedError('Packet has too many extensions for me!')

    # The antenna offsets are relative to the end of the header; Decode adds
    # header_size when reading them.
    return cls.PARSE_INFO(header_size, data_bytes, antenna_offsets)
| |
| |
class Capture:
  """Context for a live tcpdump packet capture for beacons."""

  def __init__(self, dut: device_types.DeviceBoard, device_name, phy,
               keep_monitor):
    """Initializes the capture context without touching the device.

    Args:
      dut: The device interface used to run commands.
      device_name: The wireless interface the monitor is attached to.
      phy: The phy name, used to locate the beacon filter debugfs file.
      keep_monitor: If True, an existing monitor device is reused and the
        monitor device is never deleted.
    """
    self.dut = dut
    self.monitor_process = None
    self.created_device = None
    self.parent_device = device_name
    self.phy = phy
    self.keep_monitor = keep_monitor

  def CreateDevice(self, monitor_device='antmon0'):
    """Creates a monitor device to monitor beacon."""
    # This command returns 0 if the monitor_device exists.
    return_value = self.dut.Call(['iw', 'dev', monitor_device, 'info'],
                                 log=True)
    if return_value == 0:
      self.created_device = monitor_device
      if self.keep_monitor:
        return
      # The device may exist if the test is aborted after CreateDevice and
      # before RemoveDevice. We remove it here to make sure we use a new
      # monitor.
      self.RemoveDevice()

    # This command creates the monitor_device.
    self.dut.CheckCall(['iw', self.parent_device, 'interface', 'add',
                        monitor_device, 'type', 'monitor'], log=True)
    self.created_device = monitor_device

  def RemoveDevice(self):
    """Removes the monitor device (only forgets it when keep_monitor)."""
    if self.created_device is None:
      return
    if not self.keep_monitor:
      # This command removes the monitor_device.
      self.dut.CheckCall(['iw', self.created_device, 'del'], log=True)
    self.created_device = None

  def GetSignal(self):
    """Gets the signal of the next decodable beacon from tcpdump.

    Reads tcpdump output until a beacon summary line is found, then feeds the
    hex dump lines that follow to RadiotapPacket.Decode until it yields a
    result. Beacons whose dump ends before anything could be decoded are
    skipped.

    Returns:
      A dict with keys 'ssid', 'freq' and 'signal', where 'signal' is the
      non-empty result of RadiotapPacket.Decode.

    Raises:
      wifi.WiFiError: If the tcpdump output ends (for example because the
        process exited) before a beacon could be decoded.
    """
    # yapf: disable
    stdout = self.monitor_process.stdout  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
    # yapf: enable
    line = stdout.readline()
    while True:
      # readline() returns an empty value at end of file; without this check
      # the loop would spin forever once tcpdump has exited.
      if not line:
        raise wifi.WiFiError(
            'tcpdump output ended before a beacon could be decoded.')

      m = _RE_BEACON.search(line)
      if not m:
        line = stdout.readline()
        continue
      freq = int(m.group(1))
      ssid = m.group(2)

      packet_bytes = b''
      while True:
        line = stdout.readline()
        if not line.startswith('\t0x'):
          # End of this hex dump. `line` may already be the summary line of
          # the next beacon (or EOF), so let the outer loop look at it.
          break

        # Break up lines of the form "\t0x0000: abcd ef" into a bytes
        # b"\xab\xcd\xef".
        parts = line[3:].split()
        for part in parts[1:]:
          packet_bytes += bytes((int(part[:2], 16),))
          if len(part) > 2:
            packet_bytes += bytes((int(part[2:], 16),))
        # Decode returns None until enough bytes have been collected.
        packet = RadiotapPacket.Decode(packet_bytes)
        if packet:
          return {'ssid': ssid, 'freq': freq, 'signal': packet}

  def set_beacon_filter(self, value):
    """Sets beacon filter.

    This function is currently only needed for Intel WiFi. Nothing is written
    when the iwlmvm debugfs file does not exist.

    Args:
      value: Written as an integer to `bf_enable_beacon_filter`.
    """
    path = (
        f'/sys/kernel/debug/ieee80211/{self.phy}/netdev:{self.parent_device}'
        f'/iwlmvm/bf_params')
    # yapf: disable
    if self.dut.path.exists(path):  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
      # yapf: enable
      session.console.info('Setting beacon filter (enable=%d) for Intel WiFi',
                           value)
      self.dut.WriteFile(path, f'bf_enable_beacon_filter={int(value)}\n')

  def Create(self):
    """Sets up the monitor device and starts tcpdump on it."""
    if not self.created_device:
      self.CreateDevice()
    self.dut.CheckCall(
        # yapf: disable
        ['ip', 'link', 'set', self.created_device, 'up'], log=True)  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
    # yapf: enable
    self.dut.CheckCall(
        ['iw', self.parent_device, 'set', 'power_save', 'off'], log=True)
    self.set_beacon_filter(0)
    # yapf: disable
    self.monitor_process = self.dut.Popen(  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
        # yapf: enable
        # yapf: disable
        ['tcpdump', '-nUxxi', self.created_device, 'type', 'mgt',  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
         # yapf: enable
         'subtype', 'beacon'], stdout=subprocess.PIPE, log=True)

  def Destroy(self):
    """Stops tcpdump, re-enables the beacon filter and removes the monitor."""
    if self.monitor_process:
      self.monitor_process.kill()
      self.monitor_process.wait()
      self.monitor_process = None
    self.set_beacon_filter(1)
    self.RemoveDevice()
| |
| |
class AbstractNonSwitchableWiFiChip(wifi.AbstractWiFiChip):
  """Abstract class of non-switchable WiFi chip.

  Strengths are measured while connected to the AP. Subclasses implement
  `_MeasureSignalStrength` to fill `_signal_mapping`.
  """

  def __init__(self, device: device_types.DeviceBoard, interface: str,
               phy_name: str, connect_timeout: int, scan_timeout: int):
    """Initializes the chip.

    Args:
      device: The device interface.
      interface: The wireless interface name.
      phy_name: The wireless phy name.
      connect_timeout: Timeout passed when connecting to an AP.
      scan_timeout: Timeout passed when looking for an AP.
    """
    super().__init__(device, interface, phy_name)
    self._connect_timeout = connect_timeout
    self._scan_timeout = scan_timeout

    # A tuple (SSID, frequency, antenna)-to-signal strengths mapping.
    self._signal_mapping: Dict[Tuple[str, int, Antenna],
                               List[int]] = collections.defaultdict(list)

    # The AP found by the latest _ConnectService and the connection to it.
    self._ap: Optional[wifi.AccessPoint] = None
    self._connection: Optional[wifi.Connection] = None

  def ScanSignal(self, service: wifi.ServiceSpec, antenna: Antenna,
                 scan_count: int) -> None:
    """See wifi.AbstractWiFiChip.ScanSignal."""
    # yapf: disable
    ssid = service.ssid  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
    # yapf: enable
    # yapf: disable
    freq = service.freq  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
    # yapf: enable
    assert freq is not None

    key = (ssid, freq, antenna)
    # Only measure what is still missing for this (service, antenna).
    missing_count = scan_count - len(self._signal_mapping[key])
    if missing_count <= 0:
      return

    session.console.info(f'Switching to AP {ssid} {freq:d}...')
    connected = self._ConnectService(
        ssid=ssid, freq=freq,
        # yapf: disable
        password=service.password)  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
    # yapf: enable
    if not connected:
      return

    self._MeasureSignalStrength(ssid, freq, missing_count)
    assert len(self._signal_mapping[key]) >= scan_count

    self._DisconnectService()

  @abc.abstractmethod
  def _MeasureSignalStrength(self, ssid: str, freq: int,
                             measure_count: int) -> None:
    """Measures signal strengths of an AP for each antenna.

    Args:
      ssid: The AP SSID.
      freq: The AP frequency.
      measure_count: The count to measure the signal strength.
    """
    raise NotImplementedError

  def GetAverageSignal(self, service: wifi.ServiceSpec,
                       antenna: Antenna) -> Optional[float]:
    """See wifi.AbstractWiFiChip.GetAverageSignal."""
    # yapf: disable
    assert service.freq is not None  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
    # yapf: enable
    # yapf: disable
    samples = self._signal_mapping[(service.ssid, service.freq, antenna)]  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
    # yapf: enable
    if not samples:
      return None
    return sum(samples) / len(samples)

  def _ConnectService(self, ssid: str, freq: int,
                      password: Optional[str]) -> bool:
    """Finds a WiFi AP and connects to it.

    Args:
      ssid: The WiFi AP SSID.
      freq: The WiFi AP frequency.
      password: The WiFi AP password; or None if no password required.

    Returns:
      True if successfully connected to the WiFi AP; False if the AP could
      not be found or connecting timed out.
    """
    wifi_manager = self._device.wifi

    try:
      self._ap = wifi_manager.FindAccessPoint(
          ssid=ssid, interface=self._interface, frequency=freq,
          scan_timeout=self._scan_timeout)
    except wifi.WiFiError as error:
      session.console.info(f'Unable to find the service {ssid}: {error!r}')
      return False

    try:
      self._connection = wifi_manager.Connect(
          self._ap, interface=self._interface, passkey=password,
          connect_timeout=self._connect_timeout)
    except type_utils.TimeoutError:
      session.console.info(f'Unable to connect to the service {ssid}')
      return False

    session.console.info('Successfully connected to service %s', ssid)
    return True

  def _DisconnectService(self) -> None:
    """Disconnects from the WiFi AP if there is a connection."""
    if not self._connection:
      return
    assert self._ap is not None

    self._connection.Disconnect()
    session.console.info('Disconnect to service %s', self._ap.ssid)

    self._ap = None
    self._connection = None

  def Destroy(self):
    """See wifi.AbstractWiFiChip.Destroy."""
    self._DisconnectService()
| |
| |
class RadiotapWiFiChip(AbstractNonSwitchableWiFiChip):
  """WiFi chip type which measures signal strengths from captured beacons."""

  def __init__(self, device: device_types.DeviceBoard, interface: str,
               phy_name: str, connect_timeout: int, scan_timeout: int,
               keep_monitor: bool):
    """Initializes the chip.

    Args:
      device: The device interface.
      interface: The wireless interface name.
      phy_name: The wireless phy name.
      connect_timeout: Timeout passed when connecting to an AP.
      scan_timeout: Timeout passed when looking for an AP.
      keep_monitor: Forwarded to Capture.
    """
    super().__init__(device=device, interface=interface, phy_name=phy_name,
                     connect_timeout=connect_timeout,
                     scan_timeout=scan_timeout)
    self._keep_monitor = keep_monitor

  def _MeasureSignalStrength(self, ssid: str, freq: int,
                             measure_count: int) -> None:
    """See AbstractNonSwitchableWiFiChip._MeasureSignalStrength."""
    capture = Capture(self._device, self._interface, self._phy_name,
                      self._keep_monitor)
    remaining = measure_count
    try:
      capture.Create()
      while remaining > 0:
        beacon = capture.GetSignal()
        # Beacons of other services do not count as a measurement.
        if beacon['ssid'] != ssid or beacon['freq'] != freq:
          session.console.info('Ignore the signal %r', beacon)
          continue

        session.console.info('%s', beacon)
        signal = beacon['signal']
        # Entry 0 is recorded as 'all'; entries 1 and 2 are indexed as
        # (antenna_index, signal) pairs and recorded as 'main' and 'aux'.
        self._signal_mapping[(ssid, freq, 'all')].append(signal[0])
        self._signal_mapping[(ssid, freq, 'main')].append(signal[1][1])
        self._signal_mapping[(ssid, freq, 'aux')].append(signal[2][1])
        remaining -= 1
    finally:
      capture.Destroy()
| |
| |
class StationDumpWiFiChip(AbstractNonSwitchableWiFiChip):
  """WiFi chip type which measures signal strengths by running `iw` command.

  This type of WiFi chip measures signal strengths by the following steps:

  1. Connect to the target AP.
  2. Run `iw <interface> station dump` command.
  3. Parse the output from the above command, which includes signal strength
     information of antennas of "main", "aux", and "all".
  """

  def _MeasureSignalStrength(self, ssid: str, freq: int,
                             measure_count: int) -> None:
    """See AbstractNonSwitchableWiFiChip._MeasureSignalStrength.

    Raises:
      wifi.WiFiError: If the `iw` command fails or its output cannot be
        parsed.
    """
    station_dump_cmd = ['iw', self._interface, 'station', 'dump']
    for unused_round in range(measure_count):
      try:
        output = self._device.CheckOutput(station_dump_cmd)
      except device_types.CalledProcessError as error:
        raise wifi.WiFiError(
            f'Failed to measure signal strength: {error!r}') from None

      try:
        status = net_utils.ParseWirelessInterfaceStationDumpOutput(output)
      except ValueError as error:
        raise wifi.WiFiError(str(error)) from error

      signal = status.signal
      assert signal
      assert len(signal.antenna) == 2

      # `computed` is recorded for 'all'; the two per-antenna values are
      # recorded as 'main' and 'aux', in that order.
      self._signal_mapping[(ssid, freq, 'all')].append(signal.computed)
      main_strength, aux_strength = signal.antenna
      self._signal_mapping[(ssid, freq, 'main')].append(main_strength)
      self._signal_mapping[(ssid, freq, 'aux')].append(aux_strength)
| |
| |
| class WirelessTest(test_case.TestCase): |
| """Basic wireless test class. |
| |
| Properties: |
| _phy_name: wireless phy name to test. |
| """ |
# Component categories this test is related to.
related_components = (test_case.TestCategory.WIFI, )
# Test-list arguments accepted by this test.
ARGS = [
    Arg('device_name', str,
        # The trailing space after "it will" is required: adjacent string
        # literals are concatenated without any separator.
        ('Wireless device name to test. e.g. wlan0. If not specified, it will '
         'fail if multiple devices are found, otherwise use the only one '
         'device it found.'), default=None),
    Arg('services', list, (
        'A list of ``[<service_ssid>:str, <freq>:int|None, '
        '<password>:str|None]`` sequences like ``[[SSID1, FREQ1, PASS1], '
        '[SSID2, FREQ2, PASS2], ...]``. Each sequence should contain '
        'exactly 3 items. If ``<freq>`` is ``None`` the test '
        'will detect the frequency by ``iw <device_name> scan`` command '
        'automatically. ``<password>=None`` implies the service can connect '
        'without a password.'), schema=_ARG_SERVICES_SCHEMA),
    Arg('ignore_missing_services', bool,
        ('Ignore services that are not found during scanning. This argument '
         'is not needed for switch antenna wifi chip'), default=False),
    Arg('scan_timeout', int, 'Timeout for scanning the services.',
        default=20),
    Arg('connect_timeout', int, 'Timeout for connecting to the service.',
        default=10),
    Arg('strength', dict,
        ('A dict of minimal signal strengths. For example, a dict like '
         '``{"main": strength_1, "aux": strength_2, "all": strength_all}``. '
         'The test will check signal strength according to the different '
         'antenna configurations in this dict.'),
        schema=_ARG_STRENGTH_SCHEMA),
    Arg('scan_count', int, 'Number of scans to get average signal strength.',
        default=5),
    Arg('switch_antenna_config', dict,
        ('A dict of ``{"main": (tx, rx), "aux": (tx, rx), "all": (tx, rx)}`` '
         'for the config when switching the antenna.'),
        default=_DEFAULT_SWITCH_ANTENNA_CONFIG,
        schema=_ARG_SWITCH_ANTENNA_CONFIG_SCHEMA),
    Arg('switch_antenna_sleep_secs', int,
        ('The sleep time after switching antenna and ifconfig up. Need to '
         'decide this value carefully since it depends on the platform and '
         'antenna config to test.'), default=10),
    Arg('press_space_to_start', bool, 'Press space to start the test.',
        default=True),
    Arg('wifi_chip_type', str,
        ('The type of wifi chip. Indicates how the chip test the signal '
         'strength of different antennas. Currently, the valid options are '
         '``switch_antenna``, ``radiotap``, ``station_dump``, or '
         '``disable_switch``. If the value is None, it will detect the value '
         'automatically. Note that Qualcomm Atheros chip does not support '
         'auto-detection.'), default=None),
    Arg('keep_monitor', bool,
        ('Set to True for WiFi driver that does not support '
         '``iw dev antmon0 del``.'), default=False),
]
| |
def setUp(self):
  """Prepares the UI, the DUT interface, the services and Testlog groups.

  Fails the task when `wifi_chip_type` is 'disable_switch' but the `strength`
  argument has keys other than exactly 'all'.
  """
  # yapf: disable
  self.ui.ToggleTemplateClass('font-large', True)  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
  # yapf: enable

  self._dut = device_utils.CreateDUTInterface()
  self._device_name = None
  self._phy_name = None
  # yapf: disable
  self._services = [
      wifi.ServiceSpec(ssid, freq, password)
      for ssid, freq, password in self.args.services  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
  ]
  # yapf: enable
  self.assertTrue(self._services, 'At least one service should be specified.')
  self._wifi_chip_type = None
  self._wifi_chip = None

  # Without antenna switching only the 'all' config can be measured.
  # yapf: disable
  if self.args.wifi_chip_type == 'disable_switch':  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
    antenna_configs = list(self.args.strength)  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
    if antenna_configs != ['all']:
      self.FailTask(f'Switching antenna is disabled but antenna configs are '  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
                    f'{antenna_configs}')
  # yapf: enable

  # Group checker for Testlog.
  self._service_group_checker = testlog.GroupParam(
      'service_signal', ['service', 'service_strength'])
  testlog.UpdateParam('service', param_type=testlog.ParamType.argument)
  self._antenna_group_checker = testlog.GroupParam(
      'antenna', ['antenna', 'freq', 'strength'])
  for argument_param in ('antenna', 'freq'):
    testlog.UpdateParam(argument_param,
                        param_type=testlog.ParamType.argument)
| |
def tearDown(self):
  """Restores wifi states by destroying the WiFi chip object, if any."""
  wifi_chip = self._wifi_chip
  if not wifi_chip:
    return
  wifi_chip.Destroy()
| |
def _ChooseMaxStrengthService(self):
  """Chooses the service that has the largest signal strength among services.

  The average strength of antenna config 'all' is compared. Every service
  with a valid strength is logged to the event log and Testlog.

  Returns:
    The service that has the largest signal strength among services, or None
    if no service has a valid signal strength.
  """
  best_service = None
  best_strength = -sys.float_info.max
  for service in self._services:
    # yapf: disable
    strength = self._wifi_chip.GetAverageSignal(service, 'all')  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
    # yapf: enable
    if not strength:
      session.console.info('Service %s has no valid signal strength.',
                           service)
      continue

    session.console.info('Service %s signal strength %f.', service, strength)
    # yapf: disable
    event_log.Log('service_signal', service=service.ssid, strength=strength)  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
    # yapf: enable
    with self._service_group_checker:
      # yapf: disable
      testlog.LogParam('service', service.ssid)  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
      # yapf: enable
      testlog.LogParam('service_strength', strength)
    if strength > best_strength:
      best_service, best_strength = service, strength

  if not best_service:
    session.console.warning('Services %s are not valid.', self._services)
    return None

  session.console.info('Service %s has the highest signal strength %f '
                       'among %s.', best_service, best_strength,
                       self._services)
  return best_service
| |
def _ScanSignals(self, services, antenna):
  """Scans signal strengths of every service with one antenna config.

  The UI state is updated before and after each service is scanned.

  Args:
    services: The services to scan; each provides a `freq` attribute.
    antenna: The antenna config to scan.
  """
  for service in services:
    scanning_message = _('Scanning on device {device} frequency {freq}...',
                         device=self._device_name, freq=service.freq)
    # yapf: disable
    self.ui.SetState(scanning_message)  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
    # yapf: enable

    # yapf: disable
    self._wifi_chip.ScanSignal(service, antenna, self.args.scan_count)  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
    # yapf: enable

    done_message = _('Done scanning on device {device} frequency {freq}...',
                     device=self._device_name, freq=service.freq)
    # yapf: disable
    self.ui.SetState(done_message)  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
    # yapf: enable
| |
def _CheckSpec(self, service, spec_antenna_strength, antenna):
  """Checks whether the scanned strength of an antenna config meets the spec.

  The average signal for (service, antenna) is read from the wifi chip and
  compared against ``spec_antenna_strength[antenna]``. The measurement is
  recorded in the event log and testlog, and the task is failed when no
  strength is available or when it is below the spec.

  Args:
    service: The service spec to check; its ``freq`` attribute is logged.
    spec_antenna_strength: A dict of minimal signal strengths keyed by
      antenna config.
    antenna: The antenna config to check.
  """
  session.console.info('Checking antenna %s spec', antenna)
  # yapf: disable
  measured = self._wifi_chip.GetAverageSignal(service, antenna)  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
  # yapf: enable
  required = spec_antenna_strength[antenna]

  if not measured:
    self.FailTask(
        f'Antenna {antenna}, service: {service}: Can not scan signal '
        f'strength.')

  meets_spec = measured and measured >= required
  event_log.Log(f'antenna_{antenna}', freq=service.freq, rssi=measured,
                meet=meets_spec)

  with self._antenna_group_checker:
    testlog.LogParam('antenna', antenna)
    testlog.LogParam('freq', service.freq)
    passed = testlog.CheckNumericParam('strength', measured, min=required)

  if passed:
    session.console.info(
        'Antenna %s, service: %s: The scanned strength %f >= spec strength'
        ' %f', antenna, service, measured, required)
    return

  self.FailTask(
      f'Antenna {antenna}, service: {service}: The scanned strength '
      f'{measured:f} < spec strength {required:f}')
| |
def _DetectWiFiChipType(self):
  """Creates the wifi chip object matching the requested or detected type.

  ``self.args.wifi_chip_type`` selects the implementation. When it is not
  specified, a switch_antenna chip is created first and every antenna in
  ``self.args.strength`` is tried; if all of them can be switched the type
  is set to 'switch_antenna', otherwise the antenna is switched back to
  'all' (if any switch succeeded) and the type falls back to 'radiotap'.

  The result is stored in ``self._wifi_chip`` and ``self._wifi_chip_type``.

  Raises:
    wifi.WiFiError: If switching back to antenna 'all' fails during
      auto-detection.
    ValueError: If the requested chip type is not supported.
  """
  # yapf: disable
  self.ui.SetState(_('Detecting wifi chip type...'))  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
  # yapf: enable

  # yapf: disable
  requested_type = self.args.wifi_chip_type  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
  # yapf: enable
  self._wifi_chip_type = requested_type

  if not requested_type or requested_type == 'switch_antenna':
    self._wifi_chip = SwitchAntennaWiFiChip(
        self._dut, self._device_name, self._phy_name, self._services,
        # yapf: disable
        self.args.switch_antenna_config, self.args.switch_antenna_sleep_secs,  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
        # yapf: enable
        # yapf: disable
        self.args.scan_timeout)  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
        # yapf: enable
    if requested_type:
      # switch_antenna was requested explicitly; no probing needed.
      return

    # If wifi_chip_type is not specified and the device is able to switch
    # antenna then we assume the chip type is switch_antenna.
    last_switched = None
    # yapf: disable
    for antenna in self.args.strength:  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
    # yapf: enable
      try:
        self._wifi_chip.SwitchAntenna(antenna)
      except wifi.WiFiError as e:
        session.console.info('Unable to switch antenna to %s. %r', antenna, e)
        break
      last_switched = antenna
    else:
      # All antennas are switchable.
      self._wifi_chip_type = 'switch_antenna'
      return

    if last_switched:
      # Switch back to antenna all.
      try:
        self._wifi_chip.SwitchAntenna('all')
      except wifi.WiFiError:
        session.console.info(
            'Unable to switch antenna to all after switch to %s.',
            last_switched)
        raise

  # Reached with no requested type only when the probe above failed.
  if not requested_type or requested_type == 'radiotap':
    self._wifi_chip = RadiotapWiFiChip(
        # yapf: disable
        device=self._dut, interface=self._device_name,  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
        # yapf: enable
        # yapf: disable
        phy_name=self._phy_name, connect_timeout=self.args.connect_timeout,  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
        # yapf: enable
        # yapf: disable
        scan_timeout=self.args.scan_timeout,  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
        # yapf: enable
        # yapf: disable
        keep_monitor=self.args.keep_monitor)  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
        # yapf: enable
    self._wifi_chip_type = 'radiotap'
    return

  if requested_type == 'disable_switch':
    self._wifi_chip = DisableSwitchWiFiChip(
        self._dut, self._device_name, self._phy_name, self._services,
        # yapf: disable
        self.args.switch_antenna_config, self.args.switch_antenna_sleep_secs,  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
        # yapf: enable
        # yapf: disable
        self.args.scan_timeout)  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
        # yapf: enable
    return

  if requested_type == 'station_dump':
    self._wifi_chip = StationDumpWiFiChip(
        # yapf: disable
        device=self._dut, interface=self._device_name,  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
        # yapf: enable
        # yapf: disable
        phy_name=self._phy_name, connect_timeout=self.args.connect_timeout,  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
        # yapf: enable
        # yapf: disable
        scan_timeout=self.args.scan_timeout)  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
        # yapf: enable
    return

  raise ValueError(f'Wifi chip type {self._wifi_chip_type} is not supported.')
| |
def _ScanAllServices(self) -> None:
  """Resolves the configured services against an actual wifi scan.

  Scans access points on ``self._device_name`` and rewrites
  ``self._services`` so that every entry has a concrete frequency:

  - A service whose SSID is not found fails the task, or is skipped when
    ``self.args.ignore_missing_services`` is set.
  - A service without a frequency is expanded into one spec per frequency
    found for its SSID.
  - A service whose frequency was not found for its SSID fails the task, or
    is skipped when ``self.args.ignore_missing_services`` is set.
  - Any other service is kept as is.

  Duplicated specs are removed. The resulting list follows the order of the
  configured services, so scan order, logs and tie-breaking between equally
  strong services are stable from run to run instead of depending on set
  iteration order (which varies with string hash randomization).
  """
  # yapf: disable
  self.ui.SetState(_('Checking frequencies...'))  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
  # yapf: enable

  scan_result = self._dut.wifi.FilterAccessPoints(
      # yapf: disable
      interface=self._device_name, scan_timeout=self.args.scan_timeout)  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
      # yapf: enable
  # yapf: disable
  ssid_freqs = {service.ssid: set() for service in self._services}  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
  # yapf: enable

  # Collect the frequencies seen for each SSID we care about.
  for scanned_service in scan_result:
    if scanned_service.ssid in ssid_freqs:
      ssid_freqs[scanned_service.ssid].add(scanned_service.frequency)

  # A dict is used as an insertion-ordered set: it de-duplicates specs while
  # keeping the order in which they were resolved.
  resolved_service_specs: Dict[wifi.ServiceSpec, None] = {}
  for service in self._services:
    # yapf: disable
    if not ssid_freqs[service.ssid]:  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
    # yapf: enable
      # yapf: disable
      error_message = f'The service {service.ssid} is not found.'  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
      # yapf: enable
      # yapf: disable
      if self.args.ignore_missing_services:  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
      # yapf: enable
        logging.info('%s Ignore this service and continue the test.',
                     error_message)
        continue
      self.FailTask(error_message)
    # yapf: disable
    elif service.freq is None:  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
    # yapf: enable
      # No frequency requested: test every frequency the SSID was seen on.
      # yapf: disable
      for freq in ssid_freqs[service.ssid]:  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
      # yapf: enable
        # yapf: disable
        spec = wifi.ServiceSpec(service.ssid, freq, service.password)  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
        # yapf: enable
        resolved_service_specs[spec] = None
    # yapf: disable
    elif service.freq not in ssid_freqs[service.ssid]:  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
    # yapf: enable
      error_message = (
          # yapf: disable
          f'Frequency {service.freq} is not supported by the service '  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
          # yapf: enable
          f'{service.ssid}. Available frequencies are '
          f'{ssid_freqs[service.ssid]!r}.')
      # yapf: disable
      if self.args.ignore_missing_services:  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
      # yapf: enable
        logging.info('%s Ignore this service and continue the test.',
                     error_message)
        continue
      self.FailTask(error_message)
    else:
      resolved_service_specs[service] = None
  self._services = list(resolved_service_specs)
| |
def _TrySetRegionUSFor6G(self):
  """Sets the region to US when any configured service is on a 6G frequency.

  Does nothing unless at least one service in ``self._services`` has a
  frequency within [5955, 7115]. When the ``factory_iw`` binary is found on
  the DUT, the interface is brought up, the country is set to US and the
  test sleeps for 5 seconds; otherwise a message is printed to the console.
  """
  LOWEST_6G_FREQ = 5955
  HIGHEST_6G_FREQ = 7115

  needs_6g = False
  for service in self._services:
    # yapf: disable
    freq = service.freq  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
    # yapf: enable
    # Services without an explicit frequency are not considered.
    if freq is not None and LOWEST_6G_FREQ <= freq <= HIGHEST_6G_FREQ:
      needs_6g = True
      break
  if not needs_6g:
    return

  # factory_iw is the binary which sets region.
  region_tool = 'factory_iw'
  if self._dut.Call(['which', region_tool], log=True) != 0:
    session.console.info('%r is not installed. Cannot set region.',
                         region_tool)
    return

  self._dut.wifi.BringsUpInterface(self._device_name)
  # Set region to US.
  self._dut.CheckCall(
      [region_tool, self._device_name, 'iwl', 'country', 'US'], log=True)
  self.Sleep(5)
| |
def runTest(self):
  """Runs the antenna test.

  Steps:
    1. Selects the wifi interface and detects its phy name.
    2. Optionally waits for the operator to press space.
    3. Tries to set the region for 6G, resolves the services against a scan
       and detects the wifi chip type.
    4. Scans all services with antenna 'all' and picks the strongest one as
       the test service; fails the task if there is none.
    5. Checks the spec of antenna 'all', then scans and checks every other
       antenna config listed in ``self.args.strength``.
  """
  # yapf: disable
  self._device_name = self._dut.wifi.SelectInterface(self.args.device_name)  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
  # yapf: enable
  session.console.info('Selected device_name is %s.', self._device_name)

  self._phy_name = self._dut.wifi.DetectPhyName(self._device_name)
  session.console.info('phy name is %s.', self._phy_name)

  # yapf: disable
  wait_for_operator = self.args.press_space_to_start  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
  # yapf: enable
  if wait_for_operator:
    # Prompts a message to tell operator to press space key when ready.
    # yapf: disable
    self.ui.SetState(_('Press space to start scanning.'))  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
    # yapf: enable
    # yapf: disable
    self.ui.WaitKeysOnce(test_ui.SPACE_KEY)  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
    # yapf: enable

  self._TrySetRegionUSFor6G()
  self._ScanAllServices()
  self._DetectWiFiChipType()
  session.console.info('Wifi chip type is %s.', self._wifi_chip_type)

  # Scans using antenna 'all'.
  self._ScanSignals(self._services, 'all')

  # Gets the service with the largest strength to test for each spec.
  strongest_service = self._ChooseMaxStrengthService()
  if strongest_service is None:
    # yapf: disable
    self.FailTask(f'Services {self.args.services} are not valid.')  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
    # yapf: enable

  # yapf: disable
  strength_spec = self.args.strength  # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
  # yapf: enable

  # Checks 'all' since we have scanned using antenna 'all' already.
  self._CheckSpec(strongest_service, strength_spec, 'all')

  # Scans and tests for other antenna config.
  for antenna in strength_spec:
    if antenna != 'all':
      self._ScanSignals(self._services, antenna)
      self._CheckSpec(strongest_service, strength_spec, antenna)