| # Copyright 2019 The ChromiumOS Authors |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Servod device used by the server and watchdog.""" |
| |
| import json |
| import logging |
| import os |
| import sys |
| import termios |
| import threading |
| import time |
| import tty |
| |
| from servo import drv as servo_drv |
| from servo import interface as _interface |
| from servo import servo_dev_templates |
| from servo import servo_interfaces |
| from servo import servo_logging |
| from servo.utils import string_utils |
| import servo.utils.usb_hierarchy as usb_hierarchy |
| |
| |
| HwDriverError = servo_drv.hw_driver.HwDriverError |
| |
| |
| def _YesNoInput(message): |
| """Prompt for y/n character input. |
| |
| The y/n question will be repeated after any character input that is not |
| y/Y/n/N, until one of those characters is received. |
| |
| Args: |
| message: str or bytes - The prompt message to print, usually with a |
| trailing whitespace character. "[y/n] " will be appended |
| automatically. |
| |
| Returns: bool - True if user typed y or Y, False if they typed n or N. |
| """ |
| sys.stdout.write(message) |
| while True: |
| stdin_fd = sys.stdin.fileno() |
| stdin_termios = termios.tcgetattr(stdin_fd) |
| try: |
| # If stdin is a terminal (which it should be when this function is |
| # used) it is almost certainly line buffered. Temporarily set it to |
| # the "raw" mode termios settings so any character the user types is |
| # sent immediately. |
| # |
| # This way we can respond to Y and N keys without making the user |
| # type Y/N + Enter for each prompt. |
| # |
| # TODO(b/316639135): Instead of using the predefined "raw" mode, |
| # minimally modify the termios settings to disable line buffering |
| # while leaving other settings unchanged, especially input echo and |
| # SIGINT / SIGQUIT from keyboard. Then no need to write the |
| # character back to stdout, nor any need for the non-printable |
| # character sleep(). |
| tty.setraw(stdin_fd) |
| sys.stdout.write("[y/n] ") |
| sys.stdout.flush() |
| onechar = sys.stdin.read(1).lower() |
| if onechar and onechar.isprintable(): |
| sys.stdout.write(onechar) |
| sys.stdout.write("\n") |
| sys.stdout.flush() |
| if onechar in ("y", "Y"): |
| return True |
| if onechar in ("n", "N"): |
| return False |
| finally: |
| termios.tcsetattr(stdin_fd, termios.TCSADRAIN, stdin_termios) |
| if not (onechar and onechar.isprintable()): |
| # Make sure that if the user is mashing ctrl+c or ctrl+\ it gets |
| # through before the next loop iteration. |
| time.sleep(0.5) |
| |
| |
| class ServoDeviceError(Exception): |
| """General servo device error class.""" |
| |
| |
| class ServoDevice: |
| """Device class that each corresponds to a physical servo device.""" |
| |
| # Reinit capable devices. |
| REINIT_CAPABLE = set( |
| [ |
| servo_dev_templates.GetID("ccd_cr50"), |
| servo_dev_templates.GetID("ccd_gsc"), |
| servo_dev_templates.GetID("ccd_gsc_nt"), |
| ] |
| ) |
| |
| # Available attempts to reconnect a device |
| REINIT_ATTEMPTS = 100 |
| |
| # Exceptions to count as known or ordinary. Any errors that aren't instances |
| # of these (or their subclasses) will be logged with "Please take a look." |
| KNOWN_EXCEPTIONS = (AttributeError, NameError, HwDriverError) |
| |
| # Timeout to wait for interfaces to become available again if reinitialization |
| # is taking place. In seconds. This is supposed to recover from brief resets. |
| # If the interface disappears for more than 5 seconds, then someone probably |
| # intentionally disconnected the device. Servod shouldn't be responsible for |
| # waiting for the device during an intentional disconnect. |
| INTERFACE_AVAILABILITY_TIMEOUT = 5 |
| |
| def __init__(self, dev_entry, config, interfaces=None, servod=None): |
| """ServoDevice constructor. |
| |
| Args: |
| dev_entry: ServoDeviceEntry that holds USB, device hierarchy, and devopts |
| information for this servo device. |
| config: instance of SystemConfig containing all controls for |
| particular Servod invocation |
| interfaces: list of strings of interface types the server will instantiate |
| servod: a pointer to access servod to invoke controls targeted at the servod |
| daemon and other devices. |
| |
| Raises: |
| ServoDeviceError: if unable to locate init method for particular interface |
| """ |
| self.template = dev_entry.dev_template |
| self.prefixes = dev_entry.devopts.prefix |
| logger_prefix = self.prefixes[-1] if self.prefixes else "" |
| self._logger = logging.getLogger( |
| "ServoDevice %s - %s" % (self.template.TYPE, logger_prefix) |
| ) |
| vendor = self.template.VID |
| product = self.template.PID |
| self._serial = dev_entry.serial |
| board = dev_entry.devopts.board |
| self.board = board |
| self.base_board = "" |
| self.model = dev_entry.devopts.model |
| if self.model: |
| self.board += "_" + self.model |
| self._ifaces_available = threading.Event() |
| self.connect() |
| self._reinit_capable = (vendor, product) in self.REINIT_CAPABLE |
| self._disconnect_ok = False |
| self._sysfs_path = dev_entry.dev_path |
| # Associate the dev entry with a device |
| self.dev_entry = dev_entry |
| dev_entry.servo_device = self |
| |
| self.syscfg = config |
| # Dict of Dict to map control name, function name to to tuple (params, drv) |
| # Ex) _drv_dict[name]['get'] = (params, drv) |
| self._drv_dict = {} |
| |
| if interfaces: |
| self._manual_interfaces = True |
| self._interfaces = interfaces |
| else: |
| self._manual_interfaces = False |
| self._interfaces = servo_interfaces.INTERFACE_DEFAULTS[vendor][product] |
| # list of objects (Fi2c, Fgpio) to physical interfaces (gpio, i2c) that ftdi |
| # interfaces are mapped to |
| self._interface_list = [] |
| # Whether an interface has initialized to be the proper interface |
| self._interface_init = [] |
| self._sync_interface_lists() |
| self._servod = servod |
| self._token_db = dev_entry.devopts.token_db |
| |
| def __repr__(self): |
| return str(self) |
| |
| def __str__(self): |
| return "%s (%04x:%04x) %s" % ( |
| self.template.TYPE, |
| self.template.VID, |
| self.template.PID, |
| self._serial, |
| ) |
| |
| def wait(self, wait_time): |
| """Wait for the device to reconnect and the interfaces to become available. |
| |
| Args: |
| wait_time: time to wait in seconds |
| |
| Raises: |
| ServoDeviceError: if the interfaces aren't available within timeout period |
| """ |
| if not self._ifaces_available.wait(wait_time): |
| raise ServoDeviceError( |
| "Timed out waiting for interfaces to become available." |
| ) |
| |
| def connect(self): |
| """The device connected.""" |
| # Mark that the interfaces are available. |
| self._ifaces_available.set() |
| self._reinit_attempts = self.REINIT_ATTEMPTS |
| |
| def disconnect(self): |
| """The device disconnected.""" |
| # Mark that the interfaces are unavailable. |
| self._ifaces_available.clear() |
| |
| # If it's ok for the device to disconnect, allow it to stay disconnected |
| # indefinitely. |
| if self._disconnect_ok: |
| return |
| |
| self._reinit_attempts -= 1 |
| self._logger.debug("%d reinit attempts remaining.", self._reinit_attempts) |
| |
| def reinit_ok(self): |
| """Check whether reinit is okay.""" |
| return self._reinit_capable and (self._reinit_attempts > 0) |
| |
| def get_id(self): |
| """Return a tuple of the device information.""" |
| return self.template.VID, self.template.PID, self._serial |
| |
| def is_connected(self): |
| """Returns True if the device is connected.""" |
| return os.path.exists(self._sysfs_path) |
| |
| def get_prefixes(self): |
| """Get all prefixes of the device.""" |
| return self.prefixes |
| |
| def add_prefix(self, alias): |
| """Add a prefix for the device.""" |
| if alias not in self.prefixes: |
| self.prefixes += [alias] |
| |
| def set_disconnect_ok(self, disconnect_ok): |
| """Set if it's ok for the device to disconnect. |
| |
| Don't decrease the reinit_attempts count if this is True. The device can |
| be disconnected forever as long as disconnect is ok. |
| |
| Args: |
| disconnect_ok: True if it's ok for the device to disconnect. |
| """ |
| self._disconnect_ok = disconnect_ok |
| self._reinit_attempts = self.REINIT_ATTEMPTS |
| |
| def disconnect_is_ok(self): |
| """Returns True if it's ok for the device to disconnect.""" |
| return self._disconnect_ok |
| |
| def usb_devnum(self): |
| """Return the current usb devnum.""" |
| return usb_hierarchy.Hierarchy.DevNumFromSysfs(self._sysfs_path) |
| |
| def get_interface_list(self): |
| """Return interface_list.""" |
| return self._interface_list |
| |
| def init_servo_interfaces(self, fault_tolerant=False): |
| """Init the servo interfaces with the given interfaces. |
| |
| Args: |
| fault_tolerant: If True, initialization error on an interface is logged but |
| not result in an exception. If false, initialization error on |
| any interface leads to an exception. |
| |
| Raises: |
| ServoDeviceError: if unable to locate init method for particular interface. |
| """ |
| self.clear_cached_drv() |
| for i, interface_data in enumerate(self._interfaces): |
| if self._interface_init[i]: |
| # Ensure initialized interfaces are not reinitialized |
| continue |
| if isinstance(interface_data, dict): |
| name = interface_data["name"] |
| # Store interface index for those that care about it. |
| interface_data["index"] = i |
| elif isinstance(interface_data, str): |
| if interface_data in ["empty", "ftdi_empty"]: |
| # 'empty' reserves the interface for future use. Typically the |
| # interface will be managed by external third-party tools like |
| # openOCD for JTAG or flashrom for SPI. In the case of servo V4, |
| # it serves as a placeholder for servo micro interfaces. |
| continue |
| name = interface_data |
| else: |
| raise ServoDeviceError( |
| "Illegal interface data type %s" % type(interface_data) |
| ) |
| |
| self._logger.info("Initializing interface %d to %s", i, name) |
| try: |
| result = _interface.Build( |
| name=name, |
| index=i, |
| vid=self.template.VID, |
| pid=self.template.PID, |
| sid=self._serial, |
| interface_data=interface_data, |
| servo_device=self, |
| token_db=self._token_db, |
| ) |
| except Exception: |
| if fault_tolerant: |
| self._logger.warning( |
| "Failure trying to initialize interface %s (%s) " |
| "in fault tolerant mode, so this will not crash servod.", |
| i, |
| name, |
| ) |
| continue |
| raise |
| self._interface_list[i] = result |
| self._interface_init[i] = True |
| |
| def set_board_and_model(self, board, model=None): |
| """Set the board and model (if applicable) for this servo device. |
| |
| (1) check if interfaces need to change due to board |
| (2) set board/model attributes |
| (3) pull in board/model config file |
| |
| Args: |
| board: board name |
| model: model name |
| |
| Returns: |
| True if configuration file found for board/model False otherwise |
| """ |
| if not self._manual_interfaces: |
| # Only if interfaces were determined, and not set manually, try to |
| # get new interfaces from the board, otherwise, leave them be. |
| try: |
| interfaces = servo_interfaces.INTERFACE_BOARDS[board][ |
| self.template.VID |
| ][self.template.PID] |
| for i, interface_data in enumerate(interfaces): |
| if self._interfaces[i] != interface_data: |
| # If an interface is overwritten ensure that it's marked as not |
| # initialized regardless of previous status. |
| self._interface_init[i] = False |
| self._interfaces[i] = interface_data |
| self._sync_interface_lists() |
| except KeyError: |
| # Likely adding a new board does not change interfaces. This is not |
| # a fatal error. |
| self._logger.debug( |
| "Cannot find interfaces for board %s." |
| " Skip resetting the interfaces", |
| board, |
| ) |
| cfg, board_id = self.syscfg.get_board_model_config(board, model) |
| if cfg: |
| self.syscfg.set_board_cfg(cfg) |
| # |board_id| might include the |model| or not depending on whether it was used |
| # to determine the board config. |
| self.board = board_id |
| if model and board_id and board_id.endswith(model): |
| self.model = model |
| if cfg: |
| self.syscfg.add_cfg_file(self.prefixes[0], cfg) |
| return True |
| return False |
| |
| def _sync_interface_lists(self): |
| """Ensure when interfaces are changed, bookkeeping is kept in sync.""" |
| # Extend the interface list if we need to. |
| interfaces_len = len(self._interfaces) |
| interface_list_len = len(self._interface_list) |
| if interfaces_len > interface_list_len: |
| self._interface_list += [_interface.empty.Empty()] * ( |
| interfaces_len - interface_list_len |
| ) |
| self._interface_init += [False] * (interfaces_len - interface_list_len) |
| |
| def set_base_board(self, board): |
| """Set the board probed from ec. |
| |
| Args: |
| board: the board probed from ec (assuming this device is a dut controller) |
| """ |
| self.base_board = board |
| |
| def reinitialize(self): |
| """Reinitialize all interfaces that support reinitialization""" |
| for _unused, interface in enumerate(self._interface_list): |
| interface.reinitialize() |
| # Indicate interfaces are safe to use again. |
| self.connect() |
| |
| def close(self): |
| """Servo device turn down logic.""" |
| # Close ec3po interfaces first to remove all wrappers/pointers on the raw pty |
| for i, interface in enumerate(self._interface_list): |
| if isinstance(interface, _interface.ec3po_interface.EC3PO): |
| self._logger.info("Turning down interface %d", i) |
| interface.close() |
| |
| # Close all the other non-placeholder interfaces |
| for i, interface in enumerate(self._interface_list): |
| if not isinstance(interface, _interface.empty.Empty) and not isinstance( |
| interface, _interface.ec3po_interface.EC3PO |
| ): |
| # Only print this on real interfaces and not place holders. |
| self._logger.info("Turning down interface %d", i) |
| interface.close() |
| |
| def get(self, name): |
| """Get control value. |
| |
| Args: |
| name: name string of control |
| |
| Returns: |
| Response from calling drv get method. Value is reformatted based on |
| control's dictionary parameters |
| |
| Raises: |
| HwDriverError: Error occurred while using drv |
| ServoDeviceError: if interfaces are not available within timeout period |
| """ |
| with servo_logging.WrapGetCall( |
| name, known_exceptions=self.KNOWN_EXCEPTIONS |
| ) as wrapper: |
| (params, drv, device) = self._get_param_drv(name) |
| if device in self._servod._devices: |
| self._servod._devices[device].wait(self.INTERFACE_AVAILABILITY_TIMEOUT) |
| |
| val = drv.get() |
| rd_val = self.syscfg.reformat_val(params, val) |
| wrapper.got_result(rd_val) |
| return rd_val |
| |
| def set(self, name, wr_val_str): |
| """Set control. |
| |
| Args: |
| name: name string of control |
| wr_val_str: value string to write. Can be integer, float or a |
| alpha-numerical that is mapped to a integer or float. |
| |
| Raises: |
| HwDriverError: Error occurred while using driver |
| ServoDeviceError: if interfaces are not available within timeout period |
| """ |
| with servo_logging.WrapSetCall( |
| name, wr_val_str, known_exceptions=self.KNOWN_EXCEPTIONS |
| ): |
| (params, drv, device) = self._get_param_drv(name, False) |
| if device in self._servod._devices: |
| self._servod._devices[device].wait(self.INTERFACE_AVAILABILITY_TIMEOUT) |
| wr_val = self.syscfg.resolve_val(params, wr_val_str) |
| |
| drv.set(wr_val) |
| |
| # TODO(crbug.com/841097) Figure out why despite allow_none=True for both |
| # xmlrpc server & client I still have to return something to appease the |
| # marshall/unmarshall |
| return True |
| |
| def _get_param_drv(self, control_name, is_get=True): |
| """Get access to driver for a given control. |
| |
| Note, some controls have different parameter dictionaries for 'getting' the |
| control's value versus 'setting' it. Boolean is_get distinguishes which is |
| being requested. |
| |
| Args: |
| control_name: string name of control |
| is_get: boolean to determine |
| |
| Returns: |
| tuple (params, drv, device_info) where: |
| params: param dictionary for control |
| drv: instance object of driver for particular control |
| device_info: servo device information |
| |
| Raises: |
| ServoDeviceError: Error occurred while examining params dict |
| """ |
| # if already setup just return tuple from driver dict |
| if control_name in self._drv_dict: |
| if is_get and ("get" in self._drv_dict[control_name]): |
| return self._drv_dict[control_name]["get"] |
| if not is_get and ("set" in self._drv_dict[control_name]): |
| return self._drv_dict[control_name]["set"] |
| |
| self._logger.debug( |
| "Did not find cached drvs for %r. Will generate both set and get drvs.", |
| control_name, |
| ) |
| |
| set_params, get_params = self.syscfg.lookup_control_params(control_name) |
| |
| for params in [get_params, set_params]: |
| # |cmd| is guaranteed to be in each params. |
| mode = params["cmd"] |
| drv_prefix = params.get("drv") |
| if drv_prefix == "na": |
| # 'na' drv can be used to selectively turn controls into noops for |
| # a given servo hardware. Ensure that there is an interface. |
| params.setdefault("interface", "servo") |
| self._logger.debug( |
| "Setting interface to default to %r for %r unless " |
| " defined in params, as drv is %r.", |
| "servo", |
| control_name, |
| "na", |
| ) |
| # Setting input_type to str allows all inputs through enabling a true |
| # noop |
| params.update({"input_type": "str"}) |
| |
| interface_id = params.get("interface") |
| if None in [drv_prefix, interface_id]: |
| raise ServoDeviceError( |
| "No drv/interface for control %r found" % control_name |
| ) |
| # Store map params in params |
| map_name = params.get("map") |
| if map_name is not None: |
| map_params = self.syscfg.lookup_map_params(map_name) |
| params["map_params"] = map_params |
| |
| # Store this device name in params (necessary to scope control names when |
| # querying controls from a non-main servo device) |
| # TODO(b/275723447): remove this parameter once prefix string is no longer |
| # necessary in drivers |
| params["device_type"] = self.template.TYPE |
| |
| # this control only needs cross-servo-device communication and does not |
| # need hardware interface for low-level communication |
| if interface_id == "servo": |
| interface = None |
| servod = self._servod |
| # this control only needs hardware interface for low-level communication |
| # and does not need cross-servo-device communication |
| else: |
| index = int(interface_id) |
| interface = self._interface_list[index] |
| servod = None |
| |
| device_info = None |
| if hasattr(interface, "get_device_info"): |
| device_info = interface.get_device_info() |
| drv_module = getattr(servo_drv, drv_prefix) |
| drv_class = getattr(drv_module, string_utils.snake_to_camel(drv_prefix)) |
| drv = ( |
| drv_class(interface, params, servod) |
| if servod |
| else drv_class(interface, params) |
| ) |
| if control_name not in self._drv_dict: |
| self._drv_dict[control_name] = {} |
| # Store the information in the right mode. |
| self._drv_dict[control_name][mode] = (params, drv, device_info) |
| # At this point, both 'set' and 'get' have been generated. The last thing |
| # left to do is to pass each one of them a weak reference to the other. |
| # This ensures that if a control needs to do read/modify/write for |
| # instance it can do so without much overhead. |
| _unused, set_drv, _unused = self._drv_dict[control_name]["set"] |
| _unused, get_drv, _unused = self._drv_dict[control_name]["get"] |
| set_drv.set_complement(get_drv) |
| # Run the method again, as it will find the entries now in the cache. |
| return self._get_param_drv(control_name, is_get) |
| |
| def clear_cached_drv(self): |
| """Clear the cached drivers. |
| |
| The drivers are cached in the Dict _drv_dict when a control is got or set. |
| When the servo interfaces are relocated, the cached values may become wrong. |
| Should call this method to clear the cached values. |
| """ |
| self._drv_dict = {} |
| |
| def doc_all(self): |
| """Return all documentations for controls. |
| |
| Returns: |
| string of <doc> text in config file (xml) and the params dictionary for |
| all controls. |
| |
| For example: |
| warm_reset :: Reset the device warmly |
| ------------------------> {'interface': '1', 'map': 'onoff_i', ... } |
| """ |
| return self.syscfg.display_config() |
| |
| def doc(self, name): |
| """Retrieve doc string in system config file for given control name. |
| |
| Args: |
| name: name string of control to get doc string |
| |
| Returns: |
| doc string of name |
| |
| Raises: |
| NameError: if fails to locate control |
| """ |
| self._logger.debug("name(%s)", name) |
| if self.syscfg.is_control(name): |
| return self.syscfg.get_control_docstring(name) |
| raise NameError("No control %s" % name) |
| |
| def hwinit(self, verbose, skip_controls, step_init=False): |
| """Initialize all controls. |
| |
| These values are part of the system config XML files of the form |
| init=<value>. This command should be used by clients wishing to return the |
| servo and DUT its connected to a known good/safe state. |
| |
| Note that initialization errors are ignored (as in some cases they could |
| be caused by DUT firmware deficiencies). This might need to be fine tuned |
| later. |
| |
| Args: |
| verbose: boolean, if True prints info about control initialized. |
| Otherwise prints nothing. |
| skip_controls: a list of controls not to hwinit. For a root hub device, |
| if a control is already initialized for its children, do not initialized |
| the control for this device. |
| step_init: bool - if True, interactively prompt y/n whether to |
| initialize this control |
| |
| Returns: |
| This function is called across RPC and as such is expected to return |
| something unless transferring 'none' across is allowed. Hence adding a |
| mock return value to make things simpler. |
| """ |
| for control_name, value in self.syscfg.hwinit: |
| if control_name in skip_controls: |
| self._logger.debug( |
| "Skip initializing control %r because it is already initialized " |
| "for a child device.", |
| control_name, |
| ) |
| continue |
| if step_init and not _YesNoInput( |
| "Initialize control {!r} to value {!r}? ".format(control_name, value) |
| ): |
| self._logger.debug( |
| "Skip initializing control %r because step_init is enabled and " |
| "the user requested to not initialize this control.", |
| control_name, |
| ) |
| continue |
| try: |
| # Workaround for bug chrome-os-partner:42349. Without this check, the |
| # gpio will briefly pulse low if we set it from high to high. |
| if self.get(control_name) != value: |
| self.set(control_name, value) |
| if verbose: |
| self._logger.info("Initialized %s to %s", control_name, value) |
| except Exception as error: |
| self._logger.error("Problem initializing %s -> %s", control_name, value) |
| self._logger.error(str(error)) |
| self._logger.error( |
| "Please consider verifying the logs and if the " |
| "error is not just a setup issue, consider filing " |
| "a bug. Also checkout go/servo-ki." |
| ) |
| |
| # If there is the control of 'active_dut_controller', |
| # set active_dut_controller to the default device as initialization. |
| try: |
| if self.syscfg.is_control("active_dut_controller"): |
| self.set("active_dut_controller", "default") |
| except servo_drv.active_v4_device.activeV4DeviceError as error: |
| self._logger.debug("Could not set active device: %s", str(error)) |
| |
| return True |
| |
| def get_root_hub_device(self): |
| """Get the root hub device of this device, if it has one. |
| |
| This device might hang on another hub device. |
| |
| Returns: |
| The root hub device of this device, or None if it does not have one. |
| """ |
| root_entry = self.dev_entry.cluster_root |
| if root_entry is None: |
| return None |
| return root_entry.servo_device |
| |
| def is_root_hub_device(self): |
| """Check whether this device is a root hub device. |
| |
| Returns: |
| True if this device is a root hub device, false otherwise. |
| """ |
| return self.dev_entry.is_cluster_root() |
| |
| def get_child_devices(self): |
| """Get the devices hanging on this device (directly or indirectly). |
| |
| Returns: |
| A list of ServoDevice that hangs on this device directly or indirectly. |
| If this device is not a root hub device, return an empty list. |
| """ |
| child_devices = [] |
| if not self.is_root_hub_device(): |
| return child_devices |
| for member in self.dev_entry.cluster_members: |
| if member != self.dev_entry and member.servo_device is not None: |
| child_devices.append(member.servo_device) |
| return child_devices |
| |
| def to_json(self): |
| """Serialize this device to a json string.""" |
| root_hub_device = self.get_root_hub_device() |
| data = { |
| "prefix": self.prefixes, |
| "type": self.template.TYPE, |
| "vendor_id": self.template.VID, |
| "product_id": self.template.PID, |
| "serial": self._serial, |
| "sysfs_path": self._sysfs_path, |
| "root_hub_device": str(root_hub_device) if root_hub_device else None, |
| "child_devices": [str(dev) for dev in self.get_child_devices()], |
| } |
| return json.dumps(data, indent=4) |