blob: 51d92be42eeec8e85936c7227115cb24c3bc9397 [file] [log] [blame]
#!/usr/bin/python
# Copyright 2018 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""The testplan module.
The testplan module will go through the CSV file (the format of which is
defined below) and put them to a TestPlan object, which acts like a read-only
list of the TestCase.
The format of the CSV file.
The first column is '@config' is for the config.
The format: @config, key, value
The first column is '@title' is for the header containing the names of arguments
and result. There is another '@result' between the arguments and the results.
The format:
@title, arg_key1, arg_key2, ..., @result, result_key1, result_key2, ...
Other rows contain a test case. Before these row there should be a valid header.
The format:
, arg_value1, arg_value2, ...,, result_value1, result_value2, ...
We use the number sign(#) as the comment character.
If the first column starts from #, we ignore the rest of the line.
"""
import csv
import itertools
import graphyte_common # pylint: disable=unused-import
from graphyte.data_parser import ListChecker
from graphyte.data_parser import TupleChecker
from graphyte.data_parser import Parse
from graphyte.default_setting import logger
from graphyte.utils.graphyte_utils import AssertRangeContain
from graphyte.utils.graphyte_utils import IsInBound
from graphyte.utils.graphyte_utils import SearchConfig
CONFIG_MARKER = '@config'
TITLE_MARKER = '@title'
RESULT_MARKER = '@result'
HEADER_CHECKER = {
'order_by': ListChecker(['component_name', 'rf_type', 'test_type',
'center_freq', 'power_level'])}
SUPPORTED_RF_TYPES = ['WLAN', 'BLUETOOTH', '802_15_4']
SUPPORTED_TEST_TYPES = ['TX', 'RX']
class TestPlan(object):
"""Generates the test plan from CSV file.
The class parses the CSV file and generates a list of test cases.
Usage:
test_plan = TestPlan(config_file)
if not test_plan:
print 'Error: Failed to load the config file.'
for test_case in test_plan:
print 'test_case: %s' % str(test_case)
"""
def __init__(self, config_file=None, search_dirs=None):
self.test_cases = []
self.test_config = {}
if config_file:
self.LoadConfigFile(config_file, search_dirs)
def __len__(self):
"""Returns the count of the test cases.
we implement __len__ and __getitem__ to make TestPlan behave like a
read-only list.
"""
return len(self.test_cases)
def __getitem__(self, index):
return self.test_cases[index]
def LoadConfigFile(self, config_file, search_dirs=None):
try:
self.ParseCSVFile(config_file, search_dirs)
except ValueError:
logger.exception('Parsing csv file %s error.', config_file)
self.test_cases = []
order_by = self.test_config.get('order_by', None)
if order_by is not None:
cmp_key = lambda test_case: test_case.ResolveAttr(order_by)
self.test_cases.sort(key=cmp_key)
def ParseCSVFile(self, config_file, search_dirs=None):
def _LogErrorAndRaise(row_idx, msg):
msg = 'Row %d: %s' % (row_idx, msg)
logger.error(msg)
raise ValueError(msg)
logger.info('Loading test config file: %s', config_file)
with open(SearchConfig(config_file, search_dirs)) as csv_file:
reader = csv.reader(csv_file)
header = None
split_idx = None
for row_idx, row in enumerate(reader, start=1):
row = StripRow(row)
if len(row) == 0:
continue
if row[0].startswith('#'):
continue
# General config starts with '@config'
elif row[0] == CONFIG_MARKER:
if len(row) != 3:
_LogErrorAndRaise(row_idx,
'The format of config row should be: '
'@config, <key>, <value>')
key, value = row[1], row[2]
data = Parse(value, HEADER_CHECKER.get(key))
self.test_config[key] = data
logger.debug('test_config: %s', self.test_config)
# Header starts with '@title', arguments and results is splitted by
# '@result'
elif row[0] == TITLE_MARKER:
header = row[1:]
try:
split_idx = header.index(RESULT_MARKER)
except ValueError:
_LogErrorAndRaise(row_idx, '%s is missing' % RESULT_MARKER)
arg_header, result_header = header[:split_idx], header[split_idx+1:]
# Each line represents a group of test cases
else:
if header is None:
_LogErrorAndRaise(
row_idx, 'There should be valid title before content.')
value = row[1:]
if len(value) != len(header):
_LogErrorAndRaise(
row_idx, 'The length of data is different from the header.')
try:
arg_data = [Parse(data) for data in value[:split_idx]]
arg_table = dict(zip(arg_header, arg_data))
result_data = [Parse(data) for data in value[split_idx+1:]]
result_limit = dict(zip(result_header, result_data))
except ValueError as ex:
_LogErrorAndRaise(row_idx, str(ex))
# Generate the test cases with all combination of arguments
for key in arg_table:
if not isinstance(arg_table[key], list):
arg_table[key] = [arg_table[key]]
arg_keys = arg_table.keys()
logger.debug('arg_keys: %s', arg_keys)
for arg_values in itertools.product(*arg_table.values()):
logger.debug('arg_values: %s', arg_values)
args = dict(zip(arg_keys, arg_values))
try:
self.test_cases.extend(CreateTestCase(args, result_limit))
except ValueError as ex:
_LogErrorAndRaise(row_idx, str(ex))
def CreateTestCase(args, result_limit):
"""Creates the test case according to the RF type.
Since the verification and the generation of the test case for each RF type
are slightly different, we use the simple factory pattern to create the test
case.
Args:
args: the arguments of the test case.
result_limit: the result limit of the test case.
Return:
a list of the test case.
"""
factory_map = {
'WLAN': WLANTestCaseFactory(),
'BLUETOOTH': BluetoothTestCaseFactory(),
'802_15_4': ZigbeeTestCaseFactory()}
try:
factory = factory_map[args['rf_type']]
except KeyError:
raise ValueError('Invalid args %s: unsupported RF type.' % args)
return factory.Create(args, result_limit)
class TestCaseFactory(object):
"""The abstract class for generating a test case of the certain RF type.
The class is mainly for verifying the arguments and the result limit are
valid, and then creating the test case.
"""
def Create(self, args, result_limit):
self._VerifyArgAndResult(args, result_limit)
return self._Create(args, result_limit)
def _VerifyArgAndResult(self, args, result_limit):
"""Verifies the test case is valid or not.
1. Check the RF type is correct.
2. Check all required arguments exist and are valid.
3. Check there is at least one result and all results are valid.
Raises:
KeyError if the required arguments are missing.
ValueError if the test case is not valid.
"""
raise NotImplementedError
def _Create(self, args, result_limit):
name = self._GenerateCanonicalName(args)
return [TestCase(name, args, result_limit)]
@property
def canonical_arg_order(self):
"""The argument order of the canonical name.
Returns:
a list of the argument name.
"""
raise NotImplementedError
def _GenerateCanonicalName(self, args):
arg_values = [args[key] for key in self.canonical_arg_order if key in args]
return ' '.join(map(str, arg_values))
@staticmethod
def VerifyArguments(checker_dict, args):
"""Verifies all arguments are valid.
If the keys of args are not the same as checker_dict, or there is any value
is not valid, then raise the ValueError exception.
Args:
checker_dict: a dict which keys are the valid args and values are
corresponding data checkers. For example:
{
'rf_type': ListChecker(['WLAN', 'BLUETOOTH', '802_15_4']),
'test_type': ListChecker(['TX', 'RX'])
}
args: the arguments of the test case.
Raises:
ValueError if args does not pass the verification.
"""
if set(args.keys()) != set(checker_dict.keys()):
missing_keys = list(set(checker_dict.keys()) - set(args.keys()))
unknown_keys = list(set(args.keys()) - set(checker_dict.keys()))
err_str = 'The keys of arguments are not valid.'
if missing_keys:
err_str += ' The keys %s are missing.' % (missing_keys)
if unknown_keys:
err_str += ' The keys %s are unknown.' % (unknown_keys)
raise ValueError(err_str)
for key, value in args.items():
if not checker_dict[key].CheckData(value):
raise ValueError('"%s" is not valid for %s.' % (value, key))
@staticmethod
def _VerifyResults(result_list, result_limit):
"""Verifies all results are valid.
If there is key of result_limit not in result_list, or the value is not
valid, then raise the ValueError exception.
Args:
result_list: a list of all valid result keys.
result_limit: the result limit of the test case.
Raises:
ValueError if data is not passed the verification.
"""
result_checker = TupleChecker(([None, int, float], # lower bound
[None, int, float])) # upper bound
if not result_limit:
raise ValueError('There is no result.')
unknown_keys = set(result_limit.keys()) - set(result_list)
if unknown_keys:
raise ValueError('The keys %s are unknown.' % list(unknown_keys))
for key, value in result_limit.items():
if not result_checker.CheckData(value):
raise ValueError('"%s" is not valid for %s.' % (value, key))
class WLANTestCaseFactory(TestCaseFactory):
RF_TYPE = 'WLAN'
# The limitations are from ieee 802.11-2016. The title is Part 11: Wireless
# LAN Medium Access Control (MAC) and Physical Layer (PHY) Specifications.
# B is in chapter 15 and chapter 16.
# A is in chapter 17.
# G is in chapter 18.
# N is in chapter 19.
# AC is in chapter 21.
# 16.3.6.5 and 16.3.6.6.
# 17.2.2.3 and 17.2.3.4. Note that Graphyte does not support 10MHz or 5MHz.
# 19.3.5. Note that Graphyte does not support UEQM so no MCS[33,76].
# 21.3.5.
DATA_RATE_A_G = [6, 9, 12, 18, 24, 36, 48, 54]
DATA_RATE_AC = ['MCS' + str(i) for i in range(10)]
DATA_RATE_B = [1, 2, 5.5, 11]
DATA_RATE_N = ['MCS' + str(i) for i in range(32)]
REQUIRED_ARG_CHECKER = {
'rf_type': ListChecker([RF_TYPE]),
'component_name': ListChecker([str]),
'test_type': ListChecker(SUPPORTED_TEST_TYPES),
'center_freq': ListChecker([lambda freq: 2400 < freq < 6000]),
'power_level': ListChecker([int, float, 'auto']),
'standard': ListChecker(['A', 'AC', 'B', 'G', 'N']),
'bandwidth': ListChecker([20, 40, 80, 160]),
'data_rate': ListChecker(DATA_RATE_B +
DATA_RATE_A_G +
DATA_RATE_N +
DATA_RATE_AC),
'chain_mask': ListChecker(range(1, 16)), # 4x4 MIMO supported
'nss': ListChecker([None, 1, 2, 3, 4]),
'long_preamble': ListChecker([0, 1])}
STANDARD_ARG_CHECKERS = {
'A': {'data_rate': ListChecker(DATA_RATE_A_G),
'bandwidth': ListChecker([20]),
'nss': ListChecker([None])},
'AC': {'data_rate': ListChecker(DATA_RATE_AC),
'bandwidth': ListChecker([20, 40, 80, 160]),
'nss': ListChecker([1, 2, 3, 4])},
'B': {'data_rate': ListChecker(DATA_RATE_B),
'bandwidth': ListChecker([20]),
'nss': ListChecker([None])},
'G': {'data_rate': ListChecker(DATA_RATE_A_G),
'bandwidth': ListChecker([20]),
'nss': ListChecker([None])},
'N': {'data_rate': ListChecker(DATA_RATE_N),
'bandwidth': ListChecker([20, 40]),
'nss': ListChecker([None])}}
# The indices of different modulation scheme.
MODULATION_INDEX = {
'A': dict(zip(DATA_RATE_A_G, range(1, 9))),
'AC': dict(zip(DATA_RATE_AC, [1, 3, 4, 5, 6, 7, 8, 9, 10, 11])),
'B': {data_rate: 0 for data_rate in DATA_RATE_B},
'G': dict(zip(DATA_RATE_A_G, range(1, 9))),
'N': {'MCS' + str(i): [1, 3, 4, 5, 6, 7, 8, 9][i % 8]
for i in range(32)}}
# 15.4.5.10 Transmit modulation accuracy
# 17.3.9.7.4 Transmitter constellation error
# 19.3.18.7.3 Transmitter constellation error
# 21.3.17.4.3 Transmitter constellation error
STANDARD_EVM_UPPER_BOUNDS = [-9.1, -5, -8, -10, -13, -16, -19, -22, -25, -27,
-30, -32]
# 15.4.6.2 Receiver minimum input level sensitivity
# 16.3.8.2 Receiver minimum input level sensitivity
# 17.3.10.2 Receiver minimum input sensitivity
# 19.3.19.1 Receiver minimum input sensitivity
# 21.3.18.1 Receiver minimum input sensitivity
STANDARD_SENSITIVITY = {
20: [-76, -82, -81, -79, -77, -74, -70, -66, -65, -64, -59, -57],
40: [None, -79, None, -76, -74, -71, -67, -63, -62, -61, -56, -54],
80: [None, -76, None, -73, -71, -68, -64, -60, -59, -58, -53, -51],
160: [None, -73, None, -70, -68, -65, -61, -57, -56, -55, -50, -48]}
RX_ARG_CHECKER = {
'rx_num_packets': ListChecker([int])}
TX_RESULTS = ['evm', 'avg_power', 'freq_error', 'lo_leakage', 'mask_margin',
'spectral_flatness', 'obw', 'phase_noise']
RX_RESULTS = ['rx_per']
# 15.4.5.6 Transmit center frequency tolerance
# 17.3.9.5 Transmit center frequency tolerance
# 19.3.18.4 Transmit center frequency tolerance
# 21.3.17.3 Transmit center frequency and symbol clock frequency tolerance
STANDARD_FREQ_ERROR = {
'A': (-20, 20),
'AC': (-20, 20),
'B': (-25, 25),
'G': (-20, 20),
'N': (-20, 20),
'N5': (-25, 25)}
# 15.4.5.9 RF carrier suppression
# 17.3.9.7.2 Transmitter center frequency leakage
# 19.3.18.7.2 Transmit center frequency leakage
# 21.3.17.4.2 Transmit center frequency leakage
STANDARD_LO_LEAKAGE = {
'A': {20: (None, -15.0)},
'AC': {20: (None, -17.5), 40: (None, -20.6),
80: (None, -23.8), 160: (None, -26.8)},
'B': {20: (None, -15.0)},
'G': {20: (None, -15.0)},
'N': {20: (None, -15.0), 40: (None, -20.0)}}
# 15.4.6.2 Receiver minimum input level sensitivity
# 16.3.8.2 Receiver minimum input level sensitivity
# 17.3.10.2 Receiver minimum input sensitivity
# 19.3.19.1 Receiver minimum input sensitivity
# 21.3.18.1 Receiver minimum input sensitivity
STANDARD_RX_PER = {
'A': (0, 10),
'AC': (0, 10),
'B': (0, 8),
'G': (0, 10),
'N': (0, 10)}
def _VerifyArgAndResult(self, args, result_limit):
# Check the RF type is valid.
if self.RF_TYPE != args['rf_type']:
raise ValueError('%s is not a valid RF type.' % args['rf_type'])
# Check all required arguments are valid.
arg_checker = self.REQUIRED_ARG_CHECKER.copy()
if args['test_type'] == 'RX':
arg_checker.update(self.RX_ARG_CHECKER)
TestCaseFactory.VerifyArguments(arg_checker, args)
# Check if the arguments are valid in the specified standard.
bandwidth = args['bandwidth']
center_freq = args['center_freq']
data_rate = args['data_rate']
standard = args['standard']
TestCaseFactory.VerifyArguments(
self.STANDARD_ARG_CHECKERS[standard],
{'data_rate': data_rate,
'bandwidth': bandwidth,
'nss': args['nss']})
modulation_index = self.MODULATION_INDEX[standard][data_rate]
if args['test_type'] == 'RX':
standard_limit = (None,
self.STANDARD_SENSITIVITY[bandwidth][modulation_index])
AssertRangeContain('standard %s with data rate %s and bandwidth %d'
% (standard, data_rate, bandwidth),
'power_level',
standard_limit,
(None, args['power_level']))
# Check there is result and all results are valid.
if args['test_type'] == 'TX':
result_list = self.TX_RESULTS
else:
result_list = self.RX_RESULTS
TestCaseFactory._VerifyResults(result_list, result_limit)
# Check if the results are valid in the specified standard.
if 'evm' in result_limit:
AssertRangeContain('standard %s with data rate %s'
% (standard, data_rate),
'evm',
(None,
self.STANDARD_EVM_UPPER_BOUNDS[modulation_index]),
result_limit['evm'])
if 'freq_error' in result_limit:
index = standard if standard != 'N' or center_freq < 3000 else 'N5'
AssertRangeContain('standard %s with frequency %f'
% (standard, center_freq),
'freq_error',
self.STANDARD_FREQ_ERROR[index],
result_limit['freq_error'])
if 'lo_leakage' in result_limit:
AssertRangeContain('standard %s with bandwidth %s'
% (standard, bandwidth),
'lo_leakage',
self.STANDARD_LO_LEAKAGE[standard][bandwidth],
result_limit['lo_leakage'])
if 'obw' in result_limit:
AssertRangeContain('all conditions',
'obw',
(0, None),
result_limit['obw'])
if 'rx_per' in result_limit:
AssertRangeContain('standard %s' % standard,
'rx_per',
self.STANDARD_RX_PER[standard],
result_limit['rx_per'])
@property
def canonical_arg_order(self):
return [
'rf_type', 'component_name', 'test_type', 'center_freq',
'standard', 'data_rate', 'bandwidth', 'chain_mask']
def _GenerateCanonicalName(self, args):
args = args.copy() # do not change the original argument.
args['bandwidth'] = 'BW-' + str(args['bandwidth'])
chain_list = ChainMaskToList(args['chain_mask'])
args['chain_mask'] = 'ANTENNA-' + ''.join(map(str, chain_list))
return super(WLANTestCaseFactory, self)._GenerateCanonicalName(args)
class BluetoothTestCaseFactory(TestCaseFactory):
RF_TYPE = 'BLUETOOTH'
# The standard test limits are from
# https://www.bluetooth.com/specifications/qualification-test-requirements/.
# Current reference revisions are RF.TS.5.1.0 and RF-PHY.TS.5.1.1.
# RF.TS.5.1.0 describes tests for BDR or EDR.
# RF-PHY.TS.5.1.1 describes tests for LE.
# Each test is named by something like RF-PHY/TRM/BV-03-C.
REQUIRED_ARG_CHECKER = {
'rf_type': ListChecker([RF_TYPE]),
'component_name': ListChecker([str]),
'test_type': ListChecker(SUPPORTED_TEST_TYPES),
'center_freq': ListChecker([lambda freq: 2400 < freq < 2500]),
'power_level': ListChecker([int, float, 'auto']),
'packet_type': ListChecker(['1DH1', '1DH3', '1DH5', '2DH1', '2DH3',
'2DH5', '3DH1', '3DH3', '3DH5', 'LE'])}
LE_RX_ARG_CHECKER = {
'rx_num_packets': ListChecker([int])}
BT_RX_ARG_CHECKER = {
'rx_num_bits': ListChecker([int])}
ADJACENT_CHANNELS = ['acp_5', 'acp_4', 'acp_3', 'acp_2', 'acp_1',
'acp0', 'acp1', 'acp2', 'acp3', 'acp4', 'acp5']
COMMON_RESULTS = ['avg_power'] + ADJACENT_CHANNELS
FORMAT_RESULT_CHECKER = {
'BDR': ['bandwidth_20db', 'freq_deviation', 'freq_drift', 'delta_f1_avg',
'delta_f2_avg', 'delta_f2_max', 'delta_f2_f1_avg_ratio'],
'EDR': ['freq_deviation', 'edr_evm_avg', 'edr_evm_peak',
'edr_extreme_omega_0', 'edr_extreme_omega_i0', 'edr_omega_i',
'edr_power_diff', 'edr_prob_evm_99_pass'],
'LE': ['freq_offset', 'delta_f1_avg', 'delta_f2_avg',
'delta_f2_max', 'delta_f0_fn_max', 'delta_f1_f0',
'delta_fn_fn5_max', 'delta_f2_f1_avg_ratio']}
LE_RX_RESULTS = ['rx_per']
BT_RX_RESULTS = ['rx_ber']
# For a Bluetooth test case, each result needs different bit pattern.
# This dict records the relation between results and bit patterns.
# NOTE1: This dict only for BDR and LE. EDR only uses PRBS9 pattern.
# That is, EDR freq_deviation still uses PRBS9.
# NOTE2: delta_f2_f1_avg_ratio is the ratio of delta_f2_avg and delta_f1_avg,
# so it needs 'F0' and 'AA' patterns.
BIT_PATTERN_RESULTS = {
'PRBS9': ['avg_power', 'bandwidth_20db', 'freq_offset', 'rx_ber',
'rx_per'] + ADJACENT_CHANNELS,
'F0': ['freq_deviation', 'delta_f1_avg', 'delta_f2_f1_avg_ratio'],
'AA': ['freq_drift', 'delta_f2_avg', 'delta_f2_max', 'delta_f1_f0',
'delta_f2_f1_avg_ratio', 'delta_f0_fn_max', 'delta_fn_fn5_max']}
# The maximum number of bytes for each BDR and EDR packet type.
# In Rx test, the instrument should transmit the largest packet.
# Ref: Bluetooth Core Specification 4.0 Table 6.9: ACL packets.
MAX_PACKET_LENGTH = {
'1DH1': 27,
'1DH3': 183,
'1DH5': 339,
'2DH1': 54,
'2DH3': 367,
'2DH5': 679,
'3DH1': 83,
'3DH3': 552,
'3DH5': 1021}
# The limits of adjacent channel power are defined in RF-PHY/TRM/BV-03-C and
# RF/TRM/CA/BV-06-C.
ADJACENT_CHANNEL_POWER = {
'LE': dict(zip(
ADJACENT_CHANNELS,
[-30, -30, -30, -20, None, None, None, -20, -30, -30, -30])),
'BDR': dict(zip(
ADJACENT_CHANNELS,
[-40, -40, -40, -20, None, None, None, -20, -40, -40, -40])),
'EDR': dict(zip(
ADJACENT_CHANNELS,
[-40, -40, -40, -20, None, None, None, -20, -40, -40, -40]))}
def _DetermineFormat(self, packet_type):
"""Determines the bluetooth format of the packet type.
Args:
packet_type: the Bluetooth packet type.
Returns:
one of 'LE', 'BDR', or 'EDR'.
"""
if packet_type == 'LE':
return 'LE'
# The Bluetooth packet types are in the format: nXYm
# n indicates modulation type. 1 is 1Mbps (basic data rate).
# 2 is 2Mbps, 3 is 3Mbps modulation (enhanced data rate).
# Hence the packet type starts from '2' or '3' is EDR, otherwise is BDR.
if packet_type[0] in ['2', '3']:
return 'EDR'
return 'BDR'
def _VerifyArgAndResultStandardRX(self, packet_format, args, result_limit):
"""Check if Bluetooth RX test limits are better than standard."""
AssertRangeContain('all packets', 'power_level',
(None, -70), (None, args['power_level']))
if packet_format == 'LE':
# The limits of power_level, rx_num_packets, and rx_per for LE are defined
# in RF-PHY/RCV/BV-01-C.
AssertRangeContain(packet_format, 'rx_num_packets',
(1500, None), (args['rx_num_packets'], None))
AssertRangeContain(packet_format, 'rx_per',
(0, 30.8), result_limit['rx_per'])
elif packet_format == 'BDR':
# The limits of power_level, rx_num_bits, and rx_ber for BDR are defined
# in RF/RCV/CA/BV-02-C.
AssertRangeContain(packet_format, 'rx_num_bits',
(1600000, None), (args['rx_num_bits'], None))
AssertRangeContain(packet_format, 'rx_ber',
(0, 0.1), result_limit['rx_ber'])
else: # packet_format == EDR
# The limits of power_level, rx_num_bits, and rx_ber for EDR are defined
# in RF/RCV/CA/BV-07-C.
standard_ber_level_0 = 0.007
standard_ber_level_1 = 0.01
AssertRangeContain(packet_format, 'rx_ber',
(0, standard_ber_level_1), result_limit['rx_ber'])
if result_limit['rx_ber'][1] <= standard_ber_level_0:
message = 'EDR with rx_ber in [0, %f]' % standard_ber_level_0
standard_bits = (1600000, None)
else:
message = 'EDR with rx_ber in (%f, %f]' % (standard_ber_level_0,
standard_ber_level_1)
standard_bits = (16000000, None)
AssertRangeContain(message, 'rx_num_bits', standard_bits,
(args['rx_num_bits'], None))
def _VerifyArgAndResultStandardTX(self, packet_format, packet_type,
result_limit):
"""Check if Bluetooth TX test limits are better than standard."""
standard_acp_list = self.ADJACENT_CHANNEL_POWER[packet_format]
for channel, standard_acp in standard_acp_list.items():
if channel in result_limit:
AssertRangeContain(
packet_format, channel, (None, standard_acp), result_limit[channel])
STANDARD_BT_TX = {
# The limits of delta_f1_avg, delta_f2_max, and delta_f2_f1_avg_ratio
# are defined in RF-PHY/TRM/BV-05-C and RF/TRM/CA/BV-07-C.
'delta_f1_avg': (225, 275) if packet_format == 'LE' else (140, 175),
'delta_f2_max': (182, None) if packet_format == 'LE' else (115, None),
'delta_f2_f1_avg_ratio': (0.8, None),
# The limits of delta_f0_fn_max, delta_f1_f0, delta_fn_fn5_max, and
# freq_offset are defined in RF-PHY/TRM/BV-06-C.
'delta_f0_fn_max': (None, 50),
'delta_f1_f0': (None, 23),
'delta_fn_fn5_max': (None, 20),
'freq_offset': (-150, 150),
# The limits of freq_drift are defined in RF/TRM/CA/BV-09-C.
'freq_drift': (-25, 25) if packet_type == '1DH1' else (-40, 40),
# The limits of edr_power_diff are defined in RF/TRM/CA/BV-10-C.
'edr_power_diff': (-4, 1),
# The limits of edr_evm_avg, edr_evm_peak, edr_prob_evm_99_pass,
# edr_extreme_omega_0, edr_extreme_omega_i0, and edr_omega_i are defined
# in RF/TRM/CA/BV-11-C.
'edr_evm_avg': (0, 20) if packet_type[0] == '2' else (0, 13),
'edr_evm_peak': (0, 35) if packet_type[0] == '2' else (0, 25),
'edr_prob_evm_99_pass': (0, 30) if packet_type[0] == '2' else (0, 20),
'edr_extreme_omega_0': (-10, 10),
'edr_extreme_omega_i0': (-75, 75),
'edr_omega_i': (-75, 75),
# The limits of bandwidth_20db are defined in RF/TRM/CA/BV-05-C.
'bandwidth_20db': (0, 1.0)
}
for arg_name, limits in STANDARD_BT_TX.items():
if arg_name in result_limit:
AssertRangeContain(
packet_type, arg_name, limits, result_limit[arg_name])
def _VerifyArgAndResult(self, args, result_limit):
# Check the RF type is valid.
if self.RF_TYPE != args['rf_type']:
raise ValueError('%s is not a valid RF type.' % args['rf_type'])
# Check all required arguments are valid.
arg_checker = self.REQUIRED_ARG_CHECKER.copy()
test_type = args['test_type']
packet_type = args['packet_type']
packet_format = self._DetermineFormat(packet_type)
if test_type == 'RX':
if packet_format == 'LE':
arg_checker.update(self.LE_RX_ARG_CHECKER)
else:
arg_checker.update(self.BT_RX_ARG_CHECKER)
TestCaseFactory.VerifyArguments(arg_checker, args)
# Check there is result and all results are valid.
if test_type == 'TX':
result_list = (self.COMMON_RESULTS +
self.FORMAT_RESULT_CHECKER[packet_format])
elif packet_format == 'LE':
result_list = self.LE_RX_RESULTS
else:
result_list = self.BT_RX_RESULTS
TestCaseFactory._VerifyResults(result_list, result_limit)
if test_type == 'RX':
self._VerifyArgAndResultStandardRX(packet_format, args, result_limit)
else:
self._VerifyArgAndResultStandardTX(
packet_format, packet_type, result_limit)
def _Create(self, args, result_limit):
"""Creates the bluetooth test case.
For BDR and LE, the bit pattern of the packet are different from results.
We add the 'bit_pattern' argument and split into different test cases.
There is only 'PRBS9' bit pattern for EDR.
"""
name = self._GenerateCanonicalName(args)
packet_format = self._DetermineFormat(args['packet_type'])
# Convert rx_num_bits to rx_num_packets. Add 10% packets for some loss.
if args['test_type'] == 'RX' and packet_format != 'LE':
args['rx_num_packets'] = int(1.1 * args['rx_num_bits'] / 8 /
self.MAX_PACKET_LENGTH[args['packet_type']])
if packet_format == 'EDR':
args['bit_pattern'] = 'PRBS9'
return [TestCase(name, args, result_limit)]
# Split the test case by the bit pattern.
ret = []
for bit_pattern, result_list in self.BIT_PATTERN_RESULTS.items():
filtered_result = dict(
(k, v) for k, v in result_limit.items() if k in result_list)
if filtered_result:
copy_args = args.copy()
copy_args['bit_pattern'] = bit_pattern
ret.append(TestCase(name, copy_args, filtered_result))
return ret
@property
def canonical_arg_order(self):
return ['rf_type', 'component_name', 'test_type', 'center_freq',
'packet_type']
class ZigbeeTestCaseFactory(TestCaseFactory):
RF_TYPE = '802_15_4'
REQUIRED_ARG_CHECKER = {
'rf_type': ListChecker([RF_TYPE]),
'component_name': ListChecker([str]),
'test_type': ListChecker(SUPPORTED_TEST_TYPES),
'center_freq': ListChecker([lambda freq: 2400 < freq < 6000]),
'power_level': ListChecker([int, float, 'auto'])}
RX_ARG_CHECKER = {
'rx_num_packets': ListChecker([int])}
TX_RESULTS = ['avg_power', 'evm_all', 'evm_offset', 'freq_error']
RX_RESULTS = ['rx_per']
def _VerifyArgAndResult(self, args, result_limit):
# Check the RF type is valid.
if self.RF_TYPE != args['rf_type']:
raise ValueError('%s is not a valid RF type.' % args['rf_type'])
# Check all required arguments are valid.
arg_checker = self.REQUIRED_ARG_CHECKER.copy()
if args['test_type'] == 'RX':
arg_checker.update(self.RX_ARG_CHECKER)
TestCaseFactory.VerifyArguments(arg_checker, args)
# Check there is result and all results are valid.
if args['test_type'] == 'TX':
result_list = self.TX_RESULTS
else:
result_list = self.RX_RESULTS
TestCaseFactory._VerifyResults(result_list, result_limit)
@property
def canonical_arg_order(self):
return ['rf_type', 'component_name', 'test_type', 'center_freq']
class TestCase(object):
"""The test case. It contains the arguments of a RF test."""
def __init__(self, name, args, result_limit):
"""Initializes the test case.
Arg:
name: A string that should contain important arguments information.
args: A dict of the test arguments. The key 'rf_type', 'test_type' and
'component_name' are essential.
Example:
{
'component_name': 'WLAN_2G',
'rf_type': 'WLAN',
'test_type': 'TX',
'center_freq': 2412
}
result_limit: A dict of the test result. The values are a tuple of lower
bound and upper bound.
Example:
{
'evm': (None, -25),
'avg_power': (16, 20)
}
"""
self.rf_type = args['rf_type']
self.test_type = args['test_type']
self.args = args
self.result_limit = result_limit
self.name = name
def __str__(self):
return self.name
def ResolveAttr(self, arg_keys):
"""Returns the value in args. It's used for sorting the test cases."""
return tuple([self.args.get(arg_key) for arg_key in arg_keys])
def CheckTestPasses(self, result):
"""Checks the result meets the result limit of the test case or not.
The method verifies that each item in the `result_limit` is included in the
result dict, and verifies that the result value meets the limit.
Args:
result: A dict in which keys should contain all the keys of
`self.result_limit`, and the values are the result values.
Returns:
True if every result value meets the result limit.
"""
is_pass = True
for key in self.result_limit.keys():
# Check there are all the required items in the result.
if key not in result:
logger.error('The result of %s is missing.', key)
is_pass = False
# Check the result value meets the result limit.
elif not IsInBound(result[key], self.result_limit[key]):
logger.info('The test in result %s failed. %s not in %s',
key, result[key], self.result_limit[key])
is_pass = False
return is_pass
def ToDict(self):
"""Returns a dictionary containing all arguments of the class.
It is for expanding the arguments of the function. For example:
def foo(component_name, **kwargs):
pass
foo(**test_case.ToDict())
"""
ret = self.args.copy()
ret['result_limit'] = self.result_limit.copy()
ret['name'] = self.name
return ret
def Copy(self):
return TestCase(self.name, self.args.copy(), self.result_limit.copy())
def ChainMaskToList(chain_mask):
"""Converts the chain mask to the list of active antenna, starts from 0.
The chain mask is the indicator of the activated antenna in binary
representation. LSB means the first antenna and we called it antenna #0. For
example:
1 (b0001) means the first antenna is activated.
2 (b0010) means the second antenna is activated.
3 (b0011) means the first and the second antenna are activated.
If the chain_mask is None, it means there is only 1 antenna.
Args:
chain_mask: None or integer.
Returns:
A list of integer which contains the activated antenna indices.
"""
if chain_mask is None:
return [0]
ret = []
antenna_idx = 0
while chain_mask > 0:
if chain_mask & 1:
ret.append(antenna_idx)
chain_mask >>= 1
antenna_idx += 1
return ret
def StripRow(row):
"""Strips every item in the row, and remove the right side of empty item."""
row = [item.strip() for item in row]
while len(row) > 0 and len(row[-1]) == 0:
row.pop()
return row