blob: 9e72ed9389f3c6b7e811d89b2f6dc9d3d8c6c175 [file] [log] [blame]
#!/usr/bin/env python2
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Python version of Servo hardware debug & control board server."""
# pylint: disable=g-bad-import-order
# pkg_resources is erroneously suggested to be in the 1st party segment
import errno
import logging
import optparse
import os
import pkg_resources
import select
import signal
import SimpleXMLRPCServer
import socket
import sys
import threading
import time
import drv.loglevel
import ftdi_common
import multiservo
import servo_interfaces
import servo_server
import system_config
import terminal_freezer
import usb
VERSION = pkg_resources.require('servo')[0].version
# If user does not specify a port to use, try ports in this range. Traverse
# the range from high to low addresses to maintain backwards compatibility
# (the first checked default port is 9999, the range is such that all possible
# port numbers are 4 digits).
DEFAULT_PORT_RANGE = (9990, 9999)
# This text file holds servod configuration parameters. This is especially
# handy for multi servo operation.
# The file format is pretty loose:
# - text starting with # is ignored til the end of the line
# - empty lines are ignored
# - configuration lines consist of up to 4 comma separated fields (all
# but the first field optional):
# servo-name, serial-number, port-number, board-name
# where
# . servo-name - a user defined symbolic name, just a reference
# to a certain servo board
# . serial-number - serial number of the servo board this line pertains to
# . port-number - desired port number for servod for this board, can be
# overridden by the command line switch --port or
# environment variable setting SERVOD_PORT
# . board-name - board configuration file to use, can be
# overridden by the command line switch --board
# Since the same parameters could be defined using different means, there is a
# hierarchy of definitions:
# command line <- environment definition <- rc config file
DEFAULT_RC_FILE = '/home/%s/.servodrc' % os.getenv('SUDO_USER', '')
def usb_get_iserial(device):
"""Get USB device's iSerial string.
device: usb.Device object
iserial: USB devices iSerial string or empty string if the device has
no serial number.
# pylint: disable=broad-except
device_handle =
# The device has no serial number string descriptor.
if device.iSerialNumber == 0:
return ''
iserial = ''
iserial = device_handle.getString(device.iSerialNumber, MAX_ISERIAL_STR)
except usb.USBError, e:
# TODO(tbroch) other non-FTDI devices on my host cause following msg
# usb.USBError: error sending control message: Broken pipe
# Need to investigate further
except Exception as e:
# This was causing servod to fail to start in the presence of
# a broken usb interface.
print 'usb_get_iserial failed in an unknown way: [%s]' % e
return iserial
def usb_find(vendor, product, serialname):
"""Find USB devices based on vendor, product and serial identifiers.
Locates all USB devices that match the criteria of the arguments. In the
case where input arguments are 'None' that argument is a don't care
vendor: USB vendor id (integer)
product: USB product id (integer)
serialname: USB serial id (string)
matched_devices : list of pyusb devices matching input args
matched_devices = []
for bus in usb.busses():
for device in bus.devices:
if (not vendor or device.idVendor == vendor) and \
(not product or device.idProduct == product) and \
(not serialname or usb_get_iserial(device).endswith(serialname)):
return matched_devices
class ServodError(Exception):
"""Exception class for servod server."""
class ServodStarter(object):
"""Class to manage servod instance and rpc server its being served on."""
def __init__(self):
"""Prepare servod invocation.
Parse cmdline and prompt user for missing information if necessary to start
servod. Prepare servod instance & thread for it to be served from.
ServodError: if automatic config cannot be found
(options, _) = self._parse_args()
if options.debug:
level = 'debug'
level = drv.loglevel.DEFAULT_LOGLEVEL
loglevel, fmt = drv.loglevel.LOGLEVEL_MAP[level]
logging.basicConfig(level=loglevel, format=fmt)
# Servod needs to be running in the chroot without PID namespaces in order
# to freeze terminals when reading from the UARTs.
self._logger = logging.getLogger(os.path.basename(sys.argv[0]))'Start')
multiservo.get_env_options(self._logger, options)
if and options.serialname:
self._logger.error("Mutually exclusive '--name' or '--serialname' is "
servo_device = self.discover_servo(options,
if not servo_device:
self._host =
lot_id = self.get_lot_id(servo_device)
board_version = self.get_board_version(lot_id, servo_device.idProduct)
self._logger.debug('board_version = %s', board_version)
all_configs = []
if not options.noautoconfig:
all_configs += self.get_auto_configs(board_version)
if options.config:
for config in options.config:
# quietly ignore duplicate configs for backwards compatibility
if config not in all_configs:
if not all_configs:
raise ServodError('No automatic config found,'
' and no config specified with -c <file>')
scfg = system_config.SystemConfig()
if options.board:
board_config = 'servo_' + options.board + '_overlay.xml'
if not scfg.find_cfg_file(board_config):
self._logger.error('No XML overlay for board %s', options.board)
sys.exit(-1)'Found XML overlay for board %s', options.board)
for cfg_file in all_configs:
self._logger.debug('\n%s', scfg.display_config())
self._logger.debug('Servo is vid:0x%04x pid:0x%04x sid:%s',
servo_device.idVendor, servo_device.idProduct,
if options.port:
start_port = options.port
end_port = options.port
end_port, start_port = DEFAULT_PORT_RANGE
for self._servo_port in xrange(start_port, end_port - 1, -1):
self._server = SimpleXMLRPCServer.SimpleXMLRPCServer((self._host,
except socket.error as e:
if e.errno == errno.EADDRINUSE:
continue # Port taken, see if there is another one next to it.
self._logger.fatal("Problem opening Server's socket: %s", e)
if options.port:
err_msg = ('Port %d is busy' % options.port)
err_msg = ('Could not find a free port in %d..%d range' % (end_port,
self._servod = servo_server.Servod(
scfg, vendor=servo_device.idVendor, product=servo_device.idProduct,
interfaces=options.interfaces.split(), board=options.board,
version=board_version, usbkm232=options.usbkm232)
# Small timeout to allow interface threads to initialize.
# pylint: disable=undefined-loop-variable
# |servo_port| is either initialized in the loop or the loop fails.
self._server_thread = threading.Thread(target=self._serve)
self._server_thread.daemon = True
self._exit_status = 0
def handle_sig(self, signum):
"""Handle a signal by turning off the server & cleaning up servod."""'Received signal: %d. Attempting to turn off', signum)
self._servod.close()'Successfully turned off')
# TODO(tbroch) merge w/ parse_common_args properly
def _parse_args(self):
"""Parse commandline arguments.
Note, reads sys.argv directly
tuple (options, args) from optparse.parse_args().
description = (
'%prog is server to interact with servo debug & control board. '
'This server communicates to the board via USB and the client via '
'xmlrpc library. Launcher most specify at least one --config <file> '
'in order for the server to provide any functionality. In most cases, '
'multiple configs will be needed to expose complete functionality '
'between debug & DUT board.')
examples = (
'\nExamples:\n > %prog -b <path>/data/servo.xml\n\tLaunch server on '
'defualt host:port with configs native to servo\n > %prog -b <file> '
'-p 8888\n\tLaunch server listening on port 8888\n > %prog -b <file> '
'-v 0x18d1 -p 0x5001\n\tLaunch targetting usb device \n\twith vid:pid '
'== 0x18d1:0x5001 (Google/Servo)\n')
parser = optparse.OptionParser(version='%prog ' + VERSION)
parser.description = description
parser.add_option('-d', '--debug', action='store_true', default=False,
help='enable debug messages')
parser.add_option('', '--host', default='localhost', type=str,
help='hostname to start server on')
'', '--port', default=None, type=int,
help='port for server to listen on, by default '
'will try ports in %d..%d range, could also be supplied '
'through environment variable SERVOD_PORT' % DEFAULT_PORT_RANGE)
parser.add_option('-v', '--vendor', default=None, type=int,
help='vendor id of ftdi device to interface to')
parser.add_option('-p', '--product', default=None, type=int,
help='USB product id of ftdi device to interface with')
parser.add_option('-s', '--serialname', default=None, type=str,
help='device serialname stored in eeprom')
parser.add_option('-c', '--config', default=None, type=str, action='append',
help='system config files (XML) to read')
parser.add_option('-b', '--board', default='', type=str, action='store',
help='include config file (XML) for given board')
parser.add_option('--noautoconfig', action='store_true', default=False,
help='Disable automatic determination of config files')
parser.add_option('-i', '--interfaces', type=str, default='',
help='ordered space-delimited list of interfaces. '
'Valid choices are gpio|i2c|uart|gpiouart|dummy')
parser.add_option('-u', '--usbkm232', type=str,
help='path to USB-KM232 device which allow for '
'sending keyboard commands to DUTs that do not '
'have built in keyboards. Used in FAFT tests. '
'(Optional), e.g. /dev/ttyUSB0')
parser.set_usage(parser.get_usage() + examples)
return parser.parse_args()
def find_servod_match(self, options, all_servos, servodrc):
"""Find a servo matching one of servodrc lines.
Given a list of servod objects matching discovered servos, display the list
to the user and check if there is a configuration file line corresponding to
one of the servos.
If a line like that exists, and it includes options which are not yet
defined in the options object - set these options' values. If the option is
already defined - report that this config line setting is ignored.
options: an options object as returned by parse_options
all_servos: a list of servod objects corresponding to discovered servo
servodrc: a dictionary representing the contents of the servodrc file, as
returned by parse_rc() above (if any)
a matching servod object, if found, None otherwise
ServodError: in case required name is not found in the config file
for servo in all_servos:'Found servo, vid: 0x%04x pid: 0x%04x sid: %s',
servo.idVendor, servo.idProduct, usb_get_iserial(servo))
# If user specified servod name in the command line - match it to the serial
# number.
config = servodrc.get(
if not config:
raise ServodError("Name '%s' not in the config file" %
options.serialname = config['sn']
elif options.serialname:
# Let's try finding config for a serial name
for config in servodrc.itervalues():
if config['sn'] == options.serialname:
return None
if not options.serialname:
# There is nothing to match
return None
for servo in all_servos:
servo_sn = usb_get_iserial(servo)
if servo_sn != options.serialname:
# Match found, some sanity checks/updates before using it
matching_servo = servo
rc_port = config['port']
if rc_port:
if not options.port:
options.port = rc_port
elif options.port != rc_port:
self._logger.warning('Ignoring rc configured port %s for servo %s',
rc_port, servo_sn)
rc_board = config['board']
if rc_board:
if not options.board:
options.board = rc_board
elif options.board != rc_board:
self._logger.warning('Ignoring rc configured board name %s for servo '
'%s', rc_board, servo_sn)
return matching_servo
raise ServodError('No matching servo found')
def choose_servo(self, all_servos):
"""Let user choose a servo from available list of unique devices.
all_servos: a list of servod objects corresponding to discovered servo
servo object for the matching (or single) device, otherwise None
for i, servo in enumerate(all_servos):"Press '%d' for servo, vid: 0x%04x pid: 0x%04x sid: %s",
i, servo.idVendor, servo.idProduct,
(rlist, _, _) =[sys.stdin], [], [], 10)
if not rlist:
self._logger.warn('Timed out waiting for your choice\n')
return None
rsp = rlist[0].readline().strip()
rsp = int(rsp)
except ValueError:
self._logger.warn('%s not a valid choice ... ignoring', rsp)
return None
if rsp < 0 or rsp >= len(all_servos):
self._logger.warn('%s outside of choice range ... ignoring', rsp)
return None'')
servo = all_servos[rsp]'Chose %d ... starting servod on servo '
'vid: 0x%04x pid: 0x%04x sid: %s', rsp, servo.idVendor,
servo.idProduct, usb_get_iserial(servo))'')
return servo
def discover_servo(self, options, servodrc):
"""Find a servo USB device to use.
First, find all servo devices matching command line options, this may result
in discovering none, one or more devices.
Then try matching discovered servos and the configuration defined in
servodrc. A match this will result in reading missing options from servodrc
If there is a match - return the matching device.
If no match found, but there is only one servo connected - return it. If
there is no match found and multiple servos are connected - report an error
and return None.
options: the options object returned by opt_parse
servodrc: a dictionary representing the contents of the servodrc file, as
returned by parse_rc() above (if any)
servo object for the matching (or single) device, otherwise None
vendor, product, serialname = (options.vendor, options.product,
all_servos = []
for (vid, pid) in servo_interfaces.SERVO_ID_DEFAULTS:
if (vendor and vendor != vid) or \
(product and product != pid):
all_servos.extend(usb_find(vid, pid, serialname))
if not all_servos:
self._logger.error('No servos found')
return None
# See if there is a matching entry in servodrc
matching_servo = self.find_servod_match(options, all_servos, servodrc)
if matching_servo:
return matching_servo
if len(all_servos) == 1:
return all_servos[0]
# See if only one primary servo. Filter secordary servos, like servo-micro.
secondary_servos = (
servo_interfaces.SERVO_MICRO_DEFAULTS + servo_interfaces.CCD_DEFAULTS)
all_primary_servos = [
servo for servo in all_servos
if (servo.idVendor, servo.idProduct) not in secondary_servos
if len(all_primary_servos) == 1:
return all_primary_servos[0]
# Let user choose a servo
matching_servo = self.choose_servo(all_servos)
if matching_servo:
return matching_servo
self._logger.error('Use --vendor, --product or --serialname switches to '
'identify servo uniquely, or create a servodrc file '
'and use the --name switch')
return None
def get_board_version(self, lot_id, product_id):
"""Get board version string.
Typically this will be a string of format <boardname>_<version>.
For example, servo_v2.
lot_id: string, identifying which lot device was fabbed from or None
product_id: integer, USB product id
board_version: string, board & version or None if not found
if lot_id:
for (board_version, lot_ids) in \
if lot_id in lot_ids:
return board_version
for (board_version, vids) in \
if product_id in vids:
return board_version
return None
def get_lot_id(self, servo):
"""Get lot_id for a given servo.
servo: usb.Device object
lot_id of the servo device.
lot_id = None
iserial = usb_get_iserial(servo)
self._logger.debug('iserial = %s', iserial)
if not iserial:
self._logger.warn('Servo device has no iserial value')
(lot_id, _) = iserial.split('-')
except ValueError:
self._logger.warn("Servo device's iserial was unrecognized.")
return lot_id
def get_auto_configs(self, board_version):
"""Get xml configs that should be loaded.
board_version: string, board & version
configs: list of XML config files that should be loaded
if board_version not in ftdi_common.SERVO_CONFIG_DEFAULTS:
self._logger.warning('Unable to determine configs to load for board '
'version = %s', board_version)
return []
return ftdi_common.SERVO_CONFIG_DEFAULTS[board_version]
def _serve(self):
"""Wrapper around rpc server's serve_forever to catch server errors."""
# pylint: disable=broad-except'Listening on %s port %s', self._host, self._servo_port)
except Exception:
self._exit_status = 1
def serve(self):
"""Add signal handlers, start servod on its own thread & wait for signal.
Intercepts and handles SIGINT and SIGTERM.
handler = lambda signal, unused, starter=self: starter.handle_sig(signal)
signal.signal(signal.SIGINT, handler)
signal.signal(signal.SIGTERM, handler)
def main():
starter = ServodStarter()
except ServodError as e:
print 'Error: ', e.message
if __name__ == '__main__':