blob: e8b775d3dbe55a5c4ddd365e96669af102d12dcc [file] [log] [blame]
# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Input flow module which abstracts the entire flow for a specific input."""
import logging
import time
from abc import ABCMeta
import chameleon_common # pylint: disable=W0611
from chameleond.utils import audio_utils
from chameleond.utils import common
from chameleond.utils import edid
from chameleond.utils import fpga
from chameleond.utils import frame_manager
from chameleond.utils import ids
from chameleond.utils import io
from chameleond.utils import rx
class InputFlowError(Exception):
"""Exception raised when any error on InputFlow."""
pass
class InputFlow(object):
"""An abstraction of the entire flow for a specific input.
It provides the basic interfaces of Chameleond driver for a specific input.
Using this abstraction, each flow can have its own behavior. No need to
share the same Chameleond driver code.
"""
__metaclass__ = ABCMeta
_CONNECTOR_TYPE = 'Unknown' # A subclass should override it.
_RX_SLAVES = {
ids.DP1: rx.DpRx.SLAVE_ADDRESSES[0],
ids.DP2: rx.DpRx.SLAVE_ADDRESSES[1],
ids.HDMI: rx.HdmiRx.SLAVE_ADDRESSES[0],
ids.VGA: rx.VgaRx.SLAVE_ADDRESSES[0]
}
_MUX_CONFIGS = {
# Use a dual-pixel-mode setting for IO as no support for two flows
# simultaneously so far.
ids.DP1: io.MuxIo.CONFIG_DP1_DUAL,
ids.DP2: io.MuxIo.CONFIG_DP2_DUAL,
ids.HDMI: io.MuxIo.CONFIG_HDMI_DUAL,
ids.VGA: io.MuxIo.CONFIG_VGA
}
def __init__(self, input_id, main_i2c_bus, fpga_ctrl):
"""Constructs a InputFlow object.
Args:
input_id: The ID of the input connector. Check the value in ids.py.
main_i2c_bus: The main I2cBus object.
fpga_ctrl: The FpgaController object.
"""
self._input_id = input_id
self._main_bus = main_i2c_bus
self._fpga = fpga_ctrl
self._power_io = self._main_bus.GetSlave(io.PowerIo.SLAVE_ADDRESSES[0])
self._mux_io = self._main_bus.GetSlave(io.MuxIo.SLAVE_ADDRESSES[0])
self._rx = self._main_bus.GetSlave(self._RX_SLAVES[self._input_id])
self._frame_manager = frame_manager.FrameManager(
input_id, self._GetEffectiveVideoDumpers())
def _GetEffectiveVideoDumpers(self):
"""Gets effective video dumpers on the flow."""
if self.IsDualPixelMode():
if fpga.VideoDumper.EVEN_PIXELS_FLOW_INDEXES[self._input_id] == 0:
return [self._fpga.vdump0, self._fpga.vdump1]
else:
return [self._fpga.vdump1, self._fpga.vdump0]
elif fpga.VideoDumper.PRIMARY_FLOW_INDEXES[self._input_id] == 0:
return [self._fpga.vdump0]
else:
return [self._fpga.vdump1]
def Initialize(self):
"""Initializes the input flow."""
logging.info('Initialize InputFlow #%d.', self._input_id)
self._power_io.ResetReceiver(self._input_id)
self._rx.Initialize(self.IsDualPixelMode())
def Select(self):
"""Selects the input flow to set the proper muxes and FPGA paths."""
logging.info('Select InputFlow #%d.', self._input_id)
self._mux_io.SetConfig(self._MUX_CONFIGS[self._input_id])
self._fpga.vpass.Select(self._input_id)
self._fpga.vdump0.Select(self._input_id, self.IsDualPixelMode())
self._fpga.vdump1.Select(self._input_id, self.IsDualPixelMode())
self.WaitVideoOutputStable()
def GetPixelDumpArgs(self):
"""Gets the arguments of pixeldump tool which selects the proper buffers."""
return fpga.VideoDumper.GetPixelDumpArgs(self._input_id,
self.IsDualPixelMode())
@classmethod
def GetConnectorType(cls):
"""Returns the human readable string for the connector type."""
return cls._CONNECTOR_TYPE
def GetResolution(self):
"""Gets the resolution of the video flow."""
self.WaitVideoOutputStable()
width, height = self._frame_manager.ComputeResolution()
if width == 0 or height == 0:
raise InputFlowError('Something wrong with the resolution: %dx%d' %
(width, height))
return (width, height)
def GetMaxFrameLimit(self, width, height):
"""Returns of the maximal number of frames which can be dumped."""
if self.IsDualPixelMode():
width = width / 2
return fpga.VideoDumper.GetMaxFrameLimit(width, height)
def GetFrameHashes(self, start, stop):
"""Returns the list of the frame hashes.
Args:
start: The index of the start frame.
stop: The index of the stop frame (excluded).
Returns:
A list of frame hashes.
"""
return self._frame_manager.GetFrameHashes(start, stop)
def DumpFramesToLimit(self, frame_limit, x, y, width, height, timeout):
"""Dumps frames and waits for the given limit being reached or timeout.
Args:
frame_limit: The limitation of frame to dump.
x: The X position of the top-left corner of crop; None for a full-screen.
y: The Y position of the top-left corner of crop; None for a full-screen.
width: The width of the area of crop.
height: The height of the area of crop.
timeout: Time in second of timeout.
Raises:
common.TimeoutError on timeout.
"""
self.WaitVideoOutputStable()
self._frame_manager.DumpFramesToLimit(frame_limit, x, y, width, height,
timeout)
def StartDumpingFrames(self, frame_buffer_limit, x, y, width, height,
hash_buffer_limit):
"""Starts dumping frames continuously.
Args:
frame_buffer_limit: The size of the buffer which stores the frame.
Frames will be dumped to the beginning when full.
x: The X position of the top-left corner of crop; None for a full-screen.
y: The Y position of the top-left corner of crop; None for a full-screen.
width: The width of the area of crop.
height: The height of the area of crop.
hash_buffer_limit: The maximum number of hashes to monitor. Stop
capturing when this limitation is reached.
"""
self.WaitVideoOutputStable()
self._frame_manager.StartDumpingFrames(
frame_buffer_limit, x, y, width, height, hash_buffer_limit)
def StopDumpingFrames(self):
"""Stops dumping frames."""
self._frame_manager.StopDumpingFrames()
def GetDumpedFrameCount(self):
"""Gets the number of frames which is dumped."""
return self._frame_manager.GetFrameCount()
def Do_FSM(self):
"""Does the Finite-State-Machine to ensure the input flow ready.
The receiver requires to do the FSM in order to clear its state, in case
of some events happended, like mode change, power reattach, etc.
It should be called before doing any post-receiver-action, like capturing
frames.
"""
pass
def WaitVideoInputStable(self, unused_timeout=None):
"""Waits the video input stable or timeout. Returns success or not."""
return True
def WaitVideoOutputStable(self, unused_timeout=None):
"""Waits the video output stable or timeout. Returns success or not."""
return True
def IsDualPixelMode(self):
"""Returns if the input flow uses dual pixel mode."""
raise NotImplementedError('IsDualPixelMode')
def IsPhysicalPlugged(self):
"""Returns if the physical cable is plugged."""
raise NotImplementedError('IsPhysicalPlugged')
def IsPlugged(self):
"""Returns if the HPD line is plugged."""
raise NotImplementedError('IsPlugged')
def Plug(self):
"""Asserts HPD line to high, emulating plug."""
raise NotImplementedError('Plug')
def Unplug(self):
"""Deasserts HPD line to low, emulating unplug."""
raise NotImplementedError('Unplug')
def FireHpdPulse(self, deassert_interval_usec, assert_interval_usec,
repeat_count, end_level):
"""Fires one or more HPD pulse (low -> high -> low -> ...).
Args:
deassert_interval_usec: The time in microsecond of the deassert pulse.
assert_interval_usec: The time in microsecond of the assert pulse.
If None, then use the same value as
deassert_interval_usec.
repeat_count: The count of HPD pulses to fire.
end_level: HPD ends with 0 for LOW (unplugged) or 1 for HIGH (plugged).
"""
raise NotImplementedError('FireHpdPulse')
def FireMixedHpdPulses(self, widths):
"""Fires one or more HPD pulses, starting at low, of mixed widths.
One must specify a list of segment widths in the widths argument where
widths[0] is the width of the first low segment, widths[1] is that of the
first high segment, widths[2] is that of the second low segment, ... etc.
The HPD line stops at low if even number of segment widths are specified;
otherwise, it stops at high.
Args:
widths: list of pulse segment widths in usec.
"""
raise NotImplementedError('FireMixedHpdPulses')
def ReadEdid(self):
"""Reads the EDID content."""
raise NotImplementedError('ReadEdid')
def WriteEdid(self, data):
"""Writes the EDID content."""
raise NotImplementedError('WriteEdid')
class InputFlowWithAudio(InputFlow): # pylint: disable=W0223
"""An abstraction of an input flow which supports audio."""
def __init__(self, input_id, main_i2c_bus, fpga_ctrl):
"""Constructs a InputFlowWithAudio object.
Args:
input_id: The ID of the input connector. Check the value in ids.py.
main_i2c_bus: The main I2cBus object.
fpga_ctrl: The FpgaController object.
"""
super(InputFlowWithAudio, self).__init__(input_id, main_i2c_bus, fpga_ctrl)
self._audio_capture_manager = audio_utils.AudioCaptureManager(
self._fpga.adump)
def Select(self):
"""Selects the input flow to set the proper muxes and FPGA paths."""
super(InputFlowWithAudio, self).Select()
self._fpga.aroute.SetupRouteFromInputToDumper(self._input_id)
@property
def is_capturing_audio(self):
"""Is input flow capturing audio?"""
return self._audio_capture_manager.is_capturing
def StartCapturingAudio(self):
"""Starts capturing audio."""
self._audio_capture_manager.StartCapturingAudio()
def StopCapturingAudio(self):
"""Stops capturing audio.
Returns:
A tuple (data, format).
data: The captured audio data.
format: The dict representation of AudioDataFormat. Refer to docstring
of utils.audio.AudioDataFormat for detail.
Raises:
AudioCaptureManagerError: If captured time or page exceeds the limit.
AudioCaptureManagerError: If there is no captured data.
"""
return self._audio_capture_manager.StopCapturingAudio()
class DpInputFlow(InputFlow):
"""An abstraction of the entire flow for DisplayPort."""
_CONNECTOR_TYPE = 'DP'
_IS_DUAL_PIXEL_MODE = False
def __init__(self, *args):
super(DpInputFlow, self).__init__(*args)
self._edid = edid.DpEdid(args[0], self._main_bus)
def IsDualPixelMode(self):
"""Returns if the input flow uses dual pixel mode."""
return self._IS_DUAL_PIXEL_MODE
def IsPhysicalPlugged(self):
"""Returns if the physical cable is plugged."""
return self._rx.IsCablePowered()
def IsPlugged(self):
"""Returns if the HPD line is plugged."""
return self._fpga.hpd.IsPlugged(self._input_id)
def Plug(self):
"""Asserts HPD line to high, emulating plug."""
self._edid.Enable()
self._fpga.hpd.Plug(self._input_id)
def Unplug(self):
"""Deasserts HPD line to low, emulating unplug."""
self._fpga.hpd.Unplug(self._input_id)
self._edid.Disable()
def FireHpdPulse(self, deassert_interval_usec, assert_interval_usec,
repeat_count, end_level):
"""Fires one or more HPD pulse (low -> high -> low -> ...).
Args:
deassert_interval_usec: The time in microsecond of the deassert pulse.
assert_interval_usec: The time in microsecond of the assert pulse.
If None, then use the same value as
deassert_interval_usec.
repeat_count: The count of HPD pulses to fire.
end_level: HPD ends with 0 for LOW (unplugged) or 1 for HIGH (plugged).
"""
self._fpga.hpd.FireHpdPulse(self._input_id, deassert_interval_usec,
assert_interval_usec, repeat_count, end_level)
def FireMixedHpdPulses(self, widths):
"""Fires one or more HPD pulses, starting at low, of mixed widths.
One must specify a list of segment widths in the widths argument where
widths[0] is the width of the first low segment, widths[1] is that of the
first high segment, widths[2] is that of the second low segment, ... etc.
The HPD line stops at low if even number of segment widths are specified;
otherwise, it stops at high.
Args:
widths: list of pulse segment widths in usec.
"""
self._fpga.hpd.FireMixedHpdPulses(self._input_id, widths)
def ReadEdid(self):
"""Reads the EDID content."""
return self._edid.ReadEdid()
def WriteEdid(self, data):
"""Writes the EDID content."""
self._edid.WriteEdid(data)
def WaitVideoInputStable(self, unused_timeout=None):
"""Waits the video input stable or timeout. Returns success or not."""
# TODO(waihong): Implement this method.
return True
def WaitVideoOutputStable(self, unused_timeout=None):
"""Waits the video output stable or timeout. Returns success or not."""
# TODO(waihong): Implement this method.
return True
class HdmiInputFlow(InputFlowWithAudio):
"""An abstraction of the entire flow for HDMI."""
_CONNECTOR_TYPE = 'HDMI'
_IS_DUAL_PIXEL_MODE = True
_DELAY_VIDEO_MODE_PROBE = 0.1
_TIMEOUT_VIDEO_STABLE_PROBE = 10
_DELAY_WAITING_GOOD_PIXELS = 3
def __init__(self, *args):
super(HdmiInputFlow, self).__init__(*args)
self._edid = edid.HdmiEdid(self._main_bus)
def IsDualPixelMode(self):
"""Returns if the input flow uses dual pixel mode."""
return self._IS_DUAL_PIXEL_MODE
def IsPhysicalPlugged(self):
"""Returns if the physical cable is plugged."""
return self._rx.IsCablePowered()
def IsPlugged(self):
"""Returns if the HPD line is plugged."""
return self._fpga.hpd.IsPlugged(self._input_id)
def Plug(self):
"""Asserts HPD line to high, emulating plug."""
self._edid.Enable()
self._fpga.hpd.Plug(self._input_id)
def Unplug(self):
"""Deasserts HPD line to low, emulating unplug."""
self._fpga.hpd.Unplug(self._input_id)
self._edid.Disable()
def FireHpdPulse(self, deassert_interval_usec, assert_interval_usec,
repeat_count, end_level):
"""Fires one or more HPD pulse (low -> high -> low -> ...).
Args:
deassert_interval_usec: The time in microsecond of the deassert pulse.
assert_interval_usec: The time in microsecond of the assert pulse.
If None, then use the same value as
deassert_interval_usec.
repeat_count: The count of HPD pulses to fire.
end_level: HPD ends with 0 for LOW (unplugged) or 1 for HIGH (plugged).
"""
self._fpga.hpd.FireHpdPulse(self._input_id, deassert_interval_usec,
assert_interval_usec, repeat_count, end_level)
def FireMixedHpdPulses(self, widths):
"""Fires one or more HPD pulses, starting at low, of mixed widths.
One must specify a list of segment widths in the widths argument where
widths[0] is the width of the first low segment, widths[1] is that of the
first high segment, widths[2] is that of the second low segment, ... etc.
The HPD line stops at low if even number of segment widths are specified;
otherwise, it stops at high.
Args:
widths: list of pulse segment widths in usec.
"""
self._fpga.hpd.FireMixedHpdPulses(self._input_id, widths)
def ReadEdid(self):
"""Reads the EDID content."""
return self._edid.ReadEdid()
def WriteEdid(self, data):
"""Writes the EDID content."""
self._edid.WriteEdid(data)
def Do_FSM(self):
"""Does the Finite-State-Machine to ensure the input flow ready.
The receiver requires to do the FSM in order to clear its state, in case
of some events happended, like mode change, power reattach, etc.
It should be called before doing any post-receiver-action, like capturing
frames.
"""
if self.WaitVideoInputStable():
if self._rx.IsResetNeeded():
self._rx.Reset()
self.WaitVideoOutputStable()
# TODO(waihong): Remove this hack only for Nyan-Big.
# http://crbug.com/402152
time.sleep(self._DELAY_WAITING_GOOD_PIXELS)
else:
logging.warn('Skip doing receiver FSM as video input not stable.')
def WaitVideoInputStable(self, timeout=None):
"""Waits the video input stable or timeout. Returns success or not."""
if timeout is None:
timeout = self._TIMEOUT_VIDEO_STABLE_PROBE
try:
common.WaitForCondition(self._rx.IsVideoInputStable, True,
self._DELAY_VIDEO_MODE_PROBE, timeout)
except common.TimeoutError:
return False
return True
def _IsFrameLocked(self):
"""Returns whether the FPGA frame is locked.
It compares the resolution reported from the receiver with the FPGA.
Returns:
True if the frame is locked; otherwise, False.
"""
resolution_fpga = self._frame_manager.ComputeResolution()
resolution_rx = self._rx.GetResolution()
if resolution_fpga == resolution_rx:
logging.info('same resolution: %dx%d', *resolution_fpga)
return True
else:
logging.info('diff resolution: fpga:%dx%d != rx:%dx%d',
*(resolution_fpga + resolution_rx))
return False
def WaitVideoOutputStable(self, timeout=None):
"""Waits the video output stable or timeout. Returns success or not."""
if timeout is None:
timeout = self._TIMEOUT_VIDEO_STABLE_PROBE
try:
common.WaitForCondition(self._IsFrameLocked, True,
self._DELAY_VIDEO_MODE_PROBE, timeout)
except common.TimeoutError:
return False
return True
class VgaInputFlow(InputFlow):
"""An abstraction of the entire flow for VGA."""
_CONNECTOR_TYPE = 'VGA'
_IS_DUAL_PIXEL_MODE = False
_DELAY_CHECKING_STABLE_PROBE = 0.1
_TIMEOUT_CHECKING_STABLE = 5
_DELAY_RESOLUTION_PROBE = 0.05
def __init__(self, *args):
super(VgaInputFlow, self).__init__(*args)
self._edid = edid.VgaEdid(self._fpga)
def IsDualPixelMode(self):
"""Returns if the input flow uses dual pixel mode."""
return self._IS_DUAL_PIXEL_MODE
def IsPhysicalPlugged(self):
"""Returns if the physical cable is plugged."""
# VGA has no HPD to detect hot-plug. We check the source signal
# to make that decision. So plug it and wait a while to see any
# signal received. It does not work if DUT is not well-behaved.
plugged_before_check = self.IsPlugged()
if not plugged_before_check:
self.Plug()
is_stable = self.WaitVideoInputStable()
if not plugged_before_check:
self.Unplug()
return is_stable
def IsPlugged(self):
"""Returns if the HPD line is plugged."""
return not bool(self._mux_io.GetOutput() & io.MuxIo.MASK_VGA_BLOCK_SOURCE)
def Plug(self):
"""Asserts HPD line to high, emulating plug."""
self._edid.Enable()
# For VGA, unblock the RGB source to emulate plug.
self._mux_io.ClearOutputMask(io.MuxIo.MASK_VGA_BLOCK_SOURCE)
def Unplug(self):
"""Deasserts HPD line to low, emulating unplug."""
# For VGA, block the RGB source to emulate unplug.
self._mux_io.SetOutputMask(io.MuxIo.MASK_VGA_BLOCK_SOURCE)
self._edid.Disable()
def FireHpdPulse(self, deassert_interval_usec, assert_interval_usec,
repeat_count, end_level):
"""Fires one or more HPD pulse (low -> high -> low -> ...).
Args:
deassert_interval_usec: The time in microsecond of the deassert pulse.
assert_interval_usec: The time in microsecond of the assert pulse.
If None, then use the same value as
deassert_interval_usec.
repeat_count: The count of HPD pulses to fire.
end_level: HPD ends with 0 for LOW (unplugged) or 1 for HIGH (plugged).
"""
pass
def FireMixedHpdPulses(self, widths):
"""Fires one or more HPD pulses, starting at low, of mixed widths.
One must specify a list of segment widths in the widths argument where
widths[0] is the width of the first low segment, widths[1] is that of the
first high segment, widths[2] is that of the second low segment, ... etc.
The HPD line stops at low if even number of segment widths are specified;
otherwise, it stops at high.
Args:
widths: list of pulse segment widths in usec.
"""
pass
def ReadEdid(self):
"""Reads the EDID content."""
return self._edid.ReadEdid()
def WriteEdid(self, data):
"""Writes the EDID content."""
self._edid.WriteEdid(data)
def WaitVideoInputStable(self, timeout=None):
"""Waits the video input stable or timeout. Returns success or not."""
if timeout is None:
timeout = self._TIMEOUT_CHECKING_STABLE
try:
# Check if H-Sync/V-Sync recevied from the source.
common.WaitForCondition(self._rx.IsSyncDetected, True,
self._DELAY_CHECKING_STABLE_PROBE, timeout)
except common.TimeoutError:
return False
return True
def _IsResolutionValid(self):
"""Returns True if the resolution from FPGA is valid and not floating."""
resolution1 = self._frame_manager.ComputeResolution()
time.sleep(self._DELAY_RESOLUTION_PROBE)
resolution2 = self._frame_manager.ComputeResolution()
return resolution1 == resolution2 and 0 not in resolution1
def WaitVideoOutputStable(self, timeout=None):
"""Waits the video output stable or timeout. Returns success or not."""
if timeout is None:
timeout = self._TIMEOUT_CHECKING_STABLE
try:
# Wait a valid resolution and not floating.
common.WaitForCondition(self._IsResolutionValid, True,
self._DELAY_CHECKING_STABLE_PROBE, timeout)
except common.TimeoutError:
return False
return True