blob: 35d8a4809dbf781de6fdfd71f7600e266be31cf2 [file] [log] [blame]
# Copyright 2016 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Specific Servo PostInit functions."""
import collections
import logging
import os
import re
import subprocess
import usb
import servo_interfaces
import system_config
POST_INIT = collections.defaultdict(dict)
class UsbHierarchy(object):
"""A helper class to analyze the sysfs hierarchy of USB devices."""
USB_SYSFS_PATH = '/sys/bus/usb/devices'
CHILD_RE = re.compile(r'\d+-\d+(\.\d+){1,}\Z')
BUS_FILE = 'busnum'
DEV_FILE = 'devnum'
def __init__(self):
# Get the current USB sysfs hierarchy.
def refresh_usb_hierarchy(self):
"""Walk through usb sysfs files and gather parent identifiers.
The usb sysfs dir contains dirs of the following format:
- 1-2
- 1-2:1.0
- 1-2.4
- 1-2.4:1.0
The naming convention works like so:
<roothub>-<hub port>[.port[.port]]:config.interface
We are only going to be concerned with the roothub, hub port and port.
We are going to create a hierarchy where each device will store the usb
sysfs path of its roothub and hub port. We will also grab the device's
bus and device number to help correlate to a usb.core.Device object.
We will walk through each dir and only match on device dirs
(e.g. '1-2.4') and ignore config.interface dirs. When we get a hit, we'll
grab the bus/dev and infer the roothub and hub port from the dir name
('1-2' from '1-2.4') and store the info into a dict.
The dict key will be a tuple of (bus, dev) and value be the sysfs path.
Dict of tuple (bus,dev) to sysfs path.
usb_hierarchy = {}
for usb_dir in os.listdir(self.USB_SYSFS_PATH):
bus_file = os.path.join(self.USB_SYSFS_PATH, usb_dir, self.BUS_FILE)
dev_file = os.path.join(self.USB_SYSFS_PATH, usb_dir, self.DEV_FILE)
if (self.CHILD_RE.match(usb_dir)
and os.path.exists(bus_file)
and os.path.exists(dev_file)):
parent_arr = usb_dir.split('.')[:-1]
parent = '.'.join(parent_arr)
bus = ''
with open(bus_file, 'r') as bfile:
bus =
dev = ''
with open(dev_file, 'r') as dfile:
dev =
usb_hierarchy[(bus, dev)] = parent
self._usb_hierarchy = usb_hierarchy
def _get_parent_path(self, usb_device):
"""Return the USB sysfs path of the supplied usb_device's parent.
usb_device: usb.core.Device object.
SysFS path string of parent of the supplied usb device.
return self._usb_hierarchy.get((str(usb_device.bus),
def share_same_parent(self, usb_device1, usb_device2):
"""Check if the given two USB devices share the same parent.
usb_device1: usb.core.Device object.
usb_device2: usb.core.Device object.
True if they share the same parent; otherwise, False.
usb_parent1 = self._get_parent_path(usb_device1)
usb_parent2 = self._get_parent_path(usb_device2)
return (usb_parent1 and usb_parent2
and usb_parent1 == usb_parent2)
class BasePostInit(object):
"""Base Class for Post Init classes."""
def __init__(self, servod):
self.servod = servod
self._logger = logging.getLogger(self.__class__.__name__)
def post_init(self):
"""Main entry to the post init class, must be implemented by subclasses."""
raise NotImplementedError('post_init not implemented')
class ServoV4PostInit(BasePostInit):
"""Servo v4 Post init class.
We're going to check if there are any dut interfaces attached and if they are
connected through the specific servo v4 initialized in the servod object. If
so, we're going to initialize the dut interfaces connected to the v4 so that
the user can control the servo v4 and dut interfaces through one servod
SERVO_MICRO_CFG = 'servo_micro.xml'
# Only support CR50 CCD.
CCD_CFG = 'ccd_cr50.xml'
def _get_all_usb_devices(self, vid_pid_list):
"""Return all associated USB devices which match the given VID/PID's.
vid_pid_list: List of tuple (vid, pid).
List of usb.core.Device objects.
all_devices = []
for vid, pid in vid_pid_list:
devs = usb.core.find(idVendor=vid, idProduct=pid, find_all=True)
if devs:
return all_devices
def get_servo_v4_usb_device(self):
"""Return associated servo v4 usb.core.Device object.
servo v4 usb.core.Device object associated with the servod instance.
servo_v4_candidates = self._get_all_usb_devices(
for d in servo_v4_candidates:
d_serial = usb.util.get_string(d, 256, d.iSerialNumber)
if (not self.servod._serialnames[self.servod.MAIN_SERIAL] or
d_serial == self.servod._serialnames[self.servod.MAIN_SERIAL]):
return d
return None
def get_servo_micro_devices(self):
"""Return all servo micros detected.
List of servo micro devices as usb.core.Device objects.
return self._get_all_usb_devices(servo_interfaces.SERVO_MICRO_DEFAULTS)
def get_ccd_devices(self):
"""Return all CCD USB devices detected.
List of CCD USB devices as usb.core.Device objects.
return self._get_all_usb_devices(servo_interfaces.CCD_DEFAULTS)
def prepend_config(self, new_cfg_file):
"""Prepend the given new config file to the existing system config.
The new config, like servo_micro, is properly overwritten by some board
overlays. So we will recreate the config list but with the new config
in front and append the rest of the existing config files loaded
up. Duplicates are ok since the SystemConfig object keeps track of that
for us and will ignore them.
new_cfg_file: List of config files.
cfg_files = [new_cfg_file]
[os.path.basename(f) for f in self.servod._syscfg._loaded_xml_files])
self._logger.debug("Resetting system config files")
new_syscfg = system_config.SystemConfig()
for cfg_file in cfg_files:
self.servod._syscfg = new_syscfg
def add_servo_serial(self, servo_usb, servo_serial_key):
"""Add the servo serial number.
servo_usb: usb.core.Device object that represents the new detected
servo we should be checking against.
servo_serial_key: Key to the servo serial dict.
serial = usb.util.get_string(servo_usb, 256, servo_usb.iSerialNumber)
self.servod._serialnames[servo_serial_key] = serial
def init_servo_interfaces(self, servo_usb):
"""Initialize the new servo interfaces.
servo_usb: usb.core.Device object that represents the new detected
servo we should be checking against.
vendor = servo_usb.idVendor
product = servo_usb.idProduct
serial = usb.util.get_string(servo_usb, 256, servo_usb.iSerialNumber)
servo_interface = servo_interfaces.INTERFACE_DEFAULTS[vendor][product]
self.servod.init_servo_interfaces(vendor, product, serial,
def kick_devices(self):
"""General method to do misc actions.
We'll need to do certain things (like 'lsusb' for servo micro) to ensure
we can detect and initialize extra devices properly. This method is here
to hold all those necessary pre-postinit actions.
# Run 'lsusb' so that servo micros are configured and show up in sysfs.['lsusb'])
def post_init(self):
# Do misc actions so we can detect devices we might want to initialize.
# Snapshot the USB hierarchy at this moment.
usb_hierarchy = UsbHierarchy()
# We want to check if we have a servo micro connected to this servo v4
# and if so, initialize it and add it to the servod instance.
servo_v4 = self.get_servo_v4_usb_device()
servo_micro_candidates = self.get_servo_micro_devices()
for servo_micro in servo_micro_candidates:
# The micro-servo and the STM chip of servo v4 share the same internal hub
# on servo v4 board. Check the USB hierarchy to find the micro-servo
# behind. Assume we have at most one servo micro behind the servo v4.
if usb_hierarchy.share_same_parent(servo_v4, servo_micro):
self.add_servo_serial(servo_micro, self.servod.MICRO_SERVO_SERIAL)
self.servod._version += "_with_servo_micro"
# Try to enable CCD iff no servo-micro is detected.
ccd_candidates = self.get_ccd_devices()
for ccd in ccd_candidates:
# Pick the proper CCD endpoint behind the servo v4.
if usb_hierarchy.share_same_parent(servo_v4, ccd):
self.add_servo_serial(ccd, self.servod.CCD_SERIAL)
self.servod._version += "_with_ccd_cr50"
# If no CCD endpoint is detected, print a message to inform users."No micro-servo and CCD detected.")
# Add in servo v4 post init method.
for vid, pid in servo_interfaces.SERVO_V4_DEFAULTS:
POST_INIT[vid][pid] = ServoV4PostInit
def post_init(servod):
"""Entry point to call post init for a given vid/pid and servod.
servod: servo_server.Servod object.
post_init_class = POST_INIT.get(servod._vendor, {}).get(servod._product)
if post_init_class: