# Copyright (c) 2016, Xilinx, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
# THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
# OR BUSINESS INTERRUPTION). HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
# WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
# OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
# ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import asyncio
import contextlib
import functools
import numpy as np
import time
import warnings
from pynq import DefaultIP
from pynq import DefaultHierarchy
from pynq import Xlnk
from pynq.ps import CPU_ARCH_IS_SUPPORTED, CPU_ARCH
if CPU_ARCH_IS_SUPPORTED:
import pynq.lib._video
else:
warnings.warn("Pynq does not support the CPU Architecture: {}"
.format(CPU_ARCH), ResourceWarning)
__author__ = "Giuseppe Natale, Yun Rock Qu, Peter Ogden"
__copyright__ = "Copyright 2016, Xilinx"
__email__ = "pynq_support@xilinx.com"
[docs]class VideoMode:
"""Class for holding the information about a video mode
Attributes
----------
height : int
Height of the video frame in lines
width : int
Width of the video frame in pixels
stride : int
Width of a line in the video frame in bytes
bits_per_pixel : int
Bits per pixel
bytes_per_Pixel : int
Bytes required to represent each pixel
shape : tuple of int
Numpy-style tuple describing the video frame
"""
def __init__(self, width, height, bits_per_pixel, stride=None):
self.width = width
self.height = height
self.bits_per_pixel = bits_per_pixel
self.bytes_per_pixel = ((bits_per_pixel - 1) // 8) + 1
if stride:
self.stride = stride
else:
self.stride = width * self.bytes_per_pixel
if self.bytes_per_pixel == 1:
self.shape = (self.height, self.width)
else:
self.shape = (self.height, self.width, self.bytes_per_pixel)
def __repr__(self):
return ("VideoMode: width={} height={} bpp={}"
.format(self.width, self.height, self.bits_per_pixel))
[docs]class HDMIInFrontend(DefaultHierarchy):
"""Class for interacting the with HDMI input frontend
This class is used for enabling the HDMI input and retrieving
the mode of the incoming video
Attributes
----------
mode : VideoMode
The detected mode of the incoming video stream
"""
def __init__(self, description):
super().__init__(description)
[docs] def start(self, init_timeout=60):
"""Method that blocks until the video mode is
successfully detected
"""
ip_dict = self.description
gpio_description = ip_dict['ip']['axi_gpio_hdmiin']
gpio_dict = {
'BASEADDR': gpio_description['phys_addr'],
'INTERRUPT_PRESENT': 1,
'IS_DUAL': 1,
}
vtc_description = ip_dict['ip']['vtc_in']
vtc_capture_addr = vtc_description['phys_addr']
self._capture = pynq.lib._video._capture(gpio_dict,
vtc_capture_addr,
init_timeout)
[docs] def stop(self):
"""Currently empty function included for symmetry with
the HDMIOutFrontend class
"""
pass
[docs] @staticmethod
def checkhierarchy(description):
return ('vtc_in' in description['ip'] and
'axi_gpio_hdmiin' in description['ip'])
@property
def mode(self):
return VideoMode(self._capture.frame_width(),
self._capture.frame_height(), 24)
_outputmodes = {
(640, 480): 0,
(800, 600): 1,
(1280, 720): 2,
(1280, 1024): 3,
(1920, 1080): 4
}
[docs]class HDMIOutFrontend(DefaultHierarchy):
"""Class for interacting the HDMI output frontend
This class is used for enabling the HDMI output and setting
the desired mode of the video stream
Attributes
----------
mode : VideoMode
Desired mode for the output video. Must be set prior
to calling start
"""
[docs] @staticmethod
def checkhierarchy(description):
return ('vtc_out' in description['ip'] and
'axi_dynclk' in description['ip'])
def __init__(self, description):
"""Create the HDMI output front end
Parameters
----------
vtc_description : dict
The IP dictionary entry for the video timing controller to use
clock_description : dict
The IP dictionary entry for the clock generator to use
"""
super().__init__(description)
ip_dict = self.description['ip']
vtc_description = ip_dict['vtc_out']
clock_description = ip_dict['axi_dynclk']
vtc_capture_addr = vtc_description['phys_addr']
clock_addr = clock_description['phys_addr']
self._display = pynq.lib._video._display(vtc_capture_addr,
clock_addr, 1)
self.start = self._display.start
"""Start the HDMI output - requires the that mode is already set"""
self.stop = self._display.stop
"""Stop the HDMI output"""
@property
def mode(self):
"""Get or set the video mode for the HDMI output, must be set to one
of the following resolutions:
640x480
800x600
1280x720
1280x1024
1920x1080
Any other resolution will result in a ValueError being raised.
The bits per pixel will always be 24 when retrieved and ignored
when set.
"""
return VideoMode(self._display.frame_width(),
self._display.frame_height(), 24)
@mode.setter
def mode(self, value):
resolution = (value.width, value.height)
if resolution in _outputmodes:
self._display.mode(_outputmodes[resolution])
else:
raise ValueError("Invalid Output resolution {}x{}"
.format(value.width, value.height))
class _FrameCache:
_xlnk = None
def __init__(self, mode, capacity=5):
self._cache = []
self._mode = mode
self._capacity = capacity
def getframe(self):
"""Retrieve a frame from the cache or create a new frame if the
cache is empty. The freebuffer method of the returned array is
overriden to return the object to the cache rather than freeing
the object.
"""
if self._cache:
frame = _FrameCache._xlnk.cma_array(
shape=self._mode.shape, dtype='u1', cacheable=0,
pointer=self._cache.pop(), cache=self)
else:
if _FrameCache._xlnk is None:
_FrameCache._xlnk = Xlnk()
frame = _FrameCache._xlnk.cma_array(
shape=self._mode.shape, dtype=np.uint8, cacheable=0,
cache=self)
return frame
def return_pointer(self, pointer):
if len(self._cache) < self._capacity:
self._cache.append(pointer)
def clear(self):
self._cache.clear()
[docs]class AxiVDMA(DefaultIP):
"""Driver class for the Xilinx VideoDMA IP core
The driver is split into input and output channels are exposed using the
readchannel and writechannel attributes. Each channel has start and
stop methods to control the data transfer. All channels MUST be stopped
before reprogramming the bitstream or inconsistent behaviour may result.
The DMA uses a single ownership model of frames in that frames are either
owned by the DMA or the user code but not both. S2MMChannel.readframe
and MM2SChannel.newframe both return a frame to the user. It is the
user's responsibility to either free the frame using the freebuffer()
method or to hand ownership back to the DMA using MM2SChannel.writeframe.
Once ownership has been returned the user should not access the contents
of the frame as the underlying memory may be deleted without warning.
Attributes
----------
readchannel : AxiVDMA.S2MMChannel
Video input DMA channel
writechannel : AxiVDMA.MM2SChannel
Video output DMA channel
"""
class _FrameList:
"""Internal helper class for handling the list of frames associated
with a DMA channel. Assumes ownership of all frames it contains
unless explicitly removed with takeownership
"""
def __init__(self, parent, offset, count):
self._frames = [None] * count
self._mmio = parent._mmio
self._offset = offset
self._slaves = set()
self.count = count
self.reload = parent.reload
def __getitem__(self, index):
frame = self._frames[index]
return frame
def takeownership(self, index):
self._frames[index] = None
def __len__(self):
return self.count
def __setitem__(self, index, frame):
self._frames[index] = frame
if frame is not None:
self._mmio.write(self._offset + 4 * index,
frame.physical_address)
else:
self._mmio.write(self._offset + 4 * index, 0)
self.reload()
for s in self._slaves:
s[index] = frame
s.takeownership(index)
def addslave(self, slave):
self._slaves.add(slave)
for i in range(len(self._frames)):
slave[i] = self[i]
slave.takeownership(i)
slave.reload()
def removeslave(self, slave):
self._slaves.remove(slave)
[docs] class S2MMChannel:
"""Read channel of the Video DMA
Brings frames from the video input into memory. Hands ownership of
the read frames to the user code.
Attributes
----------
mode : VideoMode
The video mode of the DMA channel
"""
def __init__(self, parent, interrupt):
self._mmio = parent.mmio
self._frames = AxiVDMA._FrameList(self, 0xAC, parent.framecount)
self._interrupt = interrupt
self._sinkchannel = None
self._mode = None
def _readframe_internal(self):
self.irqframecount = 1
nextframe = self._cache.getframe()
previous_frame = (self.activeframe + 2) % len(self._frames)
captured = self._frames[previous_frame]
self._frames.takeownership(previous_frame)
self._frames[previous_frame] = nextframe
post_frame = (self.activeframe + 2) % len(self._frames)
captured.invalidate()
return captured
[docs] def readframe(self):
"""Read a frame from the channel and return to the user
This function may block until a complete frame has been read. A
single frame buffer is kept so the first frame read after a long
pause in reading may return a stale frame. To ensure an up-to-date
frame when starting processing video read an additional time
before starting the processing loop.
Returns
-------
numpy.ndarray of the video frame
"""
if not self.running:
raise RuntimeError('DMA channel not started')
while self._mmio.read(0x34) & 0x1000 == 0:
loop = asyncio.get_event_loop()
loop.run_until_complete(
asyncio.ensure_future(self._interrupt.wait()))
pass
self._mmio.write(0x34, 0x1000)
return self._readframe_internal()
[docs] async def readframe_async(self):
"""Read a frame from the channel, yielding instead of blocking
if no data is available. See readframe for more details
"""
if not self.running:
raise RuntimeError('DMA channel not started')
while self._mmio.read(0x34) & 0x1000 == 0:
await self._interrupt.wait()
self._mmio.write(0x34, 0x1000)
return self._readframe_internal()
@property
def activeframe(self):
"""The frame index currently being processed by the DMA
This process requires clearing any error bits in the DMA channel
"""
self._mmio.write(0x34, 0x4090)
return (self._mmio.read(0x28) >> 24) & 0x1F
@property
def desiredframe(self):
"""The next frame index to the processed by the DMA
"""
return (self._mmio.read(0x28) >> 8) & 0x1F
@desiredframe.setter
def desiredframe(self, frame_number):
if frame_number < 0 or frame_number >= len(self._frames):
raise ValueError("Invalid frame index")
register_value = self._mmio.read(0x28)
mask = ~(0x1F << 8)
register_value &= mask
register_value |= (frame_number << 8)
self._mmio.write(0x28, register_value)
@property
def mode(self):
"""The video mode of the DMA. Must be set prior to starting.
Changing this while the DMA is running will result in the DMA
being stopped.
"""
return self._mode
@mode.setter
def mode(self, value):
if self.running:
self.stop()
self._mode = value
@property
def running(self):
"""Is the DMA channel running
"""
return (self._mmio.read(0x34) & 0x1) == 0
@property
def parked(self):
"""Is the channel parked or running in circular buffer mode
"""
return self._mmio.read(0x30) & 0x2 == 0
@parked.setter
def parked(self, value):
register = self._mmio.read(0x30)
if value:
register &= ~0x2
else:
register |= 0x2
self._mmio.write(0x30, register)
@property
def irqframecount(self):
register = self._mmio.read(0x30)
return (register >> 16) & 0xFF
@irqframecount.setter
def irqframecount(self, val):
register = self._mmio.read(0x30)
newregister = (register & 0xFF00FFFF) | (val << 16)
if register != newregister:
self._mmio.write(0x30, newregister)
[docs] def start(self):
"""Start the DMA. The mode must be set prior to this being called
"""
if not self._mode:
raise RuntimeError("Video mode not set, channel not started")
self.desiredframe = 0
self._cache = _FrameCache(self._mode)
for i in range(len(self._frames)):
self._frames[i] = self._cache.getframe()
self._writemode()
self.reload()
self._mmio.write(0x30, 0x00011083) # Start DMA
self.irqframecount = 4 # Ensure all frames are written to
self._mmio.write(0x34, 0x1000) # Clear any interrupts
while not self.running:
pass
self.reload()
self.desiredframe = 1
[docs] def stop(self):
"""Stops the DMA, clears the frame cache and unhooks any tied
outputs
"""
self.tie(None)
self._mmio.write(0x30, 0x00011080)
while self.running:
pass
for i in range(len(self._frames)):
self._frames[i] = None
if hasattr(self, '_cache'):
self._cache.clear()
def _writemode(self):
self._mmio.write(0xA4, self._mode.width *
self._mode.bytes_per_pixel)
self._mmio.write(0xA8, self._mode.stride)
[docs] def reload(self):
"""Reload the configuration of the DMA. Should only be called
by the _FrameList class or if you really know what you are doing
"""
if self.running:
self._mmio.write(0xA0, self._mode.height)
[docs] def reset(self):
"""Soft reset the DMA. Finishes all transfers before starting
the reset process
"""
self.stop()
self._mmio.write(0x30, 0x00011084)
while self._mmio.read(0x30) & 0x4 == 4:
pass
[docs] def tie(self, channel):
"""Ties an output channel to this input channel. This is used
to pass video from input to output without invoking the CPU
for each frame. Main use case is when some slow processing is
being done on a subset of frames while the video is passed
through directly to the output. Only one output may be tied
to an output. The tie is broken either by calling tie(None) or
writing a frame to the tied output channel.
"""
if self._sinkchannel:
self._frames.removeslave(self._sinkchannel._frames)
self._sinkchannel.parked = True
self._sinkchannel.sourcechannel = None
self._sinkchannel = channel
if self._sinkchannel:
self._frames.addslave(self._sinkchannel._frames)
self._sinkchannel.parked = False
self._sinkchannel.framedelay = 1
self._sinkchannel.sourcechannel = self
[docs] class MM2SChannel:
"""DMA channel from memory to a video output.
Will continually repeat the most recent frame written.
Attributes
----------
mode : VideoMode
Video mode of the DMA channel
"""
def __init__(self, parent, interrupt):
self._mmio = parent.mmio
self._frames = AxiVDMA._FrameList(self, 0x5C, parent.framecount)
self._interrupt = interrupt
self._mode = None
self.sourcechannel = None
[docs] def start(self):
"""Start the DMA channel with a blank screen. The mode must
be set prior to calling or a RuntimeError will result.
"""
if not self._mode:
raise RuntimeError("Video mode not set, channel not started")
self._cache = _FrameCache(self._mode)
self._frames[0] = self._cache.getframe()
self._writemode()
self.reload()
self._mmio.write(0x00, 0x00011089)
while not self.running:
pass
self.reload()
self.desiredframe = 0
pass
[docs] def stop(self):
"""Stop the DMA channel and empty the frame cache
"""
self._mmio.write(0x00, 0x00011080)
while self.running:
pass
for i in range(len(self._frames)):
self._frames[i] = None
if hasattr(self, '_cache'):
self._cache.clear()
[docs] def reset(self):
"""Soft reset the DMA channel
"""
self.stop()
self._mmio.write(0x00, 0x00011084)
while self._mmio.read(0x00) & 0x4 == 4:
pass
def _writeframe_internal(self, frame):
if self.sourcechannel:
self.sourcechannel.tie(None)
frame.flush()
next_frame = (self.desiredframe + 1) % len(self._frames)
self._frames[next_frame] = frame
self.desiredframe = next_frame
[docs] def writeframe(self, frame):
"""Schedule the specified frame to be the next one displayed.
Assumes ownership of frame which should no longer be modified
by the user. May block if there is already a frame scheduled.
"""
if not self.running:
raise RuntimeError('DMA channel not started')
while self._mmio.read(0x04) & 0x1000 == 0:
loop = asyncio.get_event_loop()
loop.run_until_complete(
asyncio.ensure_future(self._interrupt.wait()))
self._mmio.write(0x04, 0x1000)
self._writeframe_internal(frame)
[docs] async def writeframe_async(self, frame):
"""Same as writeframe() but yields instead of blocking if a
frame is already scheduled
"""
if not self.running:
raise RuntimeError('DMA channel not started')
while self._mmio.read(0x04) & 0x1000 == 0:
await self._interrupt.wait()
self._mmio.write(0x04, 0x1000)
self._writeframe_internal(frame)
[docs] def setframe(self, frame):
"""Sets a frame without blocking or taking ownership. In most
circumstances writeframe() is more appropriate
"""
frameindex = self.desiredframe
self._frames[frameindex] = frame
self._frames.takeownership(frameindex)
def _writemode(self):
self._mmio.write(0x54, self._mode.width *
self._mode.bytes_per_pixel)
register = self._mmio.read(0x58)
register &= (0xF << 24)
register |= self._mode.stride
self._mmio.write(0x58, register)
[docs] def reload(self):
"""Reload the configuration of the DMA. Should only be called
by the _FrameList class or if you really know what you are doing
"""
if self.running:
self._mmio.write(0x50, self._mode.height)
[docs] def newframe(self):
"""Returns a frame of the appropriate size for the video mode.
The contents of the frame are undefined and should not be assumed
to be black
Returns
-------
numpy.ndarray video frame
"""
return self._cache.getframe()
@property
def activeframe(self):
self._mmio.write(0x04, 0x4090)
return (self._mmio.read(0x28) >> 16) & 0x1F
@property
def desiredframe(self):
return self._mmio.read(0x28) & 0x1F
@desiredframe.setter
def desiredframe(self, frame_number):
if frame_number < 0 or frame_number >= len(self._frames):
raise ValueError("Invalid Frame Index")
register_value = self._mmio.read(0x28)
mask = ~0x1F
register_value &= mask
register_value |= frame_number
self._mmio.write(0x28, register_value)
@property
def running(self):
return (self._mmio.read(0x04) & 0x1) == 0
@property
def mode(self):
"""The video mode of the DMA, must be called prior to starting.
If changed while the DMA channel is running the channel will be
stopped
"""
return self._mode
@mode.setter
def mode(self, value):
if self.running:
self.stop()
self._mode = value
@property
def parked(self):
"""Is the channel parked or running in circular buffer mode
"""
return self._mmio.read(0x00) & 0x2 == 0
@parked.setter
def parked(self, value):
register = self._mmio.read(0x00)
if value:
self.desiredframe = self.activeframe
register &= ~0x2
else:
register |= 0x2
self._mmio.write(0x00, register)
@property
def framedelay(self):
register = self._mmio.read(0x58)
return register >> 24
@framedelay.setter
def framedelay(self, value):
register = self._mmio.read(0x58)
register &= 0xFFFF
register |= value << 24
self._mmio.write(0x58, register)
def __init__(self, description, framecount=4):
"""Create a new instance of the AXI Video DMA driver
Parameters
----------
name : str
The name of the IP core to instantiate the driver for
"""
super().__init__(description)
self.framecount = framecount
self.readchannel = AxiVDMA.S2MMChannel(self, self.s2mm_introut)
self.writechannel = AxiVDMA.MM2SChannel(self, self.mm2s_introut)
bindto = ['xilinx.com:ip:axi_vdma:6.2',
'xilinx.com:ip:axi_vdma:6.3']
[docs]class ColorConverter(DefaultIP):
"""Driver for the color space converter
The colorspace convert implements a 3x4 matrix for performing arbitrary
linear color conversions. Each coefficient is represented as a 10 bit
signed fixed point number with 2 integer bits. The result of the
computation can visualised as a table
# in1 in2 in3 1
# out1 c1 c2 c3 c10
# out2 c4 c5 c6 c11
# out3 c7 c8 c9 c12
The color can be changed mid-stream.
Attributes
----------
colorspace : list of float
The coefficients of the colorspace conversion
"""
def __init__(self, description):
"""Construct an instance of the driver
Attributes
----------
description : dict
IP dict entry for the IP core
"""
super().__init__(description)
bindto = ['xilinx.com:hls:color_convert:1.0']
@staticmethod
def _signextend(value):
"""Sign extend a 10-bit number
Derived from https://stackoverflow.com/questions/32030412/
twos-complement-sign-extension-python
"""
return (value & 0x1FF) - (value & 0x200)
@property
def colorspace(self):
"""The colorspace to convert. See the class description for
details of the coefficients. The coefficients are a list of
floats of length 12
"""
return [ColorConverter._signextend(self.read(0x10 + 8 * i)) / 256
for i in range(12)]
@colorspace.setter
def colorspace(self, color):
if len(color) != 12:
raise ValueError("Wrong number of elements in color specification")
for i, c in enumerate(color):
self.write(0x10 + 8 * i, int(c * 256))
[docs]class PixelPacker(DefaultIP):
"""Driver for the pixel format convert
Changes the number of bits per pixel in the video stream. The stream
should be paused prior to the width being changed. This can be targeted
at either a pixel_pack or a pixel_unpack IP core.For a packer the input
is always 24 bits per pixel while for an unpacker the output 24 bits per
pixel.
"""
def __init__(self, description):
"""Construct an instance of the driver
Attributes
----------
description : dict
IP dict entry for the IP core
"""
super().__init__(description)
self._bpp = 24
self.write(0x10, 0)
self._resample = False
bindto = ['xilinx.com:hls:pixel_pack:1.0',
'xilinx.com:hls:pixel_unpack:1.0']
@property
def bits_per_pixel(self):
"""Number of bits per pixel in the stream
Valid values are 8, 24 and 32. The following table describes the
operation for packing and unpacking for each width
Mode Pack Unpack
8 bpp Keep only the first channel Pad other channels with 0
16 bpp Dependent on resample Dependent on resample
24 bpp No change No change
32 bpp Pad channel 4 with 0 Discard channel 4
"""
mode = self.read(0x10)
if mode == 0:
return 24
elif mode == 1:
return 32
elif mode == 2:
return 8
elif mode <= 4:
return 16
@bits_per_pixel.setter
def bits_per_pixel(self, value):
if value == 24:
mode = 0
elif value == 32:
mode = 1
elif value == 8:
mode = 2
elif value == 16:
if self._resample:
mode = 4
else:
mode = 3
else:
raise ValueError("Bits per pixel must be 8, 16, 24 or 32")
self._bpp = value
self.write(0x10, mode)
@property
def resample(self):
"""Perform chroma resampling in 16 bpp mode
Boolean property that only affects 16 bpp mode. If True then
the two chroma channels are multiplexed on to the second output
pixel, otherwise only the first and second channels are transferred
and the third is discarded
"""
return self._resample
@resample.setter
def resample(self, value):
self._resample = value
# Make sure the mode is updated
if self.bits_per_pixel == 16:
self.bits_per_pixel = 16
COLOR_IN_BGR = [1, 0, 0,
0, 1, 0,
0, 0, 1,
0, 0, 0]
COLOR_OUT_BGR = [1, 0, 0,
0, 1, 0,
0, 0, 1,
0, 0, 0]
COLOR_IN_RGB = [0, 0, 1,
0, 1, 0,
1, 0, 0,
0, 0, 0]
COLOR_OUT_RGB = [0, 0, 1,
0, 1, 0,
1, 0, 0,
0, 0, 0]
COLOR_IN_YCBCR = [0.114, 0.587, 0.299,
0.5, -0.331264, -0.168736,
-0.081312, -0.41866, 0.5,
0, 0.5, 0.5]
COLOR_OUT_YCBCR = [1, 1.772, 0,
1, -0.3344136, -0.714136,
1, 0, 1.402,
-0.886, 0.529136, -0.701]
COLOR_OUT_GRAY = [1, 0, 0,
1, 0, 0,
1, 0, 0,
0, 0, 0]
PIXEL_RGB = PixelFormat(24, COLOR_IN_RGB, COLOR_OUT_RGB)
PIXEL_RGBA = PixelFormat(32, COLOR_IN_RGB, COLOR_OUT_RGB)
PIXEL_BGR = PixelFormat(24, COLOR_IN_BGR, COLOR_OUT_BGR)
PIXEL_YCBCR = PixelFormat(24, COLOR_IN_YCBCR, COLOR_OUT_YCBCR)
PIXEL_GRAY = PixelFormat(8, COLOR_IN_YCBCR, COLOR_OUT_GRAY)
[docs]class HDMIIn(DefaultHierarchy):
"""Wrapper for the input video pipeline of the Pynq-Z1 base overlay
This wrapper assumes the following pipeline structure and naming
color_convert_in -> pixel_pack ->axi_vdma
with vtc_in and axi_gpio_hdmiiin helper IP
Attributes
----------
frontend : pynq.lib.video.HDMIInFrontend
The HDMI frontend for signal detection
color_convert : pynq.lib.video.ColorConverter
The input color format converter
pixel_pack : pynq.lib.video.PixelPacker
Converts the input pixel size to that required by the VDMA
"""
[docs] @staticmethod
def checkhierarchy(description):
if 'frontend' in description['hierarchies']:
frontend_dict = description['hierarchies']['frontend']
else:
return False
return (
'pixel_pack' in description['ip'] and
'color_convert' in description['ip'] and
description['ip']['pixel_pack']['driver'] == PixelPacker and
description['ip']['color_convert']['driver'] == ColorConverter and
HDMIInFrontend.checkhierarchy(frontend_dict))
def __init__(self, description, vdma=None):
"""Initialise the drivers for the pipeline
Parameters
----------
path : str
name of the hierarchy containing all of the video blocks
"""
super().__init__(description)
ip_dict = self.description
self._vdma = vdma
self._color = self.color_convert
self._pixel = self.pixel_pack
self._hdmi = self.frontend
[docs] def start(self):
"""Start the pipeline
"""
self._vdma.readchannel.start()
return self._stopcontextmanager()
[docs] def stop(self):
"""Stop the pipeline
"""
self._vdma.readchannel.stop()
@contextlib.contextmanager
def _stopcontextmanager(self):
"""Context Manager to stop the VDMA at the end of the block
"""
yield
self.stop()
@contextlib.contextmanager
def _closecontextmanager(self):
"""Context Manager to close the HDMI port at the end of the block
"""
yield
self.close()
[docs] def close(self):
"""Uninitialise the drivers, stopping the pipeline beforehand
"""
self.stop()
self._hdmi.stop()
@property
def colorspace(self):
"""The colorspace of the pipeline, can be changed without stopping
the pipeline
"""
return self._color.colorspace
@colorspace.setter
def colorspace(self, new_colorspace):
self._color.colorspace = new_colorspace
@property
def mode(self):
"""Video mode of the input
"""
return self._vdma.readchannel.mode
[docs] def readframe(self):
"""Read a video frame
See AxiVDMA.S2MMChannel.readframe for details
"""
return self._vdma.readchannel.readframe()
[docs] async def readframe_async(self):
"""Read a video frame
See AxiVDMA.S2MMChannel.readframe for details
"""
return await self._vdma.readchannel.readframe_async()
[docs] def tie(self, output):
"""Mirror the video input on to an output channel
Parameters
----------
output : HDMIOut
The output to mirror on to
"""
self._vdma.readchannel.tie(output._vdma.writechannel)
[docs]class HDMIOut(DefaultHierarchy):
"""Wrapper for the output video pipeline of the Pynq-Z1 base overlay
This wrapper assumes the following pipeline structure and naming
axi_vdma -> pixel_unpack -> color_convert -> frontend
with vtc_out and axi_dynclk helper IP
Attributes
----------
frontend : pynq.lib.video.HDMIOutFrontend
The HDMI frontend for mode setting
color_convert : pynq.lib.video.ColorConverter
The output color format converter
pixel_unpack : pynq.lib.video.PixelPacker
Converts the input pixel size to 24 bits-per-pixel
"""
[docs] @staticmethod
def checkhierarchy(description):
if 'frontend' in description['hierarchies']:
frontend_hierarchy = description['hierarchies']['frontend']
else:
return False
return (
'pixel_unpack' in description['ip'] and
'color_convert' in description['ip'] and
description['ip']['pixel_unpack']['driver'] == PixelPacker and
description['ip']['color_convert']['driver'] == ColorConverter and
HDMIOutFrontend.checkhierarchy(frontend_hierarchy))
def __init__(self, description, vdma=None):
"""Initialise the drivers for the pipeline
Parameters
----------
path : str
name of the hierarchy containing all of the video blocks
"""
super().__init__(description)
self._vdma = vdma
self._color = self.color_convert
self._pixel = self.pixel_unpack
self._hdmi = self.frontend
[docs] def start(self):
"""Start the pipeline
"""
self._vdma.writechannel.start()
return self._stopcontextmanager()
[docs] def stop(self):
"""Stop the pipeline
"""
self._vdma.writechannel.stop()
[docs] def close(self):
"""Close the pipeline an unintialise the drivers
"""
self.stop()
self._hdmi.stop()
@contextlib.contextmanager
def _stopcontextmanager(self):
"""Context Manager to stop the VDMA at the end of the block
"""
yield
self.stop()
@contextlib.contextmanager
def _closecontextmanager(self):
"""Context Manager to close the HDMI port at the end of the block
"""
yield
self.close()
@property
def colorspace(self):
"""Set the colorspace for the pipeline - can be done without
stopping the pipeline
"""
return self._color.colorspace
@colorspace.setter
def colorspace(self, new_colorspace):
self._color.colorspace = new_colorspace
@property
def mode(self):
"""The currently configured video mode
"""
return self._vdma.writechannel.mode
[docs] def newframe(self):
"""Return an unintialised video frame of the correct type for the
pipeline
"""
return self._vdma.writechannel.newframe()
[docs] def writeframe(self, frame):
"""Write the frame to the video output
See AxiVDMA.MM2SChannel.writeframe for more details
"""
self._vdma.writechannel.writeframe(frame)
[docs] async def writeframe_async(self, frame):
"""Write the frame to the video output
See AxiVDMA.MM2SChannel.writeframe for more details
"""
await self._vdma.writechannel.writeframe_async(frame)
[docs]class HDMIWrapper(DefaultHierarchy):
"""Hierarchy driver for the entire Pynq-Z1 video subsystem.
Exposes the input, output and video DMA as attributes. For most
use cases the wrappers for the input and output pipelines are
sufficient and the VDMA will not need to be used directly.
Attributes
----------
hdmi_in : pynq.lib.video.HDMIIn
The HDMI input pipeline
hdmi_out : pynq.lib.video.HDMIOut
The HDMI output pipeline
axi_vdma : pynq.lib.video.AxiVDMA
The video DMA.
"""
[docs] @staticmethod
def checkhierarchy(description):
if 'hdmi_in' in description['hierarchies']:
in_dict = description['hierarchies']['hdmi_in']
else:
return False
if 'hdmi_out' in description['hierarchies']:
out_dict = description['hierarchies']['hdmi_out']
else:
return False
return ('axi_vdma' in description['ip'] and
description['ip']['axi_vdma']['driver'] == AxiVDMA and
HDMIIn.checkhierarchy(in_dict) and
HDMIOut.checkhierarchy(out_dict))
def __init__(self, description):
super().__init__(description)
in_dict = description['hierarchies']['hdmi_in']
out_dict = description['hierarchies']['hdmi_out']
self.hdmi_in = HDMIIn(in_dict, self.axi_vdma)
self.hdmi_out = HDMIOut(out_dict, self.axi_vdma)