blob: 27ef7f3c983fa6ea91b661c7bfcc5c7b7fda2124 [file] [log] [blame]
#!/usr/bin/env python3
# Copyright 2018 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Tool to run alsa_conformance_test automatically."""
import argparse
import collections
import json
import logging
import re
import subprocess
import sys
TEST_BINARY = 'alsa_conformance_test'
Range = collections.namedtuple('Range', ['lower', 'upper'])
DataDevInfo = collections.namedtuple('DataDevInfo', [
'name', 'card', 'device', 'stream', 'valid_formats', 'valid_rates',
'valid_channels', 'period_size_range', 'buffer_size_range'
])
DataParams = collections.namedtuple('DataParams', [
'name', 'stream', 'access', 'format', 'channels', 'rate', 'period_size',
'buffer_size'
])
DataResult = collections.namedtuple('DataResult', [
'points', 'step_average', 'step_min', 'step_max', 'step_sd', 'rate',
'rate_error', 'underrun_nums', 'overrun_nums'
])
DEFAULT_PARAMS = DataParams(
name=None,
stream=None,
access='MMAP_INTERLEAVED',
format='S16_LE',
channels=2,
rate=48000,
period_size=240,
buffer_size=None)
Criteria = collections.namedtuple('PassCriteria', [
'rate_diff', 'rate_err'
])
DESCRIPTION = """
Test basic funtion of alsa pcm device automatically.
It is a script for alsa_conformance_test.
"""
TEST_SUITES = ['test_params', 'test_rates', 'test_all_pairs']
TEST_SUITES_DESCRIPTION = """
test suites list:
test_params Check whether all parameters can be set correctly.
test_rates Check whether all estimated rates are the same as what
it set.
test_all_pairs Check whether the audio is still stable when mixing
different params.
"""
class Output(object):
"""The output from alsa_conformance_test.
Attributes:
rc: The return value.
out: The output from stdout.
err: The output from stderr.
"""
def __init__(self, rc, out, err):
"""Inits Output object."""
self.rc = rc
self.out = out
self.err = err
class Parser(object):
"""Object which can parse result from alsa_conformance_test.
Attributes:
_context: The output result from alsa_conformance_test.
"""
def parse(self, context):
"""Parses alsa_conformance_test result.
Args:
context: The output result from alsa_conformance_test.
"""
raise NotImplementedError
def _get_value(self, key, unit=''):
"""Finds the key in context and returns its content.
Args:
key: String representing the key.
unit: String representing the unit.
Returns:
The content following the key. For example:
_context = '''
format: S16_LE
channels: 4
rate: 48000 fps
period size: 240 frames
'''
_get_value('format') = 'S16_LE'
_get_value('channels') = '4'
_get_value('rate', 'fps') = '48000'
_get_value('period size', 'frames') = '240'
Raises:
ValueError: Can not find the key in context or finds an
unmatched unit.
"""
pattern = key + ': (.*)' + unit + '\n'
search = re.search(pattern, self._context)
if search is None:
msg = 'Can not find keyword {}'.format(key)
if not unit:
msg += ' with unit {}'.format(unit)
raise ValueError(msg)
return search.group(1).strip()
def _get_list(self, key):
"""Finds the key in context and returns its content as a list.
Args:
key: String representing the key.
Returns:
The list following the key. For example:
_context = '''
available channels: 1, 2
available formats: S16_LE S32_LE
available rates: 44100 48000 96000
'''
_get_list('available channels') = ['1', '2']
_get_list('available formats') = ['S16_LE', 'S32_LE']
_get_list('available rates') = ['44100', '48000', '96000']
Raises:
ValueError: Can not find the key in context.
"""
content = self._get_value(key)
content = content.strip('[]')
content = content.replace(',', ' ')
return content.split()
def _get_range(self, key):
"""Finds the key in context and returns its content as a range.
Args:
key: String representing the key.
Returns:
The range following the key. For example:
context = '''
period size range: [16, 262144]
'''
_get_range('period size range') = [16, 262144]
Raises:
ValueError: Can not find the key in context or wrong format.
"""
content_list = self._get_list(key)
if len(content_list) != 2:
raise ValueError('Wrong range format.')
return Range(*map(int, content_list))
class DeviceInfoParser(Parser):
"""Object which can parse device info from alsa_conformance_test."""
def parse(self, context):
"""Parses device information.
Args:
context: The output result from alsa_conformance_test
with --dev_info_only flag.
Returns:
The DataDevInfo object which includes device information. For example:
context = '''
------DEVICE INFORMATION------
PCM handle name: hw:0,0
PCM type: HW
card: CardID [CardName]
device: DeviceID [DeviceName]
stream: PLAYBACK
available range: 1, 2
available formats: S16_LE S32_LE
rate range: [44100, 192000]
available rates: 44100 48000 96000 192000
period size range: [16, 262144]
buffer size range: [32, 524288]
------------------------------
'''
Result
DataDevInfo(
name='hw:0,0',
card='soundcard',
stream='PLAYBACK',
valid_formats=['S16_LE', 'S32_LE'],
valid_channels=['1', '2'],
valid_rates=[44100, 48000, 96000, 192000],
period_size_range=Range(lower=16, upper=262144),
buffer_size_range=Range(lower=32, upper=524288)
)
Raises:
ValueError: Can not get device information.
"""
if 'DEVICE INFORMATION' not in context:
raise ValueError('Can not get device information.')
self._context = context
return DataDevInfo(
self._get_value('PCM handle name'),
self._get_value('card'),
self._get_value('device'),
self._get_value('stream'),
self._get_list('available formats'),
list(map(int, self._get_list('available rates'))),
list(map(int, self._get_list('available channels'))),
self._get_range('period size range'),
self._get_range('buffer size range'))
class ParamsParser(Parser):
"""Object which can parse params from alsa_conformance_test."""
def parse(self, context):
"""Parses device params.
Args:
context: The output result from alsa_conformance_test.
Returns:
The DataParams object which includes device information. For example:
context = '''
---------PRINT PARAMS---------
PCM name: hw:0,0
stream: PLAYBACK
access type: MMAP_INTERLEAVED
format: S16_LE
channels: 2
rate: 48000 fps
period time: 5000 us
period size: 240 frames
buffer time: 160000 us
buffer size: 7680 frames
------------------------------
'''
Result
DataParams(
name='hw:0,0',
stream='PLAYBACK',
access='MMAP_INTERLEAVED',
format='S16_LE',
channels=2,
rate=48000,
period_size=240,
buffer_size=7680
)
Raises:
ValueError: Can not get params information or wrong format.
"""
if 'PRINT PARAMS' not in context:
raise ValueError('Can not get params information.')
self._context = context
rate = self._get_value('rate', unit='fps')
period_size = self._get_value('period size', unit='frames')
buffer_size = self._get_value('buffer size', unit='frames')
return DataParams(
self._get_value('PCM name'),
self._get_value('stream'),
self._get_value('access type'),
self._get_value('format'),
int(self._get_value('channels')),
float(rate),
int(period_size),
int(buffer_size))
class ResultParser(Parser):
"""Object which can parse run result from alsa_conformance_test."""
def parse(self, context):
"""Parses run result.
Args:
context: The output result from alsa_conformance_test.
Returns:
The DataResult object which includes run result. For example:
context = '''
----------RUN RESULT----------
number of recorders: 1
number of points: 6142
step average: 7.769419
step min: 1
step max: 41
step standard deviation: 1.245727
rate: 48000.042167
rate error: 0.349262
number of underrun: 0
number of overrun: 0
'''
Result
DataResult(
points=6162,
step_average=7.769419,
step_min=1,
step_max=41,
step_sd=1.245727,
rate=48000.042167,
rate_error=0.349262,
underrun_nums=0,
overrun_nums=0
)
Raises:
ValueError: Can not get run result or wrong format.
"""
if 'RUN RESULT' not in context:
raise ValueError('Can not get run result.')
self._context = context[context.find('RUN RESULT'):]
return DataResult(
int(self._get_value('number of points')),
float(self._get_value('step average')),
int(self._get_value('step min')),
int(self._get_value('step max')),
float(self._get_value('step standard deviation')),
float(self._get_value('rate')),
float(self._get_value('rate error')),
int(self._get_value('number of underrun')),
int(self._get_value('number of overrun')))
class AlsaConformanceTester(object):
"""Object which can set params and run alsa_conformance_test."""
def __init__(self, name, stream, criteria, threshold):
"""Initializes an AlsaConformanceTester.
Args:
name: PCM device for playback or capture.
stream: The stream type. (PLAYBACK or CAPTURE)
criteria: A Criteria object for pass criteria.
"""
self.name = name
self.stream = stream
self.format = None
self.channels = None
self.rate = None
self.period_size = None
self.merge_thld_size = threshold
self.criteria = criteria
output = self.run(['--dev_info_only'])
if output.rc != 0:
print('Fail - {}'.format(output.err))
exit()
self.dev_info = DeviceInfoParser().parse(output.out)
def init_params(self):
"""Sets the device params to the default values.
If the default value is not supported, choose the first supported one
instead.
"""
in_range = lambda x, Range: Range.lower <= x <= Range.upper
if DEFAULT_PARAMS.format in self.dev_info.valid_formats:
self.format = DEFAULT_PARAMS.format
else:
self.format = self.dev_info.valid_formats[0]
if DEFAULT_PARAMS.channels in self.dev_info.valid_channels:
self.channels = DEFAULT_PARAMS.channels
else:
self.channels = self.dev_info.valid_channels[0]
if DEFAULT_PARAMS.rate in self.dev_info.valid_rates:
self.rate = DEFAULT_PARAMS.rate
else:
self.rate = self.dev_info.valid_rates[0]
if in_range(DEFAULT_PARAMS.period_size, self.dev_info.period_size_range):
self.period_size = DEFAULT_PARAMS.period_size
else:
self.period_size = self.dev_info.period_size_range.lower
def show_dev_info(self):
"""Prints device information."""
print('Device Information')
print('\tName:', self.dev_info.name)
print('\tCard:', self.dev_info.card)
print('\tDevice:', self.dev_info.device)
print('\tStream:', self.dev_info.stream)
print('\tFormat:', self.dev_info.valid_formats)
print('\tChannels:', self.dev_info.valid_channels)
print('\tRate:', self.dev_info.valid_rates)
print('\tPeriod_size range:', list(self.dev_info.period_size_range))
print('\tBuffer_size range:', list(self.dev_info.buffer_size_range))
def run(self, arg):
"""Runs alsa_conformance_test.
Args:
arg: An array of strings for extra arguments.
Returns:
The Output object from alsa_conformance_test.
"""
if self.stream == 'PLAYBACK':
stream_arg = '-P'
elif self.stream == 'CAPTURE':
stream_arg = '-C'
cmd = [TEST_BINARY, stream_arg, self.name] + arg
if self.rate is not None:
cmd += ['-r', str(self.rate)]
if self.channels is not None:
cmd += ['-c', str(self.channels)]
if self.format is not None:
cmd += ['-f', str(self.format)]
if self.period_size is not None:
cmd += ['-p', str(self.period_size)]
if self.merge_thld_size is not None:
cmd += ['--merge_threshold_sz', str(self.merge_thld_size)]
logging.info('Execute command: %s', ' '.join(cmd))
p = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
encoding='utf8')
rc = p.wait()
out, err = p.communicate()
return Output(rc, out, err[:-1])
def run_and_check(self, test_name, test_args, check_function):
"""Runs alsa_conformance_test and checks result.
Args:
test_name: The name of test.
test_args: An array of strings for extra arguments of test.
check_function: The function to check the result from
alsa_conformance_test. Refer to _default_check_function
for default implementation.
Returns:
The data or result. For example:
{'name': The name of the test.
'result': The first return value from check_function.
It should be 'pass' or 'fail'.
'error': The second return value from check_function.}
"""
data = {}
data['name'] = test_name
logging.info(test_name)
output = self.run(test_args)
result, error = check_function(output)
data['result'] = result
data['error'] = error
logging_msg = result
if result == 'fail':
logging_msg += ' - ' + error
logging.info(logging_msg)
return data
@staticmethod
def _default_check_function(output):
"""It is the default check function of test.
Args:
output: The Output object from alsa_conformance_test.
Returns:
result: pass or fail.
err: The error message.
"""
if output.rc != 0:
result = 'fail'
error = output.err
else:
result = 'pass'
error = ''
return result, error
def test(self, test_suites, use_json):
"""Does testing.
Args:
test_suites: Indicate which tests will be run.
use_json: If true, print result with json format.
"""
result = {}
result['testSuites'] = []
if 'test_params' in test_suites:
result['testSuites'].append(self.test_params())
if 'test_rates' in test_suites:
result['testSuites'].append(self.test_rates())
if 'test_all_pairs' in test_suites:
result['testSuites'].append(self.test_all_pairs())
result = self.summarize(result)
if use_json:
print(json.dumps(result, indent=4, sort_keys=True))
else:
self.print_result(result)
def test_params(self):
"""Checks if we can set params correctly on device."""
result = {}
result['name'] = 'Test Params'
result['tests'] = []
result['tests'] += self.test_params_channels()
result['tests'] += self.test_params_formats()
result['tests'] += self.test_params_rates()
return result
def test_params_channels(self):
"""Checks if channels can be set correctly."""
self.init_params()
result = []
for self.channels in self.dev_info.valid_channels:
test_name = 'Set channels {}'.format(self.channels)
test_args = ['-d', '0.1']
data = self.run_and_check(test_name, test_args,
self._default_check_function)
result.append(data)
return result
def test_params_formats(self):
"""Checks if formats can be set correctly."""
self.init_params()
result = []
for self.format in self.dev_info.valid_formats:
test_name = 'Set format {}'.format(self.format)
test_args = ['-d', '0.1']
data = self.run_and_check(test_name, test_args,
self._default_check_function)
result.append(data)
return result
def test_params_rates(self):
"""Checks if rates can be set correctly."""
def check_function(output):
"""Checks if rate in params is the same as rate being set."""
result = 'pass'
error = ''
if output.rc != 0:
result = 'fail'
error = output.err
else:
params = ParamsParser().parse(output.out)
if params.rate != self.rate:
result = 'fail'
error = 'Set rate {} but got {}'.format(self.rate, params.rate)
return result, error
self.init_params()
result = []
for self.rate in self.dev_info.valid_rates:
test_name = 'Set rate {}'.format(self.rate)
test_args = ['-d', '0.1']
data = self.run_and_check(test_name, test_args, check_function)
result.append(data)
return result
def _check_rate(self, output):
"""Checks if rate being set meets rate calculated by the test."""
result = 'pass'
error = ''
if output.rc != 0:
result = 'fail'
error = output.err
else:
run_result = ResultParser().parse(output.out)
rate_threshold = self.rate * self.criteria.rate_diff / 100.0
if abs(run_result.rate - self.rate) > rate_threshold:
result = 'fail'
error = ('Expected rate is {}, measure {}, '
'difference {} > threshold {}')
error = error.format(
self.rate, run_result.rate,
abs(run_result.rate - self.rate),
rate_threshold)
elif run_result.rate_error > self.criteria.rate_err:
result = 'fail'
error = 'Rate error {} > threshold {}'.format(
run_result.rate_error, self.criteria.rate_err)
return result, error
def test_rates(self):
"""Checks if rates meet our prediction."""
result = {}
result['name'] = 'Test Rates'
result['tests'] = []
self.init_params()
for self.rate in self.dev_info.valid_rates:
test_name = 'Set rate {}'.format(self.rate)
test_args = ['-d', '1']
data = self.run_and_check(test_name, test_args, self._check_rate)
result['tests'].append(data)
return result
def test_all_pairs(self):
"""Checks if the audio is still stable when mixing different params.
The test will check if rates meet our prediction when testing all
combinations of channels, sample rates and formats.
"""
result = {}
result['name'] = 'Test All Pairs'
result['tests'] = []
self.init_params()
for self.channels in self.dev_info.valid_channels:
for self.format in self.dev_info.valid_formats:
for self.rate in self.dev_info.valid_rates:
test_name = 'Set channels {}, format {}, rate {}'.format(
self.channels, self.format, self.rate)
test_args = ['-d', '1']
data = self.run_and_check(test_name, test_args, self._check_rate)
result['tests'].append(data)
return result
def summarize(self, result):
"""Summarizes the test results.
Args:
result: A result from tester.
Returns:
The result with counts of pass and fail. For example:
{
"pass": 4,
"fail": 1,
"testSuites": [
{
"name": "Test Params",
"pass": 4,
"fail": 1,
"tests": [
{
"name": "Set channels 2",
"result": "pass",
"error": ""
},
{
"name": "Set rate 48000",
"result": "fail",
"error": "Set rate 48000 but got 44100"
}
]
}
]
}
"""
result['pass'] = 0
result['fail'] = 0
for suite in result['testSuites']:
suite['pass'] = 0
suite['fail'] = 0
for test in suite['tests']:
suite[test['result']] += 1
result['pass'] += suite['pass']
result['fail'] += suite['fail']
return result
def print_result(self, result):
"""Prints the test results.
Args:
result: A result from summarize.
"""
print('{} passed, {} failed'.format(result['pass'], result['fail']))
self.show_dev_info()
for suite in result['testSuites']:
print(suite['name'])
for test in suite['tests']:
msg = test['name'] + ': ' + test['result']
if test['result'] == 'fail':
msg += ' - ' + test['error']
print('\t' + msg)
def main():
parser = argparse.ArgumentParser(
description=DESCRIPTION,
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=TEST_SUITES_DESCRIPTION)
parser.add_argument(
'-C', '--input_device',
help='Alsa input device, such as hw:0,0')
parser.add_argument(
'-P', '--output_device',
help='Alsa output device, such as hw:0,0')
parser.add_argument(
'--rate-criteria-diff-pct',
help=('The pass criteria of rate. The value is a percentage of rate. '
'For example, 0.01 means the pass range is [47995.2, 48004.8] '
'for rate 48000. (default: 0.01)'),
type=float, default=0.01)
parser.add_argument(
'--rate-err-criteria',
help='The pass criteria of rate error. (default: 10)',
type=float, default=10)
parser.add_argument(
'--merge-thld-size',
help=('Override the auto computed merge_threshold_sz. '
'See the Explaination of point merge in the doc for details.'),
type=int)
parser.add_argument(
'--json', action='store_true', help='Print result in JSON format')
parser.add_argument('--log-file', help='The file to save logs.')
parser.add_argument(
'--test-suites', nargs='+',
help='Customize which test suites should be run. If not set, all suites '
'will be run. See the test suites list for more information.',
choices=TEST_SUITES, default=TEST_SUITES, metavar='TEST_SUITE')
args = parser.parse_args()
criteria = Criteria(args.rate_criteria_diff_pct, args.rate_err_criteria)
if args.log_file is not None:
logging.basicConfig(
level=logging.DEBUG, filename=args.log_file, filemode='w')
if not args.input_device and not args.output_device:
print('Require an input or output device to test.', file=sys.stderr)
exit(1)
if args.input_device and args.output_device:
print('Not support testing multiple devices yet.', file=sys.stderr)
exit(1)
if args.input_device:
tester = AlsaConformanceTester(args.input_device, 'CAPTURE', criteria,
args.merge_thld_size)
if args.output_device:
tester = AlsaConformanceTester(args.output_device, 'PLAYBACK', criteria,
args.merge_thld_size)
tester.test(args.test_suites, args.json)
if __name__ == '__main__':
main()