| # Copyright (c) 2012 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| """Servo Server.""" |
| import datetime |
| import fcntl |
| import fnmatch |
| import logging |
| import os |
| import re |
| import SimpleXMLRPCServer |
| import threading |
| import time |
| import usb |
| |
| import drv as servo_drv |
| import bbadc |
| import bbi2c |
| import bbgpio |
| import bbuart |
| import ec3po_interface |
| import ftdigpio |
| import ftdii2c |
| import ftdi_common |
| import ftdiuart |
| import i2cbus |
| import keyboard_handlers |
| import servo_interfaces |
| import servo_postinit |
| import stm32gpio |
| import stm32i2c |
| import stm32uart |
| |
| HwDriverError = servo_drv.hw_driver.HwDriverError |
| |
| MAX_I2C_CLOCK_HZ = 100000 |
| |
| |
| class ServodError(Exception): |
| """Exception class for servod.""" |
| |
| |
| class Servod(object): |
| """Main class for Servo debug/controller Daemon.""" |
| |
| # This is the key to get the main serial used in the _serialnames dict. |
| MAIN_SERIAL = 'main' |
| SERVO_MICRO_SERIAL = 'servo_micro' |
| CCD_SERIAL = 'ccd' |
| |
| # Timeout to wait for interfaces to become available again if reinitialization |
| # is taking place. In seconds. |
| INTERFACE_AVAILABILITY_TIMEOUT = 60 |
| |
| def init_servo_interfaces(self, vendor, product, serialname, interfaces): |
| """Init the servo interfaces with the given interfaces. |
| |
| We don't use the self._{vendor,product,serialname} attributes because we |
| want to allow other callers to initialize other interfaces that may not |
| be associated with the initialized attributes (e.g. a servo v4 servod object |
| that wants to also initialize a servo micro interface). |
| |
| Args: |
| vendor: USB vendor id of FTDI device. |
| product: USB product id of FTDI device. |
| serialname: String of device serialname/number as defined in FTDI |
| eeprom. |
| interfaces: List of strings of interface types the server will |
| instantiate. |
| |
| Raises: |
| ServodError if unable to locate init method for particular interface. |
| """ |
| # If it is a new device add it to the list |
| device = (vendor, product, serialname) |
| if device not in self._devices: |
| self._devices.append(device) |
| |
| # Extend the interface list if we need to. |
| interfaces_len = len(interfaces) |
| interface_list_len = len(self._interface_list) |
| if interfaces_len > interface_list_len: |
| self._interface_list += [None] * (interfaces_len - interface_list_len) |
| |
| for i, interface in enumerate(interfaces): |
| is_ftdi_interface = False |
| if type(interface) is dict: |
| name = interface['name'] |
| # Store interface index for those that care about it. |
| interface['index'] = i |
| elif type(interface) is str and interface != 'dummy': |
| name = interface |
| # It's a FTDI related interface. #0 is reserved for no use. |
| interface = ((i - 1) % ftdi_common.MAX_FTDI_INTERFACES_PER_DEVICE) + 1 |
| is_ftdi_interface = True |
| elif type(interface) is str and interface == 'dummy': |
| # 'dummy' reserves the interface for future use. Typically the |
| # interface will be managed by external third-party tools like |
| # openOCD for JTAG or flashrom for SPI. In the case of servo V4, |
| # it serves as a placeholder for servo micro interfaces. |
| continue |
| else: |
| raise ServodError('Illegal interface type %s' % type(interface)) |
| |
| # servos with multiple FTDI are guaranteed to have contiguous USB PIDs |
| product_increment = 0 |
| if is_ftdi_interface: |
| product_increment = (i - 1) / ftdi_common.MAX_FTDI_INTERFACES_PER_DEVICE |
| if product_increment: |
| self._logger.info('Use the next FTDI part @ pid = 0x%04x', |
| product + product_increment) |
| |
| self._logger.info('Initializing interface %d to %s', i, name) |
| try: |
| func = getattr(self, '_init_%s' % name) |
| except AttributeError: |
| raise ServodError('Unable to locate init for interface %s' % name) |
| result = func(vendor, product + product_increment, serialname, interface) |
| |
| if isinstance(result, tuple): |
| result_len = len(result) |
| self._interface_list[i:(i + result_len)] = result |
| else: |
| self._interface_list[i] = result |
| |
| def __init__(self, config, vendor, product, serialname=None, interfaces=None, |
| board='', version=None, usbkm232=None): |
| """Servod constructor. |
| |
| Args: |
| config: instance of SystemConfig containing all controls for |
| particular Servod invocation |
| vendor: usb vendor id of FTDI device |
| product: usb product id of FTDI device |
| serialname: string of device serialname/number as defined in FTDI eeprom. |
| interfaces: list of strings of interface types the server will instantiate |
| version: String. Servo board version. Examples: servo_v1, servo_v2, |
| servo_v2_r0, servo_v3 |
| usbkm232: String. Optional. Path to USB-KM232 device which allow for |
| sending keyboard commands to DUTs that do not have built in |
| keyboards. Used in FAFT tests. Use None for on board AVR MCU. |
| e.g. '/dev/ttyUSB0' or None. |
| |
| Raises: |
| ServodError: if unable to locate init method for particular interface |
| """ |
| self._logger = logging.getLogger('Servod') |
| self._logger.debug('') |
| self._ifaces_available = threading.Event() |
| # Initially interfaces should be available. |
| self._ifaces_available.set() |
| self._vendor = vendor |
| self._product = product |
| self._devices = [] |
| self._serialnames = {self.MAIN_SERIAL: serialname} |
| self._syscfg = config |
| # list of objects (Fi2c, Fgpio) to physical interfaces (gpio, i2c) that ftdi |
| # interfaces are mapped to |
| self._interface_list = [] |
| # Dict of Dict to map control name, function name to to tuple (params, drv) |
| # Ex) _drv_dict[name]['get'] = (params, drv) |
| self._drv_dict = {} |
| self._board = board |
| self._base_board = '' |
| self._version = version |
| self._usbkm232 = usbkm232 |
| if not interfaces: |
| try: |
| interfaces = servo_interfaces.INTERFACE_BOARDS[board][vendor][product] |
| except KeyError: |
| interfaces = servo_interfaces.INTERFACE_DEFAULTS[vendor][product] |
| self._interfaces = interfaces |
| |
| self.init_servo_interfaces(vendor, product, serialname, interfaces) |
| servo_postinit.post_init(self) |
| |
| def reinitialize(self): |
| """Reinitialize all interfaces that support reinitialization""" |
| for i, interface in enumerate(self._interface_list): |
| if hasattr(interface, 'reinitialize'): |
| interface.reinitialize() |
| else: |
| self._logger.debug('interface %d has no reset functionality', i) |
| # Indicate interfaces are safe to use again. |
| self._ifaces_available.set() |
| |
| def get_servo_interfaces(self, position, size): |
| """Get the list of servo interfaces. |
| |
| Args: |
| position: The index the first interface to get. |
| size: The number of the interfaces. |
| """ |
| return self._interface_list[position:(position + size)] |
| |
| def set_servo_interfaces(self, position, interfaces): |
| """Set the list of servo interfaces. |
| |
| Args: |
| position: The index the first interface to set. |
| interfaces: The list of interfaces to set. |
| """ |
| size = len(interfaces) |
| self._interface_list[position:(position + size)] = interfaces |
| |
| def _init_keyboard_handler(self, servo, board=''): |
| """Initialize the correct keyboard handler for board. |
| |
| Args: |
| servo: servo object. |
| board: string, board name. |
| |
| Returns: |
| keyboard handler object, or None if no keyboard supported. |
| """ |
| if board == 'parrot': |
| return keyboard_handlers.ParrotHandler(servo) |
| elif board == 'stout': |
| return keyboard_handlers.StoutHandler(servo) |
| elif board in ('buddy', 'cranky', 'guado', 'jecht', 'mccloud', 'monroe', |
| 'ninja', 'nyan_kitty', 'panther', 'rikku', 'sarien', |
| 'stumpy', 'sumo', 'tidus', 'tricky', 'veyron_fievel', |
| 'veyron_mickey', 'veyron_rialto', 'veyron_tiger', 'zako'): |
| if self._usbkm232 is None: |
| logging.info('No device path specified for usbkm232 handler. Use ' |
| 'the servo atmega chip to handle.') |
| |
| # Use servo onboard keyboard emulator. |
| if not self._syscfg.is_control('atmega_rst'): |
| logging.warn('No atmega in servo board. So no keyboard support.') |
| return None |
| |
| self.set('atmega_rst', 'on') |
| self.set('at_hwb', 'off') |
| self.set('atmega_rst', 'off') |
| self._usbkm232 = self.get('atmega_pty') |
| |
| # We don't need to set the atmega uart settings if we're a servo v4. |
| if 'servo_v4' not in self._version: |
| self.set('atmega_baudrate', '9600') |
| self.set('atmega_bits', 'eight') |
| self.set('atmega_parity', 'none') |
| self.set('atmega_sbits', 'one') |
| self.set('usb_mux_sel4', 'on') |
| self.set('usb_mux_oe4', 'on') |
| # Allow atmega bootup time. |
| time.sleep(1.0) |
| |
| self._logger.info('USBKM232: %s', self._usbkm232) |
| return keyboard_handlers.USBkm232Handler(servo, self._usbkm232) |
| else: |
| # The following boards don't use Chrome EC. |
| if board in ('alex', 'butterfly', 'lumpy', 'zgb'): |
| return keyboard_handlers.MatrixKeyboardHandler(servo) |
| return keyboard_handlers.ChromeECHandler(servo) |
| |
| def close(self): |
| """Servod turn down logic.""" |
| for i, interface in enumerate(self._interface_list): |
| self._logger.info('Turning down interface %d' % i) |
| if hasattr(interface, 'close'): |
| interface.close() |
| |
| def _init_ftdi_dummy(self, vendor, product, serialname, interface): |
| """Dummy interface for ftdi devices. |
| |
| This is a dummy function specifically for ftdi devices to not initialize |
| anything but to help pad the interface list. |
| |
| Returns: |
| None. |
| """ |
| return None |
| |
| def _init_ftdi_gpio(self, vendor, product, serialname, interface): |
| """Initialize gpio driver interface and open for use. |
| |
| Args: |
| interface: interface number of FTDI device to use. |
| |
| Returns: |
| Instance object of interface. |
| |
| Raises: |
| ServodError: If init fails |
| """ |
| fobj = ftdigpio.Fgpio(vendor, product, interface, serialname) |
| try: |
| fobj.open() |
| except ftdigpio.FgpioError as e: |
| raise ServodError('Opening gpio interface. %s ( %d )' % (e.msg, e.value)) |
| |
| return fobj |
| |
| def _init_stm32_uart(self, vendor, product, serialname, interface): |
| """Initialize stm32 uart interface and open for use |
| |
| Note, the uart runs in a separate thread. Users wishing to |
| interact with it will query control for the pty's pathname and connect |
| with their favorite console program. For example: |
| cu -l /dev/pts/22 |
| |
| Args: |
| interface: dict of interface parameters. |
| |
| Returns: |
| Instance object of interface |
| |
| Raises: |
| ServodError: Raised on init failure. |
| """ |
| self._logger.info('Suart: interface: %s' % interface) |
| sobj = stm32uart.Suart(vendor, product, interface['interface'], serialname) |
| |
| try: |
| sobj.run() |
| except stm32uart.SuartError as e: |
| raise ServodError('Running uart interface. %s ( %d )' % (e.msg, e.value)) |
| |
| self._logger.info('%s' % sobj.get_pty()) |
| return sobj |
| |
| def _init_stm32_gpio(self, vendor, product, serialname, interface): |
| """Initialize stm32 gpio interface. |
| Args: |
| interface: dict of interface parameters. |
| |
| Returns: |
| Instance object of interface |
| |
| Raises: |
| SgpioError: Raised on init failure. |
| """ |
| interface_number = interface |
| # Interface could be a dict. |
| if type(interface) is dict: |
| interface_number = interface['interface'] |
| self._logger.info('Sgpio: interface: %s' % interface_number) |
| return stm32gpio.Sgpio(vendor, product, interface_number, serialname) |
| |
| def _init_stm32_i2c(self, vendor, product, serialname, interface): |
| """Initialize stm32 USB to I2C bridge interface and open for use |
| |
| Args: |
| interface: dict of interface parameters. |
| |
| Returns: |
| Instance object of interface. |
| |
| Raises: |
| Si2cError: Raised on init failure. |
| """ |
| self._logger.info('Si2cBus: interface: %s' % interface) |
| port = interface.get('port', 0) |
| return stm32i2c.Si2cBus(vendor, product, interface['interface'], port=port, |
| serialname=serialname) |
| |
| def _init_bb_adc(self, vendor, product, serialname, interface): |
| """Initalize beaglebone ADC interface.""" |
| return bbadc.BBadc() |
| |
| def _init_bb_gpio(self, vendor, product, serialname, interface): |
| """Initalize beaglebone gpio interface.""" |
| return bbgpio.BBgpio() |
| |
| def _init_ftdi_i2c(self, vendor, product, serialname, interface): |
| """Initialize i2c interface and open for use. |
| |
| Args: |
| interface: interface number of FTDI device to use |
| |
| Returns: |
| Instance object of interface |
| |
| Raises: |
| ServodError: If init fails |
| """ |
| fobj = ftdii2c.Fi2c(vendor, product, interface, serialname) |
| try: |
| fobj.open() |
| except ftdii2c.Fi2cError as e: |
| raise ServodError('Opening i2c interface. %s ( %d )' % (e.msg, e.value)) |
| |
| # Set the frequency of operation of the i2c bus. |
| # TODO(tbroch) make configureable |
| fobj.setclock(MAX_I2C_CLOCK_HZ) |
| |
| return fobj |
| |
| # TODO (sbasi) crbug.com/187489 - Implement bb_i2c. |
| def _init_bb_i2c(self, interface): |
| """Initalize beaglebone i2c interface.""" |
| return bbi2c.BBi2c(interface) |
| |
| def _init_dev_i2c(self, vendor, product, serialname, interface): |
| """Initalize Linux i2c-dev interface.""" |
| return i2cbus.I2CBus('/dev/i2c-%d' % interface['bus_num']) |
| |
| def _init_ftdi_uart(self, vendor, product, serialname, interface): |
| """Initialize ftdi uart inteface and open for use |
| |
| Note, the uart runs in a separate thread (pthreads). Users wishing to |
| interact with it will query control for the pty's pathname and connect |
| with there favorite console program. For example: |
| cu -l /dev/pts/22 |
| |
| Args: |
| interface: interface number of FTDI device to use |
| |
| Returns: |
| Instance object of interface |
| |
| Raises: |
| ServodError: If init fails |
| """ |
| fobj = ftdiuart.Fuart(vendor, product, interface, serialname) |
| try: |
| fobj.run() |
| except ftdiuart.FuartError as e: |
| raise ServodError('Running uart interface. %s ( %d )' % (e.msg, e.value)) |
| |
| self._logger.info('%s' % fobj.get_pty()) |
| return fobj |
| |
| # TODO (sbasi) crbug.com/187492 - Implement bbuart. |
| def _init_bb_uart(self, vendor, product, serialname, interface): |
| """Initalize beaglebone uart interface.""" |
| logging.debug('UART INTERFACE: %s', interface) |
| return bbuart.BBuart(interface) |
| |
| def _init_ftdi_gpiouart(self, vendor, product, serialname, interface): |
| """Initialize special gpio + uart interface and open for use |
| |
| Note, the uart runs in a separate thread (pthreads). Users wishing to |
| interact with it will query control for the pty's pathname and connect |
| with there favorite console program. For example: |
| cu -l /dev/pts/22 |
| |
| Args: |
| interface: interface number of FTDI device to use |
| |
| Returns: |
| Instance objects of interface |
| |
| Raises: |
| ServodError: If init fails |
| """ |
| fgpio = self._init_ftdi_gpio(vendor, product, serialname, interface) |
| fuart = ftdiuart.Fuart(vendor, product, interface, serialname, fgpio._fc) |
| try: |
| fuart.run() |
| except ftdiuart.FuartError as e: |
| raise ServodError('Running uart interface. %s ( %d )' % (e.msg, e.value)) |
| |
| self._logger.info('uart pty: %s' % fuart.get_pty()) |
| return fgpio, fuart |
| |
| def _init_ec3po_uart(self, vendor, product, serialname, interface): |
| """Initialize EC-3PO console interpreter interface. |
| |
| Args: |
| interface: A dictionary representing the interface. |
| |
| Returns: |
| An EC3PO object representing the EC-3PO interface or None if there's no |
| interface for the USB PD UART. |
| """ |
| raw_uart_name = interface['raw_pty'] |
| raw_uart_source = interface['source'] |
| if self._syscfg.is_control(raw_uart_name): |
| raw_ec_uart = self.get(raw_uart_name) |
| return ec3po_interface.EC3PO(raw_ec_uart, raw_uart_source) |
| else: |
| # The overlay doesn't have the raw PTY defined, therefore we can skip |
| # initializing this interface since no control relies on it. |
| self._logger.debug( |
| 'Skip initializing EC3PO for %s, no control specified.', |
| raw_uart_name) |
| return None |
| |
| def _camel_case(self, string): |
| output = '' |
| for s in string.split('_'): |
| if output: |
| output += s.capitalize() |
| else: |
| output = s |
| return output |
| |
| def clear_cached_drv(self): |
| """Clear the cached drivers. |
| |
| The drivers are cached in the Dict _drv_dict when a control is got or set. |
| When the servo interfaces are relocated, the cached values may become wrong. |
| Should call this method to clear the cached values. |
| """ |
| self._drv_dict = {} |
| |
| def _get_servo_specific_param(self, params, param_key, control_name): |
| """Get |param_key| from params by looking for servo specific params first. |
| |
| Find the candidate servos. Using servo_v4 with a servo_micro connected as |
| example, the following shows the priority for selecting the interface. |
| |
| 1. The full name. (e.g. - 'servo_v4_with_servo_micro_interface') |
| 2. servo_micro_interface |
| 3. servo_v4_interface |
| 4. Fallback to the default, interface. |
| |
| Args: |
| params: params dictionary for a control |
| param_key: identifier in the params dictionary to look for |
| control_name: control name the params correspond to |
| |
| Returns: |
| The best suited param value for param_key given the servo type or |
| None if even the default is not defined. |
| """ |
| candidates = [self._version] |
| candidates.extend(reversed(self._version.split('_with_'))) |
| candidates = ['%s_%s' % (c, param_key) for c in candidates] |
| candidates.append(param_key) |
| for c in candidates: |
| if c in params: |
| self._logger.debug('Using %s parameter.', c) |
| return params[c] |
| self._logger.error('Unable to determine %s for %s', param_key, control_name) |
| self._logger.error('params: %r', params) |
| return None |
| |
| def _get_param_drv(self, control_name, is_get=True): |
| """Get access to driver for a given control. |
| |
| Note, some controls have different parameter dictionaries for 'getting' the |
| control's value versus 'setting' it. Boolean is_get distinguishes which is |
| being requested. |
| |
| Args: |
| control_name: string name of control |
| is_get: boolean to determine |
| |
| Returns: |
| tuple (param, drv) where: |
| param: param dictionary for control |
| drv: instance object of driver for particular control |
| |
| Raises: |
| ServodError: Error occurred while examining params dict |
| """ |
| self._logger.debug('') |
| # if already setup just return tuple from driver dict |
| if control_name in self._drv_dict: |
| if is_get and ('get' in self._drv_dict[control_name]): |
| return self._drv_dict[control_name]['get'] |
| if not is_get and ('set' in self._drv_dict[control_name]): |
| return self._drv_dict[control_name]['set'] |
| |
| params = self._syscfg.lookup_control_params(control_name, is_get) |
| |
| # Get the most suitable drv given the servo instance. |
| drv_name = self._get_servo_specific_param(params, 'drv', control_name) |
| if drv_name == 'na': |
| # 'na' drv can be used to selectively turn controls into noops for |
| # a given servo hardware. Ensure that there is an interface. |
| params.setdefault('interface', 'servo') |
| self._logger.debug('Setting interface to default to %r for %r unless ' |
| ' defined in params, as drv is %r.', 'servo', |
| control_name, 'na') |
| # Setting input_type to str allows all inputs through enabling a true noop |
| params.update({'input_type': 'str'}) |
| interface_id = self._get_servo_specific_param(params, 'interface', |
| control_name) |
| if None in [drv_name, interface_id]: |
| raise ServodError('No drv/interface for control %r found' % control_name) |
| |
| if interface_id == 'servo': |
| interface = self |
| else: |
| index = int(interface_id) |
| interface = self._interface_list[index] |
| |
| drv_module = getattr(servo_drv, drv_name) |
| drv_class = getattr(drv_module, self._camel_case(drv_name)) |
| drv = drv_class(interface, params) |
| if control_name not in self._drv_dict: |
| self._drv_dict[control_name] = {} |
| if is_get: |
| self._drv_dict[control_name]['get'] = (params, drv) |
| else: |
| self._drv_dict[control_name]['set'] = (params, drv) |
| return (params, drv) |
| |
| def doc_all(self): |
| """Return all documenation for controls. |
| |
| Returns: |
| string of <doc> text in config file (xml) and the params dictionary for |
| all controls. |
| |
| For example: |
| warm_reset :: Reset the device warmly |
| ------------------------> {'interface': '1', 'map': 'onoff_i', ... } |
| """ |
| return self._syscfg.display_config() |
| |
| def doc(self, name): |
| """Retreive doc string in system config file for given control name. |
| |
| Args: |
| name: name string of control to get doc string |
| |
| Returns: |
| doc string of name |
| |
| Raises: |
| NameError: if fails to locate control |
| """ |
| self._logger.debug('name(%s)' % (name)) |
| if self._syscfg.is_control(name): |
| return self._syscfg.get_control_docstring(name) |
| else: |
| raise NameError('No control %s' % name) |
| |
| def safe_switch_usbkey_power(self, power_state, _=None): |
| """Toggle the usb power safely. |
| |
| Args: |
| power_state: The setting to set for the usbkey power. |
| _: to conform to current API |
| |
| Returns: |
| An empty string to appease the xmlrpc gods. |
| """ |
| self.set('image_usbkey_pwr', power_state) |
| return '' |
| |
| def safe_switch_usbkey(self, mux_direction, _=0): |
| """Toggle the usb direction safely. |
| |
| Args: |
| mux_direction: "servo_sees_usbkey" or "dut_sees_usbkey". |
| _: to conform to current API |
| |
| Returns: |
| An empty string to appease the xmlrpc gods. |
| """ |
| self.set('image_usbkey_direction', mux_direction) |
| return '' |
| |
| def probe_host_usb_dev(self, _=0): |
| """Probe the USB disk device plugged in the servo from the host side. |
| |
| Args: |
| _: to conform to current API |
| |
| Returns: |
| USB disk path if one and only one USB disk path is found, otherwise an |
| """ |
| return self.get('image_usbkey_dev') |
| |
| def download_image_to_usb(self, image_path, _=0): |
| """Download image and save to the USB device found by probe_host_usb_dev. |
| If the image_path is a URL, it will download this url to the USB path; |
| otherwise it will simply copy the image_path's contents to the USB path. |
| |
| Args: |
| image_path: path or url to the recovery image. |
| _: to conform to current API |
| |
| Returns: |
| True|False: True if process completed successfully, False if error |
| occurred. |
| """ |
| try: |
| self.set('download_image_to_usb_dev', image_path) |
| return True |
| except Exception: |
| return False |
| |
| def make_image_noninteractive(self): |
| """Makes the recovery image noninteractive. |
| |
| A noninteractive image will reboot automatically after installation |
| instead of waiting for the USB device to be removed to initiate a system |
| reboot. |
| |
| Mounts partition 1 of the image stored on usb_dev and creates a file |
| called "non_interactive" so that the image will become noninteractive. |
| |
| Returns: |
| True|False: True if process completed successfully, False if error |
| occurred. |
| """ |
| try: |
| usb_dev = self.get('image_usbkey_dev') |
| usb_dev_partition = '%s1' % usb_dev |
| self.set('make_usb_dev_image_noninteractive', usb_dev_partition) |
| return True |
| except Exception: |
| return False |
| |
| def set_get_all(self, cmds): |
| """Set &| get one or more control values. |
| |
| Args: |
| cmds: list of control[:value] to get or set. |
| |
| Returns: |
| rv: list of responses from calling get or set methods. |
| """ |
| rv = [] |
| for cmd in cmds: |
| if ':' in cmd: |
| (control, value) = cmd.split(':') |
| rv.append(self.set(control, value)) |
| else: |
| rv.append(self.get(cmd)) |
| return rv |
| |
| def get_serial_number(self, name): |
| """Returns the desired serial number from the serialnames dict. |
| |
| Args: |
| name: A string which is the key into the _serialnames dictionary. |
| |
| Returns: |
| A string containing the serial number or "unknown". |
| """ |
| if not name: |
| name = 'main' |
| |
| try: |
| return self._serialnames[name] |
| except KeyError: |
| self._logger.debug("'%s_serialname' not found!", name) |
| return 'unknown' |
| |
| def get(self, name): |
| """Get control value. |
| |
| Args: |
| name: name string of control |
| |
| Returns: |
| Response from calling drv get method. Value is reformatted based on |
| control's dictionary parameters |
| |
| Raises: |
| HwDriverError: Error occurred while using drv |
| ServodError: if interfaces are not available within timeout period |
| """ |
| if not self._ifaces_available.wait(self.INTERFACE_AVAILABILITY_TIMEOUT): |
| raise ServodError('Timed out waiting for interfaces to become available.') |
| self._logger.debug('name(%s)' % (name)) |
| # This route is to retrieve serialnames on servo v4, which |
| # connects to multiple servo-micros or CCD, like the controls, |
| # 'ccd_serialname', 'servo_micro_for_soraka_serialname', etc. |
| # TODO(aaboagye): Refactor it. |
| if 'serialname' in name: |
| return self.get_serial_number(name.split('serialname')[0].strip('_')) |
| |
| (param, drv) = self._get_param_drv(name) |
| try: |
| val = drv.get() |
| rd_val = self._syscfg.reformat_val(param, val) |
| self._logger.debug('%s = %s' % (name, rd_val)) |
| return rd_val |
| except AttributeError, error: |
| self._logger.error('Getting %s: %s' % (name, error)) |
| raise |
| except HwDriverError: |
| self._logger.error('Getting %s' % (name)) |
| raise |
| |
| def get_all(self, verbose): |
| """Get all controls values. |
| |
| Args: |
| verbose: Boolean on whether to return doc info as well |
| |
| Returns: |
| string creating from trying to get all values of all controls. In case of |
| error attempting access to control, response is 'ERR'. |
| """ |
| rsp = [] |
| for name in self._syscfg.syscfg_dict['control']: |
| self._logger.debug('name = %s' % name) |
| try: |
| value = self.get(name) |
| except Exception: |
| value = 'ERR' |
| pass |
| if verbose: |
| rsp.append('GET %s = %s :: %s' % (name, value, self.doc(name))) |
| else: |
| rsp.append('%s:%s' % (name, value)) |
| return '\n'.join(sorted(rsp)) |
| |
| def set(self, name, wr_val_str): |
| """Set control. |
| |
| Args: |
| name: name string of control |
| wr_val_str: value string to write. Can be integer, float or a |
| alpha-numerical that is mapped to a integer or float. |
| |
| Raises: |
| HwDriverError: Error occurred while using driver |
| ServodError: if interfaces are not available within timeout period |
| """ |
| if not self._ifaces_available.wait(self.INTERFACE_AVAILABILITY_TIMEOUT): |
| raise ServodError('Timed out waiting for interfaces to become available.') |
| self._logger.debug('name(%s) wr_val(%s)' % (name, wr_val_str)) |
| (params, drv) = self._get_param_drv(name, False) |
| wr_val = self._syscfg.resolve_val(params, wr_val_str) |
| try: |
| drv.set(wr_val) |
| except HwDriverError: |
| self._logger.error('Setting %s -> %s' % (name, wr_val_str)) |
| raise |
| # TODO(crbug.com/841097) Figure out why despite allow_none=True for both |
| # xmlrpc server & client I still have to return something to appease the |
| # marshall/unmarshall |
| return True |
| |
| def hwinit(self, verbose=False): |
| """Initialize all controls. |
| |
| These values are part of the system config XML files of the form |
| init=<value>. This command should be used by clients wishing to return the |
| servo and DUT its connected to a known good/safe state. |
| |
| Note that initialization errors are ignored (as in some cases they could |
| be caused by DUT firmware deficiencies). This might need to be fine tuned |
| later. |
| |
| Args: |
| verbose: boolean, if True prints info about control initialized. |
| Otherwise prints nothing. |
| |
| Returns: |
| This function is called across RPC and as such is expected to return |
| something unless transferring 'none' across is allowed. Hence adding a |
| dummy return value to make things simpler. |
| """ |
| for control_name, value in self._syscfg.hwinit: |
| try: |
| # Workaround for bug chrome-os-partner:42349. Without this check, the |
| # gpio will briefly pulse low if we set it from high to high. |
| if self.get(control_name) != value: |
| self.set(control_name, value) |
| if verbose: |
| self._logger.info('Initialized %s to %s', control_name, value) |
| except Exception: |
| self._logger.exception( |
| 'Problem initializing %s -> %s', control_name, value) |
| |
| # Init keyboard after all the intefaces are up. |
| self._keyboard = self._init_keyboard_handler(self, self._board) |
| return True |
| |
| def echo(self, echo): |
| """Dummy echo function for testing/examples. |
| |
| Args: |
| echo: string to echo back to client |
| """ |
| self._logger.debug('echo(%s)' % (echo)) |
| return 'ECH0ING: %s' % (echo) |
| |
| def get_board(self): |
| """Return the board specified at startup, if any.""" |
| return self._board |
| |
| def get_base_board(self): |
| """Returns the board name of the base if present. |
| |
| Returns: |
| A string of the board name, or '' if not present. |
| """ |
| # The value is set in servo_postinit. |
| return self._base_board |
| |
| def get_version(self): |
| """Get servo board version.""" |
| return self._version |
| |
| def power_long_press(self): |
| """Simulate a long power button press.""" |
| # After a long power press, the EC may ignore the next power |
| # button press (at least on Alex). To guarantee that this |
| # won't happen, we need to allow the EC one second to |
| # collect itself. |
| return self.set('power_key', 'long_press') |
| |
| def power_normal_press(self): |
| """Simulate a normal power button press.""" |
| return self.set('power_key', 'press') |
| |
| def power_short_press(self): |
| """Simulate a short power button press.""" |
| return self.set('power_key', 'short_press') |
| |
| def power_key(self, press_secs=''): |
| """Simulate a power button press. |
| |
| Args: |
| press_secs: Time in seconds to simulate the keypress. |
| """ |
| return self.set('power_key', 'press' if press_secs is '' else press_secs) |
| |
| def ctrl_d(self, press_secs=''): |
| """Simulate Ctrl-d simultaneous button presses.""" |
| return self.set('ctrl_d', 'tab' if press_secs is '' else press_secs) |
| |
| def ctrl_u(self, press_secs=''): |
| """Simulate Ctrl-u simultaneous button presses.""" |
| return self.set('ctrl_u', 'tab' if press_secs is '' else press_secs) |
| |
| def ctrl_enter(self, press_secs=''): |
| """Simulate Ctrl-enter simultaneous button presses.""" |
| return self.set('ctrl_enter', 'tab' if press_secs is '' else press_secs) |
| |
| def d_key(self, press_secs=''): |
| """Simulate Enter key button press.""" |
| return self.set('d_key', 'tab' if press_secs is '' else press_secs) |
| |
| def ctrl_key(self, press_secs=''): |
| """Simulate Enter key button press.""" |
| return self.set('ctrl_key', 'tab' if press_secs is '' else press_secs) |
| |
| def enter_key(self, press_secs=''): |
| """Simulate Enter key button press.""" |
| return self.set('enter_key', 'tab' if press_secs is '' else press_secs) |
| |
| def refresh_key(self, press_secs=''): |
| """Simulate Refresh key (F3) button press.""" |
| return self.set('refresh_key', 'tab' if press_secs is '' else press_secs) |
| |
| def ctrl_refresh_key(self, press_secs=''): |
| """Simulate Ctrl and Refresh (F3) simultaneous press. |
| |
| This key combination is an alternative of Space key. |
| """ |
| return self.set('ctrl_refresh_key', ('tab' if press_secs is '' else |
| press_secs)) |
| |
| def imaginary_key(self, press_secs=''): |
| """Simulate imaginary key button press. |
| |
| Maps to a key that doesn't physically exist. |
| """ |
| return self.set('imaginary_key', 'tab' if press_secs is '' else press_secs) |
| |
| def sysrq_x(self, press_secs=''): |
| """Simulate Alt VolumeUp X simultaneous press. |
| |
| This key combination is the kernel system request (sysrq) x. |
| """ |
| return self.set('sysrq_x', 'tab' if press_secs is '' else press_secs) |
| |
| def get_servo_serials(self): |
| """Return all the serials associated with this process.""" |
| return self._serialnames |
| |
| |
| def test(): |
| """Integration testing. |
| |
| TODO(tbroch) Enhance integration test and add unittest (see mox) |
| """ |
| logging.basicConfig( |
| level=logging.DEBUG, |
| format='%(asctime)s - %(name)s - ' + '%(levelname)s - %(message)s') |
| # configure server & listen |
| servod_obj = Servod(1) |
| # 5 == number of interfaces on a FT4232H device |
| for i in xrange(1, 5): |
| if i == 2: |
| # its an i2c interface ... see __init__ for details and TODO to make |
| # this configureable |
| servod_obj._interface_list[i].wr_rd(0x21, [0], 1) |
| else: |
| # its a gpio interface |
| servod_obj._interface_list[i].wr_rd(0) |
| |
| server = SimpleXMLRPCServer.SimpleXMLRPCServer(('localhost', 9999), |
| allow_none=True) |
| server.register_introspection_functions() |
| server.register_multicall_functions() |
| server.register_instance(servod_obj) |
| logging.info('Listening on localhost port 9999') |
| server.serve_forever() |
| |
| |
| if __name__ == '__main__': |
| test() |
| |
| # simple client transaction would look like |
| """remote_uri = 'http://localhost:9999' client = xmlrpclib.ServerProxy(remote_uri, verbose=False) send_str = "Hello_there" print "Sent " + send_str + ", Recv " + client.echo(send_str) |
| |
| """ |