# Copyright (c) 2016-2020, Xilinx, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
# THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
# OR BUSINESS INTERRUPTION). HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
# WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
# OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
# ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import numpy as np
from . import Arduino
from . import MAILBOX_OFFSET
from . import ARDUINO_NUM_ANALOG_PINS
__author__ = "Yun Rock Qu, Mario Ruiz"
__copyright__ = "Copyright 2016-2020, Xilinx"
__email__ = "pynq_support@xilinx.com"
# Binary file name handed to the Arduino constructor by Arduino_Analog.
ARDUINO_ANALOG_PROGRAM = "arduino_analog.bin"
# Start address of the sample log: offset 16 past MAILBOX_OFFSET.
ARDUINO_ANALOG_LOG_START = MAILBOX_OFFSET+16
# Total number of log samples; divided evenly among the sampled channels.
ARDUINO_MAX_SAMPLES = 1018
# Command codes sent through the Microblaze command interface. Commands that
# act on channels carry the channel bitmask shifted left by 8 bits above
# the code.
CONFIG_IOP_SWITCH = 0x1
GET_RAW_DATA = 0x3
READ_AND_LOG_RAW = 0x7
RESET_ANALOG = 0xB
# Scale factor multiplied with raw readings to produce 'voltage' output.
V_Conv = 3.33 / 65536
class Arduino_Analog(object):
    """This class controls the Arduino Analog.

    XADC is an internal analog controller in the hardware. This class
    provides API to do analog reads from IOP.

    Attributes
    ----------
    microblaze : Arduino
        Microblaze processor instance used by this module.
    log_running : int
        The state of the log (0: stopped, 1: started).
    log_interval_ms : int
        Time in milliseconds between samples on the same channel.
    gr_pin : list
        A group of pins on arduino-grove shield.
    num_channels : int
        The number of channels sampled.

    """

    def __init__(self, mb_info, gr_pin):
        """Return a new instance of an Arduino_Analog object.

        Note
        ----
        The parameter `gr_pin` is a list of analog pins enabled.

        Parameters
        ----------
        mb_info : dict
            A dictionary storing Microblaze information, such as the
            IP name and the reset name.
        gr_pin: list
            A group of pins on arduino-grove shield.

        Raises
        ------
        ValueError
            If `gr_pin` is empty or contains a pin number outside
            0 .. ARDUINO_NUM_ANALOG_PINS-1.

        """
        # Reject an empty pin list before touching the hardware: the
        # samples-per-channel computation below divides by the pin count.
        if len(gr_pin) == 0:
            raise ValueError("At least one analog pin must be specified.")
        for pin in gr_pin:
            if pin not in range(ARDUINO_NUM_ANALOG_PINS):
                raise ValueError("Analog pin number can only be 0 - {}."
                                 .format(ARDUINO_NUM_ANALOG_PINS-1))

        self.microblaze = Arduino(mb_info, ARDUINO_ANALOG_PROGRAM)
        self.log_interval_ms = 1000
        self.log_running = 0
        self.gr_pin = gr_pin
        self.num_channels = len(gr_pin)

        # Calculate the offset address of the end of the log. Each sample
        # occupies 4 address units and the log is shared by all channels.
        self._samples_channel = ARDUINO_MAX_SAMPLES // self.num_channels
        self._log_end = ARDUINO_ANALOG_LOG_START + \
            4 * self.num_channels * self._samples_channel

        # Enable all the analog pins
        data = [0] * ARDUINO_NUM_ANALOG_PINS
        self.microblaze.write_mailbox(0, data)

        # Write configuration and wait for ACK
        self.microblaze.write_blocking_command(CONFIG_IOP_SWITCH)

    def _channel_mask(self):
        """Return a bitmask with one bit set for every pin in `gr_pin`."""
        mask = 0
        for channel in self.gr_pin:
            mask |= 0x1 << channel
        return mask

    def _collect_samples(self, start, stop, readings):
        """Append the samples stored in [start, stop) to `readings`.

        One group of `num_channels` values is read per step; the j-th
        value of each group is appended to `readings[j]`.

        """
        for offset in range(start, stop, 4 * self.num_channels):
            # atleast_1d keeps indexing valid if a single value comes back
            # as a scalar.
            raw = np.atleast_1d(
                self.microblaze.read(offset, self.num_channels))
            for j, samples in enumerate(readings):
                samples.append(raw[j])

    def read(self, out_format='voltage'):
        """Read the shared mailbox memory with the adc raw value
        from the analog peripheral.

        Parameters
        ----------
        out_format : str
            Selects the return type, either 'raw' or 'voltage'

        Returns
        -------
        Numpy Array
            Either the 'raw' values or 'voltage' depending on out_format

        Raises
        ------
        ValueError
            If `out_format` is neither 'raw' nor 'voltage'.

        """
        if out_format not in ['raw', 'voltage']:
            raise ValueError("out_format can only be 'raw' or 'voltage'")

        # The channel bitmask travels in the bits above the command code.
        cmd = (self._channel_mask() << 8) + GET_RAW_DATA
        self.microblaze.write_blocking_command(cmd)
        reading = np.asarray(
            self.microblaze.read_mailbox(0, self.num_channels))

        if out_format == 'raw':
            return reading
        return reading * V_Conv

    def set_log_interval_ms(self, log_interval_ms):
        """Set the sampling interval of the log for the analog peripheral.

        This method can set the time interval between two samples, so that
        users can read out multiple values in a single log.

        Parameters
        ----------
        log_interval_ms : int
            The time between two samples in milliseconds, for logging only.

        Returns
        -------
        None

        Raises
        ------
        ValueError
            If `log_interval_ms` is not an integer or is negative.

        """
        if not isinstance(log_interval_ms, int):
            raise ValueError("Time between samples should be integer.")
        if log_interval_ms < 0:
            raise ValueError("Time between samples should be no less than 0.")

        self.log_interval_ms = log_interval_ms
        self.microblaze.write_mailbox(4, log_interval_ms)

    def start_log(self):
        """Start recording multiple analog samples (raw) values in a log.

        This method will first call set_log_interval_ms() before writing to
        the MMIO.

        Returns
        -------
        None

        """
        self.set_log_interval_ms(self.log_interval_ms)
        cmd = (self._channel_mask() << 8) + READ_AND_LOG_RAW
        self.microblaze.write_non_blocking_command(cmd)
        # Only flag the log as running once the command has been issued, so
        # a failure above does not leave a stale "running" state behind.
        self.log_running = 1

    def stop_log(self):
        """Stop recording the raw values in the log.

        Sends the RESET_ANALOG command to the Microblaze to stop the log.

        Returns
        -------
        None

        Raises
        ------
        RuntimeError
            If no log is currently running.

        """
        if self.log_running != 1:
            raise RuntimeError("No analog log running.")
        self.microblaze.write_non_blocking_command(RESET_ANALOG)
        self.log_running = 0

    def get_log(self, out_format='voltage'):
        """Stop the log and return the logged samples.

        Parameters
        ----------
        out_format : str
            Selects the return type, either 'raw' or 'voltage'

        Returns
        -------
        Numpy Array
            Numpy array of valid samples from the analog device,
            either 'raw' or 'voltage', holding one row per channel.
            None is returned when the log holds no samples.

        Raises
        ------
        ValueError
            If `out_format` is neither 'raw' nor 'voltage'.
        RuntimeError
            If no log is currently running.

        """
        # Validate the argument first so that a bad call does not stop a
        # running log as a side effect.
        if out_format not in ['raw', 'voltage']:
            raise ValueError("out_format can only be 'raw' or 'voltage'")

        # Stop logging
        self.stop_log()

        head_ptr, tail_ptr = self.microblaze.read_mailbox(0x8, 2)
        if head_ptr == tail_ptr:
            return None

        # Sweep circular buffer for samples
        readings = [[] for _ in range(self.num_channels)]
        if head_ptr < tail_ptr:
            self._collect_samples(head_ptr, tail_ptr, readings)
        else:
            # The data wraps: read up to the end of the log, then continue
            # from its start.
            self._collect_samples(head_ptr, self._log_end, readings)
            self._collect_samples(ARDUINO_ANALOG_LOG_START, tail_ptr,
                                  readings)

        readings_arr = np.asarray(readings)
        if out_format == 'raw':
            return readings_arr
        return readings_arr * V_Conv

    def reset(self):
        """Resets the system monitor for analog devices.

        Returns
        -------
        None

        """
        self.microblaze.write_blocking_command(RESET_ANALOG)

    def read_raw(self):
        """Read the analog raw value from the analog peripheral.

        Returns
        -------
        Numpy Array
            The raw values from the analog device.

        """
        return self.read('raw')

    def get_log_raw(self):
        """Stop the log and return the logged raw samples.

        Returns
        -------
        Numpy Array
            Valid raw samples from the analog device, or None when the
            log holds no samples.

        """
        return self.get_log('raw')

    def stop_log_raw(self):
        """Stop recording the raw values in the log.

        Equivalent to stop_log().

        Returns
        -------
        None

        """
        self.stop_log()

    def start_log_raw(self):
        """Start recording raw data in a log.

        Equivalent to start_log().

        Returns
        -------
        None

        """
        self.start_log()