Source code for pynq.lib.audio

#   Copyright (c) 2016, Xilinx, Inc.
#   All rights reserved.
# 
#   Redistribution and use in source and binary forms, with or without 
#   modification, are permitted provided that the following conditions are met:
#
#   1.  Redistributions of source code must retain the above copyright notice, 
#       this list of conditions and the following disclaimer.
#
#   2.  Redistributions in binary form must reproduce the above copyright 
#       notice, this list of conditions and the following disclaimer in the 
#       documentation and/or other materials provided with the distribution.
#
#   3.  Neither the name of the copyright holder nor the names of its 
#       contributors may be used to endorse or promote products derived from 
#       this software without specific prior written permission.
#
#   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
#   AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, 
#   THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
#   PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
#   CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
#   EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
#   PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
#   OR BUSINESS INTERRUPTION). HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, 
#   WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR 
#   OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF 
#   ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

__author__      = "Benedikt Janssen"
__copyright__   = "Copyright 2016, Xilinx"
__email__       = "pynq_support@xilinx.com"

import os
import math
import numpy
import cffi
import wave
import time
from pynq import PL
from pynq import GPIO
from pynq import MMIO
from pynq import DefaultHierarchy

LIB_SEARCH_PATH = os.path.dirname(os.path.realpath(__file__))


[docs]class Audio(DefaultHierarchy): """Class to interact with audio controller. Each audio sample is a 32-bit integer. The audio controller supports only mono mode, and uses pulse density modulation (PDM). Attributes ---------- mmio : MMIO The MMIO object associated with the audio controller. gpio : GPIO The GPIO object associated with the audio controller. buffer : numpy.ndarray The numpy array to store the audio. sample_rate: int Sample rate of the current buffer content. sample_len: int Sample length of the current buffer content. """ def __init__(self, description, gpio_name=None): """Return a new Audio object based on the hierarchy description. Parameters ---------- description : dict The hierarchical description of the hierarchy gpio_name : str The name of the audio path selection GPIO. If None then the GPIO pin in the hierarchy is used, otherwise the gpio_name is searched in the list of pins on the hierarchy and the PL.gpio_dict. """ super().__init__(description) self.mmio = self.d_axi_pdm_1.mmio if gpio_name is None: if len(description['gpio']) == 0: raise RuntimeError('Could not find audio path select GPIO') elif len(description['gpio']) > 1: raise RuntimeError('Multiple possible GPIO pins') pin_name = next(iter(description['gpio'].keys())) self.gpio = getattr(self, pin_name) else: if gpio_name in description['gpio']: self.gpio = getattr(self, pin_name) elif gpio_name in PL.gpio_dict: pin = GPIO.get_gpio_pin(PL.gpio_dict[gpio_name]['index']) self.gpio = GPIO(pin, 'out') else: raise RuntimeError('Provided gpio_name not found') self._ffi = cffi.FFI() self._libaudio = self._ffi.dlopen(LIB_SEARCH_PATH + "/libaudio.so") self._ffi.cdef("""unsigned int Xil_Out32(unsigned int Addr, unsigned int Value);""") self._ffi.cdef("""unsigned int Xil_In32(unsigned int Addr);""") self._ffi.cdef("""void _Pynq_record(unsigned int BaseAddr, unsigned int * BufAddr, unsigned int Num_Samles_32Bit);""") self._ffi.cdef("""void _Pynq_play(unsigned int BaseAddr, unsigned int * BufAddr, unsigned int Num_Samles_32Bit);""") char_adrp = self._ffi.from_buffer(self.mmio.mem) self._uint_adrpv = self._ffi.cast('unsigned int', char_adrp) self.buffer = numpy.zeros(0).astype(numpy.int) self.sample_rate = 0 self.sample_len = 0
[docs] def record(self, seconds): """Record data from audio controller to audio buffer. The sample rate per word is 192000Hz. Parameters ---------- seconds : float The number of seconds to be recorded. Returns ------- None """ if not 0 < seconds <= 60: raise ValueError("Recording time has to be in (0,60].") num_samples_32b = math.ceil(seconds * 192000) # Create data buffer self.buffer = numpy.zeros(num_samples_32b, dtype=numpy.int) char_datp = self._ffi.from_buffer(self.buffer) uint_datp = self._ffi.cast('unsigned int*', char_datp) # Record start = time.time() self._libaudio._Pynq_record(self._uint_adrpv, uint_datp, num_samples_32b) end = time.time() self.sample_rate = num_samples_32b / (end - start) self.sample_len = num_samples_32b
[docs] def play(self): """Play audio buffer via audio jack. Returns ------- None """ char_datp = self._ffi.from_buffer(self.buffer) uint_datp = self._ffi.cast('unsigned int*', char_datp) char_adrp = self._ffi.from_buffer(self.mmio.mem) uint_adrp = self._ffi.cast('unsigned int', char_adrp) self._libaudio._Pynq_play(self._uint_adrpv, uint_datp, len(self.buffer))
[docs] def bypass_start(self): """Stream audio controller input directly to output. Returns ------- None """ self.gpio.write(1) del gpio_pin
[docs] def bypass_stop(self): """Stop streaming input to output directly. Returns ------- None """ self.gpio.write(0) del gpio_pin
[docs] def save(self, file): """Save audio buffer content to a file. The recorded file is of format `*.pdm`. Note ---- The saved file will be put into the specified path, or in the working directory in case the path does not exist. Parameters ---------- file : string File name, with a default extension of `pdm`. Returns ------- None """ if self.buffer.dtype.type != numpy.int32: raise ValueError("Internal audio buffer should be of type int32.") if not isinstance(file, str) : raise ValueError("File name has to be a string.") if os.path.isdir(os.path.dirname(file)): file_abs = file else: file_abs = os.getcwd() + '/' + file with wave.open(file_abs, 'wb') as pdm_file: # Set the number of channels pdm_file.setnchannels(1) # Set the sample width to 2 bytes (16 bit) pdm_file.setsampwidth(2) # Set the frame rate to sample_rate pdm_file.setframerate(self.sample_rate) # Set the number of frames to sample_len pdm_file.setnframes(self.sample_len) # Set the compression type and description pdm_file.setcomptype('NONE', "not compressed") # Write data pdm_file.writeframes(self.buffer.astype(numpy.int16))
[docs] def load(self, file): """Loads file into internal audio buffer. The recorded file is of format `*.pdm`. Note ---- The file will be searched in the specified path, or in the working directory in case the path does not exist. Parameters ---------- file : string File name, with a default extension of `pdm`. Returns ------- None """ if not isinstance(file, str) : raise ValueError("File name has to be a string.") if os.path.isdir(os.path.dirname(file)): file_abs = file else: file_abs = os.getcwd() + '/' + file with wave.open(file_abs, 'rb') as pdm_file: temp_buffer = numpy.fromstring(pdm_file.readframes( pdm_file.getnframes()), dtype='<u2') self.sample_rate = pdm_file.getframerate() self.sample_len = pdm_file.getnframes() self.buffer = temp_buffer.astype(numpy.int32)
[docs] @staticmethod def info(file): """Prints information about pdm files. The information includes name, channels, samples, frames, etc. Note ---- The file will be searched in the specified path, or in the working directory in case the path does not exist. Parameters ---------- file : string File name, with a default extension of `pdm`. Returns ------- None """ if not isinstance(file, str) : raise ValueError("File name has to be a string.") if os.path.isdir(os.path.dirname(file)): file_abs = file else: file_abs = os.getcwd() + '/' + file with wave.open(file_abs, 'rb') as pdm_file: print("File name: " + file) print("Number of channels: " + str(pdm_file.getnchannels())) print("Sample width: " + str(pdm_file.getsampwidth())) print("Sample rate: " + str(pdm_file.getframerate())) print("Number of frames: " + str(pdm_file.getnframes())) print("Compression type: " + str(pdm_file.getcomptype())) print("Compression name: " + str(pdm_file.getcompname()))
[docs] @staticmethod def checkhierarchy(description): """Hierarchy can be bound if it contains the 'd_axi_pdm_1' IP and it contains exactly one GPIO pin. """ if ('d_axi_pdm_1' in description['ip'] and len(description['gpio']) == 1): return True return False