| # -*- coding: utf-8 -*- |
| # Copyright (c) 2014 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| """Chameleond Driver for FPGA customized platform with the TIO card.""" |
| |
| from __future__ import print_function |
| |
| import functools |
| import glob |
| # TODO: to port chromite.lib.cros_logging to replace legacy logging |
| import logging # pylint: disable=cros-logging-import |
| import os |
| import time |
| import xmlrpc.client |
| |
| from . import chameleon_common # pylint: disable=W0611 |
| |
| from chameleond.devices import audio_board |
| from chameleond.devices import avsync_probe |
| from chameleond.devices import bluetooth_base_flow |
| from chameleond.devices import bluetooth_hid_flow |
| from chameleond.devices import codec_flow |
| from chameleond.devices import input_flow |
| from chameleond.devices import motor_board |
| from chameleond.devices import usb_audio_flow |
| from chameleond.devices import usb_hid_flow |
| from chameleond.devices import usb_printer_device |
| from chameleond.interface import ChameleondInterface |
| from chameleond.utils import caching_server |
| from chameleond.utils import device_manager |
| from chameleond.utils import fpga |
| from chameleond.utils import flow_manager |
| from chameleond.utils import i2c |
| from chameleond.utils import ids |
| from chameleond.utils import system_tools |
| from chameleond.utils import usb |
| from chameleond.utils import usb_printer_control |
| from chameleond.utils.common import lazy |
| from chameleond.utils import bluetooth_a2dp |
| |
| if os.environ.get('PLATFORM') != 'FPGA': |
| # pylint: disable=C0412 |
| from chameleond.utils import bluetooth_tester |
| |
| if os.environ.get('PLATFORM') == 'RASPI': |
| # pylint: disable=C0412 |
| from chameleond.devices import raspi_bluetooth_flow |
| |
| |
| class DriverError(Exception): |
| """Exception raised when any error on FPGA driver.""" |
| pass |
| |
| |
| def _DeviceMethod(device_id): |
| """Decorator that checks if the device exists. |
| |
| Args: |
| device_id: The device ID. |
| """ |
| def _ActualDecorator(func): |
| @functools.wraps(func) |
| def wrapper(instance, *args, **kwargs): |
| if not instance.HasDevice(device_id): |
| raise DriverError('There is no %s' % ids.DEVICE_NAMES[device_id]) |
| return func(instance, *args, **kwargs) |
| return wrapper |
| return _ActualDecorator |
| |
| |
| class ChameleondDriver(ChameleondInterface): |
| """Chameleond Driver for FPGA customized platform.""" |
| |
| _I2C_BUS_MAIN = 0 |
| _I2C_BUS_AUDIO_CODEC = 1 |
| _I2C_BUS_EXT_BOARD = 3 |
| |
| # Time to wait for video frame dump to start before a timeout error is raised |
| _TIMEOUT_FRAME_DUMP_PROBE = 60.0 |
| |
| # The frame index which is used for the regular DumpPixels API. |
| _DEFAULT_FRAME_INDEX = 0 |
| _DEFAULT_FRAME_LIMIT = _DEFAULT_FRAME_INDEX + 1 |
| |
| # Limit the period of async capture to 3min (in 60fps). |
| _MAX_CAPTURED_FRAME_COUNT = 3 * 60 * 60 |
| |
| def __init__(self, *args, **kwargs): |
| super(ChameleondDriver, self).__init__(*args, **kwargs) |
| |
| self._platform = os.environ.get('PLATFORM') |
| logging.info('platform: %s', self._platform) |
| |
| self.dump_bt_peer_info() |
| |
| self._captured_params = {} |
| self._process = None |
| |
| # waihong@chromium.org suggests to use a lazy wrapper which instantiates |
| # the following control objects when requested at the first time. |
| self._main_bus = lazy(i2c.I2cBus)(self._I2C_BUS_MAIN) |
| self._ext_board_bus = lazy(i2c.I2cBus)(self._I2C_BUS_EXT_BOARD) |
| self._audio_codec_bus = lazy(i2c.I2cBus)(self._I2C_BUS_AUDIO_CODEC) |
| self._fpga_ctrl = lazy(fpga.FpgaController)() |
| self._usb_audio_ctrl = lazy(usb.USBAudioController)() |
| self._usb_hid_ctrl = lazy(usb.USBController)('g_hid') |
| self._usb_printer_ctrl = lazy(usb_printer_control.USBPrinterController)() |
| self._bluetooth_hid_ctrl = lazy(usb.USBController)( |
| bluetooth_hid_flow.BluetoothHIDMouseFlow.DRIVER) |
| self._bluetooth_a2dp_sink_ctrl = lazy(usb.USBController)( |
| bluetooth_a2dp.BluetoothA2DPSinkFlow.DRIVER) |
| # See explanation for using DRIVER_MODULE in bluetooth_nrf52.py |
| self._ble_hid_ctrl = lazy(usb.USBController)( |
| bluetooth_hid_flow.BleHIDMouseFlow.DRIVER_MODULE) |
| # A base control comes without any driver. |
| self._base_ctrl = lazy(usb.USBController)('') |
| |
| if self._platform == 'CHROMEOS': |
| self._devices = self.init_devices_for_chromeos() |
| elif self._platform == 'RASPI': |
| self._devices = self.init_devices_for_raspi() |
| else: |
| self._devices = self.init_devices_for_fpga() |
| |
| self._device_manager = device_manager.DeviceManager(self._devices) |
| self._device_manager.Init() |
| self._flows = self._device_manager.GetDetectedFlows() |
| |
| # Allow to access the methods through object. |
| # Hence, there is no need to export the methods in ChameleondDriver. |
| # An object in the following would be None if it is not instantiated |
| # in self._devices above. |
| self.audio_board = self._device_manager.GetChameleonDevice(ids.AUDIO_BOARD) |
| self.bluetooth_audio = self._device_manager.GetChameleonDevice( |
| ids.BLUETOOTH_AUDIO) |
| self.bluetooth_base = self._device_manager.GetChameleonDevice( |
| ids.BLUETOOTH_BASE) |
| self.bluetooth_mouse = self._device_manager.GetChameleonDevice( |
| ids.BLUETOOTH_HID_MOUSE) |
| self.bluetooth_keyboard = self._device_manager.GetChameleonDevice( |
| ids.BLUETOOTH_HID_KEYBOARD) |
| self.bluetooth_tester = self._device_manager.GetChameleonDevice( |
| ids.BLUETOOTH_TESTER) |
| self.avsync_probe = self._device_manager.GetChameleonDevice( |
| ids.AVSYNC_PROBE) |
| self.motor_board = self._device_manager.GetChameleonDevice(ids.MOTOR_BOARD) |
| self.printer = self._device_manager.GetChameleonDevice(ids.USB_PRINTER) |
| self.bluetooth_a2dp_sink = self._device_manager.GetChameleonDevice( |
| ids.BLUETOOTH_A2DP_SINK) |
| self.ble_mouse = self._device_manager.GetChameleonDevice( |
| ids.BLE_MOUSE) |
| self.ble_keyboard = self._device_manager.GetChameleonDevice( |
| ids.BLE_KEYBOARD) |
| self.ble_phone = self._device_manager.GetChameleonDevice( |
| ids.BLE_PHONE) |
| self._flow_manager = flow_manager.FlowManager(self._flows) |
| |
| self.Reset() |
| |
| |
| def get_platform(self): |
| return self._platform |
| |
| def init_devices_for_chromeos(self): |
| devices = { |
| ids.BLUETOOTH_HID_MOUSE: |
| bluetooth_hid_flow.BluetoothHIDMouseFlow( |
| ids.BLUETOOTH_HID_MOUSE, self._bluetooth_hid_ctrl), |
| ids.BLUETOOTH_A2DP_SINK: |
| bluetooth_a2dp.BluetoothA2DPSinkFlow( |
| ids.BLUETOOTH_A2DP_SINK, self._bluetooth_a2dp_sink_ctrl), |
| ids.BLE_MOUSE: |
| bluetooth_hid_flow.BleHIDMouseFlow( |
| ids.BLE_MOUSE, self._ble_hid_ctrl), |
| ids.BLUETOOTH_BASE: |
| bluetooth_base_flow.BluetoothBaseFlow( |
| ids.BLUETOOTH_BASE, self._base_ctrl), |
| } |
| return devices |
| |
| def init_devices_for_fpga(self): |
| devices = { |
| ids.DP1: input_flow.DpInputFlow( |
| ids.DP1, self._main_bus, self._fpga_ctrl), |
| ids.DP2: input_flow.DpInputFlow( |
| ids.DP2, self._main_bus, self._fpga_ctrl), |
| ids.HDMI: input_flow.HdmiInputFlow( |
| ids.HDMI, self._main_bus, self._fpga_ctrl), |
| ids.VGA: input_flow.VgaInputFlow( |
| ids.VGA, self._main_bus, self._fpga_ctrl), |
| ids.MIC: codec_flow.InputCodecFlow( |
| ids.MIC, self._audio_codec_bus, self._fpga_ctrl), |
| ids.LINEIN: codec_flow.InputCodecFlow( |
| ids.LINEIN, self._audio_codec_bus, self._fpga_ctrl), |
| ids.LINEOUT: codec_flow.OutputCodecFlow( |
| ids.LINEOUT, self._audio_codec_bus, self._fpga_ctrl), |
| ids.USB_AUDIO_IN: usb_audio_flow.InputUSBAudioFlow( |
| ids.USB_AUDIO_IN, self._usb_audio_ctrl), |
| ids.USB_AUDIO_OUT: usb_audio_flow.OutputUSBAudioFlow( |
| ids.USB_AUDIO_OUT, self._usb_audio_ctrl), |
| ids.USB_KEYBOARD: usb_hid_flow.KeyboardUSBHIDFlow( |
| ids.USB_KEYBOARD, self._usb_hid_ctrl), |
| ids.USB_TOUCH: usb_hid_flow.TouchUSBHIDFlow( |
| ids.USB_TOUCH, self._usb_hid_ctrl), |
| ids.AVSYNC_PROBE: avsync_probe.AVSyncProbe(ids.AVSYNC_PROBE), |
| ids.AUDIO_BOARD: audio_board.AudioBoard(self._ext_board_bus), |
| ids.MOTOR_BOARD: motor_board.MotorBoard(self._ext_board_bus), |
| ids.USB_PRINTER: usb_printer_device.USBPrinter(self._usb_printer_ctrl), |
| } |
| |
| return devices |
| |
| def init_devices_for_raspi(self): |
| devices = { |
| ids.BLUETOOTH_HID_KEYBOARD: raspi_bluetooth_flow.RaspiHIDKeyboard(), |
| ids.BLUETOOTH_HID_MOUSE: raspi_bluetooth_flow.RaspiHIDMouse(), |
| ids.BLE_KEYBOARD: raspi_bluetooth_flow.RaspiHIDKeyboard(), |
| ids.BLE_MOUSE: raspi_bluetooth_flow.RaspiHIDMouse(), |
| ids.BLUETOOTH_BASE: |
| bluetooth_base_flow.BluetoothBaseFlow( |
| ids.BLUETOOTH_BASE, self._base_ctrl), |
| ids.BLUETOOTH_TESTER: bluetooth_tester.BluetoothTester(), |
| ids.BLE_PHONE: raspi_bluetooth_flow.RaspiBLEPhone(), |
| ids.BLUETOOTH_AUDIO: raspi_bluetooth_flow.RaspiBluetoothAudioFlow(), |
| } |
| return devices |
| |
| def Reset(self): |
| """Resets Chameleon board.""" |
| logging.info('Execute the reset process') |
| self._flow_manager.Reset() |
| self._device_manager.Reset() |
| |
| self._ClearAudioFiles() |
| self._ClearPrinterFiles() |
| caching_server.ClearCachedDir() |
| |
| def Reboot(self): |
| """Reboots Chameleon board.""" |
| logging.info('The chameleon board is going to reboot.') |
| system_tools.SystemTools.Call('reboot') |
| |
| def dump_bt_peer_info(self): |
| """Dump the kernel, BlueZ, chameleon bundle info""" |
| kernel_info = system_tools.SystemTools.Output('uname', '-a').strip() |
| bluez_info = system_tools.SystemTools.Output('bluetoothd', '-v').strip() |
| |
| logging.info('kernel_info: %s', kernel_info) |
| logging.info('bluez_version: %s', bluez_info) |
| logging.info('bt_pkg_commit: %s', self.get_bt_commit_hash()) |
| |
| def get_bt_commit_hash(self): |
| """Return the last commit hash in chameleon_commits.""" |
| try: |
| output = system_tools.SystemTools.Output('grep', 'BUNDLE_COMMIT_HASH', |
| '/etc/default/chameleond') |
| logging.debug('output is %s', output) |
| commit = output.split('=')[1].strip().replace('"', '') |
| logging.debug('got commit %s', commit) |
| return commit |
| except Exception as e: |
| logging.error('Exception %s in get_bt_commit_hash', str(e)) |
| return '' |
| |
| def log_message(self, msg): |
| """Write the msg to the chamelond log and system log.""" |
| try: |
| logging.info(msg) |
| system_tools.SystemTools.Call('logger', msg) |
| except Exception as e: |
| logging.error("Exception '%s' in log_message msg '%s'", str(e), msg) |
| |
| def GetDetectedStatus(self): |
| """Returns detetcted status of all devices. |
| |
| User can use this API to know the capability of the chameleon board. |
| |
| Returns: |
| A list of a tuple of detected devices' strings detected status. |
| e.g. [('HDMI', True), ('MIC', False)] |
| """ |
| detected_list = [] |
| for device_id in self._devices: |
| detected = False |
| if self._device_manager.GetChameleonDevice(device_id): |
| detected = True |
| detected_list.append((ids.DEVICE_NAMES[device_id], detected)) |
| return detected_list |
| |
| def GetSupportedPorts(self): |
| """Returns all supported ports on the board. |
| |
| Not like the ProbePorts() method which only returns the ports which |
| are connected, this method returns all supported ports on the board. |
| |
| Returns: |
| A tuple of port_id, for all supported ports on the board. |
| """ |
| return self._flow_manager.GetSupportedPorts() |
| |
| def GetSupportedInputs(self): |
| """Returns all supported input ports on the board. |
| |
| Not like the ProbeInputs() method which only returns the input ports which |
| are connected, this method returns all supported input ports on the board. |
| |
| Returns: |
| A tuple of port_id, for all supported input port on the board. |
| """ |
| return self._flow_manager.GetSupportedInputs() |
| |
| def GetSupportedOutputs(self): |
| """Returns all supported output ports on the board. |
| |
| Not like the ProbeOutputs() method which only returns the output ports which |
| are connected, this method returns all supported output ports on the board. |
| |
| Returns: |
| A tuple of port_id, for all supported output port on the board. |
| """ |
| return self._flow_manager.GetSupportedOutputs() |
| |
| def IsPhysicalPlugged(self, port_id): |
| """Returns true if the physical cable is plugged between DUT and Chameleon. |
| |
| Args: |
| port_id: The ID of the input/output port. |
| |
| Returns: |
| True if the physical cable is plugged; otherwise, False. |
| """ |
| return self._flow_manager.IsPhysicalPlugged(port_id) |
| |
| def ProbePorts(self): |
| """Probes all the connected ports on Chameleon board. |
| |
| Returns: |
| A tuple of port_id, for the ports connected to DUT. |
| """ |
| return self._flow_manager.ProbePorts() |
| |
| def ProbeInputs(self): |
| """Probes all the connected input ports on Chameleon board. |
| |
| Returns: |
| A tuple of port_id, for the input ports connected to DUT. |
| """ |
| return self._flow_manager.ProbeInputs() |
| |
| def ProbeOutputs(self): |
| """Probes all the connected output ports on Chameleon board. |
| |
| Returns: |
| A tuple of port_id, for the output ports connected to DUT. |
| """ |
| return self._flow_manager.ProbeOutputs() |
| |
| def GetConnectorType(self, port_id): |
| """Returns the human readable string for the connector type. |
| |
| Args: |
| port_id: The ID of the input/output port. |
| |
| Returns: |
| A string, like "HDMI", "DP", "MIC", etc. |
| """ |
| return self._flow_manager.GetConnectorType(port_id) |
| |
| def HasAudioSupport(self, port_id): |
| """Returns true if the port has audio support. |
| |
| Args: |
| port_id: The ID of the input/output port. |
| |
| Returns: |
| True if the input/output port has audio support; otherwise, False. |
| """ |
| return self._flow_manager.HasAudioSupport(port_id) |
| |
| def HasVideoSupport(self, port_id): |
| """Returns true if the port has video support. |
| |
| Args: |
| port_id: The ID of the input/output port. |
| |
| Returns: |
| True if the input/output port has video support; otherwise, False. |
| """ |
| return self._flow_manager.HasVideoSupport(port_id) |
| |
| def SetVgaMode(self, port_id, mode): |
| """Sets the mode for VGA monitor. |
| |
| Args: |
| port_id: The ID of the VGA port. |
| mode: A string of the mode name, e.g. 'PC_1920x1080x60'. Use 'auto' |
| to detect the VGA mode automatically. |
| """ |
| return self._flow_manager.SetVgaMode(port_id, mode) |
| |
| def WaitVideoInputStable(self, port_id, timeout=None): |
| """Waits the video input stable or timeout. |
| |
| Args: |
| port_id: The ID of the video input port. |
| timeout: The time period to wait for. |
| |
| Returns: |
| True if the video input becomes stable within the timeout period; |
| otherwise, False. |
| """ |
| return self._flow_manager.WaitVideoInputStable(port_id, timeout) |
| |
| def CreateEdid(self, edid): |
| """Creates an internal record of EDID using the given byte array. |
| |
| Args: |
| edid: A byte array of EDID data, wrapped in a xmlrpclib.Binary object. |
| |
| Returns: |
| An edid_id. |
| """ |
| return self._flow_manager.CreateEdid(edid) |
| |
| def DestroyEdid(self, edid_id): |
| """Destroys the internal record of EDID. The internal data will be freed. |
| |
| Args: |
| edid_id: The ID of the EDID, which was created by CreateEdid(). |
| """ |
| self._flow_manager.DestroyEdid(edid_id) |
| |
| def SetDdcState(self, port_id, enabled): |
| """Sets the enabled/disabled state of DDC bus on the given video input. |
| |
| Args: |
| port_id: The ID of the video input port. |
| enabled: True to enable DDC bus due to an user request; False to |
| disable it. |
| """ |
| self._flow_manager.SetDdcState(port_id, enabled) |
| |
| def IsDdcEnabled(self, port_id): |
| """Checks if the DDC bus is enabled or disabled on the given video input. |
| |
| Args: |
| port_id: The ID of the video input port. |
| |
| Returns: |
| True if the DDC bus is enabled; False if disabled. |
| """ |
| return self._flow_manager.IsDdcEnabled(port_id) |
| |
| def ReadEdid(self, port_id): |
| """Reads the EDID content of the selected video input on Chameleon. |
| |
| Args: |
| port_id: The ID of the video input port. |
| |
| Returns: |
| A byte array of EDID data, wrapped in a xmlrpclib.Binary object, |
| or None if the EDID is disabled. |
| """ |
| return self._flow_manager.ReadEdid(port_id) |
| |
| def ApplyEdid(self, port_id, edid_id): |
| """Applies the EDID to the selected video input. |
| |
| Note that this method doesn't pulse the HPD line. Should call Plug(), |
| Unplug(), or FireHpdPulse() later. |
| |
| Args: |
| port_id: The ID of the video input port. |
| edid_id: The ID of the EDID. |
| """ |
| self._flow_manager.ApplyEdid(port_id, edid_id) |
| |
| def IsPlugged(self, port_id): |
| """Returns true if the port is emulated as plugged. |
| |
| Args: |
| port_id: The ID of the input/output port. |
| |
| Returns: |
| True if the port is emualted as plugged; otherwise, False. |
| """ |
| return self._flow_manager.IsPlugged(port_id) |
| |
| def Plug(self, port_id): |
| """Emualtes plug, like asserting HPD line to high on a video port. |
| |
| Args: |
| port_id: The ID of the input/output port. |
| """ |
| return self._flow_manager.Plug(port_id) |
| |
| def Unplug(self, port_id): |
| """Emulates unplug, like deasserting HPD line to low on a video port. |
| |
| Args: |
| port_id: The ID of the input/output port. |
| """ |
| return self._flow_manager.Unplug(port_id) |
| |
| def UnplugHPD(self, port_id): |
| """Only deassert HPD line to low on a video port. |
| |
| Args: |
| port_id: The ID of the input/output port. |
| """ |
| return self._flow_manager.UnplugHPD(port_id) |
| |
| def FireHpdPulse(self, port_id, deassert_interval_usec, |
| assert_interval_usec=None, repeat_count=1, |
| end_level=1): |
| """Fires one or more HPD pulse (low -> high -> low -> ...). |
| |
| Args: |
| port_id: The ID of the video input port. |
| deassert_interval_usec: The time in microsecond of the deassert pulse. |
| assert_interval_usec: The time in microsecond of the assert pulse. |
| If None, then use the same value as |
| deassert_interval_usec. |
| repeat_count: The count of HPD pulses to fire. |
| end_level: HPD ends with 0 for LOW (unplugged) or 1 for HIGH (plugged). |
| """ |
| return self._flow_manager.FireHpdPulse( |
| port_id, deassert_interval_usec, assert_interval_usec, repeat_count, |
| end_level) |
| |
| def FireMixedHpdPulses(self, port_id, widths_msec): |
| """Fires one or more HPD pulses, starting at low, of mixed widths. |
| |
| One must specify a list of segment widths in the widths_msec argument where |
| widths_msec[0] is the width of the first low segment, widths_msec[1] is that |
| of the first high segment, widths_msec[2] is that of the second low segment, |
| etc. |
| The HPD line stops at low if even number of segment widths are specified; |
| otherwise, it stops at high. |
| |
| The method is equivalent to a series of calls to Unplug() and Plug() |
| separated by specified pulse widths. |
| |
| Args: |
| port_id: The ID of the video input port. |
| widths_msec: list of pulse segment widths in milli-second. |
| """ |
| return self._flow_manager.FireMixedHpdPulses(port_id, widths_msec) |
| |
| def ScheduleHpdToggle(self, port_id, delay_ms, rising_edge): |
| """Schedules one HPD Toggle, with a delay between the toggle. |
| |
| Args: |
| port_id: The ID of the video input port. |
| delay_ms: Delay in milli-second before the toggle takes place. |
| rising_edge: Whether the toggle should be a rising edge or a falling edge. |
| """ |
| return self._flow_manager.ScheduleHpdToggle(port_id, delay_ms, rising_edge) |
| |
| def SetContentProtection(self, port_id, enabled): |
| """Sets the content protection state on the port. |
| |
| Args: |
| port_id: The ID of the video input port. |
| enabled: True to enable; False to disable. |
| """ |
| self._flow_manager.SetContentProtection(port_id, enabled) |
| |
| def IsContentProtectionEnabled(self, port_id): |
| """Returns True if the content protection is enabled on the port. |
| |
| Args: |
| port_id: The ID of the video input port. |
| |
| Returns: |
| True if the content protection is enabled; otherwise, False. |
| """ |
| return self._flow_manager.IsContentProtectionEnabled(port_id) |
| |
| def IsVideoInputEncrypted(self, port_id): |
| """Returns True if the video input on the port is encrypted. |
| |
| Args: |
| port_id: The ID of the video input port. |
| |
| Returns: |
| True if the video input is encrypted; otherwise, False. |
| """ |
| return self._flow_manager.IsVideoInputEncrypted(port_id) |
| |
| def StartMonitoringAudioVideoCapturingDelay(self): |
| """Starts an audio/video synchronization utility |
| |
| The example of usage: |
| chameleon.StartMonitoringAudioVideoCapturingDelay() |
| chameleon.StartCapturingVideo(hdmi_input) |
| chameleon.StartCapturingAudio(hdmi_input) |
| time.sleep(2) |
| chameleon.StopCapturingVideo() |
| chameleon.StopCapturingAudio(hdmi_input) |
| delay = chameleon.GetAudioVideoCapturingDelay() |
| """ |
| self._process = system_tools.SystemTools.RunInSubprocess('avsync') |
| |
| def GetAudioVideoCapturingDelay(self): |
| """Get the time interval between the first audio/video cpatured data |
| |
| Returns: |
| A floating points indicating the time interval between the first |
| audio/video data captured. If the result is negative, then the first |
| video data is earlier, otherwise the first audio data is earlier. |
| |
| Raises: |
| DriverError if there is no output from the monitoring process. |
| """ |
| |
| if self._process.poll() is None: |
| self._process.terminate() |
| raise DriverError('The monitoring process has not finished.') |
| |
| return_code, out, err = system_tools.SystemTools.GetSubprocessOutput( |
| self._process) |
| |
| if return_code != 0 or err: |
| raise DriverError('Runtime error in the monitoring process') |
| |
| if not out: |
| raise DriverError('No output from the monitoring process.') |
| |
| return float(out) |
| |
| def DumpPixels(self, port_id, x=None, y=None, width=None, height=None): |
| """Dumps the raw pixel array of the selected area. |
| |
| If not given the area, default to capture the whole screen. |
| |
| Args: |
| port_id: The ID of the video input port. |
| x: The X position of the top-left corner. |
| y: The Y position of the top-left corner. |
| width: The width of the area. |
| height: The height of the area. |
| |
| Returns: |
| A byte-array of the pixels, wrapped in a xmlrpclib.Binary object. |
| """ |
| x, y, width, height = self._AutoFillArea(port_id, x, y, width, height) |
| self.CaptureVideo(port_id, self._DEFAULT_FRAME_LIMIT, x, y, width, height) |
| return self.ReadCapturedFrame(self._DEFAULT_FRAME_INDEX) |
| |
| def _AutoFillArea(self, port_id, x, y, width, height): |
| """Verifies the area argument correctness and fills the default values. |
| |
| It keeps x=None and y=None if all of the x, y, width, and height are None. |
| That hints FPGA to use a full-screen capture, not a cropped-sccren capture. |
| |
| Args: |
| port_id: The ID of the video input port. |
| x: The X position of the top-left corner. |
| y: The Y position of the top-left corner. |
| width: The width of the area. |
| height: The height of the area. |
| |
| Returns: |
| A tuple of (x, y, width, height) |
| |
| Raises: |
| DriverError if the area is not specified correctly. |
| """ |
| if (x, y, width, height) == (None, ) * 4: |
| return (None, None) + self.DetectResolution(port_id) |
| elif (x, y) == (None, ) * 2 or None not in (x, y, width, height): |
| return (x, y, width, height) |
| else: |
| raise DriverError('Some of area arguments are not specified.') |
| |
| def GetMaxFrameLimit(self, port_id, width, height): |
| """Gets the maximal number of frames which are accommodated in the buffer. |
| |
| It depends on the size of the internal buffer on the board and the |
| size of area to capture (full screen or cropped area). |
| |
| Args: |
| port_id: The ID of the video input port. |
| width: The width of the area to capture. |
| height: The height of the area to capture. |
| |
| Returns: |
| A number of the frame limit. |
| """ |
| return self._flow_manager.GetMaxFrameLimit(port_id, width, height) |
| |
| def _PrepareCapturingVideo(self, port_id, width, height): |
| """Prepares capturing video on the given video input. |
| |
| Args: |
| port_id: The ID of the video input port. |
| width: The width of the area of crop. |
| height: The height of the area of crop. |
| """ |
| caching_server.ClearCachedDir() |
| self._flow_manager.SelectInput(port_id) |
| if not self._flow_manager.IsPlugged(port_id): |
| raise DriverError('HPD is unplugged. No signal is expected.') |
| self._captured_params = { |
| 'port_id': port_id, |
| 'max_frame_limit': self._flow_manager.GetMaxFrameLimit(port_id, |
| width, height) |
| } |
| |
| def StartCapturingVideo(self, port_id, x=None, y=None, width=None, |
| height=None): |
| """Starts video capturing continuously on the given video input. |
| |
| This API is an asynchronous call. It returns after the video starts |
| capturing. The caller should call StopCapturingVideo to stop it. |
| |
| The example of usage: |
| chameleon.StartCapturingVideo(hdmi_input) |
| time.sleep(2) |
| chameleon.StopCapturingVideo() |
| for i in xrange(chameleon.GetCapturedFrameCount()): |
| frame = chameleon.ReadCapturedFrame(i, *area).data |
| CompareFrame(frame, golden_frames[i]) |
| |
| Args: |
| port_id: The ID of the video input port. |
| x: The X position of the top-left corner of crop. |
| y: The Y position of the top-left corner of crop. |
| width: The width of the area of crop. |
| height: The height of the area of crop. |
| """ |
| x, y, width, height = self._AutoFillArea(port_id, x, y, width, height) |
| self._PrepareCapturingVideo(port_id, width, height) |
| max_frame_limit = self._captured_params['max_frame_limit'] |
| logging.info('Start capturing video from port #%d', port_id) |
| self._flow_manager.StartDumpingFrames( |
| port_id, max_frame_limit, x, y, width, height, |
| self._MAX_CAPTURED_FRAME_COUNT) |
| |
| def StopCapturingVideo(self, stop_index=None): |
| """Stops video capturing which was started previously. |
| |
| Args: |
| stop_index: Wait for the captured frame count to reach this index. If |
| not given, stop immediately. Note that the captured frame of |
| stop_index should not be read. |
| |
| Raises: |
| DriverError if the capture period is longer than the capture limitation. |
| """ |
| port_id = self._captured_params['port_id'] |
| if stop_index: |
| if stop_index >= self._MAX_CAPTURED_FRAME_COUNT: |
| raise DriverError('Exceeded the limit of capture, stop_index >= %d' % |
| self._MAX_CAPTURED_FRAME_COUNT) |
| logging.info('Waiting the captured frame count reaches %d...', stop_index) |
| while self.GetCapturedFrameCount() < stop_index: |
| pass |
| |
| self._flow_manager.StopDumpingFrames(port_id) |
| logging.info('Stopped capturing video from port #%d', port_id) |
| if self.GetCapturedFrameCount() >= self._MAX_CAPTURED_FRAME_COUNT: |
| raise DriverError('Exceeded the limit of capture, frame_count >= %d' % |
| self._MAX_CAPTURED_FRAME_COUNT) |
| |
| def CaptureVideo(self, port_id, total_frame, x=None, y=None, width=None, |
| height=None): |
| """Captures the video stream on the given video input to the buffer. |
| |
| This API is a synchronous call. It returns after all the frames are |
| captured. The frames can be read using the ReadCapturedFrame API. |
| |
| The example of usage: |
| chameleon.CaptureVideo(hdmi_input, total_frame) |
| for i in xrange(total_frame): |
| frame = chameleon.ReadCapturedFrame(i, *area).data |
| CompareFrame(frame, golden_frames[i]) |
| |
| Args: |
| port_id: The ID of the video input port. |
| total_frame: The total number of frames to capture, should not larger |
| than value of GetMaxFrameLimit. |
| x: The X position of the top-left corner of crop. |
| y: The Y position of the top-left corner of crop. |
| width: The width of the area of crop. |
| height: The height of the area of crop. |
| """ |
| x, y, width, height = self._AutoFillArea(port_id, x, y, width, height) |
| logging.info('Capture video from port #%d', port_id) |
| self._PrepareCapturingVideo(port_id, width, height) |
| max_frame_limit = self._captured_params['max_frame_limit'] |
| if total_frame > max_frame_limit: |
| raise DriverError('Exceed the max frame limit %d > %d' % |
| (total_frame, max_frame_limit)) |
| |
| # TODO(waihong): Make the timeout value based on the FPS rate. |
| self._flow_manager.DumpFramesToLimit( |
| port_id, total_frame, x, y, width, height, |
| self._TIMEOUT_FRAME_DUMP_PROBE) |
| |
| def GetCapturedFrameCount(self): |
| """Gets the total count of the captured frames. |
| |
| Returns: |
| The number of frames captured. |
| """ |
| port_id = self._captured_params['port_id'] |
| return self._flow_manager.GetDumpedFrameCount(port_id) |
| |
| def GetCapturedResolution(self): |
| """Gets the resolution of the captured frame. |
| |
| If a cropping area is specified on capturing, returns the cropped |
| resolution. |
| |
| Returns: |
| A (width, height) tuple. |
| """ |
| port_id = self._captured_params['port_id'] |
| return self._flow_manager.GetCapturedResolution(port_id) |
| |
| def ReadCapturedFrame(self, frame_index): |
| """Reads the content of the captured frame from the buffer. |
| |
| Args: |
| frame_index: The index of the frame to read. |
| |
| Returns: |
| A byte-array of the pixels, wrapped in a xmlrpclib.Binary object. |
| """ |
| port_id = self._captured_params['port_id'] |
| total_frame = self.GetCapturedFrameCount() |
| max_frame_limit = self._captured_params['max_frame_limit'] |
| # The captured frames are store in a circular buffer. Only the latest |
| # max_frame_limit frames are valid. |
| first_valid_index = max(0, total_frame - max_frame_limit) |
| if not first_valid_index <= frame_index < total_frame: |
| raise DriverError('The frame index is out-of-range: %d not in [%d, %d)' % |
| (frame_index, first_valid_index, total_frame)) |
| |
| # Use the projected index. |
| frame_index = frame_index % max_frame_limit |
| screen = self._flow_manager.ReadCapturedFrame(port_id, frame_index) |
| return xmlrpc.client.Binary(screen) |
| |
| def CacheFrameThumbnail(self, frame_index, ratio=2): |
| """Caches the thumbnail of the dumped field to a temp file. |
| |
| Args: |
| frame_index: The index of the frame to cache. |
| ratio: The ratio to scale down the image. |
| |
| Returns: |
| An ID to identify the cached thumbnail. |
| """ |
| port_id = self._captured_params['port_id'] |
| return self._flow_manager.CacheFrameThumbnail(port_id, frame_index, ratio) |
| |
| def _GetCapturedSignals(self, signal_func_name, start_index=0, |
| stop_index=None): |
| """Gets the list of signals of the captured frames. |
| |
| Args: |
| signal_func_name: The name of the signal function, e.g. 'GetFrameHashes'. |
| start_index: The index of the start frame. Default is 0. |
| stop_index: The index of the stop frame (excluded). Default is the |
| value of GetCapturedFrameCount. |
| |
| Returns: |
| The list of signals. |
| """ |
| port_id = self._captured_params['port_id'] |
| total_frame = self.GetCapturedFrameCount() |
| if stop_index is None: |
| stop_index = total_frame |
| if not 0 <= start_index < total_frame: |
| raise DriverError('The start index is out-of-range: %d not in [0, %d)' % |
| (start_index, total_frame)) |
| if not 0 < stop_index <= total_frame: |
| raise DriverError('The stop index is out-of-range: %d not in (0, %d]' % |
| (stop_index, total_frame)) |
| signal_func = getattr(self._flows[port_id], signal_func_name) |
| return signal_func(start_index, stop_index) |
| |
| def GetCapturedChecksums(self, start_index=0, stop_index=None): |
| """Gets the list of checksums of the captured frames. |
| |
| Args: |
| start_index: The index of the start frame. Default is 0. |
| stop_index: The index of the stop frame (excluded). Default is the |
| value of GetCapturedFrameCount. |
| |
| Returns: |
| The list of checksums of frames. |
| """ |
| return self._GetCapturedSignals('GetFrameHashes', start_index, stop_index) |
| |
| def GetCapturedHistograms(self, start_index=0, stop_index=None): |
| """Gets the list of histograms of the captured frames. |
| |
| Args: |
| start_index: The index of the start frame. Default is 0. |
| stop_index: The index of the stop frame (excluded). Default is the |
| value of GetCapturedFrameCount. |
| |
| Returns: |
| The list of histograms of frames. |
| """ |
| return self._GetCapturedSignals('GetHistograms', start_index, stop_index) |
| |
| def ComputePixelChecksum( |
| self, port_id, x=None, y=None, width=None, height=None): |
| """Computes the checksum of pixels in the selected area. |
| |
| If not given the area, default to compute the whole screen. |
| |
| Args: |
| port_id: The ID of the video input port. |
| x: The X position of the top-left corner. |
| y: The Y position of the top-left corner. |
| width: The width of the area. |
| height: The height of the area. |
| |
| Returns: |
| The checksum of the pixels. |
| """ |
| x, y, width, height = self._AutoFillArea(port_id, x, y, width, height) |
| self.CaptureVideo(port_id, self._DEFAULT_FRAME_LIMIT, x, y, width, height) |
| return self.GetCapturedChecksums(self._DEFAULT_FRAME_INDEX, |
| self._DEFAULT_FRAME_INDEX + 1)[0] |
| |
| def DetectResolution(self, port_id): |
| """Detects the video source resolution. |
| |
| Args: |
| port_id: The ID of the video input port. |
| |
| Returns: |
| A (width, height) tuple. |
| """ |
| return self._flow_manager.DetectResolution(port_id) |
| |
| def GetVideoParams(self, port_id): |
| """Gets video parameters. |
| |
| Args: |
| port_id: The ID of the video input port. |
| |
| Returns: |
| A dict containing video parameters. Fields are omitted if unknown. |
| """ |
| return self._flow_manager.GetVideoParams(port_id) |
| |
| def GetLastInfoFrame(self, port_id, infoframe_type): |
| """Obtains the last received InfoFrame of the specified type. |
| |
| Args: |
| port_id: The ID of the video input port |
| infoframe_type (string): the InfoFrame type |
| |
| Returns: |
| A dict containing the InfoFrame. |
| """ |
| infoframe = self._flow_manager.GetLastInfoFrame(port_id, infoframe_type) |
| # Wrap the binary blob in an xmlrpclib object so that it can be serialized |
| # in the XML-RPC reply. |
| infoframe['payload'] = xmlrpc.client.Binary(infoframe['payload']) |
| return infoframe |
| |
| def HasAudioBoard(self): |
| """Returns True if there is an audio board. |
| |
| Returns: |
| True if there is an audio board. False otherwise. |
| """ |
| return self.audio_board is not None |
| |
| def HasDevice(self, device_id): |
| """Returns True if there is a device. |
| |
| Returns: |
| True if there is a device . False otherwise. |
| """ |
| return self._device_manager.GetChameleonDevice(device_id) is not None |
| |
| def GetAudioChannelMapping(self, port_id): |
| """Obtains the channel mapping for an audio port. |
| |
| Args: |
| port_id: The ID of the audio port. |
| |
| Returns: |
| An array of integers. There is one element per Chameleon channel. |
| For audio input ports, each element indicates which input channel the |
| capture channel is mapped to. For audio output ports, each element |
| indicates which output channel the playback channel is mapped to. As a |
| special case, -1 means the channel isn't mapped. |
| |
| Raises: |
| FlowManagerError: no audio capture in progress |
| """ |
| return self._flow_manager.GetAudioChannelMapping(port_id) |
| |
| def GetAudioFormat(self, port_id): |
| """Gets the format currently used by audio capture. |
| |
| Args: |
| port_id: The ID of the audio input port. |
| |
| Returns: |
| A dict containing the format properties. The keys are: |
| file_type: 'raw' or 'wav' |
| sample_format: 'S32_LE' for 32-bit signed integers in little-endian. See |
| aplay(1) for more formats. |
| channel: number of channels |
| rate: sampling rate in Hz (or zero if unknown) |
| |
| Raises: |
| FlowManagerError: no audio capture in progress |
| """ |
| return self._flow_manager.GetAudioFormat(port_id).AsDict() |
| |
| def TriggerLinkFailure(self, port_id): |
| """Trigger a link failure on the port. |
| |
| Args: |
| port_id: The ID of the input port. |
| """ |
| self._flow_manager.TriggerLinkFailure(port_id) |
| |
| def StartCapturingAudio(self, port_id, has_file=True): |
| """Starts capturing audio. |
| |
| Refer to the docstring of StartPlayingEcho about the restriction of |
| capturing and echoing at the same time. |
| |
| Args: |
| port_id: The ID of the audio input port. |
| has_file: True for saving audio data to file. False otherwise. |
| """ |
| self._flow_manager.StartCapturingAudio(port_id, has_file) |
| |
| def StopCapturingAudio(self, port_id): |
| """Stops capturing audio and returns recorded data path and format. |
| |
| Args: |
| port_id: The ID of the audio input port. |
| |
| Returns: |
| A tuple (path, format). |
| path: The path to the captured audio data. |
| format: The format of the captured data. See GetAudioFormat. Note that |
| the returned audio frequency may not be correct, for this reason |
| calling GetAudioFormat during the capture is preferred. |
| If we assign parameter has_file=False in StartCapturingAudio, we will get |
| both empty string in path and format. |
| """ |
| return self._flow_manager.StopCapturingAudio(port_id) |
| |
| def StartPlayingAudio(self, port_id, path, data_format): |
| """Playing audio data from an output port. |
| |
| Play audio data at given path using given format from port_id port. |
| |
| Args: |
| port_id: The ID of the output connector. |
| path: The path to the audio data to play. |
| data_format: The dict representation of AudioDataFormat. |
| Refer to docstring of utils.audio.AudioDataFormat for detail. |
| Currently Chameleon only accepts data format if it meets |
| dict(file_type='raw', sample_format='S32_LE', channel=8, rate=48000) |
| Chameleon user should do the format conversion to minimize work load |
| on Chameleon board. |
| """ |
| self._flow_manager.StartPlayingAudio(port_id, path, data_format) |
| |
| def StartPlayingEcho(self, port_id, input_id): |
| """Echoes audio data received from input_id and plays to port_id. |
| |
| Echoes audio data received from input_id and plays to port_id. |
| |
| Chameleon does not support echoing from HDMI and capturing from LineIn/Mic |
| at the same time. The echoing/capturing needs to be stop first before |
| another action starts. |
| |
| For example, user can call |
| |
| StartPlayingEcho(3, 7) --> StopPlayingAudio(3) --> StartCapturingAudio(6) |
| |
| or |
| |
| StartCapturingAudio(6) --> StopCapturingAudio(6) --> StartPlayingEcho(3, 7) |
| |
| but user can not call |
| |
| StartPlayingEcho(3, 7) --> StartCapturingAudio(6) |
| |
| or |
| |
| StartCapturingAudio(6) --> StartPlayingEcho(3, 7) |
| |
| Exception is raised when conflicting actions are performed. |
| |
| Args: |
| port_id: The ID of the output connector. Check the value in ids.py. |
| input_id: The ID of the input connector. Check the value in ids.py. |
| """ |
| self._flow_manager.StartPlayingEcho(port_id, input_id) |
| |
| def StopPlayingAudio(self, port_id): |
| """Stops playing audio from port_id port. |
| |
| Args: |
| port_id: The ID of the output connector. |
| """ |
| self._flow_manager.StopPlayingAudio(port_id) |
| |
| def _ClearPrinterFiles(self): |
| """Clears temporary printer files. |
| |
| Chameleon board does not reboot very often. We should clear the temporary |
| printer files used in capturing printer data. |
| """ |
| for path in glob.glob('/tmp/printer_*'): |
| os.unlink(path) |
| |
| def _ClearAudioFiles(self): |
| """Clears temporary audio files. |
| |
| Chameleon board does not reboot very often. We should clear the temporary |
| audio files used in capturing audio or playing audio when Reset is called. |
| """ |
| for path in glob.glob('/tmp/audio_*'): |
| os.unlink(path) |
| |
| @_DeviceMethod(ids.AUDIO_BOARD) |
| def AudioBoardConnect(self, bus_number, endpoint): |
| """Connects an endpoint to an audio bus. |
| |
| Args: |
| bus_number: 1 or 2 for audio bus 1 or bus 2. |
| endpoint: An endpoint defined in audio_board.AudioBusEndpoint. |
| |
| Raises: |
| DriverError: If the endpoint is a source and there is other source |
| endpoint occupying audio bus. |
| """ |
| if audio_board.IsSource(endpoint): |
| current_sources, _ = self.audio_board.GetConnections(bus_number) |
| if current_sources and endpoint not in current_sources: |
| raise DriverError( |
| 'Sources %s other than %s are currently occupying audio bus.' % |
| (current_sources, endpoint)) |
| |
| self.audio_board.SetConnection( |
| bus_number, endpoint, True) |
| |
| @_DeviceMethod(ids.AUDIO_BOARD) |
| def AudioBoardDisconnect(self, bus_number, endpoint): |
| """Disconnects an endpoint to an audio bus. |
| |
| Args: |
| bus_number: 1 or 2 for audio bus 1 or bus 2. |
| endpoint: An endpoint defined in audio_board.AudioBusEndpoint. |
| |
| Raises: |
| DriverError: If the endpoint is not connected to audio bus. |
| """ |
| if not self.audio_board.IsConnected(bus_number, endpoint): |
| raise DriverError( |
| 'Endpoint %s is not connected to audio bus %d.' % |
| (endpoint, bus_number)) |
| |
| self.audio_board.SetConnection( |
| bus_number, endpoint, False) |
| |
| @_DeviceMethod(ids.AUDIO_BOARD) |
| def AudioBoardGetRoutes(self, bus_number): |
| """Gets a list of routes on audio bus. |
| |
| Args: |
| bus_number: 1 or 2 for audio bus 1 or bus 2. |
| |
| Returns: |
| A list of tuples (source, sink) that are routed on audio bus |
| where source and sink are endpoints defined in |
| audio_board.AudioBusEndpoint. |
| """ |
| sources, sinks = self.audio_board.GetConnections(bus_number) |
| routes = [] |
| for source in sources: |
| for sink in sinks: |
| logging.info('Route on bus %d: %s ---> %s', |
| bus_number, source, sink) |
| routes.append((source, sink)) |
| return routes |
| |
| @_DeviceMethod(ids.AUDIO_BOARD) |
| def AudioBoardClearRoutes(self, bus_number): |
| """Clears routes on an audio bus. |
| |
| Args: |
| bus_number: 1 or 2 for audio bus 1 or bus 2. |
| """ |
| self.audio_board.ResetConnections(bus_number) |
| |
| @_DeviceMethod(ids.AUDIO_BOARD) |
| def AudioBoardHasJackPlugger(self): |
| """If there is jack plugger on audio board. |
| |
| Audio board must have the motor cable connected in order to control |
| jack plugger of audio box. |
| |
| Returns: |
| True if there is jack plugger on audio board. False otherwise. |
| """ |
| return self.audio_board.HasJackPlugger() |
| |
| @_DeviceMethod(ids.AUDIO_BOARD) |
| def AudioBoardAudioJackPlug(self): |
| """Plugs audio jack to connect audio board and Cros device.""" |
| logging.info('Plug audio jack to connect audio board and Cros device.') |
| self.audio_board.SetJackPlugger(True) |
| |
| @_DeviceMethod(ids.AUDIO_BOARD) |
| def AudioBoardAudioJackUnplug(self): |
| """Unplugs audio jack to disconnect audio board and Cros device.""" |
| logging.info('Unplug audio jack to disconnect audio board and Cros device.') |
| self.audio_board.SetJackPlugger(False) |
| |
| @_DeviceMethod(ids.AUDIO_BOARD) |
| def AudioBoardResetBluetooth(self): |
| """Resets bluetooth module on audio board.""" |
| logging.info('Resets bluetooth module on audio board.') |
| self.audio_board.ResetBluetooth() |
| |
| @_DeviceMethod(ids.AUDIO_BOARD) |
| def AudioBoardDisableBluetooth(self): |
| """Disables bluetooth module on audio board.""" |
| logging.info('Disables bluetooth module on audio board.') |
| self.audio_board.DisableBluetooth() |
| |
| @_DeviceMethod(ids.AUDIO_BOARD) |
| def AudioBoardIsBluetoothEnabled(self): |
| """Checks if bluetooth module on audio board is enabled. |
| |
| Returns: |
| True if bluetooth module is enabled. False otherwise. |
| """ |
| return self.audio_board.IsBluetoothEnabled() |
| |
| def SetUSBDriverPlaybackConfigs(self, playback_data_format): |
| """Updates the corresponding playback configurations to argument values. |
| |
| This provides flexibility for simulating the USB gadget driver using other |
| configurations different from the default values. |
| |
| Args: |
| playback_data_format: The dict form of an AudioDataFormat object. The |
| 'file_type' field will be ignored since for playback, there is no need |
| for setting file type before playing audio. It is specified by the audio |
| file passed in for playback. Other fields are used to set USB driver |
| configurations. |
| """ |
| self._flow_manager.SetUSBDriverPlaybackConfigs(playback_data_format) |
| |
| def SetUSBDriverCaptureConfigs(self, capture_data_format): |
| """Updates the corresponding capture configurations to argument values. |
| |
| This provides flexibility for simulating the USB gadget driver using other |
| configurations different from the default values. |
| |
| Args: |
| capture_data_format: The dict form of an AudioDataFormat object. The |
| 'file_type' field will be saved by InputUSBAudioFlow as the file type |
| for captured data. Other fields are used to set USB driver |
| configurations. |
| """ |
| self._flow_manager.SetUSBDriverCaptureConfigs(capture_data_format) |
| |
| def GetMacAddress(self): |
| """Gets the MAC address of this Chameleon. |
| |
| Returns: |
| A string for MAC address. |
| """ |
| return open('/sys/class/net/eth0/address').read().strip() |
| |
| def SendHIDEvent(self, port_id, event_type, *args, **kwargs): |
| """Sends HID event with event_type and arguments for HID port #port_id. |
| |
| Args: |
| port_id: The ID of the HID port. |
| event_type: Supported event type of string for HID port #port_id. |
| |
| Returns: |
| Returns as event function if applicable. |
| """ |
| return self._flow_manager.SendHIDEvent(port_id, event_type, *args, **kwargs) |
| |
| def ResetBluetoothRef(self): |
| """Reset BTREF""" |
| # Reset the RN52 and wait for it to reset |
| RN52_RESET_WAIT_SECS = 3 |
| self.bluetooth_a2dp_sink.PowerCycle() |
| time.sleep(RN52_RESET_WAIT_SECS) |
| # Reloads serial port driver if needed |
| self._bluetooth_a2dp_sink_ctrl.EnableDriver() |
| # Reads peripheral configuration settings |
| self.bluetooth_a2dp_sink.GetBasicSettings() |
| |
| def EnableBluetoothRef(self): |
| """Enable BTREF""" |
| self._bluetooth_a2dp_sink_ctrl.EnableDriver() |
| |
| def DisableBluetoothRef(self): |
| """Disable BTREF""" |
| self.bluetooth_a2dp_sink.Close() |
| |
| def IsBluetoothRefDisabled(self): |
| """Check if BTREF is enabled""" |
| raise NotImplementedError('IsBluetoothRefDisabled') |