blob: 870b7743fa6753346349d667e540b7a41c287be2 [file] [log] [blame]
#!/usr/bin/env python2
# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Client to test polld."""
import logging
import optparse
import sys
import xmlrpclib
import poll_common
__all__ = ['PollClientError', 'PollClient']
class PollClientError(Exception):
"""Exception class for poll_client."""
pass
class PollClient(object):
"""Class to interface with polld via XMLRPC."""
def __init__(self, host, tcp_port, verbose=False):
"""Constructor.
Args:
host: Name or IP address of servo server host.
tcp_port: TCP port on which servod is listening on.
verbose: Enables verbose messaging across xmlrpclib.ServerProxy.
"""
remote = 'http://%s:%s' % (host, tcp_port)
# TODO(jchuang): Keep alive in transport layer.
self._server = xmlrpclib.ServerProxy(remote, verbose=verbose)
def poll_gpio(self, gpio_port, edge):
"""Long-polls a GPIO port.
Args:
gpio_port: GPIO port
edge: value in GPIO_EDGE_LIST[]
Raises:
PollClientError: If error occurs when polling the GPIO port.
"""
try:
self._server.poll_gpio(gpio_port, edge)
except Exception as e:
raise PollClientError('Problem to poll GPIO %s %s %s' %
(str(gpio_port), edge, e))
def parse_args():
"""Parses commandline arguments.
Returns:
tuple (options, args) from optparse.parse_args().
"""
usage = (
'usage: %prog [options] <io_1:port_1> <io_2:port_2> ...\n'
'\t- io_<n> is either gpio_falling, gpio_rising, or gpio_both.\n'
'\t- port_<n> is the GPIO port.\n'
)
description = (
'%prog is command-line tool to test polld. '
)
examples = (
'\nExamples:\n'
' > %prog gpio_falling:7\n'
'\tLong polls GPIO port 7 when edge falling is triggered.\n'
)
parser = optparse.OptionParser(usage=usage)
parser.description = description
parser.add_option('-d', '--debug', action='store_true', default=False,
help='enable debug messages')
parser.add_option('', '--host', default=poll_common.DEFAULT_HOST,
type=str, help='hostname of server')
parser.add_option('', '--port', default=poll_common.DEFAULT_PORT,
type=int, help='port that server is listening on')
parser.add_option('-r', '--repeat', type=int,
help='repeat requested command multiple times', default=1)
parser.set_usage(parser.get_usage() + examples)
return parser.parse_args()
def iterate(io_ports, options, pclient):
"""Perform iterations on various IO ports.
Args:
io_ports: list of IO ports
options: optparse object options
pclient: PollClient object
"""
logger = logging.getLogger()
for _ in xrange(options.repeat):
for io_port in io_ports:
io, port = io_port.split(':')
if io not in poll_common.GPIO_EDGE_LIST:
raise PollClientError('invalid I/O %s' % io)
if not port.isdigit():
raise PollClientError('invalid port %s' % port)
pclient.poll_gpio(int(port), io)
logger.info('poll_gpio %s succeeds', io_port)
def real_main():
(options, io_ports) = parse_args()
if options.debug:
loglevel = logging.DEBUG
else:
loglevel = logging.INFO
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(level=loglevel, format=format)
pclient = PollClient(host=options.host, tcp_port=options.port,
verbose=options.debug)
iterate(io_ports, options, pclient)
def main():
try:
real_main()
except KeyboardInterrupt:
sys.exit(0)
except PollClientError as e:
sys.stderr.write(e.message + '\n')
sys.exit(1)
if __name__ == '__main__':
main()