| # Copyright 2014 The ChromiumOS Authors |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """A factory test for the audio function. |
| |
| Description |
| ----------- |
| This test perform tests on audio playback and recording devices. It supports 2 |
| loopback modes: |
| |
| 1. Loop from headphone out to headphone in. |
| 2. Loop from speaker to digital microphone. |
| |
| And 3 test scenarios: |
| |
| 1. Audiofun test, which plays different tones and checks recorded frequency. |
| This test can be conducted simultaneously on different devices. This test can |
| not be conducted with dongle inserted. |
| 2. Sinewav test, which plays simple sine wav and checks if the recorded |
| frequency is in the range specified. Optionally checks the RMS and amplitude |
| thresholds. |
| 3. Noise test, which plays nothing and record, then checks the RMS and amplitude |
| thresholds. |
| |
| Since this test is sensitive to different loopback dongles, user can set a list |
| of output volume candidates. The test can pass if it can pass at any one of |
| output volume candidates. |
| |
| Test Procedure |
| -------------- |
| 1. Operator inserts the dongle (if required). |
| 2. The playback starts automatically, and analyze recordings afterward. |
| |
| Dependency |
| ---------- |
| - Device API ``cros.factory.device.audio``. |
| |
| Examples |
| -------- |
| Here are some test list examples for different test cases. First, you need to |
| figure out the particular input/output device you want to perform test on. |
| |
| To find the audio card name, use the command ``arecord -l`` or ``aplay -l``. |
| |
| For instance, if ``arecord -l`` shows as ``card 0: kblrt5514rt5663 |
| [kblrt5514rt5663max], device 1: Audio Record (*)``, then your audio card name |
| is ``kblrt5514rt5663max`` or ``0`` and your device index is ``1``. In test list |
| argument, input_dev is ["kblrt5514rt5663max", "1"] or ["0", "1"]. |
| |
| We also have some alias for common input and output devices: |
| |
| - The input_dev can be ["kblrt5514rt5663max", "Dmic"], |
| ["kblrt5514rt5663max", "Dmic2"], or ["kblrt5514rt5663max", "Extmic"]. |
| - The output_dev can be ["kblrt5514rt5663max", "Headphone"], |
| or ["kblrt5514rt5663max", "Speaker"]. |
| |
| We use the minimal volume_gain and lower frequency to protect ears in the |
| examples. Use **default volume_gain and frequency in production** to achieve |
| higher accuracy. |
| |
| To run Audiofuntest on external mic (default) and speakers: |
| |
| .. test_list:: |
| |
| generic_audio_examples:AudioTests.SpeakerExtmic |
| |
| To run Audiofuntest on internal mics and speaker channel 0: |
| |
| .. test_list:: |
| |
| generic_audio_examples:AudioTests.SpeakerChannel0DMic |
| |
| To run noise test on internal mics and speaker: |
| |
| .. test_list:: |
| |
| generic_audio_examples:AudioTests.SpeakerDMicNoiseTest |
| |
| To run sine wave test on internal mics and speaker: |
| |
| .. test_list:: |
| |
| generic_audio_examples:AudioTests.SpeakerDMicSineWaveTest |
| |
| """ |
| |
| import collections |
| import io |
| import logging |
| import os |
| import re |
| import time |
| from typing import Optional |
| |
| from cros.factory.cli import image_tool |
| from cros.factory.device.audio import base |
| from cros.factory.device import device_utils |
| from cros.factory.test.env import paths |
| from cros.factory.test import session |
| from cros.factory.test import test_case |
| from cros.factory.test.utils import audio_utils |
| from cros.factory.testlog import testlog |
| from cros.factory.testlog import testlog_utils |
| from cros.factory.utils.arg_utils import Arg |
| from cros.factory.utils import file_utils |
| from cros.factory.utils import process_utils |
| from cros.factory.utils.schema import JSONSchemaDict |
| |
| |
| # Default setting |
| _DEFAULT_FREQ_HZ = 1000 |
| |
| # the additional duration(secs) for sine tone to playback. |
| _DEFAULT_SINEWAV_DURATION_MARGIN = 8 |
| |
| # Regular expressions to match audiofuntest message. |
| _AUDIOFUNTEST_MIC_CHANNEL_RE = re.compile(r'.*Microphone channels:\s*(.*)$') |
| _AUDIOFUNTEST_SUCCESS_RATE_RE = re.compile( |
| r'.*channel\s*=\s*([0-9]*),.*rate\s*=\s*(.*)$') |
| _AUDIOFUNTEST_RUN_START_RE = re.compile('carrier') |
| |
| # Default minimum success rate of audiofun test to pass. |
| _DEFAULT_AUDIOFUN_TEST_THRESHOLD = 50 |
| # Default iterations to do the audiofun test. |
| _DEFAULT_AUDIOFUN_TEST_ITERATION = 10 |
| # Default channels of the output_dev to be tested. |
| _DEFAULT_AUDIOFUN_TEST_OUTPUT_CHANNELS = [0, 1] |
| # Default audio gain used for audiofuntest. |
| _DEFAULT_AUDIOFUN_TEST_VOLUME_GAIN = 100 |
| # Default sample format used by audiofuntest, s16 = Signed 16 Bit. |
| _DEFAULT_AUDIOFUN_TEST_SAMPLE_FORMAT = 's16' |
| # Default sample format used to play audio, s16 = Signed 16 Bit. |
| _DEFAULT_AUDIOFUN_TEST_PLAYER_FORMAT = 's16' |
| # Default channels of the input_dev to be tested. |
| _DEFAULT_TEST_INPUT_CHANNELS = [0, 1] |
| # Default channels of the output_dev to be tested. |
| _DEFAULT_TEST_OUTPUT_CHANNELS = [0, 1] |
| # Default duration to do the sinewav test, in seconds. |
| _DEFAULT_SINEWAV_TEST_DURATION = 2 |
| # Default frequency tolerance, in Hz. |
| _DEFAULT_SINEWAV_FREQ_THRESHOLD = 50 |
| # Default duration to do the noise test, in seconds. |
| _DEFAULT_NOISE_TEST_DURATION = 1 |
| # Default RMS thresholds when checking recorded file. |
| _DEFAULT_SOX_RMS_THRESHOLD = (0.08, None) |
| # Default Amplitude thresholds when checking recorded file. |
| _DEFAULT_SOX_AMPLITUDE_THRESHOLD = (None, None) |
| # Default Max Delta thresholds when checking recorded file. |
| _DEFAULT_SOX_MAX_DELTA_THRESHOLD = (None, None) |
| # Default RMS threshold ratio relative to volume_gain when testing audiofuntest. |
| _DEFAULT_AUDIOFUNTEST_RMS_THRESHOLD_RATIO_RELATIVE_TO_VOLUME_GAIN = 0.0015 |
| # Default minimum RMS threshold when testing audiofuntest. |
| _DEFAULT_AUDIOFUNTEST_MINIMUM_RMS_THRESHOLD = 0.04 |
| # Default duration in seconds to trim in the beginning of recorded file. |
| _DEFAULT_TRIM_SECONDS = 0.5 |
| # Default minimum frequency. |
| _DEFAULT_MIN_FREQUENCY = 4000 |
| # Default maximum frequency. |
| _DEFAULT_MAX_FREQUENCY = 10000 |
| |
| _ARG_INPUT_DEVICE_SCHEMA = JSONSchemaDict( |
| 'input_dev schema object', { |
| 'type': 'array', |
| 'items': [{ |
| 'type': 'string' |
| }, { |
| 'anyOf': [{ |
| 'type': 'string', |
| 'pattern': '^[0-9]+$' |
| }, { |
| 'type': 'string', |
| 'enum': list(base.InputDevices.__members__) |
| }] |
| }], |
| 'minItems': 2, |
| 'maxItems': 2 |
| }) |
| |
| _ARG_OUTPUT_DEVICE_SCHEMA = JSONSchemaDict( |
| 'output_dev schema object', { |
| 'type': 'array', |
| 'items': [{ |
| 'type': 'string' |
| }, { |
| 'anyOf': [{ |
| 'type': 'string', |
| 'pattern': '^[0-9]+$' |
| }, { |
| 'type': 'string', |
| 'enum': list(base.OutputDevices.__members__) |
| }] |
| }], |
| 'minItems': 2, |
| 'maxItems': 2 |
| }) |
| |
| _ARG_CHANNELS_SCHEMA_DICT = { |
| 'type': 'array', |
| 'items': { |
| 'type': ['number'] |
| } |
| } |
| |
| _ARG_RANGE_THRESHOLD_SCHEMA_DICT = { |
| 'type': 'array', |
| 'items': { |
| 'type': ['number', 'null'] |
| }, |
| 'minItems': 2, |
| 'maxItems': 2 |
| } |
| |
| _FREQUENCY_STRATEGY = collections.OrderedDict({ |
| 'serial': 'serial', |
| 'low_hi_uniform': 'serial', |
| 'random': 'random', |
| 'pure_random': 'random', |
| 'step': 'step', |
| 'low_hi_uni_rand_variation': 'step', |
| }) |
| |
| |
| def GetOneOfMessages(input_list): |
| if len(input_list) == 1: |
| return f'One of ``{input_list[0]}``' |
| return 'One of' + ''.join( |
| f' ``{x}``,' for x in input_list[:-1]) + f' or ``{input_list[-1]}``' |
| |
| |
| def GetDescriptiveFrequencyStrategyMessages(): |
| return ''.join(f' - ``{name}`` is an alias of ``{value}``\n' |
| for name, value in _FREQUENCY_STRATEGY.items() |
| if name != value) |
| |
| |
| _ARG_TESTS_TO_CONDUCT_SCHEMA = JSONSchemaDict( |
| 'tests_to_conduct schema', { |
| 'type': 'array', |
| 'items': { |
| 'type': |
| 'object', |
| 'oneOf': [{ |
| 'properties': { |
| 'type': { |
| 'type': 'string', |
| 'enum': ['audiofun'] |
| }, |
| 'iteration': { |
| 'type': 'integer' |
| }, |
| 'threshold': { |
| 'type': 'number' |
| }, |
| 'input_channels': _ARG_CHANNELS_SCHEMA_DICT, |
| 'output_channels': _ARG_CHANNELS_SCHEMA_DICT, |
| 'volume_gain': { |
| 'type': 'number', |
| 'minimum': 0, |
| 'maximum': 100 |
| }, |
| 'input_gain': { |
| 'type': 'number' |
| }, |
| 'sample_format': { |
| 'type': 'string', |
| 'enum': ['u8', 's16', 's24', 's32'] |
| }, |
| 'player_format': { |
| 'type': 'string', |
| 'enum': ['u8', 's16', 's24', 's32'] |
| }, |
| 'min_frequency': { |
| 'type': 'number' |
| }, |
| 'max_frequency': { |
| 'type': 'number' |
| }, |
| 'frequency_sample_strategy': { |
| 'enum': list(_FREQUENCY_STRATEGY) |
| }, |
| 'rms_threshold': { |
| 'type': 'number' |
| } |
| }, |
| 'additionalProperties': False, |
| 'required': ['type'] |
| }, { |
| 'properties': { |
| 'type': { |
| 'type': 'string', |
| 'enum': ['sinewav'] |
| }, |
| 'duration': { |
| 'type': 'number', |
| }, |
| 'input_channels': _ARG_CHANNELS_SCHEMA_DICT, |
| 'output_channels': _ARG_CHANNELS_SCHEMA_DICT, |
| 'freq_threshold': { |
| 'type': 'number' |
| }, |
| 'rms_threshold': _ARG_RANGE_THRESHOLD_SCHEMA_DICT, |
| 'amplitude_threshold': _ARG_RANGE_THRESHOLD_SCHEMA_DICT, |
| 'max_delta_threshold': _ARG_RANGE_THRESHOLD_SCHEMA_DICT |
| }, |
| 'additionalProperties': False, |
| 'required': ['type'] |
| }, { |
| 'properties': { |
| 'type': { |
| 'type': 'string', |
| 'enum': ['noise'] |
| }, |
| 'duration': { |
| 'type': 'number' |
| }, |
| 'input_channels': _ARG_CHANNELS_SCHEMA_DICT, |
| 'rms_threshold': _ARG_RANGE_THRESHOLD_SCHEMA_DICT, |
| 'amplitude_threshold': _ARG_RANGE_THRESHOLD_SCHEMA_DICT, |
| 'max_delta_threshold': _ARG_RANGE_THRESHOLD_SCHEMA_DICT |
| }, |
| 'additionalProperties': False, |
| 'required': ['type'] |
| }] |
| } |
| }) |
| |
| |
| class AudioLoopTest(test_case.TestCase): |
| """Audio Loop test to test two kind of situations. |
| 1. Speaker to digital microphone. |
| 2. Headphone out to headphone in. |
| """ |
| related_components = ( |
| test_case.TestCategory.AUDIOCODEC, |
| test_case.TestCategory.SMART_SPEAKER_AMPLIFIER, |
| test_case.TestCategory.SPEAKERAMPLIFIER, |
| ) |
| ARGS = [ |
| Arg('audio_conf', str, 'Audio config file path', default=None), |
| Arg( |
| 'initial_actions', list, |
| 'List of [card, actions]. If actions is None, the Initialize method ' |
| 'will be invoked.', default=None), |
| Arg( |
| 'input_dev', list, 'Input ALSA device. [card_name, sub_device]. ' |
| 'For example: ["audio_card", "0"]. The sub_device could be a string ' |
| f'of an integer or one of {list(base.InputDevices.__members__)!r}.' |
| 'If this argument is a string of an integer then it represents the ' |
| 'PCM Id. Otherwise the test will find the PCM Id from UCM config ' |
| 'using this argument as the keyword.', default=['0', '0'], |
| schema=_ARG_INPUT_DEVICE_SCHEMA), |
| Arg('num_input_channels', int, 'Number of input channels.', default=2), |
| Arg( |
| 'output_dev', list, 'Output ALSA device. [card_name, sub_device]. ' |
| 'For example: ["audio_card", "0"]. The sub_device could be a string ' |
| f'of an integer or one of {list(base.OutputDevices.__members__)!r}.' |
| 'If this argument is a string of an integer then it represents the ' |
| 'PCM Id. Otherwise the test will find the PCM Id from UCM config ' |
| 'using this argument as the keyword.', default=['0', '0'], |
| schema=_ARG_OUTPUT_DEVICE_SCHEMA), |
| Arg('num_output_channels', int, 'Number of output channels.', default=2), |
| Arg('output_volume', (int, list), |
| 'An int of output volume or a list of output volume candidates', |
| default=None), |
| Arg('autostart', bool, 'Auto start option', default=False), |
| Arg('require_dongle', bool, 'Require dongle option', default=False), |
| Arg('check_dongle', bool, |
| 'Check dongle status whether match require_dongle', default=False), |
| Arg('check_cras', bool, 'Do we need to check if CRAS is running', |
| default=True), |
| Arg('cras_enabled', bool, 'Whether cras should be running or not', |
| default=True), |
| Arg('mic_source', base.InputDevices, 'Microphone source', |
| default=base.InputDevices.Extmic), |
| Arg( |
| 'test_title', str, 'Title on the test screen.' |
| 'It can be used to tell operators the test info' |
| 'For example: "LRGM Mic", "LRMG Mic"', default=''), |
| Arg('mic_jack_type', str, 'Microphone jack Type: nocheck, lrgm, lrmg', |
| default='nocheck'), |
| Arg('audiofuntest_run_delay', (int, float), |
| 'Delay between consecutive calls to audiofuntest', default=None), |
| Arg('input_rate', int, |
| ('The input sample rate for audio test. The value should be ' |
| 'determined by input device.'), default=48000), |
| Arg('output_rate', int, |
| ('The output sample rate for audio test. The value should be ' |
| 'determined by output device.'), default=48000), |
| Arg('check_conformance', bool, 'Check conformance or not.', default=True), |
| Arg('conformance_rate_criteria', float, |
| ('The pass criteria of rate. The value is a percentage of rate. See m' |
| 'ore detail in `alsa_conformance.go`.'), default=0.1), |
| Arg('conformance_rate_err_criteria', int, |
| ('The pass criteria of rate error. See more detail in `alsa_conforma' |
| 'nce.go`.'), default=100), |
| Arg( |
| 'tests_to_conduct', list, |
| 'A list of dicts. A dict should contain at least one key named\n' |
| '**type** indicating the test type, which can be **audiofun**,\n' |
| '**sinewav**, or **noise**.\n' |
| '\n' |
| 'If type is **audiofun**, the dict can optionally contain:\n' |
| ' - **iteration**: Iterations to run the test.\n' |
| ' - **threshold**: The minimum success rate to pass the test.\n' |
| ' - **input_channels**: A list of input channels to be tested.\n' |
| ' - **output_channels**: A list of output channels to be tested.\n' |
| ' - **volume_gain**: The volume gain set to audiofuntest for \n' |
| ' controlling the volume of generated audio frames. The \n' |
| ' range is from 0 to 100.\n' |
| ' - **input_gain**: The volume gain for sox recorder command.\n' |
| ' The value should be in "dB", you can see the value \n' |
| ' suggested by CRAS with command \n' |
| ' `cras_test_client --dump_server_info`, check the "Gain" \n' |
| ' column.' |
| ' - **sample_format**: The sample format for audiofuntest. \n' |
| ' See -t section in audiofuntest manual.\n' |
| ' - **player_format**: The sample format for output device.\n' |
| ' - **min_frequency**: The minimum frequency set to audiofuntest.\n' |
| ' - **max_frequency**: The maximum frequency set to audiofuntest.\n' |
| ' - **frequency_sample_strategy**: ' |
| f'{GetOneOfMessages(list(_FREQUENCY_STRATEGY))}. See ' |
| '``audiofuntest -h`` for explanations.\n' |
| f'{GetDescriptiveFrequencyStrategyMessages()}' |
| '\n' |
| 'If type is **sinewav**, the dict can optionally contain:\n' |
| ' - **duration**: The test duration, in seconds.\n' |
| ' - **input_channels**: A list of input channels to be tested.\n' |
| ' - **freq_threshold**: Acceptable frequency margin.\n' |
| ' - **rms_threshold**: **[min, max]** that will make\n' |
| ' sure the following inequality is true: *min <= recorded\n' |
| ' audio RMS (root mean square) value <= max*, otherwise,\n' |
| ' fail the test. Both of **min** and **max** can be set to\n' |
| ' None, which means no limit.\n' |
| ' - **amplitude_threshold**: **[min, max]** and it will\n' |
| ' make sure the inequality is true: *min <= minimum measured\n' |
| ' amplitude <= maximum measured amplitude <= max*,\n' |
| ' otherwise, fail the test. Both of **min** and **max** can\n' |
| ' be set to None, which means no limit.\n' |
| ' - **max_delta_threshold**: **[min, max]** and it will\n' |
| ' make sure the inequality is true: *min <= maximum measured\n' |
| ' delta <= max*, otherwise, fail the test. Both of **min** \n' |
| ' and **max** can be set to None, which means no limit.\n' |
| '\n' |
| 'If type is **noise**, the dict can optionally contain:\n' |
| ' - **duration**: The test duration, in seconds.\n' |
| ' - **rms_threshold**: **[min, max]** that will make\n' |
| ' sure the following inequality is true: *min <= recorded\n' |
| ' audio RMS (root mean square) value <= max*, otherwise,\n' |
| ' fail the test. Both of **min** and **max** can be set to\n' |
| ' None, which means no limit.\n' |
| ' - **amplitude_threshold**: **[min, max]** and it will\n' |
| ' make sure the inequality is true: *min <= minimum measured\n' |
| ' amplitude <= maximum measured amplitude <= max*,\n' |
| ' otherwise, fail the test. Both of **min** and **max** can\n' |
| ' be set to None, which means no limit.' |
| ' - **max_delta_threshold**: **[min, max]** and it will\n' |
| ' make sure the inequality is true: *min <= maximum measured\n' |
| ' delta <= max*, otherwise, fail the test. Both of **min** \n' |
| ' and **max** can be set to None, which means no limit.\n', |
| schema=_ARG_TESTS_TO_CONDUCT_SCHEMA), |
| Arg('keep_raw_logs', bool, |
| 'Whether to attach the audio by Testlog when the test fail.', |
| default=True) |
| ] |
| |
| def GetAudio(self) -> base.AbstractAudioControl: |
| return self._dut.audio |
| |
| def setUp(self): |
| self._dut = device_utils.CreateDUTInterface() |
| # yapf: disable |
| if self.args.audio_conf: # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| # yapf: disable |
| self.GetAudio().LoadConfig(self.args.audio_conf) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| |
| # yapf: disable |
| self._output_volumes = self.args.output_volume # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| if not isinstance(self._output_volumes, list): |
| self._output_volumes = [self._output_volumes] |
| self._output_volume_index = 0 |
| |
| # The test results under each output volume candidate. |
| # If any one of tests to conduct fails, test fails under that output |
| # volume candidate. If test fails under all output volume candidates, |
| # the whole test fails. |
| self._test_results = [True] * len(self._output_volumes) |
| self._test_message = [] |
| |
| self._mic_jack_type = { |
| 'nocheck': None, |
| 'lrgm': base.MicJackType.lrgm, |
| 'lrmg': base.MicJackType.lrmg |
| # yapf: disable |
| }[self.args.mic_jack_type] # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| |
| # yapf: disable |
| if self.args.initial_actions is None: # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| self.GetAudio().Initialize() |
| else: |
| # yapf: disable |
| for card, action in self.args.initial_actions: # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| if card.isdigit() is False: |
| card = self.GetAudio().GetCardIndexByName(card) |
| if action is None: |
| self.GetAudio().Initialize(card) |
| else: |
| self.GetAudio().ApplyAudioConfig(action, card) |
| |
| # Transfer input and output device format |
| # yapf: disable |
| self._in_card = self.GetAudio().GetCardIndexByName(self.args.input_dev[0]) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| self._in_channel_map = _DEFAULT_TEST_INPUT_CHANNELS |
| # yapf: disable |
| if self.args.input_dev[1].isdigit(): # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| # yapf: disable |
| self._in_device = self.args.input_dev[1] # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| else: |
| # Detect _in_device from ucm config. |
| self._in_device = self.GetAudio().config_mgr.GetPCMId( |
| # yapf: disable |
| 'CapturePCM', self.args.input_dev[1], self._in_card) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| channels_from_ucm_config = self.GetAudio().config_mgr.GetChannelMap( |
| # yapf: disable |
| self.args.input_dev[1], self._in_card) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| if channels_from_ucm_config is not None: |
| self._in_channel_map = channels_from_ucm_config |
| |
| # yapf: disable |
| self._out_card = self.GetAudio().GetCardIndexByName(self.args.output_dev[0]) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| # yapf: disable |
| if self.args.output_dev[1].isdigit(): # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| # yapf: disable |
| self._out_device = self.args.output_dev[1] # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| else: |
| # Detect _out_device from ucm config. |
| self._out_device = self.GetAudio().config_mgr.GetPCMId( |
| # yapf: disable |
| 'PlaybackPCM', self.args.output_dev[1], self._out_card) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| |
| # Backward compatible for non-porting case, which use ALSA device name. |
| # only works on chromebook device |
| # TODO(mojahsu) Remove them later. |
| self._alsa_input_device = f'hw:{self._in_card},{self._in_device}' |
| self._alsa_output_device = f'hw:{self._out_card},{self._out_device}' |
| |
| # yapf: disable |
| if self.args.check_cras: # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| # Check cras status |
| # yapf: disable |
| if self.args.cras_enabled: # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| cras_status = 'start/running' |
| else: |
| cras_status = 'stop/waiting' |
| self.assertIn( |
| cras_status, self._dut.CallOutput(['status', 'cras']), |
| f'cras status is wrong (expected status: {cras_status}). Please make ' |
| f'sure that you have appropriate setting for \'"disable_services": ' |
| f'["cras"]\' in the test item.') |
| self._dut_temp_dir = self._dut.temp.mktemp(True, '', 'audio_loop') |
| |
| # If the test fails, attach the audio file; otherwise, remove it. |
| self._audio_file_path = [] |
| |
| # We only need to read & apply the input gain from ucm for audiofuntest. |
| self._default_input_gain = 0 |
| |
| def tearDown(self): |
| self.GetAudio().RestoreMixerControls() |
| self._dut.CheckCall(['rm', '-rf', self._dut_temp_dir]) |
| |
| def runTest(self): |
| # If autostart, JS triggers start_run_test event. |
| # Otherwise, it binds start_run_test with 's' key pressed. |
| # yapf: disable |
| self.ui.CallJSFunction('init', # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| # yapf: disable |
| self.args.require_dongle, self.args.test_title) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| # yapf: disable |
| if self.args.autostart: # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| # yapf: disable |
| self.ui.RunJS('window.template.innerHTML = "";') # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| else: |
| # yapf: disable |
| self.ui.WaitKeysOnce('S') # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| |
| self.CheckDongleStatus() |
| self.SetupAudio() |
| # yapf: disable |
| if self.args.check_conformance: # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| self.CheckConformance() |
| |
| # yapf: disable |
| if not self.args.tests_to_conduct: # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| logging.info('No audio loop test to be conducted because the argument ' |
| '\'tests_to_conduct\' is empty') |
| return |
| |
| |
| # Run each tests to conduct under each output volume candidate. |
| for self._output_volume_index, output_volume in enumerate( |
| self._output_volumes): |
| |
| if output_volume is not None: |
| # yapf: disable |
| if self.args.require_dongle: # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| self.GetAudio().SetHeadphoneVolume(output_volume, self._out_card) |
| else: |
| self.GetAudio().SetSpeakerVolume(output_volume, self._out_card) |
| |
| # yapf: disable |
| for test in self.args.tests_to_conduct: # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| if test['type'] == 'audiofun': |
| # Read input_gain from ucm for audiofuntest. |
| ucm_config_mgr = self.GetAudio().ucm_config_mgr |
| self._default_input_gain = ucm_config_mgr.GetDefaultInputGain( |
| self._in_card) |
| self.AudioFunTest(test) |
| elif test['type'] == 'sinewav': |
| self.SinewavTest(test) |
| elif test['type'] == 'noise': |
| self.NoiseTest(test) |
| else: |
| raise ValueError(f"Test type \"{test['type']}\" not supported.") |
| |
| if self.MayPassTest(): |
| # yapf: disable |
| self.ui.CallJSFunction('testPassResult') # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| self.Sleep(0.5) |
| for file_path in self._audio_file_path: |
| os.unlink(file_path) |
| return |
| |
| # yapf: disable |
| if self.args.keep_raw_logs: # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| try: |
| for file_path in self._audio_file_path: |
| testlog.AttachFile(path=file_path, mime_type='audio/x-raw', |
| name=os.path.basename(file_path), |
| description='audio of the test', delete=True) |
| except testlog_utils.TestlogError as err: |
| self.AppendErrorMessage(str(err)) |
| else: |
| for file_path in self._audio_file_path: |
| os.unlink(file_path) |
| |
| self.FailTest() |
| |
| |
| def AppendErrorMessage(self, error_message): |
| """Sets the test result to fail and append a new error message.""" |
| self._test_results[self._output_volume_index] = False |
| self._test_message.append( |
| f'Under output volume ' |
| f'{self._output_volumes[self._output_volume_index]!r}') |
| self._test_message.append(error_message) |
| session.console.error(error_message) |
| |
| def _MatchPatternLines(self, in_stream, re_pattern, num_lines=None): |
| """Try to match the re pattern in the given number of lines. |
| |
| Try to read lines one-by-one from input stream and perform re matching. |
| Stop when matching successes or reaching the number of lines limit. |
| |
| Args: |
| in_stream: input stream to read from. |
| re_pattern: re pattern used for matching. |
| num_lines: maximum number of lines to stop for matching. |
| None for read until end of input stream. |
| """ |
| num_read = 0 |
| while True: |
| line = in_stream.readline() |
| if not line: |
| return None |
| num_read += 1 |
| m = re_pattern.match(line) |
| if m is not None: |
| return m |
| if num_lines is not None and num_read >= num_lines: |
| return None |
| |
| def _ParseSingleRunOutput(self, audiofun_output, input_channels): |
| """Parse a single run output from audiofuntest |
| |
| Sample single run output: |
| O: channel = 0, success = 1, fail = 0, rate = 100.0 |
| X: channel = 1, success = 0, fail = 1, rate = 0.0 |
| |
| Args: |
| audiofun_output: output stream of audiofuntest to parse from |
| input_channels: a list of mic channels used for testing |
| """ |
| |
| all_channel_rate = {} |
| for expected_channel in input_channels: |
| m = self._MatchPatternLines(audiofun_output, |
| _AUDIOFUNTEST_SUCCESS_RATE_RE, 1) |
| if m is None or int(m.group(1)) != expected_channel: |
| self.AppendErrorMessage( |
| f'Failed to get expected {int(expected_channel)} channel output ' |
| f'from audiofuntest') |
| return None |
| all_channel_rate[expected_channel] = float(m.group(2)) |
| return all_channel_rate |
| |
| def _AudioFunTestWithOutputChannel(self, test_arg, input_rate, output_rate, |
| output_channel): |
| """Runs audiofuntest program to get the frequency from microphone |
| immediately according to speaker and microphone setting. |
| |
| Sample audiofuntest message: |
| Config values. |
| Player parameter: aplay -r 48000 -f s16 -t raw -c 2 -B 0 - |
| Recorder parameter: arecord -r 48000 -f s16 -t raw -c 2 -B 0 - |
| Player FIFO name: |
| Recorder FIFO name: |
| Number of test rounds: 20 |
| Pass threshold: 3 |
| Allowed delay: 1200(ms) |
| Sample rate: 48000 |
| FFT size: 2048 |
| Microphone channels: 2 |
| Speaker channels: 2 |
| Microphone active channels: 0, 1, |
| Speaker active channels: 0, 1, |
| Tone length (in second): 3.00 |
| Volume range: 1.00 ~ 1.00 |
| carrier = 119 |
| O: channel = 0, success = 1, fail = 0, rate = 100.0 |
| X: channel = 1, success = 0, fail = 1, rate = 0.0 |
| carrier = 89 |
| O: channel = 0, success = 2, fail = 0, rate = 100.0 |
| X: channel = 1, success = 1, fail = 1, rate = 50.0 |
| |
| Args: |
| test_arg: The arguments of the current test. |
| input_rate: bit rate for input device |
| output_rate: bit rate for output device |
| output_channel: output device channel used for testing |
| """ |
| |
| session.console.info('Test output channel %d', output_channel) |
| |
| iteration = test_arg.get('iteration', _DEFAULT_AUDIOFUN_TEST_ITERATION) |
| |
| volume_gain = test_arg.get('volume_gain', |
| _DEFAULT_AUDIOFUN_TEST_VOLUME_GAIN) |
| self.assertTrue(0 <= volume_gain <= 100) |
| |
| audiofuntest_sample_format = test_arg.get( |
| 'sample_format', _DEFAULT_AUDIOFUN_TEST_SAMPLE_FORMAT).lower() |
| audiofuntest_bits = int(audiofuntest_sample_format[1:]) |
| if audiofuntest_sample_format.startswith('s'): |
| audiofuntest_encoding = 'signed' |
| elif audiofuntest_sample_format.startswith('u'): |
| audiofuntest_encoding = 'unsigned' |
| else: |
| raise ValueError('Unknown audiofuntest encoding') |
| |
| player_sample_format = test_arg.get( |
| 'player_format', _DEFAULT_AUDIOFUN_TEST_PLAYER_FORMAT).lower() |
| player_bits = int(player_sample_format[1:]) |
| if player_sample_format.startswith('s'): |
| player_encoding = 'signed' |
| elif player_sample_format.startswith('u'): |
| player_encoding = 'unsigned' |
| else: |
| raise ValueError('Unknown player encoding') |
| |
| min_frequency = test_arg.get('min_frequency', _DEFAULT_MIN_FREQUENCY) |
| self.assertGreaterEqual(min_frequency, 0) |
| max_frequency = test_arg.get('max_frequency', _DEFAULT_MAX_FREQUENCY) |
| self.assertLessEqual(min_frequency, max_frequency) |
| |
| player_cmd = ( |
| # yapf: disable |
| f'sox -b{audiofuntest_bits:d} -c{self.args.num_output_channels:d} -e' # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| f'{audiofuntest_encoding} -r{output_rate:d} -traw - -b{player_bits:d} ' |
| f'-e{player_encoding} -talsa {self._alsa_output_device}') |
| |
| input_gain = test_arg.get('input_gain', self._default_input_gain) |
| input_channels = test_arg.get('input_channels', self._in_channel_map) |
| recorder_cmd = ( |
| f'sox -talsa {self._alsa_input_device} -b{audiofuntest_bits:d} -c' |
| f'{len(input_channels)} -e{audiofuntest_encoding} -r{input_rate:d}' |
| f' -traw - remix {" ".join(str(x+1) for x in input_channels)} gain' |
| f' {input_gain}') |
| |
| default_rms_threshold = max( |
| volume_gain * |
| _DEFAULT_AUDIOFUNTEST_RMS_THRESHOLD_RATIO_RELATIVE_TO_VOLUME_GAIN, |
| _DEFAULT_AUDIOFUNTEST_MINIMUM_RMS_THRESHOLD) |
| rms_threshold = test_arg.get('rms_threshold', default_rms_threshold) |
| help_process = self._dut.Popen([audio_utils.AUDIOFUNTEST_PATH, '-h'], |
| stdout=process_utils.PIPE, |
| stderr=process_utils.PIPE, log=True) |
| unused_help_stdout, help_stderr = help_process.communicate() |
| audiofun_cmd = [ |
| audio_utils.AUDIOFUNTEST_PATH, |
| '-P', |
| player_cmd, |
| '-R', |
| recorder_cmd, |
| '-t', |
| audiofuntest_sample_format, |
| '-I', |
| f'{int(input_rate)}', |
| '-O', |
| f'{int(output_rate)}', |
| '-T', |
| f'{int(iteration)}', |
| '-a', |
| f'{int(output_channel)}', |
| '-c', |
| f'{len(input_channels)}', |
| '-C', |
| # yapf: disable |
| f'{int(self.args.num_output_channels)}', # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| '-g', |
| f'{int(volume_gain)}', # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| '-i', |
| f'{int(min_frequency)}', |
| '-x', |
| f'{int(max_frequency)}', |
| '-p', |
| f'{rms_threshold:f}' |
| ] |
| match = re.search(r'--played-file-path\b', help_stderr) |
| if match: |
| audio_name = f'audiofun_generated_{output_rate}_{output_channel}.raw' |
| played_audio_path = self._dut.path.join(self._dut_temp_dir, audio_name) |
| local_played_audio_path = os.path.join(paths.DATA_TESTS_DIR, |
| session.GetCurrentTestPath(), |
| audio_name) |
| audiofun_cmd.extend(['--played-file-path', played_audio_path]) |
| else: |
| played_audio_path = None |
| local_played_audio_path = None |
| logging.warning("audiofuntest doesn't support '--played-file-path'") |
| |
| match = re.search(r'--recorded-file-path\b', help_stderr) |
| if match: |
| audio_name = f'audiofun_recorded_{input_rate}_{output_channel}.raw' |
| recorded_audio_path = self._dut.path.join(self._dut_temp_dir, audio_name) |
| local_recorded_audio_path = os.path.join(paths.DATA_TESTS_DIR, |
| session.GetCurrentTestPath(), |
| audio_name) |
| audiofun_cmd.extend(['--recorded-file-path', recorded_audio_path]) |
| else: |
| recorded_audio_path = None |
| local_recorded_audio_path = None |
| logging.warning("audiofuntest doesn't support '--recorded-file-path'") |
| |
| frequency_sample_strategy = test_arg.get('frequency_sample_strategy') |
| if frequency_sample_strategy: |
| match = re.search(r'--frequency-sample-strategy\b', help_stderr) |
| if match: |
| audiofun_cmd.extend([ |
| '--frequency-sample-strategy', |
| _FREQUENCY_STRATEGY[frequency_sample_strategy] |
| ]) |
| else: |
| self.FailTask( |
| "audiofuntest doesn't support '--frequency-sample-strategy'") |
| |
| process = self._dut.Popen(audiofun_cmd, stdout=process_utils.PIPE, |
| stderr=process_utils.PIPE, log=True) |
| stdout, stderr = process.communicate() |
| if stdout: |
| logging.info('stdout:\n%s', stdout) |
| if stderr: |
| logging.info('stderr:\n%s', stderr) |
| |
| last_success_rate = None |
| stdout_stream = io.StringIO(stdout) |
| rate_msg = None |
| while self._MatchPatternLines(stdout_stream, |
| _AUDIOFUNTEST_RUN_START_RE) is not None: |
| last_success_rate = self._ParseSingleRunOutput( |
| stdout_stream, list(range(len(input_channels)))) |
| if last_success_rate is None: |
| break |
| rate_msg = ', '.join(f'Mic {int(input_channels[channel])}: {rate:.1f}%' |
| for channel, rate in last_success_rate.items()) |
| # yapf: disable |
| self.ui.CallJSFunction('testInProgress', rate_msg) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| |
| threshold = test_arg.get('threshold', _DEFAULT_AUDIOFUN_TEST_THRESHOLD) |
| |
| success = True |
| if last_success_rate is None: |
| self.AppendErrorMessage('Failed to parse audiofuntest output') |
| success = False |
| elif any(rate < threshold for rate in last_success_rate.values()): |
| self.AppendErrorMessage( |
| f'For output device channel {output_channel}, the success rate is ' |
| f'"{rate_msg}", too low!') |
| # yapf: disable |
| self.ui.CallJSFunction('testFailResult', rate_msg) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| success = False |
| self.Sleep(1) |
| |
| if not success: |
| if played_audio_path and self._dut.path.exists(played_audio_path): |
| self._dut.link.Pull(played_audio_path, local_played_audio_path) |
| self._audio_file_path.append(local_played_audio_path) |
| |
| if recorded_audio_path and self._dut.path.exists(recorded_audio_path): |
| self._dut.link.Pull(recorded_audio_path, local_recorded_audio_path) |
| self._audio_file_path.append(local_recorded_audio_path) |
| |
| def _CheckChannelArgs(self, output_channels): |
| # yapf: disable |
| if self.args.num_output_channels < max(output_channels): # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| raise ValueError('Incorrect number of output channels') |
| |
| def AudioFunTest(self, test_arg): |
| """Setup speaker and microphone test pairs and run audiofuntest program.""" |
| |
| session.console.info('Run audiofuntest from %r to %r', |
| self._alsa_output_device, self._alsa_input_device) |
| |
| output_channels = test_arg.get('output_channels', |
| _DEFAULT_AUDIOFUN_TEST_OUTPUT_CHANNELS) |
| |
| self._CheckChannelArgs(output_channels) |
| |
| for output_channel in output_channels: |
| # yapf: disable |
| self._AudioFunTestWithOutputChannel(test_arg, self.args.input_rate, # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| # yapf: disable |
| self.args.output_rate, output_channel) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| # yapf: disable |
| if self.args.audiofuntest_run_delay is not None: # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| # yapf: disable |
| self.Sleep(self.args.audiofuntest_run_delay) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| |
| def _GenerateSinewav(self, dut_file_path, channel, wav_duration): |
| """Generate sine .wav file locally and push it to the DUT. |
| """ |
| with file_utils.UnopenedTemporaryFile(suffix='.wav') as file_path: |
| cmd = audio_utils.GetGenerateSineWavArgs( |
| file_path, |
| channel, |
| # yapf: disable |
| self.args.output_rate, # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| _DEFAULT_FREQ_HZ, |
| wav_duration) |
| process_utils.Spawn(cmd.split(' '), log=True, check_call=True) |
| self._dut.link.Push(file_path, dut_file_path) |
| |
| def SinewavTest(self, test_arg): |
| """Play sinewav, record it and check if it meets the requirements. |
| """ |
| # yapf: disable |
| self.ui.CallJSFunction('testInProgress', None) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| |
| duration = test_arg.get('duration', _DEFAULT_SINEWAV_TEST_DURATION) |
| wav_duration = duration + _DEFAULT_SINEWAV_DURATION_MARGIN |
| |
| output_channels = test_arg.get('output_channels', |
| _DEFAULT_TEST_OUTPUT_CHANNELS) |
| |
| for output_channel in output_channels: |
| volume = self._output_volumes[self._output_volume_index] |
| record_file_path = ( |
| f'/tmp/record-{volume}-{output_channel}-{time.time()}.raw') |
| with self._dut.temp.TempFile() as dut_sine_wav_path: |
| session.console.info('DUT sine wav path %s', dut_sine_wav_path) |
| # It's hard to estimate the overhead in audio record thing of different |
| # platform, To make sure we can record the whole sine tone in the record |
| # duration, we will playback a long period sine tone, and stop the |
| # playback process after we finish recording. |
| self._GenerateSinewav(dut_sine_wav_path, output_channel, wav_duration) |
| self.GetAudio().PlaybackWavFile(dut_sine_wav_path, self._out_card, |
| self._out_device, blocking=False) |
| self.RecordAndCheck(test_arg, record_file_path) |
| self.GetAudio().StopPlaybackWavFile() |
| |
| def NoiseTest(self, test_arg): |
| """Record noise and check if it meets the requirements. |
| """ |
| # yapf: disable |
| self.ui.CallJSFunction('testInProgress', None) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| |
| noise_file_path = f'/tmp/noise-{time.time()}.wav' |
| self.RecordAndCheck(test_arg, noise_file_path) |
| |
| def RecordAndCheck(self, test_arg, file_path): |
| """Record a file and check if the stats meet the requirements. |
| |
| Args: |
| test_arg: The arguments of the current test. |
| file_path: The file_path for the recorded file. |
| """ |
| duration = test_arg.get('duration', _DEFAULT_NOISE_TEST_DURATION) |
| input_channels = test_arg.get('input_channels', self._in_channel_map) |
| # Number of channel we need is the maximum channel id in `input_channel`. |
| # Add 1 for 0-based channel id. |
| num_channels = max(input_channels) + 1 |
| self._RecordFile(duration, num_channels, file_path) |
| for channel in input_channels: |
| session.console.info(f'Checking channel {channel} of {file_path}') |
| self._CheckRecordedAudio( |
| test_arg, |
| audio_utils.SoxStatOutput(file_path, num_channels, channel, |
| # yapf: disable |
| self.args.input_rate)) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| self._audio_file_path.append(file_path) |
| |
| def _RecordFile(self, duration, num_channels, file_path): |
| """Records file for *duration* seconds. |
| |
| The caller is responsible for removing the file at last. |
| |
| Args: |
| duration: Recording duration, in seconds. |
| num_channels: The number of the channels for recording. |
| file_path: The file_path for the recorded file. |
| """ |
| session.console.info('RecordFile : %s.', file_path) |
| with file_utils.UnopenedTemporaryFile() as record_path, \ |
| self._dut.temp.TempFile() as dut_record_path: |
| self.GetAudio().RecordRawFile(dut_record_path, self._in_card, |
| self._in_device, duration, num_channels, |
| # yapf: disable |
| self.args.input_rate) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| self._dut.link.Pull(dut_record_path, record_path) |
| audio_utils.TrimAudioFile( |
| in_path=record_path, out_path=file_path, start=_DEFAULT_TRIM_SECONDS, |
| # yapf: disable |
| end=None, num_channels=num_channels, sample_rate=self.args.input_rate) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| |
| def _CheckRecordedAudio(self, test_arg, sox_output): |
| rms_value = audio_utils.GetAudioRms(sox_output) |
| session.console.info('Got audio RMS value: %f.', rms_value) |
| rms_threshold = test_arg.get('rms_threshold', _DEFAULT_SOX_RMS_THRESHOLD) |
| if rms_threshold[0] is not None and rms_threshold[0] > rms_value: |
| self.AppendErrorMessage( |
| f'Audio RMS value {rms_value:f} too low. Minimum pass is ' |
| f'{rms_threshold[0]:f}.') |
| if rms_threshold[1] is not None and rms_threshold[1] < rms_value: |
| self.AppendErrorMessage( |
| f'Audio RMS value {rms_value:f} too high. Maximum pass is ' |
| f'{rms_threshold[1]:f}.') |
| |
| amplitude_threshold = test_arg.get('amplitude_threshold', |
| _DEFAULT_SOX_AMPLITUDE_THRESHOLD) |
| min_value = audio_utils.GetAudioMinimumAmplitude(sox_output) |
| session.console.info('Got audio min amplitude: %f.', min_value) |
| if (amplitude_threshold[0] is not None and |
| amplitude_threshold[0] > min_value): |
| self.AppendErrorMessage( |
| f'Audio minimum amplitude {min_value:f} too low. Minimum pass is ' |
| f'{amplitude_threshold[0]:f}.') |
| |
| max_value = audio_utils.GetAudioMaximumAmplitude(sox_output) |
| session.console.info('Got audio max amplitude: %f.', max_value) |
| if (amplitude_threshold[1] is not None and |
| amplitude_threshold[1] < max_value): |
| self.AppendErrorMessage( |
| f'Audio maximum amplitude {max_value:f} too high. Maximum pass is ' |
| f'{amplitude_threshold[1]:f}.') |
| |
| max_delta_value = audio_utils.GetAudioMaximumDelta(sox_output) |
| session.console.info('Got audio max delta value: %f.', max_delta_value) |
| max_delta_threshold = test_arg.get('max_delta_threshold', |
| _DEFAULT_SOX_MAX_DELTA_THRESHOLD) |
| if (max_delta_threshold[0] is not None and |
| max_delta_threshold[0] > max_delta_value): |
| self.AppendErrorMessage( |
| f'Audio max delta value {max_delta_value:f} too low. Minimum pass is ' |
| f'{max_delta_threshold[0]:f}.') |
| if (max_delta_threshold[1] is not None and |
| max_delta_threshold[1] < max_delta_value): |
| self.AppendErrorMessage( |
| f'Audio max delta value {max_delta_value:f} too high. Minimum pass is' |
| f' {max_delta_threshold[1]:f}.') |
| |
| if test_arg['type'] == 'sinewav': |
| freq = audio_utils.GetRoughFreq(sox_output) |
| freq_threshold = test_arg.get('freq_threshold', |
| _DEFAULT_SINEWAV_FREQ_THRESHOLD) |
| session.console.info('Expected frequency %r +- %d', _DEFAULT_FREQ_HZ, |
| freq_threshold) |
| if freq is None or (abs(freq - _DEFAULT_FREQ_HZ) > freq_threshold): |
| self.AppendErrorMessage(f'Test Fail at frequency {freq!r}') |
| else: |
| session.console.info('Got frequency %d', freq) |
| |
| def MayPassTest(self): |
| """Checks if test can pass with result of one output volume. |
| |
| Returns: True if test passes, False otherwise. |
| """ |
| session.console.info('Test results for output volume %r: %r', |
| self._output_volumes[self._output_volume_index], |
| self._test_results[self._output_volume_index]) |
| if self._test_results[self._output_volume_index]: |
| return True |
| return False |
| |
| def FailTest(self): |
| """Fails test.""" |
| session.console.info('Test results for each output volumes: %r', |
| list(zip(self._output_volumes, self._test_results))) |
| self.FailTask('; '.join(self._test_message)) |
| |
| def CheckDongleStatus(self): |
| # When audio jack detection feature is ready on a platform, we can |
| # enable check_dongle option to check jack status matches we expected. |
| # yapf: disable |
| if self.args.check_dongle: # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| mic_status = self.GetAudio().GetMicJackStatus(self._in_card) |
| headphone_status = self.GetAudio().GetHeadphoneJackStatus(self._out_card) |
| plug_status = mic_status or headphone_status |
| # We've encountered false positive running audiofuntest tool against |
| # audio fun-plug on a few platforms; so it is suggested not to run |
| # audiofuntest with HP/MIC jack |
| if plug_status is True: |
| # yapf: disable |
| if any((t['type'] == 'audiofun') for t in self.args.tests_to_conduct): # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| session.console.info('Audiofuntest does not require dongle.') |
| raise ValueError('Audiofuntest does not require dongle.') |
| # yapf: disable |
| if self.args.require_dongle is False: # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| session.console.info('Dongle Status is wrong, don\'t need dongle.') |
| raise ValueError('Dongle Status is wrong.') |
| |
| # for require dongle case, we need to check both microphone and headphone |
| # are all detected. |
| # yapf: disable |
| if self.args.require_dongle: # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| if (mic_status and headphone_status) is False: |
| session.console.info('Dongle Status is wrong. mic %s, headphone %s', |
| mic_status, headphone_status) |
| raise ValueError('Dongle Status is wrong.') |
| |
| if self._mic_jack_type: |
| mictype = self.GetAudio().GetMicJackType(self._in_card) |
| if mictype != self._mic_jack_type: |
| session.console.info('Mic Jack Type is wrong. need %s, but %s', |
| self._mic_jack_type, |
| mictype) |
| raise ValueError('Mic Jack Type is wrong.') |
| |
| def SetupAudio(self): |
| # Enable/disable devices according to require_dongle. |
| # We don't use plug_status because plug_status may not be ready at early |
| # stage. |
| self.GetAudio().DisableAllAudioOutputs(self._out_card) |
| # yapf: disable |
| if self.args.require_dongle: # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| self.GetAudio().EnableHeadphone(self._out_card) |
| else: |
| self.GetAudio().EnableSpeaker(self._out_card) |
| |
| self.GetAudio().DisableAllAudioInputs(self._in_card) |
| # yapf: disable |
| self.GetAudio().EnableDevice(self.args.mic_source, self._in_card) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| |
| def _ParseConformanceOutput(self, conformance_output: io.TextIOBase): |
| """Parse a conformance output from alsa_conformance_test.py |
| |
| Sample output: |
| 5 passed, 0 failed |
| |
| Args: |
| conformance_output: output stream of alsa_conformance_test.py |
| to parse from. |
| |
| Raises: |
| ValueError: Can not get alsa_conformance_test.py output or wrong format. |
| |
| Returns: |
| A boolean indicates the test passes or not. |
| """ |
| |
| m = self._MatchPatternLines( |
| conformance_output, |
| re.compile(r'[0-9]+\s*passed,\s*([0-9]+)\s*failed$')) |
| if m is None: |
| raise ValueError( |
| 'Failed to get expected output from alsa_conformance_test.py') |
| |
| failed_times = int(m.group(1)) |
| return failed_times == 0 |
| |
| def _CheckDeviceConformance(self, sample_rate, input_device: Optional[str], |
| output_device: Optional[str]): |
| """Run a sub-test of the conformance test. |
| |
| Args: |
| sample_rate: The sample rate to test. |
| input_device: The capture device. |
| output_device: The playback device. |
| """ |
| |
| def _IsInMergeThresholdSize480Board(): |
| """Checks if the device in board list that should modify merge threshold |
| to be able to pass conformance tests. Please refer b/274866472 for the |
| details. Currently only boards using Intel SOF should apply to this list. |
| |
| Returns: |
| `True` if given device in the board list. |
| """ |
| |
| _merge_threshold_size_480_boards = ('brya', 'rex', 'volteer', 'hades', |
| 'nissa', 'brox') |
| |
| lsb_data = image_tool.LSBFile(os.path.join('/', 'etc', 'lsb-release')) |
| board_name = lsb_data.GetChromeOSBoard().lower() |
| return board_name in _merge_threshold_size_480_boards |
| |
| commands = [ |
| audio_utils.CONFORMANCETEST_PATH, |
| '--test-suites', |
| 'test_rates', |
| # yapf: disable |
| '--rate-criteria-diff-pct', |
| f'{self.args.conformance_rate_criteria:f}', # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| # yapf: disable |
| '--rate-err-criteria', |
| f'{self.args.conformance_rate_err_criteria}', # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| '--allow-rate', |
| f'{sample_rate}' |
| ] |
| if input_device: |
| commands.extend(['-C', input_device]) |
| if output_device: |
| commands.extend(['-P', output_device]) |
| if _IsInMergeThresholdSize480Board(): |
| commands.extend(['--merge-thld-size', '480']) |
| # yapf: disable |
| self.ui.CallJSFunction('checkConformance', input_device, output_device) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| process = self._dut.Popen(commands, stdout=process_utils.PIPE, |
| stderr=process_utils.PIPE, log=True) |
| stdout, stderr = process.communicate() |
| if stdout: |
| logging.info('stdout:\n%s', stdout) |
| if stderr: |
| logging.info('stderr:\n%s', stderr) |
| try: |
| is_all_passed = self._ParseConformanceOutput(io.StringIO(stdout)) |
| if not is_all_passed: |
| error_messages = 'alsa_conformance_test.py failed for' |
| if input_device: |
| error_messages += f' Input device {input_device}' |
| if output_device: |
| error_messages += f' Output device {output_device}' |
| self.AppendErrorMessage(error_messages) |
| except ValueError as err: |
| dev_info_commands = [audio_utils.CONFORMANCETOOL_PATH, '--dev_info_only'] |
| if input_device: |
| dev_info_commands.extend(['-C', input_device]) |
| if output_device: |
| dev_info_commands.extend(['-P', output_device]) |
| dev_info_process = self._dut.Popen(dev_info_commands, |
| stdout=process_utils.PIPE, |
| stderr=process_utils.PIPE, log=True) |
| dev_info_stdout, unused_dev_info_stderr = dev_info_process.communicate() |
| error_messages = str(err) |
| error_messages += '; Please check parameters with the device info: ' |
| error_messages += dev_info_stdout |
| raise ValueError(error_messages) from None |
| |
| def CheckConformance(self): |
| """Run conformance test program and check the result.""" |
| |
| # TODO(cyueh) Add simultaneous test when b/201381252 is complete. |
| # yapf: disable |
| self._CheckDeviceConformance(self.args.input_rate, self._alsa_input_device, # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| None) |
| # yapf: disable |
| self._CheckDeviceConformance(self.args.output_rate, None, # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| self._alsa_output_device) |