CREATE OR REPLACE APPLICATION SentimentAnalysisDemo; CREATE OR REPLACE SOURCE ReadLiveReviewsFromOracleDB USING OracleReader ( Username: '***', Password: '***', ConnectionURL: '***', Tables : 'AIDEMO.STORE_REVIEWS' ) OUTPUT TO sourceCDCStream; CREATE OR REPLACE STREAM ReviewSentimentStream OF Global.WAEvent; CREATE OR REPLACE OPEN PROCESSOR AnalyseReviewSentiment USING SentimentAnalyserAIAgent ( apiKey: '***', model: 'gpt-3.5-turbo-instruct', TableAndColumns: 'AIDEMO.STORE_REVIEWS(REVIEW_CONTENT)' ) INSERT INTO ReviewSentimentStream FROM sourceCDCStream; CREATE OR REPLACE TYPE NegativeReviews ( review_id String, store_id String KEY, review_sentiment String ); CREATE OR REPLACE STREAM NegativeReviewsStream OF NegativeReviews; CREATE OR REPLACE CQ FilterNegativeReviews INSERT INTO NegativeReviewsStream SELECT data[0] as review_id, data[1] as store_id, USERDATA(e,"reviewSentiment") as review_verdict FROM ReviewSentimentStream e where TO_STRING(USERDATA(e,"reviewSentiment")).toUpperCase().contains("NEGATIVE"); CREATE OR REPLACE JUMPING WINDOW NegativeReviewsWindow OVER NegativeReviewsStream KEEP 5 ROWS PARTITION BY store_id; CREATE OR REPLACE STREAM NegativeReviewAlertStream OF Global.AlertEvent; CREATE OR REPLACE CQ GenerateAlertEvents INSERT INTO NegativeReviewAlertStream SELECT 'Negative Review for storeID ' + store_id, store_id + '_' + DNOW(), 'warning', 'raise', 'Five negative Review received for store with ID : ' + store_id FROM NegativeReviewsWindow GROUP BY store_id; CREATE OR REPLACE SUBSCRIPTION NegativeReviewWebAlert USING WebAlertAdapter( ) INPUT FROM NegativeReviewAlertStream; END APPLICATION SentimentAnalysisDemo;