| # Copyright 2022 The ChromiumOS Authors |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Discover devices for a servod instance.""" |
| |
| import collections |
| from enum import Enum |
| import logging |
| import os |
| import pprint |
| import select |
| import sys |
| |
| from servo import servo_dev_templates |
| from servo.utils import scratch |
| from servo.utils import servo_dev_hierarchy |
| |
| |
| # Timeout in seconds of user interactive menu |
| INTERACTIVE_MENU_TIMEOUT_SECONDS = 30 |
| |
| |
| class ServoDeviceFinderError(Exception): |
| """ServoDeviceFinderError error class.""" |
| |
| |
| class ServoDeviceDiscoveryMode(Enum): |
| """Indicates how much auto discovery can be performed by ServoDeviceFinder.""" |
| |
| # Enable full auto discovery |
| # 1. Pull in complete clusters of the devices provided through cmdline and rc file |
| # 2. When user does not provide enough information for a device on cmdline |
| # and rc file (e.g. when vid/pid/serial is None), try selecting a device based |
| # on each device's priority. |
| FULL_AUTO = 0 |
| |
| # Enable minimum auto discovery |
| # 1. Pull in bare minimum devices that relate to the devices provided through |
| # cmdline and rc file. For any root hub device (e.g. ServoV4), all its cluster |
| # member are pulled in. For any non-root-hub device, only its |
| # cluster root gets pulled in. |
| # 2. When user does not provide enough information for a device on cmdline |
| # and rc file (e.g. when vid/pid/serial is None), try selecting a device based |
| # on each device's priority. |
| MIN_AUTO = 1 |
| |
| # No auto discovery |
| # 1. Only pull in the devices provided through cmdline and rc file. |
| # 2. When user does not provide enough information for a device on cmdline |
| # and rc file (e.g. when vid/pid/serial is None), let user choose through |
| # an interactive cmdline menu. |
| NO_AUTO = 2 |
| |
| |
| def _ClusterSortKey(servo_dev_entry): |
| """Return a key for sorting servos in a cluster with the root servo first. |
| |
| This function is intended for use as a sorting key. There is no reason to |
| call this function directly, instead use servo_dev_entry.is_cluster_root(). |
| |
| Args: |
| servo_dev_entry: servo.utils.servo_dev_hierarchy.ServoDeviceEntry |
| |
| Returns: |
| hashable object |
| """ |
| return (not servo_dev_entry.is_cluster_root(),) + servo_dev_entry.key |
| |
| |
| class ServoDeviceFinder: |
| """Discover devices to be served by a servod instance.""" |
| |
| def __init__( |
| self, |
| devopts, |
| devopts_generator, |
| dev_hierarchy, |
| servo_scratch, |
| discover_mode, |
| choose_device=None, |
| ): |
| """Set up the servo device finder. |
| |
| Args: |
| devopts: a list of device opts parsed from the servod starting |
| devopts_generator: a function that generates a default devopts |
| for devices pulled in during device auto-discovery. |
| dev_hierarchy: a ServoDeviceHierarchy generated when the servod starts |
| scratch: ServoScratch that manages information across different servod |
| instances. |
| discover_mode: ServoDeviceDiscoveryMode that indicates how much auto discovery |
| can be performed by the finder when discovering devices. |
| choose_device: a param used to mock behavior for user interactively choose |
| device on the cmd line. It should only be not None in tests. |
| """ |
| self._logger = logging.getLogger("ServoDeviceFinder") |
| self._devopts = devopts |
| self._devopts_generator = devopts_generator |
| self._dev_hierarchy = dev_hierarchy |
| self._scratch = servo_scratch |
| self.discover_mode = discover_mode |
| self.choose_device = ( |
| choose_device if choose_device is not None else self._choose_device |
| ) |
| |
| def discover_servos(self): |
| """Complete the device list of servod from servod command line device opts |
| and servo device hierarchy. |
| |
| The complete device list is derived from the invocation_devs (parsed from the |
| servod starting command line) and the servo device hierarchy in the following |
| ways: |
| (1) If invocation_devs is empty or all-inclusive, return all the devices in the |
| hierarchy. |
| (2) If discover_mode is FULL_AUTO, all member devices in a cluster will be |
| served with this servod instance. |
| For any device (e.g. ServoV4) in invocation_devs, all its cluster member |
| (i.e. servo devices attached to it and its root hub) get pulled into the |
| device list. |
| (3) If discover_mode is MIN_AUTO, only the bare minimum devices are pulled into |
| this servod instance. |
| For any cluster root device (e.g. ServoV4) in invocation_devs, all its |
| cluster member get pulled into the device list. For any non-root device in |
| invocation_devs, only its cluster root gets pulled in. Other devices in the |
| same cluster are not pulled in, unless they are already specified in |
| invocation_devs. |
| (4) If discover_mode is NO_AUTO, only pull in the devices in invocation_devs. |
| (5) If any device cannot be pulled in because it is already used in another |
| servod instance, we throw an error and exit servod. |
| |
| Returns: |
| A list representing the complete device list of servo. Each entry is a |
| ServoDeviceEntry. |
| |
| Raises: |
| ServoDeviceFinderError: if a device cannot be pulled because it is already |
| served by another servod instance |
| """ |
| self._logger.info("Start discovering all devices for this servod instance.") |
| # First pull in all the devices included in the command line invocation |
| invocation_devs = set() |
| for one_dev_opts in self._devopts: |
| (vid, pid, serial) = ( |
| one_dev_opts.vendor, |
| one_dev_opts.product, |
| one_dev_opts.serialname, |
| ) |
| dev_entry = self._find_one_device(vid, pid, serial) |
| dev_entry.devopts = one_dev_opts |
| self._logger.info( |
| "Pulling in device %s as it is included in invocation args.", dev_entry |
| ) |
| invocation_devs.add(dev_entry) |
| |
| # Then pull in all the devices connecting to the devices included in command |
| # line invocation |
| devices = invocation_devs.copy() |
| if self.discover_mode != ServoDeviceDiscoveryMode.NO_AUTO: |
| for dev_entry in invocation_devs: |
| # For a root hub device, include all cluster members. |
| # Same if "full" discovery mode was requested. |
| if ( |
| dev_entry.is_cluster_root() |
| or self.discover_mode == ServoDeviceDiscoveryMode.FULL_AUTO |
| ): |
| for member in sorted(dev_entry.cluster_root.cluster_members): |
| if member not in devices: |
| self._logger.info( |
| ( |
| "Pulling in device %s as it is a member of" |
| " the same cluster as %s." |
| ), |
| member, |
| dev_entry, |
| ) |
| self._complete_devopts(member, dev_entry) |
| devices.add(member) |
| elif dev_entry.cluster_root not in devices: |
| self._logger.info( |
| "Pulling in device %s as it is the parent hub of device %s.", |
| dev_entry.cluster_root, |
| dev_entry, |
| ) |
| self._complete_devopts(dev_entry.cluster_root, dev_entry) |
| devices.add(dev_entry.cluster_root) |
| |
| dev_list = list(devices) |
| self.validate_device_availability(dev_list) |
| return dev_list |
| |
| def _find_one_device(self, vid, pid, serial): |
| """Find 1 device given a dev_id (vid, pid, serial) from the device hierarchy. |
| |
| Any of the vid, pid, serial can be None. |
| |
| Args: |
| vid: vendor id of a device |
| pid: product id of a device |
| serial: serial name of a device |
| |
| Returns: |
| A device matching the given dev_id in the device hierarchy. |
| |
| Raises: |
| ServoDeviceFinderError: no device can be found |
| ServoDeviceFinderError: multiple devices are found and user fail to pick |
| one from them |
| """ |
| input_str = "vid: %s pid: %s serial: %s" % (vid, pid, serial) |
| if (not vid) and (not pid) and (not serial): |
| candidates = sorted(self._dev_hierarchy.get_all_entries().values()) |
| else: |
| candidates = sorted(self._dev_hierarchy.get_entries(vid, pid, serial)) |
| self._logger.info( |
| "Servo candidates:\n%s", "\n".join(repr(c) for c in candidates) |
| ) |
| |
| # Determine the servo clusters represented by all of the candidate servos. |
| # Sets are used here for uniqueness. |
| clusters = { |
| frozenset(self._dev_hierarchy.get_cluster(c.vid, c.pid, c.serial)) |
| for c in candidates |
| } |
| # Now replace the sets with sorted lists, for consistent logging output. |
| # Within each servo cluster, the root servo should be listed first. |
| clusters = sorted(sorted(clstr, key=_ClusterSortKey) for clstr in clusters) |
| self._logger.info( |
| "Servo clusters represented by the candidates:\n%s", |
| pprint.pformat(clusters), |
| ) |
| # The clusters list is currently only used for logging output, as requested |
| # in https://issuetracker.google.com/277768816 for ease of troubleshooting. |
| # It may in the future be useful for enhancing interactive servo selection. |
| |
| if len(candidates) < 1: |
| raise ServoDeviceFinderError( |
| "Cannot find a servo device with %s" % (input_str,) |
| ) |
| candidate = candidates[0] |
| if len(candidates) > 1: |
| self._logger.info("Found > 1 servo devices with %s", input_str) |
| # when user does not provide enough information for picking a device |
| # (e.g. when vid/pid/serial is None), try selecting a device based on\ |
| # each device's priority. |
| if self.discover_mode != ServoDeviceDiscoveryMode.NO_AUTO: |
| self._logger.info("Selecting a servo device among the candidates...") |
| prioritized_devs = ( |
| servo_dev_hierarchy.ServoDeviceHierarchy.generate_device_priority( |
| candidates |
| ) |
| ) |
| candidates = ( |
| servo_dev_hierarchy.ServoDeviceHierarchy.most_prioritized_devices( |
| prioritized_devs |
| ) |
| ) |
| candidate = candidates[0] |
| if len(candidates) > 1: |
| self._logger.info("") |
| self._logger.info( |
| "We have found multiple devices that match args provided %s", |
| input_str, |
| ) |
| candidate = self.choose_device(candidates) |
| if not candidate: |
| raise ServoDeviceFinderError( |
| "User did not choose a valid device for %s from candidates %s" |
| % (input_str, candidates) |
| ) |
| self._logger.debug("Found device %s with %s", candidate, input_str) |
| return candidate |
| |
| def _complete_devopts(self, new_dev, old_dev): |
| """Complete the device options for a device based on the ones of another device. |
| |
| Args: |
| new_dev: a ServoDeviceEntry whose device options is to be generated |
| old_dev: a ServoDeviceEntry which already has device options |
| """ |
| new_dev.devopts = self._devopts_generator() |
| for arg in "board", "model", "config", "noautoconfig": |
| setattr(new_dev.devopts, arg, getattr(old_dev.devopts, arg)) |
| |
| def choose_main_device(self, devs): |
| """Choose the main device of the servod instance. |
| |
| The main device is the servo device that processes controls sent to the |
| servod instance by default. It receives the prefix '' and 'main'. |
| |
| Args: |
| devs: a list representing the complete device list of servo. Each entry |
| is a ServoDeviceEntry. |
| |
| Returns: |
| A device as the main device. |
| |
| Raises: |
| ServoDeviceFinderError: User does not pick a main device from multiple |
| candidates |
| """ |
| prioritized_devs = ( |
| servo_dev_hierarchy.ServoDeviceHierarchy.generate_device_priority(devs) |
| ) |
| user_chosen_mains = prioritized_devs[servo_dev_hierarchy.PRIORITY_MAIN_DEV] |
| if user_chosen_mains: |
| candidate = user_chosen_mains[0] |
| if len(user_chosen_mains) > 1: |
| self._logger.info("") |
| self._logger.info( |
| "Please pick 1 of the following devices as the main device." |
| ) |
| candidate = self.choose_device(user_chosen_mains) |
| else: |
| self._logger.info( |
| "User did not select the main device. Servod will try to choose one." |
| ) |
| candidates = ( |
| servo_dev_hierarchy.ServoDeviceHierarchy.most_prioritized_devices( |
| prioritized_devs |
| ) |
| ) |
| candidate = candidates[0] |
| if len(candidates) > 1: |
| self._logger.info("") |
| self._logger.info( |
| "Please pick 1 of the following devices as the main device." |
| ) |
| candidate = self.choose_device(candidates) |
| if not candidate: |
| raise ServoDeviceFinderError("No device is picked as the main device.") |
| self._logger.info("Main device is chosen as the device %s", candidate) |
| return candidate |
| |
| def _choose_device(self, devs): |
| """Let user choose a device from available list of unique devices. |
| |
| Args: |
| devs: a list of ServoDeviceEntry devices |
| |
| Returns: |
| ServoDeviceEntry for a single device, or None if user does not choose |
| """ |
| logging.info("") |
| for i, dev in enumerate(devs): |
| logging.info("Press '%d' for device %s", i, dev) |
| |
| # Check if stdin exists |
| if not os.isatty(sys.stdin.fileno()): |
| logging.warning("No stdin exists for user to choose a device.") |
| return None |
| |
| (rlist, _unused, _unused) = select.select( |
| [sys.stdin], [], [], INTERACTIVE_MENU_TIMEOUT_SECONDS |
| ) |
| if not rlist: |
| logging.warning("Timed out waiting for your choice\n") |
| return None |
| |
| rsp = rlist[0].readline().strip() |
| try: |
| rsp = int(rsp) |
| except ValueError: |
| logging.warning("%s not a valid choice ... ignoring", rsp) |
| return None |
| |
| if rsp < 0 or rsp >= len(devs): |
| logging.warning("%s outside of choice range ... ignoring", rsp) |
| return None |
| |
| logging.info("") |
| dev = devs[rsp] |
| logging.info("Chose %d ... device %s", rsp, dev) |
| logging.info("") |
| return dev |
| |
| def generate_prefixes(self, devs, main_dev): |
| """Generate a prefix for the device if it does not have one. |
| |
| Args: |
| devs: a list of ServoDeviceEntry devices |
| """ |
| known_prefixes = set() |
| # a dictionary of devices keyed by device type |
| dev_type_map = collections.defaultdict(lambda: []) |
| for dev in devs: |
| dev_type_map[dev.dev_template.TYPE].append(dev) |
| dev_prefix = set(dev.devopts.prefix) - known_prefixes |
| |
| if dev == main_dev: |
| self._logger.debug( |
| "Device %s is the main device and is given prefix %r", |
| dev, |
| servo_dev_templates.MAIN_DEV_PREFIXES, |
| ) |
| dev_prefix.update(servo_dev_templates.MAIN_DEV_PREFIXES) |
| else: |
| # prevent non-main device from having main device prefixes |
| dev_prefix.difference_update(servo_dev_templates.MAIN_DEV_PREFIXES) |
| |
| if dev == main_dev.cluster_root: |
| self._logger.debug( |
| "Device %s is the root device and is given prefix %r", |
| dev, |
| servo_dev_templates.ROOT_DEV_PREFIX, |
| ) |
| dev_prefix.add(servo_dev_templates.ROOT_DEV_PREFIX) |
| else: |
| # prevent non-root device from having root device prefix |
| dev_prefix.discard(servo_dev_templates.ROOT_DEV_PREFIX) |
| |
| dev.devopts.prefix = sorted(dev_prefix) |
| known_prefixes.update(dev_prefix) |
| self._logger.debug( |
| "Device %s is given prefix %s during invocation", dev, dev_prefix |
| ) |
| |
| # auto generate prefix based on device type |
| for dev_type, devices in dev_type_map.items(): |
| for dev in devices: |
| # use the device type as the prefix if it is the only 1 device |
| # of the kind |
| if len(devices) == 1 and dev_type not in known_prefixes: |
| prefix = dev_type |
| # otherwise, use device type and the last 4 digit of serial |
| else: |
| prefix = "%s-%s" % (dev_type, dev.serial[-4:]) |
| if prefix in known_prefixes: |
| suffix = 2 |
| while "%s-%s" % (prefix, suffix) in known_prefixes: |
| suffix += 1 |
| prefix = "%s-%s" % (prefix, suffix) |
| dev.devopts.prefix.append(prefix) |
| known_prefixes.add(prefix) |
| self._logger.debug( |
| "Device %s is given prefix %s which is automatically generated", |
| dev, |
| prefix, |
| ) |
| |
| # Add ccd_gsc as a prefix for ccd_ti50 for backward compatibility |
| # TODO: decide whether to unify ccd_cr50 and ccd_ti50 to ccd_gsc |
| if prefix.startswith("ccd_ti50"): |
| gsc_prefix = prefix.replace("ccd_ti50", "ccd_gsc") |
| dev.devopts.prefix.append(gsc_prefix) |
| known_prefixes.add(gsc_prefix) |
| self._logger.debug( |
| "Device %s is given prefix %s which is automatically generated", |
| dev, |
| gsc_prefix, |
| ) |
| |
| def validate_device_availability(self, devs): |
| """Check against ServoScratch that all devices are not served by another |
| servod instance. |
| |
| Args: |
| devs: a list of ServoDeviceEntry devices |
| |
| Raises: |
| ServoDeviceFinderError: some device is served by another servod instance. |
| """ |
| has_error = False |
| for servod_instance in self._scratch.GetAllEntries(): |
| for dev in devs: |
| if dev.serial in servod_instance[scratch.SERIAL_KEY]: |
| has_error = True |
| self._logger.error( |
| "Device %s is already served by another servod instance" |
| "on port %s", |
| dev, |
| servod_instance[scratch.PORT_KEY], |
| ) |
| if has_error: |
| raise ServoDeviceFinderError( |
| "Not all devices requested are available right now." |
| ) |
| |
| def validate_devopts(self, devs): |
| """Validate devices have valid devopts. |
| |
| Args: |
| devs: a list of ServoDeviceEntry devices |
| |
| Raises: |
| ServoDeviceFinderError: some device does not have 'prefix' attribute |
| ServoDeviceFinderError: multiple devices have "main" or "" as 'prefix' |
| """ |
| main_prefix_count = 0 |
| for dev in devs: |
| if not dev.devopts.prefix: |
| raise ServoDeviceFinderError("Device %s does not have a prefix." % dev) |
| if set(dev.devopts.prefix).intersection( |
| set(servo_dev_templates.MAIN_DEV_PREFIXES) |
| ): |
| main_prefix_count += 1 |
| if main_prefix_count > 1: |
| raise ServoDeviceFinderError( |
| "Multiple devices are chosen as the main device." |
| ) |