{==============================================================================}
{ WaterlyConnect - POST tag data to the WaterlyConnect API                      }
{ Reads tag paths and schedules from a CSV file, sends the tags that are due    }
{ in batches over HTTP, and logs to the VTScada log and/or a log file.          }
{==============================================================================}
WaterlyConnectPost(
  TriggerValue;   { Input: watched trigger value; a run starts when it changes and equals 1 }
  ScriptObj;      { Output: script tag reference (not referenced inside this module) }
)
[
  { ----- Configuration (EDIT THESE VALUES) ----- }
  WaterlyURL      = "https://app.waterlyapp.com/connect/submit";
  WaterlyToken    = "";   { Sent as the x-waterly-connect-token header }
  WaterlyDeviceID = "";   { Sent as device.id in the JSON body }
  WaterlyTagsCsv  = "C:\\VTScada\\MyApp\\WaterlyConnect\\Config\\WaterlyTags.csv";
  LogPath         = "C:\\VTScada\\MyApp\\WaterlyConnect\\Logs\\WaterlyConnect.log";
  BatchSize       = 100;  { Tags per HTTP request }
  EnableLogging   = TRUE; { Log through LogMessage }
  EnableFileLog   = TRUE; { Log to the file named by LogPath }

  { ----- Internal variables ----- }
  Response;
  Headers;
  JsonBody;
  TagData;
  CurrentTag;
  TagValue;
  TagTimestamp;
  NowTimestamp;
  TotalTags;
  TotalBatches;
  BatchNum;
  BatchStart;
  BatchEnd;
  i;
  FirstInBatch;
  SuccessCount;
  FailCount;
  PostResult;
  TagPaths;
  ScheduleCache;       { Per-run cache: schedule key -> due flag }
  LastRunBySchedule;   { Persists across runs: schedule key -> last window that fired }
  NowDay;
  NowSecondsOfDay;
  NowHour;
  NowMinute;
  NowSecond;
  BatchSent;           { Number of tags actually placed in the current batch body }
  SkippedCount;        { Tags left out of the run because value or timestamp was Invalid }

  { Writes Message, prefixed with "WaterlyConnect: ", to each enabled log sink. }
  LogInfo(Message)
  [
    If EnableLogging; [
      LogMessage(Concat("WaterlyConnect: ", Message));
    ]
    If EnableFileLog; [
      AppendLog(Concat("WaterlyConnect: ", Message));
    ]
  ]

  { Appends one timestamped line to the file at LogPath.
    The whole file is read and rewritten on every call, so the cost grows with
    the size of the log.
    NOTE(review): assumes CurrentTime(6) is a day count in which 25569 is
    1970-01-01, making the stamp Unix seconds - TODO confirm. }
  AppendLog(Message)
  [
    Existing;
    Line;
    NewContent;
    LogStamp;

    { A local stamp is used here. The original assigned the module-level
      NowTimestamp, which silently changed the payload timestamp built in Main
      every time something was logged. }
    LogStamp = (CurrentTime(6) - 25569) * 86400;
    Line = Concat(Text(LogStamp, "#"), " ", Message);

    Existing = ReadFile(LogPath);
    If Valid(Existing); [
      NewContent = Concat(Existing, "\r\n", Line);
    ] Else [
      NewContent = Line;
    ]
    WriteFile(LogPath, NewContent);
  ]

  { Returns Value with every occurrence of Find replaced by Replacement.
    Built from Split and Concat only.
    NOTE(review): assumes Split keeps empty fields between adjacent
    delimiters - TODO confirm. }
  ReplaceAll(Value; Find; Replacement)
  [
    Parts;
    Result;
    idx;

    Parts = Split(Value, Find);
    Result = "";
    idx = 0;
    Loop idx < ArraySize(Parts); [
      If idx > 0; [
        Result = Concat(Result, Replacement);
      ]
      Result = Concat(Result, Parts[idx]);
      idx = idx + 1;
    ]
    Return(Result);
  ]

  { Returns Value with all carriage returns removed. }
  TrimCr(Value)
  [
    Return(ReplaceAll(Value, "\r", ""));
  ]

  { Returns Value with all spaces and carriage returns removed. }
  CleanToken(Value)
  [
    Return(ReplaceAll(ReplaceAll(Value, " ", ""), "\r", ""));
  ]

  { Escapes backslashes and double quotes so Value can be placed inside a JSON
    string literal. Backslashes are handled first so the escapes added for
    quotes are not doubled. Control characters are not handled. }
  JsonEscape(Value)
  [
    Return(ReplaceAll(ReplaceAll(Value, "\\", "\\\\"), "\"", "\\\""));
  ]

  { Captures the current day, hour, minute and second into the module-level
    Now* variables used by ScheduleIsDue. }
  InitNow()
  [
    NowValue;

    { The clock is read once so the day and the seconds-of-day come from the
      same instant. Two separate reads could straddle midnight. }
    NowValue = CurrentTime(6);
    NowDay = Floor(NowValue);
    NowSecondsOfDay = (NowValue - NowDay) * 86400;
    NowHour = Floor(NowSecondsOfDay / 3600);
    NowMinute = Floor((NowSecondsOfDay - (NowHour * 3600)) / 60);
    NowSecond = Floor(NowSecondsOfDay - (NowHour * 3600) - (NowMinute * 60));
  ]

  { Returns TRUE when the schedule (Interval, IntervalNum) should fire now.
      Interval "minute"      - every IntervalNum minutes, in the first 5 seconds
      Interval "hour"        - every IntervalNum hours, at minute 0, first 5 seconds
      Interval "day_at_time" - IntervalNum is a "|" separated list of HHMMSS
                               values; fires within 5 seconds of any of them
    Any other Interval is logged and treated as not due.
    The answer is cached in ScheduleCache so every tag sharing a schedule gets
    the same result within one run. LastRunBySchedule remembers the window that
    last fired so a schedule fires at most once per window. }
  ScheduleIsDue(Interval; IntervalNum)
  [
    CacheKey;
    WindowKey;
    DueKey;
    LastKey;
    Due;
    TargetHour;
    TargetMinute;
    TargetSecond;
    DivResult;
    Remainder;
    Times;
    TimeIndex;
    TimeValue;

    CacheKey = Concat(Interval, "|", IntervalNum);
    If Valid(ScheduleCache[CacheKey]); [
      Return(ScheduleCache[CacheKey]);
    ]

    { A schedule that has never fired has no entry yet. -1 stands in for it so
      the comparisons below never involve an Invalid operand. }
    LastKey = PickValid(LastRunBySchedule[CacheKey], -1);
    Due = FALSE;

    If Interval == "minute"; [
      If IntervalNum < 1; [
        IntervalNum = 1;
      ]
      DivResult = Floor(NowMinute / IntervalNum);
      Remainder = NowMinute - (DivResult * IntervalNum);
      WindowKey = (NowDay * 10000) + (NowHour * 100) + NowMinute;
      If NowSecond < 5 && Remainder == 0 && LastKey != WindowKey; [
        Due = TRUE;
        DueKey = WindowKey;
      ]
    ] Else If Interval == "hour"; [
      If IntervalNum < 1; [
        IntervalNum = 1;
      ]
      DivResult = Floor(NowHour / IntervalNum);
      Remainder = NowHour - (DivResult * IntervalNum);
      WindowKey = (NowDay * 100) + NowHour;
      If NowMinute == 0 && NowSecond < 5 && Remainder == 0 && LastKey != WindowKey; [
        Due = TRUE;
        DueKey = WindowKey;
      ]
    ] Else If Interval == "day_at_time"; [
      Times = Split(IntervalNum, "|");
      TimeIndex = 0;
      Loop TimeIndex < ArraySize(Times); [
        TimeValue = Times[TimeIndex];
        TargetHour = Floor(TimeValue / 10000);
        TargetMinute = Floor((TimeValue - (TargetHour * 10000)) / 100);
        TargetSecond = TimeValue - (TargetHour * 10000) - (TargetMinute * 100);
        WindowKey = (NowDay * 1000000) + TimeValue;
        If NowHour == TargetHour && NowMinute == TargetMinute && NowSecond >= TargetSecond && NowSecond < (TargetSecond + 5) && LastKey != WindowKey; [
          Due = TRUE;
          { Remember the key of the entry that matched. WindowKey is
            overwritten by later list entries, and storing that value would
            let the same time fire again inside its 5 second window. }
          DueKey = WindowKey;
        ]
        TimeIndex = TimeIndex + 1;
      ]
    ] Else [
      LogInfo(Concat("Unknown interval: ", Interval));
    ]

    If Due; [
      LastRunBySchedule[CacheKey] = DueKey;
    ]

    ScheduleCache[CacheKey] = Due;
    Return(Due);
  ]

  { Reads WaterlyTagsCsv and returns the tag paths that are enabled and whose
    schedule is due now.
    Columns used: 0 = tag path, 1 = enabled ("1" to include), 2 = interval,
    3 = interval number. Row 0 is a header, and rows with fewer than 4 fields
    are ignored. Fields are split on "," with no quoting support.
    NOTE(review): the empty-brace literal used for an empty list is also the
    comment delimiter in this language - confirm it evaluates to an empty array. }
  LoadTagPaths()
  [
    FileContent;
    Lines;
    Fields;
    TagList;
    Interval;
    IntervalNum;
    EnabledValue;
    TagPath;
    row;

    FileContent = ReadFile(WaterlyTagsCsv);
    If !Valid(FileContent); [
      LogInfo(Concat("Tag CSV not found: ", WaterlyTagsCsv));
      Return({});
    ]

    Lines = Split(FileContent, "\n");
    TagList = {};

    { Skip header row at index 0. A local index is used so the module-level
      loop counter belonging to Main is left alone. }
    row = 1;
    Loop row < ArraySize(Lines); [
      Fields = Split(Lines[row], ",");
      If ArraySize(Fields) >= 4; [
        TagPath = TrimCr(Fields[0]);
        EnabledValue = CleanToken(Fields[1]);
        Interval = CleanToken(Fields[2]);
        IntervalNum = CleanToken(Fields[3]);
        If EnabledValue == "1" && ScheduleIsDue(Interval, IntervalNum); [
          TagList = ArrayAppend(TagList, TagPath);
        ]
      ]
      row = row + 1;
    ]

    Return(TagList);
  ]

  { Runs one posting cycle each time TriggerValue changes and equals 1:
    works out which tags are due, builds one JSON body per batch of up to
    BatchSize tags, POSTs it to WaterlyURL, and logs a summary. }
  Main
  [
    If Watch(TriggerValue) && TriggerValue == 1; [
      If !Valid(LastRunBySchedule); [
        LastRunBySchedule = New(Dictionary);
      ]
      { Fresh per-run cache so each schedule is evaluated once per cycle. }
      ScheduleCache = New(Dictionary);
      InitNow();

      TagPaths = LoadTagPaths();
      TotalTags = ArraySize(TagPaths);
      If TotalTags == 0; [
        LogInfo("No scheduled tags due at this time.");
        Return();
      ]

      TotalBatches = Ceiling(TotalTags / BatchSize);
      SuccessCount = 0;
      FailCount = 0;
      SkippedCount = 0;

      { Build timestamp (Unix seconds) - NOTE(review): relies on the same
        CurrentTime(6) day-count assumption as AppendLog. }
      NowTimestamp = (CurrentTime(6) - 25569) * 86400;

      { Build headers dictionary }
      Headers = New(Dictionary);
      Headers["x-waterly-request-type"] = "WaterlyConnect";
      Headers["x-waterly-connect-token"] = WaterlyToken;

      LogInfo(Concat("Starting - ", Text(TotalTags, "#"), " tags in ", Text(TotalBatches, "#"), " batches"));

      BatchNum = 0;
      Loop BatchNum < TotalBatches; [
        BatchStart = BatchNum * BatchSize;
        BatchEnd = Min(BatchStart + BatchSize, TotalTags);
        TagData = "";
        FirstInBatch = 1;
        BatchSent = 0;

        i = BatchStart;
        Loop i < BatchEnd; [
          CurrentTag = TagPaths[i];
          TagValue = GetTagValue(CurrentTag);
          TagTimestamp = (GetTagTimestamp(CurrentTag) - 25569) * 86400;

          { Leave out tags with no usable value or timestamp.
            NOTE(review): presumably Text and Concat return Invalid when given
            an Invalid argument, so one such tag would otherwise blank the
            whole batch body - confirm against the function reference. }
          If Valid(TagValue) && Valid(TagTimestamp); [
            If FirstInBatch == 0; [
              TagData = Concat(TagData, ",");
            ]
            { The tag path is escaped because it is inserted into a JSON
              string. The value is formatted as a number but sent as a string. }
            TagData = Concat(TagData,
              "{",
              "\"name\":\"", JsonEscape(CurrentTag), "\",",
              "\"value\":\"", Text(TagValue, "#.######"), "\",",
              "\"last_change_timestamp\":", Text(TagTimestamp, "#"),
              "}"
            );
            FirstInBatch = 0;
            BatchSent = BatchSent + 1;
          ] Else [
            SkippedCount = SkippedCount + 1;
          ]
          i = i + 1;
        ]

        If BatchSent > 0; [
          JsonBody = Concat(
            "{",
            "\"timestamp\":", Text(NowTimestamp, "#"), ",",
            "\"device\":{",
            "\"id\":\"", JsonEscape(WaterlyDeviceID), "\",",
            "\"type\":\"VTScada\"",
            "},",
            "\"tags\":[", TagData, "]",
            "}"
          );

          { Clear the previous batch's response so a send that never fills it
            in cannot be judged by a stale 200. }
          Response = Invalid;

          PostResult = System.HTTPClient.HTTPSend(
            WaterlyURL,           { URL }
            &Response,            { Response pointer }
            Invalid,              { Username (not used) }
            Invalid,              { Password (not used) }
            Headers,              { Extra headers }
            "POST",               { HTTP method }
            JsonBody,             { Message body }
            "application/json",   { Content type }
            30,                   { Timeout seconds }
            1,                    { Concurrent connections }
            FALSE,                { Keep alive }
            FALSE,                { Message only }
            Invalid               { Max send rate }
          );

          { Only HTTP 200 counts as success. Counts are per tag, not per batch. }
          If Valid(Response) && Response["ReturnCode"] == 200; [
            SuccessCount = SuccessCount + BatchSent;
          ] Else [
            FailCount = FailCount + BatchSent;
            LogInfo(Concat("Batch ", Text(BatchNum + 1, "#"), " failed - Code: ", Text(PickValid(Response["ReturnCode"], -1), "#")));
          ]
        ] Else [
          LogInfo(Concat("Batch ", Text(BatchNum + 1, "#"), " skipped - no tags with a valid value"));
        ]

        BatchNum = BatchNum + 1;
      ]

      If SkippedCount > 0; [
        LogInfo(Concat("Skipped ", Text(SkippedCount, "#"), " tags with an invalid value or timestamp"));
      ]

      If FailCount == 0; [
        LogInfo(Concat("Complete - ", Text(SuccessCount, "#"), " tags posted successfully"));
      ] Else [
        LogInfo(Concat("Complete - ", Text(SuccessCount, "#"), " succeeded, ", Text(FailCount, "#"), " failed"));
      ]
    ]
  ]
]