| # Copyright 2021 The ChromiumOS Authors |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| """Tests thunderbolt port with a loopback card. |
| |
| Description |
| ----------- |
| Verifies the thunderbolt port with a loopback card. |
| |
| Test Procedure |
| -------------- |
| 1. Operator inserts the loopback card. |
| 2. The tool sends payloads to the loopback card. |
| 3. The tool receives payloads from the loopback card and checks correctness. |
| 4. The tool collects lane margining data, saves it to the local factory |
| report and, if ``lane_margining_csv`` is set, uploads it to the factory server. |
| 5. Operator removes the loopback card. |
| |
| Dependency |
| ---------- |
| - Loopback card driver. |
| - tdtl tool if we want to test lane margining. |
| - Write serial number to device data before the test for data collecting. |
| - (Optional) Be able to connect factory server when running the test. |
| |
| Examples |
| -------- |
| Test controller 0-1 with port 0 with 60 seconds timeout:: |
| |
| { |
| "pytest_name": "thunderbolt_loopback", |
| "args": { |
| "usbpd_spec": { |
| "port": 0 |
| }, |
| "timeout_secs": 60, |
| "controller_patterns": [ |
| "0-1.*" |
| ] |
| } |
| } |
| |
| Test controller 0-3 with CC1 port 1 with 60 seconds timeout:: |
| |
| { |
| "pytest_name": "thunderbolt_loopback", |
| "args": { |
| "usbpd_spec": { |
| "port": 1, |
| "polarity": 1 |
| }, |
| "timeout_secs": 60, |
| "controller_patterns": [ |
| "0-3.*" |
| ] |
| } |
| } |
| """ |
| |
| import enum |
| import logging |
| import os |
| import re |
| import subprocess |
| import time |
| from typing import Set |
| |
| from cros.factory.device import device_utils |
| from cros.factory.device import usb_c |
| from cros.factory.test import device_data |
| from cros.factory.test.env import paths |
| from cros.factory.test.i18n import _ |
| from cros.factory.test import server_proxy |
| from cros.factory.test import session |
| from cros.factory.test import test_case |
| from cros.factory.testlog import testlog |
| from cros.factory.utils.arg_utils import Arg |
| from cros.factory.utils import sync_utils |
| |
| |
# Directory under which the controller nodes are globbed.
_LOOPBACK_TEST_PATH = '/sys/kernel/debug/thunderbolt'
# Controller glob patterns used when the test arguments do not narrow them.
_DEFAULT_CONTROLLER_PATTERNS = ('0-1.*', '0-3.*', '1-1.*', '1-3.*')
# Matches a device path ending in '<domain>-<adapter>.<number>'.
# The optional prefix must end with a non-digit so that a multi-digit domain is
# captured whole; a bare greedy '.*' would leave only its last digit.
_RE_ADP_DOMAIN = re.compile(
    r'(?:.*\D)?(?P<domain>\d+)-(?P<adapter>\d+)\.\d+')
# Matches one line of `cli.py margin_loopback` output, for example
# 'RT1 L0 BOTTOM,TOP = 56,54'.
_RE_MARGIN_LOOPBACK = re.compile(
    r'(RT\d+ L\d+ )(BOTTOM|LEFT),(TOP|RIGHT) = (\d+),(\d+)')
# Name of the DMA test directory inside a controller node.
_DMA_TEST = 'dma_test'
# Kernel module probed and loaded by the test.
_TEST_MODULE = 'thunderbolt_dma_test'
| |
| |
class LinkWidthType(str, enum.Enum):
  """Link widths accepted by the ``expected_link_width`` argument."""
  Single = 'Single'
  Dual = 'Dual'

  def __str__(self) -> str:
    # Render as the bare member name instead of 'LinkWidthType.<name>'.
    member_name = self.name
    return member_name
| |
| |
class LinkSpeedType(str, enum.Enum):
  """Link speeds accepted by the ``expected_link_speed`` argument."""
  Slow = 'Slow'
  Fast = 'Fast'

  def __str__(self) -> str:
    # Render as the bare member name instead of 'LinkSpeedType.<name>'.
    member_name = self.name
    return member_name
| |
| |
# Text written to the DMA test 'lanes' node for each link width.
ENCODE_LINK_WIDTH = {
    LinkWidthType.Single: '1',
    LinkWidthType.Dual: '2',
}
# Text written to the DMA test 'speed' node for each link speed.
ENCODE_LINK_SPEED = {
    LinkSpeedType.Slow: '10',
    LinkSpeedType.Fast: '20',
}
# Captures what follows 'result: ' on the first line of the DMA test status;
# the rest of the output is matched but ignored.
_RE_STATUS = re.compile(r'result: (.+)\n(?:.|\n)*$')
| |
| |
class _CardState(str, enum.Enum):
  """Reasons why no usable loopback card was found.

  The test remembers the last reported state so that the same message is not
  repeated on every poll.
  """
  Absent = 'Absent'
  Multiple = 'Multiple'
  Wrong = 'Wrong'

  def __str__(self) -> str:
    # Render as the bare member name instead of '_CardState.<name>'.
    member_name = self.name
    return member_name
| |
| |
# Working directory of the tdtl tool: 'tdtl-main' under the factory directory
# when that path exists, otherwise 'tdtl-master'.
_TDTL_PATH = os.path.join(
    paths.FACTORY_DIR,
    'tdtl-main' if os.path.exists(os.path.join(paths.FACTORY_DIR, 'tdtl-main'))
    else 'tdtl-master')
| |
| |
class ThunderboltLoopbackTest(test_case.TestCase):
  """Thunderbolt loopback card factory test.

  Waits for the mux info of the USB-PD port to report TBT, runs the DMA test
  through the thunderbolt debugfs nodes, optionally collects lane margining
  data, and optionally waits for the loopback card to be removed.
  """
  related_components = tuple()

  # Name of the Testlog group; also passed to UploadCSVEntry when uploading.
  LOG_GROUP_NAME = 'usb4_lane_margining_log'
  # Keys of one lane margining record, in the order used for the CSV entry.
  LOG_KEYS = [
      'DOMAIN',
      'ADP',
      'RT1 L0 BOTTOM',
      'RT1 L0 TOP',
      'RT1 L0 LEFT',
      'RT1 L0 RIGHT',
      'RT1 L1 BOTTOM',
      'RT1 L1 TOP',
      'RT1 L1 LEFT',
      'RT1 L1 RIGHT',
      'RT2 L0 BOTTOM',
      'RT2 L0 TOP',
      'RT2 L0 LEFT',
      'RT2 L0 RIGHT',
      'RT2 L1 BOTTOM',
      'RT2 L1 TOP',
      'RT2 L1 LEFT',
      'RT2 L1 RIGHT',
  ]
  ARGS = [
      Arg('timeout_secs', int, 'Timeout value for the test.', default=None),
      Arg('expected_link_speed', LinkSpeedType, 'Link speed.',
          default=LinkSpeedType.Fast),
      Arg('expected_link_width', LinkWidthType, 'Link width.',
          default=LinkWidthType.Dual),
      Arg('packets_to_send', int, 'Amount of packets to be sent.',
          default=1000),
      Arg('packets_to_receive', int, 'Amount of packets to be received.',
          default=1000),
      Arg('debugfs_path', str, 'The path of debugfs to test.', default=None),
      Arg('controller_port', str, 'Please migrate to controller_patterns.',
          default=None),
      # The second fragment must be an f-string, otherwise the help text shows
      # the literal placeholder instead of the default patterns.
      Arg('controller_patterns', (list, tuple),
          ('The list of the glob patterns of the controller port to test. '
           f'Choose a subset of {_DEFAULT_CONTROLLER_PATTERNS!r}.'),
          default=_DEFAULT_CONTROLLER_PATTERNS),
      Arg('usbpd_spec', dict,
          ('A dict which must contain "port" and optionally specify "polarity".'
           ' For example, `{"port": 1, "polarity": 1}`.'),
          schema=usb_c.USB_PD_SPEC_SCHEMA, _transform=usb_c.MigrateUSBPDSpec),
      Arg('load_module', bool, 'Load test module.', default=True),
      Arg('check_muxinfo_only', bool, 'Check muxinfo only.', default=False),
      Arg('lane_margining', bool, 'Collect lane margining data.', default=True),
      Arg('lane_margining_timeout_secs', (int, float),
          'Timeout for collecting lane margining data.', default=10),
      Arg('lane_margining_csv', bool,
          ('If set, will upload the lane margining data to the factory server '
           'at the end of the test. Thus, DUT needs to connect to the server. '
           'If not set, will only save testlog in the factory report.'),
          default=False),
      Arg('check_card_removal', bool,
          'If set, require removing the card after DMA test.', default=True),
  ]
| |
| def setUp(self): |
| # yapf: disable |
| self.ui.ToggleTemplateClass('font-large', True) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| self._dut = device_utils.CreateDUTInterface() |
| # yapf: disable |
| self._usbpd_port = self.args.usbpd_spec['port'] # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| self._usbpd_polarity = { |
| 1: 'NORMAL', |
| 2: 'INVERTED' |
| # yapf: disable |
| }.get(self.args.usbpd_spec.get('polarity')) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| self._remove_module = False |
| self._card_state = None |
| self._muxinfo = {} |
| self._first_typec_control = True |
| self._first_check_mux_info = True |
| |
| self._group_checker = None |
| # yapf: disable |
| if self.args.lane_margining: # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| # Group checker and details for Testlog. |
| self._group_checker = testlog.GroupParam(self.LOG_GROUP_NAME, |
| self.LOG_KEYS) |
| testlog.UpdateParam('ADP', param_type=testlog.ParamType.argument) |
| testlog.UpdateParam('DOMAIN', param_type=testlog.ParamType.argument) |
| self._errors = [] |
| |
| def tearDown(self): |
| if self._remove_module: |
| self._dut.CheckCall(['modprobe', '-r', _TEST_MODULE], log=True) |
| |
| def _GlobLoopbackPath(self, controller_patterns: Set[str]): |
| """Returns a list of loopback card paths from controller_patterns. |
| |
| Args: |
| controller_patterns: The set of the glob patterns. |
| """ |
| # yapf: disable |
| devices = [] # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| for name in controller_patterns: |
| device_path = self._dut.path.join(_LOOPBACK_TEST_PATH, name, _DMA_TEST) |
| devices.extend( |
| self._dut.path.dirname(path) for path in self._dut.Glob(device_path)) |
| return devices |
| |
| def _SetCardState(self, state): |
| if self._card_state == state: |
| return False |
| self._card_state = state |
| return True |
| |
| def _SendTypecControl(self): |
| """Send typeccontrol control command.""" |
| _first_typec_control = self._first_typec_control |
| self._first_typec_control = False |
| mode = { |
| 'DP': '0', |
| 'TBT': '1', |
| 'USB4': '2' |
| } |
| try: |
| self._dut.CheckCall( |
| ['ectool', 'typeccontrol', |
| str(self._usbpd_port), '2', mode['TBT']], log=_first_typec_control) |
| except Exception: |
| pass |
| |
| def _CheckMuxinfo(self): |
| """Returns True if TBT=1.""" |
| fail_tag = 'GetPDMuxInfo' |
| _first_check_mux_info = self._first_check_mux_info |
| self._first_check_mux_info = False |
| try: |
| outputs = self._dut.usb_c.GetPDMuxInfo(self._usbpd_port, |
| log=_first_check_mux_info) |
| except Exception: |
| if self._muxinfo.get(fail_tag) != 1: |
| logging.exception('%s failed', fail_tag) |
| # yapf: disable |
| self.ui.SetState(_('Please unplug and replug.')) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| self._muxinfo = { |
| fail_tag: 1 |
| } |
| return False |
| else: |
| if self._muxinfo != outputs: |
| logging.info('%s %r', fail_tag, outputs) |
| # yapf: disable |
| self.ui.SetState( # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| f'Port {int(self._usbpd_port)}<br>{fail_tag} {outputs!r}') |
| self._muxinfo = outputs |
| if self._usbpd_polarity: |
| if outputs['POLARITY'] != self._usbpd_polarity: |
| # yapf: disable |
| self.ui.SetInstruction( # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| _('Wrong USB side, please flip over {media}.', |
| media='Loopback card')) |
| return False |
| # yapf: disable |
| self.ui.SetInstruction('') # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| if outputs['TBT']: |
| return True |
| if outputs['USB']: |
| self._SendTypecControl() |
| return False |
| |
| def _FindLoopbackPath(self): |
| """Returns the loopback card controller path. |
| |
| Returns: |
| If self.args.debugfs_path is set, then just return it if it exists, |
| otherwise return None. If self.args.debugfs_path is not set, then use |
| self.args.controller_patterns to glob the path and return the result if |
| there is only one match, otherwise return None. |
| """ |
| # yapf: disable |
| if self.args.debugfs_path: # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| # yapf: disable |
| if self._dut.path.exists(self.args.debugfs_path): # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| # yapf: disable |
| return self.args.debugfs_path # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| if self._SetCardState(_CardState.Absent): |
| logging.info('No loopback card exists.') |
| return None |
| |
| target_controller_patterns = set( |
| # yapf: disable |
| [self.args.controller_port] if self.args.controller_port else self.args # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| .controller_patterns) |
| target_controllers = self._GlobLoopbackPath(target_controller_patterns) |
| if len(target_controllers) > 1: |
| if self._SetCardState(_CardState.Multiple): |
| # yapf: disable |
| self.ui.SetState(_('Do not insert more than one loopback card.')) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| logging.info( |
| 'Multiple loopback cards exist: %r with patterns: %r. ' |
| 'Set controller_patterns to be more specific.', target_controllers, |
| target_controller_patterns) |
| return None |
| |
| if len(target_controllers) == 1: |
| return target_controllers[0] |
| |
| non_target_controller_patterns = set( |
| _DEFAULT_CONTROLLER_PATTERNS) - target_controller_patterns |
| non_target_controllers = self._GlobLoopbackPath( |
| non_target_controller_patterns) |
| if non_target_controllers: |
| if self._SetCardState(_CardState.Wrong): |
| # yapf: disable |
| self.ui.SetState( # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| _('The loopback card is inserted into the wrong port.')) |
| logging.info( |
| 'The loopback card is inserted into the wrong port: ' |
| '%r with patterns: %r', non_target_controllers, |
| non_target_controller_patterns) |
| else: |
| if self._SetCardState(_CardState.Absent): |
| # yapf: disable |
| self.ui.SetState(_('Insert the loopback card.')) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| logging.info('No loopback card exists with patterns: %r', |
| target_controller_patterns) |
| return None |
| |
| def _LogAndWriteFile(self, filename, content): |
| logging.info('echo %s > %s', content, filename) |
| self._dut.WriteFile(filename, content) |
| |
| def _TestLaneMargining(self, domain: str, adapter: str): |
| """Uses tdtl tool to collect lane margining data. |
| |
| Args: |
| domain: A string we pass to tdtl tool. |
| adapter: A string we pass to tdtl tool. |
| |
| Returns: |
| log_result: A dict to save the result. |
| """ |
| session.console.info('Start collecting lane margining data.') |
| # Log 0 when failed. |
| # Log -1 when timeout. |
| log_result = dict.fromkeys(self.LOG_KEYS, None) |
| log_result.update({ |
| 'ADP': int(adapter), |
| 'DOMAIN': int(domain), |
| }) |
| # self._dut.CheckOutput do not support env and timeout |
| # process_utils.Spawn do not support timeout |
| cmd = [ |
| './cli.py', 'margin_loopback', '-d', domain, '-a', adapter, '-r', '0', |
| '-i', '1' |
| ] |
| env = { |
| 'ADP': adapter, |
| 'LC_ALL': 'en_US.utf-8', |
| } |
| logging.info('env: %r, cmd: %r, cwd: %r', env, cmd, _TDTL_PATH) |
| # yapf: disable |
| stop_timer = self.ui.StartCountdownTimer( # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| # yapf: disable |
| self.args.lane_margining_timeout_secs) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| try: |
| result = subprocess.run( |
| cmd, |
| env=env, |
| cwd=_TDTL_PATH, |
| # yapf: disable |
| timeout=self.args.lane_margining_timeout_secs, # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| encoding='utf-8', |
| stdout=subprocess.PIPE, |
| check=False) |
| except subprocess.TimeoutExpired: |
| logging.exception('_TestLaneMargining timeout') |
| self._errors.append('_TestLaneMargining timeout') |
| for key, value in log_result.items(): |
| if value is None: |
| log_result[key] = -1 |
| return log_result |
| finally: |
| stop_timer.set() |
| try: |
| logging.info('stdout:\n%s', result.stdout) |
| result.check_returncode() |
| except Exception: |
| logging.exception('_TestLaneMargining failed') |
| self._errors.append('_TestLaneMargining failed') |
| for key, value in log_result.items(): |
| if value is None: |
| log_result[key] = 0 |
| # The output of `cli.py margin_loopback` looks like below. |
| # |
| # RT1 L0 BOTTOM,TOP = 56,54 |
| # RT2 L0 BOTTOM,TOP = 56,62 |
| # RT1 L0 LEFT,RIGHT = 20,17 |
| # RT2 L0 LEFT,RIGHT = 22,24 |
| # RT1 L1 BOTTOM,TOP = 62,70 |
| # RT2 L1 BOTTOM,TOP = 60,68 |
| # RT1 L1 LEFT,RIGHT = 21,22 |
| # RT2 L1 LEFT,RIGHT = 17,16 |
| for line in result.stdout.splitlines(): |
| match = _RE_MARGIN_LOOPBACK.match(line) |
| if not match: |
| continue |
| for index in range(2, 4): |
| log_result.update( |
| {match.group(1) + match.group(index): int(match.group(index + 2))}) |
| return log_result |
| |
| def _GetUITimer(self): |
| """Returns the stop event flag of the timer or None if no timeout.""" |
| # yapf: disable |
| if self.args.timeout_secs: # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| # yapf: disable |
| return self.ui.StartFailingCountdownTimer(self.args.timeout_secs) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| return None |
| |
| def _UploadLaneMarginingViaCSV(self, log_result: dict): |
| """Uploads the result of lane margining via UploadCSVEntry.""" |
| timestamp = time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime()) |
| csv_entries = [device_data.GetSerialNumber(), timestamp] |
| csv_entries.extend(log_result[key] for key in self.LOG_KEYS) |
| # yapf: disable |
| self.ui.SetState(_('Trying to check server protocol...')) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| try: |
| server = server_proxy.GetServerProxy(timeout=5) |
| server.Ping() |
| server.UploadCSVEntry(self.LOG_GROUP_NAME, csv_entries) |
| except server_proxy.Fault: |
| messages = f'Server fault {server_proxy.GetServerURL()}' |
| logging.exception(messages) |
| self._errors.append(messages) |
| except Exception: |
| messages = f'Unable to sync with server {server_proxy.GetServerURL()}' |
| logging.exception(messages) |
| self._errors.append(messages) |
| |
| def _SaveLaneMarginingViaTestlog(self, log_result: dict): |
| """Saves the result of lane margining via Testlog.""" |
| # yapf: disable |
| with self._group_checker: # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| for key, value in log_result.items(): |
| testlog.LogParam(key, value) |
| |
| def _UploadOrSaveLaneMargining(self, log_result: dict): |
| """Uploads or Saves the result of lane margining.""" |
| # yapf: disable |
| if self.args.lane_margining_csv: # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| self._UploadLaneMarginingViaCSV(log_result) |
| self._SaveLaneMarginingViaTestlog(log_result) |
| |
| def _WaitMuxInfoBecomingTBT(self): |
| """Waits until Mux info becomes TBT=1.""" |
| stop_timer = self._GetUITimer() |
| |
| # yapf: disable |
| self.ui.SetState(_('Insert the loopback card.')) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| # yapf: disable |
| sync_utils.WaitFor(self._CheckMuxinfo, self.args.timeout_secs, # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| poll_interval=0.5) |
| if stop_timer: |
| stop_timer.set() |
| |
| def _WaitForLoopbackCardInsertion(self): |
| """Waits until device node appears.""" |
| stop_timer = self._GetUITimer() |
| |
| # yapf: disable |
| self.ui.SetState(_('Insert the loopback card.')) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| device_path = sync_utils.WaitFor(self._FindLoopbackPath, |
| # yapf: disable |
| self.args.timeout_secs, poll_interval=0.5) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| match = _RE_ADP_DOMAIN.fullmatch(device_path) |
| if not match: |
| raise Exception('device_path is not in expected format.') |
| adapter = match.group('adapter') |
| domain = match.group('domain') |
| session.console.info('The ADP is at %r, domain is %r.', adapter, domain) |
| |
| if stop_timer: |
| stop_timer.set() |
| |
| return device_path, domain, adapter |
| |
| def _WaitForLoopbackCardRemoval(self, device_path): |
| """Waits until device node disappears.""" |
| stop_timer = self._GetUITimer() |
| |
| # yapf: disable |
| self.ui.SetState(_('Remove the loopback card.')) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| |
| sync_utils.WaitFor(lambda: not self._dut.path.exists(device_path), |
| # yapf: disable |
| self.args.timeout_secs, poll_interval=0.5) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| if stop_timer: |
| stop_timer.set() |
| |
| def _TestDMA(self, device_path): |
| """Performs DMA test.""" |
| stop_timer = self._GetUITimer() |
| |
| # yapf: disable |
| self.ui.SetState(_('Test is in progress, please do not move the device.')) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| session.console.info('The loopback card path is at %r.', device_path) |
| device_test_path = self._dut.path.join(device_path, _DMA_TEST) |
| # Configure the test |
| self._LogAndWriteFile( |
| self._dut.path.join(device_test_path, 'speed'), |
| # yapf: disable |
| ENCODE_LINK_SPEED[self.args.expected_link_speed]) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| self._LogAndWriteFile( |
| self._dut.path.join(device_test_path, 'lanes'), |
| # yapf: disable |
| ENCODE_LINK_WIDTH[self.args.expected_link_width]) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| self._LogAndWriteFile( |
| self._dut.path.join(device_test_path, 'packets_to_send'), |
| # yapf: disable |
| str(self.args.packets_to_send)) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| self._LogAndWriteFile( |
| self._dut.path.join(device_test_path, 'packets_to_receive'), |
| # yapf: disable |
| str(self.args.packets_to_receive)) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| # Run the test. |
| self._LogAndWriteFile(self._dut.path.join(device_test_path, 'test'), '1') |
| if stop_timer: |
| stop_timer.set() |
| # Check the result. |
| status_path = self._dut.path.join(device_test_path, 'status') |
| logging.info('cat %s', status_path) |
| output = self._dut.ReadFile(status_path) |
| logging.info('output:\n%s', output) |
| match = _RE_STATUS.match(output) |
| if not match: |
| self._errors.append('Output format of status is changed.') |
| # yapf: disable |
| result = match.group(1) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| if result == 'success': |
| return |
| if result in ('fail', 'failed', 'not run'): |
| self._errors.append(f'result: {result}') |
| else: |
| self._errors.append(f'Unknown result: {result!r}') |
| |
| def runTest(self): |
| self._WaitMuxInfoBecomingTBT() |
| # yapf: disable |
| if self.args.check_muxinfo_only: # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| self.PassTask() |
| |
| # yapf: disable |
| if self.args.load_module: # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| # Fail the test if the module doesn't exist. |
| self._dut.CheckCall(['modinfo', _TEST_MODULE]) |
| # If the module is loaded before the test then do not remove it. |
| loaded = self._dut.Call(['modprobe', '--first-time', _TEST_MODULE], |
| log=True) |
| self._remove_module = not loaded |
| |
| device_path, domain, adapter = self._WaitForLoopbackCardInsertion() |
| self._TestDMA(device_path) |
| |
| # yapf: disable |
| if self.args.lane_margining: # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| log_result = self._TestLaneMargining(domain, adapter) |
| self._UploadOrSaveLaneMargining(log_result) |
| |
| # yapf: disable |
| if self.args.check_card_removal: # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long |
| # yapf: enable |
| self._WaitForLoopbackCardRemoval(device_path) |
| |
| if self._errors: |
| self.FailTask('\n'.join(self._errors)) |