Source code for pynq.lib.logictools.waveform

#   Copyright (c) 2016, Xilinx, Inc.
#   All rights reserved.
#
#   Redistribution and use in source and binary forms, with or without
#   modification, are permitted provided that the following conditions are met:
#
#   1.  Redistributions of source code must retain the above copyright notice,
#       this list of conditions and the following disclaimer.
#
#   2.  Redistributions in binary form must reproduce the above copyright
#       notice, this list of conditions and the following disclaimer in the
#       documentation and/or other materials provided with the distribution.
#
#   3.  Neither the name of the copyright holder nor the names of its
#       contributors may be used to endorse or promote products derived from
#       this software without specific prior written permission.
#
#   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
#   AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
#   THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
#   PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
#   CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
#   EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
#   PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
#   OR BUSINESS INTERRUPTION). HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
#   WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
#   OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
#   ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


from copy import deepcopy
import os
import re
import json
import subprocess
import base64
from xml.dom import minidom
import numpy as np
import IPython.core.display
import IPython.display
from .constants import *


__author__ = "Yun Rock Qu"
__copyright__ = "Copyright 2017, Xilinx"
__email__ = "pynq_support@xilinx.com"


PYNQ_JUPYTER_NOTEBOOKS = '/home/xilinx/jupyter_notebooks'


[docs]def bitstring_to_wave(bitstring): """Function to convert a pattern consisting of `0`, `1` into a sequence of `l`, `h`, and dots. For example, if the bit string is "010011000111", then the result will be "lhl.h.l..h..". Returns ------- str New wave tokens with valid tokens and dots. """ substitution_map = {'0': 'l', '1': 'h', '.': '.'} def insert_dots(match): return substitution_map[match.group()[0]] + \ '.' * (len(match.group()) - 1) bit_regex = re.compile(r'[0][0]*|[1][1]*') return re.sub(bit_regex, insert_dots, bitstring)
[docs]def wave_to_bitstring(wave): """Function to convert a pattern consisting of `l`, `h`, and dot to a sequence of `0` and `1`. Parameters ---------- wave : str The input string to convert. Returns ------- str A bit sequence of 0's and 1's. """ substitution_map = {'l': '0', 'h': '1'} def delete_dots(match): return substitution_map[match.group()[0]] * len(match.group()) wave_regex = re.compile(r'[l]\.*|[h]\.*') return re.sub(wave_regex, delete_dots, wave)
[docs]def bitstring_to_int(bitstring): """Function to convert a bit string to integer list. For example, if the bit string is '0110', then the integer list will be [0,1,1,0]. Parameters ---------- bitstring : str The input string to convert. Returns ------- list A list of elements, each element being 0 or 1. """ return [int(i, 10) for i in list(bitstring)]
[docs]def int_to_sample(bits): """Function to convert a bit list into a multi-bit sample. Example: [1, 1, 1, 0] will be converted to 7, since the LSB of the sample appears first in the sequence. Parameters ---------- bits : list A list of bits, each element being 0 or 1. Returns ------- int A numpy uint32 converted from the bit samples. """ return np.uint32(int("".join(map(str, list(bits[::-1]))), 2))
def _verify_wave_tokens(wave_lane): """Validate tokens in a WaveLane string. Parameters ---------- wave_lane : str A string consisting of the WaveLane tokens. Returns ------- Boolean True if all the tokens in the WaveLane are valid. """ valid_tokens = {'l', 'h', '.'} wave_lane_tokens = list(wave_lane) for token in wave_lane_tokens: if token not in valid_tokens: raise ValueError('Valid tokens are: {}'.format(valid_tokens))
[docs]def draw_wavedrom(data): """Display the waveform using the Wavedrom package. Users can call this method directly to draw any wavedrom data. Example usage: >>> a = { 'signal': [ {'name': 'clk', 'wave': 'p.....|...'}, {'name': 'dat', 'wave': 'x.345x|=.x', 'data': ['head', 'body', 'tail', 'data']}, {'name': 'req', 'wave': '0.1..0|1.0'}, {}, {'name': 'ack', 'wave': '1.....|01.'} ]} >>> draw_wavedrom(a) More information can be found at: https://github.com/witchard/nbwavedrom Parameters ---------- data : dict A dictionary of data as shown in the example. """ data = _dump_json_data(data) phantomjs = _find_phantomjs() if phantomjs: wavedrom_cli = _find_wavedrom_cli() return _draw_phantomjs(data, phantomjs, wavedrom_cli) else: return _draw_javascript(data)
def _dump_json_data(data): """Convert the data into Json dump. Parameters ---------- data : dict A dictionary of the Json formatted data. Returns ------- str A Json dump of the original data. """ return json.dumps(data) def _draw_javascript(data): """Display the waveform using the Wavedrom Javascript. This method requires 2 javascript files to be present. We get the relative paths for the 2 files in order to proceed. Users can call this method directly to draw any wavedrom data. Parameters ---------- data : str A dump of a Json formatted data. """ wavedrom_js = 'wavedrom.js' wavedromskin_js = 'wavedromskin.js' if not (_is_javascript_present(wavedrom_js) and _is_javascript_present(wavedromskin_js)): _copy_javascripts() current_path = os.getcwd() relative_path = os.path.relpath(PYNQ_JUPYTER_NOTEBOOKS, current_path) htmldata = '<script type="WaveDrom">' + data + '</script>' IPython.core.display.display_html(IPython.core.display.HTML(htmldata)) jsdata = 'WaveDrom.ProcessAll();' IPython.core.display.display_javascript( IPython.core.display.Javascript( data=jsdata, lib=[relative_path + '/js/wavedrom.js', relative_path + '/js/wavedromskin.js'])) def _is_javascript_present(javascript_name): """Check whether the Javascripts are present in the notebook folder. Parameters ---------- javascript_name : str The name of the JS file. Returns ------- bool True if the specified javascript is present. """ file_path = os.path.join(PYNQ_JUPYTER_NOTEBOOKS, 'js', javascript_name) return os.path.isfile(file_path) def _copy_javascripts(): """Copy the required javascripts from the pynq package to notebook folder. This method is only required when rendering the wavedrom using javascripts. This is not required for PhantomJS. """ src_folder = os.path.join( os.path.dirname(os.path.realpath(__file__)), 'js') dst_folder = PYNQ_JUPYTER_NOTEBOOKS if os.system('cp -rf ' + src_folder + ' ' + dst_folder): raise RuntimeError('Cannot copy the javascripts.') def _draw_phantomjs(data, phantomjs, wavedrom_cli): """Draw the wavedrom using PhantomJS. This method requires the PhantomJS to be properly installed on the board. Parameters ---------- data : str A dump of a Json formatted data. phantomjs : str The absolute path of the PhantomJS executable. wavedrom_cli : str The absolute path of the Wavedrom-cli Javascript. """ prog = subprocess.Popen([ phantomjs, wavedrom_cli, '-i', '-', '-s', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) svg, _ = prog.communicate(data.encode('utf-8')) # This code based on IPython.core.display.SVG x = minidom.parseString(svg) found_svg = x.getElementsByTagName('svg') if found_svg: svg = found_svg[0].toxml() else: # fallback on the input, trust the user # but this is probably an error. pass svgdata = base64.b64encode(svg.encode('utf-8')).decode('ascii') htmldata = ('<div class="output_svg">' '<img class="svg" style="max-width: none"' 'src="data:image/svg+xml;base64,{0}" alt="Image"></img>' '</div>').format(svgdata) IPython.display.display(IPython.display.HTML(htmldata)) def _find_wavedrom_cli(): """Get path for the Wavedrom CLI Javascript file. For more information, please check: https://github.com/witchard/nbwavedrom Parameters ---------- str The name of the JS file. Returns ------- str The full path of the JS file. """ jsfile = 'wavedrom-cli.js' base = os.path.dirname(os.path.realpath(__file__)) return os.path.join(base, 'js', jsfile) def _find_phantomjs(): """Find the PhantomJS executable path. Returns ------- str The path of the PhantomJS executable file. """ program = 'phantomjs' for path in os.environ['PATH'].split(os.pathsep): path = path.strip('"') exe_file = os.path.join(path, program) if _is_exe(exe_file): return exe_file return None def _is_exe(path): """Check whether the file is accessible. Parameters ---------- path : str The path of the file. Returns ------- bool The file can be found at the specified path and can be accessed. """ return os.path.isfile(path) and os.access(path, os.X_OK)
[docs]class Waveform: """A wrapper class for Wavedrom package and interfacing functions. This class wraps the key functions of the Wavedrom package, including waveform display, bit pattern converting, csv converting, etc. A typical example of the waveform dictionary is: >>> loopback_test = {'signal': [ ['stimulus', {'name': 'clk0', 'pin': 'D0', 'wave': 'lh' * 64}, {'name': 'clk1', 'pin': 'D1', 'wave': 'l.h.' * 32}, {'name': 'clk2', 'pin': 'D2', 'wave': 'l...h...' * 16}, {'name': 'clk3', 'pin': 'D3', 'wave': 'l.......h.......' * 8}], ['analysis', {'name': 'clk15', 'pin': 'D15'}, {'name': 'clk16', 'pin': 'D16'}, {'name': 'clk17', 'pin': 'D17'}, {'name': 'clk18', 'pin': 'D18'}, {'name': 'clk19', 'pin': 'D19'}] ], 'foot': {'tock': 1}, 'head': {'text': 'Loopback Test'}} Attributes ---------- waveform_dict : dict The json data stored in the dictionary. intf_spec : dict The interface specification, e.g., PYNQZ1_LOGICTOOLS_SPECIFICATION. stimulus_group_name : str Name of the WaveLane group for the stimulus, defaulted to `stimulus`. analysis_group_name : str Name of the WaveLane group for the analysis, defaulted to `analysis`. stimulus_group : list A group of lanes, each lane being a dict of name, pin label,and wave. analysis_group : list A group of lanes, each lane being a dict of name, pin label,and wave. """ def __init__(self, waveform_dict, intf_spec_name='PYNQZ1_LOGICTOOLS_SPECIFICATION', stimulus_group_name=None, analysis_group_name=None): """Initializer for this wrapper class. Parameters ---------- waveform_dict : dict Waveform dictionary in WaveJSON format. intf_spec_name : str The name of the interface specification. stimulus_group_name : str Name of the WaveLane group for the stimulus, defaulted to `stimulus`. analysis_group_name : str Name of the WaveLane group for the analysis, defaulted to `analysis`. """ self.waveform_dict = deepcopy(waveform_dict) self.stimulus_group_name = stimulus_group_name self.analysis_group_name = analysis_group_name self.intf_spec = eval(intf_spec_name) if self.stimulus_group_name is not None: self._verify_lanes(stimulus_group_name) if self.analysis_group_name is not None: self._verify_lanes(analysis_group_name)
[docs] def display(self): """Display the waveform using the Wavedrom package. This package requires 2 javascript files to be copied locally. """ draw_wavedrom(self.waveform_dict)
def _get_wavelane_group(self, group_name): """Return the WaveLane group if present in waveform_dict. Typical group names are `stimulus` and `analysis` by default. The returned WaveLane group looks like: [{'name': 'dat', 'pin': 'D1', 'wave': 'l...h...lhlh'}, {'name': 'req', 'pin': 'D2', 'wave': 'lhlhlhlh....'}] Parameters ---------- group_name : str Name of the WaveLane group. Returns ------- list A list of lanes, each lane being a dictionary of name, pin label, and wave. """ for group in self.waveform_dict['signal']: if group and (group[0] == group_name): return group[1:] return [] @property def stimulus_group(self): """Return the stimulus WaveLane group. A stimulus group looks like: [{'name': 'dat', 'pin': 'D1', 'wave': 'l...h...lhlh'}, {'name': 'req', 'pin': 'D2', 'wave': 'lhlhlhlh....'}] Returns ------- list A list of lanes, each lane being a dictionary of name, pin label, and wave. """ return self._get_wavelane_group(self.stimulus_group_name) @property def analysis_group(self): """Return the analysis WaveLane group. An analysis group looks like: [{'name': 'dat', 'pin': 'D1', 'wave': 'l...h...lhlh'}, {'name': 'req', 'pin': 'D2', 'wave': 'lhlhlhlh....'}] Returns ------- list A list of lanes, each lane being a dictionary of name, pin label, and wave. """ return self._get_wavelane_group(self.analysis_group_name) def _get_wavelane_names(self, group_name): """Returns all the names of a given group of WaveLanes. The returned names are in the same order as in the waveform dictionary. Parameters ---------- group_name : str The name of the group. Returns ------- list A list of names for all the WaveLanes in that group. """ wavelanes = self._get_wavelane_group(group_name) wavelane_names = [wavelane['name'] for wavelane in wavelanes] return wavelane_names @property def stimulus_names(self): """Returns all the names of the stimulus WaveLanes. The returned names are in the same order as in the waveform dictionary. Returns ------- list A list of names for all the stimulus WaveLanes. """ return self._get_wavelane_names(self.stimulus_group_name) @property def analysis_names(self): """Returns all the names of the analysis WaveLanes. The returned names are in the same order as in the waveform dictionary. Returns ------- list A list of names for all the analysis WaveLanes. """ return self._get_wavelane_names(self.analysis_group_name) def _get_wavelane_pins(self, group_name): """Returns all the pin labels of a given group of WaveLanes. The returned pin labels are in the same order as in the waveform dictionary. Parameters ---------- group_name : str The name of the group. Returns ------- list A list of pin labels for all the WaveLanes of a specified group. """ wavelanes = self._get_wavelane_group(group_name) wavelane_pins = [wavelane['pin'] for wavelane in wavelanes] return wavelane_pins @property def stimulus_pins(self): """Returns all the pin labels of the stimulus WaveLanes. The returned pin labels are in the same order as in the waveform dictionary. Returns ------- list A list of pin labels for all the stimulus WaveLanes. """ return self._get_wavelane_pins(self.stimulus_group_name) @property def analysis_pins(self): """Returns all the pin labels of the analysis WaveLanes. The returned pin labels are in the same order as in the waveform dictionary. Returns ------- list A list of pin labels for all the analysis WaveLanes. """ return self._get_wavelane_pins(self.analysis_group_name) def _get_wavelane_waves(self, group_name): """Returns all the waves for a specific group of WaveLanes. The returned waves are in the same order as in the waveform dictionary. Parameters ---------- group_name : str The name of the group. Returns ------- list A list of waves for all the stimulus WaveLanes. """ wavelanes = self._get_wavelane_group(group_name) wavelane_waves = [wavelane['wave'] for wavelane in wavelanes] return wavelane_waves @property def stimulus_waves(self): """Returns all the waves of the stimulus WaveLanes. The returned waves are in the same order as in the waveform dictionary. Returns ------- list A list of waves for all the stimulus WaveLanes. """ return self._get_wavelane_waves(self.stimulus_group_name) @property def analysis_waves(self): """Returns all the waves of the analysis WaveLanes. The returned waves are in the same order as in the waveform dictionary. Returns ------- list A list of waves for all the analysis WaveLanes. """ return self._get_wavelane_waves(self.analysis_group_name) def _verify_lanes(self, group_name): """Verify the pin labels, names, and tokens for all lanes in the group. Typical group names are `stimulus` and `analysis` by default. Parameters ---------- group_name: str name of lane group whose pin labels will be verified. Raises ------ ValueError Raises this error when the group name is not valid, or the pin label is not valid, or there are duplicated pin labels, duplicated lane names, or the wave token is not valid. """ if group_name == self.stimulus_group_name: valid_pins = self.intf_spec['traceable_io_pins'] elif group_name == self.analysis_group_name: valid_pins = self.intf_spec['traceable_io_pins'] else: raise ValueError("Valid group names are {},{}.".format( self.stimulus_group_name, self.analysis_group_name)) lane_group = self._get_wavelane_group(group_name) pin_labels = set() lane_names = set() for lane in lane_group: # Verify that each pin label maps to an external pin if lane['pin'] not in valid_pins: raise ValueError("Pin label {} in Lane {} is invalid." .format(lane['pin'], lane['name'])) # Verify the wave tokens if 'wave' in lane: _verify_wave_tokens(lane['wave']) # Add lane names and pin labels lane_names.add(lane['name']) pin_labels.add(lane['pin']) # Verify that each lane has a unique pin label if len(pin_labels) < len(lane_group): raise ValueError("Duplicate pin labels in group {}." .format(group_name)) # Verify that each lane has a unique name if len(lane_names) < len(lane_group): raise ValueError("Duplicate lane names in group {}." .format(group_name))
[docs] def update(self, group_name, wavelane_group): """Update waveform dictionary based on the specified WaveLane group. A typical use case of this method is that it gets the output returned by the analyzer and refreshes the data stored in the dictionary. Since the analyzer only knows the pin labels, the data returned from the pattern analyzer is usually of format: [{'name': '', 'pin': 'D1', 'wave': 'l...h...lhlh'}, {'name': '', 'pin': 'D2', 'wave': 'lhlhlhlh....'}] Note the all the lanes should have the same number of samples. Note each lane in the analysis group has its pin number. Based on this information, this function only updates the lanes specified. Parameters ---------- group_name : str The name of the WaveLane group to be updated. wavelane_group : list The WaveLane group specified for updating. """ pin_to_name = {} updated_group = [group_name] for group in self.waveform_dict['signal']: if group and (group[0] == group_name): for wavelane in group[1:]: name, pin = wavelane['name'], wavelane['pin'] pin_to_name[pin] = name for pin in pin_to_name: for wavelane in wavelane_group: if pin == wavelane['pin']: wave = wavelane['wave'] updated_dict = {'name': pin_to_name[pin], 'pin': pin, 'wave': wave} updated_group.append(updated_dict) break break for index, group in enumerate(self.waveform_dict['signal']): if group and (group[0] == group_name): self.waveform_dict['signal'][index] = updated_group
[docs] def append(self, group_name, wavelane_group): """Append new data to the existing waveform dictionary. A typical use case of this method is that it gets the output returned by the analyzer and append new data to the dictionary. Since the analyzer only knows the pin labels, the data returned from the pattern analyzer is usually of format: [{'name': '', 'pin': 'D1', 'wave': 'l...h...lhlh'}, {'name': '', 'pin': 'D2', 'wave': 'lhlhlhlh....'}] Note the all the lanes should have the same number of samples. Note each lane in the analysis group has its pin number. Based on this information, this function only updates the lanes specified. Parameters ---------- group_name : str The name of the WaveLane group to be updated. wavelane_group : list The WaveLane group specified for updating. """ pin_to_name = {} pin_to_wave = {} updated_group = [group_name] for group in self.waveform_dict['signal']: if group and (group[0] == group_name): for wavelane in group[1:]: name, pin = wavelane['name'], wavelane['pin'] if 'wave' in wavelane: pin_to_wave[pin] = wavelane['wave'] else: pin_to_wave[pin] = '' pin_to_name[pin] = name for pin in pin_to_name: for wavelane in wavelane_group: if pin == wavelane['pin']: wave = wavelane['wave'] if pin_to_wave[pin]: merged_wave = bitstring_to_wave( wave_to_bitstring(pin_to_wave[pin]) + wave_to_bitstring(wave)) else: merged_wave = wave updated_dict = {'name': pin_to_name[pin], 'pin': pin, 'wave': merged_wave} updated_group.append(updated_dict) break break for index, group in enumerate(self.waveform_dict['signal']): if group and (group[0] == group_name): self.waveform_dict['signal'][index] = updated_group
[docs] def clear_wave(self, group_name): """Clear the wave in the existing waveform dictionary. This method will clear the wave stored in each wavelane, so that a brand-new waveform dict can be constructed. Annotation is assumed to have an empty name, so the entire annotation lane will get deleted in this method. Parameters ---------- group_name : str The name of the WaveLane group to be updated. """ for index, group in enumerate(self.waveform_dict['signal']): if group and (group[0] == group_name): for lane_index, wavelane in enumerate(group): if type(wavelane) is dict: if 'wave' in wavelane: self.waveform_dict[ 'signal'][index][lane_index]['wave'] = '' if not wavelane['name']: del self.waveform_dict['signal'][index][lane_index]
[docs] def annotate(self, group_name, wavelane_group): """Add annotation to the existing waveform dictionary. This method will add annotation wavelane into the specified group. Usually this is used in combination with the trace analyzer. The annotation usually has the following format: [{name: '', wave: 'x.444x4.x', data: ['read', 'write', 'read', 'data']}] Parameters ---------- group_name : str The name of the WaveLane group to be updated. wavelane_group : list The WaveLane group specified for updating. """ for index, group in enumerate(self.waveform_dict['signal']): if group and (group[0] == group_name): self.waveform_dict['signal'][index].append(wavelane_group)