blob: 9a9fa600eccab66a9950833f433dc15464dd1efd [file] [log] [blame]
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Common Functionality required by Servo Uart Interfaces."""
import errno
import logging
import os
import termios
import threading
import time
import tty
MAX_BUFFER_SIZE = 20000 # Do not keep more than this number of bytes
# when capturing.
class Uart(object):
"""Base Class for UART interface implementations.
Most the common functionality involves capturing the UART stream as this is
not implementation dependent.
Instance Variables:
_capture_active: boolean indicating if we are currently capturing.
_capture_buffer: buffer of values read off the UART port.
_capture_thread: thread currently polling and reading values off the UART
_capture_lock: lock used to protect the _capture_buffer.
def __init__(self):
self._logger = logging.getLogger('Uart')
self._capture_active = False
self._capture_buffer = []
self._capture_thread = None
self._capture_lock = threading.Lock()
self._capture_paused = False
# Remember parent thread to be able to find out if it is still running.
self._parent_thread = threading.current_thread()
def pause_capture(self):
"""Wait to capture data until it is resumed."""
self._capture_paused = True
def resume_capture(self):
"""Reenable capturing data from the uart."""
self._capture_paused = False
def _capture_function(self):
"""Captures uart output and store it in the buffer.
This function runs on a separate thread.
Open the uart interface in non-blocking mode and start polling it for
data, adding data to _capture_buffer until it overflows (in which case add
an overflow indication and start dropping data on the floor).
Adding stuff to _capture_buffer is protected by _capture_lock. The client
is supposed to be reading data, each read attempt copying _capture_buffer
contents to the client and emptying the buffer.
Finish when the client requests to stop capturing or when the parent
thread terminates for whatever reason.
OSError in case attempt to read fails for an unexpected reason.
self._logger.debug('starting capture')
uart_fd =, os.O_RDONLY | os.O_NONBLOCK | os.O_NOCTTY)
saved_conf = termios.tcgetattr(uart_fd)
buffer_overflow = False
capture_pause_count = 0
while self._capture_active and self._parent_thread.is_alive():
# The pty_driver may pause capture, so it may be searching the console
# output and wants to prevent capture from consuming that. Wait to read
# from the pty until capture is resumed.
if self._capture_paused:
if (capture_pause_count % 10) == 0:
self._logger.debug('capture paused %d', capture_pause_count)
capture_pause_count += 1
capture_pause_count = 0
data =, 100)
except OSError, e:
if e.errno == errno.EWOULDBLOCK: # Data unavailable
if len(self._capture_buffer) < MAX_BUFFER_SIZE:
buffer_overflow = False
elif buffer_overflow:
'\n\n........capture buffer overflow........\n\n')
buffer_overflow = True
termios.tcsetattr(uart_fd, termios.TCSANOW, saved_conf)
self._logger.debug('quitting capture')
def open(self):
"""Opens access to uart interface."""
raise NotImplementedError('open not yet implemented.')
def close(self):
"""Closes connection to FTDI uart interface."""
raise NotImplementedError('close not yet implemented.')
def run(self):
"""Creates a pthread to poll PTY for data."""
raise NotImplementedError('run not yet implemented.')
def get_uart_props(self):
"""Gets the uart's properties.
dict where:
baudrate: integer of uarts baudrate
bits: integer, number of bits of data Can be 5|6|7|8 inclusive
parity: integer, parity of 0-2 inclusive where:
0: no parity
1: odd parity
2: even parity
sbits: integer, number of stop bits. Can be 0|1|2 inclusive where:
0: 1 stop bit
1: 1.5 stop bits
2: 2 stop bits
raise NotImplementedError('get_uart_props not yet implemented.')
def _uart_props_validation(self, line_props, exception_type=Exception):
"""Validate the line_props.
If they are invalid raise an exception of type exception_type.
line_props: dict where:
baudrate: integer of uarts baudrate
bits: integer, number of bits of data ( prior to stop bit)
parity: integer, parity of 0-2 inclusive where
0: no parity
1: odd parity
2: even parity
sbits: integer, number of stop bits. Can be 0|1|2 inclusive where:
0: 1 stop bit
1: 1.5 stop bits
2: 2 stop bits
exception_type: type of exception to throw if line_props are invalid.
exception_type: If line_props are invalid.
if line_props['bits'] not in [5, 6, 7, 8]:
raise exception_type('Data bits must be 5|6|7|8')
if line_props['sbits'] not in [0, 1, 2]:
raise exception_type('Stop bits must be 0|1|2. Where 0==1bit, '
'1==1.5bits, 2==2bits')
if line_props['parity'] not in [0, 1, 2]:
raise exception_type('Parity must be 0|1|2. Where 0==none, 1==odd, '
def set_uart_props(self, line_props):
"""Sets the uart's properties.
line_props: dict where:
baudrate: integer of uarts baudrate
bits: integer, number of bits of data ( prior to stop bit)
parity: integer, parity of 0-2 inclusive where
0: no parity
1: odd parity
2: even parity
sbits: integer, number of stop bits. Can be 0|1|2 inclusive where:
0: 1 stop bit
1: 1.5 stop bits
2: 2 stop bits
raise NotImplementedError('set_uart_props not yet implemented.')
def get_pty(self):
"""Gets path to pty for communication to/from uart.
String path to the pty connected to the uart
raise NotImplementedError('get_pty not yet implemented.')
def get_capture_active(self):
"""Returns state of the 'capture_active' control for this interface.
Current capture mode expressed as an integer (0 or 1)
return self._capture_active
def set_capture_active(self, activate):
"""Enable/disable the capture mode on this interface.
activate: a Boolean, indicating whether capture should be activated or
if activate and not self._capture_active:
# Need to start capturing
self._capture_buffer = []
self._capture_thread = threading.Thread(target=self._capture_function)
self._capture_active = activate
self._capture_paused = False
if not activate and self._capture_active:
# Need to stop capturing
self._capture_active = activate
self._capture_thread = None
def get_stream(self):
"""Return UART stream accumulated since last time."""
rv = repr(''.join(self._capture_buffer))
self._capture_buffer = []
return rv