blob: 2a11cf72b3778770870c0b87364da3cd224722a3 [file] [log] [blame]
# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Allow creation of uart interface via libftdiuart library for FTDI devices."""
import ctypes
import logging
import errno
import os
import sys
import termios
import threading
import time
import tty
import ftdi_utils
import ftdi_common
# TODO(tbroch) need some way to xref these to values in ftdiuart.h
# Length of the 'name' character array in FuartContext.
FUART_NAME_SIZE = 128
# Length of the 'buf' byte array in FuartContext.
FUART_BUF_SIZE = 128
# Sleep interval handed to fuart_run() by Fuart.run(); presumably in
# microseconds, going by the name -- confirm against ftdiuart.h.
FUART_USEC_SLEEP = 1000
FUART_MAX_BUFFER_SIZE = 20000 # Do not keep more than this number of bytes
# when capturing.
class FuartError(Exception):
  """Exception type raised for failures in the Fuart interface.

  Attributes:
    msg: string, message describing error in detail
    value: integer, status associated with the failure (0 when none given)
  """

  def __init__(self, msg, value=0):
    """Store the description and status of the failure.

    Both values are also forwarded to Exception, so they appear in
    self.args as (msg, value).

    Args:
      msg: string, message describing error in detail
      value: integer, value of error when non-zero status returned. Default=0
    """
    self.msg, self.value = msg, value
    super(FuartError, self).__init__(msg, value)
class UartCfg(ctypes.Structure):
  """UART line configuration as exchanged with the C library.

  Mirrors the struct named uart_cfg declared in ftdi_common.h. The values
  are supplied to the libftdi API:
    ftdi_set_line_property
    ftdi_set_baudrate

  Members (each an unsigned int, in declaration order):
    baudrate, bits, parity, sbits
  """
  # ctypes lays members out in the order given, so the tuple below must stay
  # in the same order as the C declaration. A generator (rather than a list
  # comprehension) keeps the loop variable out of the class namespace.
  _fields_ = list(
      (member, ctypes.c_uint)
      for member in ("baudrate", "bits", "parity", "sbits"))
class FuartContext(ctypes.Structure):
  """ctypes mirror of the primary libftdiuart context structure.

  Corresponds to the struct named fuart_context declared in ftdiuart.h.
  """
  # Member order and types have to track the C declaration exactly; ctypes
  # lays the structure out in the order listed here.
  _fields_ = [
      ("fc", ctypes.POINTER(ftdi_common.FtdiContext)),
      ("gpio", ftdi_common.Gpio),
      # Path of the pty; this is what Fuart.get_pty() hands back.
      ("name", ctypes.c_char * FUART_NAME_SIZE),
      # Line settings; this is what Fuart.get_uart_props() reports.
      ("cfg", UartCfg),
      ("is_open", ctypes.c_int),
      ("usecs_to_sleep", ctypes.c_int),
      ("fd", ctypes.c_int),
      ("buf", ctypes.c_ubyte * FUART_BUF_SIZE),
      ("error", ctypes.c_int),
      # NOTE(review): presumably an opaque pointer to a native lock object
      # that is never dereferenced from Python -- confirm against ftdiuart.h.
      ("lock", ctypes.POINTER(ctypes.c_int)),
  ]
class Fuart(object):
  """Provide interface to libftdiuart c-library via python ctypes module."""

  def __init__(self, vendor=ftdi_common.DEFAULT_VID,
               product=ftdi_common.DEFAULT_PID, interface=3,
               serialname=None,
               ftdi_context=None):
    """Fuart constructor.

    Loads libraries for libftdi, libftdiuart. Creates instance objects
    (Structures), FuartContext, FtdiContext and Gpio to interact with the
    library and initializes them.

    Args:
      vendor: usb vendor id of FTDI device
      product: usb product id of FTDI device
      interface: interface number of FTDI device to use
      serialname: string of device serialname/number as defined in FTDI eeprom.
      ftdi_context: ftdi context created previously or None if one should be
        allocated here. This shared context functionality is seen in miniservo
        which has a uart + 4 gpios

    Raises:
      FuartError: If either ftdi or fuart inits fail
    """
    self._logger = logging.getLogger("Fuart")
    self._logger.debug("")
    # Set before anything that can raise so that __del__ always finds the
    # attribute, even on a partially constructed object.
    self._is_closed = True

    (self._flib, self._lib) = ftdi_utils.load_libs("ftdi", "ftdiuart")
    # NOTE(review): 'partity' below looks like a typo for 'parity'. It is
    # left untouched because FtdiCommonArgs is defined in ftdi_common and its
    # accepted keywords cannot be confirmed from this file.
    self._fargs = ftdi_common.FtdiCommonArgs(vendor_id=vendor,
                                             product_id=product,
                                             interface=interface,
                                             serialname=serialname,
                                             speed=115200,
                                             bits=8, # BITS_8 in ftdi.h
                                             partity=0, # NONE in ftdi.h
                                             sbits=0 # STOP_BIT_1 in ftdi.h
                                             )
    self._fuartc = FuartContext()

    # Capture state. _capture_buffer holds the chunks read from the pty and
    # _capture_bytes their combined length; both are guarded by
    # _capture_lock once the capture thread is running.
    self._capture_active = 0
    self._capture_buffer = []
    self._capture_bytes = 0
    self._capture_thread = None
    self._capture_lock = threading.Lock()
    # Remember parent thread to be able to find out if it is still running.
    self._parent_thread = threading.current_thread()

    if ftdi_context:
      self._fc = ftdi_context
    else:
      self._fc = ftdi_common.FtdiContext()
      err = self._flib.ftdi_init(ctypes.byref(self._fc))
      if err:
        raise FuartError("doing ftdi_init", err)

    err = self._lib.fuart_init(ctypes.byref(self._fuartc),
                               ctypes.byref(self._fc))
    if err:
      raise FuartError("doing fuart_init", err)

  def __del__(self):
    """Fuart destructor."""
    self._logger.debug("")
    if not self._is_closed:
      self.close()

  def _capture_function(self):
    """Capture uart output and store it in the buffer.

    This function runs on a separate thread.

    Open the uart interface in non-blocking mode and start polling it for
    data, adding data to _capture_buffer until FUART_MAX_BUFFER_SIZE bytes
    are held (in which case add an overflow indication once and start
    dropping data on the floor).

    Adding stuff to _capture_buffer is protected by _capture_lock. The client
    is supposed to be reading data, each read attempt copying _capture_buffer
    contents to the client and emptying the buffer.

    Finish when the client requests to stop capturing or when the parent
    thread terminates for whatever reason.

    Raises:
      OSError in case attempt to read fails for an unexpected reason.
    """
    self._logger.debug('starting capture')
    uart_fd = os.open(self._fuartc.name,
                      os.O_RDONLY | os.O_NONBLOCK | os.O_NOCTTY)
    try:
      saved_conf = termios.tcgetattr(uart_fd)
      tty.setraw(uart_fd)
      buffer_overflow = False
      while self._capture_active and self._parent_thread.is_alive():
        try:
          data = os.read(uart_fd, 100)
        except OSError as e:
          if e.errno in (errno.EAGAIN, errno.EWOULDBLOCK):  # Data unavailable
            time.sleep(.1)
            continue
          raise
        if not data:
          # Nothing was delivered; back off instead of spinning and filling
          # the buffer with empty chunks.
          time.sleep(.1)
          continue
        with self._capture_lock:
          # The limit is expressed in bytes, so compare the byte count and
          # not the number of chunks held in the list.
          if self._capture_bytes < FUART_MAX_BUFFER_SIZE:
            buffer_overflow = False
            self._capture_buffer.append(data)
            self._capture_bytes += len(data)
          elif not buffer_overflow:
            # Mark the gap once per overflow episode; the marker itself is
            # not counted against the limit.
            self._capture_buffer.append(
                b'\n\n........capture buffer overflow........\n\n')
            buffer_overflow = True
      # Only reached on a regular stop: put the tty settings back.
      termios.tcsetattr(uart_fd, termios.TCSANOW, saved_conf)
    finally:
      # Release the descriptor on every exit path, including failures while
      # switching the tty to raw mode.
      os.close(uart_fd)
    self._logger.debug('quitting capture')

  def open(self):
    """Opens access to FTDI uart interface.

    Raises:
      FuartError: If open fails
    """
    self._logger.debug("")
    err = self._lib.fuart_open(ctypes.byref(self._fuartc),
                               ctypes.byref(self._fargs))
    if err:
      raise FuartError("doing fuart_open", err)
    self._is_closed = False

  def close(self):
    """Closes connection to FTDI uart interface.

    Raises:
      FuartError: If close fails
    """
    self._logger.debug("")
    err = self._lib.fuart_close(ctypes.byref(self._fuartc))
    if err:
      raise FuartError("doing fuart_close", err)
    self._is_closed = True

  def run(self):
    """Creates a pthread to poll FTDI & PTY for data.

    Opens the interface first if it is not open yet.

    Raises:
      FuartError: If thread creation fails
    """
    self._logger.debug("")
    if self._is_closed:
      self.open()
    err = self._lib.fuart_run(ctypes.byref(self._fuartc), FUART_USEC_SLEEP)
    if err:
      raise FuartError("Failure with fuart_run", err)

  def get_uart_props(self):
    """Get the uart's properties.

    Returns:
      dict where:
        baudrate: integer of uarts baudrate
        bits: integer, number of bits of data Can be 5|6|7|8 inclusive
        parity: integer, parity of 0-2 inclusive where:
          0: no parity
          1: odd parity
          2: even parity
        sbits: integer, number of stop bits. Can be 0|1|2 inclusive where:
          0: 1 stop bit
          1: 1.5 stop bits
          2: 2 stop bits
    """
    self._logger.debug("")
    cfg = self._fuartc.cfg
    return {'baudrate': cfg.baudrate,
            'bits': cfg.bits,
            'parity': cfg.parity,
            'sbits': cfg.sbits}

  def set_uart_props(self, line_props):
    """Set the uart's properties.

    Args:
      line_props: dict where:
        baudrate: integer of uarts baudrate
        bits: integer, number of bits of data ( prior to stop bit)
        parity: integer, parity of 0-2 inclusive where
          0: no parity
          1: odd parity
          2: even parity
        sbits: integer, number of stop bits. Can be 0|1|2 inclusive where:
          0: 1 stop bit
          1: 1.5 stop bits
          2: 2 stop bits

    Raises:
      FuartError: If a value is out of range or the library failed to set
        the line properties
    """
    self._logger.debug("")
    if line_props['bits'] not in (5, 6, 7, 8):
      raise FuartError("Data bits must be 5|6|7|8")
    if line_props['sbits'] not in (0, 1, 2):
      raise FuartError("Stop bits must be 0|1|2.  Where 0==1bit,"
                       "1==1.5bits 2==2bits")
    if line_props['parity'] not in (0, 1, 2):
      raise FuartError("Parity must be 0|1|2.  Where 0==none, 1==odd, 2==even")

    cfg = UartCfg()
    cfg.baudrate = line_props['baudrate']
    cfg.bits = line_props['bits']
    cfg.parity = line_props['parity']
    cfg.sbits = line_props['sbits']
    err = self._lib.fuart_stty(ctypes.byref(self._fuartc), ctypes.byref(cfg))
    if err:
      # Carry the library status along, as the other methods do.
      raise FuartError("Failed to set line properties requested", err)

  def get_pty(self):
    """Gets path to pty for communication to/from uart.

    Returns:
      String path to the pty connected to the uart
    """
    self._logger.debug("")
    return self._fuartc.name

  def get_capture_active(self):
    """Return state of the 'capture_active' control for this interface.

    Returns:
      Current capture mode as last stored by set_capture_active (0 initially)
    """
    self._logger.debug("")
    return self._capture_active

  def set_capture_active(self, activate):
    """Enable/disable the capture mode on this interface.

    Starting a capture discards anything left in the buffer and spawns the
    capture thread; stopping it waits for that thread to finish. Requests
    that match the current state are ignored.

    Args:
      activate: a Boolean, indicating whether capture should be activated or
                deactivated
    """
    self._logger.debug("")
    if activate and not self._capture_active:
      # Need to start capturing
      self._capture_buffer = []
      self._capture_bytes = 0
      self._capture_thread = threading.Thread(target=self._capture_function)
      # The flag has to be set before the thread starts: the thread's loop
      # exits as soon as it sees the flag cleared.
      self._capture_active = activate
      self._capture_thread.start()
      return
    if not activate and self._capture_active:
      # Need to stop capturing
      self._capture_active = activate
      self._capture_thread.join()
      self._capture_thread = None

  def get_stream(self):
    """Return UART stream accumulated since last time.

    Returns:
      repr() of everything captured since the previous call; the capture
      buffer is emptied as a side effect
    """
    self._logger.debug("")
    with self._capture_lock:
      chunks = self._capture_buffer
      self._capture_buffer = []
      self._capture_bytes = 0
    # The detached list is no longer shared, so join it outside the lock.
    return repr(b''.join(chunks))
def test():
  """Manual smoke test: bring up a Fuart and keep it running.

  Parses the common FTDI command line options, configures logging (more
  verbose format and DEBUG level when the debug option is set), starts the
  uart, logs the pty path and then idles forever. Interrupt the process to
  stop it.
  """
  (options, _) = ftdi_utils.parse_common_args(interface=3)
  # Named log_format so the builtin format() is not shadowed.
  log_format = "%(asctime)s - %(name)s - %(levelname)s"
  loglevel = logging.INFO
  if options.debug:
    loglevel = logging.DEBUG
    log_format += " - %(filename)s:%(lineno)d:%(funcName)s"
  log_format += " - %(message)s"
  logging.basicConfig(level=loglevel, format=log_format)
  logger = logging.getLogger(os.path.basename(sys.argv[0]))
  logger.info("Start")
  fobj = Fuart(options.vendor, options.product, options.interface)
  fobj.run()
  # Report the pty through the same named logger as the "Start" message.
  logger.info("%s", fobj.get_pty())
  # run() is a thread so just busy wait to mimic server
  while True:
    # ours sleeps to eleven!
    time.sleep(11)
if __name__ == "__main__":
  # test() never returns on its own; Ctrl-C is the expected way to stop it,
  # so turn the interrupt into a clean exit status instead of a traceback.
  try:
    test()
  except KeyboardInterrupt:
    sys.exit(0)