# Copyright (c) 2016, Xilinx, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
# THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
# OR BUSINESS INTERRUPTION). HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
# WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
# OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
# ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
__author__ = "Peter Ogden"
__copyright__ = "Copyright 2017, Xilinx"
__email__ = "ogden@xilinx.com"
import pycparser
import struct
import functools
import itertools
from pycparser import c_ast
from pycparser import c_generator
from pycparser.plyparser import ParseError
from copy import deepcopy
from .compile import preprocess
from .streams import SimpleMBStream
from .streams import InterruptMBStream
from . import MicroblazeProgram
# Use a global parser and generator
_parser = pycparser.CParser()
_generator = c_generator.CGenerator()
# First we define a series of classes to represent types
# Each class is responsible for one particular type of C
# types
[docs]class PrimitiveWrapper:
""" Wrapper for C primitives that can be represented by
a single Struct string.
"""
def __init__(self, struct_string, type_):
self._struct = struct.Struct(struct_string)
self.typedefname = None
self.blocks = False
self._type = type_
[docs] def param_encode(self, old_val):
return self._struct.pack(old_val)
[docs] def param_decode(self, old_val, stream):
pass
[docs] def return_decode(self, stream):
data = stream.read(self._struct.size)
return self._struct.unpack(data)[0]
[docs] def pre_argument(self, name):
commands = []
commands.append(_generate_decl(name, self._type))
commands.append(_generate_read(name))
return commands
[docs] def post_argument(self, name):
return []
[docs]class VoidPointerWrapper:
""" Wrapper for a void* pointer that will refer to a
physically contiguous chunk of memory.
"""
def __init__(self, type_):
self._type = type_
self.typedefname = None
self.blocks = False
self._ptrstruct = struct.Struct('I')
[docs] def param_encode(self, old_val):
return self._ptrstruct.pack(old_val.physical_address)
[docs] def param_decode(self, old_val, stream):
pass
[docs] def return_decode(self, stream):
raise RuntimeError("Cannot return a void*")
[docs] def pre_argument(self, name):
commands = []
commands.append(_generate_decl(
name + '_int',
c_ast.TypeDecl(name + '_int', [],
c_ast.IdentifierType(['unsigned', 'int']))))
commands.append(_generate_read(name +'_int'))
commands.append(c_ast.Assignment(
'|=',
c_ast.ID(name + '_int'),
c_ast.Constant('int', '0x20000000')))
commands.append(
c_ast.Decl(name, [], [], [],
c_ast.PtrDecl(
[], c_ast.TypeDecl(name, [],
c_ast.IdentifierType(['void'])),
),
c_ast.Cast(
c_ast.Typename(
None, [], c_ast.PtrDecl(
[], c_ast.TypeDecl(
None, [],
c_ast.IdentifierType(['void'])))),
c_ast.ID(name + '_int')),
[]
))
return commands
[docs] def post_argument(self, name):
return []
[docs]class ConstPointerWrapper:
""" Wrapper for const T pointers, transfers data in only
one direction.
"""
def __init__(self, type_, struct_string):
self._lenstruct = struct.Struct('h')
self._struct_string = struct_string
self.typedefname = None
self.blocks = False
self._type = type_
[docs] def param_encode(self, old_val):
packed = struct.pack(self._struct_string * len(old_val), *old_val)
return self._lenstruct.pack(len(old_val)) + packed
[docs] def param_decode(self, old_val, stream):
pass
[docs] def return_decode(self, stream):
raise RuntimeError("Cannot use a const T* decoder as a return value")
[docs] def pre_argument(self, name):
commands = []
commands.append(
_generate_decl(
name + '_len',
c_ast.TypeDecl(name + '_len', [],
c_ast.IdentifierType(['unsigned', 'short']))))
commands.append(_generate_read(name + '_len'))
commands.append(_generate_arraydecl(name,
self._type,
c_ast.ID(name + '_len')))
commands.append(_generate_read(name, address=False))
return commands
[docs] def post_argument(self, name):
return []
[docs]class PointerWrapper:
""" Wrapper for non-const T pointers that retrieves any
data modified by the called function.
"""
def __init__(self, type_, struct_string):
self._lenstruct = struct.Struct('h')
self._struct_string = struct_string
self.typedefname = None
self.blocks = True
self._type = type_
[docs] def param_encode(self, old_val):
packed = struct.pack(self._struct_string * len(old_val), *old_val)
return self._lenstruct.pack(len(old_val)) + packed
[docs] def param_decode(self, old_val, stream):
data = stream.read(self._lenstruct.size)
length = self._lenstruct.unpack(data)[0]
assert(length == len(old_val))
data = stream.read(length * struct.calcsize(self._struct_string))
old_val[:] = struct.unpack(self._struct_string * len(old_val), data)
[docs] def return_decode(self, stream):
raise RuntimeError("Cannot use a T* decoder as a return value")
[docs] def pre_argument(self, name):
commands = []
commands.append(
_generate_decl(
name + '_len',
c_ast.TypeDecl(name + '_len', [],
c_ast.IdentifierType(['unsigned', 'short']))))
commands.append(_generate_read(name + '_len'))
commands.append(_generate_arraydecl(name,
self._type,
c_ast.ID(name + '_len')))
commands.append(_generate_read(name, address=False))
return commands
[docs] def post_argument(self, name):
commands = []
commands.append(_generate_write(name + '_len'))
commands.append(_generate_write(name, address=False))
return commands
[docs]class VoidWrapper:
""" Wraps void - only valid for return types
"""
def __init__(self):
self.typedefname = None
self.blocks = False
[docs] def param_encode(self, old_val):
return b''
[docs] def param_decode(self, old_val, stream):
pass
[docs] def return_decode(self, stream):
return None
[docs] def pre_argument(self, name):
return []
[docs] def post_argument(self, name):
return []
def _type_to_struct_string(tdecl):
if type(tdecl) is not c_ast.TypeDecl:
raise RuntimeError("Unsupport Type")
names = tdecl.type.names
signed = True
if len(names) > 1:
if names[0] == 'unsigned':
signed = False
name = names[1]
else:
name = names[0]
if name == 'void':
return ''
if name in ['long', 'int']:
if names.count('long') == 2:
if signed:
return 'q'
else:
return 'Q'
else:
if signed:
return 'l'
else:
return 'L'
if name == 'short':
if signed:
return 'h'
else:
return 'H'
if name == 'char':
if signed:
return 'b'
else:
return 'B'
if name == 'float':
return 'f'
raise RuntimeError('Unknown type {}'.format(name))
def _type_to_interface(tdecl, typedefs):
""" Returns a wrapper for a given C AST
"""
if type(tdecl) is c_ast.PtrDecl:
nested_type = tdecl.type
if type(nested_type) is not c_ast.TypeDecl:
raise RuntimeError("Only single level pointers supported")
struct_string = _type_to_struct_string(nested_type)
if struct_string:
if 'const' in nested_type.quals:
return ConstPointerWrapper(tdecl, struct_string)
else:
return PointerWrapper(tdecl, struct_string)
else:
return VoidPointerWrapper(tdecl)
elif type(tdecl) is not c_ast.TypeDecl:
raise RuntimeError("Unsupport Type")
names = tdecl.type.names
if len(names) == 1 and names[0] in typedefs:
interface = _type_to_interface(typedefs[names[0]], typedefs)
interface.typedefname = names[0]
return interface
struct_string = _type_to_struct_string(tdecl)
if struct_string:
return PrimitiveWrapper(struct_string, tdecl)
else:
return VoidWrapper()
def _generate_read(name, size=None, address=True):
""" Helper function to generate read functions. size
should be an AST fragment
"""
if size is None:
size = c_ast.UnaryOp('sizeof', c_ast.ID(name))
if address:
target = c_ast.UnaryOp('&', c_ast.ID(name))
else:
target = c_ast.ID(name)
return c_ast.FuncCall(
c_ast.ID('_rpc_read'),
c_ast.ExprList([target,
size]))
def _generate_write(name, address=True):
""" Helper function generate write functions
"""
if address:
target = c_ast.UnaryOp('&', c_ast.ID(name))
else:
target = c_ast.ID(name)
return c_ast.FuncCall(
c_ast.ID('_rpc_write'),
c_ast.ExprList([target,
c_ast.UnaryOp('sizeof', c_ast.ID(name))]))
def _generate_decl(name, decl):
""" Generates a new declaration with a difference name
but same type as the provided decl.
"""
typedecl = c_ast.TypeDecl(name, [], decl.type)
return c_ast.Decl(name, [], [], [], typedecl, [], [])
def _generate_arraydecl(name, decl, length):
""" Generates a new declaration with an array type
base on an existing declaration
"""
typedecl = c_ast.TypeDecl(name, [], decl.type)
arraydecl = c_ast.ArrayDecl(typedecl, length, [])
return c_ast.Decl(name, [], [], [], arraydecl, [], [])
[docs]class FuncAdapter:
"""Provides the C and Python interfaces for a function declaration
Attributes
----------
return_interface : TypeWrapper
The type wrapper for the return type
arg_interfaces : [TypeWrapper]
An array of type wrappers for the arguments
call_ast : pycparser.c_ast
Syntax tree for the wrapped function call
"""
def __init__(self, decl, typedefs):
self.return_interface = _type_to_interface(decl.type, typedefs)
self.name = decl.type.declname
self.docstring = ""
self.arg_interfaces = []
self.blocks = False
block_contents = []
post_block_contents = []
func_args = []
if decl.args:
for i, arg in enumerate(decl.args.params):
if type(arg) is c_ast.EllipsisParam:
raise RuntimeError("vararg functions not supported")
interface = _type_to_interface(arg.type, typedefs)
if type(interface) is VoidWrapper:
continue
block_contents.extend(interface.pre_argument('arg' + str(i)))
post_block_contents.extend(interface.post_argument(
'arg' + str(i)))
func_args.append(c_ast.ID('arg' + str(i)))
self.arg_interfaces.append(interface)
self.blocks = self.blocks | interface.blocks
function_call = c_ast.FuncCall(c_ast.ID(self.name),
c_ast.ExprList(func_args))
self.returns = type(self.return_interface) is not VoidWrapper
if self.returns:
ret_assign = c_ast.Decl(
'ret', [], [], [],
c_ast.TypeDecl('ret', [], decl.type.type),
function_call, []
)
block_contents.append(ret_assign)
block_contents.append(_generate_write('return_command'))
block_contents.append(_generate_write('ret'))
self.blocks = True
else:
block_contents.append(function_call)
if self.blocks:
block_contents.append(_generate_write('return_command'))
else:
block_contents.append(_generate_write('void_command'))
block_contents.extend(post_block_contents)
self.call_ast = c_ast.Compound(block_contents)
self.filename = decl.coord.file
[docs] def pack_args(self, *args):
"""Create a bytes of the provided arguments
"""
if len(args) != len(self.arg_interfaces):
raise RuntimeError(
"Wrong number of arguments: expected{0} got {1}".format(
len(self.arg_interfaces), len(args)
))
return b''.join(
[f.param_encode(a) for f, a in itertools.zip_longest(
self.arg_interfaces, args
)]
)
[docs] def receive_response(self, stream, *args):
"""Reads the response stream, updates arguments and
returns the value of the function call if applicable
"""
return_value = self.return_interface.return_decode(stream)
if len(args) != len(self.arg_interfaces):
raise RuntimeError(
"Wrong number of arguments: expected{0} got {1}".format(
len(self.arg_interfaces), len(args)
))
[f.param_decode(a, stream) for f, a in itertools.zip_longest(
self.arg_interfaces, args
)]
return return_value
[docs]class ParsedEnum:
"""Holds the values of an enum from the C source
"""
def __init__(self):
self.name = None
self.items = {}
[docs]class FuncDefVisitor(pycparser.c_ast.NodeVisitor):
"""Primary visitor that parses out function definitions,
typedes and enumerations from a syntax tree
"""
def __init__(self):
self.functions = {}
self.typedefs = {}
self.enums = []
self.defined = []
[docs] def visit_Typedef(self, node):
self.typedefs[node.name] = node.type
[docs] def visit_FuncDef(self, node):
self.defined.append(node.decl.name)
self.visit(node.decl)
[docs] def visit_FuncDecl(self, node):
if node.coord.file.startswith('/opt/microblaze'):
return
if type(node.type) is not c_ast.TypeDecl:
# Ignore functions that are returning pointers
return
name = node.type.declname
if 'static' in node.type.quals:
# Don't process static functions
return
try:
self.functions[name] = FuncAdapter(node, self.typedefs)
except RuntimeError as e:
if node.coord.file == '<stdin>':
print("Could not create interface for funcion {}: {}".format(
name, e))
[docs] def visit_Enum(self, node):
enum = ParsedEnum()
if node.name:
enum.name = node.name
cur_index = 0
for entry in node.values.enumerators:
if entry.value:
cur_index = int(entry.value.value, 0)
enum.items[entry.name] = cur_index
cur_index += 1
self.enums.append(enum)
def _build_case(functions):
""" Builds the switch statement that will form the foundation
of the RPC handler
"""
cases = []
for i, func in enumerate(functions.values()):
case = c_ast.Case(
c_ast.Constant('int', str(i)),
[
func.call_ast,
c_ast.Break()
])
cases.append(case)
return c_ast.Switch(
c_ast.ID('command'),
c_ast.Compound(cases)
)
def _build_handle_function(functions):
""" Wraps the switch statement in a function definition
"""
case_statement = _build_case(functions)
available_check = c_ast.If(
c_ast.BinaryOp(
'<', c_ast.FuncCall(
c_ast.ID('mailbox_available'),
c_ast.ExprList([c_ast.Constant('int', '2')])
),
c_ast.Constant('int', '4')
),
c_ast.Return(None),
None
)
handle_decl = c_ast.FuncDecl(
None, c_ast.TypeDecl('_handle_events', [],
c_ast.IdentifierType(['void'])),
)
command_decl = c_ast.Decl('command', [], [], [],
c_ast.TypeDecl('command', [],
c_ast.IdentifierType(['int'])),
[], [])
command_read = _generate_read('command')
body = c_ast.Compound([available_check,
command_decl,
command_read,
case_statement])
return c_ast.FuncDef(handle_decl, [], body)
def _build_main(program_text, functions):
sections = []
sections.append(R"""
#include <unistd.h>
#include <mailbox_io.h>
static const char return_command = 0;
static const char void_command = 2;
static void _rpc_read(void* data, int size) {
int available = mailbox_available(2);
while (available < size) {
available = mailbox_available(2);
}
read(2, data, size);
}
static void _rpc_write(const void* data, int size) {
int available = mailbox_available(3);
while (available < size) {
available = mailbox_available(3);
}
write(3, data, size);
}
""")
sections.append(program_text)
sections.append(_generator.visit(_build_handle_function(functions)))
sections.append(R"""
int main() {
while (1) {
_handle_events();
}
}
""")
return "\n".join(sections)
def _pyprintf(stream):
format_string = stream.read_string()
in_special = False
args = []
for i in range(len(format_string)):
if in_special:
if format_string[i:i+1] in [b'd', b'x', b'X', b'o', b'u']:
args.append(stream.read_int32())
elif format_string[i:i+1] in [b'f', b'F', b'g', b'G', b'e', b'E']:
args.append(stream.read_float())
elif format_string[i:i+1] == b's':
args.append(stream.read_string().decode())
elif format_string[i:i+1] == b'c':
args.append(stream.read_byte())
in_special = False
elif format_string[i:i+1] == b'%':
in_special = True
print(format_string.decode() % tuple(args), end='')
def _handle_command(command, stream):
if command == 1: # Void return
pass
if command == 2: # print command
_pyprintf(stream)
else:
raise RuntimeError('Unknown command {}'.format(command))
def _function_wrapper(stream, index, adapter, return_type, *args):
""" Calls a function in the microblaze, designed to be used
with functools.partial to build a new thing
"""
arg_string = struct.pack('l', index)
arg_string += adapter.pack_args(*args)
stream.write(arg_string)
if not adapter.returns:
return None
command = stream.read(1)[0]
while command != 0:
_handle_command(command, stream)
command = stream.read(1)[0]
response = adapter.receive_response(stream, *args)
if return_type:
return return_type(response)
else:
return response
def _create_typedef_classes(typedefs):
""" Creates an anonymous class for each typedef in the C function
"""
classes = {}
for k, v in typedefs.items():
class Wrapper:
"""Wrapper class for a C typedef
The attributes are dynamically from the C definition using
the functions name `type_`. If a function named this way
takes `type` as the parameter it is added as a member function
otherwise it is added as a static method.
"""
def __init__(self, val):
self.val = val
def __index__(self):
return self.val
def __int__(self):
return self.val
def _call_func(self, function, *args):
return function(self.val, *args)
def __repr__(self):
return "typedef {0} containing {1}".format(type(self).__name__,
repr(self.val))
Wrapper.__name__ = k
classes[k] = Wrapper
return classes
def _filter_typedefs(typedefs, function_names):
used_typedefs = set()
for t in typedefs:
if len([f for f in function_names if f.startswith(t + "_")]) > 0:
used_typedefs.add(t)
return used_typedefs
[docs]class MicroblazeFunction:
"""Calls a specific function
"""
def __init__(self, stream, index, function, return_type):
self.stream = stream
self.index = index
self.function = function
self.return_type = return_type
def _call_function(self, *args):
arg_string = struct.pack('l', self.index)
arg_string += self.function.pack_args(*args)
self.stream.write(arg_string)
def _handle_stream(self, *args):
command = self.stream.read(1)[0]
if command != 0:
_handle_command(command, self.stream)
return None, False
return self.function.receive_response(self.stream, *args), True
def __call__(self, *args):
self._call_function(*args)
if not self.function.blocks:
return None
return_value = None
done = False
while not done:
return_value, done = self._handle_stream(*args)
if self.return_type:
return self.return_type(return_value)
else:
return return_value
[docs] async def call_async(self, *args):
self._call_function(*args)
if not self.function.blocks:
return None
return_value = None
done = False
while return_value is None:
await self.stream.wait_for_data_async()
return_value, done = self._handle_stream(*args)
if self.return_type:
return self.return_type(return_value)
else:
return return_value
[docs]class MicroblazeRPC:
""" Provides a python interface to the Microblaze based on an RPC
mechanism.
The attributes of the class are generated dynamically from the
typedefs, enumerations and functions given in the provided source.
Functions are added as methods, the values in enumerations are
added as constants to the class and types are added as classes.
"""
def __init__(self, iop, program_text):
""" Create a new RPC instance
Parameters
----------
iop : MicroblazeHierarchy or mb_info
Microblaze instance to run the RPC server on
program_text : str
Source of the program to extract functions from
"""
preprocessed = preprocess(program_text, mb_info=iop)
try:
ast = _parser.parse(preprocessed, filename='program_text')
except ParseError as e:
raise RuntimeError("Error parsing code\n" + str(e))
visitor = FuncDefVisitor()
visitor.visit(ast)
main_text = _build_main(program_text, visitor.functions)
used_typedefs = _filter_typedefs(visitor.typedefs,
visitor.functions.keys())
typedef_classes = _create_typedef_classes(visitor.typedefs)
self._mb = MicroblazeProgram(iop, main_text)
self._rpc_stream = InterruptMBStream(
self._mb, read_offset=0xFC00, write_offset=0xF800)
self._build_functions(visitor.functions, typedef_classes)
self._build_constants(visitor.enums)
self._populate_typedefs(typedef_classes, visitor.functions)
self.visitor = visitor
self.active_functions = 0
def _build_constants(self, enums):
for enum in enums:
for name, value in enum.items.items():
setattr(self, name, value)
def _build_functions(self, functions, typedef_classes):
index = 0
for k, v in functions.items():
return_type = None
if v.return_interface.typedefname:
return_type = typedef_classes[v.return_interface.typedefname]
setattr(self, k,
MicroblazeFunction(
self._rpc_stream,
index, v, return_type)
)
index += 1
def _populate_typedefs(self, typedef_classes, functions):
for name, cls in typedef_classes.items():
for fname, func in functions.items():
if fname.startswith(name + "_"):
subname = fname[len(name)+1:]
if (len(func.arg_interfaces) > 0 and
func.arg_interfaces[0].typedefname == name):
setattr(cls, subname,
functools.partialmethod(
cls._call_func, getattr(self, fname)))
else:
setattr(cls, subname, getattr(self, fname))
[docs] def reset(self):
"""Reset and free the microblaze for use by other programs
"""
self._mb.reset()
[docs] def release(self):
"""Alias for `reset()`
"""
self.reset()
[docs]class MicroblazeLibrary(MicroblazeRPC):
"""Provides simple Python-only access to a set of Microblaze libraries.
The members of this class are determined by the libraries chosen and can
determined either by using ``dir`` on the instance or the ``?`` operator
inside of IPython
"""
def __init__(self, iop, libraries):
"""Create a Python API for a list of C libraries
Parameters
----------
iop : mb_info or MicroblazeHierarchy
The IOP to load the libraries on
libraries : [str]
The names of the libraries to load
Libraries should be passed as the name of the header file containing
the desired functions but without the ``.h`` extension
"""
source_text = "\n".join(['#include <{}.h>'.format(lib)
for lib in libraries])
super().__init__(iop, source_text)