/*

    Copyright 2021 - tomw

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.

    -------------------------------------------

    Change history:

    1.2.1 - tomw - Misc. improvements: more responsive Away Mode status; ignoring stale notifications and alerts
    1.2.0 - cgmckeever - Added current flow time to allow custom reporting/rules/triggering
    1.1.2 - robertcraft - Change usage attributes to Number
    1.1.1 - tomw - Better handling for notifications when old notifications were deleted by the Flume app.
    1.1.0 - tomw - Added PresenceSensor to represent away_mode status.
    1.0.2 - tomw - User feedback: improved notification handling.
    1.0.1 - tomw - Improved next Refresh scheduling for better reliability.
    1.0.0 - tomw - Initial release.

*/

metadata {
    definition(name: "Flume Device", namespace: "tomw", author: "tomw", importUrl: "") {
        capability "Configuration"
        capability "Initialize"
        capability "PresenceSensor"
        capability "Refresh"
        capability "WaterSensor"

        command "setAwayMode"
        command "clearAwayMode"

        command "clearWetStatus"
        command "testWetStatus"

        attribute "commStatus", "string"

        attribute "usageLastMinute", "number"
        attribute "usageLastHour", "number"
        attribute "usageLastDay", "number"
        attribute "usageLastWeek", "number"
        attribute "usageLastMonth", "number"
        attribute "flowDurationMin", "number"
        attribute "flowStatus", "string"

        attribute "notificationStream", "string"
    }
}

preferences {
    section {
        input "clientID", "text", title: "client_id", required: true
        input "clientSecret", "text", title: "client_secret", required: true
        input "username", "text", title: "username", required: true
        input "password", "text", title: "password", required: true
    }
    section {
        input name: "slidingWindow", type: "bool", title: "Use sliding time windows for usage? (turn off to mirror behavior in Flume app)", defaultValue: false
        input "refreshInterval", "number", title: "Refresh interval (minutes)", defaultValue: 2, required: true
    }
    section {
        input "flowLookbackTime", "number", title: "Period of time to analyze when determining flow state. (Minutes)", defaultValue: 3, required: true
    }
    section {
        input name: "logEnable", type: "bool", title: "Enable debug logging", defaultValue: false
    }
}

// Writes msg to the debug log, but only when the "logEnable" preference is on.
def logDebug(msg) {
    if (logEnable) {
        log.debug(msg)
    }
}

// Extracts the first "groovy:<line>" number found in the exception's stack trace.
// Returns "unknown" when the trace contains no such frame; this runs inside catch
// blocks, so it must never throw itself.
def errorLine(error) {
    String trace = error?.getStackTrace()?.toString() ?: ""
    def matcher = (trace =~ /groovy.(\d+)./)
    return matcher.find() ? matcher.group(1) : "unknown"
}

// Logs an exception together with the script line it originated from.
private logFailure(Exception e) {
    log.error("error: ${e.message} @ Line: ${errorLine(e)}")
}

// Refresh interval in minutes; falls back to the input's declared default (2)
// when the preference has never been saved.
private refreshMinutes() {
    return refreshInterval ?: 2
}

// Flow look-back period in minutes; falls back to the input's declared default (3)
// when the preference is missing (pre-1.2.0 installs).
private lookbackMinutes() {
    return flowLookbackTime ?: 3
}

// Re-runs the full configuration.
def updated() {
    configure()
}

// Clears all stored state, logs in, caches devices and locations, then starts polling.
// commStatus is set to "unknown" first and ends as "good" or "error".
def configure() {
    sendEvent(name: "commStatus", value: "unknown")

    try {
        state.clear()

        getTokens()
        fetchDevices()
        fetchLocations()

        setAlertCount(0)

        // tokens were just fetched above, so skip fetching them again
        initialize(false)

        sendEvent(name: "commStatus", value: "good")
    } catch (Exception e) {
        logFailure(e)
        sendEvent(name: "commStatus", value: "error")
    }
}

// Fetches fresh tokens, then starts polling.
def initialize() {
    initialize(true)
}

// Resets the water attribute to "dry" and starts the refresh cycle.
//   doGetTokens - when true, request new auth tokens first
def initialize(doGetTokens) {
    if (doGetTokens) {
        try {
            getTokens()
        } catch (Exception e) {
            // Do not abort here: refresh() below is what (re)arms the polling schedule,
            // and it reports its own failures through commStatus.
            logFailure(e)
            sendEvent(name: "commStatus", value: "error")
        }
    }

    clearWetStatus()
    refresh()
}

// Polls usage, alerts, notifications and locations, updates commStatus,
// and always schedules the next poll.
def refresh() {
    try {
        unschedule("refresh")

        processQueryResponse(queryDevice())
        fetchUsageAlerts()
        fetchNotifications()
        fetchLocations()

        sendEvent(name: "commStatus", value: "good")
    } catch (Exception e) {
        logFailure(e)
        sendEvent(name: "commStatus", value: "error")
    } finally {
        // always schedule next refresh, even if the error handling above fails
        runIn((60 * refreshMinutes()).toFloat().toInteger(), "refresh")
    }
}

// Requests tokens with the password grant and stores them.
def getTokens() {
    def resp = httpExec("POST", genParamsTokens("get"), true)?.data
    logDebug("getTokens resp = ${resp}")

    updateAuthTokensStore(resp)
}

// Requests tokens with the stored refresh token and stores them.
def refreshTokens() {
    def resp = httpExec("POST", genParamsTokens("refresh"), true)?.data
    logDebug("refreshTokens resp = ${resp}")

    updateAuthTokensStore(resp)
}

// Fetches the user's device list and caches it in state.
def fetchDevices() {
    def queryString = "?user=false&location=false"
    def resp = httpExec("GET", genParamsMain("users/${getUserId()}/devices/${queryString}"), true)?.data
    logDebug("fetchDevices resp = ${resp}")

    if (resp) {
        setDevices(resp.data)
    }
}

// Fetches the user's locations, caches them in state, and maps the first
// location's away_mode onto the presence attribute.
def fetchLocations() {
    def queryString = "?limit=50&offset=0&sort_field=id&sort_direction=ASC&list_shared=false"
    def resp = httpExec("GET", genParamsMain("users/${getUserId()}/locations/${queryString}"), true)?.data
    logDebug("fetchLocations resp = ${resp}")

    if (resp) {
        setLocations(resp.data)
        sendEvent(name: "presence", value: resp.data?.getAt(0)?.away_mode ? "not present" : "present")
    }
}

// Fetches notifications and publishes the ones not seen before on notificationStream.
// When no count is stored yet, everything returned is treated as stale:
// it is counted but not published.
def fetchNotifications() {
    def queryString = "?limit=1000&sort_direction=ASC&flume_leak=true"
    def resp = httpExec("GET", genParamsMain("users/${getUserId()}/notifications/${queryString}"), true)?.data
    logDebug("fetchNotifications resp = ${resp}")

    if (!resp?.data) {
        return
    }

    def knownCount = (getNotifCount() ?: 0) as int
    def processingStale = (knownCount == 0)

    def unread
    if (resp.data.size() >= knownCount) {
        // this is a typical state;
        // there are at least as many notifications as we think we already have read.
        // skip the known ones at the beginning, then process the rest
        unread = resp.data.drop(knownCount)
    } else {
        // fewer notifications are present than we think we already have read,
        // so they must have been cleared in the app.
        // reset count and process them all to catch up
        knownCount = 0
        unread = resp.data
    }

    for (notification in unread) {
        knownCount++

        if (!processingStale) {
            sendEvent(name: "notificationStream", value: "[${notification.created_datetime}]: ${notification.message}", isStateChange: true)
        }

        // short pause to try to keep them sequential in the event log
        pauseExecution(10)
    }

    setNotifCount(knownCount)
}

def setNotifCount(count) {
    state.notifCount = count
}

def getNotifCount() {
    return state.notifCount
}

// Fetches usage alerts beyond the ones already counted and sets the water attribute
// to "wet" for any new alert flagged flume_leak. When no count is stored yet,
// the returned alerts are treated as stale: counted but not acted on.
def fetchUsageAlerts() {
    def knownCount = getAlertCount() ?: 0
    def processingStale = (knownCount == 0)

    def queryString = "?limit=100&offset=${knownCount}&sort_direction=ASC&flume_leak=true"
    def resp = httpExec("GET", genParamsMain("users/${getUserId()}/usage-alerts/${queryString}"), true)?.data
    logDebug("fetchUsageAlerts resp = ${resp}")

    if (!resp?.data) {
        return
    }

    for (alert in resp.data) {
        knownCount++

        // check for Flume Smart Leak alerts
        if (!processingStale && (alert.flume_leak == true)) {
            setWetStatus()
        }
    }

    setAlertCount(knownCount)
}

// Records the current time (epoch milliseconds) as the last successful poll.
def setLastPollTS() {
    state.lastPollTS = new Date().getTime()
}

// Last successful poll time in epoch milliseconds, or 0 when none is stored.
def getLastPollTS() {
    return state.lastPollTS == null ? 0 : state.lastPollTS.toLong()
}

// Marks the last poll time as the moment flow was first detected.
def setFlowDetectedTS() {
    state.flowDetectedTS = getLastPollTS()
}

// Time flow was first detected in epoch milliseconds, or 0 when none is stored.
def getFlowDetectedTS() {
    return state.flowDetectedTS == null ? 0 : state.flowDetectedTS.toLong()
}

def setAlertCount(count) {
    state.alertCount = count
}

def getAlertCount() {
    return state.alertCount
}

// Builds and posts one batched usage query (minute, hour, day, week, month, plus the
// two flow-detection windows) for the first cached device.
// Returns the response data, or null; on a response the last-poll timestamp is updated.
def queryDevice() {
    def now = new Date()
    def stamp = 'yyyy-MM-dd HH:mm:00'

    // long arithmetic throughout: 30 days in milliseconds does not fit in an int
    def minuteMs = 60L * 1000
    def hourMs = 60 * minuteMs
    def dayMs = 24 * hourMs

    def lookback = lookbackMinutes()
    def interval = refreshMinutes()

    def nowMin = now.format(stamp)

    // `MIN` queries don't contain results for the current minute; request the previous full minute
    def minAgo = new Date(now.getTime() - minuteMs).format(stamp)

    def flowLookback = new Date(now.getTime() - ((lookback * minuteMs) as long)).format(stamp)
    def flowRefresh = new Date(now.getTime() - ((interval * minuteMs) as long)).format(stamp)

    def qFlowLookback = [request_id: "flowLookback", since_datetime: flowLookback, until_datetime: nowMin, bucket: "MIN", group_multiplier: lookback]
    def qFlowRefresh = [request_id: "flowRefresh", since_datetime: flowRefresh, until_datetime: nowMin, bucket: "MIN", group_multiplier: interval]

    def qMin = [request_id: "lastMin", since_datetime: minAgo, until_datetime: nowMin, bucket: "MIN"]
    def qHour, qDay, qWeek, qMonth

    if (slidingWindow) {
        // windows reach back a fixed duration from now
        def hourAgo = new Date(now.getTime() - hourMs).format(stamp)
        def yesterday = new Date(now.getTime() - dayMs).format(stamp)
        def lastWeek = new Date(now.getTime() - (7 * dayMs)).format(stamp)
        def lastMonth = new Date(now.getTime() - (30 * dayMs)).format(stamp)

        qHour = [request_id: "lastHour", since_datetime: hourAgo, until_datetime: nowMin, bucket: "MIN", group_multiplier: 60]
        qDay = [request_id: "lastDay", since_datetime: yesterday, until_datetime: nowMin, bucket: "MIN", group_multiplier: 60 * 24]
        qWeek = [request_id: "lastWeek", since_datetime: lastWeek, until_datetime: nowMin, bucket: "MIN", group_multiplier: 60 * 24 * 7]
        qMonth = [request_id: "lastMonth", since_datetime: lastMonth, until_datetime: nowMin, bucket: "MIN", group_multiplier: 60 * 24 * 30]
    } else {
        // windows start at the top of the current hour / day / week / month
        def hourAgo = now.format('yyyy-MM-dd HH:00:00')
        def yesterday = now.format('yyyy-MM-dd 00:00:00')
        def lastMonth = now.format('yyyy-MM-01 00:00:00')

        // walk back one day at a time (today first) to the most recent Monday;
        // wkMult ends up as the number of days from that Monday through today
        def lastWeek
        def wkMult = 7
        def daysBack = 0
        def found = false
        def weekAgo = new Date(now.getTime() - (7 * dayMs))
        now.downto(weekAgo) { day ->
            ++daysBack
            if (!found && day.format('E') == "Mon") {
                lastWeek = new Date(day.getTime()).format('yyyy-MM-dd 00:00:00')
                wkMult = daysBack
                found = true
            }
        }

        qHour = [request_id: "lastHour", since_datetime: hourAgo, until_datetime: nowMin, bucket: "HR"]
        qDay = [request_id: "lastDay", since_datetime: yesterday, until_datetime: nowMin, bucket: "DAY"]
        qWeek = [request_id: "lastWeek", since_datetime: lastWeek, until_datetime: nowMin, bucket: "DAY", group_multiplier: wkMult]
        qMonth = [request_id: "lastMonth", since_datetime: lastMonth, until_datetime: nowMin, bucket: "MON"]
    }

    def query = groovy.json.JsonOutput.toJson([queries: [qMin, qHour, qDay, qWeek, qMonth, qFlowLookback, qFlowRefresh]])

    def resp = httpExec("POST", genParamsMain("users/${getUserId()}/devices/${getDevices()?.getAt(0)?.id}/query", query), true)?.data

    if (resp) {
        setLastPollTS()
    }

    return resp
}

// Returns the `value` of the last bucket reported for requestId in a query response,
// or null when that series is missing or empty.
private latestBucketValue(resp, String requestId) {
    def results = resp?.data?.getAt(0)
    def series = results ? results[requestId] : null
    return series ? series.last()?.value : null
}

// Latest value for requestId rounded to two decimals, or "n/a" when unavailable.
private usageOrNa(resp, String requestId) {
    def value = latestBucketValue(resp, requestId)
    return (value != null) ? value.toFloat().round(2) : "n/a"
}

// True when the latest value for requestId is greater than zero.
private hasFlow(resp, String requestId) {
    def value = latestBucketValue(resp, requestId)
    return (value != null) && (value.toFloat() > 0)
}

// Minutes between two epoch-millisecond timestamps, to two decimal places.
private elapsedMinutes(startTS, endTS) {
    return Math.round((endTS - startTS) / 600) / 100
}

// Publishes the usage attributes from a query response and maintains the
// flowStatus ("running" / "monitoring" / "stopped") and flowDurationMin attributes.
def processQueryResponse(resp) {
    if (!resp) {
        return
    }

    logDebug("${resp.data}")

    sendEvent(name: "usageLastMinute", value: usageOrNa(resp, "lastMin"))
    sendEvent(name: "usageLastHour", value: usageOrNa(resp, "lastHour"))
    sendEvent(name: "usageLastDay", value: usageOrNa(resp, "lastDay"))
    sendEvent(name: "usageLastWeek", value: usageOrNa(resp, "lastWeek"))
    sendEvent(name: "usageLastMonth", value: usageOrNa(resp, "lastMonth"))

    def lookbackFlow = hasFlow(resp, "flowLookback")
    def refreshFlow = hasFlow(resp, "flowRefresh")

    def currentFlowTS = getLastPollTS()
    def flowDetectedTS = getFlowDetectedTS()
    def flowStatus = device.currentValue("flowStatus")

    if (refreshFlow || lookbackFlow) {
        // [cgmckeever] future us, this `flowDetectedTS == 0` is an edge case
        // where water is flowing and preferences are saved.
        // `flowDetectedTS` state variable gets purged,
        // making runTime very large, and usually triggers an alert
        if (flowStatus == "stopped" || flowDetectedTS == 0) {
            setFlowDetectedTS()
            flowDetectedTS = getFlowDetectedTS()
            logDebug(">>>>>>>>>>>>> startFlowWatch: ${flowDetectedTS}")
        }

        // usage within the refresh window means water is flowing now; usage only in
        // the longer look-back window keeps the flow under observation
        def status = refreshFlow ? "running" : "monitoring"

        if (flowStatus != status) {
            sendEvent(name: "flowStatus", value: status)
        }

        sendEvent(name: "flowDurationMin", value: elapsedMinutes(flowDetectedTS, currentFlowTS))
    } else if (flowStatus != "stopped") {
        if (flowStatus != null) {
            def runTime = elapsedMinutes(flowDetectedTS, currentFlowTS)
            def finalRunTime = runTime - (lookbackMinutes() - refreshMinutes())

            logDebug(">>>>>>>>>>>>> startFlowWatch: ${flowDetectedTS}")
            logDebug(" endFlowWatch: ${currentFlowTS}")
            logDebug(" runTime: ${finalRunTime}")
        }

        sendEvent(name: "flowDurationMin", value: 0)
        sendEvent(name: "flowStatus", value: "stopped")
    }
}

// Sets away_mode on the first cached location, then re-reads locations so the
// presence attribute updates right away.
def setAwayMode(away) {
    def body = groovy.json.JsonOutput.toJson([away_mode: away])
    httpExec("PATCH", genParamsMain("users/${getUserId()}/locations/${getLocations()?.getAt(0)?.id}", body), true)

    pauseExecution(1000)

    // do a partial refresh to get updated home/away status,
    // so we don't have to wait for the refreshInterval to expire
    fetchLocations()
}

def setAwayMode() {
    setAwayMode(true)
}

def clearAwayMode() {
    setAwayMode(false)
}

def setWetStatus() {
    sendEvent(name: "water", value: "wet")
}

def clearWetStatus() {
    sendEvent(name: "water", value: "dry")
}

def testWetStatus() {
    setWetStatus()
}

// Builds the request parameters for the token endpoint.
//   op - "get" (password grant) or "refresh" (refresh_token grant)
// Returns null for any other op.
def genParamsTokens(op) {
    def body

    switch (op) {
        case "get":
            body = groovy.json.JsonOutput.toJson([
                grant_type: "password",
                client_id: clientID,
                client_secret: clientSecret,
                username: username,
                password: password
            ])
            break
        case "refresh":
            body = groovy.json.JsonOutput.toJson([
                grant_type: "refresh_token",
                refresh_token: getRefreshToken(),
                client_id: clientID,
                client_secret: clientSecret
            ])
            break
        default:
            logDebug("unsupported op")
            return
    }

    return [
        uri: getBaseURI() + getTokensSuffix(),
        body: body,
        contentType: "application/json",
        requestContentType: "application/json"
    ]
}

// Builds the parameters for an authenticated JSON request.
//   suffix - path appended to the base URI
//   body   - optional request body
def genParamsMain(suffix, body = null) {
    def params = [
        uri: getBaseURI() + suffix,
        headers: [
            'Authorization': "Bearer " + getAccessToken()
        ],
        contentType: "application/json",
        requestContentType: "application/json"
    ]

    if (body) {
        params['body'] = body
    }

    return params
}

def getBaseURI() {
    return "https://api.flumewater.com/"
}

def getTokensSuffix() {
    return "oauth/token"
}

// Decodes the payload (second dot-separated segment) of a JWT into a parsed JSON object.
// Throws when the token has no payload segment.
private decodeJwtPayload(jwt) {
    def payload = jwt?.tokenize(".")?.getAt(1)
    if (!payload) {
        throw new Exception("access_token is not a well-formed JWT")
    }

    // JWT segments use the URL-safe base64 alphabet; decodeBase64() only accepts the
    // standard one, so translate the two differing characters first
    def standard = payload.replace('-', '+').replace('_', '/')
    return new groovy.json.JsonSlurper().parseText(new String(standard.decodeBase64(), "UTF-8"))
}

// Stores the tokens from a token response, derives the user id from the access
// token's payload, and schedules the next token refresh.
// Throws when the response carries no access_token; the stored tokens are then untouched.
def updateAuthTokensStore(resp) {
    if (!resp) {
        return
    }

    def grant = resp.data?.getAt(0)
    def accessToken = grant?.access_token
    if (!accessToken) {
        throw new Exception("token response did not contain an access_token")
    }

    def userId = decodeJwtPayload(accessToken)?.user_id

    setRefreshToken(grant.refresh_token)
    setAccessToken(accessToken)
    setUserId(userId)

    def interval = grant.expires_in?.toInteger()
    if (interval != null && interval > 0) {
        // schedule an hour before expiration; never sooner than a minute from now,
        // so a token lifetime under an hour cannot produce a negative delay
        runIn(Math.max(interval - (60 * 60), 60), "refreshTokens")
    }
}

def setAccessToken(token) {
    state.accessToken = token
}

def getAccessToken() {
    return state.accessToken
}

def setRefreshToken(token) {
    state.refreshToken = token
}

def getRefreshToken() {
    return state.refreshToken
}

def setUserId(id) {
    state.userID = id
}

def getUserId() {
    return state.userID
}

def setDevices(devices) {
    state.devices = devices
}

def getDevices() {
    return state.devices
}

def setLocations(locations) {
    state.locations = locations
}

def getLocations() {
    return state.locations
}

// Debug-logs a failed HTTP call. The response status is only read when the exception
// is an HTTP response exception; other failures do not have a response.
private logHttpFailure(String caller, Exception e) {
    logDebug("${caller}() failed: ${e.message}")
    if (e instanceof groovyx.net.http.HttpResponseException) {
        logDebug("status = ${e.getResponse()?.getStatus()}")
    }
}

// Performs an HTTP GET and returns the response object.
// On failure: rethrows when throwToCaller is true, otherwise returns null.
def httpGetExec(params, throwToCaller = false) {
    logDebug("httpGetExec(${params})")

    try {
        def result
        httpGet(params) { resp ->
            if (resp) {
                result = resp
            }
        }
        return result
    } catch (Exception e) {
        logHttpFailure("httpGetExec", e)
        if (throwToCaller) {
            throw e
        }
    }
}

// Performs an HTTP POST and returns the response object.
// On failure: rethrows when throwToCaller is true, otherwise returns null.
def httpPostExec(params, throwToCaller = false) {
    logDebug("httpPostExec(${params})")

    try {
        def result
        httpPost(params) { resp ->
            if (resp) {
                result = resp
            }
        }
        return result
    } catch (Exception e) {
        logHttpFailure("httpPostExec", e)
        if (throwToCaller) {
            throw e
        }
    }
}

// Performs an HTTP PATCH and returns the response object.
// On failure: rethrows when throwToCaller is true, otherwise returns null.
def httpPatchExec(params, throwToCaller = false) {
    logDebug("httpPatchExec(${params})")

    try {
        def result
        httpPatch(params) { resp ->
            if (resp) {
                result = resp
            }
        }
        return result
    } catch (Exception e) {
        logHttpFailure("httpPatchExec", e)
        if (throwToCaller) {
            throw e
        }
    }
}

// Dispatches to the GET / POST / PATCH helper and logs the remaining rate limit.
//   operation     - "GET", "POST" or "PATCH"
//   params        - request parameter map
//   throwToCaller - when true, failures (including an unsupported operation) are thrown
// Returns the response object, or null when the call failed without throwing.
def httpExec(operation, params, throwToCaller = false) {
    def res

    switch (operation) {
        case "PATCH":
            res = httpPatchExec(params, throwToCaller)
            break
        case "POST":
            res = httpPostExec(params, throwToCaller)
            break
        case "GET":
            res = httpGetExec(params, throwToCaller)
            break
        default:
            logDebug("unsupported Http operation")
            if (throwToCaller) {
                throw new Exception("unsupported Http operation")
            }
            break
    }

    // res is null when the call failed and the failure was not rethrown
    if (res?.headers != null) {
        logDebug("X-RateLimit-Remaining is ${res.headers['X-RateLimit-Remaining']?.value}")
    }

    return res
}