Source code for pynq.lib.pynqmicroblaze.compile

#   Copyright (c) 2016, Xilinx, Inc.
#   SPDX-License-Identifier: BSD-3-Clause


import shutil
import tempfile
from os import path
from subprocess import PIPE, Popen, run

from pynq import PL
from . import BSPs, Modules, PynqMicroblaze
from .streams import InterruptMBStream



[docs]def dependencies(source, bsp): args = ["mb-cpp", "-MM", "-D__attribute__(x)=", "-D__extension__=", "-D__asm__(x)="] for include_path in bsp.include_path: args.append("-I") args.append(include_path) paths = {} for name, module in Modules.items(): for include_path in module.include_path: args.append("-I") args.append(include_path) paths[include_path] = name source = source.replace("__extension__", "") result = run(args, stdout=PIPE, stderr=PIPE, input=source.encode()) if result.returncode: raise RuntimeError("Preprocessor failed: \n" + result.stderr.decode()) dependent_paths = result.stdout.decode() dependent_modules = {v for k, v in paths.items() if dependent_paths.find(k) != -1} return [Modules[k] for k in dependent_modules]
[docs]def recursive_dependencies(source, bsp, current_set=None): if current_set is None: current_set = set() modules = dependencies(source, bsp) for m in modules: if m not in current_set: current_set.add(m) for s in m.sources: with open(s, "r") as f: recursive_dependencies(f.read(), bsp, current_set) return list(current_set)
def _find_bsp(cell_name): target_bsp = "bsp_" + cell_name.replace("/", "_") matches = [bsp for bsp in BSPs.keys() if target_bsp.startswith(bsp)] if matches: return BSPs[max(matches, key=len)] raise RuntimeError("BSP not found for " + cell_name)
[docs]def preprocess(source, bsp=None, mb_info=None): if bsp is None: if hasattr(mb_info, "mb_info"): mb_info = mb_info.mb_info if mb_info is None: raise RuntimeError("Must provide either a BSP or mb_info") bsp = _find_bsp(mb_info["name"]) args = ["mb-cpp", "-D__attribute__(x)=", "-D__extension__=", "-D__asm__(x)="] for include_path in bsp.include_path: args.append("-I") args.append(include_path) for name, module in Modules.items(): for include_path in module.include_path: args.append("-I") args.append(include_path) source = "typedef int __builtin_va_list;\n" + source result = run(args, stdout=PIPE, stderr=PIPE, input=source.encode()) if result.returncode: raise RuntimeError("Preprocessor failed: \n" + result.stderr.decode()) return result.stdout.decode()
[docs]def checkmodule(name, mb_info): try: preprocess(f"#include <{name}.h>", mb_info=mb_info) except RuntimeError as exc: return False return True
[docs]class MicroblazeProgram(PynqMicroblaze): def __init__(self, mb_info, program_text, bsp=None): if hasattr(mb_info, "mb_info"): mb_info = mb_info.mb_info if bsp is None: bsp = _find_bsp(mb_info["name"]) mem_dict = PL.mem_dict ip_name = mb_info["ip_name"] if ip_name not in mem_dict.keys(): raise ValueError("No such IP {}.".format(ip_name)) ip_state = mem_dict[ip_name]["state"] force = False if ip_state and ip_state.startswith("/tmp/"): force = True modules = recursive_dependencies(program_text, bsp) lib_args = [] with tempfile.TemporaryDirectory() as tempdir: files = [path.join(tempdir, "main.c")] args = [ "mb-g++", "-o", path.join(tempdir, "a.out"), "-Wno-multichar", "-fno-exceptions", "-ffunction-sections", "-Wl,-gc-sections", ] args.extend(bsp.cflags) args.extend(bsp.sources) for include_path in bsp.include_path: args.append("-I{}".format(include_path)) for lib_path in bsp.library_path: lib_args.append("-L{}".format(lib_path)) for lib in bsp.libraries: lib_args.append("-l{}".format(lib)) args.append("-Wl,{}".format(bsp.linker_script)) args.extend(bsp.ldflags) for module in modules: files.extend(module.sources) for include_path in module.include_path: args.append("-I{}".format(include_path)) for lib_path in module.library_path: lib_args.append("-L{}".format(lib_path)) for lib in module.libraries: lib_args.append("-l{}".format(lib)) with open(path.join(tempdir, "main.c"), "w") as f: f.write('#line 1 "cell_magic"\n') f.write(program_text) shutil.copy(path.join(tempdir, "main.c"), "/tmp/last.c") result = run(args + files + lib_args, stdout=PIPE, stderr=PIPE) if result.returncode: raise RuntimeError(result.stderr.decode()) _ = run(["size", path.join(tempdir, "a.out")], stdout=PIPE, stderr=PIPE) result = Popen( ["nm", "-CSr", "--size-sort", path.join(tempdir, "a.out")], stdout=PIPE, stderr=PIPE, ) _ = Popen(["head", "-10"], stdin=result.stdout, stdout=PIPE, stderr=None) shutil.copy(path.join(tempdir, "a.out"), "/tmp/last.elf") result = run( [ "mb-objcopy", "-O", "binary", path.join(tempdir, "a.out"), path.join(tempdir, "a.bin"), ], stderr=PIPE, ) if result.returncode: raise RuntimeError("Objcopy Failed: {}".format(result.stderr.decode())) super().__init__(mb_info, path.join(tempdir, "a.bin"), force) self.stream = InterruptMBStream(self) self.read = self.stream.read self.write = self.stream.write self.read_async = self.stream.read_async