| # Copyright 2014 The Chromium Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """USB echo gadget module. |
| |
| This gadget has pairs of IN/OUT endpoints that echo packets back to the host. |
| """ |
| |
| import uuid |
| |
| import composite_gadget |
| import usb_constants |
| import usb_descriptors |
| |
| |
class EchoCompositeFeature(composite_gadget.CompositeFeature):
    """Composite device feature that echoes data back to the host.

    Up to three vendor-class interfaces are described, one each for interrupt,
    bulk and isochronous transfers. Every interface that carries endpoints has
    one OUT and one IN endpoint, and packets received on the OUT endpoint are
    sent back on the IN endpoint with the same endpoint number.
    """

    def __init__(self, endpoints):
        """Create an echo feature.

        Args:
          endpoints: Sequence of (interface number, interface string index,
              IN endpoint address, OUT endpoint address) tuples. The first
              entry describes the interrupt echo interface, the second the
              bulk echo interface and the third the isochronous echo
              interface. Passing fewer than three entries creates fewer
              interfaces; entries past the third are ignored.
        """
        transfer_types = usb_constants.TransferType
        # One row per echo function, in the order `endpoints` is consumed:
        #   (transfer type,
        #    full-speed (wMaxPacketSize, bInterval),
        #    high-speed (wMaxPacketSize, bInterval),
        #    alternate setting that carries the endpoints, or None)
        # The non-zero bInterval values (1 at full speed, 4 at high speed)
        # are the ones the original code annotated as "1ms".
        profiles = (
            (transfer_types.INTERRUPT, (64, 1), (64, 4), None),
            (transfer_types.BULK, (64, 0), (512, 0), None),
            (transfer_types.ISOCHRONOUS, (1023, 1), (512, 4), 1),
        )

        fs_interfaces = []
        hs_interfaces = []

        # zip() stops at the shorter sequence, so only as many interfaces as
        # there are endpoint tuples (at most three) get created.
        for endpoint_config, profile in zip(endpoints, profiles):
            iface_num, iface_string, in_endpoint, out_endpoint = endpoint_config
            transfer_type, fs_params, hs_params, alt_setting = profile

            per_speed = (
                (fs_interfaces, fs_params),
                (hs_interfaces, hs_params),
            )
            for interfaces, (max_packet_size, interval) in per_speed:
                if alt_setting is not None:
                    # The isochronous endpoints live on a separate alternate
                    # setting; first emit an endpoint-less descriptor created
                    # without bAlternateSetting (presumably alternate
                    # setting 0, the default -- confirm in usb_descriptors).
                    interfaces.append(
                        self._NewInterface(iface_num, iface_string))

                interface = self._NewInterface(
                    iface_num, iface_string, alt_setting)
                self._AddEchoEndpoints(
                    interface, out_endpoint, in_endpoint, transfer_type,
                    max_packet_size, interval)
                interfaces.append(interface)

        super(EchoCompositeFeature, self).__init__(fs_interfaces, hs_interfaces)

    @staticmethod
    def _NewInterface(iface_num, iface_string, alternate_setting=None):
        """Build a vendor-class interface descriptor without endpoints.

        Args:
          iface_num: Value for bInterfaceNumber.
          iface_string: String descriptor index for iInterface.
          alternate_setting: Value for bAlternateSetting. When None the field
              is not passed at all, leaving the descriptor's own default.

        Returns:
          A new usb_descriptors.InterfaceDescriptor.
        """
        fields = dict(
            bInterfaceNumber=iface_num,
            bInterfaceClass=usb_constants.DeviceClass.VENDOR,
            bInterfaceSubClass=0,
            bInterfaceProtocol=0,
            iInterface=iface_string)
        if alternate_setting is not None:
            fields['bAlternateSetting'] = alternate_setting
        return usb_descriptors.InterfaceDescriptor(**fields)

    @staticmethod
    def _AddEchoEndpoints(interface, out_endpoint, in_endpoint, transfer_type,
                          max_packet_size, interval):
        """Attach the OUT endpoint and then the IN endpoint to an interface.

        Both endpoints share the same transfer type, packet size and interval.

        Args:
          interface: Interface descriptor to add the endpoints to.
          out_endpoint: Address of the OUT endpoint.
          in_endpoint: Address of the IN endpoint.
          transfer_type: usb_constants.TransferType value for bmAttributes.
          max_packet_size: Value for wMaxPacketSize.
          interval: Value for bInterval.
        """
        # OUT is added before IN, matching the original descriptor order.
        for address in (out_endpoint, in_endpoint):
            interface.AddEndpoint(usb_descriptors.EndpointDescriptor(
                bEndpointAddress=address,
                bmAttributes=transfer_type,
                wMaxPacketSize=max_packet_size,
                bInterval=interval))

    def ReceivePacket(self, endpoint, data):
        """Echo a packet back to the host.

        Args:
          endpoint: Incoming endpoint (must be an OUT pipe).
          data: Packet data.
        """
        # Invariant check: the direction bit must be clear on the endpoint
        # the packet arrived on.
        assert endpoint & usb_constants.Dir.IN == 0

        # Reply on the IN endpoint that has the same endpoint number.
        self.SendPacket(endpoint | usb_constants.Dir.IN, data)
| |
| |
class EchoGadget(composite_gadget.CompositeGadget):
    """Composite gadget with interrupt, bulk and isochronous echo interfaces."""

    def __init__(self):
        """Create an echo gadget.

        Builds the device descriptor and a three-interface echo feature, then
        registers the string descriptors, the Microsoft OS descriptors and a
        container ID capability descriptor.
        """
        descriptor = usb_descriptors.DeviceDescriptor(
            idVendor=usb_constants.VendorID.GOOGLE,
            idProduct=usb_constants.ProductID.GOOGLE_ECHO_GADGET,
            bcdUSB=0x0200,
            iManufacturer=1,
            iProduct=2,
            iSerialNumber=3,
            bcdDevice=0x0100)

        # Each tuple is (interface number, interface string index,
        # IN endpoint address, OUT endpoint address).
        echo_endpoints = [
            (0, 4, 0x81, 0x01),
            (1, 5, 0x82, 0x02),
            (2, 6, 0x83, 0x03),
        ]
        echo_feature = EchoCompositeFeature(endpoints=echo_endpoints)

        super(EchoGadget, self).__init__(descriptor, [echo_feature])

        # Indices 1-3 are referenced by the device descriptor above and
        # indices 4-6 by the interfaces of the echo feature.
        string_table = (
            (1, 'Google Inc.'),
            (2, 'Echo Gadget'),
            (3, '{:06X}'.format(uuid.getnode())),
            (4, 'Interrupt Echo'),
            (5, 'Bulk Echo'),
            (6, 'Isochronous Echo'),
        )
        for index, text in string_table:
            self.AddStringDescriptor(index, text)

        # Enable Microsoft OS Descriptors for Windows 8 and above.
        self.EnableMicrosoftOSDescriptorsV1(vendor_code=0x01)
        # The compatible ID forces Windows to load WINUSB.SYS for each of the
        # three echo interfaces.
        for interface_number in (0, 1, 2):
            self.SetMicrosoftCompatId(interface_number, 'WINUSB')

        # A fresh random container ID is generated for every gadget instance.
        container_id = usb_descriptors.ContainerIdDescriptor(
            ContainerID=uuid.uuid4().bytes_le)
        self.AddDeviceCapabilityDescriptor(container_id)
| |
def RegisterHandlers():
    """Registers web request handlers with the application server.

    Adds a single route, /echo/configure, whose POST handler switches the
    server to a newly created EchoGadget.
    """
    # Imported inside the function, as in the original, so that importing this
    # module does not itself import the server or tornado.
    import server
    from tornado import web

    class WebConfigureHandler(web.RequestHandler):
        """Handles requests to configure the echo gadget."""

        def post(self):
            """Switch the server over to a new echo gadget."""
            gadget = EchoGadget()
            server.SwitchGadget(gadget)

    routes = [
        (r'/echo/configure', WebConfigureHandler),
    ]
    # '.*$' is the host pattern passed to add_handlers.
    server.app.add_handlers('.*$', routes)