| # Copyright (c) 2014 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| """FPGA module for controlling the functions of FPGA. |
| |
| Using the FpgaController can control all its subsystems. |
| Usage: |
| import fpga |
| fpga_ctrl = fpga.FpgaController() |
| |
| # Control the HPD |
| fpga_ctrl.hpd.Plug(input_id) |
| fpga_ctrl.hpd.Unplug(input_id) |
| |
| # Select the video pass-through. |
| fpga_ctrl.vpass.Select(input_id) |
| """ |
| |
| import logging |
| import struct |
| |
| import chameleon_common # pylint: disable=W0611 |
| from chameleond.utils import audio |
| from chameleond.utils import ids |
| from chameleond.utils import mem |
| from chameleond.utils import system_tools |
| |
| |
| class FpgaController(object): |
| """A class to abstract the all behaviors of FPGA. |
| |
| An instance of this class also includes the instances of its subsystems. |
| The caller can directly access them via the member variables. |
| """ |
| |
| def __init__(self): |
| """Constructs a FpgaController object.""" |
| self.hpd = HpdController() |
| self.vpass = VideoPasser() |
| self.vdump0 = VideoDumper(0) |
| self.vdump1 = VideoDumper(1) |
| self.adump = AudioDumper() |
| self.aroute = AudioRouteController() |
| self.astream = AudioStreamController() |
| self.aiis = AudioI2SController() |
| self.hdmi_edid = EdidController(EdidController.HDMI_BASE) |
| self.vga_edid = EdidController(EdidController.VGA_BASE) |
| |
| |
| class HpdController(object): |
| """A class to abstract the behavior of HPD.""" |
| |
| _HPD_BASE = 0xff21a000 |
| _HPD_OFFSETS = { |
| ids.DP1: 0x4, |
| ids.DP2: 0x8, |
| ids.HDMI: 0xc |
| } |
| _BIT_UNPLUG = 0 |
| _BIT_PLUG = 1 |
| |
| def __init__(self): |
| """Constructs a HpdController object.""" |
| self._memory = mem.MemoryForController |
| |
| def IsPlugged(self, input_id): |
| """Returns if the HPD line is plugged. |
| |
| Args: |
| input_id: The ID of the input connector. Check the value in ids.py. |
| |
| Returns: |
| True if the HPD line is plugged; otherwise, False. |
| """ |
| return (self._memory.Read(self._HPD_BASE + self._HPD_OFFSETS[input_id]) == |
| self._BIT_PLUG) |
| |
| def Plug(self, input_id): |
| """Asserts HPD line to high, emulating plug. |
| |
| Args: |
| input_id: The ID of the input connector. Check the value in ids.py. |
| """ |
| self._memory.Write(self._HPD_BASE + self._HPD_OFFSETS[input_id], |
| self._BIT_PLUG) |
| |
| def Unplug(self, input_id): |
| """Deasserts HPD line to low, emulating unplug. |
| |
| Args: |
| input_id: The ID of the input connector. Check the value in ids.py. |
| """ |
| self._memory.Write(self._HPD_BASE + self._HPD_OFFSETS[input_id], |
| self._BIT_UNPLUG) |
| |
| def FireHpdPulse(self, input_id, deassert_interval_usec, assert_interval_usec, |
| repeat_count, end_level): |
| """Fires one or more HPD pulse (low -> high -> low -> ...). |
| |
| Args: |
| input_id: The ID of the input connector. |
| deassert_interval_usec: The time in microsecond of the deassert pulse. |
| assert_interval_usec: The time in microsecond of the assert pulse. |
| If None, then use the same value as |
| deassert_interval_usec. |
| repeat_count: The count of HPD pulses to fire. |
| end_level: HPD ends with 0 for LOW (unplugged) or 1 for HIGH (plugged). |
| """ |
| system_tools.SystemTools.Call( |
| 'hpd_control', 'repeat_pulse', |
| self._HPD_OFFSETS[input_id], int(deassert_interval_usec), |
| int(assert_interval_usec), repeat_count, end_level) |
| |
| def FireMixedHpdPulses(self, input_id, widths): |
| """Fires one or more HPD pulses, starting at low, of mixed widths. |
| |
| One must specify a list of segment widths in the widths argument where |
| widths[0] is the width of the first low segment, widths[1] is that of the |
| first high segment, widths[2] is that of the second low segment, ... etc. |
| The HPD line stops at low if even number of segment widths are specified; |
| otherwise, it stops at high. |
| |
| Args: |
| input_id: The ID of the input connector. |
| widths: list of pulse segment widths in usec. |
| """ |
| system_tools.SystemTools.Call( |
| 'hpd_control', 'pulse', |
| self._HPD_OFFSETS[input_id], |
| *tuple(widths)) |
| |
| class VideoPasser(object): |
| """A class to abstract the behavior of video pass-through. |
| |
| The pass-through video is output to the VGA output on the main board. |
| """ |
| |
| _REG_CTRL = 0xff21d004 |
| |
| _BIT_DATA_A = 0 |
| _BIT_DATA_B = 1 << 0 |
| _BIT_CLK_A = 0 |
| _BIT_CLK_B = 1 << 1 |
| |
| _VALUES_CTRL = { |
| ids.DP1: _BIT_CLK_A | _BIT_DATA_A, |
| ids.DP2: _BIT_CLK_B | _BIT_DATA_B, |
| ids.HDMI: _BIT_CLK_B | _BIT_DATA_A, |
| ids.VGA: _BIT_CLK_A | _BIT_DATA_A |
| } |
| |
| def __init__(self): |
| """Constructs a VideoPasser object.""" |
| self._memory = mem.MemoryForController |
| |
| def Select(self, input_id): |
| """Selects the given input for pass-through. |
| |
| Args: |
| input_id: The ID of the input connector. Check the value in ids.py. |
| """ |
| self._memory.Write(self._REG_CTRL, self._VALUES_CTRL[input_id]) |
| |
| |
| class VideoDumper(object): |
| """A class to control video dumper.""" |
| |
| _REGS_BASE = (0xff210000, # Dumper 0 |
| 0xff211000) # Dumper 1 |
| # Control register |
| _REG_CTRL = 0x0 |
| _BIT_CLK_NORMAL = 0 |
| _BIT_CLK_ALT = 1 << 1 |
| _BIT_STOP = 0 |
| _BIT_RUN = 1 << 2 |
| # Run only when both dumpers' _BIT_RUN_DUAL set. |
| _BIT_RUN_DUAL = 1 << 3 |
| # Set to generate 64-bit frame hash; otherwise, 32-bit. |
| _BIT_HASH_64 = 1 << 4 |
| # Set to enable cropping. |
| _BIT_CROP = 1 << 5 |
| |
| # Register which stores the offsets, related to 0xc0000000, for dump. |
| _REG_START_ADDR = 0x8 |
| _REG_END_ADDR = 0xc |
| _REG_LOOP = 0x10 |
| _REG_LIMIT = 0x14 |
| # Registers to get the width and height |
| _REG_WIDTH = 0x18 |
| _REG_HEIGHT = 0x1c |
| _REG_FRAME_COUNT = 0x20 |
| # Registers to crop frames |
| _REG_CROP_XRANGE = 0x24 |
| _REG_CROP_YRANGE = 0x28 |
| |
| # Frame hash buffer |
| _REG_HASH_BUF_BASE = 0x400 |
| _REG_HASH_BUF_SIZE = 1024 |
| |
| # Input | DP1 | DP2 | HDMI | CRT | |
| # ----------------------------------------------------------- |
| # (1) CLOCK | A | B | B | A | |
| # ----------------------------------------------------------- |
| # (2) SINGLE PIXEL DATA | A | B | B | A | |
| # (3) DUAL PIXEL EVEN PIXELS DATA | A | B | A | | |
| # (4) DUAL PIXEL ODD PIXELS DATA | B | A | B | | |
| PRIMARY_FLOW_INDEXES = { |
| ids.DP1: 0, |
| ids.DP2: 1, |
| ids.HDMI: 1, |
| ids.VGA: 0, |
| } |
| EVEN_PIXELS_FLOW_INDEXES = { |
| ids.DP1: 0, |
| ids.DP2: 1, |
| ids.HDMI: 0, |
| ids.VGA: 0, |
| } |
| |
| _DUMP_BASE_ADDRESS = 0xc0000000 |
| |
| _DUMP_START_ADDRESSES = (0x00000000, # Dumper 0 |
| 0x20000000) # Dumper 1 |
| _DUMP_BUFFER_SIZE = 0x1b400000 |
| |
| # For tempoary dump, don't interfere with the above buffer for capturing. |
| _DUMP_TEMP_START_ADDRESSES = (0x1b400000, # Dumper 0 |
| 0x3b400000) # Dumper 1 |
| # The size of a half of a 3840x2160 image is 0xbdd800. |
| _DUMP_TEMP_BUFFER_SIZE = 0x00c00000 |
| |
| _DEFAULT_LIMIT = 1 |
| _DEFAULT_ENABLE_LOOP = False |
| |
| def __init__(self, index): |
| """Constructs a VideoDumper object. |
| |
| Args: |
| index: 0 for Dumper A and 1 for Dumper B. |
| """ |
| self._memory = mem.MemoryForController |
| self._index = index |
| |
| def EnableCrop(self, x, y, width, height): |
| """Enable cropping frames. |
| |
| Only dump the pixels and its checksum within the given rectangle. |
| |
| Args: |
| x: The left column index of the rectangle. |
| y: The top row index of the rectangle. |
| width: The width of the rectangle. |
| height: The height of the rectangle. |
| """ |
| right = x + width |
| bottom = y + height |
| self._memory.Write(self._REGS_BASE[self._index] + self._REG_CROP_XRANGE, |
| right << 16 | x) |
| self._memory.Write(self._REGS_BASE[self._index] + self._REG_CROP_YRANGE, |
| bottom << 16 | y) |
| self._memory.SetMask(self._REGS_BASE[self._index] + self._REG_CTRL, |
| self._BIT_CROP) |
| |
| def DisableCrop(self): |
| """Disable cropping frames.""" |
| self._memory.ClearMask(self._REGS_BASE[self._index] + self._REG_CTRL, |
| self._BIT_CROP) |
| |
| def Stop(self): |
| """Stops dumping.""" |
| self._memory.ClearMask(self._REGS_BASE[self._index] + self._REG_CTRL, |
| self._BIT_RUN | self._BIT_RUN_DUAL) |
| |
| def Start(self, input_id, dual_pixel_mode): |
| """Starts dumping. |
| |
| Args: |
| input_id: The ID of the input connector. Check the value in ids.py. |
| dual_pixel_mode: True to use dual pixel mode; otherwise, False. |
| """ |
| if dual_pixel_mode: |
| bit_run = self._BIT_RUN_DUAL |
| elif self._index == self.PRIMARY_FLOW_INDEXES[input_id]: |
| bit_run = self._BIT_RUN |
| else: |
| return |
| self._memory.SetMask(self._REGS_BASE[self._index] + self._REG_CTRL, bit_run) |
| |
| @classmethod |
| def GetMaxFrameLimit(cls, width, height): |
| """Returns of the maximal number of frames which can be dumped.""" |
| BYTE_PER_PIXEL = 3 |
| PAGE_SIZE = 4096 |
| frame_size = width * height * BYTE_PER_PIXEL |
| frame_size = ((frame_size - 1) / PAGE_SIZE + 1) * PAGE_SIZE |
| return cls._DUMP_BUFFER_SIZE / frame_size |
| |
| def SetFrameLimit(self, frame_limit, loop=False): |
| """Sets the limitation of total frames to dump. |
| |
| Args: |
| frame_limit: The number of frames to dump. |
| loop: When the frame_limit is reached, True to reset the dump pointer |
| to the start address; False to do nothing. |
| """ |
| self._memory.Write(self._REGS_BASE[self._index] + self._REG_LIMIT, |
| frame_limit) |
| self._memory.Write(self._REGS_BASE[self._index] + self._REG_LOOP, |
| 1 if loop else 0) |
| |
| def SetDumpAddressForTemp(self): |
| """Sets the dump memory address space for temporary dump.""" |
| self._memory.Write(self._REGS_BASE[self._index] + self._REG_START_ADDR, |
| self._DUMP_TEMP_START_ADDRESSES[self._index]) |
| self._memory.Write(self._REGS_BASE[self._index] + self._REG_END_ADDR, |
| self._DUMP_TEMP_START_ADDRESSES[self._index] + |
| self._DUMP_TEMP_BUFFER_SIZE) |
| |
| def SetDumpAddressForCapture(self): |
| """Sets the dump memory address space for capture.""" |
| self._memory.Write(self._REGS_BASE[self._index] + self._REG_START_ADDR, |
| self._DUMP_START_ADDRESSES[self._index]) |
| self._memory.Write(self._REGS_BASE[self._index] + self._REG_END_ADDR, |
| self._DUMP_START_ADDRESSES[self._index] + |
| self._DUMP_BUFFER_SIZE) |
| |
| def Select(self, input_id, dual_pixel_mode): |
| """Selects the given input for dumping. |
| |
| Args: |
| input_id: The ID of the input connector. Check the value in ids.py. |
| dual_pixel_mode: True to use dual pixel mode; otherwise, False. |
| """ |
| self.Stop() |
| self.SetDumpAddressForTemp() |
| self.SetFrameLimit(self._DEFAULT_LIMIT, self._DEFAULT_ENABLE_LOOP) |
| # Use the proper CLK. |
| if self._index == self.PRIMARY_FLOW_INDEXES[input_id]: |
| ctrl_value = self._BIT_CLK_NORMAL |
| else: |
| ctrl_value = self._BIT_CLK_ALT |
| # Use the proper hash mode. |
| if not dual_pixel_mode: |
| ctrl_value = ctrl_value | self._BIT_HASH_64 |
| |
| self._memory.Write(self._REGS_BASE[self._index] + self._REG_CTRL, |
| ctrl_value) |
| # Start dumping. |
| self.Start(input_id, dual_pixel_mode) |
| |
| def GetWidth(self): |
| """Gets the width of the video path.""" |
| return self._memory.Read(self._REGS_BASE[self._index] + self._REG_WIDTH) |
| |
| def GetHeight(self): |
| """Gets the height of the video path.""" |
| return self._memory.Read(self._REGS_BASE[self._index] + self._REG_HEIGHT) |
| |
| def GetFrameCount(self): |
| """Gets the total count of frames captured.""" |
| return self._memory.Read(self._REGS_BASE[self._index] + |
| self._REG_FRAME_COUNT) |
| |
| def GetFrameHash(self, index, dual_pixel_mode): |
| """Gets the frame hash of the given frame index. |
| |
| FPGA overwrites the old hash values when exceeding the hash buffer |
| size. The caller should save the old values before that happens. |
| |
| Args: |
| index: The index of frame. The index can exceed the hash buffer size. |
| dual_pixel_mode: True if using the dual pixel mode; otherwise, False. |
| |
| Returns: |
| A list of hash16 values. |
| """ |
| hash_addr = lambda x: (self._REGS_BASE[self._index] + |
| self._REG_HASH_BUF_BASE + (x * 4) % self._REG_HASH_BUF_SIZE) |
| |
| if dual_pixel_mode: |
| hash32 = self._memory.Read(hash_addr(index)) |
| return [hash32 >> 16, hash32 & 0xffff] |
| else: |
| hash32s = [self._memory.Read(hash_addr(index * 2 + i)) for i in (0, 1)] |
| return [hash32s[1] >> 16, hash32s[1] & 0xffff, |
| hash32s[0] >> 16, hash32s[0] & 0xffff] |
| |
| @classmethod |
| def GetPixelDumpArgs(cls, input_id, dual_pixel_mode): |
| """Gets the arguments of pixeldump tool which selects the proper buffers. |
| |
| Args: |
| input_id: The ID of the input connector. Check the value in ids.py. |
| dual_pixel_mode: True to use dual pixel mode; otherwise, False. |
| """ |
| i = cls.PRIMARY_FLOW_INDEXES[input_id] |
| if dual_pixel_mode: |
| i = cls.EVEN_PIXELS_FLOW_INDEXES[input_id] |
| return ['-a', cls._DUMP_BASE_ADDRESS + cls._DUMP_START_ADDRESSES[i], |
| '-b', cls._DUMP_BASE_ADDRESS + cls._DUMP_START_ADDRESSES[1 - i]] |
| else: |
| return ['-a', cls._DUMP_BASE_ADDRESS + cls._DUMP_START_ADDRESSES[i]] |
| |
| |
| class EdidController(object): |
| """A class to abstract the behavior of the EDID controller.""" |
| |
| HDMI_BASE = 0xff217000 |
| VGA_BASE = 0xff219000 |
| _REG_CTRL = 0 |
| _BIT_RESET = 0 |
| _BIT_OPERATE = 1 |
| _EDID_MEM = 0x100 |
| |
| _EDID_SIZE = 256 |
| |
| def __init__(self, edid_base): |
| """Constructs a EdidController object. |
| |
| Args: |
| edid_base: The base of the memory address which stores the EDID. |
| """ |
| self._memory = mem.MemoryForController |
| self._edid_base = edid_base |
| |
| def WriteEdid(self, data): |
| """Writes the EDID content. |
| |
| Args: |
| data: The EDID control to write. |
| """ |
| for offset in range(0, len(data), 4): |
| value = struct.unpack('>I', data[offset:offset+4])[0] |
| self._memory.Write(self._edid_base + self._EDID_MEM + offset, value) |
| self._memory.Write(self._edid_base + self._REG_CTRL, self._BIT_OPERATE) |
| |
| def ReadEdid(self): |
| """Reads the EDID content. |
| |
| Returns: |
| A byte array of EDID data. |
| """ |
| all_value = '' |
| for offset in range(0, self._EDID_SIZE, 4): |
| value = self._memory.Read(self._edid_base + self._EDID_MEM + offset) |
| all_value += struct.pack('>I', value) |
| return all_value |
| |
| |
| # Audio Routing |
| # |
| # HDMI DP1 DP2 MIC LineIn Memory |
| # | | | | | | |
| # V V V V V V |
| # ---------- -------------- ------- --------------------- |
| # | RX I2S | |AudioGenerator| | Codec | |AudioStreamController| |
| # ---------- -------------- ------- --------------------- |
| # | | | | |
| # | |_____ ____| | |
| # |_________________ | | _________________| |
| # | | | | |
| # V V V V |
| # --------------------- |
| # | AudioRouteController| |
| # --------------------- |
| # | | |
| # ______| |________ |
| # (8-channel) | | (8-channel) |
| # V V |
| # ------------------ ----------- |
| # |AudioI2SController| |AudioDumper| |
| # ------------------ ----------- |
| # (2-channel) | | |
| # V V |
| # ------- Memory |
| # | Codec | |
| # ------- |
| # | |
| # V |
| # LineOut |
| |
| |
| class AudioDumperError(Exception): |
| """Exception raised when any error on AudioDumper.""" |
| pass |
| |
| |
| class AudioDumper(object): |
| """A class to control audio dumper.""" |
| |
| _REGS_BASE = 0xff212000 |
| |
| # Control register |
| _REG_CTRL = 0x0 |
| _BIT_RUN = 1 << 1 |
| |
| # Register which stores the offsets relative to _DUMP_BASE_ADDRESS for dump. |
| # The valid value is in the range of 0x00000000 to _DUMP_BUFFER_SIZE. |
| # Also, the address should be 4K aligned. |
| # Note that VideoDumper and AudioDumper share dump memory. Be careful to |
| # use these two dumpers at the same time. |
| _REG_START_ADDR = 0x8 |
| _REG_END_ADDR = 0xc |
| |
| # This address is in ARM's address space. |
| # Address relative to _DUMP_BASE_ADDRESS is in dumper's address space. |
| # E.g. address 0xdc000000 in ARM's address space is 0x1c000000 in |
| # dumper's address space. |
| _DUMP_BASE_ADDRESS = 0xc0000000 |
| _DUMP_BUFFER_SIZE = 0x3c000000 |
| |
| # If set to 1, the dump pointer is reset to Dump Start Address after it |
| # reaches Dump End Address. If set to 0, the dump pointer does not reset. |
| _REG_LOOP = 0x10 |
| |
| # Number of pages have been dumped. It starts from 0 when Run bit is set and |
| # wraps around at 65536. |
| _REG_PAGE_COUNT = 0x14 |
| |
| # The default address for audio dump. This area is 32 MBytes. |
| # These addresses are in dumper's address space. |
| _DEFAULT_START_ADDRESS = 0x1c000000 |
| _DEFAULT_END_ADDRESS = 0x1e000000 |
| |
| # Page size is 4K bytes. Address should be 4K-aligned. |
| PAGE_SIZE = 0x1000 |
| |
| # The rate of audio data is |
| # 8 channel * 4 bytes/sample * 48000 samples/sec = 1500 KBytes/sec. |
| # So default area which contains 0x2000 4K pages can dump |
| # 32 MBytes / 1500 KBytes = 21 sec of data. |
| MAX_DUMP_PAGES = ( |
| (_DEFAULT_END_ADDRESS - _DEFAULT_START_ADDRESS) / PAGE_SIZE) |
| MAX_DUMP_TIME_SECS = ( |
| (_DEFAULT_END_ADDRESS - _DEFAULT_START_ADDRESS) / (8 * 4 * 48000)) |
| |
| # Set loop to 1 so page count will increase over 0x2000 and we can detect the |
| # case where page count exceeds the limit. |
| # However, since page count will overflow at 65535, the number will not be |
| # reliable after that point. |
| _DEFAULT_LOOP = 1 |
| |
| # Audio data format of dumped data. Chameleond API user needs to get |
| # the format to read data correctly. |
| AUDIO_DATA_FORMAT = audio.AudioDataFormat( |
| file_type='raw', sample_format='S32_LE', channel=8, rate=48000) |
| |
| def __init__(self): |
| """Constructs an AudioDumper object.""" |
| self._memory = mem.MemoryForController |
| self._Stop() |
| |
| def _Stop(self): |
| """Stops dumping.""" |
| self._memory.ClearMask(self._REGS_BASE + self._REG_CTRL, self._BIT_RUN) |
| |
| def _Start(self): |
| """Starts dumping.""" |
| self._memory.SetMask(self._REGS_BASE + self._REG_CTRL, self._BIT_RUN) |
| |
| @property |
| def is_dumping(self): |
| """Is dumper running?""" |
| return self._memory.Read(self._REGS_BASE + self._REG_CTRL) & self._BIT_RUN |
| |
| def _CheckAddressValid(self, name, address): |
| """Checks an address is within valid range, and is aligned. |
| |
| Args: |
| name: The address name. |
| address: An address. |
| |
| Raises: |
| AudioDumperError if address is not valid. |
| """ |
| if address < 0 or address >= self._DUMP_BUFFER_SIZE: |
| raise AudioDumperError( |
| '%s address 0x%x is not in the range of 0 to 0x%x' % ( |
| name, address, self._DUMP_BUFFER_SIZE)) |
| if address & (self.PAGE_SIZE - 1): |
| raise AudioDumperError( |
| '%s address 0x%x is not aligned with 0x%x' % ( |
| name, address, self.PAGE_SIZE)) |
| |
| def StartDumpingToMemory(self): |
| """Starts dumping to memory.""" |
| #TODO(cychiang) Implement rotation dumping for long recording. |
| self._Stop() |
| |
| start_address = self._DEFAULT_START_ADDRESS |
| end_address = self._DEFAULT_END_ADDRESS |
| loop = self._DEFAULT_LOOP |
| |
| # Checks address is valid. |
| for name, address in [('start', start_address), |
| ('end', end_address)]: |
| self._CheckAddressValid(name, address) |
| |
| # Sets the memory addresses, loop for dump. |
| self._memory.Write(self._REGS_BASE + self._REG_START_ADDR, start_address) |
| self._memory.Write(self._REGS_BASE + self._REG_END_ADDR, end_address) |
| self._memory.Write(self._REGS_BASE + self._REG_LOOP, loop) |
| |
| self._Start() |
| |
| def StopDumpingToMemory(self): |
| """Stops dumping to memory. |
| |
| Returns: |
| A tuple of (Mapped start address, Page count) |
| """ |
| # Reads out the address and page count before stop, otherwise page_count |
| # will be cleared. |
| start_address = self._memory.Read( |
| self._REGS_BASE + self._REG_START_ADDR) |
| page_count = self._memory.Read( |
| self._REGS_BASE + self._REG_PAGE_COUNT) |
| self._Stop() |
| return AudioDumper._GetMappedAddress(start_address), page_count |
| |
| @classmethod |
| def _GetMappedAddress(cls, address): |
| """Gets mapped address in ARM's address space. |
| |
| Gets address in ARM's address space for a given address in |
| dumper's address space. |
| |
| Args: |
| address: An address relative to _DUMP_BASE_ADDRESS. |
| |
| Returns: |
| A mapped address which is the input address shifted by |
| _DUMP_BASE_ADDRESS. The address is in ARM's address space. |
| """ |
| return address + cls._DUMP_BASE_ADDRESS |
| |
| @property |
| def audio_data_format_as_dict(self): |
| """Format of the audio data dumped by this dumper. |
| |
| Returns: |
| A dict containing file_type, sample_format, channel, rate contained in |
| AUDIO_DATA_FORMAT. Refer to audio.AudioDataFormat docstring for details. |
| """ |
| return self.AUDIO_DATA_FORMAT.AsDict() |
| |
| |
| class AudioSource(object): |
| """Audio inputs available on audio source controller.""" |
| # Receives audio data from DP1, DP2 and HDMI. |
| RX_I2S = 'I2S Receiver' |
| # Audio generator. |
| GENERATOR = 'Audio generator' |
| # Audio codec, which records data from mic or linein. |
| CODEC = 'Audio codec' |
| # Audio streamer, which reads data from memory. |
| MEMORY = 'Audio streamer' |
| |
| |
| class AudioDestination(object): |
| """Audio destinations available on audio source controller.""" |
| # Sends audio data to I2S controller. |
| I2S = 'I2S controller' |
| # Sends audio data to audio dumper. |
| DUMPER = 'Audio dumper' |
| |
| |
| class AudioRouteControllerError(Exception): |
| """Exception raised when any error on AudioRouteController.""" |
| pass |
| |
| |
| class AudioRouteController(object): |
| """A class to control audio source controller. |
| |
| As shown in audio routing graph, this controller controls two destinations in |
| AudioDestination. They can be independently selected from four sources in |
| AudioSource. |
| """ |
| _REGS_BASE = 0xff213000 |
| |
| # Output selection register |
| _REG_INPUT_SELECT = 0x0 |
| _VALUES_INPUT_SELECT = { |
| AudioSource.RX_I2S: 0, |
| AudioSource.GENERATOR: 1, |
| AudioSource.CODEC: 2, |
| AudioSource.MEMORY: 3} |
| |
| _MASKS_INPUT_SELECT = { |
| AudioDestination.I2S: 0b00000011, |
| AudioDestination.DUMPER: 0b00001100} |
| |
| _SHIFTS_INPUT_SELECT = { |
| AudioDestination.I2S: 0, |
| AudioDestination.DUMPER: 2} |
| |
| _REG_GENERATOR_ENABLE = 0x4 |
| _BIT_GENERATOR_ENABLE = 1 |
| |
| def __init__(self): |
| """Constructs an AudioRouteController object.""" |
| self._memory = mem.MemoryForController |
| |
| def SetupRouteFromInputToDumper(self, input_id): |
| """Sets up audio source given input_id for audio dumper. |
| |
| Args: |
| input_id: The ID of the input connector. Check the value in ids.py. |
| """ |
| self._SetupRouteFromInput(input_id, AudioDestination.DUMPER) |
| |
| def SetupRouteFromInputToI2S(self, input_id): |
| """Sets up audio source given input_id for I2S controller. |
| |
| Args: |
| input_id: The ID of the input connector. Check the value in ids.py. |
| """ |
| # Check docstring of _EnableGenerator for the reason to enable generator. |
| self._EnableGenerator(True) |
| self._SetupRouteFromInput(input_id, AudioDestination.I2S) |
| |
| def _SetupRouteFromInput(self, input_id, destination): |
| """Sets up audio route given an input_id and a destination. |
| |
| Args: |
| input_id: The ID of the input connector. Check the value in ids.py. |
| destination: A destination in AudioDestination. |
| |
| Raises: |
| AudioRouteControllerError if input_id is not supported. |
| """ |
| if input_id in [ids.DP1, ids.DP2, ids.HDMI]: |
| self._SetupRoute(AudioSource.RX_I2S, destination) |
| return |
| if input_id in [ids.MIC, ids.LINEIN]: |
| self._SetupRoute(AudioSource.CODEC, destination) |
| return |
| raise AudioRouteControllerError( |
| 'input_id %s is not supported in AudioRouteController' % input_id) |
| |
| def SetupRouteFromMemoryToI2S(self): |
| """Sets up memory as audio source and I2S as destination.""" |
| # Check docstring of _EnableGenerator for the reason to enable generator. |
| self._EnableGenerator(True) |
| self._SetupRoute(AudioSource.MEMORY, AudioDestination.I2S) |
| |
| def _SetupRoute(self, source, destination): |
| """Sets up audio route given a source and a destination. |
| |
| Args: |
| source: An audio source in AudioSource. |
| destination: An audio destination in AudioDestination. |
| """ |
| logging.info('Select audio source: %r, destination: %r', |
| source, destination) |
| old_value = self._memory.Read( |
| self._REGS_BASE + self._REG_INPUT_SELECT) |
| # Only modifies bits in self._MASKS_INPUT_SELECT[destination]. |
| new_value = ( |
| (old_value & ~self._MASKS_INPUT_SELECT[destination]) | |
| (self._VALUES_INPUT_SELECT[source] << |
| self._SHIFTS_INPUT_SELECT[destination])) |
| |
| self._memory.Write( |
| self._REGS_BASE + self._REG_INPUT_SELECT, new_value) |
| |
| def _EnableGenerator(self, enable): |
| """Enables generator. |
| |
| The audio codec needs us feed its I2S clock 48K when recording/playing |
| audio. Generator generates a fixed 48K clock once it is turned on and it |
| is not controlled by divisor or volume control. |
| |
| Args: |
| enable: True to enable. |
| """ |
| if enable: |
| self._memory.SetMask( |
| self._REGS_BASE + self._REG_GENERATOR_ENABLE, |
| self._BIT_GENERATOR_ENABLE) |
| else: |
| self._memory.ClearMask( |
| self._REGS_BASE + self._REG_GENERATOR_ENABLE, |
| self._BIT_GENERATOR_ENABLE) |
| |
| class AudioStreamControllerError(Exception): |
| """Exception raised when any error on AudioStreamController.""" |
| pass |
| |
| |
| class AudioStreamController(object): |
| """A class to control audio stream controller.""" |
| _REGS_BASE = 0xff213000 |
| |
| _REG_STREAM_ENABLE = 0x10 |
| _BIT_STREAM_ENABLE = 1 |
| |
| # Underflow bit is set to high by fpga when stream enable bit is high but |
| # the data from start address to end address are all streamed. |
| # If loop register is set to 1, then underflow will not happen. |
| _REG_STREAM_UNDERFLOW = 0x14 |
| _BIT_STREAM_UNDERFLOW = 1 |
| |
| # Register which stores the offsets relative to _STREAM_BASE_ADDRESS. |
| # The valid value is in the range of 0x00000000 to _STREAM_BUFFER_SIZE. |
| # Also, the address should be 4K aligned. |
| # Note that VideoDumper and AudioDumper share dump memory, which is used |
| # as stream memory as well. Be careful to use these two dumpers and audio |
| # stream controller at the same time. |
| _REG_START_ADDR = 0x18 |
| _REG_END_ADDR = 0x1c |
| |
| # This address is in ARM's address space. |
| # Address relative to _STREAM_BASE_ADDRESS is in stream's address space. |
| # E.g. address 0xde000000 in ARM's address space is 0x1e000000 in |
| # stream's address space. |
| _STREAM_BASE_ADDRESS = 0xc0000000 |
| _STREAM_BUFFER_SIZE = 0x3c000000 |
| |
| # If set to 1, the stream pointer is reset to stream start address after it |
| # reaches stream end address. If set to 0, the stream pointer does not reset. |
| _REG_LOOP = 0x20 |
| |
| # Set loop to 0 to playback only once. |
| _DEFAULT_LOOP = 0 |
| |
| # Number of pages have been streamed. It starts from 0 when Run bit is set and |
| # wraps around at 65536. |
| _REG_PAGE_COUNT = 0x14 |
| |
| # Page size is 4K bytes. Address should be 4K-aligned. |
| PAGE_SIZE = 0x1000 |
| |
| # The default address for audio stream. This area is 32 MBytes. |
| # These addresses are in stream's address space. |
| _DEFAULT_START_ADDRESS = 0x1e000000 |
| _DEFAULT_END_ADDRESS = 0x20000000 |
| |
| # Default maximum size of stream in bytes. |
| MAX_STREAM_BUFFER_SIZE = _DEFAULT_END_ADDRESS - _DEFAULT_START_ADDRESS |
| |
| # Audio data format of stream data. AudioStreamController user needs to know |
| # the format to write data correctly. |
| AUDIO_DATA_FORMAT = audio.AudioDataFormat( |
| file_type='raw', sample_format='S32_LE', channel=8, rate=48000) |
| |
| # The rate of audio data is |
| # 8 channel * 4 bytes/sample * 48000 samples/sec = 1500 KBytes/sec. |
| # So default area which contains 0x2000000 bytes can stream |
| # 32 MBytes / 1500 KBytes = 21 sec of data. |
| MAX_STREAM_TIME_SECS = int( |
| MAX_STREAM_BUFFER_SIZE / (8 * 4 * 48000)) |
| |
| def __init__(self): |
| """Constructs an AudioStreamController object.""" |
| self._memory = mem.MemoryForController |
| self._Stop() |
| |
| @classmethod |
| def _GetMappedAddress(cls, address): |
| """Gets mapped address in ARM's address space. |
| |
| Gets address in ARM's address space for a given address in |
| stream's address space. |
| |
| Args: |
| address: An address relative to _STREAM_BASE_ADDRESS. |
| |
| Returns: |
| A mapped address which is the input address shifted by |
| _STREAM_BASE_ADDRESS. The address is in ARM's address space. |
| """ |
| return address + cls._STREAM_BASE_ADDRESS |
| |
| @property |
| def mapped_start_address(self): |
| """Returns mapped start address. |
| |
| This is the start address of memory allocated for streaming in ARM's |
| address space. |
| """ |
| return AudioStreamController._GetMappedAddress(self._DEFAULT_START_ADDRESS) |
| |
| @property |
| def audio_data_format_as_dict(self): |
| """Format of the audio data used by this stream controller. |
| |
| Returns: |
| A dict containing file_type, sample_format, channel, rate contained in |
| AUDIO_DATA_FORMAT. Refer to audio.AudioDataFormat docstring for details. |
| """ |
| return self.AUDIO_DATA_FORMAT.AsDict() |
| |
| @property |
| def is_streaming(self): |
| """Stream controller is streamming. |
| |
| Stream is running when enable bit is on and there is no underflow. |
| Underflow bit is set to high by fpga when stream enable bit is high but |
| the data from start address to end address are all streamed. |
| |
| Returns: |
| True if controller is streaming. |
| """ |
| stream_run = (self._memory.Read(self._REGS_BASE + self._REG_STREAM_ENABLE) & |
| self._BIT_STREAM_ENABLE) |
| underflow = (self._memory.Read(self._REGS_BASE + |
| self._REG_STREAM_UNDERFLOW) & |
| self._BIT_STREAM_UNDERFLOW) |
| return (stream_run and not underflow) |
| |
| def _Stop(self): |
| """Stops streaming.""" |
| self._memory.ClearMask(self._REGS_BASE + self._REG_STREAM_ENABLE, |
| self._BIT_STREAM_ENABLE) |
| |
| def _Start(self): |
| """Starts streaming.""" |
| self._memory.SetMask(self._REGS_BASE + self._REG_STREAM_ENABLE, |
| self._BIT_STREAM_ENABLE) |
| |
| def _CheckAddressValid(self, name, address): |
| """Checks an address is within valid range, and is aligned. |
| |
| Args: |
| name: The address name. |
| address: An address. |
| |
| Raises: |
| AudioStreamControllerError if address is not valid. |
| """ |
| if address < 0 or address >= self._STREAM_BUFFER_SIZE: |
| raise AudioStreamControllerError( |
| '%s address 0x%x is not in the range of 0 to 0x%x' % ( |
| name, address, self._STREAM_BUFFER_SIZE)) |
| if address & (self.PAGE_SIZE - 1): |
| raise AudioStreamControllerError( |
| '%s address 0x%x is not aligned with 0x%x' % ( |
| name, address, self.PAGE_SIZE)) |
| |
| def StartStreaming(self, size): |
| """Starts streaming a range of data from memory. |
| |
| User is responsible to put data in the stream memory. |
| |
| Args: |
| size: The size of data in bytes to be streamed. |
| """ |
| self._Stop() |
| |
| if size > self.MAX_STREAM_BUFFER_SIZE: |
| raise AudioStreamControllerError( |
| 'Stream size %d bytes exceeds limit %d bytes' %( |
| size, self.MAX_STREAM_BUFFER_SIZE)) |
| |
| start_address = self._DEFAULT_START_ADDRESS |
| end_address = start_address + size |
| loop = self._DEFAULT_LOOP |
| |
| # Checks address is valid. |
| for name, address in [('start', start_address), |
| ('end', end_address)]: |
| self._CheckAddressValid(name, address) |
| |
| # Sets the memory addresses, loop for streaming. |
| self._memory.Write(self._REGS_BASE + self._REG_START_ADDR, start_address) |
| self._memory.Write(self._REGS_BASE + self._REG_END_ADDR, end_address) |
| self._memory.Write(self._REGS_BASE + self._REG_LOOP, loop) |
| |
| self._Start() |
| |
| def StopStreaming(self): |
| """Stops streaming.""" |
| self._Stop() |
| logging.info('Stopped streaming.') |
| |
| def GetStreamedPages(self): |
| """Returns number of streamed pages |
| |
| Returns: |
| Number of streamed pages. |
| """ |
| return self._memory.Read(self._REGS_BASE + self._REG_PAGE_COUNT) |
| |
| |
| class AudioI2SController(object): |
| """A class to control I2S data sent to codec output.""" |
| _REGS_BASE = 0xff21e000 |
| |
| # Sets enable bit to 0 to mute output. 1 to unmute. |
| _REG_ENABLE = 0x0 |
| _BIT_ENABLE = 1 |
| |
| def __init__(self): |
| """Constructs an AudioI2SController object.""" |
| self._memory = mem.MemoryForController |
| |
| def Enable(self): |
| """Enables I2S data to codec output.""" |
| self._memory.SetMask(self._REGS_BASE + self._REG_ENABLE, |
| self._BIT_ENABLE) |
| |
| def Disable(self): |
| """Disables I2S data to codec output.""" |
| self._memory.ClearMask(self._REGS_BASE + self._REG_ENABLE, |
| self._BIT_ENABLE) |