blob: 07b3d01d63ec078652bb6422615d99fa35a8587c [file] [log] [blame]
# Copyright 2013 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""A factory test for basic Wifi.
Description
-----------
This test checks if the signal strength of the antennas satisfy the input spec.
This test can accept a list of wireless services but only the strongest one is
used as the test AP and the other APs are ignored.
This test can test signal strength via ``iw dev {device} scan`` or radiotap.
Be sure to set AP correctly.
1. Select a fixed channel instead of auto.
2. Disable the power control in AP.
3. Make sure SSID of AP is unique.
Test Procedure
--------------
1. Accepts a dict of antenna:strength and a list of (ssid, frequency).
2. For each (antenna, AP=(ssid, frequency)), we test the signal strength of it.
3. Chooses AP with maximum strength as the test AP.
4. Checks if (antenna, test AP) is greater than the spec for all antennas.
Dependency
----------
- `iw` utility
- `ifconfig` utility
- `ip` utility (radiotap)
- `tcpdump` utility (radiotap)
- ``iw phy {phy_name} set antenna 1 1`` (switch_antenna)
Examples
--------
To run this test on DUT, add a test item in the test list:
.. test_list::
generic_wireless_examples:WirelessAntenna
Set the 2nd element in a service if you only want to use a specific frequency:
.. test_list::
generic_wireless_examples:WirelessAntennaSpecificFrequency
"""
import abc
import collections
import logging
import re
import struct
import subprocess
import sys
from typing import Dict, List, Literal, Optional, Set, Tuple
from cros.factory.device import device_types
from cros.factory.device import device_utils
from cros.factory.device import wifi
from cros.factory.test import event_log # TODO(chuntsen): Deprecate event log.
from cros.factory.test.i18n import _
from cros.factory.test import session
from cros.factory.test import test_case
from cros.factory.test import test_ui
from cros.factory.testlog import testlog
from cros.factory.utils.arg_utils import Arg
from cros.factory.utils import net_utils
from cros.factory.utils.schema import JSONSchemaDict
from cros.factory.utils import sync_utils
from cros.factory.utils import type_utils
_ARG_SERVICES_SCHEMA = JSONSchemaDict('services schema object', {
'type': 'array',
'items': {
'type': 'array',
'items': [
{'type': 'string'},
{'type': ['integer', 'null']},
{'type': ['string', 'null']}
],
'minItems': 3,
'maxItems': 3
}
})
_ARG_SWITCH_ANTENNA_CONFIG_SCHEMA = JSONSchemaDict(
'switch antenna config schema object',
{
'type': 'object',
'properties': {
'main': {
'type': 'array',
'items': {'type': 'integer'},
'minItems': 2,
'maxItems': 2
},
'aux': {
'type': 'array',
'items': {'type': 'integer'},
'minItems': 2,
'maxItems': 2
},
'all': {
'type': 'array',
'items': {'type': 'integer'},
'minItems': 2,
'maxItems': 2
},
},
'additionalProperties': False,
'required': ['main', 'aux', 'all']
})
_ARG_STRENGTH_SCHEMA = JSONSchemaDict('strength schema object', {
'type': 'object',
'additionalProperties': {'type': 'number'}
})
_DEFAULT_SWITCH_ANTENNA_CONFIG = {'main': [1, 1],
'aux': [2, 2],
'all': [3, 3]}
Antenna = Literal['main', 'aux', 'all']
class SwitchAntennaWiFiChip(wifi.AbstractWiFiChip):
# The scanned result with last_seen value greater than this value
# will be ignored.
_THRESHOLD_LAST_SEEN_MS = 1000
def __init__(self, device, interface, phy_name, services,
switch_antenna_config, switch_antenna_sleep_secs, scan_timeout):
super().__init__(device, interface, phy_name)
self._services = [(service.ssid, service.freq) for service in services]
self._switch_antenna_config = switch_antenna_config
# yapf: disable
self._signal_table = {antenna: {service: [] # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
for service in self._services}
for antenna in self._switch_antenna_config}
self._antenna = None
self._switch_antenna_sleep_secs = switch_antenna_sleep_secs
self._scan_timeout = scan_timeout
def ScanSignal(self, service, antenna, scan_count):
service_index = (service.ssid, service.freq)
for unused_try_count in range(scan_count):
if len(self._signal_table[antenna][service_index]) >= scan_count:
break
self.SwitchAntenna(antenna)
scan_output = self._device.wifi.FilterAccessPoints(
interface=self._interface, frequency=service.freq,
scan_timeout=self._scan_timeout)
# yapf: disable
same_freq_service = {s: [] # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
for s in self._services if s[1] == service.freq}
for ap in scan_output:
scanned_service = (ap.ssid, ap.frequency)
if (ap.last_seen <= self._THRESHOLD_LAST_SEEN_MS and
scanned_service in same_freq_service):
same_freq_service[scanned_service].append(ap)
for scanned_service, duplicates in same_freq_service.items():
if not duplicates:
session.console.warning(
'Can not scan service %s %d.',
scanned_service[0], scanned_service[1])
continue
if len(duplicates) > 1:
session.console.warning(
'There are more than one result for service %s %d.',
scanned_service[0], scanned_service[1])
for ap in duplicates:
session.console.warning(
'mac: %s, ssid: %s, freq: %d, signal %f, last_seen %d ms',
ap.bssid, ap.ssid, ap.frequency, ap.strength, ap.last_seen)
# Use the one with strongest signal if duplicates.
ap = max(duplicates, key=lambda t: t.strength)
session.console.info(
'scan : %s %s %d %f %d ms.', ap.ssid, ap.bssid, ap.frequency,
ap.strength, ap.last_seen)
self._signal_table[antenna][scanned_service].append(ap.strength)
def GetAverageSignal(self, service, antenna):
"""Get the average signal strength of (service, antenna)."""
result = self._signal_table[antenna][(service.ssid, service.freq)]
return sum(result) / len(result) if result else None
def Destroy(self):
"""Restores antenna config to 'all' if it is not 'all'."""
if self._antenna == 'all':
logging.info('Already using antenna "all".')
else:
logging.info('Restore antenna.')
self.SwitchAntenna('all')
def _GetConfiguredAntenna(self) -> Tuple[int, int]:
"""Gets configured antenna.
This method executes `iw` command to fetch configured antenna of the WiFi
chip.
Returns: An integer tuple representing (TX, RX).
Raises:
process_utils.CalledProcessError: When `iw` has non-zero exit code.
wifi.WiFiError: When failed to parse information from `iw` output.
"""
cmd = ['iw', 'phy', self._phy_name, 'info']
output = self._device.CheckOutput(cmd)
pattern = r'Configured Antennas: TX (0x[0-9a-fA-F]+) RX (0x[0-9a-fA-F]+)'
matching = re.findall(pattern, output)
# If `Configured Antennas` does not appear or appears more than once in the
# output then the output is unexpected.
if len(matching) != 1:
raise wifi.WiFiError('Failed to get configured antennas.')
tx = int(matching[0][0], 16)
rx = int(matching[0][1], 16)
return tx, rx
def SwitchAntenna(self, antenna, max_retries=10):
"""Sets antenna using iw command.
Args:
max_retries: The maximum retry time to set antenna.
switch_antenna_sleep_secs: The sleep time after switching antenna and
ifconfig up.
Raises:
WiFiError if fail to set antenna for max_retries tries.
"""
if self._antenna == antenna:
return
tx_bitmap, rx_bitmap = self._switch_antenna_config[antenna]
@sync_utils.RetryDecorator(max_attempt_count=max_retries, interval_sec=0,
target_condition=bool, exceptions_to_catch=[])
def _SwitchAntenna():
self._device.wifi.BringsDownInterface(self._interface)
process = self._device.Popen(
['iw', 'phy', self._phy_name, 'set', 'antenna', str(tx_bitmap),
str(rx_bitmap)],
stdout=subprocess.PIPE, stderr=subprocess.PIPE, log=True)
unused_stdout, stderr = process.communicate()
retcode = process.returncode
if retcode == 0:
# Some WiFi chips do not support switching to certain antenna but return
# zero on `iw` command. We have to explicitly check if antenna is
# switched. See b/273440001.
configured_tx, configured_rx = self._GetConfiguredAntenna()
if (configured_tx, configured_rx) == (tx_bitmap, rx_bitmap):
return True
raise wifi.WiFiError(
'Failed to switch antenna. '
f'Expected TX {hex(tx_bitmap)} RX {hex(rx_bitmap)}, '
f'but got TX {hex(configured_tx)} RX {hex(configured_rx)}.')
# (-95) EOPNOTSUPP Operation not supported on transport endpoint
# Do ifconfig down again may solve this problem.
if retcode == 161:
logging.info('Retry...')
return False
raise wifi.WiFiError(
f'Failed to set antenna. ret code: {int(retcode)}. stderr: {stderr}')
try:
_SwitchAntenna()
except type_utils.MaxRetryError as e:
raise wifi.WiFiError(
f'Failed to set antenna for {max_retries} tries') from e
finally:
self._device.wifi.BringsUpInterface(self._interface,
self._switch_antenna_sleep_secs)
self._antenna = antenna
class DisableSwitchWiFiChip(SwitchAntennaWiFiChip):
def SwitchAntenna(self, antenna, unused_max_retries=10):
if self._antenna == antenna:
return
logging.info('Switching antenna is disabled. Skipping setting antenna to'
' %s. Just bring up the interface.', antenna)
# Bring up the interface because IwSetAntenna brings up interface after
# antenna is switched.
self._device.wifi.BringsUpInterface(self._interface,
self._switch_antenna_sleep_secs)
self._antenna = antenna
_RE_BEACON = re.compile(r'(\d+) MHz.*Beacon \((.+)\)')
class RadiotapPacket:
# yapf: disable
FIELD = collections.namedtuple('Field', ['name', 'struct', 'align']) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
ANTENNA_SIGNAL_FIELD = FIELD('Antenna Signal', struct.Struct('b'), 0)
ANTENNA_INDEX_FIELD = FIELD('Antenna Index', struct.Struct('B'), 0)
EXTENDED_BIT = 31
FIELDS = [
FIELD('TSFT', struct.Struct('Q'), 8),
FIELD('Flags', struct.Struct('B'), 0),
FIELD('Rate', struct.Struct('B'), 0),
FIELD('Channel', struct.Struct('HH'), 2),
FIELD('FHSS', struct.Struct('BB'), 0),
ANTENNA_SIGNAL_FIELD,
FIELD('Antenna Noise', struct.Struct('b'), 0),
FIELD('Lock Quality', struct.Struct('H'), 2),
FIELD('TX Attenuation', struct.Struct('H'), 2),
FIELD('dB TX Attenuation', struct.Struct('H'), 2),
FIELD('dBm TX Power', struct.Struct('b'), 1),
ANTENNA_INDEX_FIELD,
FIELD('dB Antenna Signal', struct.Struct('B'), 0),
FIELD('dB Antenna Noise', struct.Struct('B'), 0),
FIELD('RX Flags', struct.Struct('H'), 2),
FIELD('TX Flags', struct.Struct('H'), 2),
FIELD('RTS Retries', struct.Struct('B'), 0),
FIELD('Data Retries', struct.Struct('B'), 0),
None,
FIELD('MCS', struct.Struct('BBB'), 1),
FIELD('AMPDU status', struct.Struct('IHBB'), 4),
FIELD('VHT', struct.Struct('HBBBBBBBBH'), 2),
FIELD('Timestamp', struct.Struct('QHBB'), 8),
None,
None,
None,
None,
None,
None]
MAIN_HEADER_FORMAT = struct.Struct('BBhI')
# yapf: disable
PARSE_INFO = collections.namedtuple('AntennaData', ['header_size', # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
'data_bytes',
'antenna_offsets'])
# This is a variable-length header, but this is what we want to see.
# It is formed by the format string 'BBhI' from `MAIN_HEADER_FORMAT` and 'II'.
EXPECTED_HEADER_FORMAT = struct.Struct('BBhIII')
@classmethod
def Decode(cls, packet_bytes):
"""Returns signal strength data for each antenna.
Format is {all_signal, {antenna_index, antenna_signal}}.
"""
if len(packet_bytes) < RadiotapPacket.EXPECTED_HEADER_FORMAT.size:
return None
parts = RadiotapPacket.EXPECTED_HEADER_FORMAT.unpack_from(packet_bytes)
present0, present1, present2 = parts[3:]
parse_info = RadiotapPacket.ParseHeader([present0, present1, present2])
required_bytes = parse_info.header_size + parse_info.data_bytes
if len(packet_bytes) < required_bytes:
return None
antenna_data = []
for datum in filter(bool, parse_info.antenna_offsets):
signal = datum.get(RadiotapPacket.ANTENNA_SIGNAL_FIELD)
if RadiotapPacket.ANTENNA_SIGNAL_FIELD not in datum:
continue
signal_offset = datum[RadiotapPacket.ANTENNA_SIGNAL_FIELD]
signal, = RadiotapPacket.ANTENNA_SIGNAL_FIELD.struct.unpack_from(
packet_bytes[(signal_offset + parse_info.header_size):])
if RadiotapPacket.ANTENNA_INDEX_FIELD in datum:
index_offset = datum[RadiotapPacket.ANTENNA_INDEX_FIELD]
index, = RadiotapPacket.ANTENNA_SIGNAL_FIELD.struct.unpack_from(
packet_bytes[(index_offset + parse_info.header_size):])
antenna_data.append((index, signal))
else:
antenna_data.append(signal)
return antenna_data
@classmethod
def ParseHeader(cls, field_list):
"""Returns packet information of the radiotap header should have."""
header_size = RadiotapPacket.MAIN_HEADER_FORMAT.size
data_bytes = 0
# yapf: disable
antenna_offsets = [] # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
for bitmask in field_list:
antenna_offsets.append({})
for bit, field in enumerate(RadiotapPacket.FIELDS):
if bitmask & (1 << bit):
if field is None:
session.console.warning(
'Unknown field at bit %d is given in radiotap packet, the '
'result would probably be wrong...')
continue
if field.align and (data_bytes % field.align):
data_bytes += field.align - (data_bytes % field.align)
if field in (RadiotapPacket.ANTENNA_SIGNAL_FIELD,
RadiotapPacket.ANTENNA_INDEX_FIELD):
antenna_offsets[-1][field] = data_bytes
data_bytes += field.struct.size
if not bitmask & (1 << RadiotapPacket.EXTENDED_BIT):
break
header_size += 4
else:
raise NotImplementedError('Packet has too many extensions for me!')
# Offset the antenna fields by the header size.
return RadiotapPacket.PARSE_INFO(header_size, data_bytes, antenna_offsets)
class Capture:
"""Context for a live tcpdump packet capture for beacons."""
def __init__(self, dut: device_types.DeviceBoard, device_name, phy,
keep_monitor):
self.dut = dut
self.monitor_process = None
self.created_device = None
self.parent_device = device_name
self.phy = phy
self.keep_monitor = keep_monitor
def CreateDevice(self, monitor_device='antmon0'):
"""Creates a monitor device to monitor beacon."""
# This command returns 0 if the monitor_device exists.
return_value = self.dut.Call(['iw', 'dev', monitor_device, 'info'],
log=True)
if return_value == 0:
self.created_device = monitor_device
if self.keep_monitor:
return
# The device may exist if the test is aborted after CreateDevice and
# before RemoveDevice. We remove it here to make sure we use a new
# monitor.
self.RemoveDevice()
# This command creates the monitor_device.
self.dut.CheckCall(['iw', self.parent_device, 'interface', 'add',
monitor_device, 'type', 'monitor'], log=True)
self.created_device = monitor_device
def RemoveDevice(self):
"""Removes the monitor device."""
if self.created_device is None:
return
if not self.keep_monitor:
# This command removes the monitor_device.
self.dut.CheckCall(['iw', self.created_device, 'del'], log=True)
self.created_device = None
def GetSignal(self):
"""Gets signal from tcpdump."""
while True:
# yapf: disable
line = self.monitor_process.stdout.readline() # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
m = _RE_BEACON.search(line)
if m:
freq = int(m.group(1))
ssid = m.group(2)
break
packet_bytes = b''
while True:
# yapf: disable
line = self.monitor_process.stdout.readline() # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
if not line.startswith('\t0x'):
break
# Break up lines of the form "\t0x0000: abcd ef" into a bytes
# b"\xab\xcd\xef".
parts = line[3:].split()
for part in parts[1:]:
packet_bytes += bytes((int(part[:2], 16),))
if len(part) > 2:
packet_bytes += bytes((int(part[2:], 16),))
packet = RadiotapPacket.Decode(packet_bytes)
if packet:
return {'ssid': ssid, 'freq': freq, 'signal': packet}
def set_beacon_filter(self, value):
"""Sets beacon filter.
This function is currently only needed for Intel WiFi.
"""
path = (
f'/sys/kernel/debug/ieee80211/{self.phy}/netdev:{self.parent_device}'
f'/iwlmvm/bf_params')
# yapf: disable
if self.dut.path.exists(path): # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
session.console.info('Setting beacon filter (enable=%d) for Intel WiFi',
value)
self.dut.WriteFile(path, f'bf_enable_beacon_filter={int(value)}\n')
def Create(self):
if not self.created_device:
self.CreateDevice()
self.dut.CheckCall(
# yapf: disable
['ip', 'link', 'set', self.created_device, 'up'], log=True) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
self.dut.CheckCall(
['iw', self.parent_device, 'set', 'power_save', 'off'], log=True)
self.set_beacon_filter(0)
# yapf: disable
self.monitor_process = self.dut.Popen( # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
# yapf: disable
['tcpdump', '-nUxxi', self.created_device, 'type', 'mgt', # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
'subtype', 'beacon'], stdout=subprocess.PIPE, log=True)
def Destroy(self):
if self.monitor_process:
self.monitor_process.kill()
self.monitor_process.wait()
self.monitor_process = None
self.set_beacon_filter(1)
self.RemoveDevice()
class AbstractNonSwitchableWiFiChip(wifi.AbstractWiFiChip):
"""Abstract class of non-switchable WiFi chip."""
def __init__(self, device: device_types.DeviceBoard, interface: str,
phy_name: str, connect_timeout: int, scan_timeout: int):
super().__init__(device, interface, phy_name)
self._connect_timeout = connect_timeout
self._scan_timeout = scan_timeout
# A tuple (SSID, frequency, antenna)-to-signal strengths mapping.
self._signal_mapping: Dict[Tuple[str, int, Antenna],
List[int]] = collections.defaultdict(list)
self._ap: Optional[wifi.AccessPoint] = None
self._connection: Optional[wifi.Connection] = None
def ScanSignal(self, service: wifi.ServiceSpec, antenna: Antenna,
scan_count: int) -> None:
"""See wifi.AbstractWiFiChip.ScanSignal."""
# yapf: disable
ssid = service.ssid # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
# yapf: disable
freq = service.freq # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
assert freq is not None
record_count = len(self._signal_mapping[(ssid, freq, antenna)])
if record_count >= scan_count:
return
session.console.info(f'Switching to AP {ssid} {freq:d}...')
if not self._ConnectService(ssid=ssid, freq=freq,
# yapf: disable
password=service.password): # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
return
self._MeasureSignalStrength(ssid, freq, scan_count - record_count)
assert len(self._signal_mapping[(ssid, freq, antenna)]) >= scan_count
self._DisconnectService()
@abc.abstractmethod
def _MeasureSignalStrength(self, ssid: str, freq: int,
measure_count: int) -> None:
"""Measures signal strengths of an AP for each antenna.
Args:
ssid: The AP SSID.
freq: The AP frequency.
measure_count: The count to measure the signal strength.
"""
raise NotImplementedError
def GetAverageSignal(self, service: wifi.ServiceSpec,
antenna: Antenna) -> Optional[float]:
"""See wifi.AbstractWiFiChip.GetAverageSignal."""
# yapf: disable
assert service.freq is not None # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
# yapf: disable
result = self._signal_mapping[(service.ssid, service.freq, antenna)] # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
return sum(result) / len(result) if result else None
def _ConnectService(self, ssid: str, freq: int,
password: Optional[str]) -> bool:
"""Connects to a WiFi AP.
Args:
ssid: The WiFi AP SSID.
freq: The WiFi AP frequency.
password: The WiFi AP password; or None if no password required.
Returns:
True if successfully connected to the WiFi AP; otherwise False.
"""
try:
self._ap = self._device.wifi.FindAccessPoint(
ssid=ssid, interface=self._interface, frequency=freq,
scan_timeout=self._scan_timeout)
except wifi.WiFiError as e:
session.console.info(f'Unable to find the service {ssid}: {e!r}')
return False
try:
self._connection = self._device.wifi.Connect(
self._ap, interface=self._interface, passkey=password,
connect_timeout=self._connect_timeout)
except type_utils.TimeoutError:
session.console.info(f'Unable to connect to the service {ssid}')
return False
session.console.info('Successfully connected to service %s', ssid)
return True
def _DisconnectService(self) -> None:
"""Disconnects from the WiFi AP."""
if self._connection:
assert self._ap is not None
self._connection.Disconnect()
session.console.info('Disconnect to service %s', self._ap.ssid)
self._ap = None
self._connection = None
def Destroy(self):
"""See wifi.AbstractWiFiChip.Destroy."""
self._DisconnectService()
class RadiotapWiFiChip(AbstractNonSwitchableWiFiChip):
def __init__(self, device: device_types.DeviceBoard, interface: str,
phy_name: str, connect_timeout: int, scan_timeout: int,
keep_monitor: bool):
super().__init__(device=device, interface=interface, phy_name=phy_name,
connect_timeout=connect_timeout, scan_timeout=scan_timeout)
self._keep_monitor = keep_monitor
def _MeasureSignalStrength(self, ssid: str, freq: int,
measure_count: int) -> None:
"""See NonSwitchableWiFiChip._MeasureSignalStrength"""
capture = Capture(self._device, self._interface, self._phy_name,
self._keep_monitor)
capture_times = 0
try:
capture.Create()
while capture_times < measure_count:
signal_result = capture.GetSignal()
if signal_result['ssid'] == ssid and signal_result['freq'] == freq:
session.console.info('%s', signal_result)
signal = signal_result['signal']
self._signal_mapping[(ssid, freq, 'all')].append(signal[0])
self._signal_mapping[(ssid, freq, 'main')].append(signal[1][1])
self._signal_mapping[(ssid, freq, 'aux')].append(signal[2][1])
capture_times += 1
else:
session.console.info('Ignore the signal %r', signal_result)
finally:
capture.Destroy()
class StationDumpWiFiChip(AbstractNonSwitchableWiFiChip):
"""WiFi chip type which measures signal strengths by running `iw` command.
This type of WiFi chip measures signal strengths by the following steps:
1. Connect to the target AP.
2. Run `iw <interface> station dump` command.
3. Parse the output from the above command, which includes signal strength
information of antennas of "main", "aux", and "all".
"""
def _MeasureSignalStrength(self, ssid: str, freq: int,
measure_count: int) -> None:
"""See NonSwitchableWiFiChip._MeasureSignalStrength."""
iw_command = ['iw', self._interface, 'station', 'dump']
for _ in range(measure_count):
try:
output = self._device.CheckOutput(iw_command)
except device_types.CalledProcessError as e:
raise wifi.WiFiError(
f'Failed to measure signal strength: {e!r}') from None
try:
connection_status = net_utils.ParseWirelessInterfaceStationDumpOutput(
output)
except ValueError as e:
raise wifi.WiFiError(str(e)) from e
assert connection_status.signal
assert len(connection_status.signal.antenna) == 2
all_strength = connection_status.signal.computed
self._signal_mapping[(ssid, freq, 'all')].append(all_strength)
main_strength, aux_strength = connection_status.signal.antenna
self._signal_mapping[(ssid, freq, 'main')].append(main_strength)
self._signal_mapping[(ssid, freq, 'aux')].append(aux_strength)
class WirelessTest(test_case.TestCase):
"""Basic wireless test class.
Properties:
_phy_name: wireless phy name to test.
"""
related_components = (test_case.TestCategory.WIFI, )
ARGS = [
Arg('device_name', str,
('Wireless device name to test. e.g. wlan0. If not specified, it will'
'fail if multiple devices are found, otherwise use the only one '
'device it found.'), default=None),
Arg('services', list, (
'A list of ``[<service_ssid>:str, <freq>:int|None, '
'<password>:str|None]`` sequences like ``[[SSID1, FREQ1, PASS1], '
'[SSID2, FREQ2, PASS2], ...]``. Each sequence should contain '
'exactly 3 items. If ``<freq>`` is ``None`` the test '
'will detect the frequency by ``iw <device_name> scan`` command '
'automatically. ``<password>=None`` implies the service can connect '
'without a password.'), schema=_ARG_SERVICES_SCHEMA),
Arg('ignore_missing_services', bool,
('Ignore services that are not found during scanning. This argument '
'is not needed for switch antenna wifi chip'), default=False),
Arg('scan_timeout', int, 'Timeout for scanning the services.',
default=20),
Arg('connect_timeout', int, 'Timeout for connecting to the service.',
default=10),
Arg('strength', dict,
('A dict of minimal signal strengths. For example, a dict like '
'``{"main": strength_1, "aux": strength_2, "all": strength_all}``. '
'The test will check signal strength according to the different '
'antenna configurations in this dict.'),
schema=_ARG_STRENGTH_SCHEMA),
Arg('scan_count', int, 'Number of scans to get average signal strength.',
default=5),
Arg('switch_antenna_config', dict,
('A dict of ``{"main": (tx, rx), "aux": (tx, rx), "all": (tx, rx)}`` '
'for the config when switching the antenna.'),
default=_DEFAULT_SWITCH_ANTENNA_CONFIG,
schema=_ARG_SWITCH_ANTENNA_CONFIG_SCHEMA),
Arg('switch_antenna_sleep_secs', int,
('The sleep time after switching antenna and ifconfig up. Need to '
'decide this value carefully since it depends on the platform and '
'antenna config to test.'), default=10),
Arg('press_space_to_start', bool, 'Press space to start the test.',
default=True),
Arg('wifi_chip_type', str,
('The type of wifi chip. Indicates how the chip test the signal '
'strength of different antennas. Currently, the valid options are '
'``switch_antenna``, ``radiotap``, ``station_dump``, or '
'``disable_switch``. If the value is None, it will detect the value '
'automatically. Note that Qualcomm Atheros chip does not support '
'auto-detection.'), default=None),
Arg('keep_monitor', bool,
('Set to True for WiFi driver that does not support '
'``iw dev antmon0 del``.'), default=False),
]
def setUp(self):
# yapf: disable
self.ui.ToggleTemplateClass('font-large', True) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
self._dut = device_utils.CreateDUTInterface()
self._device_name = None
self._phy_name = None
self._services = [wifi.ServiceSpec(ssid, freq, password)
# yapf: disable
for ssid, freq, password in self.args.services] # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
self.assertTrue(self._services, 'At least one service should be specified.')
self._wifi_chip_type = None
self._wifi_chip = None
# yapf: disable
if (self.args.wifi_chip_type == 'disable_switch' and # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
# yapf: disable
list(self.args.strength) != ['all']): # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
# yapf: disable
self.FailTask(f'Switching antenna is disabled but antenna configs are ' # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
f'{list(self.args.strength)}')
# Group checker for Testlog.
self._service_group_checker = testlog.GroupParam(
'service_signal', ['service', 'service_strength'])
testlog.UpdateParam('service', param_type=testlog.ParamType.argument)
self._antenna_group_checker = testlog.GroupParam(
'antenna', ['antenna', 'freq', 'strength'])
testlog.UpdateParam('antenna', param_type=testlog.ParamType.argument)
testlog.UpdateParam('freq', param_type=testlog.ParamType.argument)
def tearDown(self):
"""Restores wifi states."""
if self._wifi_chip:
self._wifi_chip.Destroy()
def _ChooseMaxStrengthService(self):
"""Chooses the service that has the largest signal strength among services.
Returns:
The service that has the largest signal strength among services.
"""
max_strength_service, max_strength = None, -sys.float_info.max
for service in self._services:
# yapf: disable
strength = self._wifi_chip.GetAverageSignal(service, 'all') # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
if strength:
session.console.info('Service %s signal strength %f.', service,
strength)
# yapf: disable
event_log.Log('service_signal', service=service.ssid, strength=strength) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
with self._service_group_checker:
# yapf: disable
testlog.LogParam('service', service.ssid) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
testlog.LogParam('service_strength', strength)
if strength > max_strength:
max_strength_service, max_strength = service, strength
else:
session.console.info('Service %s has no valid signal strength.',
service)
if max_strength_service:
session.console.info('Service %s has the highest signal strength %f '
'among %s.', max_strength_service, max_strength,
self._services)
return max_strength_service
session.console.warning('Services %s are not valid.', self._services)
return None
def _ScanSignals(self, services, antenna):
"""Scans and averages signal strengths for services.
Args:
services: A list of (service_ssid, freq) tuples to scan.
antenna: The antenna config to scan.
"""
for service in services:
# yapf: disable
self.ui.SetState( # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
_('Scanning on device {device} frequency {freq}...',
device=self._device_name,
freq=service.freq))
# yapf: disable
self._wifi_chip.ScanSignal(service, antenna, self.args.scan_count) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
# yapf: disable
self.ui.SetState( # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
_('Done scanning on device {device} frequency {freq}...',
device=self._device_name,
freq=service.freq))
def _CheckSpec(self, service, spec_antenna_strength, antenna):
"""Checks if the scan result of antenna config can meet test spec.
Args:
service: (service_ssid, freq, password) tuple.
spec_antenna_strength: A dict of minimal signal strengths.
antenna: The antenna config to check.
"""
session.console.info('Checking antenna %s spec', antenna)
# yapf: disable
scanned_strength = self._wifi_chip.GetAverageSignal(service, antenna) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
spec_strength = spec_antenna_strength[antenna]
if not scanned_strength:
self.FailTask(
f'Antenna {antenna}, service: {service}: Can not scan signal '
f'strength.')
event_log.Log(f'antenna_{antenna}', freq=service.freq,
rssi=scanned_strength,
meet=(scanned_strength and scanned_strength >= spec_strength))
with self._antenna_group_checker:
testlog.LogParam('antenna', antenna)
testlog.LogParam('freq', service.freq)
result = testlog.CheckNumericParam('strength', scanned_strength,
min=spec_strength)
if not result:
self.FailTask(
f'Antenna {antenna}, service: {service}: The scanned strength '
f'{scanned_strength:f} < spec strength {spec_strength:f}')
else:
session.console.info(
'Antenna %s, service: %s: The scanned strength %f >= spec strength'
' %f', antenna, service, scanned_strength, spec_strength)
def _DetectWiFiChipType(self):
# yapf: disable
self.ui.SetState(_('Detecting wifi chip type...')) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
# yapf: disable
self._wifi_chip_type = self.args.wifi_chip_type # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
if not self._wifi_chip_type or self._wifi_chip_type == 'switch_antenna':
self._wifi_chip = SwitchAntennaWiFiChip(
self._dut, self._device_name, self._phy_name, self._services,
# yapf: disable
self.args.switch_antenna_config, self.args.switch_antenna_sleep_secs, # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
# yapf: disable
self.args.scan_timeout) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
if self._wifi_chip_type:
return
# If wifi_chip_type is not specified and the device is able to switch
# antenna then we assume the chip type is switch_antenna.
last_success_antenna = None
# yapf: disable
for antenna in self.args.strength: # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
try:
self._wifi_chip.SwitchAntenna(antenna)
last_success_antenna = antenna
except wifi.WiFiError as e:
session.console.info('Unable to switch antenna to %s. %r', antenna, e)
break
else:
# All antennas are switchable.
self._wifi_chip_type = 'switch_antenna'
return
if last_success_antenna:
# Switch back to antenna all.
try:
self._wifi_chip.SwitchAntenna('all')
except wifi.WiFiError:
session.console.info(
'Unable to switch antenna to all after switch to %s.',
last_success_antenna)
raise
if not self._wifi_chip_type or self._wifi_chip_type == 'radiotap':
self._wifi_chip = RadiotapWiFiChip(
# yapf: disable
device=self._dut, interface=self._device_name, # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
# yapf: disable
phy_name=self._phy_name, connect_timeout=self.args.connect_timeout, # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
# yapf: disable
scan_timeout=self.args.scan_timeout, # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
# yapf: disable
keep_monitor=self.args.keep_monitor) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
self._wifi_chip_type = 'radiotap'
return
if self._wifi_chip_type == 'disable_switch':
self._wifi_chip = DisableSwitchWiFiChip(
self._dut, self._device_name, self._phy_name, self._services,
# yapf: disable
self.args.switch_antenna_config, self.args.switch_antenna_sleep_secs, # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
# yapf: disable
self.args.scan_timeout) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
return
if self._wifi_chip_type == 'station_dump':
self._wifi_chip = StationDumpWiFiChip(
# yapf: disable
device=self._dut, interface=self._device_name, # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
# yapf: disable
phy_name=self._phy_name, connect_timeout=self.args.connect_timeout, # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
# yapf: disable
scan_timeout=self.args.scan_timeout) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
return
raise ValueError(f'Wifi chip type {self._wifi_chip_type} is not supported.')
def _ScanAllServices(self) -> None:
# yapf: disable
self.ui.SetState(_('Checking frequencies...')) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
scan_result = self._dut.wifi.FilterAccessPoints(
# yapf: disable
interface=self._device_name, scan_timeout=self.args.scan_timeout) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
# yapf: disable
ssid_freqs = {service.ssid: set() for service in self._services} # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
for scanned_service in scan_result:
if scanned_service.ssid in ssid_freqs:
ssid_freqs[scanned_service.ssid].add(scanned_service.frequency)
resolved_service_specs: Set[wifi.ServiceSpec] = set()
for service in self._services:
# yapf: disable
if not ssid_freqs[service.ssid]: # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
# yapf: disable
error_message = f'The service {service.ssid} is not found.' # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
# yapf: disable
if self.args.ignore_missing_services: # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
logging.info('%s Ignore this service and continue the test.',
error_message)
continue
self.FailTask(error_message)
# yapf: disable
elif service.freq is None: # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
# yapf: disable
for freq in ssid_freqs[service.ssid]: # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
resolved_service_specs.add(
# yapf: disable
wifi.ServiceSpec(service.ssid, freq, service.password)) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
# yapf: disable
elif service.freq not in ssid_freqs[service.ssid]: # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
error_message = (
# yapf: disable
f'Frequency {service.freq} is not supported by the service ' # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
f'{service.ssid}. Available frequencies are '
f'{ssid_freqs[service.ssid]!r}.')
# yapf: disable
if self.args.ignore_missing_services: # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
logging.info('%s Ignore this service and continue the test.',
error_message)
continue
self.FailTask(error_message)
else:
resolved_service_specs.add(service)
self._services = list(resolved_service_specs)
def _TrySetRegionUSFor6G(self):
"""Set region for testing 6G in the factory."""
LOWEST_6G_FREQ = 5955
HIGHEST_6G_FREQ = 7115
# yapf: disable
has_6G = any(LOWEST_6G_FREQ <= service.freq <= HIGHEST_6G_FREQ # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
for service in self._services
# yapf: disable
if service.freq is not None) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
if not has_6G:
return
# factory_iw is the binary which sets region.
factory_iw = 'factory_iw'
if self._dut.Call(['which', factory_iw], log=True) == 0:
self._dut.wifi.BringsUpInterface(self._device_name)
# Set region to US.
self._dut.CheckCall(
[factory_iw, self._device_name, 'iwl', 'country', 'US'], log=True)
self.Sleep(5)
else:
session.console.info('%r is not installed. Cannot set region.',
factory_iw)
def runTest(self):
# yapf: disable
self._device_name = self._dut.wifi.SelectInterface(self.args.device_name) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
session.console.info('Selected device_name is %s.', self._device_name)
self._phy_name = self._dut.wifi.DetectPhyName(self._device_name)
session.console.info('phy name is %s.', self._phy_name)
# yapf: disable
if self.args.press_space_to_start: # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
# Prompts a message to tell operator to press space key when ready.
# yapf: disable
self.ui.SetState(_('Press space to start scanning.')) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
# yapf: disable
self.ui.WaitKeysOnce(test_ui.SPACE_KEY) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
self._TrySetRegionUSFor6G()
self._ScanAllServices()
self._DetectWiFiChipType()
session.console.info('Wifi chip type is %s.', self._wifi_chip_type)
# Scans using antenna 'all'.
self._ScanSignals(self._services, 'all')
# Gets the service with the largest strength to test for each spec.
test_service = self._ChooseMaxStrengthService()
if test_service is None:
# yapf: disable
self.FailTask(f'Services {self.args.services} are not valid.') # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
# Checks 'all' since we have scanned using antenna 'all' already.
# yapf: disable
self._CheckSpec(test_service, self.args.strength, 'all') # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
# Scans and tests for other antenna config.
# yapf: disable
for antenna in self.args.strength: # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
if antenna == 'all':
continue
self._ScanSignals(self._services, antenna)
# yapf: disable
self._CheckSpec(test_service, self.args.strength, antenna) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable