blob: 50a1a5183d64d2b3424bdbb9e016586208ad6de4 [file] [log] [blame]
#!/usr/bin/env python2
# Copyright 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Client to test serial server."""
from __future__ import print_function
import logging
import optparse
import sys
import xmlrpclib
import dolphin_server
__all__ = ['SerialClientError', 'SerialClient']
class SerialClientError(Exception):
"""Exception class for serial_client."""
pass
class SerialClient(object):
"""Class to interface with serial_server via XMLRPC.
For multiple serial connection established of serial server. serial_index is
used for indicating the corresponding n-th serial connection (zero based)
while requesting 'send' and 'receive' commands on the server side.
"""
def __init__(self, host, tcp_port, verbose=False):
"""Constructor.
Args:
host: Name or IP address of serial server host.
tcp_port: TCP port on which serial server is listening on.
verbose: Enables verbose messaging across xmlrpclib.ServerProxy.
"""
remote = 'http://%s:%s' % (host, tcp_port)
self._server = xmlrpclib.ServerProxy(remote, verbose=verbose)
def send(self, serial_index, command):
"""Sends a command through serial server.
Args:
serial_index: index of serial connections.
command: command to send.
Raises:
SerialClientError if error occurs.
"""
try:
self._server.send(serial_index, command)
except Exception as e:
raise SerialClientError(
'Problem to send(%d, %s): %s' % (serial_index, command, e))
def receive(self, serial_index, num_bytes):
"""Receives N byte data through serial server.
Args:
serial_index: index of serial connections.
num_bytes: number of bytes to receive. 0 means receiving what already in
the input buffer.
Returns:
Received N bytes.
Raises:
SerialClientError if error occurs.
"""
try:
recv = self._server.receive(serial_index, num_bytes)
logging.info('Receive data: %s', recv)
except Exception as e:
raise SerialClientError(
'Problem to receive(%d, %d): %s' % (serial_index, num_bytes, e))
def parse_args():
"""Parses commandline arguments.
Returns:
tuple (options, args) from optparse.parse_args().
"""
usage = (
'usage: %prog [options] <function> <serial_index> <arg1 arg2 ...> ...\n'
'\t- function is [send, receive].\n'
'\t- serial_index is serial connection index.\n'
'\t- arg<n> is the function arguments.\n'
)
description = (
'%prog is command-line tool to test serial server. '
)
examples = (
'\nExamples:\n'
' > %prog send 0 usbc_action usb\n'
'\tSend command \'usbc_action usb\' to serial connection index 0.\n'
)
parser = optparse.OptionParser(usage=usage)
parser.description = description
parser.add_option('-d', '--debug', action='store_true', default=False,
help='enable debug messages')
parser.add_option('', '--host', default=dolphin_server.DEFAULT_HOST,
type=str, help='hostname of server')
parser.add_option('', '--port', default=dolphin_server.DEFAULT_PORT,
type=int, help='port that server is listening on')
parser.set_usage(parser.get_usage() + examples)
return parser.parse_args()
def call_function(commands, options, sclient):
"""Parses function call to serial server.
Args:
commands: list of function commands, like:
Send command: ['send', serial_index, contents to send...]
Receive command: ['receive', serial_index, number of bytes to receive]
options: optparse object options.
sclient: SerialClient object.
"""
logger = logging.getLogger()
if not commands:
raise SerialClientError('No command is given')
function = commands.pop(0)
if not commands:
raise SerialClientError('No serial index is assigned')
serial_index = int(commands.pop(0))
if function == 'receive':
sclient.receive(serial_index, int(commands[0]))
elif function == 'send':
sclient.send(serial_index, ' '.join(commands))
else:
raise SerialClientError('Invalid function ' + function)
def real_main():
(options, commands) = parse_args()
if options.debug:
loglevel = logging.DEBUG
else:
loglevel = logging.INFO
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(level=loglevel, format=format)
sclient = SerialClient(host=options.host, tcp_port=options.port,
verbose=options.debug)
call_function(commands, options, sclient)
def main():
try:
real_main()
except KeyboardInterrupt:
sys.exit(0)
except SerialClientError as e:
sys.stderr.write(e.message + '\n')
sys.exit(1)
if __name__ == '__main__':
main()