CREATE APPLICATION TableLag; CREATE OR REPLACE TYPE TableLagEMail_Type ( name java.lang.String, keyval java.lang.String, severity java.lang.String, flag java.lang.String, message java.lang.String, emailsubject java.lang.String, emaillist java.lang.String); CREATE OR REPLACE TYPE TableLagStream_type ( tablename java.lang.String, lastmergetime java.lang.String, laginminutes java.lang.Long); CREATE OR REPLACE TYPE TableLEE_SendToOPStream_Type ( placeholder java.lang.String); CREATE OR REPLACE TYPE TableLEE_ReturnFromOPStream_Type ( tablename java.lang.String, lastmergetime java.lang.String, laginminutes java.lang.Integer); CREATE OR REPLACE TYPE TableLagThresholdsType ( TableName java.lang.String KEY, LagThresholdinMinutes java.lang.Integer, EmailTo java.lang.String); CREATE STREAM TableLagHBStream OF TableLEE_SendToOPStream_Type; CREATE STREAM TableLag_FromOPOutStream OF TableLEE_ReturnFromOPStream_Type; CREATE OR REPLACE STREAM TableLagEmails OF Global.AlertEvent; CREATE OR REPLACE CACHE TableLagThresholdsFromFile USING Global.FileReader ( positionbyeof: true, rolloverstyle: 'Default', blocksize: 64, skipbom: true, includesubdirectories: false, directory: '/Uploaded Files/Lag-Monitr/', wildcard: 'lagthresholds.csv' ) PARSE USING Global.DSVParser ( blockascompleterecord: false, charset: 'UTF-8', columndelimiter: ',', columndelimittill: '-1', handler: 'com.webaction.proc.DSVParser_1_0', header: false, headerlineno: 0, ignoreemptycolumn: false, ignoremultiplerecordbegin: 'true', ignorerowdelimiterinquote: false, linenumber: '-1', nocolumndelimiter: false, quoteset: '\"', rowdelimiter: '\n', separator: ':', trimquote: true, trimwhitespace: false ) QUERY ( keytomap: 'TableName', skipinvalid: 'false' ) OF TableLagThresholdsType; CREATE CQ TableLagHBCQ INSERT INTO TableLagHBStream SELECT "Target Mon Check" as checkmon FROM heartbeat(interval 5 second) h; CREATE TARGET TableLagSysout USING Global.SysOut ( name: 'TableLagSysoutTgt' ) INPUT FROM TableLagEmails; CREATE OR REPLACE TARGET SlackNot USING Global.SlackAlertAdapter ( ChannelName: 'lag-monitor-recipe', adapterName: 'SlackAlertAdapter', OauthToken: '************', OauthToken_encrypted: 'true' ) INPUT FROM TableLagEmails; CREATE OR REPLACE CQ TableLagConditionalAlertCQ INSERT INTO TableLagEmails SELECT 'testmail' as name, '12345' as keyval, 'error' as severity, 'raise' as flag, ls.tablename + ' is lagging by ' + ls.laginminutes + ' minutes' as message FROM TableLag_FromOPOutStream ls, TableLagThresholdsFromFile ltff WHERE ls.tablename=ltff.tablename and ls.laginminutes > ltff.lagthresholdinminutes; END APPLICATION TableLag;