#!/usr/bin/python3
"""

=head1 NAME

ipmi_sensor_ - Wildcard plugin for the sensors data provided by ipmi


=head1 AUTHOR

Copyright (c) 2006 Logilab

Inspired by code written by Peter Palfrader


=head1 CONFIGURATION

ipmitool probably needs to be run as root, and it may take more than 10 seconds on some hosts.

Add the following to your /etc/munin/munin-node:

  [ipmi_sensor_*]
  user root
  timeout 20

This plugin does not use environment variables (see section BUGS)


=head1 LICENSE

GNU GPLv2 or any later version

=begin comment

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or (at
your option) any later version.

This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
General Public License for more details

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
02110-1301 USA.

=end comment


=head1 BUGS

Plugin reads /etc/munin/ipmi directly, instead of reading environment variables.


=head1 MAGIC MARKERS

 #%# capabilities=autoconf suggest
 #%# family=contrib

=cut
"""

from subprocess import Popen, PIPE
from os import stat, access, R_OK, F_OK, environ
from os.path import join
from stat import ST_MTIME
from time import time
import sys
import re


# Directory used for the output cache. It is None when MUNIN_PLUGSTATE is not
# set (e.g. the plugin is started by hand); caching is then skipped instead of
# failing at import time.
CACHEDIR = environ.get('MUNIN_PLUGSTATE')
CACHEFILE = "plugin-ipmi_sensor.cache"
# Maximum age of the cache file in seconds before ipmitool is queried again.
CACHEAGE = 120
CONFIG = '/etc/munin/ipmi'

# Thresholds and reading shown by debug(), in output order.
_DEBUG_FIELDS = (
    'sensor reading',
    'lower non-critical',
    'upper non-critical',
    'lower critical',
    'upper critical',
    'assertions enabled',
)


def normalize_sensor(name):
    """Turn a sensor name into a field name: lowercase it, map "-" to "M" and
    "+" to "P", and replace every other non-alphanumeric character by "_"."""
    name = name.lower().replace("-", "M").replace("+", "P")
    return re.sub("[^a-z0-9A-Z]", "_", name)


def parse_data(data):
    """
    Parse the data returned by ipmitool get

    which should be of the following form:

    Sensor ID              : FAN 1 RPM (0x30)
     Entity ID             : 7.1
     Sensor Type (Analog)  : Fan
     Sensor Reading        : 6150 (+/- 75) RPM
     Status                : ok
     Lower Non-Recoverable : na
     Lower Critical        : 2025.000
     Lower Non-Critical    : na
     Upper Non-Critical    : na
     Upper Critical        : na
     Upper Non-Recoverable : na
     Assertion Events      :
     Assertions Enabled    : lcr-
     Deassertions Enabled  : lcr-

    When a sensor reading is unavailable, ipmitool outputs:
     Sensor Reading        : Unable to read sensor: Device Not Present

    The first line of the input is always skipped (presumably a status line
    printed by ipmitool - verify against real output).

    Returns a dict mapping the sensor name to a dict holding "id" (the hex
    identifier) plus one entry per "label : value" line, keyed by the
    lowercased label.
    """
    sensors = {}
    cur_sensor = None
    for line in data.splitlines()[1:]:
        if not line.strip():
            # a blank line ends the current sensor record
            cur_sensor = None
            continue
        if line.startswith("Sensor ID"):
            id_text = line.split(":", 1)[-1]
            idm = re.match(r"(.*) \((0x.*)\)", id_text)
            if not idm:
                # Unparseable record header: drop the record entirely so that
                # its lines are not attributed to the previous sensor.
                cur_sensor = None
                continue
            cur_sensor = {"id": idm.group(2)}
            sensors[idm.group(1).strip()] = cur_sensor
        if cur_sensor is None:
            continue
        if ":" in line:
            label, value = line.split(":", 1)
            cur_sensor[label.strip().lower()] = value.strip()
    return sensors


def _run_ipmitool(arguments):
    """Run "ipmitool -I open <arguments>" and return its decoded stdout,
    or None when ipmitool could not be started."""
    try:
        proc = Popen(["ipmitool", "-I", "open"] + list(arguments), shell=False, stdout=PIPE)
    except OSError:
        return None
    # The context manager closes the pipe and waits for the child.
    with proc:
        output = proc.communicate()[0]
    # Never fail on stray non-UTF-8 bytes in sensor names.
    return output.decode(errors="replace")


def get_sensor_names():
    """Return a dict mapping each known unit key to the list of sensor names
    that "ipmitool sensor" reports with that unit.

    Returns an empty dict when ipmitool cannot be started.
    """
    output = _run_ipmitool(["sensor"])
    if output is None:
        return {}
    units = {info['vlabel'].lower(): key for key, info in UNITS_TO_SENSORS.items()}
    sensors = {}
    for line in output.splitlines():
        columns = [s.strip() for s in line.split('|')]
        if len(columns) < 3:
            # blank or malformed line: no unit column to look at
            continue
        key = units.get(columns[2].lower())
        if key:
            sensors.setdefault(key, []).append(columns[0])
    return sensors


def _read_cache(cache_filename):
    """Return the content of the cache file, or None when it is missing,
    unreadable or older than CACHEAGE seconds."""
    try:
        mtime = stat(cache_filename)[ST_MTIME]
    except OSError:
        return None
    if time() - mtime > CACHEAGE:
        return None
    try:
        with open(cache_filename) as f:
            return f.read()
    except OSError:
        # The file vanished or is unreadable: fall back to a fresh query.
        return None


def get_sensors():
    """Return the parsed sensor records (see parse_data).

    The raw ipmitool output is cached in CACHEDIR for CACHEAGE seconds; no
    cache is used when CACHEDIR is not set. Returns an empty dict when
    ipmitool cannot be started.
    """
    cache_filename = join(CACHEDIR, CACHEFILE) if CACHEDIR else None
    if cache_filename is not None:
        cached = _read_cache(cache_filename)
        if cached is not None:
            return parse_data(cached)
    arguments = ["sensor"]
    if SENSORS:
        arguments += ["get", "--"] + SENSORS
    data = _run_ipmitool(arguments)
    if data is None:
        return {}
    if cache_filename is not None:
        try:
            with open(cache_filename, "w") as f:
                f.write(data)
        except OSError:
            # Caching is only an optimisation: keep going with the fresh data.
            pass
    return parse_data(data)


def _exit_with_unit_error(reason):
    """Report an unusable executable name on stderr and exit with status 1."""
    sys.stderr.write("{} Acceptable values: {}\n".format(reason, " / ".join(get_sensor_names())))
    sys.exit(1)


def query_unit(arg):
    """Extract the unit from the executable name (the part after "_u_").

    Exits with status 1 when the name has no "_u_" part or names a unit that
    is not a key of UNITS_TO_SENSORS.
    """
    m = re.search('_u_(.*)$', arg)
    if not m:
        _exit_with_unit_error(
            "Could not determine which unit you want based on executable name.")
    unit = m.group(1)
    if unit not in UNITS_TO_SENSORS:
        _exit_with_unit_error("Unknown unit '{}' in executable name.".format(unit))
    return unit


UNITS_TO_SENSORS = {
    'volts': {
        'title': "Voltages",
        'args': '--base 1000',
        'vlabel': 'Volts',
        'info': "This graph shows the voltages as reported by IPMI",
        'sensors': ['Voltage 2']},
    'degrees_c': {
        'title': "Temperature",
        'args': '--base 1000 -l 0',
        'vlabel': 'Degrees C',
        'info': "This graph shows the temperatures as reported by IPMI",
        'sensors': ['Ambient Temp']},
    'rpm': {
        'title': "RPMs",
        'args': '--base 1000 -l 0',
        'vlabel': 'RPM',
        'info': "This graph shows the RPMs as reported by IPMI",
        'sensors': ['FAN 1 RPM', 'FAN 2 RPM', 'FAN 3 RPM', 'FAN 4 RPM']},
    'amps': {
        'title': "Amperes",
        'args': '--base 1000',
        'vlabel': 'Amperes',
        'info': "This graph shows the amperes as reported by IPMI",
        'sensors': ['Current 2']},
    'watts': {
        'title': "Watts",
        'args': '--base 1000',
        'vlabel': 'Watts',
        'info': "This graph shows the watts as reported by IPMI",
        'sensors': ['System Level']},
}


def _apply_config(path):
    """Override the sensor lists in UNITS_TO_SENSORS from the config file.

    Each useful line has the form "unit = sensor1,sensor2". Lines starting
    with "#", lines without "=" and lines naming an unknown unit are ignored.
    Nothing happens when the file is not readable.
    """
    if not access(path, R_OK):
        return
    with open(path) as config_file:
        for line in config_file:
            if line.strip().startswith('#'):
                continue
            unit, separator, sensors = line.partition('=')
            if not separator:
                continue
            unit = unit.strip()
            if unit not in UNITS_TO_SENSORS:
                continue
            UNITS_TO_SENSORS[unit]['sensors'] = [s.strip() for s in sensors.split(',') if s.strip()]


_apply_config(CONFIG)

# Flat list of every configured sensor name, passed to "ipmitool sensor get".
SENSORS = []
for _unit_info in UNITS_TO_SENSORS.values():
    SENSORS += _unit_info['sensors']


def _threshold(values, key):
    """Return the threshold stored under key, or "" when it is missing or "na"."""
    value = values.get(key, "")
    return "" if value == "na" else value


def config_unit(unit, data=None):
    """Print the munin graph configuration for the given unit.

    data is the parsed sensor dict (see parse_data); it is fetched with
    get_sensors() when omitted. Configured sensors that are absent from data
    are skipped.
    """
    info = UNITS_TO_SENSORS[unit]
    if data is None:
        data = get_sensors()
    print("graph_title IPMI Sensors:", info['title'])
    print("graph_args", info['args'])
    print("graph_vlabel", info['vlabel'])
    print("graph_category sensors")
    print("graph_info", info['info'])
    for lbl in info['sensors']:
        values = data.get(lbl)
        if values is None:
            continue
        nname = normalize_sensor(lbl)
        print("%s.label %s" % (nname, lbl))
        # detect warning and critical ranges
        assertions = values.get('assertions enabled', '').split()
        if not assertions:
            # Some devices do not provide assertions (e.g. power supplies). Thus there is no way
            # to guess their warning/critical ranges. We can skip these.
            continue
        # A threshold is only used when its assertion flag is enabled.
        crit_l = _threshold(values, 'lower critical') if 'lcr-' in assertions else ""
        warn_l = _threshold(values, 'lower non-critical') if 'lnc-' in assertions else ""
        crit_u = _threshold(values, 'upper critical') if 'ucr+' in assertions else ""
        warn_u = _threshold(values, 'upper non-critical') if 'unc+' in assertions else ""
        warn = "%s:%s" % (warn_l, warn_u)
        crit = "%s:%s" % (crit_l, crit_u)
        if warn != ":":
            print("%s.warning %s" % (nname, warn))
        if crit != ":":
            print("%s.critical %s" % (nname, crit))


def config():
    """Print the graph configuration for the unit named by the executable."""
    unit = query_unit(sys.argv[0])
    config_unit(unit)


def _reading_value(reading):
    """Return the first token of a "sensor reading" field when it is a number,
    or "U" (unknown) when the reading is empty, unavailable or not numeric."""
    tokens = reading.split()
    if not tokens or 'Unable to read sensor' in reading:
        return 'U'
    try:
        float(tokens[0])
    except ValueError:
        return 'U'
    return tokens[0]


def report_unit(unit, data=None):
    """Print one "<field>.value <value>" line per configured sensor of the unit.

    data is the parsed sensor dict (see parse_data); it is fetched with
    get_sensors() when omitted. Sensors without a "sensor reading" entry are
    skipped; readings that are not a number are reported as "U".
    """
    info = UNITS_TO_SENSORS[unit]
    if data is None:
        data = get_sensors()
    for lbl in info['sensors']:
        try:
            reading = data[lbl]["sensor reading"]
        except KeyError:
            continue
        print("%s.value %s" % (normalize_sensor(lbl), _reading_value(reading)))


def report():
    """Print the current values for the unit named by the executable."""
    unit = query_unit(sys.argv[0])
    report_unit(unit)


def autoconf():
    """Print "yes" when sensor data could be obtained, otherwise "no (...)"."""
    if get_sensors():
        print("yes")
    else:
        print("no (no ipmitool output)")


def suggest():
    """Print one "u_<unit>" suggestion per unit that has detected sensors.

    As a side effect, the detected sensors are written to CONFIG when that
    file does not exist yet and at least one sensor was found.
    """
    names = get_sensor_names()
    if names and not access(CONFIG, F_OK):
        try:
            with open(CONFIG, "w") as f:
                for key, sensors in names.items():
                    f.write("%s = %s\n" % (key, ",".join(sensors)))
        except OSError:
            # Best effort: the suggestions below do not depend on the file.
            pass
    for key in names:
        print("u_%s" % key)


def debug():
    """Print the configured sensor names, then the reading and thresholds of
    every parsed sensor; fields missing for a sensor are shown as "na"."""
    print(SENSORS)
    data = get_sensors()
    for key, value in data.items():
        fields = tuple(value.get(field, "na") for field in _DEBUG_FIELDS)
        print("%s: %s (%s - %s) [%s - %s] %s" % ((key,) + fields))


def main():
    """Dispatch on the first command line argument; default is to report values."""
    command = sys.argv[1] if len(sys.argv) > 1 else ""
    handlers = {
        "autoconf": autoconf,
        "suggest": suggest,
        "config": config,
        "debug": debug,
    }
    handlers.get(command, report)()


if __name__ == "__main__":
    main()