/* groovylint-disable DuplicateNumberLiteral, DuplicateStringLiteral, ImplementationAsType, InvertedCondition, LineLength, MethodReturnTypeRequired, MethodSize, NestedBlockDepth, NglParseError, NoDef, NoJavaUtilDate, NoWildcardImports, ParameterReassignment, UnnecessaryGString, UnnecessaryObjectReferences, UnnecessaryToString, UnusedImport, VariableTypeRequired */
/*
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <https://www.gnu.org/licenses/>.
 *
 */
/* Notes
2020-08-18 - staylorx
- A couple of dumb coding errors, and still trying to sort out TCP
2020-08-18 - staylorx
- Received version from original author (great start!)
- Attempting RFC5424 format for syslog
- Date/time stamping with the hub timezone
2023-5-5 - Mavrrick
- Update code to output to InfluxDB Database with all needed connectivity options
2024-4-15 - Mavrrick
- Update code to implement a post queue
*/

import groovy.json.JsonSlurper
import groovy.transform.Field
import hubitat.device.HubAction
import hubitat.device.Protocol
import java.time.*

// Line-protocol records appended by parse() and waiting to be written.
@Field static List loggerQueueMap = []
// Snapshot of the records included in the most recent POST (set by writeQueuedDataToInfluxDb()).
@Field static List postQueueMap = []
// InfluxDB write endpoint; empty until configureInfluxEndpoint() has run.
@Field static String uri = ""
// HTTP headers sent with each write; holds the Authorization entry.
@Field static def header = [:]

metadata {
    /* groovylint-disable-next-line DuplicateStringLiteral */
    definition(name: "InfluxDB Hubitat Live Logger", namespace: "Mavrrick", author: "Mavrrick") {
        capability "Initialize"
    }
    command "disconnect"
    preferences {
        input("maxRec", "integer", title: "Max Pending Records", description: "Please specifiy the max records before forcing a DB write", defaultValue: 100, required: true)
        input("dbWriteTime", "integer", title: "How long to wait between DB Write Attempts", description: "Enter the amount of time between 0 and 60 to wait to post your data", defaultValue: 30, required: true)
        input("maxBacklog", "number", title: "Max allowed backlog before purging ", description: "The size of backlog allowed befor purging records.", defaultValue: 30, required: true)
        input("useSSL", "bool", title: "Use SSL to connect to server", description: "", defaultValue: false)
        input("ip", "text", title: "InfluxDB IP Address", description: "ip address of the InfluxDB server", required: true)
        input("org", "text", title: "InfluxDB IP Org", description: "InfluxDB Organization", required: true)
        input("bucket", "text", title: "InfluxDB IP Bucket", description: "InfluxDB Bucket", required: true)
        input("token", "text", title: "Auth Token", description: "Auth Token for InfluxDB access", required: true)
        input("port", "number", title: "InfluxDB server Port", description: "InfluxDB server port", defaultValue: 8086, required: true)
        input("hubName", "text", title: "Hub Name", description: "Host name for records in DB", required: true)
        input("prefIgnoreSSLIssues", "bool", title: "Ignore SSL Issues", description: "", defaultValue: true)
        input("logEnable", "bool", title: "Enable debug logging", description: "", defaultValue: false)
    }
}

/**
 *  logsOff()
 *
 *  Switches the "logEnable" preference off. Scheduled by updated() 30 minutes
 *  after the preferences are saved with debug logging enabled.
 **/
void logsOff() {
    log.warn "debug logging disabled..."
    device.updateSetting("logEnable", [value: "false", type: "bool"])
}

/**
 *  installed()
 *
 *  Delegates to updated() so a new device is set up the same way as a re-saved one.
 **/
void installed() {
    if (logEnable) { log.debug "installed()" }
    updated()
}

/**
 *  updated()
 *
 *  Re-initializes the driver and, when debug logging is on, schedules it to be
 *  turned off again after 30 minutes.
 **/
void updated() {
    if (logEnable) { log.debug "updated()" }
    initialize()
    // turn off debug logs after 30 minutes
    if (logEnable) { runIn(1800, "logsOff") }
}

/**
 *  parse()
 *
 *  Converts one log entry into an InfluxDB line-protocol record of the "syslog"
 *  measurement and appends it to loggerQueueMap. Entries whose id equals this
 *  device's id are skipped. When the queue holds more than "maxRec" records a
 *  write is triggered immediately.
 *
 *  @param description JSON text of the log entry; the fields read are
 *                     id, name, msg, time, level and type.
 **/
void parse(String description) {
    def descData = new JsonSlurper().parseText(description)

    if ("${descData.id}" != "${device.id}") {
        // Tag values go through the tag escaper so spaces/commas/equals signs cannot break the record.
        String name = escapeStringForInfluxDB(descData.name)
        String facility = escapeStringForInfluxDB(descData.type?.toString())
        String host = escapeStringForInfluxDB(hubName?.toString())

        // String fields are wrapped in double quotes.
        String msg = '"' + escapeStringHTMlforMsg(descData.msg) + '"'
        String timestmp = '"' + descData.time + '"'
        String id = descData.id.toString()

        // Map the log level to a syslog severity name and numeric code; anything
        // other than error/warn/info is recorded as debug.
        int sevCode = 7
        String severity = "debug"
        switch (descData.level) {
            case 'error':
                sevCode = 3
                severity = "err"
                break
            case 'warn':
                sevCode = 4
                severity = "warning"
                break
            case 'info':
                sevCode = 6
                severity = "info"
                break
        }

        // The entry's own time stamp (milliseconds) is scaled to nanoseconds for the record time.
        Date logDate = Date.parse("yyyy-MM-dd HH:mm:ss.SSS", descData.time)
        long epochNanos = logDate.getTime() * 1000000L

        String record = "syslog,appname=${name},facility=${facility},host=${host},hostname=${host},severity=${severity} facility_code=1,message=${msg},procid=${id},severity_code=${sevCode},timestamp=${timestmp} ${epochNanos}"
        loggerQueueMap += [record]
    }

    // Fall back to the preference's declared default when the setting has no value yet.
    int maxPending = (maxRec != null) ? maxRec.toInteger() : 100
    if (loggerQueueMap.size() > maxPending) {
        if (logEnable) { log.debug "parse(): loggerQueueMap size is greater then ${maxRec}. Triggering Write to DB now" }
        writeQueuedDataToInfluxDb()
    }
}

/**
 *  writeQueuedDataToInfluxDb()
 *
 *  Posts every queued record to InfluxDB in a single asynchronous request.
 *  The records are joined with newlines into the request body, and
 *  handleInfluxResponse() is registered as the callback. Nothing is sent when
 *  the queue is empty.
 **/
void writeQueuedDataToInfluxDb() {
    if (loggerQueueMap.size() == 0) {
        if (logEnable) { log.debug "writeQueuedDataToInfluxDb(): size: no records to process" }
        return
    }

    // uri starts out empty and is only filled in by configureInfluxEndpoint();
    // build it here if initialize() has not done so yet.
    if (!uri) { configureInfluxEndpoint() }

    // Remember exactly which records go out so the callback can remove them on success.
    postQueueMap = new ArrayList(loggerQueueMap)
    String data = postQueueMap.join('\n')
    long timeNow = new Date().time

    def postParams = [
        uri: uri,
        requestContentType: 'application/json',
        contentType: 'application/json',
        headers: header,
        ignoreSSLIssues: settings.prefIgnoreSSLIssues,
        timeout: 60,
        body: data
    ]
    asynchttpPost('handleInfluxResponse', postParams, [ postTime: timeNow ])
}

/**
 *  handleInfluxResponse()
 *
 *  Handles response from post made in writeQueuedDataToInfluxDb().
 *  On a status below 400 the posted records are removed from the queue.
 *  Otherwise the records stay queued for the next attempt and, if the queue
 *  has grown past "maxBacklog", the oldest records are discarded.
 *
 *  NB: Function name handleInfluxResponse must be kept for backward compatibility
 *
 *  @param hubResponse response object of the asynchronous post
 *  @param closure     data map passed to asynchttpPost (carries postTime); may be unset
 **/
void handleInfluxResponse(hubResponse, closure) {
    // Migration: Transitioning from older versions will not have closure set
    Double elapsed = (closure) ? (now() - closure.postTime) / 1000 : 0

    if (hubResponse.status < 400) {
        int postedCount = postQueueMap.size()
        if (logEnable) { log.warn "Post of log data complete - elapsed time ${elapsed} seconds - Status: ${hubResponse.status}. Dropping last ${postedCount} records" }
        // Records queued while the post was in flight are not in postQueueMap and therefore survive.
        loggerQueueMap = loggerQueueMap.minus(postQueueMap)
        postQueueMap = []
        if (logEnable) { log.warn "${loggerQueueMap.size()} records left to post" }
    } else {
        if (logEnable) { log.warn "Post of of log data failed - elapsed time ${elapsed} seconds - Status: ${hubResponse.status}, Error: ${hubResponse.errorMessage}, Headers: ${hubResponse.headers}, Data: ${closure}" }
        // Fall back to the preference's declared default when the setting has no value yet.
        int backlogLimit = (maxBacklog != null) ? maxBacklog.toInteger() : 30
        if (loggerQueueMap.size() > backlogLimit) {
            // Discard from the front of the list, i.e. the records queued first.
            loggerQueueMap = loggerQueueMap.drop(loggerQueueMap.size() - backlogLimit)
        }
    }
}

/**
 *  connect()
 *
 *  Opens the web socket to the local log socket endpoint. Errors are logged
 *  rather than rethrown.
 **/
void connect() {
    if (logEnable) { log.debug "attempting connection" }
    try {
        interfaces.webSocket.connect("http://localhost:8080/logsocket")
        pauseExecution(1000)
    } catch (e) {
        log.error "initialize error: ${e.message}"
    }
}

/**
 *  disconnect()
 *
 *  Closes the web socket. Also exposed as the "disconnect" command.
 **/
void disconnect() {
    interfaces.webSocket.close()
}

/**
 *  uninstalled()
 *
 *  Closes the web socket when the device is removed.
 **/
void uninstalled() {
    disconnect()
}

/**
 *  initialize()
 *
 *  Removes legacy state entries, rebuilds the InfluxDB endpoint and header,
 *  empties the pending queue, schedules the periodic write and connects to the
 *  log socket five seconds later.
 **/
void initialize() {
    if (logEnable) { log.debug "initialize()" }
    state.remove("uri")
    state.remove("header")
    state.remove("postCount")

    configureInfluxEndpoint()
    loggerQueueMap = []
    // The header carries the auth token, so it is not written to the log.
    if (logEnable) { log.debug "initialize() Header:[Authorization:Token <redacted>] URI: ${uri}" }

    // Run the write every "dbWriteTime" seconds. Use the declared default when the
    // setting has no value yet, and never emit a step below 1 in the cron expression.
    int writeInterval = (dbWriteTime != null) ? dbWriteTime.toInteger() : 30
    if (writeInterval < 1) { writeInterval = 1 }
    String writeTime = '*/' + writeInterval + ' * * ? * *'
    schedule(writeTime, "writeQueuedDataToInfluxDb")

    runIn(5, "connect")
}

/**
 *  configureInfluxEndpoint()
 *
 *  Builds the write URI (scheme, host, port, org and bucket taken from the
 *  preferences) and the Authorization header. Org and bucket are URL-encoded so
 *  that values containing spaces or reserved characters still form a valid URI.
 **/
private void configureInfluxEndpoint() {
    String scheme = useSSL ? "https" : "http"
    String encodedOrg = URLEncoder.encode(settings.org?.toString() ?: "", "UTF-8")
    String encodedBucket = URLEncoder.encode(settings.bucket?.toString() ?: "", "UTF-8")
    uri = scheme + "://" + ip + ":" + port + "/api/v2/write?org=" + encodedOrg + "&bucket=" + encodedBucket
    header = [Authorization: "Token ${token}".toString()]
}

/**
 *  webSocketStatus()
 *
 *  Receives web socket status messages; a message starting with "failure"
 *  schedules a reconnect in five seconds.
 *
 *  @param message status text
 **/
void webSocketStatus(String message) {
    // handle error messages and reconnect
    if (logEnable) { log.debug "Got status ${message}" }
    if (message.startsWith("failure")) {
        // reconnect in a little bit
        runIn(5, "connect")
    }
}

/**
 *  escapeStringForInfluxDB()
 *
 *  Escape values to InfluxDB.
*
 *  If a tag key, tag value, or field key contains a space, comma, or an equals sign = it must
 *  be escaped using the backslash character \. Backslash characters do not need to be escaped.
 *  Commas and spaces will also need to be escaped for measurements, though equals signs = do not.
 *
 *  Further info: https://docs.influxdata.com/influxdb/v0.10/write_protocols/write_syntax/
 *
 *  @param str value to escape; may be null or empty
 *  @return the escaped value, or the literal text 'null' when str is null or empty
 **/
private String escapeStringForInfluxDB(String str) {
    if (!str) {
        return 'null'
    }
    String escaped = str
    escaped = escaped.replace(" ", "\\ ")    // Escape spaces.
    escaped = escaped.replace(",", "\\,")    // Escape commas.
    escaped = escaped.replace("=", "\\=")    // Escape equal signs.
    escaped = escaped.replace("\"", "\\\"")  // Escape double quotes.
    return escaped
}

/**
 *  escapeStringHTMlforMsg()
 *
 *  Prepares a log message for use as a double-quoted line-protocol string field.
 *  The caller adds the surrounding quotes.
 *
 *  1. HTML entities are decoded back to plain characters:
 *     &lt; &gt; &quot; &apos; &#39; &#039; &#x27; and, last of all, &amp;.
 *  2. Backslashes and double quotes are then backslash-escaped so the text
 *     cannot end the quoted field early.
 *
 *  @param str message text; may be null or empty
 *  @return the prepared text, or the literal text 'null' when str is null or empty
 **/
private String escapeStringHTMlforMsg(String str) {
    if (!str) {
        return 'null'
    }
    String text = str
    text = text.replace("&lt;", "<")
    text = text.replace("&gt;", ">")
    text = text.replace("&quot;", "\"")
    text = text.replace("&apos;", "'")
    text = text.replace("&#39;", "'")
    text = text.replace("&#039;", "'")
    text = text.replace("&#x27;", "'")
    // Decode the ampersand last so that "&amp;lt;" ends up as "&lt;" and not "<".
    text = text.replace("&amp;", "&")

    // Backslash first, otherwise the backslashes added for the quotes would be doubled.
    text = text.replace("\\", "\\\\")
    text = text.replace("\"", "\\\"")
    return text
}