## Environment Setup

In [None]:
!mkdir -p ~/.aws && cp /content/drive/MyDrive/AWS/684947_admin ~/.aws/credentials
!chmod 600 ~/.aws/credentials
!pip install -qq awscli boto3
!aws sts get-caller-identity

### Create S3 Bucket and Clone Files

In [52]:
BUCKET_NAME = "wys-glueworkshop"
%env BUCKET_NAME=$BUCKET_NAME

env: BUCKET_NAME=wys-glueworkshop


In [None]:
!aws s3 mb s3://$BUCKET_NAME
!aws s3api put-public-access-block --bucket $BUCKET_NAME \
  --public-access-block-configuration "BlockPublicAcls=true,IgnorePublicAcls=true,BlockPublicPolicy=true,RestrictPublicBuckets=true"

Note: We will use data from a public COVID-19 dataset curated by AWS. If you are interested in learning more about the dataset, read this [blog post](https://aws.amazon.com/blogs/big-data/a-public-data-lake-for-analysis-of-covid-19-data/)  for more information.

In [None]:
!curl 'https://static.us-east-1.prod.workshops.aws/public/058af3a5-469d-4a2a-9619-1190c6a970ec/static/download/glue-workshop.zip' --output glue-workshop.zip
!unzip glue-workshop.zip

!mkdir glue-workshop/library
!mkdir glue-workshop/output

!git clone https://github.com/jefftune/pycountry-convert.git
%cd pycountry-convert
!zip -r pycountry_convert.zip pycountry_convert/
%cd ..
!mv pycountry-convert/pycountry_convert.zip glue-workshop/library/

!aws s3 sync glue-workshop/code/ s3://$BUCKET_NAME/script/
!aws s3 sync glue-workshop/data/ s3://$BUCKET_NAME/input/
!aws s3 sync glue-workshop/library/ s3://$BUCKET_NAME/library/
!aws s3 sync s3://covid19-lake/rearc-covid-19-testing-data/json/states_daily/ s3://$BUCKET_NAME/input/lab5/json/

In [14]:
!aws s3 ls $BUCKET_NAME/

                           PRE input/
                           PRE library/
                           PRE script/


### Deploy CloudFormation Template

In [None]:
%%writefile NoVPC.yaml
AWSTemplateFormatVersion: 2010-09-09
Parameters:
  UniquePostfix:
    Type: String
    Default: glueworkshop
    Description: 'Enter a unique postfix value, must be all lower cases!'
  S3Bucket:
    Type: String
    Default: s3://
    Description: 'enter the S3 bucket path for workshop'
Resources:
  AWSGlueServiceRole:
    Type: 'AWS::IAM::Role'
    Properties:
      RoleName: !Join 
        - '' 
        - - AWSGlueServiceRole- 
          - !Ref UniquePostfix
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service: glue.amazonaws.com
            Action: 'sts:AssumeRole'
      ManagedPolicyArns:
        - 'arn:aws:iam::aws:policy/AmazonS3FullAccess'
        - 'arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole'
        - 'arn:aws:iam::aws:policy/AmazonDynamoDBFullAccess'
        - 'arn:aws:iam::aws:policy/AmazonKinesisFullAccess'
      Policies:
        - PolicyName: "iam-passrole"
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action: 'iam:PassRole'
                Resource: !Sub 'arn:aws:iam::${AWS::AccountId}:role/AWSGlueServiceRole-${UniquePostfix}'
  AWSGlueServiceSageMakerNotebookRole:
    Type: 'AWS::IAM::Role'
    Properties:
      RoleName: !Join 
        - ''
        - - AWSGlueServiceSageMakerNotebookRole-
          - !Ref UniquePostfix
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service: sagemaker.amazonaws.com
            Action: 'sts:AssumeRole'
      ManagedPolicyArns:
        - 'arn:aws:iam::aws:policy/AmazonS3FullAccess'
        - 'arn:aws:iam::aws:policy/service-role/AWSGlueServiceNotebookRole'
        - 'arn:aws:iam::aws:policy/AmazonSageMakerFullAccess'
        - 'arn:aws:iam::aws:policy/CloudWatchLogsFullAccess'   
  AWSGlueDataBrewServiceRole:
    Type: 'AWS::IAM::Role'
    Properties:
      RoleName: !Join 
        - ''
        - - AWSGlueDataBrewServiceRole-
          - !Ref UniquePostfix
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service: databrew.amazonaws.com
            Action: 'sts:AssumeRole'
      ManagedPolicyArns:
        - 'arn:aws:iam::aws:policy/AmazonS3FullAccess'
        - 'arn:aws:iam::aws:policy/service-role/AWSGlueDataBrewServiceRole'
  KinesisStream: 
    Type: AWS::Kinesis::Stream 
    Properties: 
        Name: !Ref UniquePostfix  
        RetentionPeriodHours: 24 
        ShardCount: 2
  GlueCatalogDatabase:
    Type: AWS::Glue::Database
    Properties:
      CatalogId: !Ref AWS::AccountId
      DatabaseInput:
        Name: !Join 
            - '' 
            - - !Ref UniquePostfix 
              - -cloudformation 
        Description: Database to tables for workshop
  JsonStreamingTable:
    DependsOn: GlueCatalogDatabase
    Type: AWS::Glue::Table
    Properties:
      CatalogId: !Ref AWS::AccountId
      DatabaseName: !Ref GlueCatalogDatabase
      TableInput:
        Name: json-streaming-table
        Description: Define schema for streaming json
        TableType: EXTERNAL_TABLE
        Parameters: { "classification": "json" }
        StorageDescriptor:
          OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
          InputFormat: org.apache.hadoop.mapred.TextInputFormat
          Columns:
          - Name: "uuid"
            Type: bigint
          - Name: "country"
            Type: string
          - Name: "item type"
            Type: string
          - Name: "sales channel"
            Type: string  
          - Name: "order priority"
            Type: string
          - Name: "order date"
            Type: string
          - Name: "region"
            Type: string
          - Name: "ship date"
            Type: string
          - Name: "units sold"
            Type: int
          - Name: "unit price"
            Type: decimal
          - Name: "unit cost"
            Type: decimal
          - Name: "total revenue"
            Type: decimal
          - Name: "total cost"
            Type: decimal
          - Name: "total profit"
            Type: decimal             
          Parameters: {"endpointUrl": "https://kinesis.us-east-2.amazonaws.com", "streamName": !Ref UniquePostfix,"typeOfData": "kinesis"}
          SerdeInfo:
            Parameters: {"paths": "Country,Item Type,Order Date,Order Priority,Region,Sales Channel,Ship Date,Total Cost,Total Profit,Total Revenue,Unit Cost,Unit Price,Units Sold,uuid"}
            SerializationLibrary: org.openx.data.jsonserde.JsonSerDe
  JsonStaticTable:
    DependsOn: GlueCatalogDatabase
    Type: AWS::Glue::Table
    Properties:
      CatalogId: !Ref AWS::AccountId
      DatabaseName: !Ref GlueCatalogDatabase
      TableInput:
        Name: json-static-table
        Description: Define schema for static json
        TableType: EXTERNAL_TABLE
        Parameters: { "classification": "json" }
        StorageDescriptor:
          OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
          InputFormat: org.apache.hadoop.mapred.TextInputFormat
          Columns:
          - Name: "uuid"
            Type: bigint
          - Name: "country"
            Type: string
          - Name: "item type"
            Type: string
          - Name: "sales channel"
            Type: string  
          - Name: "order priority"
            Type: string
          - Name: "order date"
            Type: string
          - Name: "region"
            Type: string
          - Name: "ship date"
            Type: string
          - Name: "units sold"
            Type: int
          - Name: "unit price"
            Type: decimal
          - Name: "unit cost"
            Type: decimal
          - Name: "total revenue"
            Type: decimal
          - Name: "total cost"
            Type: decimal
          - Name: "total profit"
            Type: decimal             
          Location: !Join 
            - '' 
            - - !Ref S3Bucket 
              - input/lab4/json/
          SerdeInfo:
            Parameters: {"paths": "Country,Item Type,Order Date,Order Priority,Region,Sales Channel,Ship Date,Total Cost,Total Profit,Total Revenue,Unit Cost,Unit Price,Units Sold,uuid"}
            SerializationLibrary: org.openx.data.jsonserde.JsonSerDe
  GlueDevEndpoint:
    Type: 'AWS::Glue::DevEndpoint'
    Properties: 
      EndpointName: !Join 
        - ''
        - - GlueSageMakerNotebook-
          - !Ref UniquePostfix
      Arguments: 
        GLUE_PYTHON_VERSION: 3
      GlueVersion: 1.0
      NumberOfWorkers: 4
      WorkerType: Standard
      RoleArn: !GetAtt AWSGlueServiceRole.Arn
      ExtraPythonLibsS3Path: !Join 
            - '' 
            - - !Ref S3Bucket 
              - 'library/pycountry_convert.zip'
    DependsOn: AWSGlueServiceRole
Outputs:
  EndpointName:
    Value: !Ref GlueDevEndpoint
    Description: Endpoint created for Glue Workshop Lab.

In [None]:
!aws cloudformation create-stack --stack-name glueworkshop \
            --template-body file://NoVPC.yaml \
            --capabilities CAPABILITY_NAMED_IAM \
            --parameters \
            ParameterKey=UniquePostfix,ParameterValue=glueworkshop \
            ParameterKey=S3Bucket,ParameterValue=s3://$BUCKET_NAME/

## Glue Data Catalog

We will configure an AWS Glue crawler to scan and create metadata definitions in the Glue Data Catalog.

### Create Data Catalog Database

In [16]:
!head glue-workshop/data/lab1/csv/sample.csv

uuid,Country,Item Type,Sales Channel,Order Priority,Order Date,Region,Ship Date,Units Sold,Unit Price,Unit Cost,Total Revenue,Total Cost,Total Profit
535113847,Azerbaijan,Snacks,Online,C,10/8/14,Middle East and North Africa,10/23/14,934,152.58,97.44,142509.72,91008.96,51500.76
874708545,Panama,Cosmetics,Offline,L,2/22/15,Central America and the Caribbean,2/27/15,4551,437.2,263.33,1989697.2,1198414.83,791282.37
854349935,Sao Tome and Principe,Fruits,Offline,M,12/9/15,Sub-Saharan Africa,1/18/16,9986,9.33,6.92,93169.38,69103.12,24066.26
892836844,Sao Tome and Principe,Personal Care,Online,M,9/17/14,Sub-Saharan Africa,10/12/14,9118,81.73,56.67,745214.14,516717.06,228497.08
129280602,Belize,Household,Offline,H,2/4/10,Central America and the Caribbean,3/5/10,5858,668.27,502.54,3914725.66,2943879.32,970846.34
473105037,Denmark,Clothes,Online,C,2/20/13,Europe,2/28/13,1149,109.28,35.84,125562.72,41180.16,84382.56
754046475,Germany,Cosmetics,Offline,M,3/31/13,Europe,5/3/13,7964,437.2,263.

Go to the AWS Glue console , click Databases on the left. You should see a database with name glueworkshop-cloudformation. This was created by the CloudFormation template we launched during workshop setup and contains two pre-defined tables that we will use later in Glue streaming lab.

Create another database with name glueworkshop by clicking Add Database and then clicking Create.

### Create Data Crawler

We will create 2 crawlers to crawl CSV and JSON folders.

**Add Crawler for CSV folder**

1. Click Crawlers on the left.
1. Click Add Crawler.
1. Provide a name for the new Crawler such as lab1-csv, click Next.
1. On the Crawler source type page keep default values and click Next.
1. On the Data Store page, under Include path, pick s3://${BUCKET_NAME}/input/lab1/csv/. Make sure you pick the csv folder rather than the file inside the folder, and then click Next.
1. Click Next on the Add another data store page.
1. Click Choose an existing IAM role and pick the role AWSGlueServiceRole-glueworkshop then click Next.
1. Click Next on the "Create a schedule for this crawler" page.
1. On the output page choose glueworkshop from the Database dropdown list then click Next.
1. Click Finish.

**Add Crawler for JSON folder**

1. Click Crawlers on the left.
2. Click Add Crawler.
3. Provide a name for the new Crawler such as covid-testing, click Next.
4. On the Crawler source type page keep default values and click Next.
5. On the Data Store page, under Include path, pick s3://${BUCKET_NAME}/input/lab5/json/. Make sure you pick the json folder rather than the file inside the folder, and then click Next.
6. Click Next on the Add another data store page.
7. Click Choose an existing IAM role and pick the role AWSGlueServiceRole-glueworkshop then click Next.
8. Click Next on the "Create a schedule for this crawler" page.
9. On the output page choose glueworkshop from the Database dropdown list then click Next.
10. Click Finish.

### Crawl Data in Catalog Table

- Once we have created both crawlers, click the check box next to each and choose to run the crawler by clicking the Run crawler button at the top of the page. It will take a minute or two for each crawler to run.
- In our use case, CSV and JSON classifiers will be used to scan the files.
- Once the crawlers finish running, you can see the results by clicking Tables on the left of the page. You should see 2 new tables that were created by the crawlers - csv and json.
- Click on table csv and you will see the table schema automatically generated by the crawler based on the csv file.
- Click on table json and you will see the table schema automatically generated by the crawler based on the json file.



## Setup Glue development environment

AWS Glue provides multiple options to develop and test Spark code. Data engineers and data scientists can use tools of their choice to author Glue ETL scripts before deploying them to production. Data scientists can continue to work with Sagemaker notebooks connected to Glue Dev Endpoint, others can use Glue Job Notebooks to quickly launch and use jupyter-based fully-managed notebooks directly in browser. If you prefer to work locally, you can use Glue interactive sessions.

### Use Glue Studio Notebook

AWS Glue Studio Job Notebooks allows you to interactively author extract-transform-and-load (ETL) jobs in a notebook interface based on Jupyter Notebooks. AWS Glue Studio Job Notebooks requires minimal setup so developers can get started quickly, and feature one-click conversion of notebooks into AWS Glue data integration jobs. Notebooks also support live data integration, fast startup times, and built-in cost management. In this module you will learn how to create, configure and use AWS Glue Studio Job Notebooks to develop code that will be used as an independent Glue Job.

**Create Glue Job Notebook**

1. Go to Glue Studio Jobs  and Choose “Jupyter Notebook” among the other options to Create Job.
2. Choose “Create a new notebook from scratch”. Then click Create button.
3. Give your notebook a name and choose Glue Service IAM role and click Start notebook job and wait for notebook to load.
4. Before we start coding let’s change some of the default configurations that Glue Notebook initialized with. Insert three new cells after the Available Magics markdown cell.:
    1. Run `%idle_timeout 30` to ensure your session will automatically stop after 30 minutes of inactivity so you dont need to pay for idle resources.
    2. Run `%number_of_workers 2` to reduce number of active workers.
    3. Run `%extra_py_files "s3://${BUCKET_NAME}/library/pycountry_convert.zip"` - this will load 3rd party python library that we will use in next modules.
5. Now it is time to start a Glue Session. Click on the cell with the Glue Initialization code and press Run the Selected cells control.
6. Wait until you see a message: Session <some long id> has been created. Now insert new cell and run %status magic to review Glue Notebook configurations and ensure that your changes were applied.
7. Now you are ready to start developing some Spark code.

## Glue ETL Job

We will see how you can use the development environment created to create and test ETL code. We will package and deploy the code we created to AWS Glue and execute it as a Glue job. Then we will use Glue Triggers and a Workflow to manage job executions.

### Develop Code in Notebook

We will now write some PySpark code. For each step you will need to copy the code block into a notebook cell and then click Run.

The first code we will write is some boiler-plate imports that will generally be included in the start of every Spark/Glue job and then an import statement for the 3rd party library.

In [None]:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType, StringType
from pyspark import SparkContext
from pyspark.sql import SQLContext

from datetime import datetime
from pycountry_convert import (
    convert_country_alpha2_to_country_name,
    convert_country_alpha2_to_continent,
    convert_country_name_to_country_alpha2,
    convert_country_alpha3_to_country_alpha2,
)

We will define a UDF (user defined function) to use for processing a Spark dataframe. UDFs allow a developer to extend the standard Spark functionality using Python code. To do that your code needs to be in the form of a UDF lambda. The code below creates a Spark UDF udf_get_country_code2 to convert a country name into a two-letter code.

In [None]:
def get_country_code2(country_name):
    country_code2 = 'US'
    try:
        country_code2 = convert_country_name_to_country_alpha2(country_name)
    except KeyError:
        country_code2 = ''
    return country_code2


udf_get_country_code2 = udf(lambda z: get_country_code2(z), StringType())

Load the data from S3 into a Spark dataframe (named sparkDF) then check the schema of the Spark DataFrame.

In [None]:
sparkDF = spark.read.load("s3://${BUCKET_NAME}/input/lab2/sample.csv", 
                          format="csv", 
                          sep=",", 
                          inferSchema="true",
                          header="true")

Now let's look at the data loaded by Spark. Compare this with the data you examined earlier and see if the schema inferred by Spark is the same as you what you saw earlier.

In [None]:
sparkDF.printSchema()

Next we will create a new dataframe that includes a column created using the UDF we created previously.

In [None]:
new_df = sparkDF.withColumn('country_code_2', udf_get_country_code2(col("Country")))
new_df.printSchema()

Let's take a look at the data in this new dataframe - notice the new column country_code_2. This contains two-letter country codes that were determined based on the Country column.

In [None]:
new_df.show(10)

So far, we have been running standard Spark code. Now, we will try some Glue-flavored code. Remember the Glue Data Catalog tables we created earlier? We will now load them into a Glue dynamic frame. After the data is loaded into a Glue dynamic frame, compare the schema it presented with the schema stored in the Glue Data Catalog table.

Glue dynamic frames differ from Spark dataframes because they have a flexible schema definition - hence the name dynamic. This enables each record to have a different schema which is computed on-the-fly. This is especially useful in handling messy data.

Notice here we don't need to specify an S3 location - this is because the Glue Data Catalog knows where the data lives thanks to the crawler we configured and ran earlier.

In [None]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame

glueContext = GlueContext(SparkContext.getOrCreate())

dynaFrame = glueContext.create_dynamic_frame.from_catalog(database="glueworkshop", table_name="csv")
dynaFrame.printSchema()

Just as with the Spark dataframe, we can view the data in the Glue dynamic frame by calling the toDF function on it and then using the standard Spark show function.

In [None]:
dynaFrame.toDF().show(10)

### Deploy Glue ETL Job

Now we will package together the code snippets we have been testing and exploring on their own and create a Spark script for Glue. The code below is the combined and cleaned-up code from previous sections that we had run in the notebook. It is standard Spark code - not using anything Glue-specific except for the imports at the beginning.

In [None]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job

from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql import SQLContext
from pyspark.context import SparkContext

from datetime import datetime
from pycountry_convert import (
    convert_country_alpha2_to_country_name,
    convert_country_alpha2_to_continent,
    convert_country_name_to_country_alpha2,
    convert_country_alpha3_to_country_alpha2,
)


def get_country_code2(country_name):
    country_code2 = 'US'
    try:
        country_code2 = convert_country_name_to_country_alpha2(country_name)
    except KeyError:
        country_code2 = ''
    return country_code2


udf_get_country_code2 = udf(lambda z: get_country_code2(z), StringType())

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME', 's3_bucket'])

s3_bucket = args['s3_bucket']
job_time_string = datetime.now().strftime("%Y%m%d%H%M%S")

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

df = spark.read.load(s3_bucket + "input/lab2/sample.csv", 
                     format="csv", 
                     sep=",", 
                     inferSchema="true", 
                     header="true")
new_df = df.withColumn('country_code_2', udf_get_country_code2(col("Country")))
new_df.write.csv(s3_bucket + "/output/lab3/notebook/" + job_time_string + "/")

job.commit()

Run the following command to create a Glue ETL job glueworkshop-lab3-etl-job with the same Spark code we created earlier which is stored in s3://${BUCKET_NAME}/script/lab3/spark.py.

In [54]:
!aws glue create-job \
    --name glueworkshop-lab3-etl-job \
    --role AWSGlueServiceRole-glueworkshop \
    --command "Name=glueetl,ScriptLocation=s3://${BUCKET_NAME}/script/lab3/spark.py,PythonVersion=3" \
    --glue-version '2.0' \
    --default-arguments "{\"--extra-py-files\": \"s3://${BUCKET_NAME}/library/pycountry_convert.zip\", \
        \"--s3_bucket\": \"s3://${BUCKET_NAME}/\" }"

{
    "Name": "glueworkshop-lab3-etl-job"
}


### Run Glue ETL Job

1. Go to the Glue Job console and explore the Job you created. Explore the Script and Job details section.
2. Click on the Run button to run the job.
3. Go to the Runs section to track the Job. You can check the job execution status and log by clicking on the highlighted Log hyperlink. It will bring you to the CloudWatch console and show a detailed log. Wait for the Job to finish.
4. Once the job finishes, you can go to the S3 console  and to your s3://${BUCKET_NAME}/output/lab3/ folder. You should see a new folder created with recent timestamp. You can download these files to your local environment using following command: `aws s3 cp s3://${BUCKET_NAME}/output/ glue-workshop/output --recursive`

In [55]:
!aws s3 sync s3://wys-glueworkshop/output/ glue-workshop/output

download: s3://wys-glueworkshop/output/lab3/20221124062626/part-00000-60212af7-cbbc-4187-9b7c-d034d2241dd2-c000.csv to glue-workshop/output/lab3/20221124062626/part-00000-60212af7-cbbc-4187-9b7c-d034d2241dd2-c000.csv
download: s3://wys-glueworkshop/output/lab3/20221124062626/part-00002-60212af7-cbbc-4187-9b7c-d034d2241dd2-c000.csv to glue-workshop/output/lab3/20221124062626/part-00002-60212af7-cbbc-4187-9b7c-d034d2241dd2-c000.csv
download: s3://wys-glueworkshop/output/lab3/20221124062626/part-00001-60212af7-cbbc-4187-9b7c-d034d2241dd2-c000.csv to glue-workshop/output/lab3/20221124062626/part-00001-60212af7-cbbc-4187-9b7c-d034d2241dd2-c000.csv


### Create Glue Trigger

Follow the steps below to create a scheduled trigger to run the ETL job every hour.

1. Click Triggers on the left.
2. Click Add Trigger.
3. In Set up your trigger's properties, set the trigger name as glueworkshop-lab3-etl-job-trigger, set Trigger type as Schedule, set Frequency as Hourly and Start Minute as 00, then click Next.
4. In Choose jobs to trigger, click the highlighted Add next to glueworkshop-lab3-etl-job and it will be added under Jobs to start list, click Next.
5. Click Finish.
6. Now you have a Glue trigger which, when activated, will kick off the attached jobs on a regular schedule. The trigger we just created is not activated on creation. If you want to use it to trigger execution every hour, check the checkbox next to the job name, and choose Activate trigger in Action ▼ dropdown. Now the jobs associated with the trigger will run every hour.

## Glue Streaming Job

### Develop Glue Streaming Job in Notebook

Before creating a streaming ETL job, you must manually create a Data Catalog table that specifies the source data stream properties, including the data schema. This table is used as the data source for the streaming ETL job. We will use the Data Catalog table json-streaming-table created earlier by CloudFormation. This table's data source is AWS Kinesis and it has the schema definition of the JSON data we will send through the stream.

Go to the AWS Kinesis console  and click Data streams on the left to open the UI for Kinesis Data Streams. You should see a data stream with name glueworkshop which was created by CloudFormation.

During the streaming processing, we will use a lookup table to convert a country name from the full name to a 2-letter country code. The lookup table data is stored in S3 at s3://${BUCKET_NAME}/input/lab4/country_lookup/.

Next, we will develop the code for the streaming job. Glue streaming is micro-batch based and the streaming job processes incoming data using a Tumbling Window method. All data inside a given window will be processed by a batch function. Inside the Glue Streaming job, the invocation of the tumbling window's function is shown below. The window functions is named batch_function and takes in the micro-batch dataframe, processing it at the window interval (20 seconds in this case).

In [None]:
glueContext.forEachBatch(frame=sourceData,
                         batch_function=processBatch,
                         options={"windowSize": "60 seconds", "checkpointLocation": checkpoint_location})

Our goal is to use this Jupyter Notebook to develop and test a batch_function named processBatch. This function will process a given dataframe with the same schema as the streaming data inside our development environment.

Copy the following code to notebook cells.

Set up the environment and variables for the test.

In [None]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType, StringType
from pyspark import SparkContext
from pyspark.sql import SQLContext
from datetime import datetime

glueContext = GlueContext(SparkContext.getOrCreate())
s3_bucket = "s3://${BUCKET_NAME}"
output_path = s3_bucket + "/output/lab4/notebook/"
job_time_string = datetime.now().strftime("%Y%m%d%H%M%S")
s3_target = output_path + job_time_string

Load the lookup dataframe from the S3 folder.

In [None]:
country_lookup_frame = glueContext.create_dynamic_frame.from_options(
                            format_options = {"withHeader":True, "separator":',', "quoteChar":"\""},
                            connection_type = "s3",
                            format = "csv",
                            connection_options = {"paths": [s3_bucket + "/input/lab4/country_lookup/"], "recurse":True}, 
                            transformation_ctx = "country_lookup_frame")

Here is the batch function body where we do type conversion and a look-up transformation on the incoming data.

In [None]:
def processBatch(data_frame, batchId):
    if (data_frame.count() > 0):
        dynamic_frame = DynamicFrame.fromDF(data_frame, glueContext, "from_data_frame")
        apply_mapping = ApplyMapping.apply(frame=dynamic_frame, mappings=[
            ("uuid", "string", "uuid", "bigint"),
            ("country", "string", "country", "string"),
            ("item type", "string", "item type", "string"),
            ("sales channel", "string", "sales channel", "string"),
            ("order priority", "string", "order priority", "string"),
            ("order date", "string", "order date", "string"),
            ("region", "string", "region", "string"),
            ("ship date", "string", "ship date", "string"),
            ("units sold", "int", "units sold", "int"),
            ("unit price", "string", "unit price", "decimal"),
            ("unit cost", "string", "unit cost", "decimal"),
            ("total revenue", "string", "total revenue", "decimal"),
            ("total cost", "string", "total cost", "decimal"),
            ("total profit", "string", "total profit", "decimal")],
                                           transformation_ctx="apply_mapping")

        final_frame = Join.apply(apply_mapping, country_lookup_frame, 'country', 'CountryName').drop_fields(
            ['CountryName', 'country', 'unit price', 'unit cost', 'total revenue', 'total cost', 'total profit'])

        s3sink = glueContext.write_dynamic_frame.from_options(frame=final_frame,
                                                              connection_type="s3",
                                                              connection_options={"path": s3_target},
                                                              format="csv",
                                                              transformation_ctx="s3sink")

Now we will load some test data to test the batch function.

In [None]:
dynaFrame = glueContext.create_dynamic_frame.from_catalog(database="glueworkshop-cloudformation", 
                                                          table_name="json-static-table")
processBatch(dynaFrame.toDF(), "12")

Check the output path of s3://${BUCKET_NAME}/output/lab4/notebook/ and you should see some new folders which are generated by the test script. Copy the following to your terminal. You should see a new folder with a recent timestamp. When you check the folder you should see some new files created by the test script.

In [None]:
aws s3 cp s3://${BUCKET_NAME}/output/ ~/environment/glue-workshop/output --recursive

### Deploy Glue Streaming Job

In [None]:
import sys
from datetime import datetime
import boto3
import base64
from pyspark.sql import DataFrame, Row
from pyspark.context import SparkContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue import DynamicFrame

args = getResolvedOptions(sys.argv, ['JOB_NAME', 's3_bucket'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# S3 sink locations
output_path = args['s3_bucket'] + "/output/lab4/"
job_time_string = datetime.now().strftime("%Y%m%d%H%M%S")
s3_target = output_path + job_time_string
checkpoint_location = output_path + "checkpoint/"
temp_path = output_path + "temp/"

country_lookup_path = args['s3_bucket'] + "/input/lab4/country_lookup/"
country_lookup_frame = glueContext.create_dynamic_frame.from_options( \
                            format_options = {"withHeader"\:True, "separator":",", "quoteChar":"\""}, \
                            connection_type = "s3", \
                            format = "csv", \
                            connection_options = {"paths": [country_lookup_path], "recurse"\:True}, \
                            transformation_ctx = "country_lookup_frame")

def processBatch(data_frame, batchId):
    if (data_frame.count() > 0):
        dynamic_frame = DynamicFrame.fromDF(data_frame, glueContext, "from_data_frame")
        apply_mapping = ApplyMapping.apply(frame = dynamic_frame, mappings = [ \
            ("uuid", "string", "uuid", "bigint"), \
            ("country", "string", "country", "string"), \
            ("item type", "string", "item type", "string"), \
            ("sales channel", "string", "sales channel", "string"), \
            ("order priority", "string", "order priority", "string"), \
            ("order date", "string", "order date", "string"), \
            ("region", "string", "region", "string"), \
            ("ship date", "string", "ship date", "string"), \
            ("units sold", "int", "units sold", "int"), \
            ("unit price", "string", "unit price", "decimal"), \
            ("unit cost", "string", "unit cost", "decimal"), \
            ("total revenue", "string", "total revenue", "decimal"), \
            ("total cost", "string", "total cost", "decimal"), \
            ("total profit", "string", "total profit", "decimal")],\
            transformation_ctx = "apply_mapping")

        final_frame = Join.apply(apply_mapping, country_lookup_frame, 'country', 'CountryName').drop_fields( \
                ['CountryName', 'country', 'unit price', 'unit cost', 'total revenue', 'total cost', 'total profit'])

        s3sink = glueContext.write_dynamic_frame.from_options(  frame = final_frame, \
                                                                connection_type = "s3", \
                                                                connection_options = {"path": s3_target}, \
                                                                format = "csv", \
                                                                transformation_ctx = "s3sink")

# Read from Kinesis Data Stream from catalog table
sourceData = glueContext.create_data_frame.from_catalog( \
    database = "glueworkshop-cloudformation", \
    table_name = "json-streaming-table", \
    transformation_ctx = "datasource0", \
    additional_options = {"startingPosition": "TRIM_HORIZON", "inferSchema": "true"})

glueContext.forEachBatch(frame = sourceData, \
                        batch_function = processBatch, \
                        options = {"windowSize": "60 seconds", "checkpointLocation": checkpoint_location})
job.commit()

Run the following command in the Cloud9 terminal to create a Glue Streaming job with the name glueworkshop_lab4_glue_streaming and the Spark code we created earlier. This code is stored in s3://${BUCKET_NAME}/script/lab4/streaming.py.

In [56]:
!aws glue create-job \
    --name glueworkshop_lab4_glue_streaming \
    --role AWSGlueServiceRole-glueworkshop  \
    --command "Name=gluestreaming,ScriptLocation=s3://${BUCKET_NAME}/script/lab4/streaming.py,PythonVersion=3" \
    --glue-version "2.0" \
    --default-arguments "{\"--s3_bucket\": \"s3://${BUCKET_NAME}/\" }"

{
    "Name": "glueworkshop_lab4_glue_streaming"
}


### Run Glue Streaming Job

1. Go to the AWS Glue Console  and click on Jobs on the left. You should see the newly created Glue streaming job with name glueworkshop_lab4_glue_streaming.
2. Click the checkbox next to the job name and select Edit job from the Action ▼ dropdown menu to examine the details of the steaming job configuration. Select Edit script from the Action ▼ dropdown menu to examine the Python script. It is the same code as shown earlier.
3. When you are done exploring the job details, click Run job from the Action ▼ dropdown menu. Click the checkbox next to the job. This will display the job detail pane with 4 tabs. You can check the job execution log by clicking on the highlighted Log hyperlink. It will bring you to the CloudWatch console and display a detailed log.
4. Once the streaming job is started, we will use a script to publish messages into Kinesis Stream. Use the following script to add data to the Kinesis data stream. You will see the script add a number of JSON messages into the Kinesis stream. You can use ^C (Ctrl-C) to stop the script.
    ```sh
    cd glue-workshop
    python code/lab4/PutRecord_Kinesis.py
    ```
5. Once the job start running, wait a few minutes for the streaming data to be processed by the window function. Now go to the S3 console  and to your s3://${BUCKET_NAME}/output/lab4/ folder. You should see a new folder created with a recent timestamp. That is the output from the Glue ETL streaming job.
6. You can download the files to your terminal. As long as there is new data coming from the Kinesis stream, there will be new output files being added to the S3 folder every 60 seconds. This is because the window size of the streaming job is 60 seconds, indicating it will deliver on that schedule.
    ```sh
    aws s3 sync s3://${BUCKET_NAME}/output/ output
    ```
7. Once you are done exploring the streaming job details and you have checked the result files, you can stop the streaming job by clicking Stop job run from the Action ▼ dropdown menu, selecting the running job from the list and clicking Stop job run. And go to the terminal that is running the script to add data to Kinesis, stop the script that is publishing messages to the Kinesis Data Stream by pressing ^C (Ctrl-C). The Glue streaming job will keep running if you don't stop it!

## Glue Databrew

We will use a dataset from a public data lake for COVID-19 research and development hosted by AWS.

### Glue Databrew Dataset

We will first create a DataBrew Dataset using a Glue crawler to explore the COVID-19 data stored in a Data Catalog table. In DataBrew, Dataset simply means a set of data—rows or records that are divided into columns or fields.

When you create a DataBrew project, you connect to or upload data that you want to transform or prepare. DataBrew can work with data from any source, imported from formatted files, and it connects directly to a growing list of data stores. In DataBrew, a Dataset is a read-only connection to your data.

DataBrew collects a set of descriptive metadata to refer to the data. No actual data can be altered or stored by DataBrew. For simplicity, we use the word dataset to refer to both the actual dataset and the metadata that DataBrew uses.

Follow the steps below to create a new DataBrew dataset

1. Click the DATASETS icon on the left.
2. Click the Connect new dataset button in the middle.
3. Under New dataset details set Dataset name to covid-testing.
4. Under Connect to new dataset click Amazon S3 tables. Then, under AWS Glue Data Catalog on the left, click glueworkshop, and click the radio button next to json.
5. Click Create dataset.

Once the new dataset is created, we will run the profiling job on the new dataset. When you profile your data, DataBrew creates a report called a data profile. This summary tells you about the existing shape of your data, including the context of the content, the structure of the data, and its relationships. You can make a data profile for any dataset by running a data profile job.

6. Click the checkbox next to the dataset.
8. Click ▶ Run data profile on the top.
9. Click Create profile job to open Create job page.
10. Under Job details, set Job name to covid-testing-profile-job.
11. Under Job run sample select Full dataset.
12. Under Job output settings set the S3 location to s3://${BUCKET_NAME}/output/lab5/profile/
13. Under Permission, select AWSGlueDataBrewServiceRole-glueworkshop for Role name.
14. Click Create and run job.
15. The profile job takes about 5 minutes. Once the job finishes, go to the covid-testing dataset and click the Data profile overview tab. Here you can explore the data profile information created by the profiling job.
16. From the correlation coefficient, you can see positive has high correlation to death and hospitalization.
17. At the bottom of the tab, you can see detailed information about each column, including data type, cardinality, data quality, distribution, min/max value and others. You can also click on each column to get more detailed statistics about an individual column.

### Glue DataBrew Project

We will create a DataBrew project and a recipe to transform the dataset we created earlier.

1. Click Project on the left.
2. Click Create project.
3. Under Project details set Project name to covid-testing.
4. Under Select a dataset choose My datasets then select the covid-testing dataset.
5. Under Permission select AWSGlueDataBrewServiceRole-glueworkshop for Role name from the dropdown list.
6. Click Create project.

Once the project is created, you will have a work area with the sample data displayed in the data grid. You can also check the schema and profile of the data by clicking SCHEMA and PROFILE in the upper right-corner of the grid. On the right of the screen is the recipe work area.

Next, we are going to create a simple recipe using built-in transformations.

1. You should see a Recipe work area to the right of the data grid. If not, click the RECIPE icon in the upper-right corner.
1. Click the Add step button in the recipe work area. You will see a long list of transformations provided by DataBrew that you can use to build your recipe. You can take a look to see what transformations are provided.
1. Click Delete in the COLUMN ACTIONS group. In the Source columns list select:
    ```
    dateChecked
    death
    deathIncrease
    fips
    hash
    hospitalized
    hospitalizedIncrease
    negative
    negativeIncrease
    pending
    positive
    total
    totalTestResults
    ```
1. Click Apply. You'll see the selected columns are gone in the sample data grid as a result.
1. Click the Add step icon on the upper-right corner of the recipe area, then the Add step configuration will show up again on the right.
1. Select DATEFORMAT in DATE FUNCTION group, select Source column in Values using, select date in Source column, select yyyy-mm-dd in Data format, name destination column date_formated, click Apply. A new string column with name date_formated is added in the grid.
1. Click the Add step icon, select Change type in COLUMN ACTION group, select date_formated in Source column, select date in Change type to, then click Apply. Notice the datatype of column date_formated changed from string to date.
1. Click the Add step icon, select By Condition in FILTER group, select state in Source column, select Is exactly in Filter condition, de-select all states values, then choose NY and CA in the list and click Apply. Notice sample data in the grid now only contains data from those 2 states.
1. Click Reorder steps icon on the upper-right corner of the recipe area next to Add step icon. A new pop-up box will show all steps of the current recipe. Move the Filter values by state from the 4th step to the 2nd step then click Next. The validation will check the change. Once validation is finished, click Done. Notice in the recipe work area the order of the steps has changed.
1. Click the Add step icon, select Delete in COLUMN ACTIONS group, select date in Source columns, click Apply.
1. Click the Add step icon, select DIVIDE in MATH FUNCTION group, select Source columns in Values using. First check positiveIncrease and then totalTestResultsIncrease in Source columns. Set positiveRate as the name of Destination column then click Apply. A new double column with name positiveRate is added in the grid. Note that the sequence you check the source columns matters in divide function! The new positiveRate column should have values between 0 and 1. If you see values greater than 1 for the new column, please click Edit next to the step and redo it.
1. Click the Add step icon, select MULTIPLY in MATH FUNCTION group, select Source column and value in Values using list, select positiverate in Source column, set Custom value to 100, set Destination column name to positivePercentage then click Apply. A new double column with name positivePercentage is added in the grid.
1. Click the Add step icon, selete Delete in COLUMN ACTIONS group then in Source columns list select:
    ```
    positiveIncrease
    totalTestResultsIncrease
    positiveRate
    ```
1. Click Apply.
1. Click the Add step icon, select Pivot - rows to columns in PIVOT group. Select state in Pivot column list, select mean and positivePercentage in Pivot values, click Update preview to see what the pivot table will look like then click Finish. A new table structure will be created based on the pivot operation with 2 new columns named state_CA_positivePercentage_mean and state_NY_positivePercentage_mean


### Manage Glue DataBrew Recipe

As you proceed with developing your recipe, you can save your work by publishing the recipe. DataBrew maintains a list of published versions for your recipe. You can use any published version in a recipe job to transform your dataset. You can also download a copy of the recipe steps so you can reuse the recipe in other projects or other transformations.

1. Click the Publish icon at the upper right corner of you recipe work area to publish your recipe.
2. Under Version description put in Note `Convert COVID testing data to time series of positive percentage in NY and CA`.
3. Click the Publish button.
4. Click the RECIPES icon on the left. You will see the recipe you just published.
5. Click on the recipe name, covid-testing-recipe. You will see the details of the recipe.
6. While still in recipe details, click the Action ▼ dropdown list and select Download as YMAL to store the recipe locally as a yaml file. This allows you to export and import recipes for reuse in other projects.
    ```yaml
    - Action:
        Operation: DELETE
        Parameters:
        sourceColumns: >-
            ["dateChecked","death","deathIncrease","fips","hash","hospitalized","negative","negativeIncrease","pending","positive","total","totalTestResults"]
    - Action:
        Operation: REMOVE_VALUES
        Parameters:
        sourceColumn: state
    ConditionExpressions:
        - Condition: IS_NOT
        Value: '["NY","CA"]'
        TargetColumn: state
    - Action:
        Operation: DATE_FORMAT
        Parameters:
        dateTimeFormat: yyyy-mm-dd
        functionStepType: DATE_FORMAT
        sourceColumn: date
        targetColumn: date_formated
    - Action:
        Operation: CHANGE_DATA_TYPE
        Parameters:
        columnDataType: date
        replaceType: REPLACE_WITH_NULL
        sourceColumn: date_formated
    - Action:
        Operation: DELETE
        Parameters:
        sourceColumns: '["date"]'
    - Action:
        Operation: DIVIDE
        Parameters:
        functionStepType: DIVIDE
        sourceColumn1: positiveIncrease
        sourceColumn2: totalTestResultsIncrease
        targetColumn: positiveRate
    - Action:
        Operation: MULTIPLY
        Parameters:
        functionStepType: MULTIPLY
        sourceColumn1: positiveRate
        targetColumn: positivePercentage
        value2: '100'
    - Action:
        Operation: DELETE
        Parameters:
        sourceColumns: '["totalTestResultsIncrease","positiveRate","positiveIncrease"]'
    - Action:
        Operation: PIVOT
        Parameters:
        aggregateFunction: MEAN
        sourceColumn: state
        valueColumn: positivePercentage
    ```
7. Click the RECIPES icon on the left and click the Published ▼ dropdown menu. Select All recipes. You should see both the published and working version of your recipes.



### Run Glue DataBrew Job

**Create a new DataBrew job**

1. Click JOBS on the left.
1. Click Create job button.
1. Under Job details set the Job name to covid-testing-recipe-job.
1. Under Job type select Create a recipe job.
1. Under Job input select Run on Project. In Select a project select covid-testing from the list.
1. Under Job output settings set S3 location to s3://${BUCKET_NAME}/output/lab5/csv/.
1. Under Permission select AWSGlueDataBrewServiceRole-glueworkshop in Role name.
1. Click Create and run job.
1. Once the job has been created, click the JOB icon on the left. Under the Recipe jobs tab you will see a new job with name covid-testing-job in the Running state. Wait for the job to finish.
1. Once the job finishes, click on the job name. You should see one succeeded job run under Job run history tab. Click the Data linage tab to see the data linage graph for the job.
1. You should see a new folder in S3 under s3://\${BUCKET_NAME}/output/lab5/csv/ which contains the output of the job. Use the command below to copy the output files locally and explore the output.

In [62]:
!aws s3 sync s3://$BUCKET_NAME/output/ glue-workshop/output

Completed 199.1 KiB/199.1 KiB (239.2 KiB/s) with 1 file(s) remainingdownload: s3://wys-glueworkshop/output/lab5/profile/covid-testing_9c3921a56e8b9e67b5dd03c847f957d1551d3fa280d9969dd2d05d476f3b6e19.json to glue-workshop/output/lab5/profile/covid-testing_9c3921a56e8b9e67b5dd03c847f957d1551d3fa280d9969dd2d05d476f3b6e19.json


## Glue Studio

### Create Glue Studio Job

We will be creating a new Glue Studio job. Start by opening the menu on the left and clicking Jobs.

1. Under Create job, select Blank graph.
1. Click Create.
1. Rename the job to glueworkshop-lab6-etl-job. Now you have a blank Glue Studio visual job editor. On the top of the editor are the tabs for different configurations.
1. Click Script tab, you should see an empty shell of Glue ETL script. As we add new steps in the Visual editor the script will be updated automatically. You will not ba able to edit the script inside Glue Studio.
1. Click Job Details tab you can see all job configurations.
1. Under IAM role select AWSGlueServiceRole-glueworkshop.
1. Under Job bookmark select Disable. Note: In a production environment, you probably want to enable the bookmark. We will disable it for this lab so we can reuse the test dataset.
1. You don't need to change any other settings here, but you should take some time to explore what settings are available in this tab. When you are done exploring, click Save on the upper right to save the changed settings.
1. Click the Visual tab again to go back to visual editor. You should see 3 dropdown buttons: Source, Transform, and Target. Click Transform. You will notice there are a limited number of transformations compared to what Glue DataBrew offers as we have seen in a previous lab. This is because Glue Studio is designed to be used by developers who write Apache Spark code but want to leverage Glue Studio for job orchestration and monitoring. In a later section of this lab, we will demonstrate how to develop custom code in Glue Studio.
1. Click Source dropdown icon and select S3,
    - On the right side, under Data source properties - S3 tab
    - Under S3 source type, select Data Catalog table
    - Under Database select glueworkshop
    - Under Table select json
    - This is the same COVID-19 dataset we used earlier - you should be familiar with the schema.
1. Click Transform dropdown icon and select Custom transform. Copy the code below into code block on the right.
    ```py
    def DeleteFields (glueContext, dfc) -> DynamicFrameCollection:
    sparkDF = dfc.select(list(dfc.keys())[0]).toDF()
    sparkDF.createOrReplaceTempView("covidtesting")

    df = spark.sql("select  date, \
                            state , \
                            positiveIncrease ,  \
                            totalTestResultsIncrease \
                    from covidtesting")

    dyf = DynamicFrame.fromDF(df, glueContext, "results")
    return DynamicFrameCollection({"CustomTransform0": dyf}, glueContext)
    ```
1. Click the Output Schema tab on the right then click Edit. Leave only the following column in the output schema then click Apply.
    ```
    date
    state
    positiveincrease
    totaltestresultsincrease
    ```
1. Click Transform dropdown and select SelectFromCollection. Make sure under Transform tab that the Frame index value is 0.
1. Click the Target dropdown and select S3. Under the Data target properties - S3 tab set the S3 Target Location value to s3://${BUCKET_NAME}/output/lab6/json/temp1/ then replace \${BUCKET_NAME} with your own S3 bucket name.
1. Click the Save button in the upper right corner then click the Run button next to it.
1. You can use the command below to copy the output files to your environment and explore the content. You will see the output JSON file only contains 4 fields - date, state, positiveIncrease and totalTestResultsIncrease.

In [65]:
!aws s3 sync s3://$BUCKET_NAME/output/ glue-workshop/output

Completed 256.0 KiB/1.8 MiB (339.0 KiB/s) with 1 file(s) remainingCompleted 512.0 KiB/1.8 MiB (607.6 KiB/s) with 1 file(s) remainingCompleted 768.0 KiB/1.8 MiB (829.3 KiB/s) with 1 file(s) remainingCompleted 1.0 MiB/1.8 MiB (1.1 MiB/s) with 1 file(s) remaining    Completed 1.2 MiB/1.8 MiB (1.2 MiB/s) with 1 file(s) remaining    Completed 1.5 MiB/1.8 MiB (1.5 MiB/s) with 1 file(s) remaining    Completed 1.8 MiB/1.8 MiB (1.7 MiB/s) with 1 file(s) remaining    Completed 1.8 MiB/1.8 MiB (1.8 MiB/s) with 1 file(s) remaining    download: s3://wys-glueworkshop/output/lab6/json/temp1/run-1669278265659-part-r-00000 to glue-workshop/output/lab6/json/temp1/run-1669278265659-part-r-00000


### Glue Studio Custom Transformation

We used a Custom Transformation Node in last section of the lab. Here we will have a deep dive into the Custom Transformation Node and provide guidelines on how to develop scripts for Custom Transformation Node.

A Custom Transformation Node can have any number of parent nodes, each providing a DynamicFrame as an input. A Custom Transformation Node returns a collection of DynamicFrames. Each DynamicFrame that is used as input has an associated schema. You must add a schema that describes each DynamicFrame returned by the custom code node.

A Custom Transformation Node's script looks like the following. The input will be a GlueContext and a DynamicFrameCollection. The DynamicFrameCollection contains 1 to n DynamicFrame and at the start of the script we will get an Apache DataFrame from each of the DynamicFrame. Then transformations will be performed and the resulting Apache DataFrames will be converted back to DynamicFrame and returned in a DynamicFrameCollection.

In [None]:
def CustomTransform (glueContext, dfc) -> DynamicFrameCollection:
    df0 = dfc.select(list(dfc.keys())[0]).toDF()
    df1 = dfc.select(list(dfc.keys())[1]).toDF()
    ...

    # do transformation on the Spark DataFrame df0, df1, ... 
    ...
    
    # The result DataFrames named have names like resultDF0, resultDF1, ... in the end
    # Convert them to DynamicFrame and return in a DynamicFrameCollection

    dyf0 = DynamicFrame.fromDF(resultDF, glueContext, "result0")
    dyf1 = DynamicFrame.fromDF(resultDF, glueContext, "result1")
    ...
    return(DynamicFrameCollection(  {
                                    "CustomTransform0": dyf0,
                                    "CustomTransform1": dyf1,
                                    ...
                                    }, 
                                    glueContext))

Click on the data target node then click Remove at the top of the visual editor to remove it from the graph.

Click the Transform dropdown icon and select Custom transform. If the new node is not connected to the existing SelectFromCollection node, click Node properties and select it in the Node parents dropdown. Then copy the code below to the Code block field under Transform tab.

In [None]:
def ConvertDateStringToDate (glueContext, dfc) -> DynamicFrameCollection:
    sparkDF = dfc.select(list(dfc.keys())[0]).toDF()
    sparkDF.createOrReplaceTempView("inputTable")

    df = spark.sql("select TO_DATE(CAST(UNIX_TIMESTAMP(date, 'yyyyMMdd') AS TIMESTAMP)) as date, \
                           state , \
                           positiveIncrease ,  \
                           totalTestResultsIncrease \
                    from   inputTable")

    dyf = DynamicFrame.fromDF(df, glueContext, "results")
    return DynamicFrameCollection({"CustomTransform0": dyf}, glueContext)

Click the Output schema tab and click Edit. Change the data type of date from string to date, then click Apply.

Click the Transform dropdown icon and select SelectFromCollection. Make sure under the Transform tab, the Frame index value is 0.

Note: At this point we have showed the full process of developing scripts for Custom Transformation Node. You can repeat this process as you develop more custom scripts using Spark SQL. In later part of this section, we will add more Custom Transformation Nodes and finish the ETL job.

Click the Transform dropdown icon and select Custom transform. Copy the code below to the Code block field under the Transform tab.

In [None]:
def FilterAndCalculatePercentage (glueContext, dfc) -> DynamicFrameCollection:
    sparkDF = dfc.select(list(dfc.keys())[0]).toDF()
    sparkDF.createOrReplaceTempView("inputTable")

    df = spark.sql("select  date , \
                            state , \
                            (positiveIncrease * 100 / totalTestResultsIncrease) as positivePercentage \
                    from inputTable \
                    where state in ('NY', 'CA')")

    dyf = DynamicFrame.fromDF(df, glueContext, "results")
    return DynamicFrameCollection({"CustomTransform0": dyf}, glueContext)

Click the Output schema tab then click Edit. Remove columns positiveIncrease and totalTestResultsIncrease. Add a new column named positivePercentage with type double by clicking ... and then Add root key. Finally, click Apply.

Click the Transform dropdown icon and choose SelectFromCollection. Make sure under the Transform tab the Frame index value is 0.

Click the Transform dropdown icon and select Custom transform. Copy the code below to the Code block field under Transform tab.

In [None]:
def PivotValue (glueContext, dfc) -> DynamicFrameCollection:
    sparkDF = dfc.select(list(dfc.keys())[0]).toDF()
    sparkDF.createOrReplaceTempView("inputTable")

    df = spark.sql("select * from inputTable \
                    pivot (avg(positivePercentage) as positivePercentage \
                    for state in ('NY' as positivePercentageNY, 'CA' as positivePercentageCA))")

    dyf = DynamicFrame.fromDF(df, glueContext, "results")
    return DynamicFrameCollection({"CustomTransform0": dyf}, glueContext)

- Click the Output schema tab then click Edit. Remove columns state and positivePercentage and add 2 new columns positivePercentageNY and positivePercentageCA with type double, then click Apply.

- Click the Transform dropdown icon and select SelectFromCollection. Make sure under Transform tab the Frame index value is 0.

- Click the Target dropdown icon and select S3. Under the Data target properties - S3 tab, set the S3 Target Location value to s3://${BUCKET_NAME}/output/lab6/json/finalResult/.

- Click the Save button on the upper right corner and then click Run.

- You can use the command below to copy the output files to your environment and explore the content. You will see that the output JSON file only contains 3 fields date, positivePercentageNY and positivePercentageCA.

In [None]:
!aws s3 sync s3://$BUCKET_NAME/output/ glue-workshop/output

### Glue Studio Streaming Job

We will see how to create a Glue streaming ETL job with Glue Studio.

1. Go to AWS Glue Studio Console , click Jobs in the menu on the left, select Blank graph and click Create.
1. Rename the job to glueworkshop-lab6-streaming-job, click Job details tab and select AWSGlueServiceRole-glueworkshop for the IAM Role. Set Number of workers to 2 then under Job bookmark select Disable and click Save.
1. Click Visual tab, click Source dropdown icon and select Kinesis. On the right side, under Data source properties - Kinesis Stream tab, select glueworkshop-cloudformation for the Database then select json-streaming-table under Table.
1. Click the Source dropdown icon and select S3. On the right side under Node properties tab, set the node Name to country_lookup. Under the Data source properties - S3 tab, select S3 location then under S3 source type, set S3 URL to s3://${BUCKET_NAME}/input/lab6/country_lookup/. Set the Data format to CSV then click Infer schema button.
1. Click the Transform dropdown icon and select Join. On the right side under the Node properties tab, find Node parents and select both nodes. Under the Transform tab, set Join type to Left join. Under Join conditions, select country. Under Kinesis, select CountryName for country_lookup. Note: when setting Join conditions in the UI, Kinesis should be on the left. If you see country_lookup on the left of the UI, go back to Node properties tab and de-select both nodes, select Kinesis, then country_lookup.
1. Click the Transform dropdown icon and select DropFields on the right side. Under the Tranform tab, select the following columns:
    ```
    order priority
    order date
    region
    ship date
    units sold
    unit price
    unit cost
    CountryName
    ```
1. Click the Target dropdown icon and select S3. On the right side under Data target properties - S3 tab, select CSV for Format, None for Compression Type, and set S3 Target Location to s3://\${BUCKET_NAME}/output/lab6/streamingoutput/.
1. Click Save button in the upper right corner and then click Run.
1. Go to your terminal window where you ran the script to add data into the Kinesis data stream earlier. If the script is still running, leave it. If not, copy the commands below to publish JSON messages into the Kinesis data stream.
    ```sh
    cd glue-workshop
    python code/lab4/PutRecord_Kinesis.py 
    ```
1. In the terminal that is not running the script to add data to Kinesis data stream, use the command below to copy the output files and explore the content. You will see a directory structures like /output/lab6/streamingoutput/ingest_year=20xx/ingest_month=xx/ingest_day=xx/ingest_hour=xx/ and output files inside the directories.

In [None]:
!aws s3 sync s3://$BUCKET_NAME/output/ glue-workshop/output

### Monitoring with Glue Studio

1. Click Monitor on the left - you should see the Glue Studio Monitor dashboard.
1. At the top of the dashboard are high-level KPIs for the Glue jobs.
1. In the middle of the dashboard you will find Job type breakdown and Worker type breakdown which contains information about the jobs. You should see one running streaming job that you started in the previous section.
1. Below them is the Job runs timeline which contains the timeline of job executions.
1. Scroll down to the bottom of the dashboard to see the Jobs history. This section includes all the jobs we have run for this workshop. You can use the DPU hours in the dashboard to estimate the cost for each job. You should see the streaming job from Lab 6 at Running state in the job list.

## Clean Up

Execute the commands below to delete the resources. If you choose to use names that are different than the default names provided in the labs, please modify the names match your own resources in the command below.

Delete the DataBrew project, jobs, and dataset.

In [None]:
!aws databrew delete-job --name covid-testing-recipe-job
!aws databrew delete-job --name covid-testing-profile-job
!aws databrew delete-project --name covid-testing
!aws databrew delete-dataset --name covid-testing

Find the recipe versions in the recipe store and delete the versions list by above cli command.

In [None]:
!aws databrew list-recipe-versions --name covid-testing-recipe

Replace the recipe version number with what you find from command above.

In [None]:
!aws databrew batch-delete-recipe-version --name covid-testing-recipe --recipe-version "1.0"

Delete the Glue Data Catalog database, crawlers, jobs, and triggers created manually.

In [None]:
!aws glue delete-job --job-name glueworkshop-lab3-etl-job 
!aws glue delete-job --job-name glueworkshop_lab4_glue_streaming 
!aws glue delete-job --job-name glueworkshop-lab6-streaming-job
!aws glue delete-job --job-name glueworkshop-lab6-etl-job
!aws glue delete-trigger --name glueworkshop-lab3-etl-job-trigger
!aws glue delete-crawler --name lab1-csv
!aws glue delete-crawler --name covid-testing
!aws glue delete-database --name glueworkshop

Delete the CloudFormation stack.

In [74]:
!aws cloudformation delete-stack --stack-name glueworkshop

Delete the S3 bucket.

In [None]:
!aws s3 rb s3://$BUCKET_NAME --force