Source code for pynq.lib.dma

#   Copyright (c) 2017, Xilinx, Inc.
#   All rights reserved.
#
#   Redistribution and use in source and binary forms, with or without
#   modification, are permitted provided that the following conditions are met:
#
#   1.  Redistributions of source code must retain the above copyright notice,
#       this list of conditions and the following disclaimer.
#
#   2.  Redistributions in binary form must reproduce the above copyright
#       notice, this list of conditions and the following disclaimer in the
#       documentation and/or other materials provided with the distribution.
#
#   3.  Neither the name of the copyright holder nor the names of its
#       contributors may be used to endorse or promote products derived from
#       this software without specific prior written permission.
#
#   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
#   AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
#   THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
#   PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
#   CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
#   EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
#   PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
#   OR BUSINESS INTERRUPTION). HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
#   WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
#   OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
#   ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

from pynq import DefaultIP
import warnings


__author__ = 'Peter Ogden, Anurag Dubey'
__copyright__ = 'Copyright 2017, Xilinx'
__email__ = 'pynq_support@xilinx.com'


MAX_C_SG_LENGTH_WIDTH = 26


class _DMAChannel:
    """Drives a single channel of the Xilinx AXI DMA

    This driver is designed to be used in conjunction with the
    `Xlnk.cma_array` method of memory allocation. The channel has
    main functions `transfer` and `wait` which start and wait for
    the transfer to finish respectively. If interrupts are enabled
    there is also a `wait_async` coroutine.

    This class should not be constructed directly, instead used
    through the AxiDMA class.

    """
    def __init__(self, mmio, offset, size, flush_before, interrupt=None):
        self._mmio = mmio
        self._offset = offset
        self._interrupt = interrupt
        self._flush_before = flush_before
        self._size = size
        self._active_buffer = None
        self._first_transfer = True
        self.start()

    @property
    def running(self):
        """True if the DMA engine is currently running

        """
        return self._mmio.read(self._offset + 4) & 0x01 == 0x00

    @property
    def idle(self):
        """True if the DMA engine is idle

        `transfer` can only be called when the DMA is idle

        """
        return self._mmio.read(self._offset + 4) & 0x02 == 0x02

    def start(self):
        """Start the DMA engine if stopped

        """
        if self._interrupt:
            self._mmio.write(self._offset, 0x1001)
        else:
            self._mmio.write(self._offset, 0x0001)
        while not self.running:
            pass
        self._first_transfer = True

    def stop(self):
        """Stops the DMA channel and aborts the current transfer

        """
        self._mmio.write(self._offset, 0x0000)
        while self.running:
            pass

    def _clear_interrupt(self):
        self._mmio.write(self._offset + 4, 0x1000)

    def transfer(self, array):
        """Transfer memory with the DMA

        Transfer must only be called when the channel is idle.

        Parameters
        ----------
        array : ContiguousArray
            An xlnk allocated array to be transferred

        """
        if array.nbytes > self._size:
            raise ValueError('Transferred array is {} bytes, which exceeds '
                             'the maximum DMA buffer size {}.'.format(
                              array.nbytes, self._size))
        if not self.running:
            raise RuntimeError('DMA channel not started')
        if not self.idle and not self._first_transfer:
            raise RuntimeError('DMA channel not idle')
        if self._flush_before:
            array.flush()
        self._mmio.write(self._offset + 0x18, array.physical_address)
        self._mmio.write(self._offset + 0x28, array.nbytes)
        self._active_buffer = array
        self._first_transfer = False

    def wait(self):
        """Wait for the transfer to complete

        """
        if not self.running:
            raise RuntimeError('DMA channel not started')
        while not self.idle:
            pass
        if not self._flush_before:
            self._active_buffer.invalidate()

    async def wait_async(self):
        """Wait for the transfer to complete

        """
        if not self.running:
            raise RuntimeError('DMA channel not started')
        while not self.idle:
            await self._interrupt.wait()
        self._clear_interrupt()
        if not self._flush_before:
            self._active_buffer.invalidate()


[docs]class DMA(DefaultIP): """Class for Interacting with the AXI Simple DMA Engine This class provides two attributes for the read and write channels. The read channel copies data from the stream into memory and the write channel copies data from memory to the output stream. Both channels have an identical API consisting of `transfer` and `wait` functions. If interrupts have been enabled and connected for the DMA engine then `wait_async` is also present. Buffers to be transferred must be allocated through the Xlnk driver using the cma_array function either directly or indirectly. This means that Frames from the video subsystem can be transferred using this class. Attributes ---------- recvchannel : _DMAChannel The stream to memory channel sendchannel : _DMAChannel The memory to stream channel """ def __init__(self, description, *args, **kwargs): """Create an instance of the DMA Driver Parameters ---------- description : dict The entry in the IP dict describing the DMA engine """ if type(description) is not dict or args or kwargs: raise RuntimeError('You appear to want the old DMA driver which ' 'has been deprecated and moved to ' 'pynq.lib.deprecated') super().__init__(description=description) if 'parameters' in description and \ 'c_sg_length_width' in description['parameters']: self.buffer_max_size = \ 1 << int(description['parameters']['c_sg_length_width']) else: self.buffer_max_size = 1 << MAX_C_SG_LENGTH_WIDTH message = 'Failed to find parameter c_sg_length_width; ' \ 'users should really use *.hwh files for overlays.' warnings.warn(message, UserWarning) if 'mm2s_introut' in description['interrupts']: self.sendchannel = _DMAChannel(self.mmio, 0x0, self.buffer_max_size, True, self.mm2s_introut) else: self.sendchannel = _DMAChannel(self.mmio, 0x0, self.buffer_max_size, True) if 's2mm_introut' in description['interrupts']: self.recvchannel = _DMAChannel(self.mmio, 0x30, self.buffer_max_size, False, self.s2mm_introut) else: self.recvchannel = _DMAChannel(self.mmio, 0x30, self.buffer_max_size, False) bindto = ['xilinx.com:ip:axi_dma:7.1']