blob: af2bf4b27f450423786a881eb55603abd31e8dc4 [file] [log] [blame]
# Copyright 2021 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Tests thunderbolt port with a loopback card.
Description
-----------
Verifies the thunderbolt port with a loopback card.
Test Procedure
--------------
1. Operator inserts the loopback card.
2. The tool sends payloads to the loopback card.
3. The tool receives payloads from the loopback card and checks correctness.
4. The tool collects lane margining data and uploads it to factory server or
saves it to local factory report.
5. Operator removes the loopback card.
Dependency
----------
- Loopback card driver.
- tdtl tool if we want to test lane margining.
- Write serial number to device data before the test for data collecting.
- (Optional) Be able to connect factory server when running the test.
Examples
--------
Test controller 0-1 with port 0 with 60 seconds timeout::
{
"pytest_name": "thunderbolt_loopback"
"args": {
"usbpd_spec": {
"port": 0
},
"timeout_secs": 60,
"controller_patterns": [
"0-1.*"
]
}
}
Test controller 0-3 with CC1 port 1 with 60 seconds timeout::
{
"pytest_name": "thunderbolt_loopback"
"args": {
"usbpd_spec": {
"port": 1,
"polarity": 1
},
"timeout_secs": 60,
"controller_patterns": [
"0-3.*"
]
}
}
"""
import enum
import logging
import os
import re
import subprocess
import time
from typing import Set
from cros.factory.device import device_utils
from cros.factory.device import usb_c
from cros.factory.test import device_data
from cros.factory.test.env import paths
from cros.factory.test.i18n import _
from cros.factory.test import server_proxy
from cros.factory.test import session
from cros.factory.test import test_case
from cros.factory.testlog import testlog
from cros.factory.utils.arg_utils import Arg
from cros.factory.utils import sync_utils
_LOOPBACK_TEST_PATH = '/sys/kernel/debug/thunderbolt'
_DEFAULT_CONTROLLER_PATTERNS = ('0-1.*', '0-3.*', '1-1.*', '1-3.*')
_RE_ADP_DOMAIN = re.compile(r'.*(?P<domain>\d+)-(?P<adapter>\d+)\.\d+')
_RE_MARGIN_LOOPBACK = re.compile(
r'(RT\d+ L\d+ )(BOTTOM|LEFT),(TOP|RIGHT) = (\d+),(\d+)')
_DMA_TEST = 'dma_test'
_TEST_MODULE = 'thunderbolt_dma_test'
class LinkWidthType(str, enum.Enum):
Single = 'Single'
Dual = 'Dual'
def __str__(self):
return self.name
class LinkSpeedType(str, enum.Enum):
Slow = 'Slow'
Fast = 'Fast'
def __str__(self):
return self.name
ENCODE_LINK_WIDTH = {
LinkWidthType.Single: '1',
LinkWidthType.Dual: '2'
}
ENCODE_LINK_SPEED = {
LinkSpeedType.Slow: '10',
LinkSpeedType.Fast: '20'
}
_RE_STATUS = re.compile(r'result: (.+)\n(?:.|\n)*$')
class _CardState(str, enum.Enum):
Absent = 'Absent'
Multiple = 'Multiple'
Wrong = 'Wrong'
def __str__(self):
return self.name
_TDTL_PATH = os.path.join(paths.FACTORY_DIR, 'tdtl-main')
if not os.path.exists(_TDTL_PATH):
_TDTL_PATH = os.path.join(paths.FACTORY_DIR, 'tdtl-master')
class ThunderboltLoopbackTest(test_case.TestCase):
"""Thunderbolt loopback card factory test."""
related_components = tuple()
LOG_GROUP_NAME = 'usb4_lane_margining_log'
LOG_KEYS = [
'DOMAIN',
'ADP',
'RT1 L0 BOTTOM',
'RT1 L0 TOP',
'RT1 L0 LEFT',
'RT1 L0 RIGHT',
'RT1 L1 BOTTOM',
'RT1 L1 TOP',
'RT1 L1 LEFT',
'RT1 L1 RIGHT',
'RT2 L0 BOTTOM',
'RT2 L0 TOP',
'RT2 L0 LEFT',
'RT2 L0 RIGHT',
'RT2 L1 BOTTOM',
'RT2 L1 TOP',
'RT2 L1 LEFT',
'RT2 L1 RIGHT',
]
ARGS = [
Arg('timeout_secs', int, 'Timeout value for the test.', default=None),
Arg('expected_link_speed', LinkSpeedType, 'Link speed.',
default=LinkSpeedType.Fast),
Arg('expected_link_width', LinkWidthType, 'Link width.',
default=LinkWidthType.Dual),
Arg('packets_to_send', int, 'Amount of packets to be sent.',
default=1000),
Arg('packets_to_receive', int, 'Amount of packets to be received.',
default=1000),
Arg('debugfs_path', str, 'The path of debugfs to test.', default=None),
Arg('controller_port', str, 'Please migrate to controller_patterns.',
default=None),
Arg('controller_patterns', (list, tuple),
('The list of the glob patterns of the controller port to test. '
'Choose a subset of {_DEFAULT_CONTROLLER_PATTERNS!r}.'),
default=_DEFAULT_CONTROLLER_PATTERNS),
Arg('usbpd_spec', dict,
('A dict which must contain "port" and optionally specify "polarity".'
' For example, `{"port": 1, "polarity": 1}`.'),
schema=usb_c.USB_PD_SPEC_SCHEMA, _transform=usb_c.MigrateUSBPDSpec),
Arg('load_module', bool, 'Load test module.', default=True),
Arg('check_muxinfo_only', bool, 'Check muxinfo only.', default=False),
Arg('lane_margining', bool, 'Collect lane margining data.', default=True),
Arg('lane_margining_timeout_secs', (int, float),
'Timeout for collecting lane margining data.', default=10),
Arg('lane_margining_csv', bool,
('If set, will upload the lane margining data to the factory server '
'at the end of the test. Thus, DUT needs to connect to the server. '
'If not set, will only save testlog in the factory report.'),
default=False),
Arg('check_card_removal', bool,
'If set, require removing the card after DMA test.', default=True),
]
def setUp(self):
# yapf: disable
self.ui.ToggleTemplateClass('font-large', True) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
self._dut = device_utils.CreateDUTInterface()
# yapf: disable
self._usbpd_port = self.args.usbpd_spec['port'] # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
self._usbpd_polarity = {
1: 'NORMAL',
2: 'INVERTED'
# yapf: disable
}.get(self.args.usbpd_spec.get('polarity')) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
self._remove_module = False
self._card_state = None
self._muxinfo = {}
self._first_typec_control = True
self._first_check_mux_info = True
self._group_checker = None
# yapf: disable
if self.args.lane_margining: # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
# Group checker and details for Testlog.
self._group_checker = testlog.GroupParam(self.LOG_GROUP_NAME,
self.LOG_KEYS)
testlog.UpdateParam('ADP', param_type=testlog.ParamType.argument)
testlog.UpdateParam('DOMAIN', param_type=testlog.ParamType.argument)
self._errors = []
def tearDown(self):
if self._remove_module:
self._dut.CheckCall(['modprobe', '-r', _TEST_MODULE], log=True)
def _GlobLoopbackPath(self, controller_patterns: Set[str]):
"""Returns a list of loopback card paths from controller_patterns.
Args:
controller_patterns: The set of the glob patterns.
"""
# yapf: disable
devices = [] # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
for name in controller_patterns:
device_path = self._dut.path.join(_LOOPBACK_TEST_PATH, name, _DMA_TEST)
devices.extend(
self._dut.path.dirname(path) for path in self._dut.Glob(device_path))
return devices
def _SetCardState(self, state):
if self._card_state == state:
return False
self._card_state = state
return True
def _SendTypecControl(self):
"""Send typeccontrol control command."""
_first_typec_control = self._first_typec_control
self._first_typec_control = False
mode = {
'DP': '0',
'TBT': '1',
'USB4': '2'
}
try:
self._dut.CheckCall(
['ectool', 'typeccontrol',
str(self._usbpd_port), '2', mode['TBT']], log=_first_typec_control)
except Exception:
pass
def _CheckMuxinfo(self):
"""Returns True if TBT=1."""
fail_tag = 'GetPDMuxInfo'
_first_check_mux_info = self._first_check_mux_info
self._first_check_mux_info = False
try:
outputs = self._dut.usb_c.GetPDMuxInfo(self._usbpd_port,
log=_first_check_mux_info)
except Exception:
if self._muxinfo.get(fail_tag) != 1:
logging.exception('%s failed', fail_tag)
# yapf: disable
self.ui.SetState(_('Please unplug and replug.')) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
self._muxinfo = {
fail_tag: 1
}
return False
else:
if self._muxinfo != outputs:
logging.info('%s %r', fail_tag, outputs)
# yapf: disable
self.ui.SetState( # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
f'Port {int(self._usbpd_port)}<br>{fail_tag} {outputs!r}')
self._muxinfo = outputs
if self._usbpd_polarity:
if outputs['POLARITY'] != self._usbpd_polarity:
# yapf: disable
self.ui.SetInstruction( # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
_('Wrong USB side, please flip over {media}.',
media='Loopback card'))
return False
# yapf: disable
self.ui.SetInstruction('') # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
if outputs['TBT']:
return True
if outputs['USB']:
self._SendTypecControl()
return False
def _FindLoopbackPath(self):
"""Returns the loopback card controller path.
Returns:
If self.args.debugfs_path is set, then just return it if it exists,
otherwise return None. If self.args.debugfs_path is not set, then use
self.args.controller_patterns to glob the path and return the result if
there is only one match, otherwise return None.
"""
# yapf: disable
if self.args.debugfs_path: # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
# yapf: disable
if self._dut.path.exists(self.args.debugfs_path): # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
# yapf: disable
return self.args.debugfs_path # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
if self._SetCardState(_CardState.Absent):
logging.info('No loopback card exists.')
return None
target_controller_patterns = set(
# yapf: disable
[self.args.controller_port] if self.args.controller_port else self.args # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
.controller_patterns)
target_controllers = self._GlobLoopbackPath(target_controller_patterns)
if len(target_controllers) > 1:
if self._SetCardState(_CardState.Multiple):
# yapf: disable
self.ui.SetState(_('Do not insert more than one loopback card.')) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
logging.info(
'Multiple loopback cards exist: %r with patterns: %r. '
'Set controller_patterns to be more specific.', target_controllers,
target_controller_patterns)
return None
if len(target_controllers) == 1:
return target_controllers[0]
non_target_controller_patterns = set(
_DEFAULT_CONTROLLER_PATTERNS) - target_controller_patterns
non_target_controllers = self._GlobLoopbackPath(
non_target_controller_patterns)
if non_target_controllers:
if self._SetCardState(_CardState.Wrong):
# yapf: disable
self.ui.SetState( # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
_('The loopback card is inserted into the wrong port.'))
logging.info(
'The loopback card is inserted into the wrong port: '
'%r with patterns: %r', non_target_controllers,
non_target_controller_patterns)
else:
if self._SetCardState(_CardState.Absent):
# yapf: disable
self.ui.SetState(_('Insert the loopback card.')) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
logging.info('No loopback card exists with patterns: %r',
target_controller_patterns)
return None
def _LogAndWriteFile(self, filename, content):
logging.info('echo %s > %s', content, filename)
self._dut.WriteFile(filename, content)
def _TestLaneMargining(self, domain: str, adapter: str):
"""Uses tdtl tool to collect lane margining data.
Args:
domain: A string we pass to tdtl tool.
adapter: A string we pass to tdtl tool.
Returns:
log_result: A dict to save the result.
"""
session.console.info('Start collecting lane margining data.')
# Log 0 when failed.
# Log -1 when timeout.
log_result = dict.fromkeys(self.LOG_KEYS, None)
log_result.update({
'ADP': int(adapter),
'DOMAIN': int(domain),
})
# self._dut.CheckOutput do not support env and timeout
# process_utils.Spawn do not support timeout
cmd = [
'./cli.py', 'margin_loopback', '-d', domain, '-a', adapter, '-r', '0',
'-i', '1'
]
env = {
'ADP': adapter,
'LC_ALL': 'en_US.utf-8',
}
logging.info('env: %r, cmd: %r, cwd: %r', env, cmd, _TDTL_PATH)
# yapf: disable
stop_timer = self.ui.StartCountdownTimer( # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
# yapf: disable
self.args.lane_margining_timeout_secs) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
try:
result = subprocess.run(
cmd,
env=env,
cwd=_TDTL_PATH,
# yapf: disable
timeout=self.args.lane_margining_timeout_secs, # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
encoding='utf-8',
stdout=subprocess.PIPE,
check=False)
except subprocess.TimeoutExpired:
logging.exception('_TestLaneMargining timeout')
self._errors.append('_TestLaneMargining timeout')
for key, value in log_result.items():
if value is None:
log_result[key] = -1
return log_result
finally:
stop_timer.set()
try:
logging.info('stdout:\n%s', result.stdout)
result.check_returncode()
except Exception:
logging.exception('_TestLaneMargining failed')
self._errors.append('_TestLaneMargining failed')
for key, value in log_result.items():
if value is None:
log_result[key] = 0
# The output of `cli.py margin_loopback` looks like below.
#
# RT1 L0 BOTTOM,TOP = 56,54
# RT2 L0 BOTTOM,TOP = 56,62
# RT1 L0 LEFT,RIGHT = 20,17
# RT2 L0 LEFT,RIGHT = 22,24
# RT1 L1 BOTTOM,TOP = 62,70
# RT2 L1 BOTTOM,TOP = 60,68
# RT1 L1 LEFT,RIGHT = 21,22
# RT2 L1 LEFT,RIGHT = 17,16
for line in result.stdout.splitlines():
match = _RE_MARGIN_LOOPBACK.match(line)
if not match:
continue
for index in range(2, 4):
log_result.update(
{match.group(1) + match.group(index): int(match.group(index + 2))})
return log_result
def _GetUITimer(self):
"""Returns the stop event flag of the timer or None if no timeout."""
# yapf: disable
if self.args.timeout_secs: # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
# yapf: disable
return self.ui.StartFailingCountdownTimer(self.args.timeout_secs) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
return None
def _UploadLaneMarginingViaCSV(self, log_result: dict):
"""Uploads the result of lane margining via UploadCSVEntry."""
timestamp = time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime())
csv_entries = [device_data.GetSerialNumber(), timestamp]
csv_entries.extend(log_result[key] for key in self.LOG_KEYS)
# yapf: disable
self.ui.SetState(_('Trying to check server protocol...')) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
try:
server = server_proxy.GetServerProxy(timeout=5)
server.Ping()
server.UploadCSVEntry(self.LOG_GROUP_NAME, csv_entries)
except server_proxy.Fault:
messages = f'Server fault {server_proxy.GetServerURL()}'
logging.exception(messages)
self._errors.append(messages)
except Exception:
messages = f'Unable to sync with server {server_proxy.GetServerURL()}'
logging.exception(messages)
self._errors.append(messages)
def _SaveLaneMarginingViaTestlog(self, log_result: dict):
"""Saves the result of lane margining via Testlog."""
# yapf: disable
with self._group_checker: # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
for key, value in log_result.items():
testlog.LogParam(key, value)
def _UploadOrSaveLaneMargining(self, log_result: dict):
"""Uploads or Saves the result of lane margining."""
# yapf: disable
if self.args.lane_margining_csv: # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
self._UploadLaneMarginingViaCSV(log_result)
self._SaveLaneMarginingViaTestlog(log_result)
def _WaitMuxInfoBecomingTBT(self):
"""Waits until Mux info becomes TBT=1."""
stop_timer = self._GetUITimer()
# yapf: disable
self.ui.SetState(_('Insert the loopback card.')) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
# yapf: disable
sync_utils.WaitFor(self._CheckMuxinfo, self.args.timeout_secs, # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
poll_interval=0.5)
if stop_timer:
stop_timer.set()
def _WaitForLoopbackCardInsertion(self):
"""Waits until device node appears."""
stop_timer = self._GetUITimer()
# yapf: disable
self.ui.SetState(_('Insert the loopback card.')) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
device_path = sync_utils.WaitFor(self._FindLoopbackPath,
# yapf: disable
self.args.timeout_secs, poll_interval=0.5) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
match = _RE_ADP_DOMAIN.fullmatch(device_path)
if not match:
raise Exception('device_path is not in expected format.')
adapter = match.group('adapter')
domain = match.group('domain')
session.console.info('The ADP is at %r, domain is %r.', adapter, domain)
if stop_timer:
stop_timer.set()
return device_path, domain, adapter
def _WaitForLoopbackCardRemoval(self, device_path):
"""Waits until device node disappears."""
stop_timer = self._GetUITimer()
# yapf: disable
self.ui.SetState(_('Remove the loopback card.')) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
sync_utils.WaitFor(lambda: not self._dut.path.exists(device_path),
# yapf: disable
self.args.timeout_secs, poll_interval=0.5) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
if stop_timer:
stop_timer.set()
def _TestDMA(self, device_path):
"""Performs DMA test."""
stop_timer = self._GetUITimer()
# yapf: disable
self.ui.SetState(_('Test is in progress, please do not move the device.')) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
session.console.info('The loopback card path is at %r.', device_path)
device_test_path = self._dut.path.join(device_path, _DMA_TEST)
# Configure the test
self._LogAndWriteFile(
self._dut.path.join(device_test_path, 'speed'),
# yapf: disable
ENCODE_LINK_SPEED[self.args.expected_link_speed]) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
self._LogAndWriteFile(
self._dut.path.join(device_test_path, 'lanes'),
# yapf: disable
ENCODE_LINK_WIDTH[self.args.expected_link_width]) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
self._LogAndWriteFile(
self._dut.path.join(device_test_path, 'packets_to_send'),
# yapf: disable
str(self.args.packets_to_send)) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
self._LogAndWriteFile(
self._dut.path.join(device_test_path, 'packets_to_receive'),
# yapf: disable
str(self.args.packets_to_receive)) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
# Run the test.
self._LogAndWriteFile(self._dut.path.join(device_test_path, 'test'), '1')
if stop_timer:
stop_timer.set()
# Check the result.
status_path = self._dut.path.join(device_test_path, 'status')
logging.info('cat %s', status_path)
output = self._dut.ReadFile(status_path)
logging.info('output:\n%s', output)
match = _RE_STATUS.match(output)
if not match:
self._errors.append('Output format of status is changed.')
# yapf: disable
result = match.group(1) # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
if result == 'success':
return
if result in ('fail', 'failed', 'not run'):
self._errors.append(f'result: {result}')
else:
self._errors.append(f'Unknown result: {result!r}')
def runTest(self):
self._WaitMuxInfoBecomingTBT()
# yapf: disable
if self.args.check_muxinfo_only: # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
self.PassTask()
# yapf: disable
if self.args.load_module: # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
# Fail the test if the module doesn't exist.
self._dut.CheckCall(['modinfo', _TEST_MODULE])
# If the module is loaded before the test then do not remove it.
loaded = self._dut.Call(['modprobe', '--first-time', _TEST_MODULE],
log=True)
self._remove_module = not loaded
device_path, domain, adapter = self._WaitForLoopbackCardInsertion()
self._TestDMA(device_path)
# yapf: disable
if self.args.lane_margining: # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
log_result = self._TestLaneMargining(domain, adapter)
self._UploadOrSaveLaneMargining(log_result)
# yapf: disable
if self.args.check_card_removal: # type: ignore #TODO(b/338318729) Fixit! # pylint: disable=line-too-long
# yapf: enable
self._WaitForLoopbackCardRemoval(device_path)
if self._errors:
self.FailTask('\n'.join(self._errors))