| # Copyright 2017 The ChromiumOS Authors |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Allow creation of uart/console interface via stm32 usb endpoint.""" |
| |
| import os |
| import select |
| import sys |
| import termios |
| import threading |
| import time |
| import tty |
| |
| import usb # pylint:disable=import-error |
| |
| from . import stm32usb |
| |
| |
| class SuartError(Exception): |
| """Class for exceptions of Suart.""" |
| |
| def __init__(self, msg, value=0): |
| """SuartError constructor. |
| |
| Args: |
| msg: string, message describing error in detail |
| value: integer, value of error when non-zero status returned. Default=0 |
| """ |
| super(SuartError, self).__init__(msg, value) |
| self.msg = msg |
| self.value = value |
| |
| |
| class Suart: |
| """Provide interface to stm32 serial usb endpoint.""" |
| |
| def __init__( |
| self, |
| vendor=0x18D1, |
| product=0x501A, |
| interface=0, |
| serialname=None, |
| debuglog=False, |
| ): |
| """Suart contstructor. |
| |
| Initializes stm32 USB stream interface. |
| |
| Args: |
| vendor: usb vendor id of stm32 device |
| product: usb product id of stm32 device |
| interface: interface number of stm32 device to use |
| serialname: serial name to target. Defaults to None. |
| debuglog: chatty output. Defaults to False. |
| |
| Raises: |
| SuartError: If init fails |
| """ |
| self._ptym = None |
| self._ptys = None |
| self._ptyname = None |
| self._rx_thread = None |
| self._tx_thread = None |
| self._debuglog = debuglog |
| self._susb = stm32usb.Susb( |
| vendor=vendor, |
| product=product, |
| interface=interface, |
| serialname=serialname, |
| ) |
| self._running = False |
| |
| def __del__(self): |
| """Suart destructor.""" |
| self.close() |
| |
| def close(self): |
| """Stop all running threads.""" |
| self._running = False |
| if self._rx_thread: |
| self._rx_thread.join(2) |
| self._rx_thread = None |
| if self._tx_thread: |
| self._tx_thread.join(2) |
| self._tx_thread = None |
| self._susb.close() |
| |
| def run_rx_thread(self): |
| """Background loop to pass data from USB to pty.""" |
| ep = select.epoll() |
| ep.register(self._ptym, select.EPOLLHUP) |
| try: |
| while self._running: |
| events = ep.poll(0) |
| # Check if the pty is connected to anything, or hungup. |
| if not events: |
| try: |
| r = self._susb._read_ep.read(64, self._susb.TIMEOUT_MS) |
| if r: |
| if self._debuglog: |
| print("".join([chr(x) for x in r]), end="") |
| os.write(self._ptym, r) |
| |
| # If we miss some characters on pty disconnect, that's fine. |
| # ep.read() also throws USBError on timeout, which we discard. |
| except (OSError, usb.core.USBError): |
| pass |
| else: |
| time.sleep(0.1) |
| except Exception as e: |
| raise e |
| |
| def run_tx_thread(self): |
| """Background loop to pass data from pty to USB.""" |
| ep = select.epoll() |
| ep.register(self._ptym, select.EPOLLHUP) |
| try: |
| while self._running: |
| events = ep.poll(0) |
| # Check if the pty is connected to anything, or hungup. |
| if not events: |
| try: |
| r = os.read(self._ptym, 64) |
| # TODO(crosbug.com/936182): Remove when the |
| # servo v4/micro console issues are fixed. |
| time.sleep(0.001) |
| if r: |
| self._susb._write_ep.write(r, self._susb.TIMEOUT_MS) |
| |
| except (OSError, usb.core.USBError): |
| pass |
| else: |
| time.sleep(0.1) |
| except Exception as e: |
| raise e |
| |
| def run(self): |
| """Creates pthreads to poll stm32 & PTY for data.""" |
| m, s = os.openpty() |
| self._ptyname = os.ttyname(s) |
| |
| self._ptym = m |
| self._ptys = s |
| |
| os.fchmod(s, 0o660) |
| |
| # Change the owner and group of the PTY to the user who started servod. |
| try: |
| uid = int(os.environ.get("SUDO_UID", -1)) |
| except TypeError: |
| uid = -1 |
| |
| try: |
| gid = int(os.environ.get("SUDO_GID", -1)) |
| except TypeError: |
| gid = -1 |
| os.fchown(s, uid, gid) |
| |
| tty.setraw(self._ptym, termios.TCSADRAIN) |
| |
| # Generate a HUP flag on pty secondary fd. |
| os.fdopen(s).close() |
| |
| self._running = True |
| |
| self._rx_thread = threading.Thread(target=self.run_rx_thread, args=[]) |
| self._rx_thread.daemon = True |
| self._rx_thread.start() |
| |
| self._tx_thread = threading.Thread(target=self.run_tx_thread, args=[]) |
| self._tx_thread.daemon = True |
| self._tx_thread.start() |
| |
| def get_uart_props(self): |
| """Get the uart's properties. |
| |
| Returns: |
| dict where: |
| baudrate: integer of uarts baudrate |
| bits: integer, number of bits of data Can be 5|6|7|8 inclusive |
| parity: integer, parity of 0-2 inclusive where: |
| 0: no parity |
| 1: odd parity |
| 2: even parity |
| sbits: integer, number of stop bits. Can be 0|1|2 inclusive where: |
| 0: 1 stop bit |
| 1: 1.5 stop bits |
| 2: 2 stop bits |
| """ |
| return { |
| "baudrate": 115200, |
| "bits": 8, |
| "parity": 0, |
| "sbits": 1, |
| } |
| |
| def set_uart_props(self, line_props): |
| """Set the uart's properties. |
| |
| Note that Suart cannot set properties |
| and will fail if the properties are not the default 115200,8n1. |
| |
| Args: |
| line_props: dict where: |
| baudrate: integer of uarts baudrate |
| bits: integer, number of bits of data ( prior to stop bit) |
| parity: integer, parity of 0-2 inclusive where |
| 0: no parity |
| 1: odd parity |
| 2: even parity |
| sbits: integer, number of stop bits. Can be 0|1|2 inclusive where: |
| 0: 1 stop bit |
| 1: 1.5 stop bits |
| 2: 2 stop bits |
| |
| Raises: |
| SuartError: If requested line properties are not the default. |
| """ |
| curr_props = self.get_uart_props() |
| for prop in line_props: |
| if line_props[prop] != curr_props[prop]: |
| raise SuartError( |
| "Line property %s cannot be set from %s to %s" |
| % (prop, curr_props[prop], line_props[prop]) |
| ) |
| return True |
| |
| def get_pty(self): |
| """Gets path to pty for communication to/from uart. |
| |
| Returns: |
| String path to the pty connected to the uart |
| """ |
| return self._ptyname |
| |
| |
| def main(): |
| """Run a suart test with the default parameters.""" |
| try: |
| sobj = Suart() |
| sobj.run() |
| |
| # run() is a thread so just busy wait to mimic server. |
| while True: |
| # Ours sleeps to eleven! |
| time.sleep(11) |
| except KeyboardInterrupt: |
| sys.exit(0) |
| |
| |
| if __name__ == "__main__": |
| main() |