| #!/usr/bin/env python |
| # Copyright 2016 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Allow creation of uart/console interface via usb google serial endpoint.""" |
| |
| # Note: This is a py2/3 compatible file. |
| |
| from __future__ import print_function |
| import argparse |
| import array |
| import os |
| import sys |
| import termios |
| import threading |
| import time |
| import traceback |
| import tty |
| try: |
| import usb |
| except: |
| print("import usb failed") |
| print("try running these commands:") |
| print(" sudo apt-get install python-pip") |
| print(" sudo pip install --pre pyusb") |
| print() |
| sys.exit(-1) |
| |
| import six |
| |
| |
| def GetBuffer(stream): |
| if six.PY3: |
| return stream.buffer |
| return stream |
| |
| |
| """Class Susb covers USB device discovery and initialization. |
| |
| It can find a particular endpoint by vid:pid, serial number, |
| and interface number. |
| """ |
| |
| class SusbError(Exception): |
| """Class for exceptions of Susb.""" |
| def __init__(self, msg, value=0): |
| """SusbError constructor. |
| |
| Args: |
| msg: string, message describing error in detail |
| value: integer, value of error when non-zero status returned. Default=0 |
| """ |
| super(SusbError, self).__init__(msg, value) |
| self.msg = msg |
| self.value = value |
| |
| class Susb(): |
| """Provide USB functionality. |
| |
| Instance Variables: |
| _read_ep: pyUSB read endpoint for this interface |
| _write_ep: pyUSB write endpoint for this interface |
| """ |
| READ_ENDPOINT = 0x81 |
| WRITE_ENDPOINT = 0x1 |
| TIMEOUT_MS = 100 |
| |
| def __init__(self, vendor=0x18d1, |
| product=0x500f, interface=1, serialname=None): |
| """Susb constructor. |
| |
| Discovers and connects to USB endpoints. |
| |
| Args: |
| vendor : usb vendor id of device |
| product : usb product id of device |
| interface : interface number ( 1 - 8 ) of device to use |
| serialname: string of device serialnumber. |
| |
| Raises: |
| SusbError: An error accessing Susb object |
| """ |
| # Find the device. |
| dev_g = usb.core.find(idVendor=vendor, idProduct=product, find_all=True) |
| dev_list = list(dev_g) |
| if dev_list is None: |
| raise SusbError("USB device not found") |
| |
| # Check if we have multiple devices. |
| dev = None |
| if serialname: |
| for d in dev_list: |
| dev_serial = "PyUSB doesn't have a stable interface" |
| try: |
| dev_serial = usb.util.get_string(d, 256, d.iSerialNumber) |
| except: |
| dev_serial = usb.util.get_string(d, d.iSerialNumber) |
| if dev_serial == serialname: |
| dev = d |
| break |
| if dev is None: |
| raise SusbError("USB device(%s) not found" % (serialname,)) |
| else: |
| try: |
| dev = dev_list[0] |
| except: |
| try: |
| dev = dev_list.next() |
| except: |
| raise SusbError("USB device %04x:%04x not found" % (vendor, product)) |
| |
| # If we can't set configuration, it's already been set. |
| try: |
| dev.set_configuration() |
| except usb.core.USBError: |
| pass |
| |
| # Get an endpoint instance. |
| cfg = dev.get_active_configuration() |
| intf = usb.util.find_descriptor(cfg, bInterfaceNumber=interface) |
| self._intf = intf |
| |
| if not intf: |
| raise SusbError("Interface not found") |
| |
| # Detach raiden.ko if it is loaded. |
| if dev.is_kernel_driver_active(intf.bInterfaceNumber) is True: |
| dev.detach_kernel_driver(intf.bInterfaceNumber) |
| |
| read_ep_number = intf.bInterfaceNumber + self.READ_ENDPOINT |
| read_ep = usb.util.find_descriptor(intf, bEndpointAddress=read_ep_number) |
| self._read_ep = read_ep |
| |
| write_ep_number = intf.bInterfaceNumber + self.WRITE_ENDPOINT |
| write_ep = usb.util.find_descriptor(intf, bEndpointAddress=write_ep_number) |
| self._write_ep = write_ep |
| |
| |
| """Suart class implements a stream interface, to access Google's USB class. |
| |
| This creates a send and receive thread that monitors USB and console input |
| and forwards them across. This particular class is hardcoded to stdin/out. |
| """ |
| |
| class SuartError(Exception): |
| """Class for exceptions of Suart.""" |
| def __init__(self, msg, value=0): |
| """SuartError constructor. |
| |
| Args: |
| msg: string, message describing error in detail |
| value: integer, value of error when non-zero status returned. Default=0 |
| """ |
| super(SuartError, self).__init__(msg, value) |
| self.msg = msg |
| self.value = value |
| |
| |
| class Suart(): |
| """Provide interface to serial usb endpoint.""" |
| |
| def __init__(self, vendor=0x18d1, product=0x501c, interface=0, |
| serialname=None): |
| """Suart contstructor. |
| |
| Initializes USB stream interface. |
| |
| Args: |
| vendor: usb vendor id of device |
| product: usb product id of device |
| interface: interface number of device to use |
| serialname: Defaults to None. |
| |
| Raises: |
| SuartError: If init fails |
| """ |
| self._done = threading.Event() |
| self._susb = Susb(vendor=vendor, product=product, |
| interface=interface, serialname=serialname) |
| |
| def wait_until_done(self, timeout=None): |
| return self._done.wait(timeout=timeout) |
| |
| def run_rx_thread(self): |
| try: |
| while True: |
| try: |
| r = self._susb._read_ep.read(64, self._susb.TIMEOUT_MS) |
| if r: |
| GetBuffer(sys.stdout).write(r.tostring()) |
| GetBuffer(sys.stdout).flush() |
| |
| except Exception as e: |
| # If we miss some characters on pty disconnect, that's fine. |
| # ep.read() also throws USBError on timeout, which we discard. |
| if not isinstance(e, (OSError, usb.core.USBError)): |
| print("rx %s" % e) |
| finally: |
| self._done.set() |
| |
| def run_tx_thread(self): |
| try: |
| while True: |
| try: |
| r = GetBuffer(sys.stdin).read(1) |
| if not r or r == b"\x03": |
| break |
| if r: |
| self._susb._write_ep.write(array.array('B', r), |
| self._susb.TIMEOUT_MS) |
| except Exception as e: |
| print("tx %s" % e) |
| finally: |
| self._done.set() |
| |
| def run(self): |
| """Creates pthreads to poll USB & PTY for data. |
| """ |
| self._exit = False |
| |
| self._rx_thread = threading.Thread(target=self.run_rx_thread) |
| self._rx_thread.daemon = True |
| self._rx_thread.start() |
| |
| self._tx_thread = threading.Thread(target=self.run_tx_thread) |
| self._tx_thread.daemon = True |
| self._tx_thread.start() |
| |
| |
| |
| """Command line functionality |
| |
| Allows specifying vid:pid, serialnumber, interface. |
| Ctrl-C exits. |
| """ |
| |
| parser = argparse.ArgumentParser(description="Open a console to a USB device") |
| parser.add_argument('-d', '--device', type=str, |
| help="vid:pid of target device", default="18d1:501c") |
| parser.add_argument('-i', '--interface', type=int, |
| help="interface number of console", default=0) |
| parser.add_argument('-s', '--serialno', type=str, |
| help="serial number of device", default="") |
| parser.add_argument('-S', '--notty-exit-sleep', type=float, default=0.2, |
| help="When stdin is *not* a TTY, wait this many seconds after EOF from " |
| "stdin before exiting, to give time for receiving a reply from the USB " |
| "device.") |
| |
| |
| def runconsole(): |
| """Run the usb console code |
| |
| Starts the pty thread, and idles until a ^C is caught. |
| """ |
| args = parser.parse_args() |
| |
| vidstr, pidstr = args.device.split(':') |
| vid = int(vidstr, 16) |
| pid = int(pidstr, 16) |
| |
| serialno = args.serialno |
| interface = args.interface |
| |
| sobj = Suart(vendor=vid, product=pid, interface=interface, |
| serialname=serialno) |
| if sys.stdin.isatty(): |
| tty.setraw(sys.stdin.fileno()) |
| sobj.run() |
| sobj.wait_until_done() |
| if not sys.stdin.isatty() and args.notty_exit_sleep > 0: |
| time.sleep(args.notty_exit_sleep) |
| |
| |
| def main(): |
| stdin_isatty = sys.stdin.isatty() |
| if stdin_isatty: |
| fd = sys.stdin.fileno() |
| os.system("stty -echo") |
| old_settings = termios.tcgetattr(fd) |
| |
| try: |
| runconsole() |
| finally: |
| if stdin_isatty: |
| termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) |
| os.system("stty echo") |
| # Avoid having the user's shell prompt start mid-line after the final output |
| # from this program. |
| print() |
| |
| |
| if __name__ == '__main__': |
| main() |