## Environment Setup

In [None]:
!mkdir -p ~/.aws && cp /content/drive/MyDrive/AWS/684947_admin ~/.aws/credentials
!chmod 600 ~/.aws/credentials
!pip install -qq awscli boto3
!aws sts get-caller-identity

## Streaming ETL with Glue

In this lab you will learn how to ingest, process, and consume streaming data using AWS serverless services such as Kinesis Data Streams, Glue, S3, and Athena. To simulate the data streaming input, we will use Kinesis Data Generator (KDG).

![](https://user-images.githubusercontent.com/62965911/214810281-014f57ff-ed16-4bf5-89b7-2c473e583aaf.png)

In [6]:
!aws cloudformation create-stack \
--stack-name KinesisGlue \
--template-body file://kinesis_glue.yml \
--capabilities CAPABILITY_NAMED_IAM

{
 "StackId": "arn:aws:cloudformation:us-east-1:684199068947:stack/KinesisGlue/821f3c20-6c97-11ed-a13b-0e9b6ec0e0ff"
}
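Stack creation is asynchronous, so the `StackId` above only confirms that the request was accepted. As a minimal sketch (not part of the original lab), the helper below waits for the stack to finish using the boto3 CloudFormation waiter and returns the stack outputs. The function name `wait_for_stack` is hypothetical, and whether the template defines any outputs is an assumption.

```python
def wait_for_stack(cfn_client, stack_name="KinesisGlue"):
    """Block until the stack is CREATE_COMPLETE, then return its outputs as a dict."""
    # The waiter polls DescribeStacks and raises WaiterError if creation fails.
    cfn_client.get_waiter("stack_create_complete").wait(StackName=stack_name)
    stack = cfn_client.describe_stacks(StackName=stack_name)["Stacks"][0]
    # "Outputs" is absent when the template declares none, hence the default.
    return {o["OutputKey"]: o["OutputValue"] for o in stack.get("Outputs", [])}


# Usage in the notebook (requires valid credentials):
#   import boto3
#   outputs = wait_for_stack(boto3.client("cloudformation"))
```

Passing the client in as an argument keeps the helper easy to exercise with a stub before pointing it at a real account.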


### Set up the Kinesis stream

1. Navigate to AWS Kinesis console 
1. Click “Create data stream” 
1. Enter the following details
 - Data stream name: TicketTransactionStreamingData
 - Capacity mode: Provisioned
 - Provisioned shards: 2
1. Click Create data stream
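The same stream can also be created from the notebook. The sketch below mirrors the console settings above (provisioned mode, two shards) with the boto3 Kinesis `create_stream` call; the helper names `stream_request` and `create_stream` are hypothetical and only illustrate one way to do it.

```python
def stream_request(name="TicketTransactionStreamingData", shards=2):
    """Build the CreateStream arguments matching the console settings."""
    return {
        "StreamName": name,
        "ShardCount": shards,
        "StreamModeDetails": {"StreamMode": "PROVISIONED"},
    }


def create_stream(kinesis_client, request):
    """Create the stream and wait until it is ACTIVE."""
    kinesis_client.create_stream(**request)
    kinesis_client.get_waiter("stream_exists").wait(StreamName=request["StreamName"])


# Usage (requires valid credentials):
#   import boto3
#   create_stream(boto3.client("kinesis"), stream_request())
```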

### Create Table for Kinesis Stream Source in Glue Data Catalog

1. Navigate to AWS Glue console 
1. On the AWS Glue menu, select Tables, then select Add table manually from the drop down list. 
1. Enter TicketTransactionStreamData as the table name
1. Click Add database and enter tickettransactiondatabase as the database name, and click create. 
1. Use the drop down to select the database we just created, and click Next. 
1. Select Kinesis as the source and choose Stream in my account. Select the AWS region where you created the stream, pick TicketTransactionStreamingData from the stream name dropdown, and click Next. 
1. Choose JSON as the incoming data format, as we will be sending JSON payloads from the Kinesis Data Generator in the following steps. Click Next. 
1. Leave the schema empty; schema detection will be enabled when the Glue streaming job is defined. Click Next. 
1. Leave partition indices empty. Click Next. 
1. Review all the details and click Finish. 
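To confirm what the console wizard stored, the table definition can be read back with the boto3 Glue `get_table` call. This is a minimal sketch: the helper name `summarize_stream_table` is hypothetical, and the lowercase names assume that the Data Catalog stores database and table names in lowercase. Because the schema was left empty, the column list is expected to be empty at this point.

```python
def summarize_stream_table(glue_client,
                           database="tickettransactiondatabase",
                           table="tickettransactionstreamdata"):
    """Return the parts of the catalog entry that describe the stream source."""
    entry = glue_client.get_table(DatabaseName=database, Name=table)["Table"]
    storage = entry.get("StorageDescriptor", {})
    return {
        "location": storage.get("Location"),
        "columns": [c["Name"] for c in storage.get("Columns", [])],
        "table_parameters": entry.get("Parameters", {}),
        "storage_parameters": storage.get("Parameters", {}),
    }


# Usage (requires valid credentials):
#   import boto3
#   print(summarize_stream_table(boto3.client("glue")))
```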

### Create and trigger the Glue Streaming job

1. On the left-hand side of the Glue Console, click on Jobs. This will launch Glue Studio. 
1. Select the Visual with a blank canvas option. Click Create 
1. Select Amazon Kinesis from the Source drop down list. 
1. In the panel on the right under “Data source properties - Kinesis Stream”, configure as follows:
 - Amazon Kinesis Source: Data Catalog table
 - Database: tickettransactiondatabase
 - Table: tickettransactionstreamdata
 - Make sure that Detect schema is selected
 - Leave all other fields as default
1. Select Amazon S3 from the Target drop down list. 
1. Select the Data target - S3 bucket node at the bottom of the graph, and configure as follows:
 - Format: Parquet
 - Compression Type: None
 - S3 Target Location: Select Browse S3 and select the “mod-xxx-dmslabs3bucket-xxx” bucket
1. Append TicketTransactionStreamingData/ to the S3 URL. The path should look similar to s3://mod-xxx-dmslabs3bucket-xxx/TicketTransactionStreamingData/ - don’t forget the / at the end. The job will automatically create the folder. 
1. Finally, select the Job details tab at the top and configure as follows:
 - Name: TicketTransactionStreamingJob
 - IAM Role: Select the xxx-GlueLabRole-xxx from the drop down list
 - Type: Spark Streaming
1. Press the Save button in the top right-hand corner to create the job.
1. Once you see the Successfully created job message in the banner, click the Run button to start the job.
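Once the job has been saved in Glue Studio, it can also be started and monitored from the notebook. The sketch below uses the boto3 Glue `start_job_run` and `get_job_run` calls; the helper names are hypothetical. A streaming job is expected to stay in the `RUNNING` state until it is stopped, so there is no completion state to wait for.

```python
JOB_NAME = "TicketTransactionStreamingJob"  # name chosen in the Job details tab


def start_streaming_job(glue_client, job_name=JOB_NAME):
    """Start a run of the saved job and return its run ID."""
    return glue_client.start_job_run(JobName=job_name)["JobRunId"]


def job_run_state(glue_client, run_id, job_name=JOB_NAME):
    """Return the current state of a run, e.g. STARTING, RUNNING, or FAILED."""
    run = glue_client.get_job_run(JobName=job_name, RunId=run_id)["JobRun"]
    return run["JobRunState"]


# Usage (requires valid credentials):
#   import boto3
#   glue = boto3.client("glue")
#   run_id = start_streaming_job(glue)
#   print(job_run_state(glue, run_id))
```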

### Trigger streaming data from KDG

1. Launch KDG using the URL you bookmarked during lab setup. Log in with the user name and password you provided when deploying the CloudFormation stack.
1. Select the appropriate region, then select TicketTransactionStreamingData as the target Kinesis stream from the dropdown list. Leave Records per second at the default (100). For the record template, enter NormalTransaction as the payload name and paste the following template payload:
 ```json
 {
 "customerId": "{{random.number(50)}}",
 "transactionAmount": {{random.number(
 {
 "min":10,
 "max":150
 }
 )}},
 "sourceIp" : "{{internet.ip}}",
 "status": "{{random.weightedArrayElement({
 "weights" : [0.8,0.1,0.1],
 "data": ["OK","FAIL","PENDING"]
 } 
 )}}",
 "transactionTime": "{{date.now}}" 
 }
 ```
1. Click Send data to trigger the simulated transaction streaming data. 
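For readers who want to see what the template produces, or who cannot use KDG, the sketch below generates records of the same shape in Python. It is an approximation rather than a reimplementation of KDG: the inclusive ranges, the ISO 8601 timestamp used in place of `{{date.now}}`, and the helper names `make_transaction` and `send_transaction` are all assumptions.

```python
import json
import random
from datetime import datetime, timezone


def make_transaction(rng, source_ip=None):
    """Build one record shaped like the NormalTransaction template."""
    # Assumption: a random IPv4 address stands in for {{internet.ip}}.
    ip = source_ip or ".".join(str(rng.randint(0, 255)) for _ in range(4))
    return {
        "customerId": str(rng.randint(0, 50)),        # string, as in the template
        "transactionAmount": rng.randint(10, 150),    # number between min and max
        "sourceIp": ip,
        "status": rng.choices(["OK", "FAIL", "PENDING"], weights=[0.8, 0.1, 0.1])[0],
        "transactionTime": datetime.now(timezone.utc).isoformat(),
    }


def send_transaction(kinesis_client, record, stream="TicketTransactionStreamingData"):
    """Put one JSON-encoded record on the stream, keyed by customer ID."""
    kinesis_client.put_record(
        StreamName=stream,
        Data=json.dumps(record).encode("utf-8"),
        PartitionKey=record["customerId"],
    )


sample = make_transaction(random.Random(0))
print(json.dumps(sample, indent=1))
```

Passing a fixed `source_ip` yields records where every transaction comes from one address, which is useful for testing detection logic locally.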

### Verify the Glue stream job

1. Navigate to Amazon S3 console 
1. Navigate to the S3 bucket path set as the Glue streaming job target and note the folder structure of the processed data. 
1. Open the folders that correspond to the current date and time. Verify that the streaming data has been converted to Parquet format and written to the corresponding folders.
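The same check can be scripted. The sketch below builds the expected prefix for a given hour and lists the objects under it with the boto3 S3 `list_objects_v2` paginator. The partition key names come from the Athena queries later in this lab, but the zero-padded `key=value` folder layout and the use of UTC are assumptions, so compare the result against what the console shows. The bucket name is left as a parameter because it is specific to your account.

```python
from datetime import datetime, timezone


def partition_prefix(ts, base="TicketTransactionStreamingData/"):
    """Build the assumed hourly partition prefix for a timestamp."""
    return (f"{base}ingest_year={ts:%Y}/ingest_month={ts:%m}/"
            f"ingest_day={ts:%d}/ingest_hour={ts:%H}/")


def list_keys(s3_client, bucket, prefix):
    """Return every object key under the prefix, following pagination."""
    keys = []
    pages = s3_client.get_paginator("list_objects_v2").paginate(Bucket=bucket, Prefix=prefix)
    for page in pages:
        # "Contents" is missing on pages with no matching objects.
        keys.extend(obj["Key"] for obj in page.get("Contents", []))
    return keys


print(partition_prefix(datetime.now(timezone.utc)))

# Usage (requires valid credentials; pass your own bucket name):
#   import boto3
#   list_keys(boto3.client("s3"), bucket, partition_prefix(datetime.now(timezone.utc)))
```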

### Create Glue Crawler for the transformed data

1. Navigate to AWS Glue console 
1. On the AWS Glue menu, select Crawlers and click Add crawler. 
1. Enter TicketTransactionParquetDataCrawler as the name of the crawler, click Next. 
1. Keep the defaults (Data stores as the crawler source type, and Crawl all folders), then click Next. 
1. Choose S3 as data store and choose Specified path in my account. Click the icon next to Include path input to select the S3 bucket. Make sure you select the folder TicketTransactionStreamingData. Click Select. 
1. Select No to indicate no other data store needed, then click Next. 
1. Choose an existing IAM role, using the dropdown list to select the role with GlueLabRole in the name, click Next. 
1. The data is partitioned by hour, so set the crawler to run hourly; this ensures that newly created partitions are added to the table. Click Next. 
1. Use the dropdown list to select tickettransactiondatabase as the output database, enter parquet_ as the table prefix, and click Next. 
1. Review the crawler configuration and click Finish to create the crawler. 
1. Once the crawler is created, select the crawler and click Run crawler to trigger the first run. 
1. When the crawler run has finished, go to the Glue Data Catalog and, under Tables, verify that the parquet_tickettransactionstreamingdata table is listed. 
1. Click the parquet_tickettransactionstreamingdata table and verify that Glue has correctly identified the streaming data format while transforming the source data from JSON to Parquet. 
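The crawler steps above can be sketched with the boto3 Glue `create_crawler` and `start_crawler` calls. The role name and S3 path are parameters because they are specific to your account; the helper name `crawler_request` is hypothetical, and the hourly cron expression is one possible way to express "run every hour".

```python
def crawler_request(role, s3_path,
                    name="TicketTransactionParquetDataCrawler",
                    database="tickettransactiondatabase",
                    table_prefix="parquet_"):
    """Build CreateCrawler arguments matching the console wizard choices."""
    return {
        "Name": name,
        "Role": role,                                  # the GlueLabRole from your stack
        "DatabaseName": database,
        "TablePrefix": table_prefix,
        "Targets": {"S3Targets": [{"Path": s3_path}]},
        "Schedule": "cron(0 * * * ? *)",               # minute 0 of every hour
    }


def create_and_run_crawler(glue_client, request):
    """Create the crawler, then trigger its first run."""
    glue_client.create_crawler(**request)
    glue_client.start_crawler(Name=request["Name"])


# Usage (requires valid credentials; role and s3_path are your own values):
#   import boto3
#   create_and_run_crawler(boto3.client("glue"), crawler_request(role, s3_path))
```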

### Trigger abnormal transaction data from KDG

1. Keep the first KDG session running. Open another browser, launch KDG using the URL you bookmarked during lab setup, and log in with the user name and password you provided when deploying the stack.
1. Select the appropriate region, then select TicketTransactionStreamingData as the target Kinesis stream from the dropdown list. Set Records per second to 1 and click Template 2.
1. For the record template, enter AbnormalTransaction as the payload name and paste the following template payload:
 ```json
 {
 "customerId": "{{random.number(50)}}",
 "transactionAmount": {{random.number(
 {
 "min":10,
 "max":150
 }
 )}},
 "sourceIp" : "221.233.116.256",
 "status": "{{random.weightedArrayElement({
 "weights" : [0.8,0.1,0.1],
 "data": ["OK","FAIL","PENDING"]
 } 
 )}}",
 "transactionTime": "{{date.now}}" 
 }
 ```
1. Click Send data to simulate abnormal transactions (1 transaction per second all from the same source IP address).
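The reason this traffic stands out is that normal records carry a different random IP each time, while the abnormal stream repeats one address. As a small local illustration with made-up records (not lab output), counting records per `sourceIp` puts the fixed address at the top, and its records span several customer IDs. The helper names are hypothetical.

```python
from collections import Counter


def top_source_ips(records, n=3):
    """Return the n most frequent source IPs with their record counts."""
    return Counter(r["sourceIp"] for r in records).most_common(n)


def customers_for_ip(records, ip):
    """Return the distinct customer IDs seen from one source IP."""
    return {r["customerId"] for r in records if r["sourceIp"] == ip}


# Made-up sample: four normal records plus three from the fixed address.
records = [
    {"customerId": "7", "sourceIp": "10.1.2.3"},
    {"customerId": "12", "sourceIp": "172.16.0.9"},
    {"customerId": "31", "sourceIp": "221.233.116.256"},
    {"customerId": "4", "sourceIp": "192.168.5.1"},
    {"customerId": "18", "sourceIp": "221.233.116.256"},
    {"customerId": "45", "sourceIp": "10.9.8.7"},
    {"customerId": "2", "sourceIp": "221.233.116.256"},
]

print(top_source_ips(records, n=1))
print(sorted(customers_for_ip(records, "221.233.116.256"), key=int))
```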

### Detect Abnormal Transactions using Ad-Hoc query from Athena

1. Navigate to AWS Athena console 
1. Make sure you select AwsDataCatalog as Data source and tickettransactiondatabase as the database, refresh to make sure the parquet_tickettransactionstreamingdata is showing in the table list. 
1. Run the query below, which counts the transactions per source IP for the current hour. You should see a large number of transactions from a single sourceip.
 ```sql
 SELECT count(*) AS numberOfTransactions, sourceip
 FROM "tickettransactiondatabase"."parquet_tickettransactionstreamingdata" 
 WHERE cast(ingest_year as bigint)=year(now())
 AND cast(ingest_month as bigint)=month(now())
 AND cast(ingest_day as bigint)=day_of_month(now())
 AND cast(ingest_hour as bigint)=hour(now())
 GROUP BY sourceip
 ORDER BY numberOfTransactions DESC;
 ```
1. Run the query below to inspect the transactions from that source IP. The results show requests coming from the same IP address but with different customer IDs, which confirms that the traffic is abnormal.
 ```sql
 SELECT *
 FROM "tickettransactiondatabase"."parquet_tickettransactionstreamingdata" 
 WHERE cast(ingest_year as bigint)=year(now())
 AND cast(ingest_month as bigint)=month(now())
 AND cast(ingest_day as bigint)=day_of_month(now())
 AND cast(ingest_hour as bigint)=hour(now())
 AND sourceip='221.233.116.256'
 LIMIT 100;
 ```
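Either query can also be run from the notebook instead of the Athena console. The sketch below submits a query with the boto3 Athena client, polls until it finishes, and returns the rows as dictionaries. The helper name `run_athena_query` is hypothetical, `output_location` must be an S3 path in your account where Athena may write results, and only the first page of results is read.

```python
import time


def run_athena_query(athena_client, sql, output_location,
                     database="tickettransactiondatabase", poll_seconds=2.0):
    """Run a SELECT query and return the first page of rows as dicts."""
    query_id = athena_client.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={"Database": database},
        ResultConfiguration={"OutputLocation": output_location},
    )["QueryExecutionId"]

    # Poll until Athena reports a terminal state.
    while True:
        status = athena_client.get_query_execution(QueryExecutionId=query_id)
        state = status["QueryExecution"]["Status"]["State"]
        if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
            break
        time.sleep(poll_seconds)
    if state != "SUCCEEDED":
        raise RuntimeError(f"Athena query {query_id} ended in state {state}")

    rows = athena_client.get_query_results(QueryExecutionId=query_id)["ResultSet"]["Rows"]
    # For SELECT queries the first row holds the column names.
    header, *data = [[cell.get("VarCharValue") for cell in row["Data"]] for row in rows]
    return [dict(zip(header, values)) for values in data]


# Usage (requires valid credentials; sql is one of the queries above):
#   import boto3
#   rows = run_athena_query(boto3.client("athena"), sql, output_location)
```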

## Clean Up

1. In your AWS account, navigate to the CloudFormation console.
1. On the CloudFormation console, select the stack you created.
1. Click the Actions dropdown and select Delete stack.

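The Kinesis data stream and the Glue streaming job were created manually, so they are not removed with the stack. Stop the running job, then delete the TicketTransactionStreamingJob job from the Glue console and the TicketTransactionStreamingData stream from the Kinesis console.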

Also delete the Glue crawler, tables, and database manually.
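The manual deletions can be scripted as well. The sketch below removes the resources created by hand in this lab, using the names from the earlier steps, and then deletes the stack. It assumes the streaming job run has already been stopped and the crawler is not running; the helper name `cleanup_lab` is hypothetical. Deleting a Glue database also removes the tables it contains.

```python
def cleanup_lab(glue_client, kinesis_client, cfn_client,
                crawler="TicketTransactionParquetDataCrawler",
                job="TicketTransactionStreamingJob",
                database="tickettransactiondatabase",
                stream="TicketTransactionStreamingData",
                stack="KinesisGlue"):
    """Delete the manually created resources, then the CloudFormation stack."""
    glue_client.delete_crawler(Name=crawler)
    glue_client.delete_job(JobName=job)
    glue_client.delete_database(Name=database)   # also drops the tables inside it
    kinesis_client.delete_stream(StreamName=stream)
    cfn_client.delete_stack(StackName=stack)


# Usage (requires valid credentials):
#   import boto3
#   cleanup_lab(boto3.client("glue"), boto3.client("kinesis"),
#               boto3.client("cloudformation"))
```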