blob: 6e2762e13e921cb57a40e8bfe56b342242e65191 [file] [log] [blame]
# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""A factory test for checking Wifi antenna.

The test accepts a list of candidate services and a dict of minimal signal
strengths.
For each service, the test connects to the AP first, then captures its
beacons to get the signal strength for all antennas.
Finally the test checks the signal strengths of the strongest service against
the given minimal strengths.

Be sure to set up the AP correctly:
1. Select one fixed channel instead of auto.
2. Disable the TX power control in AP.
3. Make sure SSID of AP is unique.

This test case can be used for Intel WP2 7260 chip.
"""
import collections
import dbus
import logging
import re
import struct
import sys
import threading
import time
import unittest
import factory_common # pylint: disable=unused-import
from cros.factory.test import event_log
from cros.factory.test import factory
from cros.factory.test.i18n import test_ui as i18n_test_ui
from cros.factory.test import test_ui
from cros.factory.test import ui_templates
from cros.factory.utils.arg_utils import Arg
from cros.factory.utils import net_utils
from cros.factory.utils import process_utils
try:
sys.path.append('/usr/local/lib/flimflam/test')
import flimflam
except ImportError:
pass
# CSS appended to the test UI; styles every 'wireless-info' label below.
_DEFAULT_WIRELESS_TEST_CSS = '.wireless-info {font-size: 2em;}'


def _MSG_SWITCHING_AP(ap):
  """Returns the label shown while switching to access point `ap`."""
  return i18n_test_ui.MakeI18nLabelWithClass(
      'Switching to AP {ap}: ', 'wireless-info', ap=ap)


def _MSG_SCANNING(device, freq):
  """Returns the label shown while `device` is scanning on `freq`."""
  return i18n_test_ui.MakeI18nLabelWithClass(
      'Scanning on device {device} frequency {freq}...',
      'wireless-info', device=device, freq=freq)


def _MSG_SCANNING_DONE(device, freq):
  """Returns the label shown once `device` finished scanning on `freq`."""
  return i18n_test_ui.MakeI18nLabelWithClass(
      'Done scanning on device {device} frequency {freq}...',
      'wireless-info', device=device, freq=freq)


# Static labels.
_MSG_SPACE = i18n_test_ui.MakeI18nLabelWithClass(
    'Press space to start scanning.', 'wireless-info')
_MSG_PRECHECK = i18n_test_ui.MakeI18nLabelWithClass(
    'Checking frequencies...', 'wireless-info')

# Matches one joined "freq: ... SSID: ..." line produced by IwScan;
# group 1 is the frequency, group 2 the SSID.
_RE_IWSCAN = re.compile(r'freq: (\d+).*SSID: (.+)$')
# Extracts the wiphy number from `iw dev <device> info` output.
_RE_WIPHY = re.compile(r'wiphy (\d+)')
# Matches a tcpdump beacon summary line; group 1 is the frequency in MHz,
# group 2 the SSID.
_RE_BEACON = re.compile(r'(\d+) MHz.*Beacon \((.+)\)')
# Antenna configurations that are averaged and checked against the spec.
_ANTENNA_CONFIG = ['all', 'main', 'aux']
def FlimGetService(flim, name):
  """Looks up a service whose 'Name' property contains `name`.

  The lookup is repeated every 0.5 seconds until it succeeds or roughly 10
  seconds have passed.

  Args:
    flim: flimflam object
    name: substring to look for in the 'Name' property of the services.

  Returns:
    The first truthy lookup result, or None if nothing was found in time.
  """
  deadline = time.time() + 10
  while True:
    if time.time() >= deadline:
      return None
    found = flim.FindElementByPropertySubstring('Service', 'Name', name)
    if found:
      return found
    time.sleep(0.5)
def FlimGetServiceProperty(service, prop):
  """Reads one property of a flimflam service, retrying on D-Bus errors.

  Reading the properties is attempted at least once and then retried once per
  second until roughly 10 seconds have passed.

  Args:
    service: flimflam service object
    prop: property name

  Returns:
    The value of property `prop`.

  Raises:
    dbus.exceptions.DBusException: the error of the last attempt, if the
        properties still can not be read when the time is up.
    KeyError: if the service has no property named `prop`.
  """
  deadline = time.time() + 10
  while True:
    try:
      properties = service.GetProperties()
    except dbus.exceptions.DBusException:
      logging.exception('Error reading service property')
      if time.time() >= deadline:
        # Re-raise from inside the handler: this keeps the original traceback
        # and does not rely on the exception variable outliving the except
        # clause.
        raise
      time.sleep(1)
    else:
      return properties[prop]
def FlimConfigureService(flim, ssid, password):
  """Registers a managed wifi service configuration for `ssid`.

  Args:
    flim: flimflam object whose manager receives the configuration.
    ssid: ssid name
    password: wifi key to authenticate. When it is empty or None, no
        security settings are added to the configuration.
  """
  def _Variant(dbus_type, value):
    # Every value handed to ConfigureService is wrapped as a D-Bus variant.
    return dbus_type(value, variant_level=1)

  settings = {}
  settings['Type'] = _Variant(dbus.String, 'wifi')
  settings['Mode'] = _Variant(dbus.String, 'managed')
  settings['AutoConnect'] = _Variant(dbus.Boolean, False)
  settings['SSID'] = _Variant(dbus.String, ssid)
  if password:
    settings.update({
        'Security': _Variant(dbus.String, 'psk'),
        'Passphrase': _Variant(dbus.String, password)})
  flim.manager.ConfigureService(settings)
def IwScan(devname, sleep_retry_time_secs=2, max_retries=10):
  """Scans on device.

  A scan that succeeds but yields no (frequency, SSID) pair counts as a failed
  try and is retried as well.

  Args:
    devname: device name.
    sleep_retry_time_secs: The sleep time before a retry.
    max_retries: The maximum retry time to scan.

  Returns:
    A list of (ssid, frequency) tuple. Both elements are strings taken from
    the scan output.

  Raises:
    Exception if fail to scan for max_retries tries,
    or fail because of reason other than device or resource busy (-16)
  """
  # The sed expression joins each pair of consecutive lines so that one
  # output line can be matched by _RE_IWSCAN.
  # NOTE(review): for a plain shell pipeline the exit status is normally that
  # of the last command (sed), so the 240/234 branches below may never see
  # iw's own status -- confirm how process_utils.Spawn runs the shell.
  cmd = r"iw %s scan | grep -e 'freq\|SSID' | sed 'N;s/\n/ /'" % devname
  for _ in range(max_retries):
    process = process_utils.Spawn(
        cmd, read_stdout=True, log_stderr_on_error=True, log=True, shell=True)
    stdout, stderr = process.communicate()
    retcode = process.returncode
    event_log.Log('iw_scaned', retcode=retcode, stderr=stderr)
    if retcode == 0:
      matches = (_RE_IWSCAN.search(line) for line in stdout.splitlines())
      scan_result = [(m.group(2), m.group(1)) for m in matches if m]
      if scan_result:
        logging.info('IwScan success.')
        return scan_result
      # Nothing usable was scanned; fall through to sleep and retry.
    elif retcode == 234:  # Invalid argument (-22)
      raise Exception('Failed to iw scan, ret code: %d. stderr: %s. '
                      'Frequency might be wrong.' %
                      (retcode, stderr))
    elif retcode != 240:  # 240 is device or resource busy (-16): retry.
      raise Exception('Failed to iw scan, ret code: %d. stderr: %s' %
                      (retcode, stderr))
    time.sleep(sleep_retry_time_secs)
  raise Exception('Failed to iw scan for %s tries' % max_retries)
class RadiotapPacket(object):
  """Parser for the radiotap header that precedes a captured 802.11 frame.

  Only the antenna signal and antenna index fields are decoded. All other
  fields are listed solely so that their size and alignment can be skipped
  over when locating those two fields.
  """
  # name: human readable field name.
  # struct: layout of the field data (only its size is used for most fields).
  # align: required alignment in bytes; 0 means no alignment.
  FIELD = collections.namedtuple('Field', ['name', 'struct', 'align'])
  ANTENNA_SIGNAL_FIELD = FIELD('Antenna Signal', struct.Struct('b'), 0)
  ANTENNA_INDEX_FIELD = FIELD('Antenna Index', struct.Struct('B'), 0)
  # Presence bit telling that another 32-bit presence word follows.
  EXTENDED_BIT = 31
  # Fields indexed by their presence bit number. Bits 15-18 must be listed
  # even though they are never decoded; otherwise MCS, A-MPDU status and VHT
  # would be looked up under the wrong bits.
  FIELDS = [
      FIELD('TSFT', struct.Struct('Q'), 8),
      FIELD('Flags', struct.Struct('B'), 0),
      FIELD('Rate', struct.Struct('B'), 0),
      FIELD('Channel', struct.Struct('HH'), 2),
      FIELD('FHSS', struct.Struct('BB'), 0),
      ANTENNA_SIGNAL_FIELD,
      FIELD('Antenna Noise', struct.Struct('b'), 0),
      FIELD('Lock Quality', struct.Struct('H'), 2),
      FIELD('TX Attenuation', struct.Struct('H'), 2),
      FIELD('dB TX Attenuation', struct.Struct('H'), 2),
      FIELD('dBm TX Power', struct.Struct('b'), 1),
      ANTENNA_INDEX_FIELD,
      FIELD('dB Antenna Signal', struct.Struct('B'), 0),
      FIELD('dB Antenna Noise', struct.Struct('B'), 0),
      FIELD('RX Flags', struct.Struct('H'), 2),
      FIELD('TX Flags', struct.Struct('H'), 2),
      FIELD('RTS Retries', struct.Struct('B'), 0),
      FIELD('Data Retries', struct.Struct('B'), 0),
      FIELD('XChannel', struct.Struct('IHBB'), 4),
      FIELD('MCS', struct.Struct('BBB'), 1),
      FIELD('AMPDU status', struct.Struct('IHBB'), 4),
      FIELD('VHT', struct.Struct('HBBBBBBBBH'), 2)]
  # Version, pad, header length and the first presence word. Radiotap is
  # little-endian regardless of the host byte order.
  _MAIN_HEADER_LAYOUT = '<BBHI'
  MAIN_HEADER_FORMAT = struct.Struct(_MAIN_HEADER_LAYOUT)
  PARSE_INFO = collections.namedtuple('AntennaData', ['header_size',
                                                      'data_bytes',
                                                      'antenna_offsets'])
  # This is a variable-length header, but this is what we want to see:
  # the main header followed by two extended presence words.
  EXPECTED_HEADER_FORMAT = struct.Struct(_MAIN_HEADER_LAYOUT + 'II')

  @staticmethod
  def decode(packet_bytes):
    """Returns signal strength data for each antenna.

    Args:
      packet_bytes: raw bytes of the captured packet, starting at the
          radiotap header.

    Returns:
      None if the packet is too short to hold the expected header or the
      fields announced by it. Otherwise a list with one entry per presence
      word that carries an antenna signal: the bare signal when the word has
      no antenna index, else an (antenna_index, antenna_signal) tuple.

    Raises:
      NotImplementedError: if the header has more presence words than
          EXPECTED_HEADER_FORMAT covers.
    """
    cls = RadiotapPacket
    if len(packet_bytes) < cls.EXPECTED_HEADER_FORMAT.size:
      return None
    # The first three values are version, pad and length; the rest are the
    # presence words.
    present_words = cls.EXPECTED_HEADER_FORMAT.unpack_from(packet_bytes)[3:]
    parse_info = cls.parse_header(list(present_words))
    if len(packet_bytes) < parse_info.header_size + parse_info.data_bytes:
      return None

    antenna_data = []
    for datum in parse_info.antenna_offsets:
      if cls.ANTENNA_SIGNAL_FIELD not in datum:
        continue
      signal, = cls.ANTENNA_SIGNAL_FIELD.struct.unpack_from(
          packet_bytes,
          parse_info.header_size + datum[cls.ANTENNA_SIGNAL_FIELD])
      if cls.ANTENNA_INDEX_FIELD in datum:
        # The index is unsigned, so it must be read with its own struct and
        # not with the signed one of the signal field.
        index, = cls.ANTENNA_INDEX_FIELD.struct.unpack_from(
            packet_bytes,
            parse_info.header_size + datum[cls.ANTENNA_INDEX_FIELD])
        antenna_data.append((index, signal))
      else:
        antenna_data.append(signal)
    return antenna_data

  @staticmethod
  def parse_header(field_list):
    """Returns packet information of the radiotap header should have.

    Args:
      field_list: candidate presence words in packet order. Words after the
          first one without the extension bit are ignored.

    Returns:
      A PARSE_INFO tuple: header_size is the size of the main header plus the
      extended presence words, data_bytes is the number of bytes (padding
      included) occupied by the known fields after the header, and
      antenna_offsets holds one dict per used presence word which maps the
      antenna signal / antenna index field to its offset relative to the end
      of the header.

    Raises:
      NotImplementedError: if every word in field_list has the extension bit
          set, i.e. the header continues beyond field_list.
    """
    cls = RadiotapPacket
    extended_mask = 1 << cls.EXTENDED_BIT

    # First pass: find out how many presence words are used. The header size
    # must be known before any field can be aligned.
    header_size = cls.MAIN_HEADER_FORMAT.size
    used_words = []
    for bitmask in field_list:
      used_words.append(bitmask)
      if not bitmask & extended_mask:
        break
      header_size += 4
    else:
      raise NotImplementedError('Packet has too many extensions for me!')

    # Second pass: walk the fields. Alignment is relative to the start of the
    # radiotap header, so the position is tracked from there.
    position = header_size
    antenna_offsets = []
    for bitmask in used_words:
      offsets = {}
      for bit, field in enumerate(cls.FIELDS):
        if not bitmask & (1 << bit):
          continue
        if field.align and position % field.align:
          position += field.align - position % field.align
        if field in (cls.ANTENNA_SIGNAL_FIELD, cls.ANTENNA_INDEX_FIELD):
          offsets[field] = position - header_size
        position += field.struct.size
      antenna_offsets.append(offsets)
    return cls.PARSE_INFO(header_size, position - header_size,
                          antenna_offsets)
class Capture(object):
  """Context for a live tcpdump packet capture for beacons.

  Entering the context creates a monitor interface (unless one was created
  before), disables power save and beacon filtering on the parent device and
  starts tcpdump on the monitor interface. Leaving the context undoes this.
  """

  def __init__(self, device_name, phy):
    """Stores the device to capture on.

    Args:
      device_name: name of the wireless device the monitor is attached to.
      phy: phy name of the device, used for the debugfs beacon filter path.
    """
    self.monitor_process = None
    self.created_device = None
    self.parent_device = device_name
    self.phy = phy

  def create_device(self, monitor_device='antmon0'):
    """Creates a monitor device to monitor beacon."""
    process_utils.Spawn(['iw', self.parent_device, 'interface', 'add',
                         monitor_device, 'type', 'monitor'], check_call=True)
    self.created_device = monitor_device

  def remove_device(self, device_name):
    """Removes monitor device."""
    process_utils.Spawn(['iw', device_name, 'del'], check_call=True)

  def get_signal(self):
    """Gets signal from tcpdump.

    Reads the tcpdump output until a beacon with decodable antenna data is
    found. Beacons whose radiotap header yields no antenna data are skipped.

    Returns:
      A dict with keys 'ssid', 'freq' (int, MHz) and 'signal' (the value
      returned by RadiotapPacket.decode).

    Raises:
      IOError: if the tcpdump output ends before such a beacon is seen.
    """
    stdout = self.monitor_process.stdout
    while True:
      # Skip forward to the next beacon summary line.
      while True:
        line = stdout.readline()
        if not line:
          # An empty read means end of output; without this check the loop
          # would spin forever once tcpdump has exited.
          raise IOError('tcpdump output ended before a beacon was captured.')
        m = _RE_BEACON.search(line)
        if m:
          break
      freq = int(m.group(1))
      ssid = m.group(2)

      # Collect the hex dump that follows the summary line.
      packet_chars = []
      while True:
        line = stdout.readline()
        if not line.startswith('\t0x'):
          break
        # Break up lines of the form "\t0x0000: abcd ef" into a string
        # "\xab\xcd\xef". The first part is the offset and is dropped.
        for part in line[3:].split()[1:]:
          packet_chars.append(chr(int(part[:2], 16)))
          if len(part) > 2:
            packet_chars.append(chr(int(part[2:], 16)))

      packet = RadiotapPacket.decode(''.join(packet_chars))
      if packet:
        return {'ssid': ssid, 'freq': freq, 'signal': packet}

  def set_beacon_filter(self, value):
    """Sets beacon filter.

    This function may only for Intel WP2 7260 chip.

    Args:
      value: integer written as bf_enable_beacon_filter (0 or 1 is used by
          this class).
    """
    with open('/sys/kernel/debug/ieee80211/%s/netdev:%s/iwlmvm/bf_params' %
              (self.phy, self.parent_device), 'w') as f:
      f.write('bf_enable_beacon_filter=%d\n' % value)

  def _teardown(self):
    """Stops tcpdump, restores the beacon filter and removes the monitor.

    Every step is attempted even if an earlier one raises.
    """
    try:
      if self.monitor_process:
        self.monitor_process.kill()
    finally:
      self.monitor_process = None
      try:
        self.set_beacon_filter(1)
      finally:
        if self.created_device:
          # Forget the device first so that a later __enter__ creates a
          # fresh one instead of reusing a removed interface.
          device, self.created_device = self.created_device, None
          self.remove_device(device)

  def _teardown_quietly(self):
    """Runs _teardown and only logs its failures."""
    try:
      self._teardown()
    except Exception:  # pylint: disable=broad-except
      logging.exception('Failed to clean up the beacon capture.')

  def __enter__(self):
    if not self.created_device:
      self.create_device()
    try:
      process_utils.Spawn(
          ['ip', 'link', 'set', self.created_device, 'up'], check_call=True)
      process_utils.Spawn(
          ['iw', self.parent_device, 'set', 'power_save', 'off'],
          check_call=True)
      self.set_beacon_filter(0)
      self.monitor_process = process_utils.Spawn(
          ['tcpdump', '-nUxxi', self.created_device, 'type', 'mgt',
           'subtype', 'beacon'], stdout=process_utils.PIPE)
    except Exception:
      # __exit__ is not called when __enter__ fails, so undo the partial
      # setup here; otherwise the monitor interface would be left behind.
      self._teardown_quietly()
      raise
    return self

  def __exit__(self, exception, value, traceback):
    self._teardown()
class WirelessRadiotapTest(unittest.TestCase):
  """Basic wireless test class.

  Properties:
    _ui: Test ui.
    _template: Test template.
    _phy_name: wireless phy name to test.
    _space_event: An event that space has been pressed. It will also be set
      if test has been done.
    _done: An event that test has been done.
    _flim: flimflam object used to connect to and disconnect from services.
    _connect_service: the service this test connected to, or None.
  """
  ARGS = [
      Arg('device_name', str,
          'Wireless device name to test. '
          'Set this correctly if check_antenna is True.', default='wlan0'),
      Arg('services', list,
          'A list of (service_ssid, freq, password) tuples like '
          '``[(SSID1, FREQ1, PASS1), (SSID2, FREQ2, PASS2), '
          '(SSID3, FREQ3, PASS3)]``. The test will only check the service '
          'whose antenna_all signal strength is the largest. For example, if '
          '(SSID1, FREQ1, PASS1) has the largest signal among the APs, '
          'then only its results will be checked against the spec values.',
          optional=False),
      Arg('strength', dict,
          'A dict of minimal signal strengths. For example, a dict like '
          '``{"main": strength_1, "aux": strength_2, "all": strength_all}``. '
          'The test will check signal strength according to the different '
          'antenna configurations in this dict.',
          optional=False),
      Arg('scan_count', int,
          'Number of scans to get average signal strength.', default=5),
      Arg('switch_antenna_sleep_secs', int,
          'The sleep time after switching antenna and ifconfig up. Need to '
          'decide this value carefully since it depends on the platform and '
          'antenna config to test.', default=10),
      Arg('press_space_to_start', bool,
          'Press space to start the test.', default=True)]

  def setUp(self):
    self._ui = test_ui.UI()
    self._template = ui_templates.OneSection(self._ui)
    self._ui.AppendCSS(_DEFAULT_WIRELESS_TEST_CSS)
    self._phy_name = self.DetectPhyName()
    logging.info('phy name is %s.', self._phy_name)
    self._space_event = threading.Event()
    self._done = threading.Event()
    net_utils.Ifconfig(self.args.device_name, True)
    self._flim = flimflam.FlimFlam(dbus.SystemBus())
    self._connect_service = None

  def tearDown(self):
    self.DisconnectService()

  def ConnectService(self, service_name, password):
    """Associates a specified wifi AP.

    Password can be '' or None.

    Args:
      service_name: SSID of the AP.
      password: wifi key of the AP.

    Returns:
      True if the device is connected to the AP afterwards (or already was),
      False if the service can not be found or connecting fails.
    """
    self._connect_service = FlimGetService(self._flim, service_name)
    if self._connect_service is None:
      factory.console.info('Unable to find service %s', service_name)
      return False
    if FlimGetServiceProperty(self._connect_service, 'IsActive'):
      logging.warning('Already connected to %s', service_name)
      return True

    logging.info('Connecting to %s', service_name)
    FlimConfigureService(self._flim, service_name, password)
    success, diagnostics = self._flim.ConnectService(
        service=self._connect_service)
    if not success:
      factory.console.info('Unable to connect to %s, diagnostics %s',
                           service_name, diagnostics)
      return False
    factory.console.info(
        'Successfully connected to service %s', service_name)
    return True

  def DisconnectService(self):
    """Disconnect wifi AP."""
    if not self._connect_service:
      return
    self._flim.DisconnectService(service=self._connect_service)
    try:
      factory.console.info(
          'Disconnect to service %s',
          FlimGetServiceProperty(self._connect_service, 'Name'))
    finally:
      # The service is disconnected at this point; drop the reference even
      # if reading its name for the log message failed.
      self._connect_service = None

  def DetectPhyName(self):
    """Detects the phy name for device_name device.

    Returns:
      The phy name for device_name device, or None if it can not be found in
      the `iw dev <device_name> info` output.
    """
    output = process_utils.CheckOutput(
        ['iw', 'dev', self.args.device_name, 'info'])
    logging.info('info output: %s', output)
    m = _RE_WIPHY.search(output)
    return ('phy' + m.group(1)) if m else None

  def ChooseMaxStrengthService(self, services, service_strengths):
    """Chooses the service that has the largest signal strength among services.

    Args:
      services: A list of services.
      service_strengths: A dict of strengths of each service.

    Returns:
      The service that has the largest signal strength among services, or
      None if no service has a valid 'all' signal strength.
    """
    max_strength_service, max_strength = None, -sys.float_info.max
    for service in services:
      strength = service_strengths[service]['all']
      # Compare against None explicitly: an average of 0.0 is a valid value.
      if strength is None:
        factory.console.info('Service %s has no valid signal strength.',
                             service)
        continue
      factory.console.info('Service %s signal strength %f.', service,
                           strength)
      event_log.Log('service_signal', service=service, strength=strength)
      if strength > max_strength:
        max_strength_service, max_strength = service, strength

    if max_strength_service:
      logging.info('Service %s has the highest signal strength %f among %s.',
                   max_strength_service, max_strength, services)
      return max_strength_service
    logging.warning('Services %s are not valid.', services)
    return None

  def ScanSignal(self, service, times=3):
    """Scans antenna signal strengths for a specified service.

    Device should connect to the service before starting to capture signal.
    Signal result only includes antenna information of this service
    (ssid, freq).

    Args:
      service: (service_ssid, freq, password) tuple.
      times: Number of times to scan to get average.

    Returns:
      A list of signal result. The list is empty if the device can not
      connect to the service.
    """
    (ssid, freq, password) = service
    self._template.SetState(_MSG_SWITCHING_AP(ssid))
    if not self.ConnectService(ssid, password):
      return []

    signal_list = []
    self._template.SetState(_MSG_SCANNING(self.args.device_name, freq))
    with Capture(self.args.device_name, self._phy_name) as capture:
      while len(signal_list) < times:
        signal_result = capture.get_signal()
        if not signal_result:
          # No decodable beacon this time; try the next one.
          continue
        # Beacons of other APs are captured as well; keep only the ones of
        # the requested service.
        if signal_result['ssid'] == ssid and signal_result['freq'] == freq:
          logging.info('%s', signal_result)
          signal_list.append(signal_result['signal'])
    self._template.SetState(
        _MSG_SCANNING_DONE(self.args.device_name, freq))
    self.DisconnectService()
    return signal_list

  def AverageSignals(self, antenna_info):
    """Averages signal strengths for each antenna of a service.

    The dividend is the sum of signal strengths during all scans.
    The divisor is the number of times in the scan result.
    If a service is not scannable, its average value will be None.

    Args:
      antenna_info: A list of scan results of a service. Each result is
        indexed as [all_signal, (_, main_signal), (_, aux_signal)].

    Returns:
      A dict of average signal strength of each antenna.
      {antenna1: signal1, antenna2: signal2}
    """
    if not antenna_info:
      return dict.fromkeys(_ANTENNA_CONFIG, None)

    totals = dict.fromkeys(_ANTENNA_CONFIG, 0)
    for signal in antenna_info:
      totals['all'] += signal[0]
      totals['main'] += signal[1][1]
      totals['aux'] += signal[2][1]
    count = len(antenna_info)
    return {antenna: float(totals[antenna]) / count
            for antenna in _ANTENNA_CONFIG}

  def CheckSpec(self, service, spec_antenna_strength, average_signal):
    """Checks if the scan result of antenna config can meet test spec.

    An antenna meets the spec when its scanned strength is not lower than the
    spec strength. Antennas without a spec value are not checked.

    Args:
      service: (service_ssid, freq, password) tuple.
      spec_antenna_strength: A dict of minimal signal strengths.
      average_signal: A dict of average signal strength of each service in
        service. {service: {antenna1: signal1, antenna2: signal2}}
    """
    for antenna in _ANTENNA_CONFIG:
      spec_strength = spec_antenna_strength.get(antenna)
      if spec_strength is None:
        continue
      scanned_strength = average_signal[service][antenna]
      # Log exactly the criterion that decides pass or fail below, as a bool.
      meet = (scanned_strength is not None and
              scanned_strength >= spec_strength)
      event_log.Log(
          'antenna_%s' % antenna, freq=service[1], rssi=scanned_strength,
          meet=meet)
      if scanned_strength is None:
        self.fail(
            'Antenna %s, service: %s: Can not scan signal strength.' %
            (antenna, service))
      if not meet:
        self.fail(
            'Antenna %s, service: %s: The scanned strength %f < spec strength'
            ' %f' % (antenna, service, scanned_strength, spec_strength))
      factory.console.info(
          'Antenna %s, service: %s: The scanned strength %f >= spec strength'
          ' %f', antenna, service, scanned_strength, spec_strength)

  def PreCheck(self, services):
    """Checks each service only has one frequency.

    Args:
      services: A list of service tuples to scan; the first element of each
        tuple is the service ssid.
    """
    self._template.SetState(_MSG_PRECHECK)
    scan_result = IwScan(self.args.device_name)
    wanted_ssids = {service[0] for service in services}
    seen_freqs = {}
    for ssid, freq in scan_result:
      if ssid not in wanted_ssids:
        continue
      if ssid not in seen_freqs:
        seen_freqs[ssid] = freq
      elif freq != seen_freqs[ssid]:
        self.fail('There are more than one frequencies for ssid %s.' % ssid)

  def PromptSpace(self):
    """Prompts a message to ask operator to press space."""
    self._template.SetState(_MSG_SPACE)
    self._ui.BindKey(test_ui.SPACE_KEY, lambda _: self.OnSpacePressed())
    self._ui.Run(blocking=False, on_finish=self.Done)

  def Done(self):
    """The callback when ui is done.

    This will be called when test is finished, or if operator presses
    'Mark Failed'.
    """
    self._done.set()
    # Also wake up runTest, which may still be waiting for the space key.
    self._space_event.set()

  def OnSpacePressed(self):
    """The handler of space key."""
    logging.info('Space pressed by operator.')
    self._space_event.set()

  def runTest(self):
    if self.args.press_space_to_start:
      # Prompts a message to tell operator to press space key when ready.
      self.PromptSpace()
      self._space_event.wait()
      if self._done.is_set():
        return

    # Services are used as dict keys below, so make sure they are hashable
    # even if they were given as lists.
    services = [tuple(service) for service in self.args.services]
    self.PreCheck(services)

    average_signal = {}
    for service in services:
      signals = self.ScanSignal(service, self.args.scan_count)
      average_signal[service] = self.AverageSignals(signals)

    # Gets the service with the largest strength to test for each spec.
    test_service = self.ChooseMaxStrengthService(services, average_signal)
    if test_service is None:
      self.fail('Services %s are not valid.' % services)
    self.CheckSpec(test_service, self.args.strength, average_signal)