Source code for pynq.lib.dma

#   Copyright (c) 2017, Xilinx, Inc.
#   SPDX-License-Identifier: BSD-3-Clause

import numpy
from pynqutils.runtime.repr_dict import ReprDict
from pynq import allocate
from pynq import DefaultIP, UnsupportedConfiguration



MAX_C_SG_LENGTH_WIDTH = 26
DMA_TYPE_TX = 1
DMA_TYPE_RX = 0


class _SDMAChannel:
    """Drives a single channel of the Xilinx AXI Simple DMA

    This driver is designed to be used in conjunction with the
    `pynq.allocate()` method of memory allocation. The channel has
    main functions `transfer` and `wait` which start and wait for
    the transfer to finish respectively. If interrupts are enabled
    there is also a `wait_async` coroutine.

    This class should not be constructed directly, instead used
    through the AxiDMA class.

    """

    def __init__(self, mmio, max_size, width, tx_rx, dre, interrupt=None):
        """Initialize the simple DMA object.

        Parameters
        ----------
        mmio : MMIO
            The MMIO controller used for DMA IP.
        max_size : int
            Max size of the DMA buffer. Exceeding this will hang the system.
        width : int
            Number of bytes for each data.
        tx_rx : int
            Set to DMA_TYPE_TX(1) for sending or DMA_TYPE_RX(0) for receiving.
        dre : bool
            Data alignment enable.
        interrupt: Interrupt
            Interrupt used by the DMA channel.

        """
        self._mmio = mmio
        self._interrupt = interrupt
        self._max_size = max_size
        self._active_buffer = None
        self._first_transfer = True
        self._align = width
        self._tx_rx = tx_rx
        self._dre = dre

        if tx_rx == DMA_TYPE_RX:
            self._offset = 0x30
            self._flush_before = False
        else:
            self._offset = 0
            self._flush_before = True

        self.transferred = 0
        self.start()

    @property
    def running(self):
        """True if the DMA engine is currently running"""
        return self._mmio.read(self._offset + 4) & 0x01 == 0x00

    @property
    def idle(self):
        """True if the DMA engine is idle

        `transfer` can only be called when the DMA is idle

        """
        return self._mmio.read(self._offset + 4) & 0x02 == 0x02

    @property
    def error(self):
        """True if DMA engine is in an error state"""
        return self._mmio.read(self._offset + 4) & 0x70 != 0x0

    def start(self):
        """Start the DMA engine if stopped"""
        if self._interrupt:
            self._mmio.write(self._offset, 0x1001)
        else:
            self._mmio.write(self._offset, 0x0001)
        while not self.running:
            pass
        self._first_transfer = True

    def stop(self):
        """Stops the DMA channel and aborts the current transfer"""
        self._mmio.write(self._offset, 0x0000)
        while self.running:
            pass

    def _clear_interrupt(self):
        self._mmio.write(self._offset + 4, 0x1000)

    def transfer(self, array, start=0, nbytes=0):
        """Transfer memory with the DMA

        Transfer must only be called when the channel is idle.
        For `nbytes`, 0 means everything after the starting point.

        If the AXI DMA is not configured for data re-alignment then a
        valid address must be aligned or undefined results occur.

        For MM2S (send), if Data Realignment Engine (DRE) is not included,
        the source address must be MM2S memory map data width aligned.

        For S2MM (recv), if Data Realignment Engine is not included,
        the destination address must be S2MM Memory Map data width aligned.

        For example, if memory map data width = 32, data is aligned if it is
        located at word offsets (32-bit offset), that is, 0x0, 0x4, 0x8, 0xC,
        and so forth.

        Parameters
        ----------
        array : ContiguousArray
            An contiguously allocated array to be transferred
        start : int
             Offset into array to start. Default is 0.
        nbytes : int
             Number of bytes to transfer. Default is 0.

        """
        if not self.running:
            raise RuntimeError("DMA channel not started")
        if not self.idle and not self._first_transfer:
            raise RuntimeError("DMA channel not idle")
        if nbytes == 0:
            nbytes = array.nbytes - start
        if nbytes > self._max_size:
            raise ValueError(
                "Transfer size is {} bytes, which exceeds "
                "the maximum DMA buffer size {}.".format(nbytes, self._max_size)
            )

        if not self._dre and ((array.physical_address + start) % self._align) != 0:
            raise RuntimeError(
                "DMA does not support unaligned transfers; "
                "Starting address must be aligned to "
                "{} bytes.".format(self._align)
            )
        if self._flush_before:
            array.flush()
        self.transferred = 0
        self._mmio.write(
            self._offset + 0x18, (array.physical_address + start) & 0xFFFFFFFF
        )
        self._mmio.write(
            self._offset + 0x1C, ((array.physical_address + start) >> 32) & 0xFFFFFFFF
        )
        self._mmio.write(self._offset + 0x28, nbytes)
        self._active_buffer = array
        self._first_transfer = False

    def wait(self):
        """Wait for the transfer to complete"""
        if not self.running:
            raise RuntimeError("DMA channel not started")
        while True:
            error = self._mmio.read(self._offset + 4)
            if self.error:
                if error & 0x10:
                    raise RuntimeError("DMA Internal Error (transfer length 0?)")
                if error & 0x20:
                    raise RuntimeError(
                        "DMA Slave Error (cannot access memory map interface)"
                    )
                if error & 0x40:
                    raise RuntimeError("DMA Decode Error (invalid address)")
            if self.idle:
                break
        if not self._flush_before:
            self._active_buffer.invalidate()
        self.transferred = self._mmio.read(self._offset + 0x28)

    async def wait_async(self):
        """Wait for the transfer to complete"""
        if not self.running:
            raise RuntimeError("DMA channel not started")
        while not self.idle:
            await self._interrupt.wait()
        self._clear_interrupt()
        if not self._flush_before:
            self._active_buffer.invalidate()
        self.transferred = self._mmio.read(self._offset + 0x28)


class _SGDMAChannel:
    """Drives a single channel of the Xilinx AXI Scatter-Gather DMA

    This driver is designed to be used in conjunction with the
    `pynq.allocate()` method of memory allocation. The channel has
    main functions `transfer` and `wait` which start and wait for
    the transfer to finish respectively. If interrupts are enabled
    there is also a `wait_async` coroutine.

    This class should not be constructed directly, instead used
    through the AxiDMA class.

    """

    def __init__(self, mmio, max_size, width, tx_rx, dre, interrupt=None):
        """Initialize the simple DMA object.

        Parameters
        ----------
        mmio : MMIO
            The MMIO controller used for DMA IP.
        max_size : int
            Max size of the DMA buffer. Exceeding this will hang the system.
        width : int
            Number of bytes for each data.
        tx_rx : int
            Set to DMA_TYPE_TX(1) for sending or DMA_TYPE_RX(0) for receiving.
        dre : bool
            Data alignment enable.
        interrupt: Interrupt
            Interrupt used by the DMA channel.

        """
        self._mmio = mmio
        self._interrupt = interrupt
        self._max_size = max_size
        self._active_buffer = None
        self._align = width
        self._tx_rx = tx_rx
        self._dre = dre
        self._cyclic = False

        if tx_rx == DMA_TYPE_RX:
            self._offset = 0x30
            self._flush_before = False
        else:
            self._offset = 0
            self._flush_before = True

        self.transferred = 0
        self._transfer_started = False
        self._descr = None
        self._num_descr = 0

        self.stop()

    @property
    def running(self):
        """True if the DMA engine is currently running"""
        return self._mmio.read(self._offset + 4) & 0x01 == 0x00

    @property
    def halted(self):
        """True if the DMA engine is halted.

        `transfer` can only be called when the DMA is halted

        """
        return self._mmio.read(self._offset + 4) & 0x01 == 0x1

    @property
    def idle(self):
        """True if the DMA engine is idle

        `transfer` can only be called when the DMA is idle

        """
        return self._mmio.read(self._offset + 4) & 0x02 == 0x02

    @property
    def error(self):
        """True if DMA engine is in an error state"""
        return self._mmio.read(self._offset + 4) & 0x770 != 0x0

    def start(self):
        """Start the DMA engine if stopped"""
        if not self._cyclic:
            if self._interrupt:
                self._mmio.write(self._offset, 0x1001)
            else:
                self._mmio.write(self._offset, 0x0001)
        else:
            if self._interrupt:
                self._mmio.write(self._offset, 0x1011)
            else:
                self._mmio.write(self._offset, 0x0011)
        while not self.running:
            pass
        self._transfer_started = True

    def stop(self):
        """Stops the DMA channel and aborts the current transfer"""
        self._mmio.write(self._offset, 0x0000)
        # In Cyclic BD mode the DMA requires a reset if stopped
        if self._cyclic:
            self._mmio.write(0x0, 0x0004)
            self._mmio.write(0x0, 0x0000)
        while self.running:
            pass
        self._transfer_started = False

    def _clear_interrupt(self):
        self._mmio.write(self._offset + 4, 0x1000)

    def transfer(self, array, start=0, nbytes=0, cyclic=False):
        """Transfer memory with the DMA

        Transfer must only be called when the channel is halted
        For `nbytes`, 0 means everything after the starting point.

        If the AXI DMA is not configured for data re-alignment then a
        valid address must be aligned or undefined results occur.

        For MM2S (send), if Data Realignment Engine (DRE) is not included,
        the source address must be MM2S memory map data width aligned.

        For S2MM (recv), if Data Realignment Engine is not included,
        the destination address must be S2MM Memory Map data width aligned.

        For example, if memory map data width = 32, data is aligned if it is
        located at word offsets (32-bit offset), that is, 0x0, 0x4, 0x8, 0xC,
        and so forth.
        
        Cyclic Buffer Descriptor (BD) mode allows the DMA to loop through the 
        buffer descriptors without user intervention. As the DMA cycles through 
        the BDs indefinitely, the wait() function is not valid in this mode. 
        Instead, use the stop() function to terminate DMA operation. This mode 
        is only valid for the sendchannel.

        Parameters
        ----------
        array : ContiguousArray
            An contiguously allocated array to be transferred
        start : int
             Offset into array to start. Default is 0.
        nbytes : int
             Number of bytes to transfer. Default is 0.
        cyclic : bool
             Enable cyclic BD mode. Default is False.

        """

        if not self.halted:
            raise RuntimeError("DMA channel not halted")
        if nbytes == 0:
            nbytes = array.nbytes - start
        if not self._dre and ((array.physical_address + start) % self._align) != 0:
            raise RuntimeError(
                "DMA does not support unaligned transfers; "
                "Starting address must be aligned to "
                "{} bytes.".format(self._align)
            )
            
        self._cyclic = cyclic

        if self._cyclic and (self._tx_rx != DMA_TYPE_TX):
            raise RuntimeError('Cyclic BD mode only valid in sendchannel')
        
        # Figure out largest possible block size, and no. of descriptors needed
        remain = nbytes
        blk_size = self._max_size - (self._max_size % self._align)

        # We need to always have at least two descriptors!
        if blk_size > remain:
            blk_size = int(remain / 2)
            blk_size -= blk_size % self._align

        self._num_descr = int((remain + (blk_size - 1)) / blk_size)

        # Zero-Allocate buffer for descriptors: uint32[_num_descr][16]
        # Descriptor is only 52 bytes but each one has to be 64-byte aligned!
        self._descr = allocate(shape=(self._num_descr, 16), dtype=numpy.uint32)

        # Idle DMA engine
        self.stop()

        # Fill out descriptors
        for i in range(0, self._num_descr):
            # Next descriptor (64-bit)
            if self._cyclic and (i == (self._num_descr - 1)):
                # In cyclic BD mode the last descriptor points back the the first
                self._descr[i, 0] = (
                self._descr.physical_address) & 0xffffffff
                self._descr[i, 1] = (
                self._descr.physical_address >> 32) & 0xffffffff
            else:
                self._descr[i, 0] = (
                self._descr.physical_address + (((i + 1) % self._num_descr) * 16 * 4)
                ) & 0xFFFFFFFF
                self._descr[i, 1] = (
                self._descr.physical_address + (((i + 1) % self._num_descr) * 16 * 4)
                >> 32
                ) & 0xFFFFFFFF

            # Buffer length
            if remain > blk_size:
                d_len = blk_size
            else:
                d_len = remain
            self._descr[i, 6] = d_len

            remain -= d_len

            # Buffer address (64-bit)
            self._descr[i, 2] = (array.physical_address + (i * blk_size)) & 0xFFFFFFFF
            self._descr[i, 3] = (
                (array.physical_address + (i * blk_size)) >> 32
            ) & 0xFFFFFFFF

            # First block
            if i == 0:
                self._descr[i, 6] |= 1 << 27

            # Last Block
            if remain == 0:
                self._descr[i, 6] |= 1 << 26

        if self._flush_before:
            array.flush()

        # Flush DMA descriptors
        self._descr.flush()

        # Write first desc
        self._mmio.write(self._offset + 0x08, self._descr.physical_address & 0xFFFFFFFF)
        self._mmio.write(
            self._offset + 0x0C, (self._descr.physical_address >> 32) & 0xFFFFFFFF
        )

        self._active_buffer = array

        # Let's go!
        self.transferred = 0
        self.start()

        # Writing last desc triggers the descriptor fetches
        if self._cyclic:
            # In cyclic BD mode the tail descriptor register must be programmed
            # with a value which is not part of the BD chain.
            self._mmio.write(
                    self._offset + 0x10, 
                    (self._descr.physical_address + 
                        ((self._num_descr) * 16 * 4)) & 0xffffffff)
            self._mmio.write(
                    self._offset + 0x14, 
                    ((self._descr.physical_address +  
                        ((self._num_descr) * 16 * 4)) >> 32) & 0xffffffff)
        else:
            self._mmio.write(
                    self._offset + 0x10, 
                    (self._descr.physical_address + 
                        ((self._num_descr - 1) * 16 * 4)) & 0xffffffff)
            self._mmio.write(
                    self._offset + 0x14, 
                    ((self._descr.physical_address +  
                        ((self._num_descr - 1) * 16 * 4)) >> 32) & 0xffffffff)

    def wait(self):
        """Wait for the transfer to complete"""
        # wait not valid in cyclic BD mode.
        if self._cyclic:
            raise RuntimeError('Cannot wait in cyclic BD mode')
        if not self._transfer_started:
            raise RuntimeError("DMA transfer not started")
        while True:
            if self.error:
                error = self._mmio.read(self._offset + 4)
                if error & 0x10:
                    raise RuntimeError("DMA Internal Error (transfer length 0?)")
                if error & 0x20:
                    raise RuntimeError(
                        "DMA Slave Error (cannot access memory map interface)"
                    )
                if error & 0x40:
                    raise RuntimeError("DMA Decode Error (invalid address)")
                if error & 0x100:
                    raise RuntimeError(
                        "Scatter-Gather Internal Error "
                        "(re-used completed descriptor)"
                    )
                if error & 0x200:
                    raise RuntimeError(
                        "Scatter-Gather Slave Error "
                        "(cannot access memory map interface)"
                    )
                if error & 0x400:
                    raise RuntimeError(
                        "Scatter-Gather Decode Error " "(invalid descriptor address)"
                    )
            if self.idle or self.halted:
                break
        if not self._flush_before:
            self._active_buffer.invalidate()

        # Work out transferred length
        self._descr.flush()
        self.transferred = 0
        for i in range(0, self._num_descr):
            # XXX if micro mode, this doesn't apply. Count descriptors instead.
            if self._descr[i, 7] & 0x30000000:
                raise RuntimeError("DMA Error in descriptor")
            self.transferred += self._descr[i, 7] & 0x03FFFFFF

        # Ensure engine is idled
        self.stop()

        # Clean up descriptor buffer
        self._descr.close()
        self._descr = None

    async def wait_async(self):
        """Wait for the transfer to complete"""
        # wait not valid in cyclic BD mode.
        if self._cyclic:
            raise RuntimeError('Cannot wait in cyclic BD mode')
        if not self._transfer_started:
            raise RuntimeError("DMA transfer not started")
        while not (self.idle or self.halted):
            await self._interrupt.wait()
        self._clear_interrupt()
        if not self._flush_before:
            self._active_buffer.invalidate()

        # Work out transferred length
        self._descr.flush()
        self.transferred = 0
        for i in range(0, self._num_descr):
            # XXX if micro mode, this doesn't apply.  Count descriptors instead.
            if self._descr[i, 7] & 0x30000000:
                raise RuntimeError("DMA Error in descriptor")
            self.transferred += self._descr[i, 7] & 0x03FFFFFF

        # Ensure engine is idled
        self.stop()

        # Clean up descriptor buffer
        self._descr.close()
        self._descr = None


[docs]class DMA(DefaultIP): """Class for Interacting with the AXI Simple DMA Engine This class provides two attributes for the read and write channels. The read channel copies data from the stream into memory and the write channel copies data from memory to the output stream. Both channels have an identical API consisting of `transfer` and `wait` functions. If interrupts have been enabled and connected for the DMA engine then `wait_async` is also present. Buffers to be transferred must be a `PynqBuffer` object allocated through `pynq.allocate()` function either directly or indirectly. This means that Frames from the video subsystem can be transferred using this class. Attributes ---------- recvchannel : _SDMAChannel / _SGDMAChannel The stream to memory channel (if enabled in hardware) sendchannel : _SDMAChannel / _SGDMAChannel The memory to stream channel (if enabled in hardware) buffer_max_size : int The maximum DMA transfer length. """ def __init__(self, description, *args, **kwargs): """Create an instance of the DMA Driver For DMA, max transfer length is (2^sg_length_width -1). See PG021 tables 2-15, 2-25, 2-31 and 2-38. Parameters ---------- description : dict The entry in the IP dict describing the DMA engine """ if type(description) not in [dict, ReprDict] or args or kwargs: raise RuntimeError( "You appear to want the old DMA driver which " "has been deprecated and moved to " "pynq.lib.deprecated" ) super().__init__(description=description) self.description = description if "parameters" not in description: raise RuntimeError( "Unable to get parameters from description; " "Users must use *.hwh files for overlays." ) if "c_micro_dma" in description["parameters"]: self._micro = bool(int(description["parameters"]["c_micro_dma"])) else: self._micro = False if "c_include_sg" in description["parameters"]: self._sg = bool(int(description["parameters"]["c_include_sg"])) else: self._sg = False if self._micro and self._sg: raise UnsupportedConfiguration( "Micro and Scatter-gather modes not supported simultaneously." ) if "c_sg_length_width" in description["parameters"]: self.buffer_max_size = 1 << int( description["parameters"]["c_sg_length_width"] ) else: self.buffer_max_size = 1 << MAX_C_SG_LENGTH_WIDTH self.buffer_max_size -= 1 self.sendchannel = None self.recvchannel = None self.set_up_tx_channel() self.set_up_rx_channel()
[docs] def set_up_tx_channel(self): """Set up the transmit channel. If transmit channel is enabled, we will work out the max transfer size first. Then depending on (1) whether interrupt is enabled, and (2) whether SG mode is used, we will create the transmit channel. """ if "c_include_mm2s" in self.description["parameters"] and bool( int(self.description["parameters"]["c_include_mm2s"]) ): if "c_include_mm2s_dre" in self.description["parameters"]: dre = bool(int(self.description["parameters"]["c_include_mm2s_dre"])) else: dre = False data_width = ( int(self.description["parameters"]["c_m_axi_mm2s_data_width"]) >> 3 ) if self._micro: max_size = data_width * int( self.description["parameters"]["c_mm2s_burst_size"] ) else: max_size = self.buffer_max_size if "mm2s_introut" in self.description["interrupts"]: if self._sg: self.sendchannel = _SGDMAChannel( self.mmio, max_size, data_width, DMA_TYPE_TX, dre, self.mm2s_introut, ) else: self.sendchannel = _SDMAChannel( self.mmio, max_size, data_width, DMA_TYPE_TX, dre, self.mm2s_introut, ) else: if self._sg: self.sendchannel = _SGDMAChannel( self.mmio, max_size, data_width, DMA_TYPE_TX, dre ) else: self.sendchannel = _SDMAChannel( self.mmio, max_size, data_width, DMA_TYPE_TX, dre )
[docs] def set_up_rx_channel(self): """Set up the receive channel. If receive channel is enabled, we will work out the max transfer size first. Then depending on (1) whether interrupt is enabled, and (2) whether SG mode is used, we will create the receive channel. """ if "c_include_s2mm" in self.description["parameters"] and bool( int(self.description["parameters"]["c_include_s2mm"]) ): if "c_include_s2mm_dre" in self.description["parameters"]: dre = bool(int(self.description["parameters"]["c_include_s2mm_dre"])) else: dre = False data_width = ( int(self.description["parameters"]["c_m_axi_s2mm_data_width"]) >> 3 ) if self._micro: max_size = data_width * int( self.description["parameters"]["c_s2mm_burst_size"] ) else: max_size = self.buffer_max_size if "s2mm_introut" in self.description["interrupts"]: if self._sg: self.recvchannel = _SGDMAChannel( self.mmio, max_size, data_width, DMA_TYPE_RX, dre, self.s2mm_introut, ) else: self.recvchannel = _SDMAChannel( self.mmio, max_size, data_width, DMA_TYPE_RX, dre, self.s2mm_introut, ) else: if self._sg: self.recvchannel = _SGDMAChannel( self.mmio, max_size, data_width, DMA_TYPE_RX, dre ) else: self.recvchannel = _SDMAChannel( self.mmio, max_size, data_width, DMA_TYPE_RX, dre )
bindto = ["xilinx.com:ip:axi_dma:7.1"]