blob: 8985ba1e53ef622114fad08170ad46649f35a23d [file] [log] [blame]
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import ast
import fdpexpect
import os
import pexpect
import hw_driver
DEFAULT_UART_TIMEOUT = 3 # 3 seconds is plenty even for slow platforms
class ptyError(Exception):
"""Exception class for ec."""
UART_PARAMS = {'uart_cmd': None,
'uart_regexp': None,
'uart_timeout': DEFAULT_UART_TIMEOUT
}
class ptyDriver(hw_driver.HwDriver):
"""."""
def __init__(self, interface, params):
"""."""
super(ptyDriver, self).__init__(interface, params)
self._child = None
self._fd = None
self._pty_path = self._interface.get_pty()
self._dict = UART_PARAMS
def _open(self):
"""Connect to serial device and create pexpect interface."""
self._fd = os.open(self._pty_path, os.O_RDWR | os.O_NONBLOCK)
self._child = fdpexpect.fdspawn(self._fd)
def _close(self):
"""Close serial device connection."""
os.close(self._fd)
self._fd = None
self._child = None
def _flush(self):
"""Flush device output to prevent previous messages interfering."""
self._child.sendline("")
while True:
try:
self._child.expect(".", timeout=0.01)
except pexpect.TIMEOUT:
break
def _send(self, cmd):
"""Send command to EC.
This function always flushes serial device before sending, and is used as
a wrapper function to make sure the channel is always flushed before
sending commands.
Args:
cmd: The command string to send to the device.
Raises:
ptyError: Raised when writing to the device fails.
"""
self._flush()
if self._child.sendline(cmd) != len(cmd) + 1:
raise ptyError("Failed to send command.")
def _issue_cmd(self, cmd):
"""Send command to the device and do not wait for response.
Args:
cmd: The command string to send to EC.
"""
self._issue_cmd_get_results(cmd, [])
def _issue_cmd_get_results(self, cmd,
regex_list, timeout=DEFAULT_UART_TIMEOUT):
"""Send command to the device and wait for response.
This function waits for response message matching a regular
expressions.
Args:
cmd: The command issued.
regex_list: List of Regular expressions used to match response message.
Note1, list must be ordered.
Note2, empty list sends and returns.
Returns:
List of tuples, each of which contains the entire matched string and
all the subgroups of the match. None if not matched.
For example:
response of the given command:
High temp: 37.2
Low temp: 36.4
regex_list:
['High temp: (\d+)\.(\d+)', 'Low temp: (\d+)\.(\d+)']
returns:
[('High temp: 37.2', '37', '2'), ('Low temp: 36.4', '36', '4')]
Raises:
ptyError: If timed out waiting for a response
"""
result_list = []
self._open()
try:
self._send(cmd)
self._logger.debug("Sent cmd: %s" % cmd)
for regex in regex_list:
self._child.expect(regex, timeout)
match = self._child.match
lastindex = match.lastindex if match and match.lastindex else 0
# Create a tuple which contains the entire matched string and all
# the subgroups of the match.
result = match.group(*range(lastindex + 1)) if match else None
result_list.append(result)
self._logger.debug("Result: %s" % str(result))
except pexpect.TIMEOUT:
self._logger.debug("Before: ^%s^" % self._child.before)
self._logger.debug("After: ^%s^" % self._child.after)
raise ptyError("Timeout waiting for response.")
finally:
self._close()
return result_list
def _issue_cmd_get_multi_results(self, cmd, regex):
"""Send command to the device and wait for multiple response.
This function waits for arbitary number of response message
matching a regular expression.
Args:
cmd: The command issued.
regex: Regular expression used to match response message.
Returns:
List of tuples, each of which contains the entire matched string and
all the subgroups of the match. None if not matched.
"""
result_list = []
self._open()
try:
self._send(cmd)
self._logger.debug("Sending cmd: %s" % cmd)
while True:
try:
self._child.expect(regex, timeout=0.1)
match = self._child.match
lastindex = match.lastindex if match and match.lastindex else 0
# Create a tuple which contains the entire matched string and all
# the subgroups of the match.
result = match.group(*range(lastindex + 1)) if match else None
result_list.append(result)
self._logger.debug("Got result: %s" % str(result))
except pexpect.TIMEOUT:
break
finally:
self._close()
return result_list
def _Set_uart_timeout(self, timeout):
"""Set timeout value for waiting for the device response.
Args:
timeout: Timeout value in second.
"""
self._dict['uart_timeout'] = timeout
def _Get_uart_timeout(self):
"""Get timeout value for waiting for the device response.
Returns:
Timeout value in second.
"""
return self._dict['uart_timeout']
def _Set_uart_regexp(self, regexp):
"""Set the list of regular expressions which matches the command response.
Args:
regexp: A string which contains a list of regular expressions.
"""
if not isinstance(regexp, str):
raise ecError('The argument regexp should be a string.')
self._dict['uart_regexp'] = ast.literal_eval(regexp)
def _Get_uart_regexp(self):
"""Get the list of regular expressions which matches the command response.
Returns:
A string which contains a list of regular expressions.
"""
return str(self._dict['uart_regexp'])
def _Set_uart_cmd(self, cmd):
"""Set the UART command and send it to the device.
If ec_uart_regexp is 'None', the command is just sent and it doesn't care
about its response.
If ec_uart_regexp is not 'None', the command is send and its response,
which matches the regular expression of ec_uart_regexp, will be kept.
Use its getter to obtain this result. If no match after ec_uart_timeout
seconds, a timeout error will be raised.
Args:
cmd: A string of UART command.
"""
if self._dict['uart_regexp']:
self._dict['uart_cmd'] = self._issue_cmd_get_results(
cmd,
self._dict['uart_regexp'],
self._dict['uart_timeout'])
else:
self._dict['uart_cmd'] = None
self._issue_cmd(cmd)
def _Get_uart_cmd(self):
"""Get the result of the latest UART command.
Returns:
A string which contains a list of tuples, each of which contains the
entire matched string and all the subgroups of the match. 'None' if
the ec_uart_regexp is 'None'.
"""
return str(self._dict['uart_cmd'])