labtation: Add code to abstract USB hub handling from servod.

Implement per port power cycling for Cambrionix in a way that
will allow all port cycling to go through this tool and for
servod to have all hub code removed.

BUG=b:278585904
TEST=manual run on a fizz-labstation with a cambronix hub and 10 duts
attached.

Change-Id: Ie6ec69c6d41451bc406ffbba639c1afd49444016
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/labstation/+/5450877
Tested-by: Keith Haddow <haddowk@chromium.org>
Auto-Submit: Keith Haddow <haddowk@chromium.org>
Commit-Queue: Keith Haddow <haddowk@chromium.org>
Reviewed-by: Garry Wang <xianuowang@chromium.org>
diff --git a/.isort.cfg b/.isort.cfg
new file mode 100644
index 0000000..8ea365a
--- /dev/null
+++ b/.isort.cfg
@@ -0,0 +1,60 @@
+# Copyright 2022 The ChromiumOS Authors
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+# Config file for the isort python module.
+# This is used to enforce import sorting standards.
+#
+# https://pycqa.github.io/isort/docs/configuration/options.html
+
+[settings]
+# Be compatible with `black` since it also matches what we want.
+profile = black
+
+line_length = 80
+length_sort = false
+force_single_line = true
+lines_after_imports = 2
+from_first = false
+case_sensitive = false
+force_sort_within_sections = true
+order_by_type = false
+
+
+# Have to mark third_party/ libs as third party.
+# And libraries we list in cros lint's PYTHONPATH.
+# NB: Keep in sync with pylintrc.
+known_third_party =
+    _emerge,
+    apiclient,
+    chromite.third_party,
+    elftools,
+    gcloud,
+    google,
+    googleapiclient,
+    httplib2,
+    jinja2,
+    jsonschema,
+    lddtree,
+    magic,
+    mock,
+    oauth2client,
+    portage,
+    pylint,
+    requests,
+    six,
+    sqlalchemy,
+    yaml,
+
+known_first_party =
+    chromite
+
+# Allow importing multiple classes on a single line from these modules.
+# https://google.github.io/styleguide/pyguide#s2.2-imports
+single_line_exclusions =
+    abc,
+    chromite.api.gen.config.replication_config_pb2,
+    chromite.third_party.google.protobuf.struct_pb2,
+    chromite.third_party.infra_libs.buildbucket.proto,
+    collections.abc,
+    typing,
diff --git a/Makefile b/Makefile
deleted file mode 100644
index e747843..0000000
--- a/Makefile
+++ /dev/null
@@ -1,15 +0,0 @@
-# Copyright 2024 The ChromiumOS Authors
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-SUBDIRS := os-dependent/chromeos/upstart-scripts os-dependent/chromeos
-
-all:
-	for dir in $(SUBDIRS); do \
-                $(MAKE) -C $$dir all || exit 1; \
-        done
-
-install:
-	for dir in $(SUBDIRS); do \
-                $(MAKE) -C $$dir install; \
-        done
diff --git a/PRESUBMIT.cfg b/PRESUBMIT.cfg
new file mode 100644
index 0000000..a5baf90
--- /dev/null
+++ b/PRESUBMIT.cfg
@@ -0,0 +1,13 @@
+# Per-project `repo upload` hook settings.
+# https://chromium.googlesource.com/chromiumos/repohooks/
+
+[Hook Scripts]
+# Make cros format do nothing, add two custom checks for
+# black and isort.
+cros format = echo
+black = bash -c 'RV=0; for file in ${PRESUBMIT_FILES}; do if [[ $file == *.py || `head -n1 $file | grep -e "/env python"` ]]; \
+    then if ! black --check $file; then RV=1; fi; fi done; exit $RV'
+isort = bash -c 'RV=0; for file in ${PRESUBMIT_FILES}; do if [[ $file == *.py || `head -n1 $file | grep -e "/env python"` ]]; \
+    then if ! isort -c $file; then RV=1; fi; fi done; exit $RV'
+
+[Hook Overrides]
diff --git a/os-dependent/chromeos/Makefile b/os-dependent/chromeos/Makefile
deleted file mode 100644
index 12e064f..0000000
--- a/os-dependent/chromeos/Makefile
+++ /dev/null
@@ -1,15 +0,0 @@
-# Copyright 2024 The ChromiumOS Authors
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-.PHONY: all
-
-all:
-
-SHAREDIR	= /usr/share/cros
-SHARE_DEST	= $(DESTDIR)$(SHAREDIR)
-SERVOD_UTILS	= servod_utils.sh
-
-install:
-	@mkdir -p $(SHARE_DEST)
-	install -m 0644 $(SERVOD_UTILS) $(SHARE_DEST)/$(SERVOD_UTILS)
diff --git a/os-dependent/chromeos/upstart-scripts/Makefile b/os-dependent/chromeos/upstart-scripts/Makefile
deleted file mode 100644
index b55e027..0000000
--- a/os-dependent/chromeos/upstart-scripts/Makefile
+++ /dev/null
@@ -1,8 +0,0 @@
-# Copyright 2024 The ChromiumOS Authors
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-all:
-
-install:
-	install -m 0644 -t $(DESTDIR)/etc/init -D *.conf
diff --git a/os-dependent/chromeos/upstart-scripts/find-cambronix-mapping.conf b/os-dependent/chromeos/upstart-scripts/find-cambronix-mapping.conf
new file mode 100644
index 0000000..e93dc18
--- /dev/null
+++ b/os-dependent/chromeos/upstart-scripts/find-cambronix-mapping.conf
@@ -0,0 +1,14 @@
+# Copyright 2024 The ChromiumOS Authors
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+description     "Generate the mapping of servo serial to cambrionix usb port"
+author          "chromium-os-dev@chromium.org"
+
+start on started system-services
+
+# It is exceedingly unlikely to OOM, as it's a simple dbus-send call,
+# but better to kill it than to panic the system.
+oom score -100
+
+exec /usr/bin/find-cambrionix-mapping
\ No newline at end of file
diff --git a/usb_hubs/__init__,py b/usb_hubs/__init__,py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/usb_hubs/__init__,py
diff --git a/usb_hubs/cambrionix/__init__,py b/usb_hubs/cambrionix/__init__,py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/usb_hubs/cambrionix/__init__,py
diff --git a/usb_hubs/cambrionix/console_lib.py b/usb_hubs/cambrionix/console_lib.py
new file mode 100644
index 0000000..da11372
--- /dev/null
+++ b/usb_hubs/cambrionix/console_lib.py
@@ -0,0 +1,145 @@
+# Copyright 2024 The ChromiumOS Authors
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+"""Software interface to allow power cycling ports on Cambrionix hub."""
+import pathlib
+from time import sleep
+
+import serial
+from usb_hubs.common import base
+import yaml
+
+
+CROS_TMP_DIR = "/usr/local/tmp/"
+MAPPING_FILENAME = "usb_hub_mapping.yaml"
+MAX_PORT_NUMBER = 15
+CONSOLE_CLEAR_CHR = chr(8)
+CONSOLE_CLEAR = CONSOLE_CLEAR_CHR * 20
+CONSOLE_ENTER = "\r"
+SERIAL_PATH = "/dev/serial/by-id/"
+CAMBRIONIX_GLOB = "usb-cambrionix_SuperSync15*"
+
+
+class USBHubCommandsCambrionix(base.USBHubCommands):
+    """Hub interface implementation for cambrionix hubs."""
+
+    def __init__(self):
+        """Initialize the class, opening the serial console to the Cambrionix hub."""
+        self.serial_to_port = None
+        self.console = None
+        self.usb_uartname = None
+        self.get_uartname()
+        self.open_console()
+
+    def open_console(self):
+        """Open the serial console to the Cambrionix hub."""
+        try:
+            self.console = serial.Serial(
+                self.usb_uartname, baudrate=115200, write_timeout=5, timeout=5
+            )
+        except serial.SerialException as e:
+            raise base.USBHubCommandsException(
+                "Failed to connect to cambrionix console"
+            ) from e
+
+    def load_serial_to_port(self):
+        """_summary_"""
+        try:
+            with open(
+                CROS_TMP_DIR + MAPPING_FILENAME, "r", encoding="utf-8"
+            ) as input_file:
+                serial_to_port = yaml.safe_load(input_file)
+            self.serial_to_port = {value: key for key, value in serial_to_port.items()}
+        except FileNotFoundError as e:
+            raise base.USBHubCommandsException("Unable to open mapping file.") from e
+
+    def write_serial_to_port(self, serial_to_port):
+        """Write out the servo serial number to port mapping as yaml.
+
+        Args:
+            serial_to_port (map{ string, string}): Mapping of servo serial number to
+            cambrionix port number.
+        """
+        with open(
+            CROS_TMP_DIR + "usb_hub_mapping.yaml", "w", encoding="utf-8"
+        ) as outfile:
+            yaml.dump(serial_to_port, outfile, default_flow_style=False)
+
+    def get_port_from_serial(self, servo_serial_number):
+        """Get the port number from the servo serial number.
+
+        Args:
+            servo_serial_number (string): Servo serial number.
+
+        Returns:
+            int : Cambrionix port number associated with servo serial number.
+        """
+        return self.serial_to_port.get(servo_serial_number)
+
+    def send_to_console(self, command):
+        """Send a command to the cambrionix console
+
+        Args:
+            command (string): cambrionix console command.
+        """
+        command_to_send = command + "\r"
+        self.console.write(command_to_send.encode("utf-8"))
+
+    def power_off_port(self, port):
+        """Send command to power off a specific hub ports.
+
+        Args:
+            port (int): hub port number.
+        """
+        self.send_to_console(f"mode off {port}")
+
+    def power_on_port(self, port):
+        """Send command to power on a specific hub ports.
+
+        Args:
+            port (int): hub port number.
+        """
+        self.send_to_console(f"mode sync {port}")
+
+    def power_on_all(self):
+        """Send command to power on all hub ports."""
+        self.send_to_console("mode sync")
+
+    def power_off_all(self):
+        """Send command to power off all hub ports."""
+        self.send_to_console("mode off")
+
+    def get_uartname(self):
+        """Search for the serial devices that matches the Cambrionix pattern and open
+        the first device.
+        """
+        usb_uartnames = [
+            str(name) for name in pathlib.Path(SERIAL_PATH).glob(CAMBRIONIX_GLOB)
+        ]
+        self.usb_uartname = usb_uartnames[0]
+
+    @staticmethod
+    def is_hub_detected():
+        """Search that to find the number of serial devices attached that match
+           the Cambrionix pattern.
+
+        Returns:
+            bool: True if one and only one cambrionix hub is detected.
+        """
+        usb_uartnames = list(pathlib.Path(SERIAL_PATH).glob(CAMBRIONIX_GLOB))
+        if len(usb_uartnames) > 1:
+            print("More than 1 cambrionix hub found panic !!")
+        return len(usb_uartnames) == 1
+
+    def power_cycle_servo(self, servo_serial_number, downtime_secs=5):
+        """Given a servo serial number power cycle the correct cambrionix port number.
+
+        Args:
+            servo_serial_number (string): serial number of a servo attached to the hub.
+            downtime_secs (int, optional): _description_. Defaults to 5.
+        """
+        self.load_serial_to_port()
+        port = self.get_port_from_serial(servo_serial_number)
+        self.power_off_port(port)
+        sleep(downtime_secs)
+        self.power_on_port(port)
diff --git a/usb_hubs/cambrionix/find_mapping.py b/usb_hubs/cambrionix/find_mapping.py
new file mode 100644
index 0000000..b173353
--- /dev/null
+++ b/usb_hubs/cambrionix/find_mapping.py
@@ -0,0 +1,88 @@
+#!/usr/bin/python3
+# Copyright 2024 The ChromiumOS Authors
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Code to generate a mapping between Cambrionix USB hub ports to servo serials.
+
+This is done by switching on all of the servos connected to the port then switching
+them off one by one and noting which serial number was missing.
+
+This is faster than switching them on one by one as each servo needs time to boot and
+the USB device enumerated by the kernel.  This way those boots are all done in parallel.
+
+It will fail to find any completly non responsive servo or servos that do not correctly
+report their serial number in the USB enumeration.
+
+"""
+
+import sys
+from time import sleep
+
+import usb
+from usb_hubs.cambrionix import console_lib
+
+
+SERVO_PIDS = [0x520B, 0x520D]
+
+
+def get_all_servo_serial():
+    """For each servo PID enumerate the USB devices and collect their serial number.
+
+    Returns:
+        list(string): List of connected servo serial numbers.
+    """
+    serialnos = []
+    for product in SERVO_PIDS:
+        for device in list(usb.core.find(idProduct=product, find_all=True)):
+            serialnos.append(usb.util.get_string(device, device.iSerialNumber))
+    return serialnos
+
+
+def reset_all(hub):
+    """Reset all of the servo devices connected to the hub.
+
+    The process of working out the mapping is destructive to servod processes anyway
+    there is better success if the device are all reset.
+
+    Args:
+        hub (USBHubCommandsCambrionix): Class that wraps the serial connection to a
+        specific Cambrionix hub.
+    """
+    hub.power_off_all()
+    hub.power_on_all()
+    # By experimentation 11 seconds is the minimum time it took for the 10 servos
+    # typically connected to boot and enumerate.
+    sleep(11)
+
+
+def main():
+    """Generate a yaml file with a servo serial number to usb port mapping.
+
+    Cycle through switching off each hub port and detecting the missing
+    serial number when as port is switched off.
+    """
+
+    if not console_lib.USBHubCommandsCambrionix.is_hub_detected():
+        print("No single Cambrionix hub detected.")
+        sys.exit(2)
+    hub = console_lib.USBHubCommandsCambrionix()
+    reset_all(hub)
+
+    mapping = {}
+    current_serialnos = get_all_servo_serial()
+    for port in range(1, console_lib.MAX_PORT_NUMBER + 1):
+        hub.power_off_port(port)
+        sleep(1)
+        new_serialnos = get_all_servo_serial()
+        serial_no = set(current_serialnos) - set(new_serialnos)
+        if serial_no:
+            mapping[port] = list(serial_no)[0]
+        current_serialnos = new_serialnos
+    hub.power_on_all()
+    hub.write_serial_to_port(mapping)
+
+
+if __name__ == "__main__":
+    main()
+    sys.exit(0)
diff --git a/usb_hubs/common/__init__,py b/usb_hubs/common/__init__,py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/usb_hubs/common/__init__,py
diff --git a/usb_hubs/common/base.py b/usb_hubs/common/base.py
new file mode 100644
index 0000000..b0ad2c9
--- /dev/null
+++ b/usb_hubs/common/base.py
@@ -0,0 +1,34 @@
+# Copyright 2024 The ChromiumOS Authors
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+"""Base class that details the methods all usb hub commands must implement."""
+
+
+class USBHubCommandsException(Exception):
+    """Generic exception for USB hub commands failing."""
+
+
+class USBHubCommands:
+    """USB Hub Commands that must be implemented by all usb hub instances.
+
+    Raises:
+        NotImplementedError: if a virtual method is not implemented.
+
+    """
+
+    @staticmethod
+    def is_hub_detected():
+        """Return true if this hub type is detected, otherwise false."""
+        raise NotImplementedError
+
+    def power_cycle_servo(self, servo_serial_number, downtime_secs=5):
+        """Use the hub controls to reboot the servo with serial number specified.
+
+        Not all hubs can respect the downtime_seconds, it is hardware dependent.
+
+        Args:
+            servo_serial_number (string): servo serial number
+            downtime_secs (int, optional): Number of seconds to wait before turning
+                the power on. Defaults to 5.
+        """
+        raise NotImplementedError
diff --git a/usb_hubs/per_port_powercycle.py b/usb_hubs/per_port_powercycle.py
new file mode 100644
index 0000000..3636a04
--- /dev/null
+++ b/usb_hubs/per_port_powercycle.py
@@ -0,0 +1,43 @@
+#!/usr/bin/python3
+# Copyright 2024 The ChromiumOS Authors
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+"""Command line tool that given a servo serial number will power cycle the servo"""
+import argparse
+import sys
+
+from usb_hubs.cambrionix import console_lib as cambrionix
+from usb_hubs.plugable import console_lib as plugable
+
+
+def parse_args(args):
+    """Parse the command line arguments.
+    Args:
+        args (list[str]): The list of command line arguments passed to the script.
+
+    Returns:
+       Namespace: parsed command line arguments.
+    """
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--servo_serial", required=True)
+    parser.add_argument("--downtime_secs", type=int, default=5)
+
+    return parser.parse_args(args)
+
+
+def main():
+    """_summary_"""
+    args = parse_args(sys.argv[1:])
+    if not args.servo_serial:
+        print("Must specify servo serial param. --servo_serial=<serial>")
+        sys.exit(2)
+    hub = plugable.USBHubCommandsPlugable()
+    if cambrionix.USBHubCommandsCambrionix.is_hub_detected():
+        hub = cambrionix.USBHubCommandsCambrionix()
+
+    hub.power_cycle_servo(args.servo_serial, args.downtime_secs)
+
+
+if __name__ == "__main__":
+    main()
+    sys.exit(0)
diff --git a/usb_hubs/plugable/__init__,py b/usb_hubs/plugable/__init__,py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/usb_hubs/plugable/__init__,py
diff --git a/usb_hubs/plugable/console_lib.py b/usb_hubs/plugable/console_lib.py
new file mode 100644
index 0000000..3496c44
--- /dev/null
+++ b/usb_hubs/plugable/console_lib.py
@@ -0,0 +1,283 @@
+# Copyright 2024 The ChromiumOS Authors
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+"""Software interface to allow power cycling ports on plugable hubs."""
+import logging
+import os
+import subprocess
+import sys
+import time
+
+import servo.utils.usb_hierarchy as uh
+from usb_hubs.common import base
+
+
+# List of PID supported for power-cycle. For now, it's only v4 and v4p1.
+# 0x520d: v4p1, 0x501b: v4
+PWR_CYCLE_PIDS = [0x520D, 0x501B]
+
+# VID to find all servo devices.
+SERVO_VID = 0x18D1
+
+
+class USBHubCommandsPlugable(base.USBHubCommands):
+    """Hub interface implementation for plugable hubs."""
+
+    # Lookup table for action on uhubctl.
+    ACTION_DICT = {"on": 1, "off": 0, "reset": 2}
+
+    # Repetitions to perform on the uhubctl command to get it to trigger.
+    REPS = 100
+
+    # Time to sleep after reset to let kernel handle sysfs files
+    RESET_DEBOUNCE_S = 2
+
+    # time to sleep for the servo device to re-enumerate.
+    MAX_REINIT_SLEEP_S = 8
+
+    # Polling intervals to find the |devnum| file for reset device.
+    REINIT_POLL_SLEEP_S = 0.1
+
+    # Time to sleep and attempts to interact with servo console after reboot.
+    REBOOT_SLEEP_S = 1
+    REBOOT_TIMEOUT_ATTEMPTS = 4
+
+    # Time to sleep after power off
+    PWR_OFF_SLEEP_S = 1
+
+    @staticmethod
+    def is_hub_detected():
+        # By default the fleet always assume that a plugable hub is connected.
+        # TODO can try searching for a VID/PID but it would not be 100% successful.
+        return True
+
+    def __init__(self):
+        """Setup scratch to use."""
+        self._logger = logging.getLogger(type(self).__name__)
+
+    def error(self, msg, *args):
+        # pylint: disable=invalid-name
+        """Log error and exit.
+
+        Args:
+          msg: message to log
+          *args: args to pass to logging module
+        """
+        self._logger.info(msg, *args)
+        sys.exit(1)
+
+    def _usb_path(self, serial):
+        """Helper to get the device path.
+
+        Args:
+          serial: str, servo serial
+
+        Returns:
+          /sys/bus/usb/devices/ path to servo with |serial| or None if not found
+        """
+        # This list is used to find all servos on the system.
+        vid_pid_list = [(SERVO_VID, None)]
+        devs = uh.Hierarchy.GetAllUsbDeviceSysfsPaths(vid_pid_list)
+        for dev_path in devs:
+            dev_serial = uh.Hierarchy.SerialFromSysfs(dev_path)
+            if dev_serial == serial:
+                return dev_path
+        return None
+
+    def _check_devnum_reset(self, dev_path, devnum, action):
+        """Check that the |devnum| has changed after a reset/reboot/power-cycle
+
+        Args:
+          dev_path: device sysfs path
+          devnum: int, usb devnum (original devnum, before reset action)
+          action: str, action performed (used to print better errors/logs
+
+        Note: this helper will call self.error() (and thus exit) if
+        - the devnum does not change
+        - it fails to read the devnum after self.MAX_REINIT_SLEEP_S
+        """
+        # Sleep a bit to let the device fully fall off, and the sysfs files be
+        # renewed.
+        time.sleep(self.RESET_DEBOUNCE_S)
+        # For |MAX_REINIT_SLEEP_S| seconds, try to find the new devnum for the
+        # device.
+        end = time.time() + self.MAX_REINIT_SLEEP_S
+        while time.time() < end:
+            try:
+                # check devnum reset
+                if devnum == uh.Hierarchy.DevNumFromSysfs(dev_path):
+                    self.error(
+                        "%r likely unsuccessful. devnum stayed the same.", action
+                    )
+                # If |devnum| changed, then the goal is fulfilled. Move on.
+                break
+            except uh.HierarchyError:
+                # The device might not have re-enumerated yet. Sample again.
+                time.sleep(self.REINIT_POLL_SLEEP_S)
+        else:
+            # The while loop finished without breaking out e.g. we never read the
+            # |devnum| file successfully.
+            self.error(
+                "unable to read device |devnum| file after %ds. Giving up.",
+                self.MAX_REINIT_SLEEP_S,
+            )
+
+    def _run_uhubctl_command(self, hub, port, action):
+        """Build |uhubctl| command performing |action| on |hub|'s |port|.
+
+        Args:
+          hub: hub-port path i.e. /sys/bus/usb/devices/ dirname of the hub
+          port: str, port number on the hub
+          action: one of 'on', 'off', 'reset'
+        """
+        cmd = self._build_and_assert_uhubctl(hub=hub, port=port)
+        if action not in self.ACTION_DICT:
+            self.error("Action %s unknown", action)
+        action_number = self.ACTION_DICT[action]
+        # expand command to perform the uhubctl action.
+        cmd = cmd + ["-a", str(action_number), "-r", str(self.REPS)]
+        try:
+            subprocess.check_output(cmd, stderr=subprocess.STDOUT)
+        except subprocess.CalledProcessError as e:
+            self.error(
+                'Error performing the uhubctl command. ran: "%s". %s.',
+                " ".join(cmd),
+                str(e),
+            )
+
+    def _build_and_assert_uhubctl(self, hub=None, port=None):
+        """Assert uhubctl exists and hub/port are known to it (if provided).
+
+        Args:
+          hub: hub-port path i.e. /sys/bus/usb/devices/ dirname of the hub
+          port: str, port number on the hub
+
+        Returns:
+          cmd: a list of args to call the uhubctl command (at hub/port if provided)
+        """
+        cmd = ["sudo", "uhubctl"]
+        try:
+            subprocess.check_output(cmd, stderr=subprocess.STDOUT)
+        except subprocess.CalledProcessError as e:
+            self.error("uhubctl not available. Be sure to run as sudo. %s", str(e))
+        if hub is not None and port is not None:
+            # expand the command to check for hub and port existing.
+            # casting port to str to ensure we don't accidentally pass an int.
+            cmd = cmd + ["-l", hub, "-p", str(port)]
+            try:
+                subprocess.check_output(cmd, stderr=subprocess.STDOUT)
+            except subprocess.CalledProcessError as e:
+                self.error(
+                    "hub %s with port %s unknown to uhubctl. "
+                    "Be sure the hub is a supported smart hub. %s",
+                    hub,
+                    port,
+                    str(e),
+                )
+        return cmd
+
+    def _get_hub_and_port(self, dev_path, pid):
+        """Get the external hub and port paths for a given |pid|.
+
+        This is its own function, as different devices might have a different
+        internal topology. This method has to guarantee to implement all pids
+        in PWR_CYCLE_PIDS.
+
+        Args:
+          dev_path: /sys/bus/usb/devices/ path for servo
+          pid: servo pid
+
+        Returns:
+          (hub, port, hub3,) tuple, where hub is the external hub that the
+                             servo is on port is the port on that hub that the
+                             servo is on. hub3 is the usb3 virtual hub of the same
+                             physical hub. It will be None if it does not exist
+                             (hub only enumerated in usb2)
+        """
+        # NOTE: if a servo device or configuration should be supported, this is the
+        # spot to implement it. If more devices get implemented here, make sure to
+        # document the setup under which the user can expect it to work and for the
+        # code to guard against other setups.
+        # For example: if a servo micro is attached to a servo v4, the code to reset
+        # the micro should not just reset the hub that the v4 is hanging on, as that
+        # would reset both.
+        # TODO(coconutruben): make pid permission more robust once we have device
+        # templates.
+        if pid in PWR_CYCLE_PIDS:
+            # For servo v4(p1), the dev_path points to the stm that's hanging on
+            # an internal usb hub.
+            internal_hub = uh.Hierarchy.GetSysfsParentHubStub(dev_path)
+            smart_hub_path = uh.Hierarchy.GetSysfsParentHubStub(internal_hub)
+            if smart_hub_path:
+                # The internal hub is hanging on the smart hub's port. So the last
+                # index is the port number.
+                port = internal_hub.rsplit(".", 1)[-1]
+                smart_hub = os.path.basename(smart_hub_path)
+                # |internal_hub| is always on usb2. Let's see if this hub also
+                # enumerated on usb3.
+                smart_hub_bus, unused_ = smart_hub.split("-")
+                busnum = int(smart_hub_bus)
+                busnum3 = uh.Hierarchy.ComplementBusNum(busnum)
+                smart_hub3 = None
+                if busnum3:
+                    smart_hub3 = "busnum3-smart_hub_port_path"
+                    smart_hub3_path = os.path.join(
+                        os.path.dirname(smart_hub_path), smart_hub3
+                    )
+                    if not os.path.exists(smart_hub3_path):
+                        # set back to None
+                        smart_hub3 = None
+                return (smart_hub, port, smart_hub3)
+            self.error(
+                "Device does not seem to be hanging on a (smart) hub. %r", dev_path
+            )
+
+        self.error("Unimplemented pid: %04x", pid)
+
+    def power_cycle_servo(self, servo_serial_number, downtime_secs=5):
+        """Perform a power-cycle on the device using uhubctl.
+
+        uhubctl exposes multiple knobs to control the power-cycling of a port.
+        This method uses the 'reset' knob by default. However, if the user
+        specifies |force|=True, it will issue an 'off' request, wait for
+        |PWR_OFF_SLEEP_S| seconds, before issuing an 'on' request. For some hubs
+        this has proven itself more reliably than a reset request.
+
+        Args:
+          force: bool, whether to perform a full power-cycle or just a reset
+        """
+        self._build_and_assert_uhubctl()
+        dev_path = self._usb_path(servo_serial_number)
+        if not dev_path:
+            self.error("Device with serial %r not found.", servo_serial_number)
+        pid = uh.Hierarchy.ProductIDFromSysfs(dev_path)
+        if pid not in PWR_CYCLE_PIDS:
+            self.error(
+                "pid: 0x%04x currently not supported for usb power cycling. "
+                "Please use one of: %s",
+                pid,
+                ", ".join("0x%04x" % p for p in PWR_CYCLE_PIDS),
+            )
+        # get devnum, and store it
+        devnum = uh.Hierarchy.DevNumFromSysfs(dev_path)
+        # extract the hub and check whether it's on uhubctl
+        hub, port, hub3 = self._get_hub_and_port(dev_path, pid)
+
+        # The sandwich (if usb2 and usb3 are available) is to first turn
+        # off usb2 and then usb3, before unrolling that operation.
+        self._run_uhubctl_command(hub=hub, port=port, action="off")
+        if hub3 is not None:
+            self._run_uhubctl_command(hub=hub3, port=port, action="off")
+        time.sleep(self.PWR_OFF_SLEEP_S)
+        if hub3 is not None:
+            self._run_uhubctl_command(hub=hub3, port=port, action="on")
+        self._run_uhubctl_command(hub=hub, port=port, action="on")
+
+        self._check_devnum_reset(dev_path, devnum, "power-cycle")
+        # At the end, no error was encountered, so indicate belief that reset was
+        # successful.
+        self._logger.info(
+            "Successfully power-cycled device with serial %r. "
+            "(At least reasonably confident).",
+            servo_serial_number,
+        )
diff --git a/usb_hubs/pyproject.toml b/usb_hubs/pyproject.toml
new file mode 100644
index 0000000..3480867
--- /dev/null
+++ b/usb_hubs/pyproject.toml
@@ -0,0 +1,4 @@
+[tool.pytest.ini_options]
+pythonpath = [
+  ".."
+]
\ No newline at end of file
diff --git a/usb_hubs/setup.py b/usb_hubs/setup.py
new file mode 100644
index 0000000..a14e722
--- /dev/null
+++ b/usb_hubs/setup.py
@@ -0,0 +1,32 @@
+#!/usr/bin/env python3
+# Copyright 2024 The ChromiumOS Authors
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Build the python code related to the labstation usb hub."""
+
+
+from setuptools import setup
+
+
+setup(
+    name="usb_hubs",
+    version="0.1",
+    maintainer="chromium os",
+    maintainer_email="chromium-os-dev@chromium.org",
+    license="Chromium",
+    package_dir={"usb_hubs": "../usb_hubs"},
+    packages=[
+        "usb_hubs",
+        "usb_hubs.common",
+        "usb_hubs.cambrionix",
+        "usb_hubs.plugable",
+    ],
+    entry_points={
+        "console_scripts": [
+            "find-cambrionix-mapping=usb_hubs.cambrionix.find_mapping:main",
+            "powercycle-servo-usbhub-port=usb_hubs.per_port_powercycle:main",
+        ],
+    },
+    description="Labstation usb hub tools.",
+)
diff --git a/usb_hubs/tests/__init__.py b/usb_hubs/tests/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/usb_hubs/tests/__init__.py
diff --git a/usb_hubs/tests/conftest.py b/usb_hubs/tests/conftest.py
new file mode 100644
index 0000000..674ad28
--- /dev/null
+++ b/usb_hubs/tests/conftest.py
@@ -0,0 +1,25 @@
+# Copyright 2024 The ChromiumOS Authors
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+# These imports are necessary for pytest dependency injection.
+# pylint: disable=unused-import
+# pylint: disable=import-error
+# pylint: disable=missing-module-docstring
+# pylint: disable=missing-function-docstring
+# pylint: disable=missing-class-docstring
+
+import pytest
+from usb_hubs.tests.fixtures.mock_cambrionix_console import (
+    mock_cambrionix_console,
+)
+from usb_hubs.tests.fixtures.mock_cambrionix_host import mock_cambrionix_host
+from usb_hubs.tests.fixtures.mock_cambrionix_host_with_consoles import (
+    mock_host_with_multiple_cambrionix,
+)
+from usb_hubs.tests.fixtures.mock_cambrionix_host_with_consoles import (
+    mock_host_with_no_cambrionix,
+)
+from usb_hubs.tests.fixtures.mock_cambrionix_host_with_consoles import (
+    mock_host_with_one_cambrionix,
+)
diff --git a/usb_hubs/tests/e2e/cambronix/test_find_mapping.py b/usb_hubs/tests/e2e/cambronix/test_find_mapping.py
new file mode 100644
index 0000000..3cbb4b1
--- /dev/null
+++ b/usb_hubs/tests/e2e/cambronix/test_find_mapping.py
@@ -0,0 +1,143 @@
+# Copyright 2024 The ChromiumOS Authors
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+# pylint: disable=redefined-outer-name
+# pylint: disable=unused-argument
+# pylint: disable=import-error
+# pylint: disable=missing-module-docstring
+# pylint: disable=missing-function-docstring
+# pylint: disable=missing-class-docstring
+# pylint: disable=R0801
+# When pylint supports proto better remove
+# pylint: disable=no-member
+
+import sys
+from unittest.mock import call
+
+import pytest
+from usb_hubs.cambrionix.find_mapping import main
+
+
+class TestFindMapping:
+    def test_find_mapping_no_cambrionix(
+        self, monkeypatch, mock_host_with_no_cambrionix, capfd
+    ):
+        mock_host_with_no_cambrionix()
+
+        with monkeypatch.context() as patch:
+            patch.setattr(sys, "argv", ["find-cambrionix-mapping"])
+
+            with pytest.raises(SystemExit) as pytest_wrapped_e:
+                main()
+            out, unused_err = capfd.readouterr()
+            assert ("No single Cambrionix hub detected.\n") == out
+            assert SystemExit == pytest_wrapped_e.type
+
+    def test_find_mapping_cambrionix(
+        self, monkeypatch, mock_host_with_one_cambrionix, capfd
+    ):
+        host = mock_host_with_one_cambrionix()
+
+        with monkeypatch.context() as patch:
+            patch.setattr(sys, "argv", ["find-cambrionix-mapping"])
+            main()
+            out, unused_err = capfd.readouterr()
+            host.mock_open.assert_called_once_with(
+                "/usr/local/tmp/usb_hub_mapping.yaml", "w", encoding="utf-8"
+            )
+            write_calls = [
+                call("1"),
+                call(":"),
+                call(" "),
+                call("a"),
+                call("\n"),
+                call("2"),
+                call(":"),
+                call(" "),
+                call("b"),
+                call("\n"),
+                call("3"),
+                call(":"),
+                call(" "),
+                call("c"),
+                call("\n"),
+                call("4"),
+                call(":"),
+                call(" "),
+                call("d"),
+                call("\n"),
+                call("5"),
+                call(":"),
+                call(" "),
+                call("e"),
+                call("\n"),
+                call("6"),
+                call(":"),
+                call(" "),
+                call("f"),
+                call("\n"),
+                call("7"),
+                call(":"),
+                call(" "),
+                call("g"),
+                call("\n"),
+                call("8"),
+                call(":"),
+                call(" "),
+                call("h"),
+                call("\n"),
+                call("9"),
+                call(":"),
+                call(" "),
+                call("i"),
+                call("\n"),
+                call("10"),
+                call(":"),
+                call(" "),
+                call("j"),
+                call("\n"),
+                call("11"),
+                call(":"),
+                call(" "),
+                call("k"),
+                call("\n"),
+                call("12"),
+                call(":"),
+                call(" "),
+                call("l"),
+                call("\n"),
+                call("13"),
+                call(":"),
+                call(" "),
+                call("m"),
+                call("\n"),
+                call("14"),
+                call(":"),
+                call(" "),
+                call("n"),
+                call("\n"),
+                call("15"),
+                call(":"),
+                call(" "),
+                call("o"),
+                call("\n"),
+            ]
+            host.mock_open().write.assert_has_calls(calls=write_calls, any_order=False)
+            assert ("") == out
+
+    def test_find_mapping_multiple_cambrionix(
+        self, monkeypatch, mock_host_with_multiple_cambrionix, capfd
+    ):
+        mock_host_with_multiple_cambrionix()
+
+        with monkeypatch.context() as patch:
+            patch.setattr(sys, "argv", ["find-cambrionix-mapping"])
+
+            with pytest.raises(SystemExit) as pytest_wrapped_e:
+                main()
+            out, unused_err = capfd.readouterr()
+            assert (
+                "More than 1 cambrionix hub found panic !!\nNo single Cambrionix hub detected.\n"
+            ) == out
+            assert SystemExit == pytest_wrapped_e.type
diff --git a/usb_hubs/tests/e2e/cambronix/test_port_cycle.py b/usb_hubs/tests/e2e/cambronix/test_port_cycle.py
new file mode 100644
index 0000000..4813f6e
--- /dev/null
+++ b/usb_hubs/tests/e2e/cambronix/test_port_cycle.py
@@ -0,0 +1,52 @@
+# Copyright 2024 The ChromiumOS Authors
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+# pylint: disable=redefined-outer-name
+# pylint: disable=unused-argument
+# pylint: disable=import-error
+# pylint: disable=missing-module-docstring
+# pylint: disable=missing-function-docstring
+# pylint: disable=missing-class-docstring
+# pylint: disable=R0801
+# When pylint supports proto better remove
+# pylint: disable=no-member
+
+import sys
+
+import pytest
+from usb_hubs.per_port_powercycle import main
+
+
+class TestPortCycle:
+    def test_no_servo_serial_flag(
+        self, monkeypatch, mock_host_with_one_cambrionix, capfd
+    ):
+        mock_host_with_one_cambrionix()
+
+        with monkeypatch.context() as patch:
+            patch.setattr(sys, "argv", ["per_port_powercycle"])
+
+            with pytest.raises(SystemExit) as pytest_wrapped_e:
+                main()
+            out, unused_err = capfd.readouterr()
+            assert ("Must specify servo serial param. --servo_serial=<serial>\n") == out
+            assert SystemExit == pytest_wrapped_e.type
+
+    def test_basic(self, monkeypatch, mock_host_with_one_cambrionix, capfd):
+        host = mock_host_with_one_cambrionix()
+
+        with monkeypatch.context() as patch:
+            patch.setattr(sys, "argv", ["per_port_powercycle", "--servo_serial", "a"])
+            main()
+            out, unused_err = capfd.readouterr()
+            assert "" == out
+            assert b"mode off 1\rmode sync 1\r" == host.console.written
+
+        host.console.written = b""
+        with monkeypatch.context() as patch:
+            patch.setattr(sys, "argv", ["per_port_powercycle", "--servo_serial", "o"])
+            main()
+            out, unused_err = capfd.readouterr()
+            assert "" == out
+            assert b"mode off 15\rmode sync 15\r" == host.console.written
diff --git a/usb_hubs/tests/fixtures/mock_cambrionix_console.py b/usb_hubs/tests/fixtures/mock_cambrionix_console.py
new file mode 100644
index 0000000..2c2b677
--- /dev/null
+++ b/usb_hubs/tests/fixtures/mock_cambrionix_console.py
@@ -0,0 +1,90 @@
+# Copyright 2024 The ChromiumOS Authors
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+# pylint: disable=missing-module-docstring
+# pylint: disable=missing-function-docstring
+# pylint: disable=missing-class-docstring
+
+import contextlib
+import logging
+import re
+
+from cambrionix.console_lib import CONSOLE_CLEAR_CHR
+from cambrionix.console_lib import CONSOLE_ENTER
+import pytest
+
+
+_logger = logging.getLogger("mock_cambrionix_host")
+
+
+@pytest.fixture(scope="function")
+def mock_cambrionix_console():
+    def generate_cambrionix_console():
+        class MockCambrionixConsole(contextlib.AbstractContextManager):
+            def __init__(self):
+                self.last_write = None
+                self.written = b""
+                self.console_responses = {
+                    b"": None,
+                }
+
+                self.port_enabled_mapping = {
+                    1: True,
+                    2: True,
+                    3: True,
+                    4: True,
+                    5: True,
+                    6: True,
+                    7: True,
+                    8: True,
+                    9: True,
+                    10: True,
+                    11: True,
+                    12: True,
+                    13: True,
+                    14: True,
+                    15: True,
+                }
+
+            def reset_input_buffer(self):
+                pass
+
+            def reset_output_buffer(self):
+                pass
+
+            def write(self, text):
+                self.last_write = text.strip(
+                    f"{CONSOLE_ENTER}{CONSOLE_CLEAR_CHR} ".encode("utf-8")
+                )
+                self.written += text
+                if text.strip() == b"mode off":
+                    self.port_enabled_mapping = {
+                        port: False for port in self.port_enabled_mapping
+                    }
+                elif text.strip() == b"mode sync":
+                    self.port_enabled_mapping = {
+                        port: True for port in self.port_enabled_mapping
+                    }
+                match = re.match(r"mode off ([0-9]*)", text.strip().decode("utf-8"))
+                if match:
+                    self.port_enabled_mapping[int(match.group(1))] = False
+                match = re.match(r"mode on ([0-9]*)", text.strip().decode("utf-8"))
+                if match:
+                    self.port_enabled_mapping[int(match.group(1))] = True
+
+            def read(self):
+                return None
+
+            def read_until(self, unused_end_char):
+                return self.console_responses[self.last_write]
+
+            def flush(self):
+                pass
+
+            def __exit__(self, exc_type, exc_value, traceback):
+                pass
+
+        return MockCambrionixConsole()
+
+    return generate_cambrionix_console
diff --git a/usb_hubs/tests/fixtures/mock_cambrionix_host.py b/usb_hubs/tests/fixtures/mock_cambrionix_host.py
new file mode 100644
index 0000000..1a466f8
--- /dev/null
+++ b/usb_hubs/tests/fixtures/mock_cambrionix_host.py
@@ -0,0 +1,141 @@
+# Copyright 2024 The ChromiumOS Authors
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+# pylint: disable=unused-argument
+# pylint: disable=import-error
+# pylint: disable=missing-module-docstring
+# pylint: disable=missing-function-docstring
+# pylint: disable=missing-class-docstring
+# pylint: disable=import-error
+# pylint: disable=no-name-in-module
+
+
+import logging
+import os
+from pathlib import PosixPath
+import re
+
+import pytest
+import serial
+from usb_hubs.cambrionix.console_lib import SERIAL_PATH
+
+
+_logger = logging.getLogger("mock_cambrionix_host")
+
+
+class USBDevice:
+    servo_serial_number = None
+    vid = None
+
+    def __init__(self, serial_number, vid):
+        self.servo_serial_number = serial_number
+        self.vid = vid
+
+
+@pytest.fixture(scope="function")
+def mock_cambrionix_host(class_mocker, mocker):
+    def generate_cambrionix_host():
+        """Mock generator function.   This allows multiple tests to be run in
+        parallel as it generates a new mock for each test vs sharing the same
+        mock between tests.
+
+        Returns:
+            MockCambrionixHost:
+        """
+
+        class MockCambrionixHost:
+            def __init__(self):
+                self.usb_devices = []
+                class_mocker.patch("serial.Serial", side_effect=self.mock_serial_open)
+                class_mocker.patch(
+                    "pathlib.PosixPath.glob",
+                    side_effect=self.mock_glob,
+                )
+                class_mocker.patch(
+                    "usb.core.find",
+                    side_effect=self.mock_usbfind,
+                )
+
+                class_mocker.patch(
+                    "usb.util.get_string",
+                    side_effect=self.mock_usb_get_string,
+                )
+
+                mocker.patch(
+                    "time.sleep",
+                    side_effect=self.mock_sleep,
+                )
+
+                self.mock_open = mocker.mock_open(
+                    read_data=(
+                        "1: a\n2: b\n3: c\n4: d\n5: e\n6: f\n7: g\n8: h\n9: i\n10:"
+                        " j\n11: k\n12: l\n13: m\n14: n\n15: o\n"
+                    )
+                )
+
+                mocker.patch("builtins.open", self.mock_open)
+
+                self.port_device_mapping = {
+                    1: USBDevice("a", 0x520B),
+                    2: USBDevice("b", 0x520B),
+                    3: USBDevice("c", 0x520B),
+                    4: USBDevice("d", 0x520B),
+                    5: USBDevice("e", 0x520B),
+                    6: USBDevice("f", 0x520B),
+                    7: USBDevice("g", 0x520B),
+                    8: USBDevice("h", 0x520B),
+                    9: USBDevice("i", 0x520B),
+                    10: USBDevice("j", 0x520B),
+                    11: USBDevice("k", 0x520B),
+                    12: USBDevice("l", 0x520B),
+                    13: USBDevice("m", 0x520B),
+                    14: USBDevice("n", 0x520B),
+                    15: USBDevice("o", 0x520B),
+                }
+
+                self.console = None
+
+            def set_usb_devices(self, usb_devices):
+                self.usb_devices = usb_devices
+
+            def clear_usb_devices(self):
+                self.usb_devices = []
+
+            def raise_on_open(self):
+                class_mocker.patch(
+                    "serial.Serial", side_effect=serial.SerialException()
+                )
+
+            def mock_glob(self, glob_str):
+                matched_usb_devices = [
+                    device for device in self.usb_devices if re.match(glob_str, device)
+                ]
+                return [
+                    PosixPath(f"{SERIAL_PATH}{device}")
+                    for device in matched_usb_devices
+                ]
+
+            # pylint
+            def mock_serial_open(self, port, baudrate, timeout):
+                if not os.path.basename(port) in self.usb_devices:
+                    raise serial.SerialException(f"Device not found {port}.")
+                self.console = self.usb_devices[os.path.basename(port)]
+                return self.console
+
+            def mock_usbfind(self, vendor_id, find_all):
+                results = []
+                for port, enabled in self.console.port_enabled_mapping.items():
+                    if enabled and self.port_device_mapping[port].vid == vendor_id:
+                        results.append(self.port_device_mapping[port])
+                return results
+
+            def mock_usb_get_string(self, device, attr):
+                return attr
+
+            def mock_sleep(self, time):
+                return
+
+        return MockCambrionixHost()
+
+    return generate_cambrionix_host
diff --git a/usb_hubs/tests/fixtures/mock_cambrionix_host_with_consoles.py b/usb_hubs/tests/fixtures/mock_cambrionix_host_with_consoles.py
new file mode 100644
index 0000000..f0ba035
--- /dev/null
+++ b/usb_hubs/tests/fixtures/mock_cambrionix_host_with_consoles.py
@@ -0,0 +1,56 @@
+# Copyright 2024 The ChromiumOS Authors
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+# pylint: disable=missing-module-docstring
+# pylint: disable=missing-function-docstring
+# pylint: disable=missing-class-docstring
+# pylint: disable=import-error
+# pylint: disable=no-name-in-module
+
+import logging
+
+import pytest
+
+
+_logger = logging.getLogger("mock_cambrionix_host_with_console")
+
+
+@pytest.fixture()
+def mock_host_with_no_cambrionix(mock_cambrionix_host):
+    def generate_host():
+        mock_cambrionix_host()
+
+    return generate_host
+
+
+@pytest.fixture()
+def mock_host_with_one_cambrionix(mock_cambrionix_host, mock_cambrionix_console):
+    def generate_host():
+        cambrionix_console = mock_cambrionix_console()
+        cambrionix_host = mock_cambrionix_host()
+        cambrionix_host.set_usb_devices(
+            {"usb-cambrionix_SuperSync15_0000003B71550C92-if01": cambrionix_console}
+        )
+
+        return cambrionix_host
+
+    return generate_host
+
+
+@pytest.fixture()
+def mock_host_with_multiple_cambrionix(mock_cambrionix_host, mock_cambrionix_console):
+    def generate_host():
+
+        cambrionix_console = mock_cambrionix_console()
+        cambrionix_host = mock_cambrionix_host()
+        cambrionix_host.set_usb_devices(
+            {
+                "usb-cambrionix_SuperSync15_0000003B71550C92-if01": cambrionix_console,
+                "usb-cambrionix_SuperSync15_0000003B71550C93-if01": cambrionix_console,
+            }
+        )
+
+        return cambrionix_host
+
+    return generate_host