# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Servo Server."""
import logging
import os
import re
from SimpleXMLRPCServer import SimpleXMLRPCServer
except ImportError:
from xmlrpc.server import SimpleXMLRPCServer
# TODO( This is for python3 compatibility.
# Remove once fully moved to python3.
import time
import usb
import drv as servo_drv
import interface as _interface
import servo_dev
import servo_interfaces
import servo_logging
import servo_postinit
HwDriverError = servo_drv.hw_driver.HwDriverError
class ServodError(Exception):
"""Exception class for servod."""
class Servod(object):
"""Main class for Servo debug/controller Daemon."""
# This is the key to get the main serial used in the _serialnames dict.
MAIN_SERIAL = 'main'
SERVO_MICRO_SERIAL = 'servo_micro'
CCD_SERIAL = 'ccd'
# Timeout to wait for interfaces to become available again if reinitialization
# is taking place. In seconds. This is supposed to recover from brief resets.
# If the interface disappears for more than 5 seconds, then someone probably
# intentionally disconnected the device. Servod shouldn't be responsible for
# waiting for the device during an intentional disconnect.
# Exceptions to count as known or ordinary. Any errors that aren't instances
# of these (or their subclasses) will be logged with "Please take a look."
KNOWN_EXCEPTIONS = (AttributeError, NameError, HwDriverError)
def init_servo_interfaces(self, vendor, product, serialname, interfaces):
"""Init the servo interfaces with the given interfaces.
We don't use the self._{vendor,product,serialname} attributes because we
want to allow other callers to initialize other interfaces that may not
be associated with the initialized attributes (e.g. a servo v4 servod object
that wants to also initialize a servo micro interface).
vendor: USB vendor id of FTDI device.
product: USB product id of FTDI device.
serialname: String of device serialname/number as defined in FTDI
interfaces: List of strings of interface types the server will
ServodError if unable to locate init method for particular interface.
# If it is a new device add it to the list
device = (vendor, product, serialname)
# Extend the interface list if we need to.
interfaces_len = len(interfaces)
interface_list_len = len(self._interface_list)
if interfaces_len > interface_list_len:
# Fill with dummies.
self._interface_list += [_interface.dummy.Dummy()] * (interfaces_len -
for i, interface_data in enumerate(interfaces):
if type(interface_data) is dict:
name = interface_data['name']
# Store interface index for those that care about it.
interface_data['index'] = i
elif type(interface_data) is str:
if interface_data in ['dummy', 'ftdi_dummy']:
# 'dummy' reserves the interface for future use. Typically the
# interface will be managed by external third-party tools like
# openOCD for JTAG or flashrom for SPI. In the case of servo V4,
# it serves as a placeholder for servo micro interfaces.
name = interface_data
raise ServodError('Illegal interface data type %s'
% type(interface_data))'Initializing interface %d to %s', i, name)
result = _interface.Build(name=name, index=i, vid=vendor, pid=product,
sid=serialname, interface_data=interface_data,
if isinstance(result, tuple):
result_len = len(result)
self._interface_list[i:(i + result_len)] = result
self._interface_list[i] = result
def __init__(self, config, vendor, product, serialname=None, interfaces=None,
board='', model='', version=None, usbkm232=None):
"""Servod constructor.
config: instance of SystemConfig containing all controls for
particular Servod invocation
vendor: usb vendor id of FTDI device
product: usb product id of FTDI device
serialname: string of device serialname/number as defined in FTDI eeprom.
interfaces: list of strings of interface types the server will instantiate
board: board name. e.g. octopus, coral, or scarlet.
model: model name of a given board. e.g. fleex, ampton, or apel.
version: String. Servo board version. Examples: servo_v1, servo_v2,
servo_v2_r0, servo_v3
usbkm232: String. Optional. Path to USB-KM232 device which allow for
sending keyboard commands to DUTs that do not have built in
keyboards. Used in FAFT tests. Use None for on board AVR MCU.
e.g. '/dev/ttyUSB0' or None.
ServodError: if unable to locate init method for particular interface
self._logger = logging.getLogger('Servod')
self._vendor = vendor
self._product = product
self._devices = {}
self._serialnames = {self.MAIN_SERIAL: serialname}
self._syscfg = config
# list of objects (Fi2c, Fgpio) to physical interfaces (gpio, i2c) that ftdi
# interfaces are mapped to
self._interface_list = []
# Dict of Dict to map control name, function name to to tuple (params, drv)
# Ex) _drv_dict[name]['get'] = (params, drv)
self._drv_dict = {}
self._base_board = ''
self._board = board
if model:
self._board += '_' + model
self._model = model
self._version = version
self._usbkm232 = usbkm232
self._keyboard = None
self._usb_keyboard = None
if not interfaces:
interfaces = servo_interfaces.INTERFACE_BOARDS[board][vendor][product]
except KeyError:
interfaces = servo_interfaces.INTERFACE_DEFAULTS[vendor][product]
self._interfaces = interfaces
self.init_servo_interfaces(vendor, product, serialname, interfaces)
def reinitialize(self):
"""Reinitialize all interfaces that support reinitialization"""
for i, interface in enumerate(self._interface_list):
# Indicate interfaces are safe to use again.
for device in self._devices.values():
def get_servo_interfaces(self, position, size):
"""Get the list of servo interfaces.
position: The index the first interface to get.
size: The number of the interfaces.
return self._interface_list[position:(position + size)]
def set_servo_interfaces(self, position, interfaces):
"""Set the list of servo interfaces.
position: The index the first interface to set.
interfaces: The list of interfaces to set.
size = len(interfaces)
self._interface_list[position:(position + size)] = interfaces
def close(self):
"""Servod turn down logic."""
for i, interface in enumerate(self._interface_list):'Turning down interface %d' % i)
def get_devices(self):
return self._devices.values()
def add_device(self, device):
if device not in self._devices:
vid, pid, serial = device
servod_device = servo_dev.ServoDevice(vid, pid, serial)
self._devices[device] = servod_device
def _camel_case(self, string):
output = ''
for s in string.split('_'):
if output:
output += s.capitalize()
output = s
return output
def clear_cached_drv(self):
"""Clear the cached drivers.
The drivers are cached in the Dict _drv_dict when a control is got or set.
When the servo interfaces are relocated, the cached values may become wrong.
Should call this method to clear the cached values.
self._drv_dict = {}
def _get_servo_specific_param(self, params, param_key, control_name):
"""Get |param_key| from params by looking for servo specific params first.
Find the candidate servos. Using servo_v4 with a servo_micro connected as
example, the following shows the priority for selecting the interface.
1. The full name. (e.g. - 'servo_v4_with_servo_micro_interface')
2. servo_micro_interface
3. servo_v4_interface
4. Fallback to the default, interface.
params: params dictionary for a control
param_key: identifier in the params dictionary to look for
control_name: control name the params correspond to
The best suited param value for param_key given the servo type or
None if even the default is not defined.
candidates = [self._version]
if '_with_' in self._version:
v4, raw_dut_device = self._version.split('_with_')
dut_devices = raw_dut_device.split('_and_')
# NOTE(coconutruben): all of this nonsense is going away with the new
# servod and is to bridge the time until then. Please forgive the below
# until then.
if '.' in control_name:
# In the current implementation, the only case where a '.' (a prefix)
# is in the control name is when there is a dual instance with micro and
# ccd on a v4.
dut_device = dut_devices[1]
# In the normal control name, we need to make sure the version used
# does not include the potential _and_ portion from a dual instance.
dut_device = dut_devices[0]
candidates.extend([dut_device, v4])
candidates = ['%s_%s' % (c, param_key) for c in candidates]
for c in candidates:
if c in params:
self._logger.debug('Using %s parameter.', c)
return params[c]
self._logger.error('Unable to determine %s for %s', param_key, control_name)
self._logger.error('params: %r', params)
return None
def _get_param_drv(self, control_name, is_get=True):
"""Get access to driver for a given control.
Note, some controls have different parameter dictionaries for 'getting' the
control's value versus 'setting' it. Boolean is_get distinguishes which is
being requested.
control_name: string name of control
is_get: boolean to determine
tuple (params, drv, device_info) where:
params: param dictionary for control
drv: instance object of driver for particular control
device_info: servo device information
ServodError: Error occurred while examining params dict
# if already setup just return tuple from driver dict
if control_name in self._drv_dict:
if is_get and ('get' in self._drv_dict[control_name]):
return self._drv_dict[control_name]['get']
if not is_get and ('set' in self._drv_dict[control_name]):
return self._drv_dict[control_name]['set']
params = self._syscfg.lookup_control_params(control_name, is_get)
# Get the most suitable drv given the servo instance.
drv_name = self._get_servo_specific_param(params, 'drv', control_name)
if drv_name == 'na':
# 'na' drv can be used to selectively turn controls into noops for
# a given servo hardware. Ensure that there is an interface.
params.setdefault('interface', 'servo')
self._logger.debug('Setting interface to default to %r for %r unless '
' defined in params, as drv is %r.', 'servo',
control_name, 'na')
# Setting input_type to str allows all inputs through enabling a true noop
params.update({'input_type': 'str'})
interface_id = self._get_servo_specific_param(params, 'interface',
if None in [drv_name, interface_id]:
raise ServodError('No drv/interface for control %r found' % control_name)
if interface_id == 'servo':
interface = self
index = int(interface_id)
interface = self._interface_list[index]
device_info = None
if hasattr(interface, 'get_device_info'):
device_info = interface.get_device_info()
drv_module = getattr(servo_drv, drv_name)
drv_class = getattr(drv_module, self._camel_case(drv_name))
drv = drv_class(interface, params)
if control_name not in self._drv_dict:
self._drv_dict[control_name] = {}
if is_get:
self._drv_dict[control_name]['get'] = (params, drv, device_info)
self._drv_dict[control_name]['set'] = (params, drv, device_info)
return (params, drv, device_info)
def doc_all(self):
"""Return all documenation for controls.
string of <doc> text in config file (xml) and the params dictionary for
all controls.
For example:
warm_reset :: Reset the device warmly
------------------------> {'interface': '1', 'map': 'onoff_i', ... }
return self._syscfg.display_config()
def doc(self, name):
"""Retreive doc string in system config file for given control name.
name: name string of control to get doc string
doc string of name
NameError: if fails to locate control
self._logger.debug('name(%s)' % (name))
if self._syscfg.is_control(name):
return self._syscfg.get_control_docstring(name)
raise NameError('No control %s' % name)
def set_get_all(self, cmds):
"""Set &| get one or more control values.
cmds: list of control[:value] to get or set.
rv: list of responses from calling get or set methods.
rv = []
for cmd in cmds:
if ':' in cmd:
(control, value) = cmd.split(':', 1)
rv.append(self.set(control, value))
return rv
def add_serial_number(self, name, serial_number):
"""Adds the serial number to the _serialnames dictionary.
name: A string which is the key into the _serialnames dictionary.
serial_number: A string which is the key into the _serialnames dictionary.
self._serialnames[name] = serial_number
self._logger.debug('Added %s %s to serialnames %r', name, serial_number,
def get_serial_number(self, name):
"""Returns the desired serial number from the serialnames dict.
name: A string which is the key into the _serialnames dictionary.
A string containing the serial number or "unknown".
# Remove the prefix from the serialname control. Serialnames are
# universal. It doesn't matter what the prefix is.
# The prefix is separated from the main control with '.'
name = name.split('.', 1)[-1]
if not name:
name = 'main'
return self._serialnames[name]
except KeyError:
self._logger.debug("'%s_serialname' not found!", name)
return 'unknown'
def get(self, name):
"""Get control value.
name: name string of control
Response from calling drv get method. Value is reformatted based on
control's dictionary parameters
HwDriverError: Error occurred while using drv
ServodError: if interfaces are not available within timeout period
if 'serialname' in name:
# This route is to retrieve serialnames on servo v4, which
# connects to multiple servo-micros or CCD, like the controls,
# 'ccd_serialname', 'servo_micro_for_soraka_serialname', etc.
# TODO(aaboagye): Refactor it.
return self.get_serial_number(name.split('serialname')[0].strip('_'))
with servo_logging.WrapGetCall(
name, known_exceptions=self.KNOWN_EXCEPTIONS) as wrapper:
(params, drv, device) = self._get_param_drv(name)
if device in self._devices:
val = drv.get()
rd_val = self._syscfg.reformat_val(params, val)
return rd_val
def get_all(self, verbose):
"""Get all controls values.
verbose: Boolean on whether to return doc info as well
string creating from trying to get all values of all controls. In case of
error attempting access to control, response is 'ERR'.
rsp = []
for name in self._syscfg.syscfg_dict['control']:
self._logger.debug('name = %s' % name)
value = self.get(name)
except Exception:
value = 'ERR'
if verbose:
rsp.append('GET %s = %s :: %s' % (name, value, self.doc(name)))
rsp.append('%s:%s' % (name, value))
return '\n'.join(sorted(rsp))
def set(self, name, wr_val_str):
"""Set control.
name: name string of control
wr_val_str: value string to write. Can be integer, float or a
alpha-numerical that is mapped to a integer or float.
HwDriverError: Error occurred while using driver
ServodError: if interfaces are not available within timeout period
with servo_logging.WrapSetCall(
name, wr_val_str, known_exceptions=self.KNOWN_EXCEPTIONS):
(params, drv, device) = self._get_param_drv(name, False)
if device in self._devices:
wr_val = self._syscfg.resolve_val(params, wr_val_str)
# TODO( Figure out why despite allow_none=True for both
# xmlrpc server & client I still have to return something to appease the
# marshall/unmarshall
return True
def hwinit(self, verbose=False):
"""Initialize all controls.
These values are part of the system config XML files of the form
init=<value>. This command should be used by clients wishing to return the
servo and DUT its connected to a known good/safe state.
Note that initialization errors are ignored (as in some cases they could
be caused by DUT firmware deficiencies). This might need to be fine tuned
verbose: boolean, if True prints info about control initialized.
Otherwise prints nothing.
This function is called across RPC and as such is expected to return
something unless transferring 'none' across is allowed. Hence adding a
dummy return value to make things simpler.
for control_name, value in self._syscfg.hwinit:
# Workaround for bug chrome-os-partner:42349. Without this check, the
# gpio will briefly pulse low if we set it from high to high.
if self.get(control_name) != value:
self.set(control_name, value)
if verbose:'Initialized %s to %s', control_name, value)
except Exception as e:
'Problem initializing %s -> %s', control_name, value)
self._logger.error('Please consider verifying the logs and if the '
'error is not just a setup issue, consider filing '
'a bug. Also checkout go/servo-ki.')
# If there is the control of 'active_v4_device', set active_v4_device to
# the default device as initialization.
if self._syscfg.is_control('active_v4_device'):
self.set('active_v4_device', 'default')
except servo_drv.active_v4_device.activeV4DeviceError as e:
self._logger.debug('Could not set active device: %s', str(e))
return True
def echo(self, echo):
"""Dummy echo function for testing/examples.
echo: string to echo back to client
self._logger.debug('echo(%s)' % (echo))
return 'ECH0ING: %s' % (echo)
def get_board(self):
"""Return the board specified at startup, if any."""
return self._board
def get_base_board(self):
"""Returns the board name of the base if present.
A string of the board name, or '' if not present.
# The value is set in servo_postinit.
return self._base_board
def get_version(self):
"""Get servo board version."""
return self._version
def get_servo_serials(self):
"""Return all the serials associated with this process."""
return self._serialnames
def test():
"""Integration testing.
TODO(tbroch) Enhance integration test and add unittest (see mox)
format='%(asctime)s - %(name)s - ' + '%(levelname)s - %(message)s')
# configure server & listen
servod_obj = Servod(1)
# 5 == number of interfaces on a FT4232H device
for i in range(1, 5):
if i == 2:
# its an i2c interface ... see __init__ for details and TODO to make
# this configureable
servod_obj._interface_list[i].wr_rd(0x21, [0], 1)
# its a gpio interface
server = SimpleXMLRPCServer(('localhost', 9999), allow_none=True)
server.register_instance(servod_obj)'Listening on localhost port 9999')
if __name__ == '__main__':
"""remote_uri = 'http://localhost:9999' client = xmlrpclib.ServerProxy(remote_uri, verbose=False) send_str = "Hello_there" print "Sent " + send_str + ", Recv " + client.echo(send_str)