labtation: Add code to abstract USB hub handling from servod.
Implement per port power cycling for Cambrionix in a way that
will allow all port cycling to go through this tool and for
servod to have all hub code removed.
BUG=b:278585904
TEST=manual run on a fizz-labstation with a cambronix hub and 10 duts
attached.
Change-Id: Ie6ec69c6d41451bc406ffbba639c1afd49444016
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/labstation/+/5450877
Tested-by: Keith Haddow <haddowk@chromium.org>
Auto-Submit: Keith Haddow <haddowk@chromium.org>
Commit-Queue: Keith Haddow <haddowk@chromium.org>
Reviewed-by: Garry Wang <xianuowang@chromium.org>
diff --git a/.isort.cfg b/.isort.cfg
new file mode 100644
index 0000000..8ea365a
--- /dev/null
+++ b/.isort.cfg
@@ -0,0 +1,60 @@
+# Copyright 2022 The ChromiumOS Authors
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+# Config file for the isort python module.
+# This is used to enforce import sorting standards.
+#
+# https://pycqa.github.io/isort/docs/configuration/options.html
+
+[settings]
+# Be compatible with `black` since it also matches what we want.
+profile = black
+
+line_length = 80
+length_sort = false
+force_single_line = true
+lines_after_imports = 2
+from_first = false
+case_sensitive = false
+force_sort_within_sections = true
+order_by_type = false
+
+
+# Have to mark third_party/ libs as third party.
+# And libraries we list in cros lint's PYTHONPATH.
+# NB: Keep in sync with pylintrc.
+known_third_party =
+ _emerge,
+ apiclient,
+ chromite.third_party,
+ elftools,
+ gcloud,
+ google,
+ googleapiclient,
+ httplib2,
+ jinja2,
+ jsonschema,
+ lddtree,
+ magic,
+ mock,
+ oauth2client,
+ portage,
+ pylint,
+ requests,
+ six,
+ sqlalchemy,
+ yaml,
+
+known_first_party =
+ chromite
+
+# Allow importing multiple classes on a single line from these modules.
+# https://google.github.io/styleguide/pyguide#s2.2-imports
+single_line_exclusions =
+ abc,
+ chromite.api.gen.config.replication_config_pb2,
+ chromite.third_party.google.protobuf.struct_pb2,
+ chromite.third_party.infra_libs.buildbucket.proto,
+ collections.abc,
+ typing,
diff --git a/Makefile b/Makefile
deleted file mode 100644
index e747843..0000000
--- a/Makefile
+++ /dev/null
@@ -1,15 +0,0 @@
-# Copyright 2024 The ChromiumOS Authors
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-SUBDIRS := os-dependent/chromeos/upstart-scripts os-dependent/chromeos
-
-all:
- for dir in $(SUBDIRS); do \
- $(MAKE) -C $$dir all || exit 1; \
- done
-
-install:
- for dir in $(SUBDIRS); do \
- $(MAKE) -C $$dir install; \
- done
diff --git a/PRESUBMIT.cfg b/PRESUBMIT.cfg
new file mode 100644
index 0000000..a5baf90
--- /dev/null
+++ b/PRESUBMIT.cfg
@@ -0,0 +1,13 @@
+# Per-project `repo upload` hook settings.
+# https://chromium.googlesource.com/chromiumos/repohooks/
+
+[Hook Scripts]
+# Make cros format do nothing, add two custom checks for
+# black and isort.
+cros format = echo
+black = bash -c 'RV=0; for file in ${PRESUBMIT_FILES}; do if [[ $file == *.py || `head -n1 $file | grep -e "/env python"` ]]; \
+ then if ! black --check $file; then RV=1; fi; fi done; exit $RV'
+isort = bash -c 'RV=0; for file in ${PRESUBMIT_FILES}; do if [[ $file == *.py || `head -n1 $file | grep -e "/env python"` ]]; \
+ then if ! isort -c $file; then RV=1; fi; fi done; exit $RV'
+
+[Hook Overrides]
diff --git a/os-dependent/chromeos/Makefile b/os-dependent/chromeos/Makefile
deleted file mode 100644
index 12e064f..0000000
--- a/os-dependent/chromeos/Makefile
+++ /dev/null
@@ -1,15 +0,0 @@
-# Copyright 2024 The ChromiumOS Authors
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-.PHONY: all
-
-all:
-
-SHAREDIR = /usr/share/cros
-SHARE_DEST = $(DESTDIR)$(SHAREDIR)
-SERVOD_UTILS = servod_utils.sh
-
-install:
- @mkdir -p $(SHARE_DEST)
- install -m 0644 $(SERVOD_UTILS) $(SHARE_DEST)/$(SERVOD_UTILS)
diff --git a/os-dependent/chromeos/upstart-scripts/Makefile b/os-dependent/chromeos/upstart-scripts/Makefile
deleted file mode 100644
index b55e027..0000000
--- a/os-dependent/chromeos/upstart-scripts/Makefile
+++ /dev/null
@@ -1,8 +0,0 @@
-# Copyright 2024 The ChromiumOS Authors
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-all:
-
-install:
- install -m 0644 -t $(DESTDIR)/etc/init -D *.conf
diff --git a/os-dependent/chromeos/upstart-scripts/find-cambronix-mapping.conf b/os-dependent/chromeos/upstart-scripts/find-cambronix-mapping.conf
new file mode 100644
index 0000000..e93dc18
--- /dev/null
+++ b/os-dependent/chromeos/upstart-scripts/find-cambronix-mapping.conf
@@ -0,0 +1,14 @@
+# Copyright 2024 The ChromiumOS Authors
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+description "Generate the mapping of servo serial to cambrionix usb port"
+author "chromium-os-dev@chromium.org"
+
+start on started system-services
+
+# It is exceedingly unlikely to OOM, as it's a simple dbus-send call,
+# but better to kill it than to panic the system.
+oom score -100
+
+exec /usr/bin/find-cambrionix-mapping
\ No newline at end of file
diff --git a/usb_hubs/__init__,py b/usb_hubs/__init__,py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/usb_hubs/__init__,py
diff --git a/usb_hubs/cambrionix/__init__,py b/usb_hubs/cambrionix/__init__,py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/usb_hubs/cambrionix/__init__,py
diff --git a/usb_hubs/cambrionix/console_lib.py b/usb_hubs/cambrionix/console_lib.py
new file mode 100644
index 0000000..da11372
--- /dev/null
+++ b/usb_hubs/cambrionix/console_lib.py
@@ -0,0 +1,145 @@
+# Copyright 2024 The ChromiumOS Authors
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+"""Software interface to allow power cycling ports on Cambrionix hub."""
+import pathlib
+from time import sleep
+
+import serial
+from usb_hubs.common import base
+import yaml
+
+
+CROS_TMP_DIR = "/usr/local/tmp/"
+MAPPING_FILENAME = "usb_hub_mapping.yaml"
+MAX_PORT_NUMBER = 15
+CONSOLE_CLEAR_CHR = chr(8)
+CONSOLE_CLEAR = CONSOLE_CLEAR_CHR * 20
+CONSOLE_ENTER = "\r"
+SERIAL_PATH = "/dev/serial/by-id/"
+CAMBRIONIX_GLOB = "usb-cambrionix_SuperSync15*"
+
+
+class USBHubCommandsCambrionix(base.USBHubCommands):
+ """Hub interface implementation for cambrionix hubs."""
+
+ def __init__(self):
+ """Initialize the class, opening the serial console to the Cambrionix hub."""
+ self.serial_to_port = None
+ self.console = None
+ self.usb_uartname = None
+ self.get_uartname()
+ self.open_console()
+
+ def open_console(self):
+ """Open the serial console to the Cambrionix hub."""
+ try:
+ self.console = serial.Serial(
+ self.usb_uartname, baudrate=115200, write_timeout=5, timeout=5
+ )
+ except serial.SerialException as e:
+ raise base.USBHubCommandsException(
+ "Failed to connect to cambrionix console"
+ ) from e
+
+ def load_serial_to_port(self):
+ """_summary_"""
+ try:
+ with open(
+ CROS_TMP_DIR + MAPPING_FILENAME, "r", encoding="utf-8"
+ ) as input_file:
+ serial_to_port = yaml.safe_load(input_file)
+ self.serial_to_port = {value: key for key, value in serial_to_port.items()}
+ except FileNotFoundError as e:
+ raise base.USBHubCommandsException("Unable to open mapping file.") from e
+
+ def write_serial_to_port(self, serial_to_port):
+ """Write out the servo serial number to port mapping as yaml.
+
+ Args:
+ serial_to_port (map{ string, string}): Mapping of servo serial number to
+ cambrionix port number.
+ """
+ with open(
+ CROS_TMP_DIR + "usb_hub_mapping.yaml", "w", encoding="utf-8"
+ ) as outfile:
+ yaml.dump(serial_to_port, outfile, default_flow_style=False)
+
+ def get_port_from_serial(self, servo_serial_number):
+ """Get the port number from the servo serial number.
+
+ Args:
+ servo_serial_number (string): Servo serial number.
+
+ Returns:
+ int : Cambrionix port number associated with servo serial number.
+ """
+ return self.serial_to_port.get(servo_serial_number)
+
+ def send_to_console(self, command):
+ """Send a command to the cambrionix console
+
+ Args:
+ command (string): cambrionix console command.
+ """
+ command_to_send = command + "\r"
+ self.console.write(command_to_send.encode("utf-8"))
+
+ def power_off_port(self, port):
+ """Send command to power off a specific hub ports.
+
+ Args:
+ port (int): hub port number.
+ """
+ self.send_to_console(f"mode off {port}")
+
+ def power_on_port(self, port):
+ """Send command to power on a specific hub ports.
+
+ Args:
+ port (int): hub port number.
+ """
+ self.send_to_console(f"mode sync {port}")
+
+ def power_on_all(self):
+ """Send command to power on all hub ports."""
+ self.send_to_console("mode sync")
+
+ def power_off_all(self):
+ """Send command to power off all hub ports."""
+ self.send_to_console("mode off")
+
+ def get_uartname(self):
+ """Search for the serial devices that matches the Cambrionix pattern and open
+ the first device.
+ """
+ usb_uartnames = [
+ str(name) for name in pathlib.Path(SERIAL_PATH).glob(CAMBRIONIX_GLOB)
+ ]
+ self.usb_uartname = usb_uartnames[0]
+
+ @staticmethod
+ def is_hub_detected():
+ """Search that to find the number of serial devices attached that match
+ the Cambrionix pattern.
+
+ Returns:
+ bool: True if one and only one cambrionix hub is detected.
+ """
+ usb_uartnames = list(pathlib.Path(SERIAL_PATH).glob(CAMBRIONIX_GLOB))
+ if len(usb_uartnames) > 1:
+ print("More than 1 cambrionix hub found panic !!")
+ return len(usb_uartnames) == 1
+
+ def power_cycle_servo(self, servo_serial_number, downtime_secs=5):
+ """Given a servo serial number power cycle the correct cambrionix port number.
+
+ Args:
+ servo_serial_number (string): serial number of a servo attached to the hub.
+ downtime_secs (int, optional): _description_. Defaults to 5.
+ """
+ self.load_serial_to_port()
+ port = self.get_port_from_serial(servo_serial_number)
+ self.power_off_port(port)
+ sleep(downtime_secs)
+ self.power_on_port(port)
diff --git a/usb_hubs/cambrionix/find_mapping.py b/usb_hubs/cambrionix/find_mapping.py
new file mode 100644
index 0000000..b173353
--- /dev/null
+++ b/usb_hubs/cambrionix/find_mapping.py
@@ -0,0 +1,88 @@
+#!/usr/bin/python3
+# Copyright 2024 The ChromiumOS Authors
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Code to generate a mapping between Cambrionix USB hub ports to servo serials.
+
+This is done by switching on all of the servos connected to the port then switching
+them off one by one and noting which serial number was missing.
+
+This is faster than switching them on one by one as each servo needs time to boot and
+the USB device enumerated by the kernel. This way those boots are all done in parallel.
+
+It will fail to find any completly non responsive servo or servos that do not correctly
+report their serial number in the USB enumeration.
+
+"""
+
+import sys
+from time import sleep
+
+import usb
+from usb_hubs.cambrionix import console_lib
+
+
+SERVO_PIDS = [0x520B, 0x520D]
+
+
+def get_all_servo_serial():
+ """For each servo PID enumerate the USB devices and collect their serial number.
+
+ Returns:
+ list(string): List of connected servo serial numbers.
+ """
+ serialnos = []
+ for product in SERVO_PIDS:
+ for device in list(usb.core.find(idProduct=product, find_all=True)):
+ serialnos.append(usb.util.get_string(device, device.iSerialNumber))
+ return serialnos
+
+
+def reset_all(hub):
+ """Reset all of the servo devices connected to the hub.
+
+ The process of working out the mapping is destructive to servod processes anyway
+ there is better success if the device are all reset.
+
+ Args:
+ hub (USBHubCommandsCambrionix): Class that wraps the serial connection to a
+ specific Cambrionix hub.
+ """
+ hub.power_off_all()
+ hub.power_on_all()
+ # By experimentation 11 seconds is the minimum time it took for the 10 servos
+ # typically connected to boot and enumerate.
+ sleep(11)
+
+
+def main():
+ """Generate a yaml file with a servo serial number to usb port mapping.
+
+ Cycle through switching off each hub port and detecting the missing
+ serial number when as port is switched off.
+ """
+
+ if not console_lib.USBHubCommandsCambrionix.is_hub_detected():
+ print("No single Cambrionix hub detected.")
+ sys.exit(2)
+ hub = console_lib.USBHubCommandsCambrionix()
+ reset_all(hub)
+
+ mapping = {}
+ current_serialnos = get_all_servo_serial()
+ for port in range(1, console_lib.MAX_PORT_NUMBER + 1):
+ hub.power_off_port(port)
+ sleep(1)
+ new_serialnos = get_all_servo_serial()
+ serial_no = set(current_serialnos) - set(new_serialnos)
+ if serial_no:
+ mapping[port] = list(serial_no)[0]
+ current_serialnos = new_serialnos
+ hub.power_on_all()
+ hub.write_serial_to_port(mapping)
+
+
+if __name__ == "__main__":
+ main()
+ sys.exit(0)
diff --git a/usb_hubs/common/__init__,py b/usb_hubs/common/__init__,py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/usb_hubs/common/__init__,py
diff --git a/usb_hubs/common/base.py b/usb_hubs/common/base.py
new file mode 100644
index 0000000..b0ad2c9
--- /dev/null
+++ b/usb_hubs/common/base.py
@@ -0,0 +1,34 @@
+# Copyright 2024 The ChromiumOS Authors
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+"""Base class that details the methods all usb hub commands must implement."""
+
+
+class USBHubCommandsException(Exception):
+ """Generic exception for USB hub commands failing."""
+
+
+class USBHubCommands:
+ """USB Hub Commands that must be implemented by all usb hub instances.
+
+ Raises:
+ NotImplementedError: if a virtual method is not implemented.
+
+ """
+
+ @staticmethod
+ def is_hub_detected():
+ """Return true if this hub type is detected, otherwise false."""
+ raise NotImplementedError
+
+ def power_cycle_servo(self, servo_serial_number, downtime_secs=5):
+ """Use the hub controls to reboot the servo with serial number specified.
+
+ Not all hubs can respect the downtime_seconds, it is hardware dependent.
+
+ Args:
+ servo_serial_number (string): servo serial number
+ downtime_secs (int, optional): Number of seconds to wait before turning
+ the power on. Defaults to 5.
+ """
+ raise NotImplementedError
diff --git a/usb_hubs/per_port_powercycle.py b/usb_hubs/per_port_powercycle.py
new file mode 100644
index 0000000..3636a04
--- /dev/null
+++ b/usb_hubs/per_port_powercycle.py
@@ -0,0 +1,43 @@
+#!/usr/bin/python3
+# Copyright 2024 The ChromiumOS Authors
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+"""Command line tool that given a servo serial number will power cycle the servo"""
+import argparse
+import sys
+
+from usb_hubs.cambrionix import console_lib as cambrionix
+from usb_hubs.plugable import console_lib as plugable
+
+
+def parse_args(args):
+ """Parse the command line arguments.
+ Args:
+ args (list[str]): The list of command line arguments passed to the script.
+
+ Returns:
+ Namespace: parsed command line arguments.
+ """
+ parser = argparse.ArgumentParser()
+ parser.add_argument("--servo_serial", required=True)
+ parser.add_argument("--downtime_secs", type=int, default=5)
+
+ return parser.parse_args(args)
+
+
+def main():
+ """_summary_"""
+ args = parse_args(sys.argv[1:])
+ if not args.servo_serial:
+ print("Must specify servo serial param. --servo_serial=<serial>")
+ sys.exit(2)
+ hub = plugable.USBHubCommandsPlugable()
+ if cambrionix.USBHubCommandsCambrionix.is_hub_detected():
+ hub = cambrionix.USBHubCommandsCambrionix()
+
+ hub.power_cycle_servo(args.servo_serial, args.downtime_secs)
+
+
+if __name__ == "__main__":
+ main()
+ sys.exit(0)
diff --git a/usb_hubs/plugable/__init__,py b/usb_hubs/plugable/__init__,py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/usb_hubs/plugable/__init__,py
diff --git a/usb_hubs/plugable/console_lib.py b/usb_hubs/plugable/console_lib.py
new file mode 100644
index 0000000..3496c44
--- /dev/null
+++ b/usb_hubs/plugable/console_lib.py
@@ -0,0 +1,283 @@
+# Copyright 2024 The ChromiumOS Authors
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+"""Software interface to allow power cycling ports on plugable hubs."""
+import logging
+import os
+import subprocess
+import sys
+import time
+
+import servo.utils.usb_hierarchy as uh
+from usb_hubs.common import base
+
+
+# List of PID supported for power-cycle. For now, it's only v4 and v4p1.
+# 0x520d: v4p1, 0x501b: v4
+PWR_CYCLE_PIDS = [0x520D, 0x501B]
+
+# VID to find all servo devices.
+SERVO_VID = 0x18D1
+
+
+class USBHubCommandsPlugable(base.USBHubCommands):
+ """Hub interface implementation for plugable hubs."""
+
+ # Lookup table for action on uhubctl.
+ ACTION_DICT = {"on": 1, "off": 0, "reset": 2}
+
+ # Repetitions to perform on the uhubctl command to get it to trigger.
+ REPS = 100
+
+ # Time to sleep after reset to let kernel handle sysfs files
+ RESET_DEBOUNCE_S = 2
+
+ # time to sleep for the servo device to re-enumerate.
+ MAX_REINIT_SLEEP_S = 8
+
+ # Polling intervals to find the |devnum| file for reset device.
+ REINIT_POLL_SLEEP_S = 0.1
+
+ # Time to sleep and attempts to interact with servo console after reboot.
+ REBOOT_SLEEP_S = 1
+ REBOOT_TIMEOUT_ATTEMPTS = 4
+
+ # Time to sleep after power off
+ PWR_OFF_SLEEP_S = 1
+
+ @staticmethod
+ def is_hub_detected():
+ # By default the fleet always assume that a plugable hub is connected.
+ # TODO can try searching for a VID/PID but it would not be 100% successful.
+ return True
+
+ def __init__(self):
+ """Setup scratch to use."""
+ self._logger = logging.getLogger(type(self).__name__)
+
+ def error(self, msg, *args):
+ # pylint: disable=invalid-name
+ """Log error and exit.
+
+ Args:
+ msg: message to log
+ *args: args to pass to logging module
+ """
+ self._logger.info(msg, *args)
+ sys.exit(1)
+
+ def _usb_path(self, serial):
+ """Helper to get the device path.
+
+ Args:
+ serial: str, servo serial
+
+ Returns:
+ /sys/bus/usb/devices/ path to servo with |serial| or None if not found
+ """
+ # This list is used to find all servos on the system.
+ vid_pid_list = [(SERVO_VID, None)]
+ devs = uh.Hierarchy.GetAllUsbDeviceSysfsPaths(vid_pid_list)
+ for dev_path in devs:
+ dev_serial = uh.Hierarchy.SerialFromSysfs(dev_path)
+ if dev_serial == serial:
+ return dev_path
+ return None
+
+ def _check_devnum_reset(self, dev_path, devnum, action):
+ """Check that the |devnum| has changed after a reset/reboot/power-cycle
+
+ Args:
+ dev_path: device sysfs path
+ devnum: int, usb devnum (original devnum, before reset action)
+ action: str, action performed (used to print better errors/logs
+
+ Note: this helper will call self.error() (and thus exit) if
+ - the devnum does not change
+ - it fails to read the devnum after self.MAX_REINIT_SLEEP_S
+ """
+ # Sleep a bit to let the device fully fall off, and the sysfs files be
+ # renewed.
+ time.sleep(self.RESET_DEBOUNCE_S)
+ # For |MAX_REINIT_SLEEP_S| seconds, try to find the new devnum for the
+ # device.
+ end = time.time() + self.MAX_REINIT_SLEEP_S
+ while time.time() < end:
+ try:
+ # check devnum reset
+ if devnum == uh.Hierarchy.DevNumFromSysfs(dev_path):
+ self.error(
+ "%r likely unsuccessful. devnum stayed the same.", action
+ )
+ # If |devnum| changed, then the goal is fulfilled. Move on.
+ break
+ except uh.HierarchyError:
+ # The device might not have re-enumerated yet. Sample again.
+ time.sleep(self.REINIT_POLL_SLEEP_S)
+ else:
+ # The while loop finished without breaking out e.g. we never read the
+ # |devnum| file successfully.
+ self.error(
+ "unable to read device |devnum| file after %ds. Giving up.",
+ self.MAX_REINIT_SLEEP_S,
+ )
+
+ def _run_uhubctl_command(self, hub, port, action):
+ """Build |uhubctl| command performing |action| on |hub|'s |port|.
+
+ Args:
+ hub: hub-port path i.e. /sys/bus/usb/devices/ dirname of the hub
+ port: str, port number on the hub
+ action: one of 'on', 'off', 'reset'
+ """
+ cmd = self._build_and_assert_uhubctl(hub=hub, port=port)
+ if action not in self.ACTION_DICT:
+ self.error("Action %s unknown", action)
+ action_number = self.ACTION_DICT[action]
+ # expand command to perform the uhubctl action.
+ cmd = cmd + ["-a", str(action_number), "-r", str(self.REPS)]
+ try:
+ subprocess.check_output(cmd, stderr=subprocess.STDOUT)
+ except subprocess.CalledProcessError as e:
+ self.error(
+ 'Error performing the uhubctl command. ran: "%s". %s.',
+ " ".join(cmd),
+ str(e),
+ )
+
+ def _build_and_assert_uhubctl(self, hub=None, port=None):
+ """Assert uhubctl exists and hub/port are known to it (if provided).
+
+ Args:
+ hub: hub-port path i.e. /sys/bus/usb/devices/ dirname of the hub
+ port: str, port number on the hub
+
+ Returns:
+ cmd: a list of args to call the uhubctl command (at hub/port if provided)
+ """
+ cmd = ["sudo", "uhubctl"]
+ try:
+ subprocess.check_output(cmd, stderr=subprocess.STDOUT)
+ except subprocess.CalledProcessError as e:
+ self.error("uhubctl not available. Be sure to run as sudo. %s", str(e))
+ if hub is not None and port is not None:
+ # expand the command to check for hub and port existing.
+ # casting port to str to ensure we don't accidentally pass an int.
+ cmd = cmd + ["-l", hub, "-p", str(port)]
+ try:
+ subprocess.check_output(cmd, stderr=subprocess.STDOUT)
+ except subprocess.CalledProcessError as e:
+ self.error(
+ "hub %s with port %s unknown to uhubctl. "
+ "Be sure the hub is a supported smart hub. %s",
+ hub,
+ port,
+ str(e),
+ )
+ return cmd
+
+ def _get_hub_and_port(self, dev_path, pid):
+ """Get the external hub and port paths for a given |pid|.
+
+ This is its own function, as different devices might have a different
+ internal topology. This method has to guarantee to implement all pids
+ in PWR_CYCLE_PIDS.
+
+ Args:
+ dev_path: /sys/bus/usb/devices/ path for servo
+ pid: servo pid
+
+ Returns:
+ (hub, port, hub3,) tuple, where hub is the external hub that the
+ servo is on port is the port on that hub that the
+ servo is on. hub3 is the usb3 virtual hub of the same
+ physical hub. It will be None if it does not exist
+ (hub only enumerated in usb2)
+ """
+ # NOTE: if a servo device or configuration should be supported, this is the
+ # spot to implement it. If more devices get implemented here, make sure to
+ # document the setup under which the user can expect it to work and for the
+ # code to guard against other setups.
+ # For example: if a servo micro is attached to a servo v4, the code to reset
+ # the micro should not just reset the hub that the v4 is hanging on, as that
+ # would reset both.
+ # TODO(coconutruben): make pid permission more robust once we have device
+ # templates.
+ if pid in PWR_CYCLE_PIDS:
+ # For servo v4(p1), the dev_path points to the stm that's hanging on
+ # an internal usb hub.
+ internal_hub = uh.Hierarchy.GetSysfsParentHubStub(dev_path)
+ smart_hub_path = uh.Hierarchy.GetSysfsParentHubStub(internal_hub)
+ if smart_hub_path:
+ # The internal hub is hanging on the smart hub's port. So the last
+ # index is the port number.
+ port = internal_hub.rsplit(".", 1)[-1]
+ smart_hub = os.path.basename(smart_hub_path)
+ # |internal_hub| is always on usb2. Let's see if this hub also
+ # enumerated on usb3.
+ smart_hub_bus, unused_ = smart_hub.split("-")
+ busnum = int(smart_hub_bus)
+ busnum3 = uh.Hierarchy.ComplementBusNum(busnum)
+ smart_hub3 = None
+ if busnum3:
+ smart_hub3 = "busnum3-smart_hub_port_path"
+ smart_hub3_path = os.path.join(
+ os.path.dirname(smart_hub_path), smart_hub3
+ )
+ if not os.path.exists(smart_hub3_path):
+ # set back to None
+ smart_hub3 = None
+ return (smart_hub, port, smart_hub3)
+ self.error(
+ "Device does not seem to be hanging on a (smart) hub. %r", dev_path
+ )
+
+ self.error("Unimplemented pid: %04x", pid)
+
+ def power_cycle_servo(self, servo_serial_number, downtime_secs=5):
+ """Perform a power-cycle on the device using uhubctl.
+
+ uhubctl exposes multiple knobs to control the power-cycling of a port.
+ This method uses the 'reset' knob by default. However, if the user
+ specifies |force|=True, it will issue an 'off' request, wait for
+ |PWR_OFF_SLEEP_S| seconds, before issuing an 'on' request. For some hubs
+ this has proven itself more reliably than a reset request.
+
+ Args:
+ force: bool, whether to perform a full power-cycle or just a reset
+ """
+ self._build_and_assert_uhubctl()
+ dev_path = self._usb_path(servo_serial_number)
+ if not dev_path:
+ self.error("Device with serial %r not found.", servo_serial_number)
+ pid = uh.Hierarchy.ProductIDFromSysfs(dev_path)
+ if pid not in PWR_CYCLE_PIDS:
+ self.error(
+ "pid: 0x%04x currently not supported for usb power cycling. "
+ "Please use one of: %s",
+ pid,
+ ", ".join("0x%04x" % p for p in PWR_CYCLE_PIDS),
+ )
+ # get devnum, and store it
+ devnum = uh.Hierarchy.DevNumFromSysfs(dev_path)
+ # extract the hub and check whether it's on uhubctl
+ hub, port, hub3 = self._get_hub_and_port(dev_path, pid)
+
+ # The sandwich (if usb2 and usb3 are available) is to first turn
+ # off usb2 and then usb3, before unrolling that operation.
+ self._run_uhubctl_command(hub=hub, port=port, action="off")
+ if hub3 is not None:
+ self._run_uhubctl_command(hub=hub3, port=port, action="off")
+ time.sleep(self.PWR_OFF_SLEEP_S)
+ if hub3 is not None:
+ self._run_uhubctl_command(hub=hub3, port=port, action="on")
+ self._run_uhubctl_command(hub=hub, port=port, action="on")
+
+ self._check_devnum_reset(dev_path, devnum, "power-cycle")
+ # At the end, no error was encountered, so indicate belief that reset was
+ # successful.
+ self._logger.info(
+ "Successfully power-cycled device with serial %r. "
+ "(At least reasonably confident).",
+ servo_serial_number,
+ )
diff --git a/usb_hubs/pyproject.toml b/usb_hubs/pyproject.toml
new file mode 100644
index 0000000..3480867
--- /dev/null
+++ b/usb_hubs/pyproject.toml
@@ -0,0 +1,4 @@
+[tool.pytest.ini_options]
+pythonpath = [
+ ".."
+]
\ No newline at end of file
diff --git a/usb_hubs/setup.py b/usb_hubs/setup.py
new file mode 100644
index 0000000..a14e722
--- /dev/null
+++ b/usb_hubs/setup.py
@@ -0,0 +1,32 @@
+#!/usr/bin/env python3
+# Copyright 2024 The ChromiumOS Authors
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Build the python code related to the labstation usb hub."""
+
+
+from setuptools import setup
+
+
+setup(
+ name="usb_hubs",
+ version="0.1",
+ maintainer="chromium os",
+ maintainer_email="chromium-os-dev@chromium.org",
+ license="Chromium",
+ package_dir={"usb_hubs": "../usb_hubs"},
+ packages=[
+ "usb_hubs",
+ "usb_hubs.common",
+ "usb_hubs.cambrionix",
+ "usb_hubs.plugable",
+ ],
+ entry_points={
+ "console_scripts": [
+ "find-cambrionix-mapping=usb_hubs.cambrionix.find_mapping:main",
+ "powercycle-servo-usbhub-port=usb_hubs.per_port_powercycle:main",
+ ],
+ },
+ description="Labstation usb hub tools.",
+)
diff --git a/usb_hubs/tests/__init__.py b/usb_hubs/tests/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/usb_hubs/tests/__init__.py
diff --git a/usb_hubs/tests/conftest.py b/usb_hubs/tests/conftest.py
new file mode 100644
index 0000000..674ad28
--- /dev/null
+++ b/usb_hubs/tests/conftest.py
@@ -0,0 +1,25 @@
+# Copyright 2024 The ChromiumOS Authors
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+# These imports are necessary for pytest dependency injection.
+# pylint: disable=unused-import
+# pylint: disable=import-error
+# pylint: disable=missing-module-docstring
+# pylint: disable=missing-function-docstring
+# pylint: disable=missing-class-docstring
+
+import pytest
+from usb_hubs.tests.fixtures.mock_cambrionix_console import (
+ mock_cambrionix_console,
+)
+from usb_hubs.tests.fixtures.mock_cambrionix_host import mock_cambrionix_host
+from usb_hubs.tests.fixtures.mock_cambrionix_host_with_consoles import (
+ mock_host_with_multiple_cambrionix,
+)
+from usb_hubs.tests.fixtures.mock_cambrionix_host_with_consoles import (
+ mock_host_with_no_cambrionix,
+)
+from usb_hubs.tests.fixtures.mock_cambrionix_host_with_consoles import (
+ mock_host_with_one_cambrionix,
+)
diff --git a/usb_hubs/tests/e2e/cambronix/test_find_mapping.py b/usb_hubs/tests/e2e/cambronix/test_find_mapping.py
new file mode 100644
index 0000000..3cbb4b1
--- /dev/null
+++ b/usb_hubs/tests/e2e/cambronix/test_find_mapping.py
@@ -0,0 +1,143 @@
+# Copyright 2024 The ChromiumOS Authors
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+# pylint: disable=redefined-outer-name
+# pylint: disable=unused-argument
+# pylint: disable=import-error
+# pylint: disable=missing-module-docstring
+# pylint: disable=missing-function-docstring
+# pylint: disable=missing-class-docstring
+# pylint: disable=R0801
+# When pylint supports proto better remove
+# pylint: disable=no-member
+
+import sys
+from unittest.mock import call
+
+import pytest
+from usb_hubs.cambrionix.find_mapping import main
+
+
+class TestFindMapping:
+ def test_find_mapping_no_cambrionix(
+ self, monkeypatch, mock_host_with_no_cambrionix, capfd
+ ):
+ mock_host_with_no_cambrionix()
+
+ with monkeypatch.context() as patch:
+ patch.setattr(sys, "argv", ["find-cambrionix-mapping"])
+
+ with pytest.raises(SystemExit) as pytest_wrapped_e:
+ main()
+ out, unused_err = capfd.readouterr()
+ assert ("No single Cambrionix hub detected.\n") == out
+ assert SystemExit == pytest_wrapped_e.type
+
+ def test_find_mapping_cambrionix(
+ self, monkeypatch, mock_host_with_one_cambrionix, capfd
+ ):
+ host = mock_host_with_one_cambrionix()
+
+ with monkeypatch.context() as patch:
+ patch.setattr(sys, "argv", ["find-cambrionix-mapping"])
+ main()
+ out, unused_err = capfd.readouterr()
+ host.mock_open.assert_called_once_with(
+ "/usr/local/tmp/usb_hub_mapping.yaml", "w", encoding="utf-8"
+ )
+ write_calls = [
+ call("1"),
+ call(":"),
+ call(" "),
+ call("a"),
+ call("\n"),
+ call("2"),
+ call(":"),
+ call(" "),
+ call("b"),
+ call("\n"),
+ call("3"),
+ call(":"),
+ call(" "),
+ call("c"),
+ call("\n"),
+ call("4"),
+ call(":"),
+ call(" "),
+ call("d"),
+ call("\n"),
+ call("5"),
+ call(":"),
+ call(" "),
+ call("e"),
+ call("\n"),
+ call("6"),
+ call(":"),
+ call(" "),
+ call("f"),
+ call("\n"),
+ call("7"),
+ call(":"),
+ call(" "),
+ call("g"),
+ call("\n"),
+ call("8"),
+ call(":"),
+ call(" "),
+ call("h"),
+ call("\n"),
+ call("9"),
+ call(":"),
+ call(" "),
+ call("i"),
+ call("\n"),
+ call("10"),
+ call(":"),
+ call(" "),
+ call("j"),
+ call("\n"),
+ call("11"),
+ call(":"),
+ call(" "),
+ call("k"),
+ call("\n"),
+ call("12"),
+ call(":"),
+ call(" "),
+ call("l"),
+ call("\n"),
+ call("13"),
+ call(":"),
+ call(" "),
+ call("m"),
+ call("\n"),
+ call("14"),
+ call(":"),
+ call(" "),
+ call("n"),
+ call("\n"),
+ call("15"),
+ call(":"),
+ call(" "),
+ call("o"),
+ call("\n"),
+ ]
+ host.mock_open().write.assert_has_calls(calls=write_calls, any_order=False)
+ assert ("") == out
+
+ def test_find_mapping_multiple_cambrionix(
+ self, monkeypatch, mock_host_with_multiple_cambrionix, capfd
+ ):
+ mock_host_with_multiple_cambrionix()
+
+ with monkeypatch.context() as patch:
+ patch.setattr(sys, "argv", ["find-cambrionix-mapping"])
+
+ with pytest.raises(SystemExit) as pytest_wrapped_e:
+ main()
+ out, unused_err = capfd.readouterr()
+ assert (
+ "More than 1 cambrionix hub found panic !!\nNo single Cambrionix hub detected.\n"
+ ) == out
+ assert SystemExit == pytest_wrapped_e.type
diff --git a/usb_hubs/tests/e2e/cambronix/test_port_cycle.py b/usb_hubs/tests/e2e/cambronix/test_port_cycle.py
new file mode 100644
index 0000000..4813f6e
--- /dev/null
+++ b/usb_hubs/tests/e2e/cambronix/test_port_cycle.py
@@ -0,0 +1,52 @@
+# Copyright 2024 The ChromiumOS Authors
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+# pylint: disable=redefined-outer-name
+# pylint: disable=unused-argument
+# pylint: disable=import-error
+# pylint: disable=missing-module-docstring
+# pylint: disable=missing-function-docstring
+# pylint: disable=missing-class-docstring
+# pylint: disable=R0801
+# When pylint supports proto better remove
+# pylint: disable=no-member
+
+import sys
+
+import pytest
+from usb_hubs.per_port_powercycle import main
+
+
+class TestPortCycle:
+ def test_no_servo_serial_flag(
+ self, monkeypatch, mock_host_with_one_cambrionix, capfd
+ ):
+ mock_host_with_one_cambrionix()
+
+ with monkeypatch.context() as patch:
+ patch.setattr(sys, "argv", ["per_port_powercycle"])
+
+ with pytest.raises(SystemExit) as pytest_wrapped_e:
+ main()
+ out, unused_err = capfd.readouterr()
+ assert ("Must specify servo serial param. --servo_serial=<serial>\n") == out
+ assert SystemExit == pytest_wrapped_e.type
+
+ def test_basic(self, monkeypatch, mock_host_with_one_cambrionix, capfd):
+ host = mock_host_with_one_cambrionix()
+
+ with monkeypatch.context() as patch:
+ patch.setattr(sys, "argv", ["per_port_powercycle", "--servo_serial", "a"])
+ main()
+ out, unused_err = capfd.readouterr()
+ assert "" == out
+ assert b"mode off 1\rmode sync 1\r" == host.console.written
+
+ host.console.written = b""
+ with monkeypatch.context() as patch:
+ patch.setattr(sys, "argv", ["per_port_powercycle", "--servo_serial", "o"])
+ main()
+ out, unused_err = capfd.readouterr()
+ assert "" == out
+ assert b"mode off 15\rmode sync 15\r" == host.console.written
diff --git a/usb_hubs/tests/fixtures/mock_cambrionix_console.py b/usb_hubs/tests/fixtures/mock_cambrionix_console.py
new file mode 100644
index 0000000..2c2b677
--- /dev/null
+++ b/usb_hubs/tests/fixtures/mock_cambrionix_console.py
@@ -0,0 +1,90 @@
+# Copyright 2024 The ChromiumOS Authors
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+# pylint: disable=missing-module-docstring
+# pylint: disable=missing-function-docstring
+# pylint: disable=missing-class-docstring
+
+import contextlib
+import logging
+import re
+
+from cambrionix.console_lib import CONSOLE_CLEAR_CHR
+from cambrionix.console_lib import CONSOLE_ENTER
+import pytest
+
+
+_logger = logging.getLogger("mock_cambrionix_host")
+
+
+@pytest.fixture(scope="function")
+def mock_cambrionix_console():
+ def generate_cambrionix_console():
+ class MockCambrionixConsole(contextlib.AbstractContextManager):
+ def __init__(self):
+ self.last_write = None
+ self.written = b""
+ self.console_responses = {
+ b"": None,
+ }
+
+ self.port_enabled_mapping = {
+ 1: True,
+ 2: True,
+ 3: True,
+ 4: True,
+ 5: True,
+ 6: True,
+ 7: True,
+ 8: True,
+ 9: True,
+ 10: True,
+ 11: True,
+ 12: True,
+ 13: True,
+ 14: True,
+ 15: True,
+ }
+
+ def reset_input_buffer(self):
+ pass
+
+ def reset_output_buffer(self):
+ pass
+
+ def write(self, text):
+ self.last_write = text.strip(
+ f"{CONSOLE_ENTER}{CONSOLE_CLEAR_CHR} ".encode("utf-8")
+ )
+ self.written += text
+ if text.strip() == b"mode off":
+ self.port_enabled_mapping = {
+ port: False for port in self.port_enabled_mapping
+ }
+ elif text.strip() == b"mode sync":
+ self.port_enabled_mapping = {
+ port: True for port in self.port_enabled_mapping
+ }
+ match = re.match(r"mode off ([0-9]*)", text.strip().decode("utf-8"))
+ if match:
+ self.port_enabled_mapping[int(match.group(1))] = False
+ match = re.match(r"mode on ([0-9]*)", text.strip().decode("utf-8"))
+ if match:
+ self.port_enabled_mapping[int(match.group(1))] = True
+
+ def read(self):
+ return None
+
+ def read_until(self, unused_end_char):
+ return self.console_responses[self.last_write]
+
+ def flush(self):
+ pass
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ pass
+
+ return MockCambrionixConsole()
+
+ return generate_cambrionix_console
diff --git a/usb_hubs/tests/fixtures/mock_cambrionix_host.py b/usb_hubs/tests/fixtures/mock_cambrionix_host.py
new file mode 100644
index 0000000..1a466f8
--- /dev/null
+++ b/usb_hubs/tests/fixtures/mock_cambrionix_host.py
@@ -0,0 +1,141 @@
+# Copyright 2024 The ChromiumOS Authors
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+# pylint: disable=unused-argument
+# pylint: disable=import-error
+# pylint: disable=missing-module-docstring
+# pylint: disable=missing-function-docstring
+# pylint: disable=missing-class-docstring
+# pylint: disable=import-error
+# pylint: disable=no-name-in-module
+
+
+import logging
+import os
+from pathlib import PosixPath
+import re
+
+import pytest
+import serial
+from usb_hubs.cambrionix.console_lib import SERIAL_PATH
+
+
+_logger = logging.getLogger("mock_cambrionix_host")
+
+
+class USBDevice:
+ servo_serial_number = None
+ vid = None
+
+ def __init__(self, serial_number, vid):
+ self.servo_serial_number = serial_number
+ self.vid = vid
+
+
+@pytest.fixture(scope="function")
+def mock_cambrionix_host(class_mocker, mocker):
+ def generate_cambrionix_host():
+ """Mock generator function. This allows multiple tests to be run in
+ parallel as it generates a new mock for each test vs sharing the same
+ mock between tests.
+
+ Returns:
+ MockCambrionixHost:
+ """
+
+ class MockCambrionixHost:
+ def __init__(self):
+ self.usb_devices = []
+ class_mocker.patch("serial.Serial", side_effect=self.mock_serial_open)
+ class_mocker.patch(
+ "pathlib.PosixPath.glob",
+ side_effect=self.mock_glob,
+ )
+ class_mocker.patch(
+ "usb.core.find",
+ side_effect=self.mock_usbfind,
+ )
+
+ class_mocker.patch(
+ "usb.util.get_string",
+ side_effect=self.mock_usb_get_string,
+ )
+
+ mocker.patch(
+ "time.sleep",
+ side_effect=self.mock_sleep,
+ )
+
+ self.mock_open = mocker.mock_open(
+ read_data=(
+ "1: a\n2: b\n3: c\n4: d\n5: e\n6: f\n7: g\n8: h\n9: i\n10:"
+ " j\n11: k\n12: l\n13: m\n14: n\n15: o\n"
+ )
+ )
+
+ mocker.patch("builtins.open", self.mock_open)
+
+ self.port_device_mapping = {
+ 1: USBDevice("a", 0x520B),
+ 2: USBDevice("b", 0x520B),
+ 3: USBDevice("c", 0x520B),
+ 4: USBDevice("d", 0x520B),
+ 5: USBDevice("e", 0x520B),
+ 6: USBDevice("f", 0x520B),
+ 7: USBDevice("g", 0x520B),
+ 8: USBDevice("h", 0x520B),
+ 9: USBDevice("i", 0x520B),
+ 10: USBDevice("j", 0x520B),
+ 11: USBDevice("k", 0x520B),
+ 12: USBDevice("l", 0x520B),
+ 13: USBDevice("m", 0x520B),
+ 14: USBDevice("n", 0x520B),
+ 15: USBDevice("o", 0x520B),
+ }
+
+ self.console = None
+
+ def set_usb_devices(self, usb_devices):
+ self.usb_devices = usb_devices
+
+ def clear_usb_devices(self):
+ self.usb_devices = []
+
+ def raise_on_open(self):
+ class_mocker.patch(
+ "serial.Serial", side_effect=serial.SerialException()
+ )
+
+ def mock_glob(self, glob_str):
+ matched_usb_devices = [
+ device for device in self.usb_devices if re.match(glob_str, device)
+ ]
+ return [
+ PosixPath(f"{SERIAL_PATH}{device}")
+ for device in matched_usb_devices
+ ]
+
+ # pylint
+ def mock_serial_open(self, port, baudrate, timeout):
+ if not os.path.basename(port) in self.usb_devices:
+ raise serial.SerialException(f"Device not found {port}.")
+ self.console = self.usb_devices[os.path.basename(port)]
+ return self.console
+
+ def mock_usbfind(self, vendor_id, find_all):
+ results = []
+ for port, enabled in self.console.port_enabled_mapping.items():
+ if enabled and self.port_device_mapping[port].vid == vendor_id:
+ results.append(self.port_device_mapping[port])
+ return results
+
+ def mock_usb_get_string(self, device, attr):
+ return attr
+
+ def mock_sleep(self, time):
+ return
+
+ return MockCambrionixHost()
+
+ return generate_cambrionix_host
diff --git a/usb_hubs/tests/fixtures/mock_cambrionix_host_with_consoles.py b/usb_hubs/tests/fixtures/mock_cambrionix_host_with_consoles.py
new file mode 100644
index 0000000..f0ba035
--- /dev/null
+++ b/usb_hubs/tests/fixtures/mock_cambrionix_host_with_consoles.py
@@ -0,0 +1,56 @@
+# Copyright 2024 The ChromiumOS Authors
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+# pylint: disable=missing-module-docstring
+# pylint: disable=missing-function-docstring
+# pylint: disable=missing-class-docstring
+# pylint: disable=import-error
+# pylint: disable=no-name-in-module
+
+import logging
+
+import pytest
+
+
+_logger = logging.getLogger("mock_cambrionix_host_with_console")
+
+
+@pytest.fixture()
+def mock_host_with_no_cambrionix(mock_cambrionix_host):
+ def generate_host():
+ mock_cambrionix_host()
+
+ return generate_host
+
+
+@pytest.fixture()
+def mock_host_with_one_cambrionix(mock_cambrionix_host, mock_cambrionix_console):
+ def generate_host():
+ cambrionix_console = mock_cambrionix_console()
+ cambrionix_host = mock_cambrionix_host()
+ cambrionix_host.set_usb_devices(
+ {"usb-cambrionix_SuperSync15_0000003B71550C92-if01": cambrionix_console}
+ )
+
+ return cambrionix_host
+
+ return generate_host
+
+
+@pytest.fixture()
+def mock_host_with_multiple_cambrionix(mock_cambrionix_host, mock_cambrionix_console):
+ def generate_host():
+
+ cambrionix_console = mock_cambrionix_console()
+ cambrionix_host = mock_cambrionix_host()
+ cambrionix_host.set_usb_devices(
+ {
+ "usb-cambrionix_SuperSync15_0000003B71550C92-if01": cambrionix_console,
+ "usb-cambrionix_SuperSync15_0000003B71550C93-if01": cambrionix_console,
+ }
+ )
+
+ return cambrionix_host
+
+ return generate_host