| #!/usr/bin/env python |
| # Copyright (C) 2013-2021 Vincent Pelletier <plr.vincent@gmail.com> |
| # |
| # This library is free software; you can redistribute it and/or |
| # modify it under the terms of the GNU Lesser General Public |
| # License as published by the Free Software Foundation; either |
| # version 2.1 of the License, or (at your option) any later version. |
| # |
| # This library is distributed in the hope that it will be useful, |
| # but WITHOUT ANY WARRANTY; without even the implied warranty of |
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
| # Lesser General Public License for more details. |
| # |
| # You should have received a copy of the GNU Lesser General Public |
| # License along with this library; if not, write to the Free Software |
| # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA |
| """ |
| Advanced hotplug examples. |
| Presents ways of integrating hotplug into your userland USB driver. |
| """ |
| |
| import select |
| import sys |
| import usb1 |
| |
| # A few helpers for demonstration... |
| mode_dict = {} |
| |
| class NoHotplugSupport(Exception): |
| pass |
| |
| def onAwesomeDeviceLeft(awesome_device): |
| print('Device left:', str(awesome_device)) |
| |
| def onAwesomeDeviceArrived(awesome_device): |
| awesome_device.onClose = onAwesomeDeviceLeft |
| print('Device arrived:', str(awesome_device)) |
| |
| class SelectPoller(object): |
| """ |
| Dummy poller based on select, because it exists on all platforms. |
| WARNING: this class is just for a trivial demonstration, and |
| inherits select() limitations. The most important limitation is |
| that regitering descriptors does not wake/affect a running poll. |
| """ |
| def __init__(self): |
| self._fd_dict = {} |
| |
| def register(self, fd, events): |
| self._fd_dict[fd] = events |
| |
| def unregister(self, fd): |
| self._fd_dict.pop(fd) |
| |
| def poll(self, timeout=None): |
| flag_list = (select.POLLIN, select.POLLOUT, select.POLLPRI) |
| result = {} |
| for fd_list, happened_flag in zip( |
| select.select(*([[ |
| fd |
| for fd, events in self._fd_dict.items() if events & flag |
| ] for flag in flag_list] + [timeout])), |
| flag_list, |
| ): |
| result[fd] = result.get(fd, 0) | happened_flag |
| return list(result.items()) |
| # (end of demonstration helpers) |
| |
| class AwesomeDevice(object): |
| # Application can set this property to do cleanup when device gets closed, |
| # for example when device has left. |
| onClose = lambda device: None |
| |
| def __init__(self, handle): |
| self._handle = handle |
| |
| def __str__(self): |
| # For demonstration purposes only. |
| return 'Awesome Device at ' + str(self._handle.getDevice()) |
| |
| def close(self): |
| # Note: device may have already left when this method is called, |
| # so catch USBErrorNoDevice around cleanup steps involving the device. |
| try: |
| self.onClose(self) |
| # Put device in low-power mode, release claimed interfaces... |
| pass |
| except usb1.USBErrorNoDevice: |
| pass |
| self._handle.close() |
| |
| class AwesomeDeviceHoarderBase(object): |
| """ |
| Manages the horde of connected devices. |
| """ |
| |
| def __init__( |
| self, |
| onDeviceArrived=(lambda awesome_device: False), |
| ): |
| """ |
| onDeviceArrived (callable) |
| Allows further actions by the application when a relevant device |
| arrives, so that it integrates with other devices (ex: send |
| different key events when the same button is pressed on different |
| devices). |
| Returns whether this device should be ignored. |
| Cannot call synchronous API. |
| """ |
| self.context = usb1.USBContext() |
| if not self.context.hasCapability(usb1.CAP_HAS_HOTPLUG): |
| raise NoHotplugSupport( |
| 'Hotplug support is missing. Please update your libusb version.' |
| ) |
| self._device_dict = {} |
| self._onDeviceArrived = onDeviceArrived |
| |
| def _registerCallback(self): |
| self.context.hotplugRegisterCallback( |
| self._onHotplugEvent, |
| # Just in case more events are added in the future. |
| events=usb1.HOTPLUG_EVENT_DEVICE_ARRIVED | usb1.HOTPLUG_EVENT_DEVICE_LEFT, |
| # Edit these if you handle devices from a single vendor, of a |
| # single product type or of a single device class; and simplify |
| # device filtering accordingly in _onHotplugEvent. |
| #vendor_id=, |
| #product_id=, |
| #dev_class=, |
| ) |
| |
| @staticmethod |
| def isDeviceSupported(vendor_id, device_id): |
| """ |
| Check if we should drive the device which arrived. |
| Simplify this if libusb hotplug API filter is enough (ex: handling a |
| single device type). |
| """ |
| # This example handles all devices. |
| return True |
| |
| def _onHotplugEvent(self, context, device, event): |
| if event == usb1.HOTPLUG_EVENT_DEVICE_LEFT: |
| awesome_device = self._device_dict.pop(device, None) |
| if awesome_device is not None: |
| awesome_device.close() |
| return |
| # Remove next branch if libusb hotplug API filtering is enough (ex: |
| # handling a single device type). |
| if not self.isDeviceSupported( |
| device.getVendorID(), |
| device.getProductID(), |
| ): |
| return |
| try: |
| handle = device.open() |
| except usb1.USBError: |
| return |
| awesome_device = AwesomeDevice(handle) |
| if self._onDeviceArrived(awesome_device): |
| awesome_device.close() |
| return |
| self._device_dict[device] = awesome_device |
| |
| # Below are alternative APIs. Choose the one most suitable to your needs, |
| # and ignore the others. This is of course not an exhaustive list. |
| |
| class AwesomeDeviceHoarderSimple(AwesomeDeviceHoarderBase): |
| """ |
| API 1: USB-event-centric application. |
| Simplest API, for userland drivers which only react to USB events. |
| """ |
| def run(self): |
| with self.context: |
| print('Registering hotplug callback...') |
| self._registerCallback() |
| print('Callback registered. Monitoring events, ^C to exit') |
| while True: |
| self.context.handleEvents() |
| |
| def simple(): |
| AwesomeDeviceHoarderSimple(onAwesomeDeviceArrived).run() |
| mode_dict['simple'] = simple |
| |
| class AwesomeDeviceHoarderEventLoop(AwesomeDeviceHoarderBase): |
| """ |
| API 2: |
| More complex, for userland drivers which need to react to other events |
| (sockets, user input, ...). Application must then integrate libusb |
| polling in its event loop (see usb1.USBPollerThread and usb1.USBPoller). |
| """ |
| def __enter__(self): |
| self.context.open() |
| print('Registering hotplug callback...') |
| self._registerCallback() |
| print('Callback registered.') |
| return self |
| |
| def __exit__(self, exc_type, exc_val, exc_tb): |
| self.context.close() |
| |
| def eventloop(): |
| with AwesomeDeviceHoarderEventLoop(onAwesomeDeviceArrived) as awesome_device_hoarder: |
| base_poller = SelectPoller() |
| # In real code, file descriptor would be independently registered |
| # to base_poller. |
| # The event loop would be something like: |
| poller = usb1.USBPoller(awesome_device_hoarder.context, base_poller) |
| print('Monitoring events, ^C to exit') |
| while True: |
| poller.poll() |
| mode_dict['eventloop'] = eventloop |
| |
| def main(): |
| try: |
| mode = mode_dict[sys.argv[1]] |
| except (KeyError, IndexError): |
| print('Usage: %s [%s]' % ( |
| sys.argv[0], |
| '|'.join(mode_dict), |
| )) |
| sys.exit(1) |
| print( |
| 'NOTE: this example needs sufficient permissions to be able to ' |
| 'open USB devices to produce any interesting output. If you see ' |
| 'nothing below, check you have USB devices plugged *and* that you ' |
| 'have sufficient permissions to open them.' |
| ) |
| try: |
| mode() |
| except NoHotplugSupport as exc: |
| print(exc.value) |
| sys.exit(1) |
| except (KeyboardInterrupt, SystemExit): |
| print('Exiting') |
| |
| if __name__ == '__main__': |
| main() |