Source code for pynq.interrupt

#   Copyright (c) 2017, Xilinx, Inc.
#   All rights reserved.
#
#   Redistribution and use in source and binary forms, with or without
#   modification, are permitted provided that the following conditions are met:
#
#   1.  Redistributions of source code must retain the above copyright notice,
#       this list of conditions and the following disclaimer.
#
#   2.  Redistributions in binary form must reproduce the above copyright
#       notice, this list of conditions and the following disclaimer in the
#       documentation and/or other materials provided with the distribution.
#
#   3.  Neither the name of the copyright holder nor the names of its
#       contributors may be used to endorse or promote products derived from
#       this software without specific prior written permission.
#
#   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
#   AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
#   THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
#   PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
#   CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
#   EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
#   PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
#   OR BUSINESS INTERRUPTION). HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
#   WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
#   OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
#   ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


import asyncio
import functools
import os
import weakref
from .pl import PL
from .mmio import MMIO

__author__ = "Peter Ogden"
__copyright__ = "Copyright 2017, Xilinx"
__email__ = "pynq_support@xilinx.com"


[docs]class Interrupt(object): """Class that provides the core wait-based API to end users Provides a single coroutine wait that waits until the interrupt signal goes high. If the Overlay is changed or re-downloaded this object is invalidated and waiting results in undefined behaviour.""" def __init__(self, pinname): """Initialise an Interrupt object attached to the specified pin Parameters ---------- pinname : string Fully qualified name of the pin in the block diagram of the for ${cell}/${pin}. Raises an exception if the pin cannot be found in the currently active Overlay """ if pinname not in PL.interrupt_pins: raise ValueError("No Pin of name {} found".format(pinname)) parentname = PL.interrupt_pins[pinname]['controller'] self.number = PL.interrupt_pins[pinname]['index'] self.parent = weakref.ref( _InterruptController.get_controller(parentname)) self.event = asyncio.Event() self.waiting = False
[docs] @asyncio.coroutine def wait(self): """Wait for the interrupt to be active May raise an exception if the Overlay has been changed since initialisation. """ parent = self.parent() if parent is None: raise RuntimeError("Interrupt invalidated by Overlay change") if not self.waiting: self.event.clear() parent.add_event(self.event, self.number) self.waiting = True yield from self.event.wait() self.waiting = False
# Implementation Details Follow def _get_uio_device(irq): """Returns the UIO device path for a specified interrupt If the IRQ either cannot be found or does not correspond to a UIO device, None is returned Parameters ---------- irq : int The desired physical interrupt line """ dev_names = None with open('/proc/interrupts', 'r') as f: for line in f: cols = line.split() if len(cols) >= 7: if cols[4] == str(irq): # Hack to work on multiple kernel versions dev_names = [cols[5], cols[6]] if dev_names is None: return None for dev in os.listdir("/sys/class/uio"): with open('/sys/class/uio/' + dev + '/name', 'r') as f: name = f.read().strip() if name in dev_names: return '/dev/' + dev return None class _UioController(object): """Class that interacts directly with a UIO device""" def __init__(self, devname): self.uio = open(devname, 'r+b', buffering=0) # Register callback with asyncio asyncio.get_event_loop().add_reader(self.uio, functools.partial( _UioController._uio_callback, self)) self.wait_events = [] def __del__(self): asyncio.get_event_loop().remove_reader(self.uio) self.uio.close() def _uio_callback(self): self.uio.read(4) current_events = self.wait_events self.wait_events = [] for e in current_events: e.set() def add_event(self, event, number): if not self.wait_events: self.uio.write(bytes([0, 0, 0, 1])) self.wait_events.append(event) class _InterruptController(object): """Class that interacts with an AXI interrupt controller This class is not designed to be interacted with by end users directly - most uses will be via the register_interrupt API which will handle the creation and registration of _InterruptController instances """ _controllers = [] _last_timestamp = None @staticmethod def get_controller(name): """Returns the _InterruptController corresponding to the AXI interrupt controller with the specified name. Will invalidate all interrupt controllers if the Overlay has been changed. Should not be accessed by user code. Parameters ---------- name : str Name of the interrupt controller to return """ bitstream_timestamp = PL.timestamp if bitstream_timestamp != _InterruptController._last_timestamp: _InterruptController._controllers.clear() _InterruptController._last_timestamp = bitstream_timestamp for con in _InterruptController._controllers: if con.name == name: return con ret = _InterruptController(name) _InterruptController._controllers.append(ret) return ret def __init__(self, name): """Return a new _InterruptController Returns a new _InterruptController. As these are singleton objects, this should never be called directly, instead register_interrupt should be used, or get_controller if direct access is required Parameters ---------- name : str Name of the interrupt controller to return """ self.name = name self.mmio = MMIO(PL.ip_dict[name]['phys_addr'], 32) self.wait_handles = [[] for _ in range(32)] self.event_number = 0 self.waiting = False # Enable global interrupt self.mmio.write(0x1C, 0x00000003) # Disable Interrupt lines self.mmio.write(0x08, 0) parent = PL.interrupt_controllers[name]['parent'] number = PL.interrupt_controllers[name]['index'] if parent == "": uiodev = _get_uio_device(61 + number) if uiodev is None: raise ValueError('Could not find UIO device for interrupt pin ' 'for IRQ number {}'.format(number)) self.parent = _UioController(uiodev) self.number = 0 else: self.parent = _InterruptController.get_controller(parent) self.number = number def set(self): """Mimics the set function of an event. Should not be called by user code Allows for chaining of interrupt controllers by looking like an event to the parent controller. Will re-add the event if there are still interrupts left outstanding """ # Pull pending interrupts irqs = self.mmio.read(0x04) # Call all active IRQs work = irqs irq = 0 while work != 0: if work % 2 == 1: # Disable the interrupt self.mmio.write(0x14, 1 << irq) events = self.wait_handles[irq] self.wait_handles[irq] = [] for e in events: e.set() self.event_number -= len(events) work = work >> 1 irq = irq + 1 # Acknowledge the interrupts self.mmio.write(0x0C, irqs) if self.event_number: self.parent.add_event(self, self.number) def add_event(self, event, number): """Registers an event against an interrupt line When the interrupt is active, all events are signaled and the interrupt line is disabled. End user classes should clear the interrupt before re-adding the event. Parameters ---------- event : object Any object that provides a set method to notify of an active interrupt number : int Interrupt number to register event against """ if not self.wait_handles[number]: self.mmio.write(0x10, 1 << number) if not self.event_number: self.parent.add_event(self, self.number) self.wait_handles[number].append(event) self.event_number += 1