# Copyright (c) 2016, Xilinx, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
# THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
# OR BUSINESS INTERRUPTION). HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
# WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
# OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
# ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import os
import struct
import math
import numpy
import cffi
import wave
import time
from pynq import PL
from pynq import GPIO
from pynq import DefaultIP
# Module metadata.
__author__ = "Benedikt Janssen, Yun Rock Qu"
__copyright__ = "Copyright 2016, Xilinx"
__email__ = "pynq_support@xilinx.com"
# Directory that contains this module (symlinks resolved); the audio classes
# below load `libaudio.so` from this directory.
LIB_SEARCH_PATH = os.path.dirname(os.path.realpath(__file__))
class AudioDirect(DefaultIP):
    """Class to interact with audio controller.

    Each audio sample is a 32-bit integer. The audio controller supports only
    mono mode, and uses pulse density modulation (PDM).

    Attributes
    ----------
    mmio : MMIO
        The MMIO object associated with the audio controller.
    gpio : GPIO
        The GPIO object associated with the audio controller.
    buffer : numpy.ndarray
        The numpy array (dtype int32) to store the audio.
    sample_rate: int
        Sample rate of the current buffer content.
    sample_len: int
        Sample length of the current buffer content.

    """
    bindto = ['xilinx.com:user:audio_direct:1.1']

    def __init__(self, description, gpio_name=None):
        """Return a new Audio object based on the hierarchy description.

        Parameters
        ----------
        description : dict
            The hierarchical description of the hierarchy
        gpio_name : str
            The name of the audio path selection GPIO. If None then the GPIO
            pin in the hierarchy is used, otherwise the gpio_name is searched
            in the list of pins on the hierarchy and the PL.gpio_dict.

        Raises
        ------
        RuntimeError
            If no unique audio path selection GPIO can be determined.

        """
        super().__init__(description)

        if gpio_name is None:
            # No name given: the hierarchy must expose exactly one GPIO pin.
            num_pins = len(self._gpio)
            if num_pins == 0:
                raise RuntimeError('Could not find audio path select GPIO.')
            if num_pins > 1:
                raise RuntimeError('Multiple possible audio path select GPIO.')
            pin_name = next(iter(self._gpio.keys()))
            self.gpio = getattr(self, pin_name)
        elif gpio_name in self._gpio:
            self.gpio = getattr(self, gpio_name)
        elif gpio_name in PL.gpio_dict:
            pin = GPIO.get_gpio_pin(PL.gpio_dict[gpio_name]['index'])
            self.gpio = GPIO(pin, 'out')
        else:
            raise RuntimeError('Provided gpio_name not found.')

        self._ffi = cffi.FFI()
        self._libaudio = self._ffi.dlopen(
            os.path.join(LIB_SEARCH_PATH, "libaudio.so"))
        self._ffi.cdef("""unsigned int Xil_Out32(unsigned int Addr,
                                                 unsigned int Value);""")
        self._ffi.cdef("""unsigned int Xil_In32(unsigned int Addr);""")
        self._ffi.cdef("""void record(unsigned int BaseAddr,
                                      unsigned int * BufAddr,
                                      unsigned int Num_Samles_32Bit);""")
        self._ffi.cdef("""void play(unsigned int BaseAddr,
                                    unsigned int * BufAddr,
                                    unsigned int Num_Samles_32Bit);""")

        # The C routines take the controller base address as an integer, so
        # the address of the MMIO memory is cast to `unsigned int` once here.
        char_adrp = self._ffi.from_buffer(self.mmio.mem)
        self._uint_adrpv = self._ffi.cast('unsigned int', char_adrp)

        # `numpy.int` no longer exists in current NumPy; the buffer is handed
        # to C as `unsigned int *`, so it is explicitly 32 bits per sample.
        self.buffer = numpy.zeros(0, dtype=numpy.int32)
        self.sample_rate = 0
        self.sample_len = 0

    @staticmethod
    def _resolve_path(file):
        """Validate `file` and return the path that should be opened.

        The path is used as given when its directory part exists; otherwise
        it is taken relative to the current working directory.

        Raises
        ------
        ValueError
            If `file` is not a string.

        """
        if not isinstance(file, str):
            raise ValueError("File name has to be a string.")
        if os.path.isdir(os.path.dirname(file)):
            return file
        return os.path.join(os.getcwd(), file)

    def record(self, seconds):
        """Record data from audio controller to audio buffer.

        The sample rate per word is 192000Hz.

        Parameters
        ----------
        seconds : float
            The number of seconds to be recorded.

        Returns
        -------
        None

        Raises
        ------
        ValueError
            If `seconds` is not in the interval (0, 60].

        """
        if not 0 < seconds <= 60:
            raise ValueError("Recording time has to be in (0,60].")

        num_samples_32b = math.ceil(seconds * 192000)

        # Create data buffer (32-bit words, matching the C `unsigned int *`
        # argument and the int32 check performed by `save`).
        self.buffer = numpy.zeros(num_samples_32b, dtype=numpy.int32)
        char_datp = self._ffi.from_buffer(self.buffer)
        uint_datp = self._ffi.cast('unsigned int*', char_datp)

        # Record; a monotonic clock is used so that wall-clock adjustments
        # cannot distort the measured sample rate.
        start = time.monotonic()
        self._libaudio.record(self._uint_adrpv, uint_datp, num_samples_32b)
        elapsed = time.monotonic() - start

        self.sample_rate = num_samples_32b / elapsed
        self.sample_len = num_samples_32b

    def play(self):
        """Play audio buffer via audio jack.

        Returns
        -------
        None

        """
        char_datp = self._ffi.from_buffer(self.buffer)
        uint_datp = self._ffi.cast('unsigned int*', char_datp)
        self._libaudio.play(self._uint_adrpv, uint_datp, len(self.buffer))

    def bypass_start(self):
        """Stream audio controller input directly to output.

        Returns
        -------
        None

        """
        self.gpio.write(1)

    def bypass_stop(self):
        """Stop streaming input to output directly.

        Returns
        -------
        None

        """
        self.gpio.write(0)

    def save(self, file):
        """Save audio buffer content to a file.

        The recorded file is of format `*.pdm`.

        Note
        ----
        The saved file will be put into the specified path, or in the
        working directory in case the path does not exist.

        Parameters
        ----------
        file : string
            File name, with a default extension of `pdm`.

        Returns
        -------
        None

        Raises
        ------
        ValueError
            If the internal buffer is not of type int32, or if `file` is not
            a string.

        """
        if self.buffer.dtype.type != numpy.int32:
            raise ValueError("Internal audio buffer should be of type int32.")
        file_abs = self._resolve_path(file)

        with wave.open(file_abs, 'wb') as pdm_file:
            # Mono, 2 bytes (16 bit) per sample, not compressed.
            pdm_file.setnchannels(1)
            pdm_file.setsampwidth(2)
            pdm_file.setframerate(self.sample_rate)
            pdm_file.setnframes(self.sample_len)
            pdm_file.setcomptype('NONE', "not compressed")
            # Each 32-bit buffer word is narrowed to 16 bits before writing.
            pdm_file.writeframes(self.buffer.astype(numpy.int16).tobytes())

    def load(self, file):
        """Loads file into internal audio buffer.

        The recorded file is of format `*.pdm`.

        Note
        ----
        The file will be searched in the specified path, or in the
        working directory in case the path does not exist.

        Parameters
        ----------
        file : string
            File name, with a default extension of `pdm`.

        Returns
        -------
        None

        Raises
        ------
        ValueError
            If `file` is not a string.

        """
        file_abs = self._resolve_path(file)

        with wave.open(file_abs, 'rb') as pdm_file:
            frames = pdm_file.readframes(pdm_file.getnframes())
            self.sample_rate = pdm_file.getframerate()
            self.sample_len = pdm_file.getnframes()

        # `frombuffer` replaces the deprecated binary mode of `fromstring`;
        # the `astype` below makes a writable int32 copy of the samples.
        temp_buffer = numpy.frombuffer(frames, dtype='<u2')
        self.buffer = temp_buffer.astype(numpy.int32)

    @staticmethod
    def info(file):
        """Prints information about the sound files.

        The information includes name, channels, samples, frames, etc.

        Note
        ----
        The file will be searched in the specified path, or in the
        working directory in case the path does not exist.

        Parameters
        ----------
        file : string
            File name, with a default extension of `pdm`.

        Returns
        -------
        None

        Raises
        ------
        ValueError
            If `file` is not a string.

        """
        file_abs = AudioDirect._resolve_path(file)

        with wave.open(file_abs, 'rb') as sound_file:
            print("File name: " + file)
            print("Number of channels: " + str(sound_file.getnchannels()))
            print("Sample width: " + str(sound_file.getsampwidth()))
            print("Sample rate: " + str(sound_file.getframerate()))
            print("Number of frames: " + str(sound_file.getnframes()))
            print("Compression type: " + str(sound_file.getcomptype()))
            print("Compression name: " + str(sound_file.getcompname()))
class AudioADAU1761(DefaultIP):
    """Class to interact with audio codec controller.

    Each raw audio sample is a 24 bits, padded to 32 bits.
    The audio controller supports both mono and stereo modes, and I2S format
    of data.

    Attributes
    ----------
    buffer : numpy.ndarray
        The numpy array (dtype int32) to store the audio.
    sample_rate: int
        Sample rate of the codec.
    sample_len: int
        Sample length of the current buffer content.
    iic_index : int
        The index of the IIC instance in /dev.
    uio_index : int
        The index of the UIO instance in /dev.

    """
    bindto = ['xilinx.com:user:audio_codec_ctrl:1.0']

    def __init__(self, description):
        """Return a new Audio object based on the hierarchy description.

        Parameters
        ----------
        description : dict
            The hierarchical description of the hierarchy

        """
        super().__init__(description)

        self._ffi = cffi.FFI()
        self._libaudio = self._ffi.dlopen(
            os.path.join(LIB_SEARCH_PATH, "libaudio.so"))
        self._ffi.cdef("""void config_audio_pll(int iic_index);""")
        self._ffi.cdef("""void config_audio_codec(int iic_index);""")
        self._ffi.cdef("""void select_line_in(int iic_index);""")
        self._ffi.cdef("""void select_mic(int iic_index);""")
        self._ffi.cdef("""void deselect(int iic_index);""")
        self._ffi.cdef("""void bypass(unsigned int audio_mmap_size,
                                      unsigned int nsamples,
                                      int uio_index, int iic_index) ;""")
        self._ffi.cdef("""void record(unsigned int audio_mmap_size,
                                      unsigned int BufAddr,
                                      unsigned int nsamples,
                                      int uio_index, int iic_index);""")
        self._ffi.cdef("""void play(unsigned int audio_mmap_size,
                                    unsigned int BufAddr,
                                    unsigned int nsamples,
                                    int uio_index, int iic_index);""")

        self.buffer = numpy.zeros(0, dtype=numpy.int32)
        self.sample_rate = None
        self.sample_len = len(self.buffer)
        self.iic_index = None
        self.uio_index = None
        # NOTE(review): `configure` is not defined in this class as shown
        # here; presumably it is provided elsewhere and sets `sample_rate`,
        # `iic_index` and `uio_index` -- confirm.
        self.configure()

    @staticmethod
    def _resolve_path(file):
        """Validate `file` and return the path that should be opened.

        The path is used as given when its directory part exists; otherwise
        it is taken relative to the current working directory.

        Raises
        ------
        ValueError
            If `file` is not a string.

        """
        if not isinstance(file, str):
            raise ValueError("File name has to be a string.")
        if os.path.isdir(os.path.dirname(file)):
            return file
        return os.path.join(os.getcwd(), file)

    def select_line_in(self):
        """Select LINE_IN on the board.

        This method will select the LINE_IN as the input.

        """
        self._libaudio.select_line_in(self.iic_index)

    def select_microphone(self):
        """Select MIC on the board.

        This method will select the MIC as the input.

        """
        self._libaudio.select_mic(self.iic_index)

    def record(self, seconds):
        """Record data from audio controller to audio buffer.

        The sample rate for both channels is 48000Hz. Note that the
        `sample_len` will only be changed when the buffer is modified.
        Since both channels are sampled, the buffer size has to be twice
        the sample length.

        Parameters
        ----------
        seconds : float
            The number of seconds to be recorded.

        Returns
        -------
        None

        Raises
        ------
        ValueError
            If `seconds` is not in the interval (0, 60].

        """
        if not 0 < seconds <= 60:
            raise ValueError("Recording time has to be in (0,60].")

        self.sample_len = math.ceil(seconds * self.sample_rate)
        # One 32-bit word per channel and sample.
        num_samples_32b = self.sample_len * 2

        # Create data buffer
        self.buffer = numpy.zeros(num_samples_32b, dtype=numpy.int32)
        char_buffer = self._ffi.from_buffer(self.buffer)
        uint_buffer = self._ffi.cast('unsigned int', char_buffer)

        # Record
        self._libaudio.record(self.mmio.length, uint_buffer,
                              self.sample_len, self.uio_index, self.iic_index)

    def play(self):
        """Play audio buffer via audio jack.

        Since both channels are sampled, the buffer size has to be twice
        the sample length.

        Returns
        -------
        None

        """
        char_buffer = self._ffi.from_buffer(self.buffer)
        uint_buffer = self._ffi.cast('unsigned int', char_buffer)

        self._libaudio.play(self.mmio.length, uint_buffer,
                            self.sample_len, self.uio_index, self.iic_index)

    def bypass(self, seconds):
        """Stream audio controller input directly to output.

        It will run for a certain number of seconds, then stop automatically.
        The audio buffer and `sample_len` are left untouched.

        Parameters
        ----------
        seconds : float
            The number of seconds to be recorded.

        Returns
        -------
        None

        Raises
        ------
        ValueError
            If `seconds` is not in the interval (0, 60].

        """
        if not 0 < seconds <= 60:
            raise ValueError("Bypassing time has to be in (0,60].")

        # Kept local on purpose: `sample_len` describes the buffer content,
        # which bypassing does not change. Overwriting it here would make a
        # later `play` pass a sample count that does not match the buffer.
        num_samples = math.ceil(seconds * self.sample_rate)
        self._libaudio.bypass(self.mmio.length, num_samples,
                              self.uio_index, self.iic_index)

    def save(self, file):
        """Save audio buffer content to a file.

        The recorded file is of format `*.wav`. Note that only 24 bits out
        of each 32-bit sample are the real samples; the highest 8 bits are
        padding, which should be removed when writing the wave file.

        Note
        ----
        The saved file will be put into the specified path, or in the
        working directory in case the path does not exist.

        Parameters
        ----------
        file : string
            File name, with a default extension of `wav`.

        Returns
        -------
        None

        Raises
        ------
        ValueError
            If the internal buffer is not of type int32, or if `file` is not
            a string.

        """
        if self.buffer.dtype.type != numpy.int32:
            raise ValueError("Internal audio buffer should be of type int32.")
        file_abs = self._resolve_path(file)

        # View every 32-bit little-endian word as 4 bytes and keep the first
        # 3 (the low 24 bits), dropping the padding byte.
        words = numpy.ascontiguousarray(self.buffer, dtype='<i4').reshape(-1)
        samples_3byte = words.view(numpy.uint8).reshape(-1, 4)[:, :3].tobytes()

        with wave.open(file_abs, 'wb') as wav_file:
            # Stereo, 3 bytes per sample, not compressed.
            wav_file.setnchannels(2)
            wav_file.setsampwidth(3)
            wav_file.setframerate(self.sample_rate)
            # One frame holds one sample for each of the two channels.
            wav_file.setnframes(words.size // 2)
            wav_file.setcomptype('NONE', "not compressed")
            wav_file.writeframes(samples_3byte)

    def load(self, file):
        """Loads file into internal audio buffer.

        The recorded file is of format `*.wav`. Note that we expect 32-bit
        samples in the buffer while the each saved sample is only 24 bits.
        Hence we need to pad the highest 8 bits when reading the wave file.

        Note
        ----
        The file will be searched in the specified path, or in the
        working directory in case the path does not exist.

        Parameters
        ----------
        file : string
            File name, with a default extension of `wav`.

        Returns
        -------
        None

        Raises
        ------
        ValueError
            If `file` is not a string, or if the wave file is not stereo
            with 3 bytes per sample.

        """
        file_abs = self._resolve_path(file)

        with wave.open(file_abs, 'rb') as wav_file:
            if wav_file.getnchannels() != 2 or wav_file.getsampwidth() != 3:
                raise ValueError(
                    "Wave file has to be stereo with 3 bytes per sample.")
            frames = wav_file.readframes(wav_file.getnframes())
            frame_rate = wav_file.getframerate()

        # Only whole frames (2 channels x 3 bytes) are used, so that
        # `sample_len` always matches the buffer that is built below.
        num_frames = len(frames) // 6
        samples_3byte = numpy.frombuffer(
            frames, dtype=numpy.uint8, count=num_frames * 6).reshape(-1, 3)

        # Append a zero byte to each 3-byte sample to get 32-bit words.
        samples_4byte = numpy.zeros((samples_3byte.shape[0], 4),
                                    dtype=numpy.uint8)
        samples_4byte[:, :3] = samples_3byte

        self.sample_rate = frame_rate
        self.sample_len = num_frames
        # The padded values fit in 24 bits, so converting to int32 keeps the
        # bit pattern and gives the dtype that `save` requires.
        self.buffer = samples_4byte.view('<u4').reshape(-1).astype(
            numpy.int32)

    @staticmethod
    def info(file):
        """Prints information about the sound files.

        The information includes name, channels, samples, frames, etc.

        Note
        ----
        The file will be searched in the specified path, or in the
        working directory in case the path does not exist.

        Parameters
        ----------
        file : string
            File name, with a default extension of `wav`.

        Returns
        -------
        None

        Raises
        ------
        ValueError
            If `file` is not a string.

        """
        file_abs = AudioADAU1761._resolve_path(file)

        with wave.open(file_abs, 'rb') as sound_file:
            print("File name: " + file)
            print("Number of channels: " + str(sound_file.getnchannels()))
            print("Sample width: " + str(sound_file.getsampwidth()))
            print("Sample rate: " + str(sound_file.getframerate()))
            print("Number of frames: " + str(sound_file.getnframes()))
            print("Compression type: " + str(sound_file.getcomptype()))
            print("Compression name: " + str(sound_file.getcompname()))