blob: 1203251a31f0c1004232099a3e96de7e983077a1 [file] [log] [blame]
#!/usr/bin/python2 -u
#
# Copyright 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Utilities for serial port communication.
For some test cases, DUT needs to communicates with fixuture via USB-Serial
dungle. We provides FindTtyByDriver() to help finding the right
/dev/tty* path for the given driver; and OpenSerial() to open a serial port.
Provides an interface to communicate w/ a serial device: SerialDevice. See
class comment for details.
"""
from __future__ import print_function
import glob
import logging
import os
import re
# site-packages: dev-python/pyserial
import serial
import time
def OpenSerial(**params):
"""Tries to open a serial port.
Args:
params: a dict of parameters for a serial connection. Should contain
'port'. For other parameters, like 'baudrate', 'bytesize', 'parity',
'stopbits' and 'timeout', please refer pySerial documentation.
Returns:
serial object if successful.
Raises:
ValueError if params is invalid; otherwise, serial.SerialException.
"""
port = params.get('port')
if not port:
raise ValueError('Missing parameter "port".')
ser = serial.Serial(**params)
if not ser.isOpen():
raise serial.SerialException('Failed to open serial: %r' % port)
return ser
def FindTtyByDriver(driver_name, interface_protocol=None, multiple_ports=False):
"""Finds the tty terminal matched to the given driver_name and an optional
interface protocol.
Checks the interface protocol if specified. In some situations where there
may exist multiple ports with the same driver, use the interface_protocol
to distinguish between them. An example is Arduino DUE board with a
Programming Port and a Native USB Port.
In some cases there may be multiple ports with the same driver and with the
same interface_protocol, set multiple_ports to True and all matched paths
found will be returned in a list.
Args:
driver_name: driver name for the target TTY device.
interface_protocol: the interface protocol for the target TTY device.
multiple_ports: determines whether it returns all matched paths by list, or
just return the first found one.
Returns:
If multiple_ports is True, return /dev/tty path if driver_name is matched;
None if not found.
If multiple_ports is False, return a list of all matched /dev/tty path; An
empty list if not found.
"""
matched_candidates = []
for candidate in glob.glob('/dev/tty*'):
device_path = '/sys/class/tty/%s/device' % os.path.basename(candidate)
driver_path = os.path.realpath(os.path.join(device_path, 'driver'))
# Check if driver_name exist at the tail of driver_path.
if re.search(driver_name + '$', driver_path):
if (interface_protocol is None or
interface_protocol == DeviceInterfaceProtocol(device_path)):
if multiple_ports:
matched_candidates.append(candidate)
else:
return candidate
if multiple_ports:
return matched_candidates
else:
return None
def FindTtyByPortIndex(port_index, driver_name=None):
"""Finds serial port path tty* with given port index.
Port index is fixed as the layout of physical ports.
Example: if serial path is ttyUSB0 for port_index = 1-1, the system path
/sys/class/tty/ttyUSB0/device will be linked to
/sys/devices/pci0000..../..../usb1/1-1/...
Args:
port_index: String for serial connection port index.
driver_name: String for serial connection driver.
Returns:
matched /dev/tty path. Return None if no port has been detected.
"""
for candidate in glob.glob('/dev/tty*'):
device_path = '/sys/class/tty/%s/device' % os.path.basename(candidate)
driver_path = os.path.realpath(os.path.join(device_path, 'driver'))
# If driver_name is given, check if driver_name exists at the tail of
# driver_path.
if driver_name and not driver_path.endswith(driver_name):
continue
device_path = os.path.realpath(device_path)
# Check if port_index exists in device_path.
if '/%s/' % port_index in device_path:
logging.info('Find serial path : %s', candidate)
return candidate
return None
def DeviceInterfaceProtocol(device_path):
"""Extracts the interface protocol of the specified device path.
Args:
device_path: The tty device path in the sysfs.
Returns:
The interface protocol if found else ''
"""
interface_protocol_path = os.path.join(device_path, 'bInterfaceProtocol')
try:
with open(interface_protocol_path) as f:
return f.read().strip()
except IOError:
return ''
class SerialDevice(object):
"""Interface to communicate with a serial device.
Instead of giving a fixed port, it can look up port by driver name.
It has several handy methods, like SendRecv() and SendExpectRecv(),
which support fail retry.
Property:
log: True to enable logging.
Usage:
fixture = SerialDevice()
fixture.Connect(driver='pl2303')
# Send 'P' for ping fixture and expect an 'OK' response.
# Allow to retry twice.
fixture.SendExpectReceive('P', 'OK', retry=2)
# Send 'FID' for getting fixture ID. Return received result. No retry.
fixture_id = fixture.SendRecv('FID')
"""
def __init__(self, send_receive_interval_secs=0.2, retry_interval_secs=0.5,
log=False):
"""Constructor.
Sets intervals between send/receive and between retries.
Also, setting log to True to emit actions to logging.info.
Args:
send_receive_interval_secs: interval (seconds) between send-receive.
retry_interval_secs: interval (seconds) between retrying command.
log: True to enable logging.
"""
self._serial = None
self._port = ''
self.send_receive_interval_secs = send_receive_interval_secs
self.retry_interval_secs = retry_interval_secs
self.log = log
def __del__(self):
self.Disconnect()
def Connect(self, driver=None, port=None,
baudrate=9600, bytesize=serial.EIGHTBITS,
parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE,
timeout=0.5, writeTimeout=0.5):
"""Opens a serial connection by port or by device driver name.
Args:
driver: driver name of the target serial connection. Used to look up port
if port is not specified.
port, baudrate, bytesize, parity, stopbits, timeout, writeTimeout: See
serial.Serial().
Raises:
SerialException on errors.
"""
if driver and not port:
port = FindTtyByDriver(driver)
if not port:
raise serial.SerialException(
'Serial device with driver %r not found' % driver)
self._port = port
self._serial = OpenSerial(
port=port, baudrate=baudrate, bytesize=bytesize, parity=parity,
stopbits=stopbits, timeout=timeout, writeTimeout=writeTimeout)
if self.log:
logging.info('Serial port %r opened', port)
def Disconnect(self):
"""Closes the connection if it exists."""
if self._serial:
self._serial.close()
def SetTimeout(self, read_timeout, write_timeout):
"""Overrides read/write timeout.
Args:
read_timeout: read timeout.
write_timeout: write timeout.
"""
self._serial.setTimeout(read_timeout)
self._serial.setWriteTimeout(write_timeout)
def GetTimeout(self):
"""Returns (read timeout, write timeout)."""
return (self._serial.getTimeout(), self._serial.getWriteTimeout())
def Send(self, command):
"""Sends a command.
It blocks at most write_timeout seconds.
Args:
command: command to send.
Raises:
SerialTimeoutException if it is timeout and fails to send the command.
SerialException if it is disconnected during sending.
"""
try:
self._serial.write(command)
self._serial.flush()
if self.log:
logging.info('Successfully sent %r', command)
except serial.SerialTimeoutException:
error_message = 'Send %r timeout after %.2f seconds' % (
command, self._serial.getWriteTimeout())
if self.log:
logging.warning(error_message)
raise serial.SerialTimeoutException(error_message)
except serial.SerialException:
raise serial.SerialException('Serial disconnected')
def Receive(self, size=1):
"""Receives N bytes.
It blocks at most timeout seconds.
Args:
size: number of bytes to receive. 0 means receiving what already in the
input buffer.
Returns:
Received N bytes.
Raises:
SerialTimeoutException if it fails to receive N bytes.
"""
if size == 0:
size = self._serial.inWaiting()
response = self._serial.read(size)
if len(response) == size:
if self.log:
logging.info('Successfully received %r', response)
return response
else:
error_message = 'Receive %d bytes timeout after %.2f seconds' % (
size, self._serial.getTimeout())
if self.log:
logging.warning(error_message)
raise serial.SerialTimeoutException(error_message)
def FlushBuffer(self):
"""Flushes input/output buffer."""
self._serial.flushInput()
self._serial.flushOutput()
def SendReceive(self, command, size=1, retry=0, interval_secs=None,
suppress_log=False):
"""Sends a command and returns a N bytes response.
Args:
command: command to send
size: number of bytes to receive. 0 means receiving what already in the
input buffer.
retry: number of retry.
interval_secs: #seconds to wait between send and receive. If specified,
overrides self.send_receive_interval_secs.
suppress_log: True to disable log regardless of self.log value.
Returns:
Received N bytes.
Raises:
SerialTimeoutException if it fails to receive N bytes.
"""
for nth_run in range(retry + 1):
self.FlushBuffer()
try:
self.Send(command)
if interval_secs is None:
time.sleep(self.send_receive_interval_secs)
else:
time.sleep(interval_secs)
response = self.Receive(size)
if not suppress_log and self.log:
logging.info('Successfully sent %r and received %r', command,
response)
return response
except serial.SerialTimeoutException:
if nth_run < retry:
time.sleep(self.retry_interval_secs)
error_message = 'Timeout receiving %d bytes for command %r' % (size,
command)
if not suppress_log and self.log:
logging.warning(error_message)
raise serial.SerialTimeoutException(error_message)
def SendExpectReceive(self, command, expect_response, retry=0,
interval_secs=None):
"""Sends a command and expects to receive a response.
Args:
command: command to send
expect_response: expected response received
retry: number of retry.
interval_secs: #seconds to wait between send and receive. If specified,
overrides self.send_receive_interval_secs.
Returns:
True if command is sent and expected response received.
"""
try:
response = self.SendReceive(command, len(expect_response), retry=retry,
interval_secs=interval_secs,
suppress_log=True)
except serial.SerialTimeoutException:
if self.log:
logging.warning('SendReceive timeout for command %r', command)
return False
if self.log:
if response == expect_response:
logging.info('Successfully sent %r and received expected response %r',
command, expect_response)
else:
logging.warning('Sent %r but received %r (expected: %r)',
command, response, expect_response)
return response == expect_response