blob: 3be288aa1a32511e59c3a9c35e9ce1079dca09d4 [file] [log] [blame]
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Python version of Servo hardware debug & control board server."""
import errno
import logging
import optparse
import os
import SimpleXMLRPCServer
import socket
import sys
import usb
import dut
import ftdi_common
import servo_interfaces
import system_config
import servo_server
# TODO(tbroch) determine version string methodology.
VERSION = "0.0.1"
MAX_ISERIAL_STR = 128
# If user does not specify a port to use, try ports in this range. Traverse
# the range from high to low addresses to maintain backwards compatibility
# (the first checked default port is 9999, the range is such that all possible
# port numbers are 4 digits).
DEFAULT_PORT_RANGE = (9990, 9999)
class ServodError(Exception):
"""Exception class for servod server."""
# TODO(tbroch) merge w/ parse_common_args properly
def _parse_args():
"""Parse commandline arguments.
Note, reads sys.argv directly
Returns:
tuple (options, args) from optparse.parse_args().
"""
description = (
"%prog is server to interact with servo debug & control board. "
"This server communicates to the board via USB and the client via "
"xmlrpc library. Launcher most specify at least one --config <file> "
"in order for the server to provide any functionality. In most cases, "
"multiple configs will be needed to expose complete functionality "
"between debug & DUT board."
)
examples = (
"\nExamples:\n"
" > %prog -b <path>/data/servo.xml\n\tLaunch server on defualt host:port "
"with configs native to servo\n"
" > %prog -b <file> -p 8888\n\tLaunch server listening on "
"port 8888\n"
" > %prog -b <file> -v 0x18d1 -p 0x5001\n\tLaunch targetting usb device "
"\n\twith vid:pid == 0x18d1:0x5001 (Google/Servo)\n"
)
parser = optparse.OptionParser(version="%prog "+VERSION)
parser.description = description
parser.add_option("-d", "--debug", action="store_true", default=False,
help="enable debug messages")
parser.add_option("", "--host", default="localhost", type=str,
help="hostname to start server on")
parser.add_option("", "--port", default=None, type=int,
help="port for server to listen on, by default " +
"will try ports in %d..%d range" % (DEFAULT_PORT_RANGE))
parser.add_option("-v", "--vendor", default=None, type=int,
help="vendor id of ftdi device to interface to")
parser.add_option("-p", "--product", default=None, type=int,
help="USB product id of ftdi device to interface with")
parser.add_option("-s", "--serialname", default=None, type=str,
help="device serialname stored in eeprom")
parser.add_option("-c", "--config", default=None, type=str, action="append",
help="system config files (XML) to read")
parser.add_option("-b", "--board", default="", type=str, action="store",
help="include config file (XML) for given board")
parser.add_option("--noautoconfig", action="store_true", default=False,
help="Disable automatic determination of config files")
parser.add_option("-i", "--interfaces", type=str, default='',
help="ordered space-delimited list of interfaces. " +
"Valid choices are gpio|i2c|uart|gpiouart|dummy")
parser.set_usage(parser.get_usage() + examples)
return parser.parse_args()
def usb_get_iserial(device):
"""Get USB device's iSerial string
Args:
device: usb.Device object
Returns:
iserial: USB devices iSerial string
"""
device_handle = device.open()
iserial = None
try:
iserial = device_handle.getString(device.iSerialNumber, MAX_ISERIAL_STR)
except usb.USBError, e:
# TODO(tbroch) other non-FTDI devices on my host cause following msg
# usb.USBError: error sending control message: Broken pipe
# Need to investigate further
pass
except Exception, e:
raise ServodError("failed to retrieve USB serialname")
return iserial
def usb_find(vendor, product, serialname):
"""Find USB devices based on vendor, product and serial identifiers.
Locates all USB devices that match the criteria of the arguments. In the
case where input arguments are 'None' that argument is a don't care
Args:
vendor: USB vendor id (integer)
product: USB product id (integer)
serial: USB serial id (string)
Returns:
matched_devices : list of pyusb devices matching input args
"""
matched_devices = []
for bus in usb.busses():
for device in bus.devices:
if (not vendor or device.idVendor == vendor) and \
(not product or device.idProduct == product) and \
(not serialname or usb_get_iserial(device) == serialname):
matched_devices.append(device)
return matched_devices
def discover_servo(logger, vendor, product, serialname):
"""Find unique servo USB device.
Args:
vendor: USB vendor id (integer)
product: USB product id (integer)
serial: USB serial id (string)
Returns:
devices: list of usb.Device objects that are servo board(s) or
empty list if none
"""
all_servos = []
for (vid, pid) in servo_interfaces.SERVO_ID_DEFAULTS:
if (vendor and vendor != vid) or \
(product and product != pid):
continue
all_servos.extend(usb_find(vid, pid, serialname))
return all_servos
def get_board_version(lot_id, product_id):
"""Get board version string.
Typically this will be a string of format <boardname>_<version>.
For example, servo_v2.
Args:
lot_id: string, identifying which lot device was fabbed from or None
product_id: integer, USB product id
Returns:
board_version: string, board & version or None if not found
"""
if lot_id:
for (board_version, lot_ids) in \
ftdi_common.SERVO_LOT_ID_DEFAULTS.iteritems():
if lot_id in lot_ids:
return board_version
for (board_version, vids) in \
ftdi_common.SERVO_PID_DEFAULTS.iteritems():
if product_id in vids:
return board_version
return None
def get_lot_id(logger, servo):
"""Get lot_id for a given servo.
Args:
servo: usb.Device object
Returns:
lot_id of the servo device.
"""
lot_id = None
iserial = usb_get_iserial(servo)
logger.debug('iserial = %s', iserial)
if not iserial:
logger.warn("Servo device has no iserial value")
else:
try:
(lot_id, _) = iserial.split('-')
except ValueError:
logger.warn("Servo device's iserial was unrecognized.")
return lot_id
def get_auto_configs(logger, board_version):
"""Get xml configs that should be loaded.
Args:
board_version: string, board & version
Returns:
configs: list of XML config files that should be loaded
"""
if board_version not in ftdi_common.SERVO_CONFIG_DEFAULTS:
logger.warning('Unable to determine configs to load for board version = %s',
board_version)
return []
return ftdi_common.SERVO_CONFIG_DEFAULTS[board_version]
def main_function():
(options, args) = _parse_args()
loglevel = logging.INFO
format="%(asctime)s - %(name)s - %(levelname)s"
if options.debug:
loglevel = logging.DEBUG
format += " - %(filename)s:%(lineno)d:%(funcName)s"
format += " - %(message)s"
logging.basicConfig(level=loglevel, format=format)
logger = logging.getLogger(os.path.basename(sys.argv[0]))
logger.info("Start")
servo_like_devices = discover_servo(logger, options.vendor, options.product,
options.serialname)
for device in servo_like_devices:
logger.info("Found servo, vid: 0x%04x pid: 0x%04x sid: %s", device.idVendor,
device.idProduct, usb_get_iserial(device))
if not servo_like_devices:
logger.error("No servos found")
if len(servo_like_devices) != 1:
logger.error("Use --vendor, --product or --serialname switches to "
"identify servo uniquely")
sys.exit(-1)
servo_device = servo_like_devices[0]
lot_id = get_lot_id(logger, servo_device)
board_version = get_board_version(lot_id, servo_device.idProduct)
logger.debug('board_version = %s', board_version)
all_configs = []
if not options.noautoconfig:
all_configs += get_auto_configs(logger, board_version)
if options.config:
for config in options.config:
# quietly ignore duplicate configs for backwards compatibility
if config not in all_configs:
all_configs.append(config)
if not all_configs:
raise ServodError("No automatic config found,"
" and no config specified with -c <file>")
scfg = system_config.SystemConfig()
options.board = dut.get_board_name(options.board)
if options.board:
board_config = "servo_" + options.board + "_overlay.xml"
if not scfg.find_cfg_file(board_config):
logger.error("No XML overlay for board %s", options.board)
sys.exit(-1)
logger.info("Found XML overlay for board %s", options.board)
all_configs.append(board_config)
for cfg_file in all_configs:
scfg.add_cfg_file(cfg_file)
logger.debug("\n" + scfg.display_config())
logger.debug("Servo is vid:0x%04x pid:0x%04x sid:%s" % \
(servo_device.idVendor, servo_device.idProduct,
usb_get_iserial(servo_device)))
if options.port:
start_port = options.port
end_port = options.port
else:
end_port, start_port = DEFAULT_PORT_RANGE
for servo_port in xrange(start_port, end_port - 1, -1):
try:
server = SimpleXMLRPCServer.SimpleXMLRPCServer(
(options.host, servo_port), logRequests=False)
break
except socket.error as e:
if e.errno == errno.EADDRINUSE:
continue # Port taken, see if there is another one next to it.
logger.fatal("Problem opening Server's socket: %s", e)
sys.exit(-1)
else:
if options.port:
err_msg = ("Port %d is busy" % options.port)
else:
err_msg = ("Could not find a free port in %d..%d range" % (
end_port, start_port))
logger.fatal(err_msg)
sys.exit(-1)
servod = servo_server.Servod(scfg, vendor=servo_device.idVendor,
product=servo_device.idProduct,
serialname=usb_get_iserial(servo_device),
interfaces=options.interfaces.split(),
board=options.board,
version=board_version)
servod.hwinit(verbose=True)
server.register_introspection_functions()
server.register_multicall_functions()
server.register_instance(servod)
logger.info("Listening on %s port %s" % (options.host, servo_port))
server.serve_forever()
def main():
"""Main function wrapper to catch exceptions properly"""
try:
main_function()
except KeyboardInterrupt:
sys.exit(0)
except ServodError as e:
print "Error: ", e.message
sys.exit(1)
if __name__ == '__main__':
main()