| # Copyright 2012 The ChromiumOS Authors |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| """Servo Server.""" |
| |
| import collections |
| import logging |
| import sys |
| |
| from servo import recovery |
| from servo import servo_dev_templates |
| from servo import sversion_util |
| from servo.utils import diagnose |
| from servo.utils import usb_hierarchy |
| |
| |
| class ServodError(Exception): |
| """Exception class for servod.""" |
| |
| |
| class Servod: |
| """Main class for Servo debug/controller Daemon.""" |
| |
| # Separator for control strings between servo device prefix and control name |
| PREFIX_DELIMITER = "." |
| # Constant for flex Control |
| _IS_FLEX_CTRL = "is_flex_board" |
| |
| def __init__(self, usbkm232=None): |
| """Servod constructor. |
| |
| Args: |
| usbkm232: String. Optional. Path to USB-KM232 device which allow for |
| sending keyboard commands to DUTs that do not have built in |
| keyboards. Used in FAFT tests. Use None for on board AVR MCU. |
| e.g. '/dev/ttyUSB0' or None. |
| |
| Raises: |
| ServodError: if unable to locate init method for particular interface |
| """ |
| self._logger = logging.getLogger("Servod") |
| self._logger.debug("") |
| self._usbkm232 = usbkm232 |
| self._keyboard = None |
| self._usb_keyboard = None |
| # A map of device serialname strings keyed by the device's name/prefix. |
| self._serialnames = collections.defaultdict(lambda: None) |
| # A map of ServoDevices keyed by their name/prefix. |
| # A ServoDevice can have multiple name/prefix (e.g. 'main', '') |
| self._devices = collections.defaultdict(lambda: None) |
| # A map of ServoDevices keyed by their id (vid, pid, serial) |
| # Each ServoDevice has a unique id |
| self._unique_devices = collections.defaultdict(lambda: None) |
| # All known controls of this servod instance |
| self._controls = set() |
| |
| def add_device(self, device, prefix): |
| """Add a ServoDevice to Servod. |
| |
| Args: |
| device: a ServoDevice that can interact with Servod, dut, and other |
| ServoDevices |
| prefix: prefix of the ServoDevice recognized by Servod |
| """ |
| self._logger.debug("Adding ServoDevice %s to instance.", device) |
| if prefix in self._devices: |
| if device != self._devices[prefix]: |
| raise ServodError( |
| ( |
| "ServoDevice prefix %s already represents device %s and " |
| "cannot be added as %s." |
| ) |
| % (prefix, self._devices[prefix], device) |
| ) |
| self._logger.debug( |
| "ServoDevice prefix %s is already added as %s.", |
| prefix, |
| self._devices[prefix], |
| ) |
| return |
| |
| self._unique_devices[device.get_id()] = device |
| self._devices[prefix] = device |
| self.add_serial_number(prefix, device._serial) |
| # ensure the device records all its alias prefixes |
| device.add_prefix(prefix) |
| if prefix == servo_dev_templates.MAIN_DEV_PREFIX: |
| # This is the main device as the prefix is empty. Add prefix alias here |
| # for the main device. |
| self._devices[servo_dev_templates.MAIN_DEV_PREFIX_ALIAS] = device |
| self.add_serial_number( |
| servo_dev_templates.MAIN_DEV_PREFIX_ALIAS, device._serial |
| ) |
| |
| def reinitialize(self): |
| """Reinitialize all devices that support reinitialization""" |
| for device in self.get_devices(): |
| try: |
| device.reinitialize() |
| except usb_hierarchy.HierarchyError as e: |
| if not device.disconnect_is_ok(): |
| raise |
| self._logger.info( |
| "Ignoring failed re-initialization of device that " |
| "is ok to be disconnected. %s error(%s).", |
| device, |
| e, |
| ) |
| |
| def close(self): |
| """Servod turn down logic.""" |
| for dev in self.get_devices(): |
| dev.close() |
| |
| def get_devices(self): |
| """Get all devices connected to this servod instance.""" |
| return list(self._unique_devices.values()) |
| |
| @staticmethod |
| def _get_control_prefix_and_name(name): |
| """Return (prefix, name) tuple from name passed into servod. |
| |
| Args: |
| name: name passed into any of the RPCs |
| |
| Returns: |
| (prefix, name) tuple, where |
| prefix is anything before the '.' |
| name anything after it |
| If there is no '.' in |name| return ('', name) |
| Raises: |
| ServodError: if there are more than one PREFIX_DELIMITER in |name|922gg |
| """ |
| if Servod.PREFIX_DELIMITER not in name: |
| return ("", name) |
| parts = name.split(Servod.PREFIX_DELIMITER) |
| if len(parts) > 2: |
| # Returned early if no |PREFIX_DELIMITER| in name, therefore after the |
| # split there have to be at least 2 parts here. |
| raise ServodError( |
| "Name %r is malformed: at most one %r is allowed." |
| % (name, Servod.PREFIX_DELIMITER) |
| ) |
| return tuple(parts) |
| |
| def _is_main_dev_prefix(self, prefix): |
| """Return whether |prefix| is the main device prefix. |
| |
| Args: |
| prefix: prefix to query |
| |
| Returns: |
| True, if the prefix has a main device prefix |
| """ |
| return prefix in servo_dev_templates.MAIN_DEV_PREFIXES |
| |
| def _get_dev_and_name(self, name): |
| """Return (dev, name) tuple after processing the name's prefix. |
| |
| Args: |
| name: control name to get dev for |
| |
| Returns: |
| (dev, name) tuple where |
| dev is the ServoDevice instance to handle |name| |
| name is the name after removing the prefix, if there was one |
| Raises: |
| ServodError: if no ServoDev can be found to handle |name| |
| NameError: if |name| not a known control on its servo dev. |
| """ |
| prefix, processed_name = Servod._get_control_prefix_and_name(name) |
| if prefix not in self._devices: |
| error_msg = ( |
| "No control named '%s' registered. " |
| "No servo device registered for prefix %s." |
| ) % (name, prefix) |
| raise ServodError(error_msg) |
| dev = self._devices[prefix] |
| |
| # Controls routed to main that are not covered by main are covered by their |
| # root hub device. |
| if not dev.syscfg.is_control(processed_name): |
| if self._is_main_dev_prefix(prefix) and self.get_root_device() is not None: |
| dev = self.get_root_device() |
| |
| if not dev.syscfg.is_control(processed_name): |
| error_msg = ( |
| "No control named '%s' registered with any connected servo device.\n" |
| "Servo device %s (prefix: %s) is picked as the target device for " |
| "the control.\n" |
| ) % (name, dev, dev.get_prefixes()) |
| candidates = [ctrl for ctrl in self._controls if name in ctrl] |
| if candidates: |
| error_msg += "Do you mean %s?\n" % candidates |
| error_msg += ( |
| "You can check all servod controls with 'dut-control -- all_controls'." |
| ) |
| raise ServodError(error_msg) |
| |
| self._logger.debug("Using servo device %s for control %s.", dev, name) |
| return (dev, processed_name) |
| |
| def hwinit(self, verbose=True, step_init=False): |
| """Initialize controls for servo devices.""" |
| for servo_device in self.get_devices(): |
| skip_controls = set() |
| for dev in servo_device.get_child_devices(): |
| skip_controls.update( |
| set(control_name for control_name, _unused in dev.syscfg.hwinit) |
| ) |
| servo_device.hwinit( |
| verbose=verbose, skip_controls=skip_controls, step_init=step_init |
| ) |
| |
| # Autotest directly uses this method, so we have to return True |
| # TODO(crbug.com/841097) Figure out why despite allow_none=True for both |
| # xmlrpc server & client I still have to return something to appease the |
| # marshall/unmarshall |
| return True |
| |
| def get(self, name): |
| """Get control value. |
| |
| Args: |
| name: name string of control |
| |
| Returns: |
| Response from calling drv get method. Value is reformatted based on |
| control's dictionary parameters |
| |
| Raises: |
| HwDriverError: Error occurred while using drv |
| ServodError: if interfaces are not available within timeout period |
| """ |
| if name.endswith("_serialname"): |
| # This route is to retrieve serialnames on servo v4, which |
| # connects to multiple servo-micros or CCD, like the controls, |
| # 'ccd_serialname', 'servo_micro_serialname', etc. |
| return self.get_legacy_serial_number(name) |
| |
| dev, name = self._get_dev_and_name(name) |
| return dev.get(name) |
| |
| def get_legacy_serial_number(self, control_name): |
| """Returns the desired serial number of a device. |
| |
| This is a method to support the legacy controls of <...>_serialname. |
| Consider removing it if all clients move away from this pattern. |
| |
| Args: |
| control_name: Name of the control. |
| |
| Returns: |
| A string containing the serial number or "unknown". |
| """ |
| # Remove the prefix from the serialname control. Serialnames are |
| # universal. It doesn't matter what the prefix is. |
| control_name = control_name.split(".", 1)[-1] |
| |
| suffix = "_serialname" |
| dev_type = control_name[: -len(suffix)] |
| board_model = "" |
| main_dev = self.get_main_device() |
| |
| if not dev_type: |
| return main_dev._serial |
| |
| if "_for_" in control_name: |
| board_model = dev_type.split("_for_")[1] |
| dev_type = dev_type.split("_for_")[0] |
| |
| candidates = set() |
| for _unused, dev in self._unique_devices.items(): |
| if dev_type not in dev.template.TYPE: |
| continue |
| if not board_model: |
| candidates.add(dev) |
| elif board_model in [dev.board, dev.model, main_dev.board, main_dev.model]: |
| candidates.add(dev) |
| |
| if len(candidates) == 0: |
| self._logger.info("'%s' not found!", control_name) |
| return "unknown" |
| if len(candidates) == 1: |
| return candidates.pop()._serial |
| self._logger.info( |
| "'%s' is ambiguous as there are multiple matching devices %s", |
| control_name, |
| candidates, |
| ) |
| return "unknown" |
| |
| def set(self, name, wr_val_str): |
| """Set control on servo device. |
| |
| Args: |
| name: name string of control |
| wr_val_str: value string to write. Can be integer, float or a |
| alpha-numerical that is mapped to a integer or float. |
| |
| Returns: |
| True, coming from the device's set call, to appease xmlrpc |
| |
| Raises: |
| HwDriverError: Error occurred while using driver |
| ServodError: if interfaces are not available within timeout period |
| """ |
| dev, name = self._get_dev_and_name(name) |
| return dev.set(name, wr_val_str) |
| |
| def update_known_ctrls(self): |
| """Helper to generate a list of all accessible controls in servod.""" |
| known_ctrls = set() |
| for prefix, dev in self._devices.items(): |
| dev_ctrls = dev.syscfg.get_all_controls() |
| # controls for root and main dev does not need to have prefixes |
| new_ctrls = ( |
| set("%s.%s" % (prefix, ctrl) for ctrl in dev_ctrls) |
| if prefix |
| else dev_ctrls |
| ) |
| if prefix == servo_dev_templates.ROOT_DEV_PREFIX: |
| new_ctrls |= dev_ctrls |
| known_ctrls |= new_ctrls |
| self._controls = sorted(list(known_ctrls)) |
| |
| def has_control(self, control): |
| """Returns True if control is available in servod.""" |
| return control in self._controls |
| |
| def doc_all(self): |
| """Return all documentation for controls. |
| |
| Returns: |
| string of <doc> text in config file (xml) and the params dictionary for |
| all controls. |
| |
| For example: |
| warm_reset :: Reset the device warmly |
| ------------------------> {'interface': '1', 'map': 'onoff_i', ... } |
| """ |
| self.update_known_ctrls() |
| rsp = [] |
| for name in self._controls: |
| dev, control = self._get_dev_and_name(name) |
| rsp.append(dev.syscfg.get_control_str(control)) |
| self._logger.debug("rsp %s", rsp) |
| return "\n".join(rsp) |
| |
| def doc(self, name): |
| """Retrieve doc string in system config file for given control name. |
| |
| Args: |
| name: name string of control to get doc string |
| |
| Returns: |
| doc string of name |
| |
| Raises: |
| NameError: if fails to locate control |
| """ |
| dev, control = self._get_dev_and_name(name) |
| return dev.doc(control) |
| |
| def set_get_all(self, cmds): |
| """Set &| get one or more control values. |
| |
| Args: |
| cmds: list of control[:value] to get or set. |
| |
| Returns: |
| rv: list of responses from calling get or set methods. |
| """ |
| rv = [] |
| for cmd in cmds: |
| if ":" in cmd: |
| (control, value) = cmd.split(":", 1) |
| rv.append(self.set(control, value)) |
| else: |
| rv.append(self.get(cmd)) |
| return rv |
| |
| def get_all(self, verbose): |
| """Get all controls values. |
| |
| Args: |
| verbose: Boolean on whether to return doc info as well |
| |
| Returns: |
| string creating from trying to get all values of all controls. In case of |
| error attempting access to control, response is 'ERR'. |
| """ |
| self.update_known_ctrls() |
| rsp = [] |
| for name in self._controls: |
| # avoid repeating the controls starting with 'main.' and 'root.' |
| if name.startswith( |
| "%s." % servo_dev_templates.MAIN_DEV_PREFIX |
| ) or name.startswith("%s." % servo_dev_templates.ROOT_DEV_PREFIX): |
| continue |
| self._logger.debug("name = %s", name) |
| try: |
| value = self.get(name) |
| except Exception: |
| value = "ERR" |
| if verbose: |
| rsp.append("GET %s = %s :: %s" % (name, value, self.doc(name))) |
| else: |
| rsp.append("%s:%s" % (name, value)) |
| return "\n".join(sorted(rsp)) |
| |
| def echo(self, echo): |
| """Mock echo function for testing/examples. |
| |
| Args: |
| echo: string to echo back to client |
| """ |
| self._logger.debug("echo(%s)", echo) |
| return "ECH0ING: %s" % (echo) |
| |
| def get_board(self): |
| """Returns the board specified for the main device. |
| |
| Returns: |
| A string of the board name, or '' if not present. |
| """ |
| return self.get_main_device().board |
| |
| def get_base_board(self): |
| """Returns the board probed from EC in case the main device is a dut controller. |
| |
| Returns: |
| A string of the board name, or '' if not present. |
| """ |
| return self.get_main_device().base_board |
| |
| def get_servo_serials(self): |
| """Return all the serials associated with this process. |
| |
| This gets passed directly to the xml rpc response to xmlrpc client request |
| get_servo_serials(). It needs to be in a basic type for Python to |
| correctly marshal. See b/279006079 |
| |
| Returns: |
| {str: str} - Dict of control prefix mapped to servo serial number. |
| Multiple prefixes may be mapped to the same serial number. |
| |
| Each call to this function returns a new dict. The returned dict may be |
| mutated without affecting any other state. |
| """ |
| return dict(self._serialnames) |
| |
| def add_serial_number(self, key, serial_number): |
| """Adds the serial number to the _serialnames dictionary. |
| |
| Args: |
| key: A string which is the key into the _serialnames dictionary. |
| serial_number: A string which is the key into the _serialnames dictionary. |
| """ |
| self._serialnames[key] = serial_number |
| self._logger.debug("Added %s %s to serialnames.", key, serial_number) |
| |
| def get_main_device(self): |
| """Gets the main servo device.""" |
| return self._devices[servo_dev_templates.MAIN_DEV_PREFIX] |
| |
| def get_root_device(self): |
| """Gets the root servo device.""" |
| if servo_dev_templates.ROOT_DEV_PREFIX not in self._devices: |
| return None |
| return self._devices[servo_dev_templates.ROOT_DEV_PREFIX] |
| |
| def get_controls_for_tag(self, tag): |
| """Get list of controls for a given tag. |
| |
| Args: |
| tag: str, tag to query |
| |
| Returns: |
| list of controls with that tag, or an empty list if no such tag, or |
| controls under that tag |
| """ |
| controls = set() |
| no_prefix_devs = [self.get_main_device(), self.get_root_device()] |
| for prefix, dev in self._devices.items(): |
| # controls for root and main dev does not need to have prefixes |
| no_prefix = dev in no_prefix_devs |
| for dev_ctrl in dev.syscfg.get_controls_for_tag(tag): |
| controls.add(dev_ctrl if no_prefix else "%s.%s" % (prefix, dev_ctrl)) |
| return sorted(list(controls)) |
| |
| def get_config_files(self): |
| """Gets the configuration files used for this servo server invocation""" |
| config_files = {} |
| for dev in self.get_devices(): |
| xml_files = dev.syscfg._loaded_xml_files |
| # See system_config.py for schema, but entry[0] is the file name |
| config_files[str(dev)] = [entry[0] for entry in xml_files] |
| return config_files |
| |
| def get_interface_list(self): |
| interfaces = [] |
| for dev in self.get_devices(): |
| for interface in dev.get_interface_list(): |
| interfaces += [(str(dev), interface)] |
| return interfaces |
| |
| def is_flex_board(self): |
| """Returns true if servod is run with a flex board""" |
| if not self.has_control(self._IS_FLEX_CTRL): |
| return False |
| is_flex_board = self.get(self._IS_FLEX_CTRL) |
| if is_flex_board == "no": |
| return False |
| if is_flex_board == "yes": |
| return True |
| raise ServodError( |
| "Control %r has invalid value %r." % (self._IS_FLEX_CTRL, is_flex_board) |
| ) |
| |
| def validate_dut_controller(self): |
| """Validate the servod instance has at least 1 dut controller.""" |
| for dev in self.get_devices(): |
| if dev.template.DUT_CONTROLLER: |
| return |
| |
| if self.is_flex_board(): |
| self._logger.info("Flex board doesn't use DUT controller, skip check.") |
| return |
| |
| # Start diagnosing why servod does not have DUT controller. |
| # Fail if we requested board control but don't have an interface for this. |
| if self.get_board(): |
| if self.get("dut_connection_type") == "type-c": |
| faults = diagnose.diagnose_ccd(self.get_main_device()) |
| if diagnose.SBU_VOLTAGE_FLOAT in faults: |
| self.set("dut_sbu_voltage_float_fault", "on") |
| # No need to check for the LOW voltage signal here as the fault |
| # is valid for both ccd and for servo micro: a controller is missing |
| self.set("dut_controller_missing_fault", "on") |
| |
| self._logger.error( |
| "No Servo Micro, C2D2, or CCD detected for board %s", self.get_board() |
| ) |
| self._logger.error( |
| "Try flipping the USB type C cable if you were using " |
| "servo v4 type C." |
| ) |
| self._logger.error( |
| "If flipping the cable allows CCD, please file a bug " |
| "against the DUT platform with reproducing details." |
| ) |
| |
| dut_controller_tolerant = recovery.is_recovery_active() |
| if dut_controller_tolerant: |
| self._logger.info( |
| "Will continue startup as recovery mode has been requested" |
| ) |
| else: |
| self._logger.fatal( |
| "No device interface (Servo Micro, C2D2, or CCD) connected." |
| ) |
| sys.exit(-1) |
| |
| def get_version(self): |
| """Gets the type of the servo device setups. |
| |
| DEPRECATED. External clients (e.g. autotest) should not directly call this |
| method of servo_server as it is an implementation detail. They should migrate |
| to using 'servo_type' control. |
| |
| TODO(konmari): remove this public method after all clients move away from |
| directly calling methods. |
| """ |
| return self._get_version() |
| |
| def _get_version(self): |
| """Gets the type of the servo device setups. |
| |
| NOTE: please avoid assuming the format of servo type string and parsing it. |
| Use 'devices' control to fetch all servo devices of this servod instance |
| instead. |
| """ |
| main_device = self.get_main_device() |
| root_device = self.get_root_device() |
| type_ = main_device.template.TYPE |
| if root_device and root_device is not main_device: |
| type_ = root_device.template.TYPE + "_with_" + type_ |
| for dev in root_device.get_child_devices(): |
| if dev.template.DUT_CONTROLLER and dev != main_device: |
| type_ += "_and_" + dev.template.TYPE |
| return type_ |
| |
| def servod_version(self): |
| return sversion_util.extended_version() |