blob: 7d16e3385b07d4abd68d82c57887a41614db08d0 [file] [log] [blame]
# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Chameleond Driver for FPGA customized platform with the TIO card."""
import functools
import glob
import logging
import os
import xmlrpclib
import chameleon_common # pylint: disable=W0611
from chameleond.interface import ChameleondInterface
from chameleond.utils import audio_board
from chameleond.utils import avsync_probe_flow
from chameleond.utils import bluetooth_hid_flow
from chameleond.utils import caching_server
from chameleond.utils import codec_flow
from chameleond.utils import fpga
from chameleond.utils import i2c
from chameleond.utils import ids
from chameleond.utils import input_flow
from chameleond.utils import system_tools
from chameleond.utils import usb
from chameleond.utils import usb_audio_flow
from chameleond.utils import usb_hid_flow
class DriverError(Exception):
"""Exception raised when any error on FPGA driver."""
pass
def _AudioMethod(input_only=False, output_only=False):
"""Decorator that checks the port_id argument is an audio port.
Args:
input_only: True to check if port is an input port.
output_only: True to check if port is an output port.
"""
def _ActualDecorator(func):
@functools.wraps(func)
def wrapper(instance, port_id, *args, **kwargs):
if not ids.IsAudioPort(port_id):
raise DriverError(
'Not a valid port_id for audio operation: %d' % port_id)
if input_only and not ids.IsInputPort(port_id):
raise DriverError(
'Not a valid port_id for input operation: %d' % port_id)
elif output_only and not ids.IsOutputPort(port_id):
raise DriverError(
'Not a valid port_id for output operation: %d' % port_id)
return func(instance, port_id, *args, **kwargs)
return wrapper
return _ActualDecorator
def _VideoMethod(func):
"""Decorator that checks the port_id argument is a video port."""
@functools.wraps(func)
def wrapper(instance, port_id, *args, **kwargs):
if not ids.IsVideoPort(port_id):
raise DriverError('Not a valid port_id for video operation: %d' % port_id)
return func(instance, port_id, *args, **kwargs)
return wrapper
def _AudioBoardMethod(func):
"""Decorator that checks there is an audio board."""
@functools.wraps(func)
def wrapper(instance, *args, **kwargs):
if not instance.HasAudioBoard():
raise DriverError('There is no audio board')
return func(instance, *args, **kwargs)
return wrapper
def _USBHIDMethod(func):
"""Decorator that checks the port_id argument is a USB HID port."""
@functools.wraps(func)
def wrapper(instance, port_id, *args, **kwargs):
if not ids.IsUSBHIDPort(port_id):
raise DriverError('Not a valid port_id for HID operation: %d' % port_id)
return func(instance, port_id, *args, **kwargs)
return wrapper
class ChameleondDriver(ChameleondInterface):
"""Chameleond Driver for FPGA customized platform."""
_I2C_BUS_MAIN = 0
_I2C_BUS_AUDIO_CODEC = 1
_I2C_BUS_AUDIO_BOARD = 3
# Time to wait for video frame dump to start before a timeout error is raised
_TIMEOUT_FRAME_DUMP_PROBE = 60.0
# The frame index which is used for the regular DumpPixels API.
_DEFAULT_FRAME_INDEX = 0
_DEFAULT_FRAME_LIMIT = _DEFAULT_FRAME_INDEX + 1
# Limit the period of async capture to 3min (in 60fps).
_MAX_CAPTURED_FRAME_COUNT = 3 * 60 * 60
def __init__(self, *args, **kwargs):
super(ChameleondDriver, self).__init__(*args, **kwargs)
self._selected_input = None
self._selected_output = None
self._captured_params = {}
self._process = None
# Reserve index 0 as the default EDID.
self._all_edids = [self._ReadDefaultEdid()]
main_bus = i2c.I2cBus(self._I2C_BUS_MAIN)
audio_codec_bus = i2c.I2cBus(self._I2C_BUS_AUDIO_CODEC)
fpga_ctrl = fpga.FpgaController()
usb_audio_ctrl = usb.USBAudioController()
usb_hid_ctrl = usb.USBController('g_hid')
bluetooth_hid_ctrl = usb.USBController('ftdi_sio')
self._flows = {
ids.DP1: input_flow.DpInputFlow(ids.DP1, main_bus, fpga_ctrl),
ids.DP2: input_flow.DpInputFlow(ids.DP2, main_bus, fpga_ctrl),
ids.HDMI: input_flow.HdmiInputFlow(ids.HDMI, main_bus, fpga_ctrl),
ids.VGA: input_flow.VgaInputFlow(ids.VGA, main_bus, fpga_ctrl),
ids.MIC: codec_flow.InputCodecFlow(ids.MIC, audio_codec_bus, fpga_ctrl),
ids.LINEIN: codec_flow.InputCodecFlow(ids.LINEIN, audio_codec_bus,
fpga_ctrl),
ids.LINEOUT: codec_flow.OutputCodecFlow(
ids.LINEOUT, audio_codec_bus, fpga_ctrl),
ids.USB_AUDIO_IN: usb_audio_flow.InputUSBAudioFlow(
ids.USB_AUDIO_IN, usb_audio_ctrl),
ids.USB_AUDIO_OUT: usb_audio_flow.OutputUSBAudioFlow(
ids.USB_AUDIO_OUT, usb_audio_ctrl),
ids.USB_KEYBOARD: usb_hid_flow.KeyboardUSBHIDFlow(
ids.USB_KEYBOARD, usb_hid_ctrl),
ids.USB_TOUCH: usb_hid_flow.TouchUSBHIDFlow(
ids.USB_TOUCH, usb_hid_ctrl),
ids.BLUETOOTH_HID_MOUSE: bluetooth_hid_flow.BluetoothHIDMouseFlow(
ids.BLUETOOTH_HID_MOUSE, bluetooth_hid_ctrl),
ids.AVSYNC_PROBE: avsync_probe_flow.AVSyncProbeFlow(ids.AVSYNC_PROBE),
}
# Allow to accees the mouse methods through bluetooth_mouse member object.
# Hence, there is no need to export the mouse methods in ChameleondDriver.
self.bluetooth_mouse = self._flows[ids.BLUETOOTH_HID_MOUSE]
self.avsync_probe = self._flows[ids.AVSYNC_PROBE]
for flow in self._flows.itervalues():
if flow:
flow.Initialize()
# Some Chameleon might not have audio board installed.
self._audio_board = None
try:
audio_board_bus = i2c.I2cBus(self._I2C_BUS_AUDIO_BOARD)
self._audio_board = audio_board.AudioBoard(audio_board_bus)
except audio_board.AudioBoardException:
logging.warning('There is no audio board on this Chameleon')
else:
logging.info('There is an audio board on this Chameleon')
self.Reset()
def Reset(self):
"""Resets Chameleon board."""
logging.info('Execute the reset process')
# TODO(waihong): Add other reset routines.
logging.info('Apply the default EDID and enable DDC on all video inputs')
for port_id in self.GetSupportedInputs():
if self.HasVideoSupport(port_id):
self.ApplyEdid(port_id, ids.EDID_ID_DEFAULT)
self.SetDdcState(port_id, enabled=True)
for port_id in self.GetSupportedPorts():
if self.HasAudioSupport(port_id):
# Stops all audio capturing.
if ids.IsInputPort(port_id) and self._flows[port_id].is_capturing_audio:
self._flows[port_id].StopCapturingAudio()
self._flows[port_id].ResetRoute()
if self.HasAudioBoard():
self._audio_board.Reset()
self._ClearAudioFiles()
caching_server.ClearCachedDir()
# Set all ports unplugged on initialization.
for port_id in self.GetSupportedPorts():
self.Unplug(port_id)
def Reboot(self):
"""Reboots Chameleon board."""
logging.info('The chameleon board is going to reboot.')
system_tools.SystemTools.Call('reboot')
def GetSupportedPorts(self):
"""Returns all supported ports on the board.
Not like the ProbePorts() method which only returns the ports which
are connected, this method returns all supported ports on the board.
Returns:
A tuple of port_id, for all supported ports on the board.
"""
return self._flows.keys()
def GetSupportedInputs(self):
"""Returns all supported input ports on the board.
Not like the ProbeInputs() method which only returns the input ports which
are connected, this method returns all supported input ports on the board.
Returns:
A tuple of port_id, for all supported input port on the board.
"""
return ids.INPUT_PORTS
def GetSupportedOutputs(self):
"""Returns all supported output ports on the board.
Not like the ProbeOutputs() method which only returns the output ports which
are connected, this method returns all supported output ports on the board.
Returns:
A tuple of port_id, for all supported output port on the board.
"""
return ids.OUTPUT_PORTS
def IsPhysicalPlugged(self, port_id):
"""Returns true if the physical cable is plugged between DUT and Chameleon.
Args:
port_id: The ID of the input/output port.
Returns:
True if the physical cable is plugged; otherwise, False.
"""
return self._flows[port_id].IsPhysicalPlugged()
def ProbePorts(self):
"""Probes all the connected ports on Chameleon board.
Returns:
A tuple of port_id, for the ports connected to DUT.
"""
return tuple(port_id for port_id in self.GetSupportedPorts()
if self.IsPhysicalPlugged(port_id))
def ProbeInputs(self):
"""Probes all the connected input ports on Chameleon board.
Returns:
A tuple of port_id, for the input ports connected to DUT.
"""
return tuple(port_id for port_id in self.GetSupportedInputs()
if self.IsPhysicalPlugged(port_id))
def ProbeOutputs(self):
"""Probes all the connected output ports on Chameleon board.
Returns:
A tuple of port_id, for the output ports connected to DUT.
"""
return tuple(port_id for port_id in self.GetSupportedOutputs()
if self.IsPhysicalPlugged(port_id))
def GetConnectorType(self, port_id):
"""Returns the human readable string for the connector type.
Args:
port_id: The ID of the input/output port.
Returns:
A string, like "HDMI", "DP", "MIC", etc.
"""
return self._flows[port_id].GetConnectorType()
def HasAudioSupport(self, port_id):
"""Returns true if the port has audio support.
Args:
port_id: The ID of the input/output port.
Returns:
True if the input/output port has audio support; otherwise, False.
"""
return ids.IsAudioPort(port_id)
def HasVideoSupport(self, port_id):
"""Returns true if the port has video support.
Args:
port_id: The ID of the input/output port.
Returns:
True if the input/output port has video support; otherwise, False.
"""
return ids.IsVideoPort(port_id)
@_VideoMethod
def SetVgaMode(self, port_id, mode):
"""Sets the mode for VGA monitor.
Args:
port_id: The ID of the VGA port.
mode: A string of the mode name, e.g. 'PC_1920x1080x60'. Use 'auto'
to detect the VGA mode automatically.
"""
if port_id == ids.VGA:
logging.info('Set VGA port #%d to mode: %s', port_id, mode)
self._flows[port_id].SetVgaMode(mode)
else:
raise DriverError('SetVgaMode only works on VGA port.')
@_VideoMethod
def WaitVideoInputStable(self, port_id, timeout=None):
"""Waits the video input stable or timeout.
Args:
port_id: The ID of the video input port.
timeout: The time period to wait for.
Returns:
True if the video input becomes stable within the timeout period;
otherwise, False.
"""
self._SelectInput(port_id)
return self._flows[port_id].WaitVideoInputStable(timeout)
def _ReadDefaultEdid(self):
"""Reads the default EDID from file.
Returns:
A byte array of EDID data.
"""
driver_dir = os.path.dirname(os.path.realpath(__file__))
edid_path = os.path.join(driver_dir, '..', 'data', 'default_edid.bin')
return open(edid_path).read()
def CreateEdid(self, edid):
"""Creates an internal record of EDID using the given byte array.
Args:
edid: A byte array of EDID data, wrapped in a xmlrpclib.Binary object.
Returns:
An edid_id.
"""
if None in self._all_edids:
last = self._all_edids.index(None)
self._all_edids[last] = edid.data
else:
last = len(self._all_edids)
self._all_edids.append(edid.data)
return last
def DestroyEdid(self, edid_id):
"""Destroys the internal record of EDID. The internal data will be freed.
Args:
edid_id: The ID of the EDID, which was created by CreateEdid().
"""
if edid_id > ids.EDID_ID_DEFAULT:
self._all_edids[edid_id] = None
else:
raise DriverError('Not a valid edid_id.')
@_VideoMethod
def SetDdcState(self, port_id, enabled):
"""Sets the enabled/disabled state of DDC bus on the given video input.
Args:
port_id: The ID of the video input port.
enabled: True to enable DDC bus due to an user request; False to
disable it.
"""
logging.info('Set DDC bus on port #%d to enabled %r', port_id, enabled)
self._flows[port_id].SetDdcState(enabled)
@_VideoMethod
def IsDdcEnabled(self, port_id):
"""Checks if the DDC bus is enabled or disabled on the given video input.
Args:
port_id: The ID of the video input port.
Returns:
True if the DDC bus is enabled; False if disabled.
"""
return self._flows[port_id].IsDdcEnabled()
@_VideoMethod
def ReadEdid(self, port_id):
"""Reads the EDID content of the selected video input on Chameleon.
Args:
port_id: The ID of the video input port.
Returns:
A byte array of EDID data, wrapped in a xmlrpclib.Binary object,
or None if the EDID is disabled.
"""
if self._flows[port_id].IsEdidEnabled():
return xmlrpclib.Binary(self._flows[port_id].ReadEdid())
else:
logging.debug('Read EDID on port #%d which is disabled.', port_id)
return None
@_VideoMethod
def ApplyEdid(self, port_id, edid_id):
"""Applies the EDID to the selected video input.
Note that this method doesn't pulse the HPD line. Should call Plug(),
Unplug(), or FireHpdPulse() later.
Args:
port_id: The ID of the video input port.
edid_id: The ID of the EDID.
"""
if edid_id == ids.EDID_ID_DISABLE:
logging.info('Disable EDID on port #%d', port_id)
self._flows[port_id].SetEdidState(False)
elif edid_id >= ids.EDID_ID_DEFAULT:
logging.info('Apply EDID #%d to port #%d', edid_id, port_id)
self._flows[port_id].WriteEdid(self._all_edids[edid_id])
self._flows[port_id].SetEdidState(True)
else:
raise DriverError('Not a valid edid_id.')
def IsPlugged(self, port_id):
"""Returns true if the port is emulated as plugged.
Args:
port_id: The ID of the input/output port.
Returns:
True if the port is emualted as plugged; otherwise, False.
"""
return self._flows[port_id].IsPlugged()
def Plug(self, port_id):
"""Emualtes plug, like asserting HPD line to high on a video port.
Args:
port_id: The ID of the input/output port.
"""
logging.info('Plug port #%d', port_id)
return self._flows[port_id].Plug()
def Unplug(self, port_id):
"""Emulates unplug, like deasserting HPD line to low on a video port.
Args:
port_id: The ID of the input/output port.
"""
logging.info('Unplug port #%d', port_id)
return self._flows[port_id].Unplug()
@_VideoMethod
def FireHpdPulse(self, port_id, deassert_interval_usec,
assert_interval_usec=None, repeat_count=1,
end_level=1):
"""Fires one or more HPD pulse (low -> high -> low -> ...).
Args:
port_id: The ID of the video input port.
deassert_interval_usec: The time in microsecond of the deassert pulse.
assert_interval_usec: The time in microsecond of the assert pulse.
If None, then use the same value as
deassert_interval_usec.
repeat_count: The count of HPD pulses to fire.
end_level: HPD ends with 0 for LOW (unplugged) or 1 for HIGH (plugged).
"""
if assert_interval_usec is None:
# Fall back to use the same value as deassertion if not given.
assert_interval_usec = deassert_interval_usec
logging.info('Fire HPD pulse on port #%d, ending with %s',
port_id, 'high' if end_level else 'low')
return self._flows[port_id].FireHpdPulse(
deassert_interval_usec, assert_interval_usec, repeat_count, end_level)
@_VideoMethod
def FireMixedHpdPulses(self, port_id, widths_msec):
"""Fires one or more HPD pulses, starting at low, of mixed widths.
One must specify a list of segment widths in the widths_msec argument where
widths_msec[0] is the width of the first low segment, widths_msec[1] is that
of the first high segment, widths_msec[2] is that of the second low segment,
etc.
The HPD line stops at low if even number of segment widths are specified;
otherwise, it stops at high.
The method is equivalent to a series of calls to Unplug() and Plug()
separated by specified pulse widths.
Args:
port_id: The ID of the video input port.
widths_msec: list of pulse segment widths in milli-second.
"""
logging.info('Fire mixed HPD pulse on port #%d, ending with %s',
port_id, 'high' if len(widths_msec) % 2 else 'low')
return self._flows[port_id].FireMixedHpdPulses(widths_msec)
@_VideoMethod
def SetContentProtection(self, port_id, enabled):
"""Sets the content protection state on the port.
Args:
port_id: The ID of the video input port.
enabled: True to enable; False to disable.
"""
logging.info('Set content protection on port #%d: %r', port_id, enabled)
self._flows[port_id].SetContentProtection(enabled)
@_VideoMethod
def IsContentProtectionEnabled(self, port_id):
"""Returns True if the content protection is enabled on the port.
Args:
port_id: The ID of the video input port.
Returns:
True if the content protection is enabled; otherwise, False.
"""
return self._flows[port_id].IsContentProtectionEnabled()
@_VideoMethod
def IsVideoInputEncrypted(self, port_id):
"""Returns True if the video input on the port is encrypted.
Args:
port_id: The ID of the video input port.
Returns:
True if the video input is encrypted; otherwise, False.
"""
return self._flows[port_id].IsVideoInputEncrypted()
def _SelectInput(self, port_id):
"""Selects the input on Chameleon.
Args:
port_id: The ID of the input port.
"""
if port_id != self._selected_input:
self._flows[port_id].Select()
self._selected_input = port_id
self._flows[port_id].DoFSM()
def _SelectOutput(self, port_id):
"""Selects the output on Chameleon.
Args:
port_id: The ID of the output port.
"""
if port_id != self._selected_output:
self._flows[port_id].Select()
self._selected_output = port_id
self._flows[port_id].DoFSM()
def StartMonitoringAudioVideoCapturingDelay(self):
"""Starts an audio/video synchronization utility
The example of usage:
chameleon.StartMonitoringAudioVideoCapturingDelay()
chameleon.StartCapturingVideo(hdmi_input)
chameleon.StartCapturingAudio(hdmi_input)
time.sleep(2)
chameleon.StopCapturingVideo()
chameleon.StopCapturingAudio(hdmi_input)
delay = chameleon.GetAudioVideoCapturingDelay()
"""
self._process = system_tools.SystemTools.RunInSubprocess('avsync')
def GetAudioVideoCapturingDelay(self):
"""Get the time interval between the first audio/video cpatured data
Returns:
A floating points indicating the time interval between the first
audio/video data captured. If the result is negative, then the first
video data is earlier, otherwise the first audio data is earlier.
Raises:
DriverError if there is no output from the monitoring process.
"""
if self._process.poll() == None:
self._process.terminate()
raise DriverError('The monitoring process has not finished.')
return_code, out, err = system_tools.SystemTools.GetSubprocessOutput(
self._process)
if return_code != 0 or err:
raise DriverError('Runtime error in the monitoring process')
if not out:
raise DriverError('No output from the monitoring process.')
return float(out)
@_VideoMethod
def DumpPixels(self, port_id, x=None, y=None, width=None, height=None):
"""Dumps the raw pixel array of the selected area.
If not given the area, default to capture the whole screen.
Args:
port_id: The ID of the video input port.
x: The X position of the top-left corner.
y: The Y position of the top-left corner.
width: The width of the area.
height: The height of the area.
Returns:
A byte-array of the pixels, wrapped in a xmlrpclib.Binary object.
"""
x, y, width, height = self._AutoFillArea(port_id, x, y, width, height)
self.CaptureVideo(port_id, self._DEFAULT_FRAME_LIMIT, x, y, width, height)
return self.ReadCapturedFrame(self._DEFAULT_FRAME_INDEX)
def _AutoFillArea(self, port_id, x, y, width, height):
"""Verifies the area argument correctness and fills the default values.
It keeps x=None and y=None if all of the x, y, width, and height are None.
That hints FPGA to use a full-screen capture, not a cropped-sccren capture.
Args:
port_id: The ID of the video input port.
x: The X position of the top-left corner.
y: The Y position of the top-left corner.
width: The width of the area.
height: The height of the area.
Returns:
A tuple of (x, y, width, height)
Raises:
DriverError if the area is not specified correctly.
"""
if (x, y, width, height) == (None, ) * 4:
return (None, None) + self.DetectResolution(port_id)
elif (x, y) == (None, ) * 2 or None not in (x, y, width, height):
return (x, y, width, height)
else:
raise DriverError('Some of area arguments are not specified.')
@_VideoMethod
def GetMaxFrameLimit(self, port_id, width, height):
"""Gets the maximal number of frames which are accommodated in the buffer.
It depends on the size of the internal buffer on the board and the
size of area to capture (full screen or cropped area).
Args:
port_id: The ID of the video input port.
width: The width of the area to capture.
height: The height of the area to capture.
Returns:
A number of the frame limit.
"""
# This result is related to the video flow status, e.g.
# single/dual pixel mode, progressive/interlaced mode.
# Need to select the input flow first.
self._SelectInput(port_id)
return self._flows[port_id].GetMaxFrameLimit(width, height)
def _PrepareCapturingVideo(self, port_id, x, y, width, height):
"""Prepares capturing video on the given video input.
Args:
port_id: The ID of the video input port.
x: The X position of the top-left corner of crop.
y: The Y position of the top-left corner of crop.
width: The width of the area of crop.
height: The height of the area of crop.
"""
caching_server.ClearCachedDir()
self._SelectInput(port_id)
if not self.IsPlugged(port_id):
raise DriverError('HPD is unplugged. No signal is expected.')
self._captured_params = {
'port_id': port_id,
'max_frame_limit': self._flows[port_id].GetMaxFrameLimit(width, height)
}
@_VideoMethod
def StartCapturingVideo(self, port_id, x=None, y=None, width=None,
height=None):
"""Starts video capturing continuously on the given video input.
This API is an asynchronous call. It returns after the video starts
capturing. The caller should call StopCapturingVideo to stop it.
The example of usage:
chameleon.StartCapturingVideo(hdmi_input)
time.sleep(2)
chameleon.StopCapturingVideo()
for i in xrange(chameleon.GetCapturedFrameCount()):
frame = chameleon.ReadCapturedFrame(i, *area).data
CompareFrame(frame, golden_frames[i])
Args:
port_id: The ID of the video input port.
x: The X position of the top-left corner of crop.
y: The Y position of the top-left corner of crop.
width: The width of the area of crop.
height: The height of the area of crop.
"""
x, y, width, height = self._AutoFillArea(port_id, x, y, width, height)
self._PrepareCapturingVideo(port_id, x, y, width, height)
max_frame_limit = self._captured_params['max_frame_limit']
logging.info('Start capturing video from port #%d', port_id)
self._flows[port_id].StartDumpingFrames(
max_frame_limit, x, y, width, height, self._MAX_CAPTURED_FRAME_COUNT)
def StopCapturingVideo(self, stop_index=None):
"""Stops video capturing which was started previously.
Args:
stop_index: Wait for the captured frame count to reach this index. If
not given, stop immediately. Note that the captured frame of
stop_index should not be read.
Raises:
DriverError if the capture period is longer than the capture limitation.
"""
port_id = self._captured_params['port_id']
if stop_index:
if stop_index >= self._MAX_CAPTURED_FRAME_COUNT:
raise DriverError('Exceeded the limit of capture, stop_index >= %d' %
self._MAX_CAPTURED_FRAME_COUNT)
logging.info('Waiting the captured frame count reaches %d...', stop_index)
while self.GetCapturedFrameCount() < stop_index:
pass
self._flows[port_id].StopDumpingFrames()
logging.info('Stopped capturing video from port #%d', port_id)
if self.GetCapturedFrameCount() >= self._MAX_CAPTURED_FRAME_COUNT:
raise DriverError('Exceeded the limit of capture, frame_count >= %d' %
self._MAX_CAPTURED_FRAME_COUNT)
@_VideoMethod
def CaptureVideo(self, port_id, total_frame, x=None, y=None, width=None,
height=None):
"""Captures the video stream on the given video input to the buffer.
This API is a synchronous call. It returns after all the frames are
captured. The frames can be read using the ReadCapturedFrame API.
The example of usage:
chameleon.CaptureVideo(hdmi_input, total_frame)
for i in xrange(total_frame):
frame = chameleon.ReadCapturedFrame(i, *area).data
CompareFrame(frame, golden_frames[i])
Args:
port_id: The ID of the video input port.
total_frame: The total number of frames to capture, should not larger
than value of GetMaxFrameLimit.
x: The X position of the top-left corner of crop.
y: The Y position of the top-left corner of crop.
width: The width of the area of crop.
height: The height of the area of crop.
"""
x, y, width, height = self._AutoFillArea(port_id, x, y, width, height)
logging.info('Capture video from port #%d', port_id)
self._PrepareCapturingVideo(port_id, x, y, width, height)
max_frame_limit = self._captured_params['max_frame_limit']
if total_frame > max_frame_limit:
raise DriverError('Exceed the max frame limit %d > %d',
total_frame, max_frame_limit)
# TODO(waihong): Make the timeout value based on the FPS rate.
self._flows[port_id].DumpFramesToLimit(
total_frame, x, y, width, height, self._TIMEOUT_FRAME_DUMP_PROBE)
def GetCapturedFrameCount(self):
"""Gets the total count of the captured frames.
Returns:
The number of frames captured.
"""
port_id = self._captured_params['port_id']
return self._flows[port_id].GetDumpedFrameCount()
def GetCapturedResolution(self):
"""Gets the resolution of the captured frame.
If a cropping area is specified on capturing, returns the cropped
resolution.
Returns:
A (width, height) tuple.
"""
port_id = self._captured_params['port_id']
return self._flows[port_id].GetCapturedResolution()
def ReadCapturedFrame(self, frame_index):
"""Reads the content of the captured frame from the buffer.
Args:
frame_index: The index of the frame to read.
Returns:
A byte-array of the pixels, wrapped in a xmlrpclib.Binary object.
"""
port_id = self._captured_params['port_id']
total_frame = self.GetCapturedFrameCount()
max_frame_limit = self._captured_params['max_frame_limit']
# The captured frames are store in a circular buffer. Only the latest
# max_frame_limit frames are valid.
first_valid_index = max(0, total_frame - max_frame_limit)
if not first_valid_index <= frame_index < total_frame:
raise DriverError('The frame index is out-of-range: %d not in [%d, %d)' %
(frame_index, first_valid_index, total_frame))
# Use the projected index.
frame_index = frame_index % max_frame_limit
screen = self._flows[port_id].ReadCapturedFrame(frame_index)
return xmlrpclib.Binary(screen)
def CacheFrameThumbnail(self, frame_index, ratio=2):
"""Caches the thumbnail of the dumped field to a temp file.
Args:
frame_index: The index of the frame to cache.
ratio: The ratio to scale down the image.
Returns:
An ID to identify the cached thumbnail.
"""
port_id = self._captured_params['port_id']
return self._flows[port_id].CacheFrameThumbnail(frame_index, ratio)
def _GetCapturedSignals(self, signal_func_name, start_index=0,
stop_index=None):
"""Gets the list of signals of the captured frames.
Args:
signal_func_name: The name of the signal function, e.g. 'GetFrameHashes'.
start_index: The index of the start frame. Default is 0.
stop_index: The index of the stop frame (excluded). Default is the
value of GetCapturedFrameCount.
Returns:
The list of signals.
"""
port_id = self._captured_params['port_id']
total_frame = self.GetCapturedFrameCount()
if stop_index is None:
stop_index = total_frame
if not 0 <= start_index < total_frame:
raise DriverError('The start index is out-of-range: %d not in [0, %d)' %
(start_index, total_frame))
if not 0 < stop_index <= total_frame:
raise DriverError('The stop index is out-of-range: %d not in (0, %d]' %
(stop_index, total_frame))
signal_func = getattr(self._flows[port_id], signal_func_name)
return signal_func(start_index, stop_index)
def GetCapturedChecksums(self, start_index=0, stop_index=None):
"""Gets the list of checksums of the captured frames.
Args:
start_index: The index of the start frame. Default is 0.
stop_index: The index of the stop frame (excluded). Default is the
value of GetCapturedFrameCount.
Returns:
The list of checksums of frames.
"""
return self._GetCapturedSignals('GetFrameHashes', start_index, stop_index)
def GetCapturedHistograms(self, start_index=0, stop_index=None):
"""Gets the list of histograms of the captured frames.
Args:
start_index: The index of the start frame. Default is 0.
stop_index: The index of the stop frame (excluded). Default is the
value of GetCapturedFrameCount.
Returns:
The list of histograms of frames.
"""
return self._GetCapturedSignals('GetHistograms', start_index, stop_index)
@_VideoMethod
def ComputePixelChecksum(
self, port_id, x=None, y=None, width=None, height=None):
"""Computes the checksum of pixels in the selected area.
If not given the area, default to compute the whole screen.
Args:
port_id: The ID of the video input port.
x: The X position of the top-left corner.
y: The Y position of the top-left corner.
width: The width of the area.
height: The height of the area.
Returns:
The checksum of the pixels.
"""
x, y, width, height = self._AutoFillArea(port_id, x, y, width, height)
self.CaptureVideo(port_id, self._DEFAULT_FRAME_LIMIT, x, y, width, height)
return self.GetCapturedChecksums(self._DEFAULT_FRAME_INDEX,
self._DEFAULT_FRAME_INDEX + 1)[0]
@_VideoMethod
def DetectResolution(self, port_id):
"""Detects the video source resolution.
Args:
port_id: The ID of the video input port.
Returns:
A (width, height) tuple.
"""
self._SelectInput(port_id)
resolution = self._flows[port_id].GetResolution()
logging.info('Detected resolution on port #%d: %dx%d', port_id, *resolution)
return resolution
def HasAudioBoard(self):
"""Returns True if there is an audio board.
Returns:
True if there is an audio board. False otherwise.
"""
return self._audio_board is not None
@_AudioMethod(input_only=True)
def StartCapturingAudio(self, port_id, has_file=True):
"""Starts capturing audio.
Refer to the docstring of StartPlayingEcho about the restriction of
capturing and echoing at the same time.
Args:
port_id: The ID of the audio input port.
has_file: True for saving audio data to file. False otherwise.
"""
self._SelectInput(port_id)
logging.info('Start capturing audio from port #%d', port_id)
self._flows[port_id].StartCapturingAudio(has_file)
@_AudioMethod(input_only=True)
def StopCapturingAudio(self, port_id):
"""Stops capturing audio and returns recorded data path and format.
Args:
port_id: The ID of the audio input port.
Returns:
A tuple (path, format).
path: The path to the captured audio data.
format: The dict representation of AudioDataFormat. Refer to docstring
of utils.audio.AudioDataFormat for detail.
Currently, the data format supported is
dict(file_type='raw', sample_format='S32_LE', channel=8, rate=48000)
If we assign parameter has_file=False in StartCapturingAudio, we will get
both empty string in path and format.
Raises:
DriverError: Input is selected to port other than port_id.
This happens if user has used API related to input operation on
other port. The API includes CaptureVideo, StartCapturingVideo,
DetectResolution, StartCapturingAudio, StartPlayingEcho.
"""
if self._selected_input != port_id:
raise DriverError(
'The input is selected to %r not %r', self._selected_input, port_id)
path, data_format = self._flows[port_id].StopCapturingAudio()
logging.info('Stopped capturing audio from port #%d', port_id)
# If there is no path, set it to empty string. Because XMLRPC doesn't
# support None as return value.
if path is None and data_format is None:
return '', ''
return path, data_format
@_AudioMethod(output_only=True)
def StartPlayingAudio(self, port_id, path, data_format):
"""Playing audio data from an output port.
Play audio data at given path using given format from port_id port.
Args:
port_id: The ID of the output connector.
path: The path to the audio data to play.
data_format: The dict representation of AudioDataFormat.
Refer to docstring of utils.audio.AudioDataFormat for detail.
Currently Chameleon only accepts data format if it meets
dict(file_type='raw', sample_format='S32_LE', channel=8, rate=48000)
Chameleon user should do the format conversion to minimize work load
on Chameleon board.
Raises:
DriverError: There is no file at the path.
"""
if not os.path.exists(path):
raise DriverError('File path %r does not exist' % path)
self._SelectOutput(port_id)
logging.info('Start playing audio from port #%d', port_id)
self._flows[port_id].StartPlayingAudio(path, data_format)
@_AudioMethod(output_only=True)
def StartPlayingEcho(self, port_id, input_id):
"""Echoes audio data received from input_id and plays to port_id.
Echoes audio data received from input_id and plays to port_id.
Chameleon does not support echoing from HDMI and capturing from LineIn/Mic
at the same time. The echoing/capturing needs to be stop first before
another action starts.
For example, user can call
StartPlayingEcho(3, 7) --> StopPlayingAudio(3) --> StartCapturingAudio(6)
or
StartCapturingAudio(6) --> StopCapturingAudio(6) --> StartPlayingEcho(3, 7)
but user can not call
StartPlayingEcho(3, 7) --> StartCapturingAudio(6)
or
StartCapturingAudio(6) --> StartPlayingEcho(3, 7)
Exception is raised when conflicting actions are performed.
Args:
port_id: The ID of the output connector. Check the value in ids.py.
input_id: The ID of the input connector. Check the value in ids.py.
Raises:
DriverError: input_id is not valid for audio operation.
"""
if not ids.IsAudioPort(input_id) or not ids.IsInputPort(input_id):
raise DriverError(
'Not a valid input_id for audio operation: %d' % input_id)
self._SelectInput(input_id)
self._SelectOutput(port_id)
logging.info('Start playing echo from port #%d using source from port#%d',
port_id, input_id)
self._flows[port_id].StartPlayingEcho(input_id)
@_AudioMethod(output_only=True)
def StopPlayingAudio(self, port_id):
"""Stops playing audio from port_id port.
Args:
port_id: The ID of the output connector.
Raises:
DriverError: Output is selected to port other than port_id.
This happens if user has used API related to output operation on other
port. The API includes StartPlayingAudio, StartPlayingEcho.
"""
if self._selected_output != port_id:
raise DriverError(
'The output is selected to %r not %r', self._selected_output, port_id)
logging.info('Stop playing audio from port #%d', port_id)
self._flows[port_id].StopPlayingAudio()
def _ClearAudioFiles(self):
"""Clears temporary audio files.
Chameleon board does not reboot very often. We should clear the temporary
audio files used in capturing audio or playing audio when Reset is called.
"""
for path in glob.glob('/tmp/audio_*'):
os.unlink(path)
@_AudioBoardMethod
def AudioBoardConnect(self, bus_number, endpoint):
"""Connects an endpoint to an audio bus.
Args:
bus_number: 1 or 2 for audio bus 1 or bus 2.
endpoint: An endpoint defined in audio_board.AudioBusEndpoint.
Raises:
DriverError: If the endpoint is a source and there is other source
endpoint occupying audio bus.
"""
if audio_board.IsSource(endpoint):
current_sources, _ = self._audio_board.GetConnections(bus_number)
if current_sources and endpoint not in current_sources:
raise DriverError(
'Sources %s other than %s are currently occupying audio bus.' %
(current_sources, endpoint))
self._audio_board.SetConnection(
bus_number, endpoint, True)
@_AudioBoardMethod
def AudioBoardDisconnect(self, bus_number, endpoint):
"""Disconnects an endpoint to an audio bus.
Args:
bus_number: 1 or 2 for audio bus 1 or bus 2.
endpoint: An endpoint defined in audio_board.AudioBusEndpoint.
Raises:
DriverError: If the endpoint is not connected to audio bus.
"""
if not self._audio_board.IsConnected(bus_number, endpoint):
raise DriverError(
'Endpoint %s is not connected to audio bus %d.' %
(endpoint, bus_number))
self._audio_board.SetConnection(
bus_number, endpoint, False)
@_AudioBoardMethod
def AudioBoardGetRoutes(self, bus_number):
"""Gets a list of routes on audio bus.
Args:
bus_number: 1 or 2 for audio bus 1 or bus 2.
Returns:
A list of tuples (source, sink) that are routed on audio bus
where source and sink are endpoints defined in
audio_board.AudioBusEndpoint.
"""
sources, sinks = self._audio_board.GetConnections(bus_number)
routes = []
for source in sources:
for sink in sinks:
logging.info('Route on bus %d: %s ---> %s',
bus_number, source, sink)
routes.append((source, sink))
return routes
@_AudioBoardMethod
def AudioBoardClearRoutes(self, bus_number):
"""Clears routes on an audio bus.
Args:
bus_number: 1 or 2 for audio bus 1 or bus 2.
"""
self._audio_board.ResetConnections(bus_number)
@_AudioBoardMethod
def AudioBoardHasJackPlugger(self):
"""If there is jack plugger on audio board.
Audio board must have the motor cable connected in order to control
jack plugger of audio box.
Returns:
True if there is jack plugger on audio board. False otherwise.
"""
return self._audio_board.HasJackPlugger()
@_AudioBoardMethod
def AudioBoardAudioJackPlug(self):
"""Plugs audio jack to connect audio board and Cros device."""
logging.info('Plug audio jack to connect audio board and Cros device.')
self._audio_board.SetJackPlugger(True)
@_AudioBoardMethod
def AudioBoardAudioJackUnplug(self):
"""Unplugs audio jack to disconnect audio board and Cros device."""
logging.info('Unplug audio jack to disconnect audio board and Cros device.')
self._audio_board.SetJackPlugger(False)
@_AudioBoardMethod
def AudioBoardResetBluetooth(self):
"""Resets bluetooth module on audio board."""
logging.info('Resets bluetooth module on audio board.')
self._audio_board.ResetBluetooth()
@_AudioBoardMethod
def AudioBoardDisableBluetooth(self):
"""Disables bluetooth module on audio board."""
logging.info('Disables bluetooth module on audio board.')
self._audio_board.DisableBluetooth()
@_AudioBoardMethod
def AudioBoardIsBluetoothEnabled(self):
"""Checks if bluetooth module on audio board is enabled.
Returns:
True if bluetooth module is enabled. False otherwise.
"""
return self._audio_board.IsBluetoothEnabled()
def SetUSBDriverPlaybackConfigs(self, playback_data_format):
"""Updates the corresponding playback configurations to argument values.
This provides flexibility for simulating the USB gadget driver using other
configurations different from the default values.
Args:
playback_data_format: The dict form of an AudioDataFormat object. The
'file_type' field will be ignored since for playback, there is no need
for setting file type before playing audio. It is specified by the audio
file passed in for playback. Other fields are used to set USB driver
configurations.
Raises:
DriverError if any of the USB Flows is playing or capturing audio.
"""
if (self._flows[ids.USB_AUDIO_IN].is_capturing_audio or
self._flows[ids.USB_AUDIO_OUT].is_playing_audio):
error_message = ('Configuration changes not allowed when USB audio '
'driver is still performing playback/capture in one of '
'the flows.')
raise DriverError(error_message)
self._flows[ids.USB_AUDIO_OUT].SetDriverPlaybackConfigs(
playback_data_format)
def SetUSBDriverCaptureConfigs(self, capture_data_format):
"""Updates the corresponding capture configurations to argument values.
This provides flexibility for simulating the USB gadget driver using other
configurations different from the default values.
Args:
capture_data_format: The dict form of an AudioDataFormat object. The
'file_type' field will be saved by InputUSBAudioFlow as the file type
for captured data. Other fields are used to set USB driver
configurations.
Raises:
DriverError if any of the USB audio Flows is playing or capturing audio.
"""
if (self._flows[ids.USB_AUDIO_IN].is_capturing_audio or
self._flows[ids.USB_AUDIO_OUT].is_playing_audio):
error_message = ('Configuration changes not allowed when USB audio '
'driver is still performing playback/capture in one of '
'the flows.')
raise DriverError(error_message)
self._flows[ids.USB_AUDIO_IN].SetDriverCaptureConfigs(capture_data_format)
def GetMacAddress(self):
"""Gets the MAC address of this Chameleon.
Returns:
A string for MAC address.
"""
return open('/sys/class/net/eth0/address').read().strip()
@_USBHIDMethod
def SendHIDEvent(self, port_id, event_type, *args, **kwargs):
"""Sends HID event with event_type and arguments for HID port #port_id.
Args:
port_id: The ID of the HID port.
event_type: Supported event type of string for HID port #port_id.
Returns:
Returns as event function if applicable.
"""
return self._flows[port_id].Send(event_type, *args, **kwargs)