# Copyright (c) 2016, Xilinx, Inc.
# SPDX-License-Identifier: BSD-3-Clause
import asyncio
import os
from pynq import GPIO
from pynq import Interrupt
from pynq import MMIO
from pynq import DefaultHierarchy
from pynq import PL
[docs]class MBInterruptEvent:
"""The class provides and asyncio Event-like interface to
the interrupt subsystem for a Microblaze. The event is set by
raising an interrupt and cleared using the clear function.
Typical use is to call clear prior to sending a request to
the Microblaze and waiting in a loop until the response is received.
This order of operations will avoid race conditions between the
Microblaze and the host code.
"""
def __init__(self, intr_pin, intr_ack_gpio):
"""Create a new _MBInterruptEvent object
Parameters
----------
intr_pin : str
Name of the interrupt pin for the Microblaze.
intr_ack_gpio : int
Number of the GPIO pin used to clear the interrupt.
"""
self.interrupt = Interrupt(intr_pin)
self.gpio = GPIO(GPIO.get_gpio_pin(intr_ack_gpio), "out")
[docs] async def wait(self):
"""Coroutine to wait until the event is set by an interrupt."""
await self.interrupt.wait()
[docs] def clear(self):
"""Clear the interrupt and reset the event. Resetting the event
should be done before sending a request that will be acknowledged
interrupts.
"""
self.gpio.write(1)
self.gpio.write(0)
[docs]class PynqMicroblaze:
"""This class controls the active Microblaze instances in the system.
Attributes
----------
ip_name : str
The name of the IP corresponding to the Microblaze.
rst_name : str
The name of the reset pin for the Microblaze.
mb_program : str
The absolute path of the Microblaze program.
state : str
The status (IDLE, RUNNING, or STOPPED) of the Microblaze.
reset_pin : GPIO
The reset pin associated with the Microblaze.
mmio : MMIO
The MMIO instance associated with the Microblaze.
interrupt : Event
An asyncio.Event-like class for waiting on and clearing interrupts.
"""
def __init__(self, mb_info, mb_program, force=False):
"""Create a new Microblaze object.
It looks for active instances on the same Microblaze, and prevents
users from silently reloading the Microblaze program. Users are
notified with an exception if a program is already running on the
selected Microblaze, to prevent unwanted behavior.
Two cases:
1. No previous Microblaze program loaded in the system,
or users want to request another instance using the same program.
No exception will be raised in this case.
2. There is a previous Microblaze program loaded in the system.
Users want to request another instance with a different
program. An exception will be raised.
Note
----
When a Microblaze program is already loaded in the system, and users
want to instantiate another object using a different Microblaze
program, users are in danger of losing existing objects.
Parameters
----------
mb_info : dict
A dictionary storing Microblaze information, such as the
IP name and the reset name.
mb_program : str
The Microblaze program loaded for the processor.
Raises
------
RuntimeError
When another Microblaze program is already loaded.
Examples
--------
The `mb_info` is a dictionary storing Microblaze information:
>>> mb_info = {'ip_name': 'mb_bram_ctrl_1',
'rst_name': 'mb_reset_1',
'intr_pin_name': 'iop1/dff_en_reset_0/q',
'intr_ack_name': 'mb_1_intr_ack'}
"""
mem_dict = PL.mem_dict
gpio_dict = PL.gpio_dict
intr_dict = PL.interrupt_pins
# Check program path
if not os.path.isfile(mb_program):
raise ValueError("{} does not exist.".format(mb_program))
# Get IP information
ip_name = mb_info["ip_name"]
if ip_name not in mem_dict.keys():
raise ValueError("No such IP {}.".format(ip_name))
addr_base = mem_dict[ip_name]["base_address"]
addr_range = mem_dict[ip_name]["size"]
ip_state = mem_dict[ip_name]["state"]
# Get reset information
rst_name = mb_info["rst_name"]
if rst_name not in gpio_dict.keys():
raise ValueError("No such reset pin {}.".format(rst_name))
gpio_uix = gpio_dict[rst_name]["index"]
# Get interrupt pin information
if "intr_pin_name" in mb_info:
intr_pin_name = mb_info["intr_pin_name"]
if intr_pin_name not in intr_dict.keys():
raise ValueError("No such interrupt pin {}.".format(intr_pin_name))
else:
intr_pin_name = None
# Get interrupt ACK information
if "intr_ack_name" in mb_info:
intr_ack_name = mb_info["intr_ack_name"]
if intr_ack_name not in gpio_dict.keys():
raise ValueError("No such interrupt ACK {}.".format(intr_ack_name))
intr_ack_gpio = gpio_dict[intr_ack_name]["index"]
else:
intr_ack_gpio = None
# Set basic attributes
self.ip_name = ip_name
self.rst_name = rst_name
self.mb_program = mb_program
self.state = "IDLE"
self.reset_pin = GPIO(GPIO.get_gpio_pin(gpio_uix), "out")
self.mmio = MMIO(addr_base, addr_range)
# Check to see if Microblaze in user
if (ip_state is not None) and (ip_state != mb_program):
if force:
self.reset()
else:
raise RuntimeError(
"Another program {} already running.".format(ip_state)
)
# Set optional attributes
if (intr_pin_name is not None) and (intr_ack_gpio is not None):
self.interrupt = MBInterruptEvent(intr_pin_name, intr_ack_gpio)
else:
self.interrupt = None
# Reset, program, and run
self.program()
[docs] def run(self):
"""Start the Microblaze to run program loaded.
This method will update the status of the Microblaze.
Returns
-------
None
"""
self.state = "RUNNING"
self.reset_pin.write(0)
[docs] def reset(self):
"""Reset the Microblaze to stop it from running.
This method will update the status of the Microblaze.
Returns
-------
None
"""
self.state = "STOPPED"
self.reset_pin.write(1)
[docs] def program(self):
"""This method programs the Microblaze.
This method is called in __init__(); it can also be called after that.
It uses the attribute `self.mb_program` to program the Microblaze.
Returns
-------
None
"""
self.reset()
PL.load_ip_data(self.ip_name, self.mb_program, zero=True)
if self.interrupt:
self.interrupt.clear()
self.run()
[docs] def write(self, offset, data):
"""This method write data into the shared memory of the Microblaze.
Parameters
----------
offset : int
The beginning offset where data are written into.
data : int/list
A list of 32b words to be written.
Returns
-------
None
"""
if type(data) is int:
self.mmio.write(offset, data)
elif type(data) is list:
for i, word in enumerate(data):
self.mmio.write(offset + 4 * i, word)
else:
raise ValueError("Type of write data has to be int or lists.")
[docs] def read(self, offset, length=1):
"""This method reads data from the shared memory of Microblaze.
Parameters
----------
offset : int
The beginning offset where data are read from.
length : int
The number of data (32-bit int) to be read.
Returns
-------
int/list
An int of a list of data read from the shared memory.
"""
if length == 1:
return self.mmio.read(offset)
elif length > 1:
return [self.mmio.read(offset + 4 * i) for i in range(length)]
else:
raise ValueError("Length of read data has to be 1 or more.")
[docs]class MicroblazeHierarchy(DefaultHierarchy):
"""Hierarchy driver for the microblaze subsystem.
Enables the user to `load` programs on to the microblaze. All function
calls and member accesses are delegated to the loaded program.
"""
def __init__(self, description, mbtype="Unknown"):
super().__init__(description)
hier = description["fullpath"]
if hier.count("/") > 0:
parent, _, ip = hier.rpartition("/")
container = "{}/".format(parent)
else:
container = ""
ip = hier
self.mb_info = {
"ip_name": "{}/mb_bram_ctrl".format(hier),
"rst_name": "{}mb_{}_reset".format(container, ip),
"intr_pin_name": "{}/dff_en_reset_vector_0/q".format(hier),
"intr_ack_name": "{}mb_{}_intr_ack".format(container, ip),
"mbtype": mbtype,
"name": hier,
}
@property
def mbtype(self):
"""The defined type of the microblaze subsystem. Used by driver
programs to limit what microblaze subsystems the program is run on.
"""
return self.mb_info["mbtype"]
@mbtype.setter
def mbtype(self, value):
self.mb_info["mbtype"] = value
[docs] @staticmethod
def checkhierarchy(description):
return "mb_bram_ctrl" in description["memories"]