blob: 912b2a90be99b37b3f9380e29eac70d5a64e7c9b [file] [log] [blame]
#!/usr/bin/python
# Copyright 2018 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Tool to run alsa_conformance_test automatically."""
import argparse
import collections
import json
import logging
import re
import subprocess
TEST_BINARY = 'alsa_conformance_test'
Range = collections.namedtuple('Range', ['lower', 'upper'])
DataDevInfo = collections.namedtuple('DataDevInfo', [
'name', 'stream', 'valid_formats', 'valid_rates', 'channels_range',
'period_size_range', 'buffer_size_range'
])
DataParams = collections.namedtuple('DataParams', [
'name', 'stream', 'access', 'format', 'channels', 'rate', 'period_size',
'buffer_size'
])
DataResult = collections.namedtuple('DataResult', [
'points', 'step_average', 'step_min', 'step_max', 'step_sd', 'rate',
'rate_error', 'underrun_nums', 'overrun_nums'
])
DEFAULT_PARAMS = DataParams(
name=None,
stream=None,
access='MMAP_INTERLEAVED',
format='S16_LE',
channels=2,
rate=48000,
period_size=240,
buffer_size=None)
Criteria = collections.namedtuple('PassCriteria', [
'rate_diff', 'rate_err'
])
PASS_CRITERIA = Criteria(
rate_diff=20,
rate_err=10
)
class Output(object):
"""The output from alsa_conformance_test.
Attributes:
rc: The return value.
out: The output from stdout.
err: The output from stderr.
"""
def __init__(self, rc, out, err):
"""Inits Output object."""
self.rc = rc
self.out = out
self.err = err
class Parser(object):
"""Object which can parse result from alsa_conformance_test.
Attributes:
_context: The output result from alsa_conformance_test.
"""
def parse(self, context):
"""Parses alsa_conformance_test result.
Args:
context: The output result from alsa_conformance_test.
"""
raise NotImplementedError
def _get_value(self, key, unit=''):
"""Finds the key in context and returns its content.
Args:
key: String representing the key.
unit: String representing the unit.
Returns:
The content following the key. For example:
_context = '''
format: S16_LE
channels: 4
rate: 48000 bps
period size: 240 frames
'''
_get_value('format') = 'S16_LE'
_get_value('channels') = '4'
_get_value('rate', 'bps') = '48000'
_get_value('period size', 'frames') = '240'
Raises:
ValueError: Can not find the key in context or finds an
unmatched unit.
"""
pattern = key + ': (.*)' + unit + '\n'
search = re.search(pattern, self._context)
if search is None:
msg = 'Can not find keyword %s' % key
if not unit:
msg += ' with unit %s' % unit
raise ValueError(msg)
return search.group(1).strip()
def _get_list(self, key):
"""Finds the key in context and returns its content as a list.
Args:
key: String representing the key.
Returns:
The list following the key. For example:
_context = '''
channels range: [2, 2]
available formats: S16_LE S32_LE
available rates: 44100 48000 96000
'''
_get_list('channels range') = ['2', '2']
_get_list('available formats') = ['S16_LE', 'S32_LE']
_get_list('available rates') = ['44100', '48000', '96000']
Raises:
ValueError: Can not find the key in context.
"""
content = self._get_value(key)
content = content.strip('[]')
content = content.replace(',', ' ')
return content.split()
def _get_range(self, key):
"""Finds the key in context and returns its content as a range.
Args:
key: String representing the key.
Returns:
The range following the key. For example:
context = '''
channels range: [2, 2]
period size range: [16, 262144]
'''
_get_range('channels range') = [2, 2]
_get_range('period size range') = [16, 262144]
Raises:
ValueError: Can not find the key in context or wrong format.
"""
content_list = self._get_list(key)
if len(content_list) != 2:
raise ValueError('Wrong range format.')
return Range(*map(int, content_list))
class DeviceInfoParser(Parser):
"""Object which can parse device info from alsa_conformance_test."""
def parse(self, context):
"""Parses device information.
Args:
context: The output result from alsa_conformance_test
with --dev_info_only flag.
Returns:
The DataDevInfo object which includes device information. For example:
context = '''
------DEVICE INFORMATION------
PCM handle name: hw:0,0
PCM type: HW
stream: PLAYBACK
channels range: [2, 2]
available formats: S16_LE S32_LE
rate range: [44100, 192000]
available rates: 44100 48000 96000 192000
period size range: [16, 262144]
buffer size range: [32, 524288]
------------------------------
'''
Result
DataDevInfo(
name='hw:0,0',
stream='PLAYBACK',
valid_formats=['S16_LE', 'S32_LE'],
channels_range=Range(lower=2, upper=2),
valid_rates=[44100, 48000, 96000, 192000],
period_size_range=Range(lower=16, upper=262144),
buffer_size_range=Range(lower=32, upper=524288)
)
Raises:
ValueError: Can not get device information.
"""
if 'DEVICE INFORMATION' not in context:
raise ValueError('Can not get device information.')
self._context = context
return DataDevInfo(
self._get_value('PCM handle name'),
self._get_value('stream'),
self._get_list('available formats'),
map(int, self._get_list('available rates')),
self._get_range('channels range'),
self._get_range('period size range'),
self._get_range('buffer size range'))
class ParamsParser(Parser):
"""Object which can parse params from alsa_conformance_test."""
def parse(self, context):
"""Parses device params.
Args:
context: The output result from alsa_conformance_test.
Returns:
The DataParams object which includes device information. For example:
context = '''
---------PRINT PARAMS---------
PCM name: hw:0,0
stream: PLAYBACK
access type: MMAP_INTERLEAVED
format: S16_LE
channels: 2
rate: 48000 bps
period time: 5000 us
period size: 240 frames
buffer time: 160000 us
buffer size: 7680 frames
------------------------------
'''
Result
DataParams(
name='hw:0,0',
stream='PLAYBACK',
access='MMAP_INTERLEAVED',
format='S16_LE',
channels=2,
rate=48000,
period_size=240,
buffer_size=7680
)
Raises:
ValueError: Can not get params information or wrong format.
"""
if 'PRINT PARAMS' not in context:
raise ValueError('Can not get params information.')
self._context = context
rate = self._get_value('rate', unit='bps')
period_size = self._get_value('period size', unit='frames')
buffer_size = self._get_value('buffer size', unit='frames')
return DataParams(
self._get_value('PCM name'),
self._get_value('stream'),
self._get_value('access type'),
self._get_value('format'),
int(self._get_value('channels')),
float(rate),
int(period_size),
int(buffer_size))
class ResultParser(Parser):
"""Object which can parse run result from alsa_conformance_test."""
def parse(self, context):
"""Parses run result.
Args:
context: The output result from alsa_conformance_test.
Returns:
The DataResult object which includes run result. For example:
context = '''
----------RUN RESULT----------
number of recorders: 1
number of points: 6142
step average: 7.769419
step min: 1
step max: 41
step standard deviation: 1.245727
rate: 48000.042167
rate error: 0.349262
number of underrun: 0
number of overrun: 0
'''
Result
DataResult(
points=6162,
step_average=7.769419,
step_min=1,
step_max=41,
step_sd=1.245727,
rate=48000.042167,
rate_error=0.349262,
underrun_nums=0,
overrun_nums=0
)
Raises:
ValueError: Can not get run result or wrong format.
"""
if 'RUN RESULT' not in context:
raise ValueError('Can not get run result.')
self._context = context[context.find('RUN RESULT'):]
return DataResult(
int(self._get_value('number of points')),
float(self._get_value('step average')),
int(self._get_value('step min')),
int(self._get_value('step max')),
float(self._get_value('step standard deviation')),
float(self._get_value('rate')),
float(self._get_value('rate error')),
int(self._get_value('number of underrun')),
int(self._get_value('number of overrun')))
class AlsaConformanceTester(object):
"""Object which can set params and run alsa_conformance_test."""
def __init__(self, name, stream):
"""Initializes an AlsaConformanceTester.
Args:
name: PCM device for playback or capture.
stream: The stream type. (PLAYBACK or CAPTURE)
"""
self.name = name
self.stream = stream
self.format = None
self.channels = None
self.rate = None
self.period_size = None
output = self.run(['--dev_info_only'])
if output.rc != 0:
print 'Fail - %s' % output.err
exit()
self.dev_info = DeviceInfoParser().parse(output.out)
def init_params(self):
"""Sets the device params to the default values.
If the default value is not supported, choose the first supported one
instead.
"""
in_range = lambda x, Range: Range.lower <= x <= Range.upper
if DEFAULT_PARAMS.format in self.dev_info.valid_formats:
self.format = DEFAULT_PARAMS.format
else:
self.format = self.dev_info.valid_formats[0]
if in_range(DEFAULT_PARAMS.channels, self.dev_info.channels_range):
self.channels = DEFAULT_PARAMS.channels
else:
self.channels = self.dev_info.channels_range.lower
if DEFAULT_PARAMS.rate in self.dev_info.valid_rates:
self.rate = DEFAULT_PARAMS.rate
else:
self.rate = self.dev_info.valid_rates[0]
if in_range(DEFAULT_PARAMS.period_size, self.dev_info.period_size_range):
self.period_size = DEFAULT_PARAMS.period_size
else:
self.period_size = self.dev_info.period_size_range.lower
def show_dev_info(self):
"""Prints device information."""
print 'Device Information'
print '\tName:', self.dev_info.name
print '\tStream:', self.dev_info.stream
print '\tFormat:', self.dev_info.valid_formats
print '\tChannels range:', list(self.dev_info.channels_range)
print '\tRate:', self.dev_info.valid_rates
print '\tPeriod_size range:', list(self.dev_info.period_size_range)
print '\tBuffer_size range:', list(self.dev_info.buffer_size_range)
def run(self, arg):
"""Runs alsa_conformance_test.
Args:
arg: An array of strings for extra arguments.
Returns:
The Output object from alsa_conformance_test.
"""
if self.stream == 'PLAYBACK':
stream_arg = '-P'
elif self.stream == 'CAPTURE':
stream_arg = '-C'
cmd = [TEST_BINARY, stream_arg, self.name] + arg
if self.rate is not None:
cmd += ['-r', str(self.rate)]
if self.channels is not None:
cmd += ['-c', str(self.channels)]
if self.format is not None:
cmd += ['-f', str(self.format)]
if self.period_size is not None:
cmd += ['-p', str(self.period_size)]
logging.info('Execute command: %s', ' '.join(cmd))
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
rc = p.wait()
out, err = p.communicate()
return Output(rc, out, err[:-1])
def run_and_check(self, test_name, test_args, check_function):
"""Runs alsa_conformance_test and checks result.
Args:
test_name: The name of test.
test_args: An array of strings for extra arguments of test.
check_function: The function to check the result from
alsa_conformance_test. Refer to _default_check_function
for default implementation.
Returns:
The data or result. For example:
{'name': The name of the test.
'result': The first return value from check_function.
It should be 'pass' or 'fail'.
'error': The second return value from check_function.}
"""
data = {}
data['name'] = test_name
logging.info(test_name)
output = self.run(test_args)
result, error = check_function(output)
data['result'] = result
data['error'] = error
logging_msg = result
if result == 'fail':
logging_msg += ' - ' + error
logging.info(logging_msg)
return data
@staticmethod
def _default_check_function(output):
"""It is the default check function of test.
Args:
output: The Output object from alsa_conformance_test.
Returns:
result: pass or fail.
err: The error message.
"""
if output.rc != 0:
result = 'fail'
error = output.err
else:
result = 'pass'
error = ''
return result, error
def test(self, use_json):
"""Does testing.
Args:
use_json: If true, print result with json format.
"""
result = {}
result['testSuites'] = []
result['testSuites'].append(self.test_params())
result['testSuites'].append(self.test_rates())
result = self.summarize(result)
if use_json:
print json.dumps(result, indent=4, sort_keys=True)
else:
self.print_result(result)
def test_params(self):
"""Checks if we can set params correctly on device."""
result = {}
result['name'] = 'Test Params'
result['tests'] = []
result['tests'] += self.test_params_channels()
result['tests'] += self.test_params_formats()
result['tests'] += self.test_params_rates()
return result
def test_params_channels(self):
"""Checks if channels can be set correctly."""
self.init_params()
result = []
for self.channels in range(self.dev_info.channels_range.lower,
self.dev_info.channels_range.upper + 1):
test_name = 'Set channels %d' % (self.channels)
test_args = ['-d', '0.1']
data = self.run_and_check(test_name, test_args,
self._default_check_function)
result.append(data)
return result
def test_params_formats(self):
"""Checks if formats can be set correctly."""
self.init_params()
result = []
for self.format in self.dev_info.valid_formats:
test_name = 'Set format %s' % (self.format)
test_args = ['-d', '0.1']
data = self.run_and_check(test_name, test_args,
self._default_check_function)
result.append(data)
return result
def test_params_rates(self):
"""Checks if rates can be set correctly."""
def check_function(output):
"""Checks if rate in params is the same as rate being set."""
result = 'pass'
error = ''
if output.rc != 0:
result = 'fail'
error = output.err
else:
params = ParamsParser().parse(output.out)
if params.rate != self.rate:
result = 'fail'
error = 'Set rate %d but got %d' % (self.rate, params.rate)
return result, error
self.init_params()
result = []
for self.rate in self.dev_info.valid_rates:
test_name = 'Set rate %d' % (self.rate)
test_args = ['-d', '0.1']
data = self.run_and_check(test_name, test_args, check_function)
result.append(data)
return result
def test_rates(self):
"""Checks if rates meet our prediction."""
result = {}
result['name'] = 'Test Rates'
result['tests'] = []
def check_function(output):
"""Checks if rate being set meets rate calculated by the test."""
result = 'pass'
error = ''
if output.rc != 0:
result = 'fail'
error = output.err
else:
run_result = ResultParser().parse(output.out)
if abs(run_result.rate - self.rate) > PASS_CRITERIA.rate_diff:
result = 'fail'
error = ('Expected rate is %lf, measure %lf, '
'difference %lf > threshold %lf')
error = error % (self.rate, run_result.rate,
abs(run_result.rate - self.rate),
PASS_CRITERIA.rate_diff)
elif run_result.rate_error > PASS_CRITERIA.rate_err:
result = 'fail'
error = 'Rate error %lf > threshold %lf' % (
run_result.rate_error, PASS_CRITERIA.rate_err)
return result, error
self.init_params()
for self.rate in self.dev_info.valid_rates:
test_name = 'Set rate %d' % (self.rate)
test_args = ['-d', '1']
data = self.run_and_check(test_name, test_args, check_function)
result['tests'].append(data)
return result
def summarize(self, result):
"""Summarizes the test results.
Args:
result: A result from tester.
Returns:
The result with counts of pass and fail. For example:
{
"pass": 4,
"fail": 1,
"testSuites": [
{
"name": "Test Params",
"pass": 4,
"fail": 1,
"tests": [
{
"name": "Set channels 2",
"result": "pass",
"error": ""
},
{
"name": "Set rate 48000",
"result": "fail",
"error": "Set rate 48000 but got 44100"
}
]
}
]
}
"""
result['pass'] = 0
result['fail'] = 0
for suite in result['testSuites']:
suite['pass'] = 0
suite['fail'] = 0
for test in suite['tests']:
suite[test['result']] += 1
result['pass'] += suite['pass']
result['fail'] += suite['fail']
return result
def print_result(self, result):
"""Prints the test results.
Args:
result: A result from summarize.
"""
print '%d passed, %d failed' % (result['pass'], result['fail'])
self.show_dev_info()
for suite in result['testSuites']:
print suite['name']
for test in suite['tests']:
msg = test['name'] + ': ' + test['result']
if test['result'] == 'fail':
msg += ' - ' + test['error']
print '\t' + msg
def check_type(stream):
"""Check stream type. Raise error if it is not an available type."""
if stream not in ['PLAYBACK', 'CAPTURE']:
msg = stream + ' is not an available type.'
raise argparse.ArgumentTypeError(msg)
return stream
def main():
description = """
Test basic funtion of alsa pcm device automatically.
It is a script for alsa_conformance_test.
"""
parser = argparse.ArgumentParser(description=description)
parser.add_argument('device', help='Alsa pcm device, such as hw:0,0')
parser.add_argument(
'stream',
help='Alsa pcm stream type (PLAYBACK or CAPTURE)',
type=check_type)
parser.add_argument(
'--json', action='store_true', help='Print result in JSON format')
parser.add_argument('--log-file', help='The file to save logs.')
args = parser.parse_args()
if args.log_file is not None:
logging.basicConfig(
level=logging.DEBUG, filename=args.log_file, filemode='w')
tester = AlsaConformanceTester(args.device, args.stream)
tester.test(args.json)
if __name__ == '__main__':
main()