# Copyright (c) 2016, Xilinx, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
# THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
# OR BUSINESS INTERRUPTION). HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
# WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
# OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
# ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import cffi
import glob
import os
import threading
import time
import warnings
__author__ = "Peter Ogden"
__copyright__ = "Copyright 2018, Xilinx"
__email__ = "pynq_support@xilinx.com"
_c_header = R"""
extern const char *libsensors_version;
typedef struct sensors_bus_id {
short type;
short nr;
} sensors_bus_id;
typedef struct sensors_chip_name {
char *prefix;
sensors_bus_id bus;
int addr;
char *path;
} sensors_chip_name;
int sensors_init(FILE *input);
void sensors_cleanup(void);
int sensors_parse_chip_name(const char *orig_name, sensors_chip_name *res);
void sensors_free_chip_name(sensors_chip_name *chip);
int sensors_snprintf_chip_name(char *str, size_t size,
const sensors_chip_name *chip);
const char *sensors_get_adapter_name(const sensors_bus_id *bus);
typedef struct sensors_feature sensors_feature;
char *sensors_get_label(const sensors_chip_name *name,
const sensors_feature *feature);
int sensors_get_value(const sensors_chip_name *name, int subfeat_nr,
double *value);
int sensors_set_value(const sensors_chip_name *name, int subfeat_nr,
double value);
int sensors_do_chip_sets(const sensors_chip_name *name);
const sensors_chip_name *sensors_get_detected_chips(const sensors_chip_name
*match, int *nr);
typedef enum sensors_feature_type {
SENSORS_FEATURE_IN = 0x00,
SENSORS_FEATURE_FAN = 0x01,
SENSORS_FEATURE_TEMP = 0x02,
SENSORS_FEATURE_POWER = 0x03,
SENSORS_FEATURE_ENERGY = 0x04,
SENSORS_FEATURE_CURR = 0x05,
SENSORS_FEATURE_HUMIDITY = 0x06,
SENSORS_FEATURE_MAX_MAIN,
SENSORS_FEATURE_VID = 0x10,
SENSORS_FEATURE_INTRUSION = 0x11,
SENSORS_FEATURE_MAX_OTHER,
SENSORS_FEATURE_BEEP_ENABLE = 0x18,
SENSORS_FEATURE_MAX,
SENSORS_FEATURE_UNKNOWN = 0x7fffffff,
} sensors_feature_type;
typedef enum sensors_subfeature_type {
SENSORS_SUBFEATURE_IN_INPUT = 0,
SENSORS_SUBFEATURE_IN_MIN,
SENSORS_SUBFEATURE_IN_MAX,
SENSORS_SUBFEATURE_IN_LCRIT,
SENSORS_SUBFEATURE_IN_CRIT,
SENSORS_SUBFEATURE_IN_AVERAGE,
SENSORS_SUBFEATURE_IN_LOWEST,
SENSORS_SUBFEATURE_IN_HIGHEST,
SENSORS_SUBFEATURE_IN_ALARM = 0x80,
SENSORS_SUBFEATURE_IN_MIN_ALARM,
SENSORS_SUBFEATURE_IN_MAX_ALARM,
SENSORS_SUBFEATURE_IN_BEEP,
SENSORS_SUBFEATURE_IN_LCRIT_ALARM,
SENSORS_SUBFEATURE_IN_CRIT_ALARM,
SENSORS_SUBFEATURE_FAN_INPUT = 0x100,
SENSORS_SUBFEATURE_FAN_MIN,
SENSORS_SUBFEATURE_FAN_MAX,
SENSORS_SUBFEATURE_FAN_ALARM = 0x180,
SENSORS_SUBFEATURE_FAN_FAULT,
SENSORS_SUBFEATURE_FAN_DIV,
SENSORS_SUBFEATURE_FAN_BEEP,
SENSORS_SUBFEATURE_FAN_PULSES,
SENSORS_SUBFEATURE_FAN_MIN_ALARM,
SENSORS_SUBFEATURE_FAN_MAX_ALARM,
SENSORS_SUBFEATURE_TEMP_INPUT = 0x200,
SENSORS_SUBFEATURE_TEMP_MAX,
SENSORS_SUBFEATURE_TEMP_MAX_HYST,
SENSORS_SUBFEATURE_TEMP_MIN,
SENSORS_SUBFEATURE_TEMP_CRIT,
SENSORS_SUBFEATURE_TEMP_CRIT_HYST,
SENSORS_SUBFEATURE_TEMP_LCRIT,
SENSORS_SUBFEATURE_TEMP_EMERGENCY,
SENSORS_SUBFEATURE_TEMP_EMERGENCY_HYST,
SENSORS_SUBFEATURE_TEMP_LOWEST,
SENSORS_SUBFEATURE_TEMP_HIGHEST,
SENSORS_SUBFEATURE_TEMP_MIN_HYST,
SENSORS_SUBFEATURE_TEMP_LCRIT_HYST,
SENSORS_SUBFEATURE_TEMP_ALARM = 0x280,
SENSORS_SUBFEATURE_TEMP_MAX_ALARM,
SENSORS_SUBFEATURE_TEMP_MIN_ALARM,
SENSORS_SUBFEATURE_TEMP_CRIT_ALARM,
SENSORS_SUBFEATURE_TEMP_FAULT,
SENSORS_SUBFEATURE_TEMP_TYPE,
SENSORS_SUBFEATURE_TEMP_OFFSET,
SENSORS_SUBFEATURE_TEMP_BEEP,
SENSORS_SUBFEATURE_TEMP_EMERGENCY_ALARM,
SENSORS_SUBFEATURE_TEMP_LCRIT_ALARM,
SENSORS_SUBFEATURE_POWER_AVERAGE = 0x300,
SENSORS_SUBFEATURE_POWER_AVERAGE_HIGHEST,
SENSORS_SUBFEATURE_POWER_AVERAGE_LOWEST,
SENSORS_SUBFEATURE_POWER_INPUT,
SENSORS_SUBFEATURE_POWER_INPUT_HIGHEST,
SENSORS_SUBFEATURE_POWER_INPUT_LOWEST,
SENSORS_SUBFEATURE_POWER_CAP,
SENSORS_SUBFEATURE_POWER_CAP_HYST,
SENSORS_SUBFEATURE_POWER_MAX,
SENSORS_SUBFEATURE_POWER_CRIT,
SENSORS_SUBFEATURE_POWER_AVERAGE_INTERVAL = 0x380,
SENSORS_SUBFEATURE_POWER_ALARM,
SENSORS_SUBFEATURE_POWER_CAP_ALARM,
SENSORS_SUBFEATURE_POWER_MAX_ALARM,
SENSORS_SUBFEATURE_POWER_CRIT_ALARM,
SENSORS_SUBFEATURE_ENERGY_INPUT = 0x400,
SENSORS_SUBFEATURE_CURR_INPUT = 0x500,
SENSORS_SUBFEATURE_CURR_MIN,
SENSORS_SUBFEATURE_CURR_MAX,
SENSORS_SUBFEATURE_CURR_LCRIT,
SENSORS_SUBFEATURE_CURR_CRIT,
SENSORS_SUBFEATURE_CURR_AVERAGE,
SENSORS_SUBFEATURE_CURR_LOWEST,
SENSORS_SUBFEATURE_CURR_HIGHEST,
SENSORS_SUBFEATURE_CURR_ALARM = 0x580,
SENSORS_SUBFEATURE_CURR_MIN_ALARM,
SENSORS_SUBFEATURE_CURR_MAX_ALARM,
SENSORS_SUBFEATURE_CURR_BEEP,
SENSORS_SUBFEATURE_CURR_LCRIT_ALARM,
SENSORS_SUBFEATURE_CURR_CRIT_ALARM,
SENSORS_SUBFEATURE_HUMIDITY_INPUT = 0x600,
SENSORS_SUBFEATURE_VID = 0x1000,
SENSORS_SUBFEATURE_INTRUSION_ALARM = 0x1100,
SENSORS_SUBFEATURE_INTRUSION_BEEP,
SENSORS_SUBFEATURE_BEEP_ENABLE = 0x1800,
SENSORS_SUBFEATURE_UNKNOWN = 0x7fffffff,
} sensors_subfeature_type;
struct sensors_feature {
char *name;
int number;
sensors_feature_type type;
int first_subfeature;
int padding1;
};
typedef struct sensors_subfeature {
char *name;
int number;
sensors_subfeature_type type;
int mapping;
unsigned int flags;
} sensors_subfeature;
const sensors_feature *
sensors_get_features(const sensors_chip_name *name, int *nr);
const sensors_subfeature *
sensors_get_all_subfeatures(const sensors_chip_name *name,
const sensors_feature *feature, int *nr);
const sensors_subfeature *
sensors_get_subfeature(const sensors_chip_name *name,
const sensors_feature *feature,
sensors_subfeature_type type);
"""
_ffi = cffi.FFI()
try:
_ffi.cdef(_c_header)
_lib = _ffi.dlopen("libsensors.so.4")
except Exception as e:
_lib = None
[docs]class SysFSSensor:
def __init__(self, path, unit, name, scale):
self._path = path
self._unit = unit
self.name = name
self._scale = scale
self.parents = tuple()
@property
def value(self):
with open(self._path, "r") as f:
raw_value = float(f.read())
return raw_value * self._scale
[docs] def get_value(self, parents=None):
return self.value
def __repr__(self):
return "Sensor {{name={}, value={}{}}}".format(
self.name, self.value, self._unit)
[docs]class DerivedPowerSensor:
def __init__(self, name, voltage, current):
parents = (voltage, current)
self.voltage_sensor = voltage
self.current_sensor = current
self.name = name
self.parents = (voltage, current)
[docs] def get_value(self, parents=None):
if parents is None:
return self.voltage_sensor.value * self.current_sensor.value
else:
return parents[0] * parents[1]
@property
def value(self):
return self.get_value()
def __repr__(self):
return "Sensor {{name={}, value={}W}}".format(
self.name, self.value)
[docs]class Sensor:
"""Interacts with a sensor exposed by libsensors
The value of the sensor is determined by the unit of the
underlying sensor API - that is generally Volts for potential
difference, Amperes for current, Watts for power and degrees
Centigrade for temperature
Attributes
----------
name : str
The name of the sensor
value : float
The current value of the sensor
"""
def __init__(self, chip, number, unit, name):
"""Create a new sensor object wrapping a libsensors chip and feature
Parameters
----------
chip : FFI sensors_chip_name*
The chip the sensor is on
number : int
The number of sensor on the chip
unit : str
Unit to append to the value when creating a string representation
name : str
Name of the sensor
"""
self._chip = chip
self._number = number
self._value = _ffi.new("double [1]")
self._unit = unit
self.name = name
self.parents = tuple()
@property
def value(self):
"""Read the current value of the sensor
"""
if _lib:
_lib.sensors_get_value(self._chip, self._number, self._value)
return self._value[0]
else:
return 0
[docs] def get_value(self, parents=None):
return self.value
def __repr__(self):
return "Sensor {{name={}, value={}{}}}".format(
self.name, self.value, self._unit)
[docs]class Rail:
"""Bundles up to three sensors monitoring the same power rail
Represents a power rail in the system monitored by up to three
sensors for voltage, current and power.
Attributes
----------
name : str
Name of the power rail
voltage : Sensor or None
Voltage sensor for the rail or None if not available
current : Sensor or None
Current sensor for the rail or None if not available
power : Sensor or None
Power sensor for the rail or None if not available
"""
def __init__(self, name):
"""Create a new Rail with the specified rail
"""
self.name = name
self.voltage = None
self.current = None
self.power = None
def __repr__(self):
args = ["name=" + self.name]
if self.voltage:
args.append("voltage=" + repr(self.voltage))
if self.current:
args.append("current=" + repr(self.current))
if self.power:
args.append("power=" + repr(self.power))
return "Rail {{{}}}".format(', '.join(args))
[docs]class XrtInfoDump:
def __init__(self, device):
self._device = device
self.parents = tuple()
[docs] def get_value(self, parents=None):
info = self._device.device_info
return {
"0v85_v": info.m0v85,
"12v_aux_v": info.m12VAux,
"12v_aux_i": info.mAuxCurr,
"12v_pex_v": info.m12VPex,
"12v_pex_i": info.mPexCurr,
"12v_sw_v": info.m12vSW,
"1v8_v": info.m1v8Top,
"3v3_aux_v": info.m3v3Aux,
"3v3_pex_v": info.m3v3Pex,
"mgt0v9avcc_v": info.mMgt0v9,
"mgtavtt_v": info.mMgtVtt,
"sys_5v5_v": info.mSys5v5,
"vccint_v": info.mVccIntVol,
"vccint_i": info.mCurrent
}
[docs]class XrtSensor:
def __init__(self, unit, name, scale, parent, field):
self.parents = (parent,)
self._unit = unit
self.name = name
self._scale = scale
self._field = field
[docs] def get_value(self, parents=None):
if parents is None:
parents = (self.parents[0].get_value(),)
return parents[0][self._field] * self._scale
@property
def value(self):
return self.get_value()
def __repr__(self):
return "Sensor {{name={}, value={}{}}}".format(
self.name, self.value, self._unit)
[docs]class XrtRail:
def __init__(self, name, sample_dict, parent):
self.name = name
if name + "_v" in sample_dict:
self.voltage = XrtSensor("V", name + "_vol", 0.001, parent, name + "_v")
else:
self.voltage = None
if name + "_i" in sample_dict:
self.current = XrtSensor("A", name + "_curr", 0.001, parent, name + "_i")
else:
self.current = None
if self.voltage and self.current:
self.power = DerivedPowerSensor(name + "_power",
self.voltage, self.current)
else:
self.power = None
def __repr__(self):
args = ["name=" + self.name]
if self.voltage:
args.append("voltage=" + repr(self.voltage))
if self.current:
args.append("current=" + repr(self.current))
if self.power:
args.append("power=" + repr(self.power))
return "XrtRail {{{}}}".format(', '.join(args))
[docs]def get_xrt_sysfs_rails(device=None):
if device is None:
from pynq.pl_server import Device
device = Device.active_device
rail_names = ["0v85", "12v_aux", "12v_pex", "12v_sw", "1v8", "3v3_aux",
"3v3_pex", "mgt0v9avcc", "mgtavtt", "sys_5v5", "vccint" ]
infodump = XrtInfoDump(device)
sample_dict = infodump.get_value()
rails = {}
for n in rail_names:
rails[n] = XrtRail(n, sample_dict, infodump)
return rails
def _enumerate_sensors(config_file=None):
if _lib is None:
warnings.warn("Could not initialise libsensors library")
return {}
if config_file:
with open(config_file, 'r') as handle:
_lib.sensors_init(handle);
else:
_lib.sensors_init(_ffi.NULL)
chip_nr = _ffi.new("int [1]")
feature_nr = _ffi.new("int [1]")
rails = {}
chip_nr[0] = 0
cn = _lib.sensors_get_detected_chips(_ffi.NULL, chip_nr)
while cn:
feature_nr[0] = 0
feature = _lib.sensors_get_features(cn, feature_nr)
while feature:
name = _ffi.string(_lib.sensors_get_label(cn, feature)).decode()
subfeature = None
if feature.type == _lib.SENSORS_FEATURE_POWER:
subfeature = _lib.sensors_get_subfeature(
cn, feature, _lib.SENSORS_SUBFEATURE_POWER_INPUT)
feature_type = "power"
unit = "W"
elif feature.type == _lib.SENSORS_FEATURE_IN:
subfeature = _lib.sensors_get_subfeature(
cn, feature, _lib.SENSORS_SUBFEATURE_IN_INPUT)
feature_type = "voltage"
unit = "V"
elif feature.type == _lib.SENSORS_FEATURE_CURR:
subfeature = _lib.sensors_get_subfeature(
cn, feature, _lib.SENSORS_SUBFEATURE_CURR_INPUT)
feature_type = "current"
unit = "A"
if subfeature:
if name not in rails:
rails[name] = Rail(name)
setattr(rails[name], feature_type,
Sensor(cn, subfeature.number, unit, "{}_{}".format(
name, feature_type)))
feature = _lib.sensors_get_features(cn, feature_nr)
cn = _lib.sensors_get_detected_chips(_ffi.NULL, chip_nr)
return rails
[docs]def get_rails(config_file=None):
"""Returns a dictionary of power rails
Parameters
----------
config_file : str
Path to a configuration file for libsensors to use
in place of the the system-wide default
Returns
-------
dict {str : Rail}
Dictionary of power rails with the name of the rail as
the key and a Rail object as the value
"""
return _enumerate_sensors(config_file)
[docs]class MultiSensor:
"""Class for efficiently collecting the readings from multiple sensors
"""
def __init__(self, sensors):
self._sensors = sensors
[docs] def get_values(self):
stored = {}
return tuple((self._get_value(s, stored) for s in self._sensors))
def _get_value(self, sensor, stored):
if sensor in stored:
return stored[sensor]
value = sensor.get_value([self._get_value(p, stored) for p in sensor.parents])
stored[sensor] = value
return value
[docs]class DataRecorder:
"""Class to record sensors during an execution
The DataRecorder provides a way of recording sensor data using a
`with` block.
"""
def __init__(self, *sensors):
"""Create a new DataRecorder attached to the specified sensors
"""
import pandas as pd
self._record_index = -1
self._sensors = sensors
self._getter = MultiSensor(sensors)
self._columns = ['Mark']
self._times = []
self._columns.extend([s.name for s in sensors])
self._frame = pd.DataFrame(columns=self._columns,
index = pd.DatetimeIndex([]),
dtype='f4')
self._callbacks = []
self._data = []
self._thread = None
def __del__(self):
if self._thread:
self.stop()
[docs] def reset(self):
"""Clear the internal state of the data recorder without
forgetting which sensors to record
"""
self._frame.drop(self._frame.index, inplace=True)
self._record_index = -1
[docs] def record(self, interval):
"""Start recording
"""
if self._thread:
raise RuntimeError("DataRecorder is already recording")
self._thread = threading.Thread(
target=DataRecorder._thread_func, args=[self])
self._interval = interval
self._done = False
self._record_index += 1
self._thread.start()
return self
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
self.stop()
return
[docs] def stop(self):
"""Stops recording
"""
self._done = True
self._thread.join()
self._thread = None
[docs] def mark(self):
"""Increment the Invocation count
"""
self._record_index += 1
return self._record_index
def _thread_func(self):
import pandas as pd
while not self._done:
row = [self._record_index]
row.extend(self._getter.get_values())
self._frame.loc[pd.Timestamp.now()] = row
time.sleep(self._interval)
@property
def frame(self):
"""Return a pandas DataFrame of the recorded data
The frame consists of the following fields
Index : The timestamp of the measurement
Mark : counts the number of times that record or mark was called
Sensors* : one column per sensor
"""
return self._frame