blob: 3496c44bc85344108090f8aa7447ee1135d9b276 [file] [log] [blame] [edit]
# Copyright 2024 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Software interface to allow power cycling ports on plugable hubs."""
import logging
import os
import subprocess
import sys
import time
import servo.utils.usb_hierarchy as uh
from usb_hubs.common import base
# List of PID supported for power-cycle. For now, it's only v4 and v4p1.
# 0x520d: v4p1, 0x501b: v4
PWR_CYCLE_PIDS = [0x520D, 0x501B]
# VID to find all servo devices.
SERVO_VID = 0x18D1
class USBHubCommandsPlugable(base.USBHubCommands):
"""Hub interface implementation for plugable hubs."""
# Lookup table for action on uhubctl.
ACTION_DICT = {"on": 1, "off": 0, "reset": 2}
# Repetitions to perform on the uhubctl command to get it to trigger.
REPS = 100
# Time to sleep after reset to let kernel handle sysfs files
RESET_DEBOUNCE_S = 2
# time to sleep for the servo device to re-enumerate.
MAX_REINIT_SLEEP_S = 8
# Polling intervals to find the |devnum| file for reset device.
REINIT_POLL_SLEEP_S = 0.1
# Time to sleep and attempts to interact with servo console after reboot.
REBOOT_SLEEP_S = 1
REBOOT_TIMEOUT_ATTEMPTS = 4
# Time to sleep after power off
PWR_OFF_SLEEP_S = 1
@staticmethod
def is_hub_detected():
# By default the fleet always assume that a plugable hub is connected.
# TODO can try searching for a VID/PID but it would not be 100% successful.
return True
def __init__(self):
"""Setup scratch to use."""
self._logger = logging.getLogger(type(self).__name__)
def error(self, msg, *args):
# pylint: disable=invalid-name
"""Log error and exit.
Args:
msg: message to log
*args: args to pass to logging module
"""
self._logger.info(msg, *args)
sys.exit(1)
def _usb_path(self, serial):
"""Helper to get the device path.
Args:
serial: str, servo serial
Returns:
/sys/bus/usb/devices/ path to servo with |serial| or None if not found
"""
# This list is used to find all servos on the system.
vid_pid_list = [(SERVO_VID, None)]
devs = uh.Hierarchy.GetAllUsbDeviceSysfsPaths(vid_pid_list)
for dev_path in devs:
dev_serial = uh.Hierarchy.SerialFromSysfs(dev_path)
if dev_serial == serial:
return dev_path
return None
def _check_devnum_reset(self, dev_path, devnum, action):
"""Check that the |devnum| has changed after a reset/reboot/power-cycle
Args:
dev_path: device sysfs path
devnum: int, usb devnum (original devnum, before reset action)
action: str, action performed (used to print better errors/logs
Note: this helper will call self.error() (and thus exit) if
- the devnum does not change
- it fails to read the devnum after self.MAX_REINIT_SLEEP_S
"""
# Sleep a bit to let the device fully fall off, and the sysfs files be
# renewed.
time.sleep(self.RESET_DEBOUNCE_S)
# For |MAX_REINIT_SLEEP_S| seconds, try to find the new devnum for the
# device.
end = time.time() + self.MAX_REINIT_SLEEP_S
while time.time() < end:
try:
# check devnum reset
if devnum == uh.Hierarchy.DevNumFromSysfs(dev_path):
self.error(
"%r likely unsuccessful. devnum stayed the same.", action
)
# If |devnum| changed, then the goal is fulfilled. Move on.
break
except uh.HierarchyError:
# The device might not have re-enumerated yet. Sample again.
time.sleep(self.REINIT_POLL_SLEEP_S)
else:
# The while loop finished without breaking out e.g. we never read the
# |devnum| file successfully.
self.error(
"unable to read device |devnum| file after %ds. Giving up.",
self.MAX_REINIT_SLEEP_S,
)
def _run_uhubctl_command(self, hub, port, action):
"""Build |uhubctl| command performing |action| on |hub|'s |port|.
Args:
hub: hub-port path i.e. /sys/bus/usb/devices/ dirname of the hub
port: str, port number on the hub
action: one of 'on', 'off', 'reset'
"""
cmd = self._build_and_assert_uhubctl(hub=hub, port=port)
if action not in self.ACTION_DICT:
self.error("Action %s unknown", action)
action_number = self.ACTION_DICT[action]
# expand command to perform the uhubctl action.
cmd = cmd + ["-a", str(action_number), "-r", str(self.REPS)]
try:
subprocess.check_output(cmd, stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as e:
self.error(
'Error performing the uhubctl command. ran: "%s". %s.',
" ".join(cmd),
str(e),
)
def _build_and_assert_uhubctl(self, hub=None, port=None):
"""Assert uhubctl exists and hub/port are known to it (if provided).
Args:
hub: hub-port path i.e. /sys/bus/usb/devices/ dirname of the hub
port: str, port number on the hub
Returns:
cmd: a list of args to call the uhubctl command (at hub/port if provided)
"""
cmd = ["sudo", "uhubctl"]
try:
subprocess.check_output(cmd, stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as e:
self.error("uhubctl not available. Be sure to run as sudo. %s", str(e))
if hub is not None and port is not None:
# expand the command to check for hub and port existing.
# casting port to str to ensure we don't accidentally pass an int.
cmd = cmd + ["-l", hub, "-p", str(port)]
try:
subprocess.check_output(cmd, stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as e:
self.error(
"hub %s with port %s unknown to uhubctl. "
"Be sure the hub is a supported smart hub. %s",
hub,
port,
str(e),
)
return cmd
def _get_hub_and_port(self, dev_path, pid):
"""Get the external hub and port paths for a given |pid|.
This is its own function, as different devices might have a different
internal topology. This method has to guarantee to implement all pids
in PWR_CYCLE_PIDS.
Args:
dev_path: /sys/bus/usb/devices/ path for servo
pid: servo pid
Returns:
(hub, port, hub3,) tuple, where hub is the external hub that the
servo is on port is the port on that hub that the
servo is on. hub3 is the usb3 virtual hub of the same
physical hub. It will be None if it does not exist
(hub only enumerated in usb2)
"""
# NOTE: if a servo device or configuration should be supported, this is the
# spot to implement it. If more devices get implemented here, make sure to
# document the setup under which the user can expect it to work and for the
# code to guard against other setups.
# For example: if a servo micro is attached to a servo v4, the code to reset
# the micro should not just reset the hub that the v4 is hanging on, as that
# would reset both.
# TODO(coconutruben): make pid permission more robust once we have device
# templates.
if pid in PWR_CYCLE_PIDS:
# For servo v4(p1), the dev_path points to the stm that's hanging on
# an internal usb hub.
internal_hub = uh.Hierarchy.GetSysfsParentHubStub(dev_path)
smart_hub_path = uh.Hierarchy.GetSysfsParentHubStub(internal_hub)
if smart_hub_path:
# The internal hub is hanging on the smart hub's port. So the last
# index is the port number.
port = internal_hub.rsplit(".", 1)[-1]
smart_hub = os.path.basename(smart_hub_path)
# |internal_hub| is always on usb2. Let's see if this hub also
# enumerated on usb3.
smart_hub_bus, unused_ = smart_hub.split("-")
busnum = int(smart_hub_bus)
busnum3 = uh.Hierarchy.ComplementBusNum(busnum)
smart_hub3 = None
if busnum3:
smart_hub3 = "busnum3-smart_hub_port_path"
smart_hub3_path = os.path.join(
os.path.dirname(smart_hub_path), smart_hub3
)
if not os.path.exists(smart_hub3_path):
# set back to None
smart_hub3 = None
return (smart_hub, port, smart_hub3)
self.error(
"Device does not seem to be hanging on a (smart) hub. %r", dev_path
)
self.error("Unimplemented pid: %04x", pid)
def power_cycle_servo(self, servo_serial_number, downtime_secs=5):
"""Perform a power-cycle on the device using uhubctl.
uhubctl exposes multiple knobs to control the power-cycling of a port.
This method uses the 'reset' knob by default. However, if the user
specifies |force|=True, it will issue an 'off' request, wait for
|PWR_OFF_SLEEP_S| seconds, before issuing an 'on' request. For some hubs
this has proven itself more reliably than a reset request.
Args:
force: bool, whether to perform a full power-cycle or just a reset
"""
self._build_and_assert_uhubctl()
dev_path = self._usb_path(servo_serial_number)
if not dev_path:
self.error("Device with serial %r not found.", servo_serial_number)
pid = uh.Hierarchy.ProductIDFromSysfs(dev_path)
if pid not in PWR_CYCLE_PIDS:
self.error(
"pid: 0x%04x currently not supported for usb power cycling. "
"Please use one of: %s",
pid,
", ".join("0x%04x" % p for p in PWR_CYCLE_PIDS),
)
# get devnum, and store it
devnum = uh.Hierarchy.DevNumFromSysfs(dev_path)
# extract the hub and check whether it's on uhubctl
hub, port, hub3 = self._get_hub_and_port(dev_path, pid)
# The sandwich (if usb2 and usb3 are available) is to first turn
# off usb2 and then usb3, before unrolling that operation.
self._run_uhubctl_command(hub=hub, port=port, action="off")
if hub3 is not None:
self._run_uhubctl_command(hub=hub3, port=port, action="off")
time.sleep(self.PWR_OFF_SLEEP_S)
if hub3 is not None:
self._run_uhubctl_command(hub=hub3, port=port, action="on")
self._run_uhubctl_command(hub=hub, port=port, action="on")
self._check_devnum_reset(dev_path, devnum, "power-cycle")
# At the end, no error was encountered, so indicate belief that reset was
# successful.
self._logger.info(
"Successfully power-cycled device with serial %r. "
"(At least reasonably confident).",
servo_serial_number,
)