# Copyright (c) 2020, Xilinx, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
# THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
# OR BUSINESS INTERRUPTION). HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
# WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
# OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
# ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import json
import os
import shutil
import tempfile
import logging
import pkg_resources
import atexit
from distutils.dir_util import copy_tree, remove_tree, mkpath
from distutils.file_util import copy_file
from distutils.command.build import build as dist_build
from setuptools.command.build_py import build_py as _build_py
__author__ = "Giuseppe Natale"
__copyright__ = "Copyright 2020, Xilinx"
__email__ = "pynq_support@xilinx.com"
_function_text = """
import json
def _default_repr(obj):
return repr(obj)
def _resolve_global(name):
g = globals()
return g[name] if name in g else None
"""
class _ExtensionsManager:
"""Utility class to manage a list of available extensions registered for
discovery.
Parameters
----------
package_name: str
Name of the package to inspect for extensions
"""
def __init__(self, package_name):
self.package_name = package_name
self.list = [ext for ext in
pkg_resources.iter_entry_points(self.package_name)]
atexit.register(pkg_resources.cleanup_resources, force=True)
@staticmethod
def extension_path(extension_name):
"""Return the source path of the given extension name."""
# Define monkey patch for `pkg_resources.NullProvider.__init__` to use
# `module.__path__` instead of `module.__file__`, as the latter does
# not exist for namespace packages.
# Workaround for https://github.com/pypa/setuptools/issues/1407
def init(self, module):
self.loader = getattr(module, "__loader__", None)
module_path = [p for p in getattr(module, "__path__", "")][0]
self.module_path = module_path
# Temporarily apply monkey patch to
# `pkg_resources.NullProvider.__init__`
init_backup = pkg_resources.NullProvider.__init__
pkg_resources.NullProvider.__init__ = init
src_path = pkg_resources.resource_filename(extension_name, "")
# Restore original `pkg_resources.NullProvider.__init__`
pkg_resources.NullProvider.__init__ = init_backup
return src_path
@property
def printable(self):
"""Return a list of extension names and related parent packages
for printing.
"""
return ["{} (source: {})".format(e.name, e.module_name.split(".")[0])
for e in self.list]
@property
def paths(self):
"""Return a list of paths from the discovered extensions.
"""
return [self.extension_path(e.module_name) for e in self.list]
class _PynqLoggingFormatter(logging.Formatter):
FORMATS = {
logging.ERROR: "ERROR: %(msg)s",
logging.WARNING: "WARNING: %(msg)s",
logging.DEBUG: "DEBUG: %(module)s: %(lineno)d: %(msg)s",
"DEFAULT": "%(msg)s",
}
def format(self, record):
log_fmt = self.FORMATS.get(record.levelno, self.FORMATS["DEFAULT"])
formatter = logging.Formatter(log_fmt)
return formatter.format(record)
[docs]def get_logger(level=logging.INFO, force_lvl=False):
"""Returns an instance of the pynq.utils logger.
Parameters
----------
level: str or int
String or integer constant representing the logging level following
Python's logging standard levels. By default, the level is not
updated if the current level is higher, unless `force_lvl` is set
to `True`.
force_lvl: bool
If `True`, sets the logging level to `level` in any case.
"""
levels = {
"critical": logging.CRITICAL,
"error": logging.ERROR,
"warning": logging.WARNING,
"info": logging.INFO,
"debug": logging.DEBUG
}
logger = logging.getLogger(__name__)
if not logger.handlers:
ch = logging.StreamHandler()
ch.setFormatter(_PynqLoggingFormatter())
logger.addHandler(ch)
logger_lvl = logger.getEffectiveLevel()
if type(level) is str:
level = levels[level.lower()]
if level > logger_lvl or force_lvl:
logger.setLevel(level)
return logger
def _detect_devices(active_only=False):
"""Return a list containing all the detected devices names."""
from pynq.pl_server import Device
devices = Device.devices
if not devices:
raise RuntimeError("No device found in the system")
if active_only:
return Device.active_device.name
return [d.name for d in devices]
[docs]class DownloadedFileChecksumError(Exception):
"""This exception is raised when a downloaded file has an incorrect
checksum."""
pass
def _download_file(download_link, path, md5sum=None):
"""Download a file from the web.
Parameters
----------
download_link: str
The download link to use
path: str
The path where to save the file. The path must include the target
file
md5sum: str or None
If specified, it is used after download to check for correctness.
Raises a `DownloadedFileChecksumError` exception when the checksum
is incorrect, and deletes the downloaded file.
"""
import urllib.request
import hashlib
with urllib.request.urlopen(download_link) as response, \
open(path, "wb") as out_file:
data = response.read()
out_file.write(data)
if md5sum:
file_md5sum = hashlib.md5()
with open(path, "rb") as out_file:
for chunk in iter(lambda: out_file.read(4096), b""):
file_md5sum.update(chunk)
if md5sum != file_md5sum.hexdigest():
os.remove(path)
raise DownloadedFileChecksumError("Incorrect checksum for file "
"'{}'. The file has been "
"deleted as a result".format(
path))
def _find_local_overlay_res(device_name, overlay_res_filename, src_path):
"""Inspects ``overlay_res.ext.d` directory for an available
``overlay_res.ext`` file for ``device_name``.
Returns ``None`` if ``device_name`` is not found.
If a ``overlay_res.ext`` file is also found, always return that one
without doing any resolution based on ``device_name``.
Parameters
----------
device_name: str
The target device name
overlay_res_filename: str
The target filename to resolve
src_path: str
The path where to perform this search
"""
overlay_res_path = os.path.join(src_path, overlay_res_filename)
if os.path.isfile(overlay_res_path):
return overlay_res_path
overlay_res_filename_split = os.path.splitext(overlay_res_filename)
overlay_res_filename_ext = "{}.{}{}".format(overlay_res_filename_split[0],
device_name,
overlay_res_filename_split[1])
overlay_res_path = os.path.join(src_path, overlay_res_filename + ".d",
overlay_res_filename_ext)
if os.path.isfile(overlay_res_path):
return overlay_res_path
return None
def _find_remote_overlay_res(device_name, links_json_path):
"""Get download link for ``overlay_res.ext`` file and related checksum from
``overlay_res.ext.link`` json file, based on ``device_name``.
The ``.link`` file is generally a dict of device names and associated url
and md5sum.
.. code-block:: python3
{
"device_1": {
"url": "https://link.to/overlay.xclbin",
"md5sum": "da1e100gh8e7becb810976e37875de38"
}.
"device_2": {
"url": "https://link.to/overlay.xclbin",
"md5sum": "da1e100gh8e7becb810976e37875de38"
}
}
Expected return content from the ``.link`` json file is a dict with two
entries:
.. code-block:: python3
{
"url": "https://link.to/overlay.xclbin",
"md5sum": "da1e100gh8e7becb810976e37875de38"
}
Returns `None` if ``device_name`` is not found.
If the ``.link`` file contains a *url* and *md5sum* entries at the top
level, these are returned and no device-based resolution is performed.
Parameters
----------
device_name: str
The target device name
links_json_path: str
The full path to the ``.link`` json file
"""
with open(links_json_path) as f:
links = json.load(f)
if "url" in links and "md5sum" in links:
return {"url": links["url"], "md5sum": links["md5sum"]}
if device_name in links:
return links[device_name]
return None
[docs]class OverlayNotFoundError(Exception):
"""This exception is raised when an overlay for the target device could not
be located."""
pass
def _resolve_overlay_res_from_folder(device_name, overlay_res_folder, src_path,
dst_path, rel_path, files_to_copy):
"""Resolve ``overlay_res.ext`` file from ``overlay_res.ext.d`` folder,
based on ``device_name``. Updates ``files_to_copy`` with the resolved file
to use. If a ``overlay_res.ext.link`` file is found, resolution is skipped
here. This is to avoid inspecting the ``overlay_res.ext.d`` folder twice.
See ``_resolve_overlay_res_from_link()``.
"""
overlay_res_filename = os.path.splitext(overlay_res_folder)[0]
# Avoid checking a .d folder twice when also a
# related .link file is found
if not os.path.isfile(os.path.join(src_path,
overlay_res_filename + ".link")):
overlay_res_src_path = _find_local_overlay_res(device_name,
overlay_res_filename,
src_path)
if overlay_res_src_path:
overlay_res_dst_path = os.path.join(dst_path, rel_path,
overlay_res_filename)
files_to_copy[overlay_res_src_path] = overlay_res_dst_path
else:
raise OverlayNotFoundError(overlay_res_filename)
def _resolve_overlay_res_from_link(device_name, overlay_res_link, src_path,
dst_path, rel_path, files_to_copy,
files_to_move, logger):
"""Resolve ``overlay_res.ext`` file from ``overlay_res.ext.link`` file,
based on ``device_name``. Updates ``files_to_copy`` with the resolved file
to use if found locally (by inspecting ``overlay_res.ext.d`` folder), or
updates ``files_to_move`` in case the file is downloaded.
"""
overlay_res_filename = os.path.splitext(overlay_res_link)[0]
overlay_res_dst_path = os.path.join(dst_path, rel_path,
overlay_res_filename)
overlay_res_src_path = _find_local_overlay_res(device_name,
overlay_res_filename,
src_path)
if overlay_res_src_path:
files_to_copy[overlay_res_src_path] = overlay_res_dst_path
else:
overlay_res_download_dict = _find_remote_overlay_res(
device_name, os.path.join(src_path, overlay_res_link))
if overlay_res_download_dict:
# attempt overlay_res.ext file download
try:
tmp_file = tempfile.mkstemp()[1]
logger.info("Downloading file '{}'. This may take a while"
"...".format(overlay_res_filename))
_download_file(
overlay_res_download_dict["url"],
tmp_file,
overlay_res_download_dict["md5sum"]
)
files_to_move[tmp_file] = overlay_res_dst_path
except DownloadedFileChecksumError:
raise OverlayNotFoundError(overlay_res_filename)
else:
raise OverlayNotFoundError(overlay_res_filename)
def _copy_and_move_files(files_to_copy, files_to_move):
"""Copy and move files and folders. ``files_to_copy`` and ``files_to_move``
are expected to be dict where the key is the source path, and the value is
destination path.
"""
# copy files and folders
for src, dst in files_to_copy.items():
if os.path.isfile(src):
mkpath(os.path.dirname(dst))
copy_file(src, dst)
else:
copy_tree(src, dst)
# and move files previously downloaded
for src, dst in files_to_move.items():
shutil.move(src, dst)
def _roll_back_copy(files_to_copy, files_to_move):
"""Roll-back previously performed copy of files and folders.
``files_to_copy`` and ``files_to_move`` are expected to be dict where the
key is the source path, and the value is destination path.
"""
for _, dst in files_to_copy.items():
if os.path.isfile(dst):
os.remove(dst)
while(len(os.listdir(os.path.dirname(dst))) == 0):
os.rmdir(os.path.dirname(dst))
dst = os.path.dirname(dst)
elif os.path.isdir(dst):
remove_tree(dst)
for _, dst in files_to_move.items():
if os.path.isfile(dst):
os.remove(dst)
while(len(os.listdir(os.path.dirname(dst))) == 0):
os.rmdir(os.path.dirname(dst))
dst = os.path.dirname(dst)
[docs]def deliver_notebooks(device_name, src_path, dst_path, name, folder=False,
overlays_res_lookup=True):
"""Deliver notebooks to target destination path.
If a ``overlay_res.ext.link`` file or a ``overlay_res.ext.d`` folders is
found, then ``overlay_res.ext`` (where ``.ext`` represents a generic file
extension) is considered to be a file that need to be resolved dynamically,
based on ``device_name``.
The following resolution strategy is applied when inspecting ``src_path``:
1. If an ``overlay_res.ext`` file is found, prioritize that file and do
not perform any resolution.
2. In case step 1 fails, if a ``overlay_res.ext.d`` folder is found,
try to retrieve the right ``overlau_res.ext`` file from there. The
files in this folder are expected to contain the device name as a
string, before the file extension ``.ext``.
Format should be ``overlay_res.device_name.ext``.
3. In case step 2 fails, if there is an ``overlay_res.ext.link`` file,
attempt to download the correct file from the provided url, assumed
that a valid entry for ``device_name`` is available in the ``.link``
json file.
4. If all steps fail, notebooks that are in the same folder as
``overlay_res.ext`` are not delivered, and the user is warned.
For simplicity, it is assumed that ``.link`` files and ``.d`` folders are
located next to the notebooks that use the associated resource. Folders
that does not contain notebooks will not be inspected.
In case no ``.link`` or ``overlay_res.d`` files are found, notebooks are
simply copied as is, no resolution is performed.
It is assumed that for this scenario, overlays are delivered somewhere
else.
Parameters
----------
device_name: str
The target device name to use when doing resolution of ``.link``
files and ``.d`` folders. If an ``overlay_res.ext`` file is also
found, no resolution will be done and ``device_name`` will be
ignored, as it is assumed that the ``overlay_res.ext`` file is
prioritized and no automatic resolution is expected
src_path: str
The source path to copy from
dst_path: str
The destination path to copy to
name: str
The name of the notebooks module
folder: bool
Indicates whether to use ``name`` as target folder to copy
notebooks, inside ``dst_path``. Notebooks will be copied directly
in ``dst_path`` if ``False``.
overlays_res_lookup: bool
Dynamic resolution of ``.link`` files and ``.d`` folders is
disabled if ```False``.
"""
logger = get_logger()
dst_fullpath = os.path.join(dst_path, name) if folder else dst_path
files_to_copy = {}
files_to_move = {}
for root, dirs, files in os.walk(src_path):
# If there is at least one notebook, inspect the folder
if [f for f in files if f.endswith(".ipynb")]:
# If folder is in the list of files to copy, remove it as it is
# going to be inspected
if root in files_to_copy:
files_to_copy.pop(root)
relpath = os.path.relpath(root, src_path)
relpath = "" if relpath == "." else relpath
try:
files_to_copy_tmp = {}
files_to_move_tmp = {}
for d in dirs:
if d.endswith(".d"):
if overlays_res_lookup:
_resolve_overlay_res_from_folder(
device_name, d, root, dst_fullpath, relpath,
files_to_copy_tmp)
elif d != "__pycache__": # exclude __pycache__ folder
dir_dst_path = os.path.join(dst_fullpath, relpath, d)
files_to_copy_tmp[os.path.join(root, d)] = \
dir_dst_path
for f in files:
if f.endswith(".link"):
if overlays_res_lookup:
_resolve_overlay_res_from_link(
device_name, f, root, dst_fullpath, relpath,
files_to_copy_tmp, files_to_move_tmp, logger)
else:
file_dst_path = os.path.join(dst_fullpath, relpath, f)
files_to_copy_tmp[os.path.join(root, f)] = \
file_dst_path
# No OverlayNotFoundError exception raised, can add
# files_to_copy_tmp to files_to_copy
files_to_copy.update(files_to_copy_tmp)
# and files_to_move_tmp to files_to_move
files_to_move.update(files_to_move_tmp)
except OverlayNotFoundError as e:
# files_to_copy not updated, folder skipped
if relpath:
nb_str = os.path.join(name, relpath)
logger.info("Could not resolve file '{}' in folder "
"'{}', notebooks will not be "
"delivered".format(str(e), nb_str))
try:
# exclude root __init__.py from copy, if it exists
files_to_copy.pop(os.path.join(src_path, "__init__.py"))
except KeyError:
pass
try:
if not files_to_copy:
logger.info("The notebooks module '{}' could not be delivered. "
"The module has no notebooks, or no valid overlays "
"were found".format(name))
else:
_copy_and_move_files(files_to_copy, files_to_move)
except (Exception, KeyboardInterrupt) as e:
# roll-back copy
logger.info("Exception detected. Cleaning up as the delivery process "
"did not complete...")
_roll_back_copy(files_to_copy, files_to_move)
raise e
def _resolve_global_overlay_res(overlay_res_link, src_path, logger,
fail=False):
"""Resolve resource that is global to every device (using a ``device=None``
when calling ``_find_remote_overlay_res``). File is downloaded in
``src_path``.
"""
overlay_res_filename = os.path.splitext(overlay_res_link)[0]
overlay_res_download_dict = \
_find_remote_overlay_res(None,
os.path.join(src_path, overlay_res_link))
if overlay_res_download_dict:
overlay_res_fullpath = os.path.join(
src_path, overlay_res_filename)
try:
logger.info("Downloading file '{}'. "
"This may take a while"
"...".format(
overlay_res_filename))
_download_file(
overlay_res_download_dict["url"],
overlay_res_fullpath,
overlay_res_download_dict["md5sum"])
except Exception as e:
if fail:
raise e
finally:
if not os.path.isfile(
overlay_res_fullpath):
err_msg = "Could not resolve file '{}'".format(
overlay_res_filename)
logger.info(err_msg)
else:
return True # overlay_res_download_dict was not empty
return False
def _resolve_devices_overlay_res_helper(device, src_path, overlay_res_filename,
overlay_res_link, overlay_res_fullpath,
logger, fail=False,
overlay_res_download_path=None):
"""Helper function for `_resolve_devices_overlay_res`."""
overlay_res_src_path = _find_local_overlay_res(device,
overlay_res_filename,
src_path)
err_msg = "Could not resolve file '{}' for " \
"device '{}'".format(overlay_res_filename, device)
if not overlay_res_src_path:
overlay_res_download_dict = _find_remote_overlay_res(
device, os.path.join(src_path, overlay_res_link))
if overlay_res_download_dict:
if overlay_res_download_path:
mkpath(overlay_res_download_path)
try:
logger.info("Downloading file '{}'. This may take a while"
"...".format(overlay_res_filename))
_download_file(
overlay_res_download_dict["url"],
overlay_res_fullpath,
overlay_res_download_dict["md5sum"])
except Exception as e:
if fail:
raise e
finally:
if not os.path.isfile(
overlay_res_fullpath):
logger.info(err_msg)
if overlay_res_download_path and \
len(os.listdir(overlay_res_download_path)) == 0:
os.rmdir(overlay_res_download_path)
else:
if fail:
raise OverlayNotFoundError(err_msg)
logger.info(err_msg)
def _resolve_devices_overlay_res(overlay_res_link, src_path, devices, logger,
fail=False):
"""Resolve ``overlay_res.ext`` file for every device in ``devices``.
Files are downloaded in a ``overlay_res.ext.d`` folder in ``src_path``.
If the device is only one and is an edge device, file is resolved directly
to ``overlay_res.ext``.
"""
from pynq.pl_server.device import Device
from pynq.pl_server.embedded_device import EmbeddedDevice
overlay_res_filename = os.path.splitext(overlay_res_link)[0]
if len(devices) == 1 and type(Device.devices[0]) == EmbeddedDevice:
overlay_res_fullpath = os.path.join(src_path, overlay_res_filename)
_resolve_devices_overlay_res_helper(devices[0], src_path,
overlay_res_filename,
overlay_res_link,
overlay_res_fullpath, logger, fail)
return
for device in devices:
overlay_res_download_path = os.path.join(
src_path, overlay_res_filename + ".d")
overlay_res_filename_split = \
os.path.splitext(overlay_res_filename)
overlay_res_filename_ext = "{}.{}{}".format(
overlay_res_filename_split[0], device,
overlay_res_filename_split[1])
overlay_res_fullpath = os.path.join(overlay_res_download_path,
overlay_res_filename_ext)
_resolve_devices_overlay_res_helper(device, src_path,
overlay_res_filename,
overlay_res_link,
overlay_res_fullpath, logger, fail,
overlay_res_download_path)
def _resolve_all_overlay_res_from_link(overlay_res_link, src_path, logger,
fail=False):
"""Resolve every entry of ``.link`` files regardless of detected devices.
"""
overlay_res_filename = os.path.splitext(overlay_res_link)[0]
with open(os.path.join(src_path, overlay_res_link)) as f:
links = json.load(f)
if not _resolve_global_overlay_res(overlay_res_link, src_path, logger,
fail):
for device, download_link_dict in links.items():
if not _find_local_overlay_res(
device, overlay_res_filename, src_path):
err_msg = "Could not resolve file '{}' for " \
"device '{}'".format(overlay_res_filename, device)
overlay_res_download_path = os.path.join(
src_path, overlay_res_filename + ".d")
overlay_res_filename_split = \
os.path.splitext(overlay_res_filename)
overlay_res_filename_ext = "{}.{}{}".format(
overlay_res_filename_split[0], device,
overlay_res_filename_split[1])
mkpath(overlay_res_download_path)
overlay_res_fullpath = os.path.join(
overlay_res_download_path,
overlay_res_filename_ext)
try:
logger.info("Downloading file '{}'. "
"This may take a while"
"...".format(
overlay_res_filename))
_download_file(
download_link_dict["url"],
overlay_res_fullpath,
download_link_dict["md5sum"])
except Exception as e:
if fail:
raise e
finally:
if not os.path.isfile(
overlay_res_fullpath):
logger.info(err_msg)
if len(os.listdir(
overlay_res_download_path)) == 0:
os.rmdir(overlay_res_download_path)
[docs]def download_overlays(path, download_all=False, fail_at_lookup=False,
fail_at_device_detection=False, cleanup=False):
"""Download overlays for detected devices in destination path.
Resolve ``overlay_res.ext`` files from ``overlay_res.ext.link``
json files. Downloaded ``overlay_res.ext`` files are put in a
``overlay_res.ext.d`` directory, with the device name added to their
filename, as ``overlay_res.device_name.ext``.
If the detected device is only one and is an edge device, target file is
resolved directly to ``overlay_res.ext``.
If target ``overlay_res.ext`` already exists, resolution is skipped.
Parameters
----------
path: str
The path to inspect for overlays installation
download_all: bool
Causes all overlays to be downloaded from .link files, regardless
of the detected devices.
fail_at_lookup: bool
Determines whether the function should raise an exception in case
overlay lookup fails.
fail_at_device_detection: bool
Determines whether the function should raise an exception in case
no device is detected.
cleanup: bool
Dictates whether .link files need to be deleted after resolution.
If `True`, all .link files are removed as last step.
"""
logger = get_logger()
try:
devices = _detect_devices()
except RuntimeError as e:
if fail_at_device_detection:
raise e
devices = []
cleanup_list = []
for root, dirs, files in os.walk(path):
for f in files:
if f.endswith(".link"):
if not download_all:
if not _resolve_global_overlay_res(f, root, logger,
fail_at_lookup):
_resolve_devices_overlay_res(f, root, devices, logger,
fail_at_lookup)
else: # download all overlays regardless of detected devices
_resolve_all_overlay_res_from_link(f, root, logger,
fail_at_lookup)
if cleanup:
cleanup_list.append(os.path.join(root, f))
for f in cleanup_list:
os.remove(f)
class _download_overlays(dist_build):
"""Custom distutils command to download overlays using .link files."""
description = "Download overlays using .link files"
user_options = [("download-all", "a",
"forcibly download every overlay from .link files, "
"overriding download based on detected devices"),
("force-fail", "f",
"Do not complete setup if overlays lookup fails.")]
boolean_options = ["download-all", "force-fail"]
def initialize_options(self):
self.download_all = False
self.force_fail = False
def finalize_options(self):
pass
def run(self):
cmd = self.get_finalized_command("build_py")
for package, _, build_dir, _ in cmd.data_files:
if "." not in package: # sub-packages are skipped
download_overlays(build_dir,
download_all=self.download_all,
fail_at_lookup=self.force_fail)
[docs]class build_py(_build_py):
"""Overload the standard setuptools 'build_py' command to also call the
command 'download_overlays'.
"""
[docs] def run(self):
super().run()
self.run_command("download_overlays")
[docs]class NotebookResult:
"""Class representing the result of executing a notebook
Contains members with the form ``_[0-9]*`` with the output object for
each cell or ``None`` if the cell did not return an object.
The raw outputs are available in the ``outputs`` attribute. See the
Jupyter documentation for details on the format of the dictionary
"""
def __init__(self, nb):
self.outputs = [
c['outputs'] for c in nb['cells'] if c['cell_type'] == 'code'
]
objects = json.loads(self.outputs[-1][0]['text'])
for i, o in enumerate(objects):
setattr(self, "_" + str(i+1), o)
def _create_code(num):
call_line = "print(json.dumps([{}], default=_default_repr))".format(
", ".join(("_resolve_global('_{}')".format(i+1) for i in range(num))))
return _function_text + call_line
[docs]def run_notebook(notebook, root_path=".", timeout=30, prerun=None):
"""Run a notebook in Jupyter
This function will copy all of the files in ``root_path`` to a
temporary directory, run the notebook and then return a
``NotebookResult`` object containing the outputs for each cell.
The notebook is run in a separate process and only objects that
are serializable will be returned in their entirety, otherwise
the string representation will be returned instead.
Parameters
----------
notebook : str
The notebook to run relative to ``root_path``
root_path : str
The root notebook folder (default ".")
timeout : int
Length of time to run the notebook in seconds (default 30)
prerun : function
Function to run prior to starting the notebook, takes the
temporary copy of root_path as a parameter
"""
import nbformat
from nbconvert.preprocessors import ExecutePreprocessor
with tempfile.TemporaryDirectory() as td:
workdir = os.path.join(td, 'work')
notebook_dir = os.path.join(workdir, os.path.dirname(notebook))
shutil.copytree(root_path, workdir)
if prerun is not None:
prerun(workdir)
fullpath = os.path.join(workdir, notebook)
with open(fullpath, "r") as f:
nb = nbformat.read(f, as_version=4)
ep = ExecutePreprocessor(kernel_name='python3', timeout=timeout)
code_cells = [c for c in nb['cells'] if c['cell_type'] == 'code']
nb['cells'].append(
nbformat.from_dict({'cell_type': 'code',
'metadata': {},
'source': _create_code(len(code_cells))}
))
ep.preprocess(nb, {'metadata': {'path': notebook_dir}})
return NotebookResult(nb)
def _default_repr(obj):
return repr(obj)
[docs]class ReprDict(dict):
"""Subclass of the built-in dict that will display using the Jupyterlab
JSON repr.
The class is recursive in that any entries that are also dictionaries
will be converted to ReprDict objects when returned.
"""
def __init__(self, *args, rootname="root", expanded=False, **kwargs):
"""Dictionary constructor
Parameters
----------
rootname : str
The value to display at the root of the tree
expanded : bool
Whether the view of the tree should start expanded
"""
self._rootname = rootname
self._expanded = expanded
super().__init__(*args, **kwargs)
def _repr_json_(self):
return json.loads(json.dumps(self, default=_default_repr)), \
{'expanded': self._expanded, 'root': self._rootname}
def __getitem__(self, key):
obj = super().__getitem__(key)
if type(obj) is dict:
return ReprDict(obj, expanded=self._expanded, rootname=key)
else:
return obj