blob: 3168a1679b72755628efb9c7af06125ab438b492 [file] [log] [blame]
# Copyright 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""A factory test for checking Wifi antenna for Intel WP2 7260 chip.
Description
-----------
The test calculates the average RSSI values for each antenna by decoding
the radiotap-wrapped beacon frames sent by the AP service.
Be sure to set AP correctly.
1. Select one fixed channel instead of auto.
2. Disable the TX power control in AP.
3. Make sure SSID of AP is unique.
Test Procedure
--------------
The test accepts a list of wireless service specs.
1. For each service candidate, the test does following steps:
a. Connect to that service.
b. Monitor the beacon frame sent by the AP in radiotap format.
c. Calculate the average RSSI values of each antenna.
2. Among all RSSI results, the test chooses the one whose antenna_all signal
strength is the largest for checking the spec.
Dependency
----------
- `iw` utility
- `ip` utility
- `tcpdump` utility
Examples
--------
To run this test on DUT, add a test item in the test list::
{
"pytest_name": "wireless_radiotap",
"args": {
"services": [
["my_ap_service", 5300, "wifi_password"]
]
}
}
If the wifi service only serves one frequency (you can check by
``iw wlan0 scan``), you can also ask the test to detect the frequency value
by the ``iw scan`` command::
{
"pytest_name": "wireless_radiotap",
"args": {
"services": [
["my_ap_service", None, "wifi_password"]
]
}
}
"""
import collections
import re
import struct
import subprocess
import sys
import time
import factory_common # pylint: disable=unused-import
from cros.factory.device import device_utils
from cros.factory.device import wifi
from cros.factory.test import event_log # TODO(chuntsen): Deprecate event log.
from cros.factory.test.i18n import _
from cros.factory.test import session
from cros.factory.test import test_case
from cros.factory.test import test_ui
from cros.factory.testlog import testlog
from cros.factory.utils.arg_utils import Arg
from cros.factory.utils import type_utils
_RE_IWSCAN = re.compile(r'freq: (\d+).*SSID: (.+)$')
_RE_WIPHY = re.compile(r'wiphy (\d+)')
_RE_BEACON = re.compile(r'(\d+) MHz.*Beacon \((.+)\)')
_ANTENNA_CONFIG = ['all', 'main', 'aux']
class ServiceSpec(type_utils.Obj):
def __init__(self, ssid, freq, password):
super(ServiceSpec, self).__init__(ssid=ssid, freq=freq, password=password)
def __hash__(self):
return hash((self.ssid, self.freq, self.password))
def IwScan(dut, iw_scan_group_checker, devname,
sleep_retry_time_secs=2, max_retries=10):
"""Scans on device.
Args:
devname: device name.
sleep_retry_time_secs: The sleep time before a retry.
max_retries: The maximum retry time to scan.
Returns:
A list of `ServiceSpec` instance.
Raises:
IwException if fail to scan for max_retries tries,
or fail because of reason other than device or resource busy (-16)
"""
cmd = ("iw %s scan | grep -e '^\\s*\\(freq\\|SSID\\):' | sed 'N;s/\\n/ /'" %
devname)
for unused_try_count in xrange(max_retries):
process = dut.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
log=True)
stdout, stderr = process.communicate()
retcode = process.returncode
event_log.Log('iw_scaned', retcode=retcode, stderr=stderr)
with iw_scan_group_checker:
testlog.LogParam('retcode', retcode)
testlog.LogParam('stderr', stderr)
if retcode == 0:
scan_result = []
for line in stdout.splitlines():
m = _RE_IWSCAN.search(line)
if m:
scan_result.append(ServiceSpec(m.group(2), int(m.group(1)), None))
if scan_result:
session.console.info('IwScan success.')
return scan_result
elif retcode == 234: # Invalid argument (-22)
raise Exception('Failed to iw scan, ret code: %d. stderr: %s'
'Frequency might be wrong.' %
(retcode, stderr))
elif retcode != 240: # Device or resource busy (-16)
raise Exception('Failed to iw scan, ret code: %d. stderr: %s' %
(retcode, stderr))
time.sleep(sleep_retry_time_secs)
raise Exception('Failed to iw scan for %s tries' % max_retries)
class RadiotapPacket(object):
FIELD = collections.namedtuple('Field', ['name', 'struct', 'align'])
ANTENNA_SIGNAL_FIELD = FIELD('Antenna Signal', struct.Struct('b'), 0)
ANTENNA_INDEX_FIELD = FIELD('Antenna Index', struct.Struct('B'), 0)
EXTENDED_BIT = 31
FIELDS = [
FIELD('TSFT', struct.Struct('Q'), 8),
FIELD('Flags', struct.Struct('B'), 0),
FIELD('Rate', struct.Struct('B'), 0),
FIELD('Channel', struct.Struct('HH'), 2),
FIELD('FHSS', struct.Struct('BB'), 0),
ANTENNA_SIGNAL_FIELD,
FIELD('Antenna Noise', struct.Struct('b'), 0),
FIELD('Lock Quality', struct.Struct('H'), 2),
FIELD('TX Attenuation', struct.Struct('H'), 2),
FIELD('dB TX Attenuation', struct.Struct('H'), 2),
FIELD('dBm TX Power', struct.Struct('b'), 1),
ANTENNA_INDEX_FIELD,
FIELD('dB Antenna Signal', struct.Struct('B'), 0),
FIELD('dB Antenna Noise', struct.Struct('B'), 0),
FIELD('RX Flags', struct.Struct('H'), 2),
FIELD('TX Flags', struct.Struct('H'), 2),
FIELD('RTS Retries', struct.Struct('B'), 0),
FIELD('Data Retries', struct.Struct('B'), 0),
None,
FIELD('MCS', struct.Struct('BBB'), 1),
FIELD('AMPDU status', struct.Struct('IHBB'), 4),
FIELD('VHT', struct.Struct('HBBBBBBBBH'), 2),
FIELD('Timestamp', struct.Struct('QHBB'), 8),
None,
None,
None,
None,
None,
None]
MAIN_HEADER_FORMAT = struct.Struct('BBhI')
PARSE_INFO = collections.namedtuple('AntennaData', ['header_size',
'data_bytes',
'antenna_offsets'])
# This is a variable-length header, but this is what we want to see.
EXPECTED_HEADER_FORMAT = struct.Struct(MAIN_HEADER_FORMAT.format + 'II')
@staticmethod
def Decode(packet_bytes):
"""Returns signal strength data for each antenna.
Format is {all_signal, {antenna_index, antenna_signal}}.
"""
if len(packet_bytes) < RadiotapPacket.EXPECTED_HEADER_FORMAT.size:
return None
parts = RadiotapPacket.EXPECTED_HEADER_FORMAT.unpack_from(packet_bytes)
present0, present1, present2 = parts[3:]
parse_info = RadiotapPacket.ParseHeader([present0, present1, present2])
required_bytes = parse_info.header_size + parse_info.data_bytes
if len(packet_bytes) < required_bytes:
return None
antenna_data = []
for datum in filter(bool, parse_info.antenna_offsets):
signal = datum.get(RadiotapPacket.ANTENNA_SIGNAL_FIELD)
if RadiotapPacket.ANTENNA_SIGNAL_FIELD not in datum:
continue
signal_offset = datum[RadiotapPacket.ANTENNA_SIGNAL_FIELD]
signal, = RadiotapPacket.ANTENNA_SIGNAL_FIELD.struct.unpack_from(
packet_bytes[(signal_offset + parse_info.header_size):])
if RadiotapPacket.ANTENNA_INDEX_FIELD in datum:
index_offset = datum[RadiotapPacket.ANTENNA_INDEX_FIELD]
index, = RadiotapPacket.ANTENNA_SIGNAL_FIELD.struct.unpack_from(
packet_bytes[(index_offset + parse_info.header_size):])
antenna_data.append((index, signal))
else:
antenna_data.append(signal)
return antenna_data
@staticmethod
def ParseHeader(field_list):
"""Returns packet information of the radiotap header should have."""
header_size = RadiotapPacket.MAIN_HEADER_FORMAT.size
data_bytes = 0
antenna_offsets = []
for bitmask in field_list:
antenna_offsets.append({})
for bit, field in enumerate(RadiotapPacket.FIELDS):
if bitmask & (1 << bit):
if field is None:
session.console.warning(
'Unknown field at bit %d is given in radiotap packet, the '
'result would probably be wrong...')
continue
if field.align and (data_bytes % field.align):
data_bytes += field.align - (data_bytes % field.align)
if (field == RadiotapPacket.ANTENNA_SIGNAL_FIELD or
field == RadiotapPacket.ANTENNA_INDEX_FIELD):
antenna_offsets[-1][field] = data_bytes
data_bytes += field.struct.size
if not bitmask & (1 << RadiotapPacket.EXTENDED_BIT):
break
header_size += 4
else:
raise NotImplementedError('Packet has too many extensions for me!')
# Offset the antenna fields by the header size.
return RadiotapPacket.PARSE_INFO(header_size, data_bytes, antenna_offsets)
class Capture(object):
"""Context for a live tcpdump packet capture for beacons."""
def __init__(self, dut, device_name, phy):
self.dut = dut
self.monitor_process = None
self.created_device = None
self.parent_device = device_name
self.phy = phy
def CreateDevice(self, monitor_device='antmon0'):
"""Creates a monitor device to monitor beacon."""
self.dut.CheckCall(['iw', self.parent_device, 'interface', 'add',
monitor_device, 'type', 'monitor'], log=True)
self.created_device = monitor_device
def RemoveDevice(self, device_name):
"""Removes monitor device."""
self.dut.CheckCall(['iw', device_name, 'del'], log=True)
def GetSignal(self):
"""Gets signal from tcpdump."""
while True:
line = self.monitor_process.stdout.readline()
m = _RE_BEACON.search(line)
if m:
freq = int(m.group(1))
ssid = m.group(2)
break
packet_bytes = ''
while True:
line = self.monitor_process.stdout.readline()
if not line.startswith('\t0x'):
break
# Break up lines of the form "\t0x0000: abcd ef" into a string
# "\xab\xcd\xef".
parts = line[3:].split()
for part in parts[1:]:
packet_bytes += chr(int(part[:2], 16))
if len(part) > 2:
packet_bytes += chr(int(part[2:], 16))
packet = RadiotapPacket.Decode(packet_bytes)
if packet:
return {'ssid': ssid, 'freq': freq, 'signal': packet}
def set_beacon_filter(self, value):
"""Sets beacon filter.
This function may only for Intel WP2 7260 chip.
"""
path = '/sys/kernel/debug/ieee80211/%s/netdev:%s/iwlmvm/bf_params' % (
self.phy, self.parent_device)
self.dut.WriteFile(path, 'bf_enable_beacon_filter=%d\n' % value)
def __enter__(self):
if not self.created_device:
self.CreateDevice()
self.dut.CheckCall(
['ip', 'link', 'set', self.created_device, 'up'], log=True)
self.dut.CheckCall(
['iw', self.parent_device, 'set', 'power_save', 'off'], log=True)
self.set_beacon_filter(0)
self.monitor_process = self.dut.Popen(
['tcpdump', '-nUxxi', self.created_device, 'type', 'mgt',
'subtype', 'beacon'], stdout=subprocess.PIPE, log=True)
return self
def __exit__(self, exception, value, traceback):
self.monitor_process.kill()
self.set_beacon_filter(1)
if self.created_device:
self.RemoveDevice(self.created_device)
class WirelessRadiotapTest(test_case.TestCase):
"""Basic wireless test class.
Properties:
_antenna: current antenna config.
_phy_name: wireless phy name to test.
"""
ARGS = [
Arg('device_name', str,
'Wireless device name to test. ', default='wlan0'),
Arg('services', list,
'A list of ``(<service_ssid>:str, <freq>:str|None, '
'<password>:str|None)`` tuples like ``[(SSID1, FREQ1, PASS1), '
'(SSID2, FREQ2, PASS2), ...]``. If ``<freq>`` is ``None`` the test '
'will detect the frequency by ``iw <device_name> scan`` command '
'automatically. ``<password>=None`` implies the service can connect '
'without a password.'),
Arg('connect_timeout', int,
'Timeout for connecting to the service.',
default=10),
Arg('strength', dict,
'A dict of minimal signal strengths. For example, a dict like '
'``{"main": strength_1, "aux": strength_2, "all": strength_all}``. '
'The test will check signal strength according to the different '
'antenna configurations in this dict.'),
Arg('scan_count', int,
'Number of scans to get average signal strength.', default=5),
Arg('press_space_to_start', bool,
'Press space to start the test.', default=True)]
def setUp(self):
self.ui.ToggleTemplateClass('font-large', True)
self._dut = device_utils.CreateDUTInterface()
self._phy_name = None
self._ap = None
self._connection = None
self._services = [ServiceSpec(ssid, freq, password)
for ssid, freq, password in self.args.services]
self.assertTrue(self._services, 'At least one service should be specified.')
# Group checker for Testlog.
self._iw_scan_group_checker = testlog.GroupParam(
'iw_scan', ['retcode', 'stderr'])
self._service_group_checker = testlog.GroupParam(
'service_signal', ['service', 'service_strength'])
testlog.UpdateParam('service', param_type=testlog.PARAM_TYPE.argument)
self._antenna_group_checker = testlog.GroupParam(
'antenna', ['antenna', 'freq', 'strength'])
testlog.UpdateParam('antenna', param_type=testlog.PARAM_TYPE.argument)
testlog.UpdateParam('freq', param_type=testlog.PARAM_TYPE.argument)
def tearDown(self):
self._DisconnectService()
def _ConnectService(self, service_name, password):
"""Associates a specified wifi AP.
Password can be '' or None.
"""
try:
self._ap = self._dut.wifi.FindAccessPoint(ssid=service_name)
except wifi.WifiError as e:
session.console.info(
'Unable to find the service %s: %r' % (service_name, e))
return False
try:
self._connection = self._dut.wifi.Connect(
self._ap, passkey=password, connect_timeout=self.args.connect_timeout)
except type_utils.TimeoutError as e:
session.console.info('Unable to connect to the service %s' % service_name)
return False
session.console.info(
'Successfully connected to service %s', service_name)
return True
def _DisconnectService(self):
"""Disconnect wifi AP."""
if self._connection:
self._connection.Disconnect()
session.console.info('Disconnect to service %s', self._ap.ssid)
self._connection = None
def _DetectPhyName(self):
"""Detects the phy name for device_name device.
Returns:
The phy name for device_name device.
"""
output = self._dut.CheckOutput(
['iw', 'dev', self.args.device_name, 'info'], log=True)
m = _RE_WIPHY.search(output)
return ('phy' + m.group(1)) if m else None
def _ChooseMaxStrengthService(self, service_strengths):
"""Chooses the service that has the largest signal strength among services.
Args:
services: A list of services.
service_strengths: A dict of strengths of each service.
Returns:
The service that has the largest signal strength among services.
"""
max_strength_service, max_strength = None, -sys.float_info.max
for service in self._services:
strength = service_strengths[service]['all']
if strength:
session.console.info('Service %s signal strength %f.', service,
strength)
event_log.Log('service_signal', service=service.ssid, strength=strength)
with self._service_group_checker:
testlog.LogParam('service', service.ssid)
testlog.LogParam('service_strength', strength)
if strength > max_strength:
max_strength_service, max_strength = service, strength
else:
session.console.info('Service %s has no valid signal strength.',
service)
if max_strength_service:
session.console.info('Service %s has the highest signal strength %f '
'among %s.', max_strength_service, max_strength,
self._services)
return max_strength_service
else:
session.console.warning('Services %s are not valid.', self._services)
return None
def _ScanSignal(self, service, times=3):
"""Scans antenna signal strengths for a specified service.
Device should connect to the service before starting to capture signal.
Signal result only includes antenna information of this service
(ssid, freq).
Args:
service: (service_ssid, freq, password) tuple.
times: Number of times to scan to get average.
Returns:
A list of signal result.
"""
signal_list = []
self.ui.SetState(_('Switching to AP {ap}...', ap=service.ssid))
if not self._ConnectService(service.ssid, service.password):
return []
self.ui.SetState(
_('Scanning on device {device} frequency {freq}...',
device=self.args.device_name,
freq=service.freq))
with Capture(self._dut, self.args.device_name, self._phy_name) as capture:
capture_times = 0
while capture_times < times:
signal_result = capture.GetSignal()
if (signal_result['ssid'] == service.ssid and
signal_result['freq'] == service.freq):
session.console.info('%s', signal_result)
signal_list.append(signal_result['signal'])
capture_times += 1
else:
session.console.info('Ignore the signal %r', signal_result)
self.ui.SetState(
_('Done scanning on device {device} frequency {freq}...',
device=self.args.device_name,
freq=service.freq))
self._DisconnectService()
return signal_list
def _AverageSignals(self, antenna_info):
"""Averages signal strengths for each antenna of a service.
The dividend is the sum of signal strengths during all scans.
The divisor is the number of times in the scan result.
If a service is not scannable, its average value will be None.
Args:
antenna_info: A dict of each antenna information of a service.
Returns:
A dict of average signal strength of each antenna.
{antenna1: signal1, antenna2: signal2}
"""
# keys are services and values are averages
average_results = {}
# Averages the scanned strengths
for antenna in _ANTENNA_CONFIG:
average_results[antenna] = 0
for signal in antenna_info:
average_results['all'] += signal[0]
average_results['main'] += signal[1][1]
average_results['aux'] += signal[2][1]
for antenna in _ANTENNA_CONFIG:
average_results[antenna] = (
float(average_results[antenna]) / len(antenna_info)
if antenna_info else None)
return average_results
def _CheckSpec(self, service, spec_antenna_strength, average_signal):
"""Checks if the scan result of antenna config can meet test spec.
Args:
service: (service_ssid, freq, password) tuple.
spec_antenna_strength: A dict of minimal signal strengths.
average_signal: A dict of average signal strength of each service in
service. {service: {antenna1: signal1, antenna2: signal2}}
"""
for antenna in _ANTENNA_CONFIG:
if spec_antenna_strength.get(antenna) is None:
continue
spec_strength = spec_antenna_strength[antenna]
scanned_strength = average_signal[service][antenna]
if not scanned_strength:
self.FailTask(
'Antenna %s, service: %s: Can not scan signal strength.' %
(antenna, service))
event_log.Log(
'antenna_%s' % antenna, freq=service.freq,
rssi=scanned_strength,
meet=(scanned_strength and scanned_strength >= spec_strength))
with self._antenna_group_checker:
testlog.LogParam('antenna', antenna)
testlog.LogParam('freq', service.freq)
result = testlog.CheckNumericParam('strength', scanned_strength,
min=spec_strength)
if not result:
self.FailTask(
'Antenna %s, service: %s: The scanned strength %f < spec strength'
' %f' % (antenna, service, scanned_strength, spec_strength))
else:
session.console.info(
'Antenna %s, service: %s: The scanned strength %f > spec strength'
' %f', antenna, service, scanned_strength, spec_strength)
def _ScanAllServices(self):
self.ui.SetState(_('Checking frequencies...'))
scan_result = IwScan(
self._dut, self._iw_scan_group_checker, self.args.device_name)
ssid_freqs = {service.ssid: set() for service in self._services}
for scanned_service in scan_result:
if scanned_service.ssid in ssid_freqs:
ssid_freqs[scanned_service.ssid].add(scanned_service.freq)
for service in self._services:
if not ssid_freqs[service.ssid]:
self.FailTask('The service %s is not found.' % service.ssid)
elif service.freq is None:
if len(ssid_freqs[service.ssid]) > 1:
self.FailTask('There are more than one frequencies (%r) for ssid %s. '
'Please specify the frequency explicity.' %
(ssid_freqs[service.ssid], service.ssid))
service.freq = ssid_freqs[service.ssid].pop()
elif service.freq not in ssid_freqs[service.ssid]:
self.FailTask('Frequency %s is not supported by the service %s. '
'Available frequencies are %r.' %
(service.freq, service.ssid, ssid_freqs[service.ssid]))
def runTest(self):
self._phy_name = self._DetectPhyName()
session.console.info('phy name is %s.', self._phy_name)
if self.args.press_space_to_start:
self.ui.SetState(_('Press space to start scanning.'))
self.ui.WaitKeysOnce(test_ui.SPACE_KEY)
self._ScanAllServices()
average_signal = {}
for service in self._services:
signals = self._ScanSignal(service, self.args.scan_count)
average_signal[service] = self._AverageSignals(signals)
# Gets the service with the largest strength to test for each spec.
test_service = self._ChooseMaxStrengthService(average_signal)
if test_service is None:
self.FailTask('Services %s are not valid.' % self.args.services)
else:
self._CheckSpec(test_service, self.args.strength, average_signal)