blob: 94d3c1774bb7fb76d00a5bdd0dd979619be77d54 [file] [log] [blame] [edit]
# Copyright 2021 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""A simple driver to set/retrieve simple data from a cros ec interface."""
from servo.drv import ec
from servo.drv import hw_driver
# pylint: disable=invalid-name
# naming convention needed for servod driver query.
class simpleEc(ec.ec):
"""Object to access drv=simple_ec controls."""
# This drv requires the
# - the command to run |uart_cmd|
# - the regex to match against |regex|
# - the group within the regex to retrieve |group| or 0 to retrieve
# the entire match
REQUIRED_GET_PARAMS = ["uart_cmd", "regex", "group"]
# This drv for setting a control requires
# - the command template to run
REQUIRED_SET_PARAMS = ["uart_cmd"]
# The default regex to use for set. It simply checks whether the control
# finished and a new line is printed. This helps servod avoid returning before
# the control has actually finished executing on the ec console.
SET_RE_DEFAULT = ">"
def _drv_init(self):
"""Driver specific initializer."""
super(simpleEc, self)._drv_init()
self._uart_cmd = self._params["uart_cmd"]
if self._is_get():
# These controls are required for |get| but not for |set|.
self._regex = self._params["regex"]
self._group = int(self._params["group"])
else:
# guaranteed to be in 'set'.
self._regex = self._params.get("regex", self.SET_RE_DEFAULT)
self._group = int(self._params.get("group", "0"))
# User can optionally provide a |debug_info| string to log
# when this control fails.
self._debug_info = self._params.get("debug_info", "")
def _process_output(self, pre_result):
"""Helper to perform extra formatting out the output of |_Get_output|.
Formatting is defined in the param 'formatting' and comma separated. It
is performed in sequence. Supported formatting operations are.
- strip: strips white-space and new-lines as the end
- splitlines: split the output by lines and leave as list
- splitlines_str: split the output by lines and concat as str with
whitespace
- int: cast |pre_result| into int for each member, if splitlines was
previously used
Args:
pre_result: str, the output to format
Returns:
result, str, after processing from |pre_result|
"""
if "formatting" in self._params:
requests = self._params["formatting"].split(",")
for request in requests:
if request == "strip":
pre_result = pre_result.strip()
elif request == "splitlines":
pre_result = pre_result.splitlines()
elif request == "splitlines_str":
pre_result = " ".join(pre_result.splitlines())
elif request == "int":
if isinstance(pre_result, list):
pre_result = [int(m) for m in pre_result]
else:
pre_result = int(pre_result, 0)
elif request == "float":
pre_result = float(pre_result)
elif request == "bool":
pre_result = bool(pre_result)
elif request == "negative":
pre_result = pre_result * -1
elif request == "gpio":
pre_result = (
"\n" + pre_result.replace("gpioget", "").replace("\r", "")[:-1]
)
elif request == "map_open":
pre_result = 1 if pre_result == "open" else 0
else:
self._logger.debug(
"ec output formatting %r unknown. Ignoring.", request
)
return pre_result
def _error(self, cmd, regex, e=None):
"""Helper to raise errors in a standardized way.
Args:
cmd: command that was run
regex: regex that was used
e: error object, or string to provide extra information
Raises:
ec.ecError: always, with |errmsg|
"""
errmsg = "Failed to retrieve output for %r matching regex %r" % (cmd, regex)
if e:
self._logger.error(e)
if self._debug_info:
self._logger.error(self._debug_info)
raise ec.ecError("%s. %s" % (errmsg, e))
def _get_safe_output(self, cmd, regex):
"""Safely retrieve the output of |cmd| from the |self._interface|.
Args:
cmd: command to run
regex: regex to match.
Returns:
output of running |cmd| and matching with |regex| on |self._interface|
Raises:
ecError: if the output from the |self._uart_cmd| matched with the
|self._regex| is None
"""
retries = int(self._params["retries"]) if "retries" in self._params else 1
while retries > 0:
retries -= 1
try:
self._limit_channel()
results = self._issue_cmd_get_results(cmd, [regex])
break
except hw_driver.HwDriverError as e:
if retries <= 0:
# Any known error is coming through as a HwDriverError derivative.
self._error(cmd, regex, e)
self._logger.warning(
"Retry retrieving output for %r matching regex %r" % (cmd, regex)
)
finally:
self._restore_channel()
# |results| should always be a list of tuples.
# TODO(b/180764962) remove this
if not isinstance(results[0], tuple):
results[0] = (results[0],)
if results is None:
self._error(cmd, regex)
# Extract the requested group. This control does not support a list of regex
# but rather just expects one regex. Therefore we access the 1st element of
# the results (results[0]) always.
return results[0]
def _get(self):
"""Generic get from MCU console, using |self._params| for cmd and regex.
Runs |self._uart_cmd| on |self._interface| (has to be a uart interface)
and matches the output with |self._regex| before returning the result.
Returns:
result of |self._uart_cmd| after matching with |self._regex| and
processing
"""
results = self._get_safe_output(self._uart_cmd, self._regex)
# The desired output is inside the self._group member of |results|. However,
# given how python regex works, this can be None, and we cannot marshall
# None across the channel, so make sure to cast it into a string first.
result = results[self._group]
if result is None:
self._logger.debug(
"Requested result group returned None, casting into string."
)
result = str(result)
return self._process_output(result)
def _set(self, value):
"""Generic set method to set |self._uart_cmd| + str(|value|) to the console.
Note: the control to send to the |self._interface| console is created
by appending a space and the string cast of |value| to |self._uart_cmd|.
Args:
value: the value passed through by servod to set
"""
# NOTE: This mechanism is limited, but effective for many use-cases.
# Should the situation arise multiple times where a more complex control
# generation is required e.g. string formatting so that the value is
# in the middle of the string somewhere, please file a feature request bug.
full_cmd = "%s %s" % (self._uart_cmd, value)
self._logger.debug("About to issue %r", full_cmd)
# Ignore the return type as we only want to send the |cmd|
self._get_safe_output(full_cmd, self._regex)