#!/usr/bin/env python3
# Copyright 2011 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Client to control DUT hardware connected to servo debug board."""
import collections
import logging
from socket import error as SocketError
import sys
import time
import numpy
from servo import client
from servo import servo_parsing
class ControlError(Exception):
    """Error that main() reports on stderr before exiting with status 1."""
# Marks internal (non-control) stats keys; display_stats() removes it from
# the name it prints, and it is used to aid sorting of dict keys.
KEY_PREFIX = "__"
# Line prefixes for the statistics table and for gnuplot-style output.
STATS_PREFIX = "@@"
GNUPLOT_PREFIX = "##"
# Stats key under which the duration of each iteration (msecs) is recorded.
TIME_KEY = "%ssample_msecs" % KEY_PREFIX
def _build_parser():
    """Create the command line parser for dut_control.

    Returns:
      servo_parsing.ServodClientParser populated with the dut_control
      arguments
    """
    # NOTE(review): the bare "--verbose" example looks stale -- real_main()
    # prints the help text when no controls are given and none of
    # --info/--hwinit/--get-all is set. Confirm and update the example.
    usage_examples = [
        ("--get-all", "gets value for all controls"),
        ("--verbose", "gets value for all controls verbosely"),
        (
            "i2c_mux",
            "gets value for i2c_mux control. If the exact control name is not "
            "found, a list of similar controls is printed",
        ),
        ("-r 100 i2c_mux", "gets value for i2c_mux control 100 times"),
        ("-t 2 loc_0x40_mv", "gets value for loc_0x40_mv control for 2 seconds"),
        (
            "-y -t 2 loc_0x40_mv",
            "gets value for loc_0x40_mv control for 2 seconds and prepends time "
            "in seconds to results",
        ),
        (
            "-g -y -t 2 loc_0x40_mv loc_0x41_mv",
            "gets value for loc_0x4[0|1]_mv control for 2 seconds with gnuplot "
            "style",
        ),
        (
            "-z 100 -t 2 loc_0x40_mv",
            "gets value for loc_0x40_mv control for 2 seconds sampling every 100ms",
        ),
        ("--verbose i2c_mux", "gets value for i2c_mux control verbosely"),
        ("i2c_mux:remote_adcs", "sets i2c_mux to value remote_adcs"),
    ]
    summary = (
        "dut-control allows users to set and get various controls on a DUT "
        "system via the servo debug & control board. This client communicates "
        "to the board via a socket connection to the servo server."
    )
    parser = servo_parsing.ServodClientParser(
        description=summary, examples=usage_examples
    )
    # Add double-dashes in help message to unify docker and standalone versions
    parser.prog += " --"

    # Every boolean switch below is an explicit store_true/False pair.
    switch = {"action": "store_true", "default": False}

    # Options that select what is queried; at most one may be given.
    query_group = parser.add_mutually_exclusive_group()
    query_group.add_argument(
        "-i", "--info", help="show info about controls", **switch
    )
    query_group.add_argument(
        "--hwinit", help="Initialize controls to their POR/safe state", **switch
    )
    query_group.add_argument(
        "-o", "--value_only", help="show the value only", **switch
    )
    query_group.add_argument(
        "--get-all",
        help="get all servod 'get' controls. Runs before any passed in args. "
        "WARNING: super slow, and alters state.",
        **switch
    )

    # Output formats; gnuplot and verbose cannot be combined.
    format_group = parser.add_mutually_exclusive_group()
    format_group.add_argument(
        "-g",
        "--gnuplot",
        help="gnuplot style to stdout. Implies print_time",
        **switch
    )
    format_group.add_argument(
        "--verbose", help="show verbose info about controls", **switch
    )

    # Repetition and timing options.
    parser.add_argument(
        "-r",
        "--repeat",
        default=1,
        type=int,
        help="repeat requested command multiple times",
    )
    parser.add_argument(
        "-t",
        "--time_in_secs",
        default=0.0,
        type=float,
        help="repeat requested command for this many seconds",
    )
    parser.add_argument(
        "-y",
        "--print_time",
        help="print time in seconds with queries to stdout",
        **switch
    )
    parser.add_argument(
        "-z",
        "--sleep_msecs",
        default=0.0,
        type=float,
        help="sleep for this many milliseconds between queries",
    )
    return parser
def display_table(table, prefix=None):
    """Display a two-dimensional array ( list-of-lists ) as a table.

    Every column is right-justified to the width of its widest cell plus two
    spaces of padding. Column widths are derived from the first row's length,
    so a ragged table raises IndexError (see the doctests).

    >>> table = [['aaa', 'bbb'], ['1', '2222']]
    >>> display_table(table)
    @@   aaa   bbb
    @@     1  2222
    >>> display_table(table, prefix='%')
    %   aaa   bbb
    %     1  2222
    >>> table = [['a']]
    >>> display_table(table)
    @@   a
    >>> table = []
    >>> display_table(table)
    >>> table = [[]]
    >>> display_table(table)
    >>> table = [['a'], ['1', '2']]
    >>> display_table(table)
    Traceback (most recent call last):
    ...
    IndexError: list index out of range
    >>> table = [['a', 'b'], ['1']]
    >>> display_table(table)
    Traceback (most recent call last):
    ...
    IndexError: list index out of range
    >>> table = [['aaa', 'bbb', 'c'], ['1', '2222', '0']]
    >>> display_table(table)
    @@   aaa   bbb  c
    @@     1  2222  0

    Args:
      table: A two-dimensional array (list of lists of strings) to show.
      prefix: All lines will be prefixed with this and a space. Defaults to
          STATS_PREFIX when omitted or None.

    Raises:
      IndexError: if the rows do not all have the same length as the first.
    """
    if not table or not table[0]:
        return
    if prefix is None:
        # Looked up at call time so the documented default ('@@') applies
        # whenever the caller does not choose a prefix.
        prefix = STATS_PREFIX
    max_col_width = [
        max(len(row[col_idx]) for row in table)
        for col_idx in range(len(table[0]))
    ]
    for row in table:
        # Index the widths (rather than zip) so that a row longer than the
        # header still raises IndexError instead of being silently truncated.
        out_str = "".join(
            cell.rjust(max_col_width[i] + 2) for i, cell in enumerate(row)
        )
        print(prefix, out_str)
def display_stats(stats, prefix=STATS_PREFIX):
    """Display various statistics for data captured in a table.

    One row is printed per key that has at least one sample; keys whose
    sample list is empty are skipped. Values are shown with four decimals.

    >>> stats = {}
    >>> stats[TIME_KEY] = [50.0, 25.0, 40.0, 10.0]
    >>> stats['frobnicate'] = [11.5, 9.0]
    >>> stats['foobar'] = [11111.0, 22222.0]
    >>> display_stats(stats)
    @@           NAME  COUNT     AVERAGE     STDDEV         MAX         MIN
    @@   sample_msecs      4     31.2500    15.1554     50.0000     10.0000
    @@         foobar      2  16666.5000  5555.5000  22222.0000  11111.0000
    @@     frobnicate      2     10.2500     1.2500     11.5000      9.0000

    Args:
      stats: A dictionary of stats to show. Key is name of result and value is
          a list of floating point values to show stats for. See doctest.
          Any key starting with '__' will be sorted first and have its prefix
          stripped.
      prefix: All lines will be prefixed with this and a space.
    """
    table = [["NAME", "COUNT", "AVERAGE", "STDDEV", "MAX", "MIN"]]
    # Plain string ordering would put names starting with digits or uppercase
    # letters ahead of '__' keys, so rank prefixed keys first explicitly.
    ordered_keys = sorted(
        stats, key=lambda name: (not name.startswith(KEY_PREFIX), name)
    )
    for key in ordered_keys:
        samples = stats[key]
        if not samples:
            continue
        stats_np = numpy.array(samples)
        # Remove exactly one leading KEY_PREFIX. str.lstrip() would treat the
        # prefix as a set of characters and also eat underscores that belong
        # to the control name itself.
        disp_key = key[len(KEY_PREFIX):] if key.startswith(KEY_PREFIX) else key
        row = [disp_key, str(len(stats_np))]
        row.extend(
            "%.4f" % value
            for value in (
                stats_np.mean(),
                stats_np.std(),
                stats_np.max(),
                stats_np.min(),
            )
        )
        table.append(row)
    display_table(table, prefix)
def _print_gnuplot_header(control_args):
"""Prints gnuplot header.
Args:
control_args: list of controls to get or set
Note, calls sys.exit()
"""
hdr = []
# Don't put setting of controls into gnuplot output
hdr.extend(arg for arg in control_args if ":" not in arg)
if not hdr:
logging.critical(
"Can't use --gnuplot without supplying controls to read on command line"
)
sys.exit(-1)
print(GNUPLOT_PREFIX + " seconds " + " seconds ".join(hdr))
def _pretty_print_result(result):
if isinstance(result, list):
return ", ".join(result)
if isinstance(result, dict):
return "\n".join(["%s: %s" % (k, v) for k, v in result.items()])
return result
def do_iteration(requests, options, sclient, stats):
    """Perform one iteration across the controls.

    Args:
      requests: list of strings to make requests to servo about
          Example = ['dev_mode', 'dev_mode:on', 'dev_mode']
      options: parsed command line options; reads info, print_time, verbose,
          gnuplot, value_only and sleep_msecs
      sclient: servo client object providing doc() and set_get_all()
      stats: dict of key=control name, value=list of numeric samples for
          stats calcs. Entries are created by indexing, so a
          collections.defaultdict(list) is expected. The duration of this
          iteration in milliseconds is appended under TIME_KEY.

    Returns:
      out_str: results string from iteration based on formats in options
    """
    out_list = []
    time_str = ""
    sample_start = time.time()
    if options.info:
        # Track the controls actually documented: set requests are skipped,
        # so indexing `requests` by result position would pair a doc string
        # with the wrong control name.
        controls = []
        results = []
        for request_str in requests:
            if ":" in request_str:
                logging.warning(
                    "Ignoring %s, can't perform set with --info", request_str
                )
                continue
            controls.append(request_str)
            results.append(sclient.doc(request_str))
    else:
        controls = requests
        results = sclient.set_get_all(requests)
    if options.print_time:
        time_str = "%.4f " % (time.time() - _start_time)
    for control, result in zip(controls, results):
        if options.info:
            request_type = "doc"
        elif ":" in control:
            request_type = "set"
        else:
            request_type = "get"
        try:
            stats[control].append(float(result))
        except (ValueError, TypeError):
            # Results that are not numeric are simply left out of the stats.
            pass
        result = _pretty_print_result(result)
        if options.verbose:
            out_list.append(
                "%s%s %s -> %s" % (time_str, request_type.upper(), control, result)
            )
        elif request_type != "set":
            if options.gnuplot:
                out_list.append("%s%s" % (time_str, result))
            elif options.value_only:
                out_list.append(str(result))
            else:
                out_list.append("%s%s:%s" % (time_str, control, result))
    # format of gnuplot is <seconds_val1> <val1> <seconds_val2> <val2> ... such
    # that plotting can then be done with time on x-axis, value on y-axis. For
    # example, this command would plot two values across time
    #   plot "file.out" using 1:2 with linespoint
    #   replot "file.out" using 3:4 with linespoint
    if options.gnuplot:
        out_str = " ".join(out_list)
    else:
        out_str = "\n".join(out_list)
    # The recorded sample time covers the queries only, not the sleep below.
    iter_time_msecs = (time.time() - sample_start) * 1000
    stats[TIME_KEY].append(iter_time_msecs)
    if options.sleep_msecs and iter_time_msecs < options.sleep_msecs:
        # Sleep only for what is left of the requested sampling period.
        time.sleep((options.sleep_msecs - iter_time_msecs) / 1000)
    return out_str
def iterate(controls, options, sclient):
    """Run do_iteration() repeatedly and print what each pass returns.

    Iterations continue until both the requested repeat count and the
    requested duration are used up. A statistics table is printed afterwards
    unless exactly one un-timed iteration was requested. With gnuplot output,
    options.print_time is forced on and a header line is printed first.

    Args:
      controls: list of controls to iterate over
      options: parsed command line options
      sclient: servo client object handed on to do_iteration()
    """
    if options.gnuplot:
        options.print_time = True
        _print_gnuplot_header(controls)

    stats = collections.defaultdict(list)
    runs_left = options.repeat
    secs_left = options.time_in_secs
    while runs_left >= 1 or secs_left > 0:
        began = time.time()
        text = do_iteration(controls, options, sclient, stats)
        if text:  # Avoid printing empty lines
            print(text)
        elapsed = time.time() - began
        runs_left -= 1
        secs_left = secs_left - elapsed

    if options.repeat == 1 and not options.time_in_secs > 0:
        return
    display_stats(
        stats, prefix=GNUPLOT_PREFIX if options.gnuplot else STATS_PREFIX
    )
def real_main(cmdline):
    """Parse cmdline, connect to the servo server and run what was asked.

    Arguments the parser does not recognize are treated as the list of
    controls to get or set.

    Args:
      cmdline: list of command line argument strings
    """
    global _start_time
    parser = _build_parser()
    options, controls = parser.parse_known_args(cmdline)
    logging.basicConfig(
        level=logging.DEBUG if options.debug else logging.INFO,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )
    sclient = client.ServoClient(
        host=options.host, port=options.port, verbose=options.verbose
    )
    _start_time = time.time()
    # Perform 1st in order to allow user to then override below
    if options.hwinit:
        sclient.hwinit()
    if options.get_all:
        print(sclient.get_all())
    if controls:
        iterate(controls, options, sclient)
        return
    if options.info:
        # print all the doc info for the controls
        print(sclient.doc_all())
    elif not options.hwinit and not options.get_all:
        # Nothing was requested: --hwinit and --get-all are the only options
        # that may run without arguments, so just print the help message.
        parser.print_help()
# Ability to pass an arbitrary or artificial cmdline for testing is desirable.
def main(cmdline=None):
    """main method exception wrapper.

    Runs real_main() and converts expected failures into an error message on
    stderr plus an exit status.

    Args:
      cmdline: list of command line argument strings. When omitted,
          sys.argv[1:] is read at call time. A single leading '--' is dropped.

    Exits with status 0 on KeyboardInterrupt and with status 1 on servo
    client, control or socket errors.
    """
    if cmdline is None:
        # Resolved here rather than in the signature so the default is not
        # frozen to whatever sys.argv held when the module was imported.
        cmdline = sys.argv[1:]
    try:
        if cmdline and cmdline[0] == "--":
            cmdline = cmdline[1:]
        real_main(cmdline)
    except KeyboardInterrupt:
        sys.exit(0)
    except (client.ServoClientError, ControlError) as e:
        # Python 3 exceptions have no built-in .message attribute; use it
        # only if the exception class provides one.
        sys.stderr.write(str(getattr(e, "message", e)) + "\n")
        sys.exit(1)
    except SocketError as e:
        # strerror is None when the error was not built from an errno pair.
        sys.stderr.write((e.strerror or str(e)) + "\n")
        sys.exit(1)
# Global start time for the script. real_main() replaces this placeholder
# with time.time(), and do_iteration() prints times relative to it when
# print_time is set.
_start_time = 0
if __name__ == "__main__":
    main()