# Copyright (c) 2017, Xilinx, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
# THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
# OR BUSINESS INTERRUPTION). HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
# WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
# OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
# ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
from pynq import DefaultIP
import os
import cffi
import functools
import signal
import numpy as np
import warnings
from pynq.ps import CPU_ARCH_IS_SUPPORTED, CPU_ARCH
__author__ = 'Peter Ogden, Anurag Dubey'
__copyright__ = 'Copyright 2017, Xilinx'
__email__ = 'pynq_support@xilinx.com'
[docs]class timeout:
"""Internal timeout functions.
This class is only used internally.
"""
def __init__(self, seconds=1, error_message='Timeout'):
self.seconds = seconds
self.error_message = error_message
[docs] def handle_timeout(self, signum, frame):
raise TimeoutError(self.error_message)
def __enter__(self):
signal.signal(signal.SIGALRM, self.handle_timeout)
signal.alarm(self.seconds)
def __exit__(self, type, value, traceback):
signal.alarm(0)
[docs]class LegacyDMA:
"""Python class which controls DMA.
This is a generic DMA class that can be used to access main memory.
The DMA direction can be:
(0)`DMA_TO_DEV` : DMA sends data to PL.
(1)`DMA_FROM_DEV` : DMA receives data from PL.
(3)`DMA_BIDIRECTIONAL` : DMA can send/receive data from PL.
Attributes
----------
buf : cffi.FFI.CData
A pointer to physically contiguous buffer.
bufLength : int
Length of internal buffer in bytes.
phyAddress : int
Physical address of the DMA device.
DMAengine : cdata 'XAxiDma *'
DMA engine instance defined in C. Not to be directly modified.
DMAinstance : cdata 'XAxiDma_Config *'
DMA configuration instance struct. Not to be directly modified.
direction : int
The direction indicating whether DMA sends/receives data from PL.
Configuration : dict
Current DMAinstance configuration values.
Note
----
If this class is parsed on an unsupported architecture it will issue
a warning and leave the class variables libxlnk and libdma undefined
"""
ffi = cffi.FFI()
ffi.cdef("""
typedef unsigned int* XAxiDma_Bd[20];
typedef struct {
uint32_t ChanBase; /**< physical base address*/
int IsRxChannel; /**< Is this a receive channel */
volatile int RunState; /**< Whether channel is running */
int HasStsCntrlStrm; /**< Whether has stscntrl stream */
int HasDRE;
int DataWidth;
int Addr_ext;
uint32_t MaxTransferLen;
uint32_t* FirstBdPhysAddr; /**< Physical address of 1st BD in list */
uint32_t* FirstBdAddr; /**< Virtual address of 1st BD in list */
uint32_t* LastBdAddr; /**< Virtual address of last BD in the list */
uint32_t Length; /**< Total size of ring in bytes */
uint32_t* Separation; /**< Number of bytes between the starting
address of adjacent BDs */
XAxiDma_Bd *FreeHead; /**< First BD in the free group */
XAxiDma_Bd *PreHead; /**< First BD in the pre-work group */
XAxiDma_Bd *HwHead; /**< First BD in the work group */
XAxiDma_Bd *HwTail; /**< Last BD in the work group */
XAxiDma_Bd *PostHead; /**< First BD in the post-work group */
XAxiDma_Bd *BdaRestart; /**< BD to load when channel is started */
XAxiDma_Bd *CyclicBd; /**< Useful for Cyclic DMA operations */
int FreeCnt; /**< Number of allocatable BDs in free group */
int PreCnt; /**< Number of BDs in pre-work group */
int HwCnt; /**< Number of BDs in work group */
int PostCnt; /**< Number of BDs in post-work group */
int AllCnt; /**< Total Number of BDs for channel */
int RingIndex; /**< Ring Index */
int Cyclic; /**< Check for cyclic DMA Mode */
} XAxiDma_BdRing;
typedef struct {
uint32_t DeviceId;
uint32_t* BaseAddr;
int HasStsCntrlStrm;
int HasMm2S;
int HasMm2SDRE;
int Mm2SDataWidth;
int HasS2Mm;
int HasS2MmDRE;
int S2MmDataWidth;
int HasSg;
int Mm2sNumChannels;
int S2MmNumChannels;
int Mm2SBurstSize;
int S2MmBurstSize;
int MicroDmaMode;
int AddrWidth; /**< Address Width */
} XAxiDma_Config;
typedef struct XAxiDma {
uint32_t RegBase; /* Virtual base address of DMA engine */
int HasMm2S; /* Has transmit channel */
int HasS2Mm; /* Has receive channel */
int Initialized; /* Driver has been initialized */
int HasSg;
XAxiDma_BdRing TxBdRing;
XAxiDma_BdRing RxBdRing[16];
int TxNumChannels;
int RxNumChannels;
int MicroDmaMode;
int AddrWidth; /**< Address Width */
} XAxiDma;
int AxiDma_CfgInitialize(XAxiDma * InstancePtr, XAxiDma_Config *Config);
void AxiDma_Reset(XAxiDma * InstancePtr);
int AxiDma_ResetIsDone(XAxiDma * InstancePtr);
int AxiDma_Pause(XAxiDma * InstancePtr);
int AxiDma_Resume(XAxiDma * InstancePtr);
uint32_t AxiDma_Busy(XAxiDma *InstancePtr,int Direction);
uint32_t AxiDma_SimpleTransfer(XAxiDma *InstancePtr, uint32_t* BuffPtr,
uint32_t Length, int Direction);
int AxiDma_SelectKeyHole(XAxiDma *InstancePtr, int Direction,
int Select);
int AxiDma_SelectCyclicMode(XAxiDma *InstancePtr, int Direction,
int Select);
int AxiDma_Selftest(XAxiDma * InstancePtr);
void AxiDma_DisableInterruptsAll(XAxiDma * InstancePtr);
""")
LIB_SEARCH_PATH = os.path.dirname(os.path.realpath(__file__))
if CPU_ARCH_IS_SUPPORTED:
libdma = ffi.dlopen(LIB_SEARCH_PATH + "/libdma.so")
else:
warnings.warn("Pynq does not support the CPU Architecture: {}"
.format(CPU_ARCH), ResourceWarning)
DMA_TO_DEV = 0
DMA_FROM_DEV = 1
DMA_BIDIRECTIONAL = 3
DMA_TRANSFER_LIMIT_BYTES = 8388607
DeviceId = 0
DefaultConfig = {
'DeviceId': 0,
'BaseAddr': ffi.cast("uint32_t *", 0x00000000),
'HasStsCntrlStrm': 0,
'HasMm2S': 0,
'HasMm2SDRE': 0,
'Mm2SDataWidth': 32,
'HasS2Mm': 1,
'HasS2MmDRE': 0,
'S2MmDataWidth': 64,
'HasSg': 0,
'Mm2sNumChannels': 1,
'S2MmNumChannels': 1,
'Mm2SBurstSize': 16,
'S2MmBurstSize': 64,
'MicroDmaMode': 0,
'AddrWidth': 32
}
memapi = cffi.FFI()
memapi.cdef("""
static uint32_t xlnkBufCnt = 0;
unsigned long cma_mmap(unsigned long phyAddr, uint32_t len);
uint32_t cma_munmap(void* buf, uint32_t len);
void *cma_alloc(uint32_t len, uint32_t cacheable);
unsigned long cma_get_phy_addr(void *buf);
void cma_free(void *buf);
uint32_t cma_pages_available(void);
""")
if CPU_ARCH_IS_SUPPORTED:
libxlnk = memapi.dlopen("/usr/lib/libsds_lib.so")
else:
warnings.warn("Pynq does not support the CPU Architecture: {}"
.format(CPU_ARCH), ResourceWarning)
def __init__(self, address, direction=DMA_FROM_DEV, attr_dict=None):
"""Initializes a new DMA object.
Uses the Default configuration parameters to initialize
a DMA. After initialization, the DMA is reset and the
interrupts are disabled for DMA.
The DMA direction can be:
(0)`DMA_TO_DEV` : DMA sends data to PL.
(1)`DMA_FROM_DEV` : DMA receives data from PL.
(3)`DMA_BIDIRECTIONAL` : DMA can send/receive data from PL.
The keys in `attr_dict` should exactly match the ones used in default
config. All the keys are not required. The default configuration is
defined in self.DefaultConfig dict. Users can reinitialize the DMA
with new configuratiuon after creating the object.
Parameters
----------
address: int
Physical address of the DMA IP.
direction : int
The direction indicating whether DMA sends/receives data from PL.
attr_dict : dict
An optional dictionary specifying DMA configuration values.
"""
self.buf = None
self._bufPtr = None
self.direction = direction
self.bufLength = None
self.phyAddress = address
self.DMAengine = self.ffi.new("XAxiDma *")
self.DMAinstance = self.ffi.new("XAxiDma_Config *")
self.Configuration = {}
self._TransferInitiated = 0
self._gen_config(address, direction, attr_dict)
status = self.libdma.AxiDma_CfgInitialize(self.DMAengine,
self.DMAinstance)
if status != 0:
raise RuntimeError("Failed to initialize DMA!")
self.libdma.AxiDma_Reset(self.DMAengine)
self.libdma.AxiDma_DisableInterruptsAll(self.DMAengine)
def _gen_config(self, address, direction, attr_dict):
"""Build configuration and map memory.
This is an internal method used for initialization and
should not be called by user.
"""
for key in self.DefaultConfig.keys():
self.DMAinstance.__setattr__(key, self.DefaultConfig[key])
if direction == self.DMA_TO_DEV:
self.DMAinstance.HasS2Mm = 0
self.DMAinstance.HasMm2S = 1
elif direction == self.DMA_BIDIRECTIONAL:
self.DMAinstance.HasS2Mm = 1
self.DMAinstance.HasMm2S = 1
self._bufPtr = None
self._TransferInitiated = 0
if attr_dict is not None:
if type(attr_dict) is dict:
for key in attr_dict.keys():
self.DMAinstance.__setattr__(key, attr_dict[key])
else:
raise ValueError("Expecting 3rd Arg to be a dict.")
virt = self.libxlnk.cma_mmap(address, 0x10000)
if virt == -1:
raise RuntimeError("Memory map of driver failed.")
self.DMAinstance.BaseAddr = self.ffi.cast("uint32_t *", virt)
self.DMAinstance.DeviceId = self.DeviceId
self.DeviceId += 1
for key in self.DefaultConfig.keys():
self.Configuration[key] = self.DMAinstance.__getattribute__(key)
def __del__(self):
"""Destructor for DMA object.
Frees the internal buffer and Resets the DMA.
Returns
-------
None
"""
if self.buf is not None and self.buf is not self.ffi.NULL:
self.free_buf()
self.libdma.AxiDma_Reset(self.DMAengine)
[docs] def transfer(self, num_bytes=-1, direction=-1):
"""Transfer data using DMA (Non-blocking).
Used to initiate transfer of data between a physically contiguous
buffer and PL. The buffer should be allocated using `create_buf`
or get_ndarray before this call.
The `num_bytes` defaults to the buffer size and be both
less than or equal to the buffer size and `DMA_TRANSFER_LIMIT_BYTES`.
Possible values for `direction` are:
(0)`DMA_TO_DEV` : DMA sends data to PL.
(1)`DMA_FROM_DEV` : DMA receives data from PL.
If the direction is not specified it uses the direction passed at
initialisation. This is not valid for bidirectional DMA.
Parameters
----------
num_bytes : int
Number of bytes to transfer.
direction : int
Direction in which DMA transfers data.
Returns
-------
None
"""
if num_bytes == -1:
num_bytes = self.bufLength
if direction == -1:
direction = self.direction
if num_bytes > self.bufLength:
raise RuntimeError("Buffer size smaller than the transfer size")
if num_bytes > self.DMA_TRANSFER_LIMIT_BYTES:
raise RuntimeError("DMA transfer size > {}".format(
self.DMA_TRANSFER_LIMIT_BYTES))
if direction not in [self.DMA_FROM_DEV, self.DMA_TO_DEV]:
raise RuntimeError("Invalid direction for transfer.")
self.direction = direction
if self.buf is not None:
if self.libdma.AxiDma_SimpleTransfer(
self.DMAengine, self._bufPtr,
num_bytes, self.direction) != 0:
raise RuntimeError("DMA transfer failed.")
self._TransferInitiated = 1
else:
raise RuntimeError("Buffer not allocated.")
[docs] def create_buf(self, num_bytes, cacheable=0):
"""Allocate physically contiguous memory buffer.
Allocates/Reallocates buffer needed for DMA operations.
Possible values for parameter `cacheable` are:
`1`: the memory buffer is cacheable.
`0`: the memory buffer is non-cacheable.
Note
----
This buffer is allocated inside the kernel space using
xlnk driver. The maximum allocatable memory is defined
at kernel build time using the CMA memory parameters.
For Pynq-Z1 kernel, it is specified as 128MB.
Parameters
----------
num_bytes : int
Length of the allocated array in bytes.
cacheable : int
Indicating whether or not the memory buffer is cacheable
Returns
-------
None
"""
if self.buf is None:
self.buf = self.libxlnk.cma_alloc(num_bytes, cacheable)
if self.buf == self.ffi.NULL:
raise RuntimeError("Memory allocation failed.")
else:
self.libxlnk.cma_free(self.buf)
self.buf = self.libxlnk.cma_alloc(num_bytes, cacheable)
buf_addr = self.libxlnk.cma_get_phy_addr(self.buf)
self._bufPtr = self.ffi.cast("uint32_t *", buf_addr)
self.bufLength = num_bytes
[docs] def free_buf(self):
"""Free the memory buffer associated with this object.
Use this to free a previously allocated memory buffer.
This is specially useful for reallocations.
Returns
-------
None
"""
if self.buf is None or self.buf == self.ffi.NULL:
return
self.libxlnk.cma_free(self.buf)
[docs] def wait(self, wait_timeout=10):
"""Block till DMA is busy or a timeout occurs.
Default value of timeout is 10 seconds.
Parameters
----------
wait_timeout : int
Time to wait in seconds before timing out wait operation.
Returns
-------
None
"""
if self._TransferInitiated == 0:
return
error = "DMA wait timed out."
with timeout(seconds=wait_timeout, error_message=error):
while True:
if self.libdma.AxiDma_Busy(self.DMAengine,
self.direction) == 0:
break
[docs] def get_buf(self, width=32):
"""Get a CFFI pointer to object's internal buffer.
This can be accessed like a regular array in python. The width can be
either 32 or 64.
Parameters
----------
width : int
The data width in the buffer.
Returns
-------
cffi.FFI.CData
An CFFI object which can be accessed similar to arrays in C.
"""
if self.buf is not None:
if width == 32:
return self.ffi.cast("unsigned int *", self.buf)
elif width == 64:
return self.ffi.cast("long long *", self.buf)
else:
raise RuntimeError("Buffer not created.")
[docs] def get_ndarray(self, shape=None, dtype=np.float32, cacheable=0):
"""Get a numpy ndarray of the DMA buffer, if shape is provided the
buffer is resized to fit the specified shape.
Parameters
----------
shape : int array
Shape of the numpy array to return
dtype : numpy.dtype
Type of the numpy array to return
cacheable : int
Passed to create_buf if a shape is provided
Returns
-------
numpy.ndarray
Numpy view of the DMA buffer
"""
if shape:
totalsize = (functools.reduce(lambda x, y: x * y, shape) *
dtype().itemsize)
self.create_buf(totalsize, cacheable)
if not self.buf:
raise RuntimeError("Buffer not created or shape not specified")
buffer = self.ffi.buffer(self.buf, self.bufLength)
ret = np.frombuffer(buffer, dtype=dtype)
ret.shape = shape
return ret
class _DMAChannel:
"""Drives a single channel of the Xilinx AXI DMA
This driver is designed to be used in conjunction with the
`Xlnk.cma_array` method of memory allocation. The channel has
main functions `transfer` and `wait` which start and wait for
the transfer to finish respectively. If interrupts are enabled
there is also a `wait_async` coroutine.
This class should not be constructed directly, instead used
through the AxiDMA class.
"""
def __init__(self, mmio, offset, flush_before, interrupt=None):
self._mmio = mmio
self._offset = offset
self._interrupt = interrupt
self._flush_before = flush_before
self.start()
@property
def running(self):
"""True if the DMA engine is currently running
"""
return self._mmio.read(self._offset + 4) & 0x01 == 0x00
@property
def idle(self):
"""True if the DMA engine is idle
`transfer` can only be called when the DMA is idle
"""
return self._mmio.read(self._offset + 4) & 0x02 == 0x02
def start(self):
"""Start the DMA engine if stopped
"""
if self._interrupt:
self._mmio.write(self._offset, 0x1001)
else:
self._mmio.write(self._offset, 0x0001)
while not self.running:
pass
self._first_transfer = True
def stop(self):
"""Stops the DMA channel and aborts the current transfer
"""
self._mmio.write(self._offset, 0x0000)
while self.running:
pass
def _clear_interrupt(self):
self._mmio.write(self._offset + 4, 0x1000)
def transfer(self, array):
"""Transfer memory with the DMA
Transfer must only be called when the channel is idle.
Parameters
----------
array : ContiguousArray
An xlnk allocated array to be transferred
"""
if not self.running:
raise RuntimeError('DMA channel not started')
if not self.idle and not self._first_transfer:
raise RuntimeError('DMA channel not idle')
if self._flush_before:
array.flush()
self._mmio.write(self._offset + 0x18, array.physical_address)
self._mmio.write(self._offset + 0x28, array.nbytes)
self._active_buffer = array
self._first_transfer = False
def wait(self):
"""Wait for the transfer to complete
"""
if not self.running:
raise RuntimeError('DMA channel not started')
while not self.idle:
pass
if not self._flush_before:
self._active_buffer.invalidate()
async def wait_async(self):
"""Wait for the transfer to complete
"""
if not self.running:
raise RuntimeError('DMA channel not started')
while not self.idle:
await self._interrupt.wait()
self._clear_interrupt()
if not self._flush_before:
self._active_buffer.invalidate()
[docs]class DMA(DefaultIP):
"""Class for Interacting with the AXI Simple DMA Engine
This class provides two attributes for the read and write channels.
The read channel copies data from the stream into memory and
the write channel copies data from memory to the output stream.
Both channels have an identical API consisting of `transfer` and
`wait` functions. If interrupts have been enabled and connected
for the DMA engine then `wait_async` is also present.
Buffers to be transferred must be allocated through the Xlnk driver
using the cma_array function either directly or indirectly. This
means that Frames from the video subsystem can be transferred using
this class.
Attributes
----------
recvchannel : _DMAChannel
The stream to memory channel
sendchannel : _DMAChannel
The memory to stream channel
"""
def __init__(self, description, *args, **kwargs):
"""Create an instance of the DMA Driver
Parameters
----------
description : dict
The entry in the IP dict describing the DMA engine
"""
if type(description) is not dict or args or kwargs:
raise RuntimeError('You appear to want the old DMA driver which '
'has been deprecated and moved to '
'pynq.lib.deprecated')
super().__init__(description=description)
if 'mm2s_introut' in description['interrupts']:
self.sendchannel = _DMAChannel(self.mmio, 0x0,
True,self.mm2s_introut)
else:
self.sendchannel = _DMAChannel(self.mmio, 0x0, True)
if 's2mm_introut' in description['interrupts']:
self.recvchannel = _DMAChannel(self.mmio, 0x30,
False, self.s2mm_introut)
else:
self.recvchannel = _DMAChannel(self.mmio, 0x30, False)
bindto = ['xilinx.com:ip:axi_dma:7.1']