/* groovylint-disable DuplicateNumberLiteral, DuplicateStringLiteral, ImplementationAsType, InvertedCondition, LineLength, MethodReturnTypeRequired, MethodSize, NestedBlockDepth, NglParseError, NoDef, NoJavaUtilDate, NoWildcardImports, ParameterReassignment, UnnecessaryGString, UnnecessaryObjectReferences, UnnecessaryToString, UnusedImport, VariableTypeRequired */
/*
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 *
 */

/* Notes
 2020-08-18 - staylorx - A couple of dumb coding errors, and still trying to sort out TCP
 2020-08-18 - staylorx - Received version from original author (great start!)
                       - Attempting RFC5424 format for syslog
                       - Date/time stamping with the hub timezone
 2023-5-5   - Mavrrick - Update code to output to InfluxDB Database with all needed connectivity options
 2024-4-15  - Mavrrick - Update code to allow implement a Post queue
 2026-3-14  - Mavrrick - Updated code for InfluxDB3
*/

import groovy.transform.Field
import groovy.json.JsonSlurper
import hubitat.device.HubAction
import hubitat.device.Protocol
import java.time.*

// Line-protocol records waiting to be written (appended by parse()).
@Field static List loggerQueueMapV3 = []
// Snapshot of the records contained in the most recent POST.
@Field static List postQueueMapV3 = []
// Write endpoint, built by retrieveConnectionInfo().
@Field static String influxDBuriV3 = ""
// HTTP headers (Authorization) sent with every write.
@Field static def influxDBAuthV3 = [:]

metadata {
    /* groovylint-disable-next-line DuplicateStringLiteral */
    definition(name: "InfluxDBv3 Hubitat Live Logger", namespace: "Mavrrick", author: "Mavrrick") {
        capability "Initialize"
    }
    command "disconnect"
    preferences {
        input("maxRec", "integer", title: "Max Pending Records", description: "Please specifiy the max records before forcing a DB write", defaultValue: 100, required: true)
        input("dbWriteTime", "integer", title: "How long to wait between DB Write Attempts", description: "Enter the amount of time between 0 and 60 to wait to post your data", defaultValue: 30, required: true)
        input("maxBacklog", "number", title: "Max allowed backlog before purging ", description: "The size of backlog allowed befor purging records.", defaultValue: 30, required: true)
        input("useSSL", "bool", title: "Use SSL to connect to server", description: "", defaultValue: false)
        input("ip", "text", title: "InfluxDB IP Address", description: "ip address of the InfluxDB server", required: true)
        input("database", "text", title: "InfluxDB IP Database", description: "InfluxDB Database", required: true)
        input("token", "text", title: "Auth Token", description: "Auth Token for InfluxDB access", required: true)
        input("port", "number", title: "InfluxDB server Port", description: "InfluxDB server port", defaultValue: 8181, required: true)
        input("prefIgnoreSSLIssues", "bool", title: "Ignore SSL Issues", description: "", defaultValue: true)
        input("logEnable", "bool", title: "Enable debug logging", description: "", defaultValue: false)
    }
}

/**
 *  logsOff()
 *
 *  Scheduled from updated(); switches the "logEnable" preference off.
 **/
void logsOff() {
    log.warn "debug logging disabled..."
    device.updateSetting("logEnable", [value:"false", type:"bool"])
}

/**
 *  installed()
 *
 *  Delegates to updated() so a fresh install gets the same setup as a preference save.
 **/
void installed() {
    if (logEnable) { log.debug "installed()" }
    updated()
}

/**
 *  updated()
 *
 *  Re-initializes the driver and, when debug logging is on, schedules it to be
 *  switched off again after 30 minutes.
 **/
void updated() {
    if (logEnable) { log.debug "updated()" }
    initialize()
    // turn off debug logs after 30 minutes
    if (logEnable) { runIn(1800, "logsOff") }
}

/**
 *  parse()
 *
 *  Converts one log-socket message (JSON text with id, name, msg, time, level and
 *  type keys) into a "syslog" line-protocol record and appends it to the pending queue.
 *
 *  @param description  JSON text of a single log entry
 **/
void parse(String description) {
    def descData = new JsonSlurper().parseText(description)

    // Entries whose id matches this device are ignored. Returning here also keeps the
    // debug line at the bottom of this method from re-triggering parse() on its own output.
    if (descData.id?.toString() == device.id?.toString()) {
        return
    }

    String name = escapeStringForInfluxDB(descData.name)
    String msg = '"' + escapeStringFieldValue(escapeStringHTMlforMsg(descData.msg)) + '"'
    String timestmp = '"' + descData.time + '"'
    String id = descData.id.toString()
    String type = descData.type.toString()
    String hubName = escapeStringForInfluxDB(location.hub.name)

    // Map the hub log level onto the syslog severity code and keyword.
    int sevCode
    String severity
    switch (descData.level) {
        case 'error':
            sevCode = 3
            severity = "err"
            break
        case 'warn':
            sevCode = 4
            severity = "warning"
            break
        case 'info':
            sevCode = 6
            severity = "info"
            break
        default:
            sevCode = 7
            severity = "debug"
    }

    long epochNanos = toEpochNanos(descData.time)

    // Stored as a plain String (not a GString) so list minus()/join() compare text reliably.
    String record = "syslog,appname=${name},facility=${type},host=${hubName},hostname=${hubName},severity=${severity} facility_code=1,message=${msg},procid=${id},severity_code=${sevCode},timestamp=${timestmp} ${epochNanos}".toString()

    // "+" builds a new list, so a snapshot held by an in-flight POST is never mutated.
    loggerQueueMapV3 = loggerQueueMapV3 + [record]

    if (loggerQueueMapV3.size() > settingAsInt(maxRec, 100)) {
        if (logEnable) { log.debug "parse(): loggerQueueMapV3 size is greater then ${maxRec}. Triggering Write to DB now" }
        // NOTE(review): the forced write below was already disabled in the original code,
        // so only the scheduled write drains the queue. Left disabled on purpose here;
        // confirm whether it should be re-enabled (it would fire on every parse() call
        // while a POST is still in flight).
        // writeQueuedDataToInfluxDb()
    }
}

/**
 *  writeQueuedDataToInfluxDb()
 *
 *  Posts every queued record to InfluxDB in a single asynchronous request.
 *  The response is processed by handleInfluxResponse().
 **/
void writeQueuedDataToInfluxDb() {
    // The URI starts out as "" and the header map as [:], so test for "empty", not null.
    if (!influxDBuriV3 || !influxDBAuthV3) {
        retrieveConnectionInfo()
    }

    if (!loggerQueueMapV3) {
        if (logEnable) { log.debug "writeQueuedDataToInfluxDb(): size: no records to process" }
        return
    }

    // Snapshot what is being sent so the handler removes exactly these records.
    postQueueMapV3 = new ArrayList(loggerQueueMapV3)
    String data = postQueueMapV3.join('\n')
    long timeNow = now()

    def postParams = [
        uri: influxDBuriV3,
        requestContentType: 'application/json',
        contentType: 'application/json',
        headers: influxDBAuthV3,
        ignoreSSLIssues: settings.prefIgnoreSSLIssues,
        timeout: 60,
        body: data
    ]

    // The headers carry the bearer token, so they are deliberately not logged.
    if (logEnable) { log.debug "writeQueuedDataToInfluxDb(): posting ${postQueueMapV3.size()} records to ${influxDBuriV3}" }
    asynchttpPost('handleInfluxResponse', postParams, [ postTime: timeNow ])
}

/**
 *  handleInfluxResponse()
 *
 *  Handles response from post made in writeQueuedDataToInfluxDb().
 *  On success the posted records are removed from the pending queue; on failure the
 *  queue is kept and trimmed to the newest "maxBacklog" records.
 *
 *  NB: Function name handleInfluxResponse must be kept for backward compatibility
 *
 *  @param hubResponse  response object of the asynchronous POST
 *  @param closure      data map passed to asynchttpPost (postTime); may be null
 **/
void handleInfluxResponse(hubResponse, closure) {
    // Migration: Transitioning from older versions will not have closure set
    Double elapsed = (closure) ? (now() - closure.postTime) / 1000 : 0

    if (hubResponse.status < 400) {
        int postedCount = postQueueMapV3.size()
        loggerQueueMapV3 = loggerQueueMapV3.minus(postQueueMapV3)
        postQueueMapV3 = []
        if (logEnable) {
            log.warn "Post of log data complete - elapsed time ${elapsed} seconds - Status: ${hubResponse.status}. Dropping last ${postedCount} records"
            log.warn "${loggerQueueMapV3.size()} records left to post"
        }
        return
    }

    // Failure path: neither the auth header nor the payload is logged.
    log.error "Post of of log data failed - elapsed time ${elapsed} seconds - Status: ${hubResponse.status}, Error: ${hubResponse.errorMessage}, URI: ${influxDBuriV3}, Pending records: ${loggerQueueMapV3.size()}"

    int backlogLimit = Math.max(0, settingAsInt(maxBacklog, 30))
    int over = loggerQueueMapV3.size() - backlogLimit
    if (over > 0) {
        // Discard the oldest records (front of the queue) to cap the backlog.
        loggerQueueMapV3 = loggerQueueMapV3.drop(over)
    }
}

/**
 *  connect()
 *
 *  Opens the websocket connection to the hub's local log socket.
 **/
void connect() {
    if (logEnable) { log.debug "attempting connection" }
    try {
        interfaces.webSocket.connect("http://localhost:8080/logsocket")
        pauseExecution(1000)
    } catch (Exception e) {
        log.error "initialize error: ${e.message}"
    }
}

/**
 *  disconnect()
 *
 *  Closes the websocket connection.
 **/
void disconnect() {
    interfaces.webSocket.close()
}

/**
 *  uninstalled()
 *
 *  Closes the websocket when the device is removed.
 **/
void uninstalled() {
    disconnect()
}

/**
 *  initialize()
 *
 *  Clears legacy state entries and the pending queue, rebuilds the connection
 *  details, schedules the periodic write and (re)connects to the log socket.
 **/
void initialize() {
    if (logEnable) { log.debug "initialize()" }
    state.remove("uri")
    state.remove("header")
    state.remove("postCount")
    loggerQueueMapV3 = []
    retrieveConnectionInfo()

    // The seconds field of the cron expression only accepts increments of 1..59, so an
    // unset, zero or out-of-range preference must not reach the expression unchecked.
    int interval = settingAsInt(dbWriteTime, 30)
    String writeTime = (interval >= 60) ? '0 * * ? * *' : '*/' + Math.max(1, interval) + ' * * ? * *'
    schedule(writeTime, "writeQueuedDataToInfluxDb")
    runIn(5, "connect")
}

/**
 *  retrieveConnectionInfo()
 *
 *  Builds the write URI and the Authorization header from the preferences and
 *  stores them in the shared fields influxDBuriV3 / influxDBAuthV3.
 **/
void retrieveConnectionInfo() {
    String scheme = useSSL ? "https://" : "http://"
    influxDBuriV3 = scheme + ip + ":" + port + "/api/v3/write_lp?db=" + database
    // Assign a fresh map instead of mutating the shared one in place.
    influxDBAuthV3 = [Authorization: "Bearer ${token}".toString()]
    // Only the header names are logged; the value contains the access token.
    if (logEnable) { log.debug "initialize() Header:${influxDBAuthV3.keySet()} URI: ${influxDBuriV3}" }
}

/**
 *  webSocketStatus()
 *
 *  Receives websocket status messages; schedules a reconnect when the message
 *  starts with "failure".
 *
 *  @param message  status text
 **/
void webSocketStatus(String message) {
    // handle error messages and reconnect
    if (logEnable) { log.debug "Got status ${message}" }
    if (message.startsWith("failure")) {
        // reconnect in a little bit
        runIn(5, "connect")
    }
}

/**
 *  escapeStringForInfluxDB()
 *
 *  Escape values to InfluxDB.
 *
 *  If a tag key, tag value, or field key contains a space, comma, or an equals sign = it must
 *  be escaped using the backslash character \. Backslash characters do not need to be escaped.
 *  Commas and spaces will also need to be escaped for measurements, though equals signs = do not.
 *
 *  HTML tags are removed from the value. A null/empty value, or one that is empty once
 *  tags are removed, becomes the literal text 'null' so the tag is never left blank.
 *
 *  Further info: https://docs.influxdata.com/influxdb/v0.10/write_protocols/write_syntax/
 *
 *  @param str  raw value, may be null
 *  @return     escaped value, never empty
 **/
private String escapeStringForInfluxDB(String str) {
    if (!str) {
        return 'null'
    }
    String cleaned = str.replaceAll(/<[^>]*>/, "") // Remove html tags from string
    if (!cleaned) {
        return 'null'
    }
    cleaned = cleaned.replace(' ', '\\ ')  // Escape spaces.
    cleaned = cleaned.replace(',', '\\,')  // Escape commas.
    cleaned = cleaned.replace('=', '\\=')  // Escape equal signs.
    cleaned = cleaned.replace('"', '\\"')  // Escape double quotes (kept so existing tag values stay unchanged).
    return cleaned
}

/**
 *  escapeStringHTMlforMsg()
 *
 *  Converts HTML character entities in a log message back to plain characters.
 *  A null/empty message becomes the literal text 'null'.
 *
 *  @param str  log message, may be null
 *  @return     message with entities decoded
 **/
private String escapeStringHTMlforMsg(String str) {
    if (!str) {
        return 'null'
    }
    String decoded = str
    decoded = decoded.replace('&lt;', '<')
    decoded = decoded.replace('&gt;', '>')
    decoded = decoded.replace('&quot;', '"')
    decoded = decoded.replace('&apos;', "'")
    decoded = decoded.replace('&#039;', "'")
    decoded = decoded.replace('&#39;', "'")
    // '&amp;' goes last so text such as '&amp;lt;' is decoded once, not twice.
    decoded = decoded.replace('&amp;', '&')
    return decoded
}

/**
 *  escapeStringFieldValue()
 *
 *  Escapes a value for use inside a double-quoted line-protocol string field:
 *  backslashes and double quotes are prefixed with a backslash.
 *
 *  @param str  text to embed, may be null
 *  @return     escaped text ('' for null)
 **/
private String escapeStringFieldValue(String str) {
    if (str == null) {
        return ''
    }
    return str.replace('\\', '\\\\').replace('"', '\\"')
}

/**
 *  toEpochNanos()
 *
 *  Converts a log time in "yyyy-MM-dd HH:mm:ss.SSS" format to epoch nanoseconds.
 *  Falls back to the current time when the text cannot be parsed.
 *
 *  @param logTime  time text taken from the log entry
 *  @return         epoch time in nanoseconds
 **/
private long toEpochNanos(String logTime) {
    try {
        return Date.parse("yyyy-MM-dd HH:mm:ss.SSS", logTime).time * 1000000L
    } catch (Exception e) {
        log.warn "parse(): could not read log time '${logTime}', using current time instead"
        return now() * 1000000L
    }
}

/**
 *  settingAsInt()
 *
 *  Reads a numeric preference that may arrive as a String or a Number, or be unset.
 *
 *  @param value     raw preference value
 *  @param fallback  value used when the preference is missing or not numeric
 *  @return          preference as int, or fallback
 **/
private int settingAsInt(value, int fallback) {
    if (value == null) {
        return fallback
    }
    String text = value.toString().trim()
    return text.isNumber() ? text.toBigDecimal().intValue() : fallback
}