# AWS SUMMIT 2022
## First Cloud Journey - Building a Datalake on AWS


Take your time to read through the instructions provided in this notebook.

###### Learning Objectives
- Understand how to interactively author Glue ETL scripts using Glue Dev Endpoints & SageMaker notebooks
- Use Boto3 to call Glue APIs for Glue administrative and operational tasks


**Execute the code blocks one cell at a time**

###### Import Libraries 
- In this notebook we will use the following classes and libraries. These are the most important ones:
 - SparkContext - Main entry point for Spark functionality. A SparkContext represents the connection to a Spark cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
 - GlueContext - Wraps the Apache Spark SQL SQLContext object and thereby provides mechanisms for interacting with the Apache Spark platform
 - boto3 - AWS's Python SDK; we will use this library to make calls to AWS APIs
 - awsglue - AWS Glue's PySpark extension library, which provides the Glue-specific classes and transforms used here (for example GlueContext, DynamicFrame, Filter, Join and DropFields)
 
 
# Here is the data transformation we will perform

#### Execute Code 🔻

In [None]:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import boto3
import time


# Exploring your raw dataset
- In this step you will:
 - Create a dynamic frame for your 'raw' table from AWS Glue catalog
 - Explore the schema of the datasets
 - Count rows in raw table
 - View a sample of the data 

## Glue Dynamic Frames Basics

- AWS Glue's DynamicFrame is a powerful data structure.
- DynamicFrames provide a precise representation of the underlying semi-structured data, especially when columns or fields have varying types.
- They also provide powerful primitives for nesting and unnesting data.
- A dynamic record is self-describing: each record encodes its own columns and types, so every record can have a schema that differs from all others in the DynamicFrame.
- For ETL we needed something more dynamic than the Spark DataFrame, hence the Glue DynamicFrame. A DynamicFrame is similar to a DataFrame but relaxes the requirement of a rigid, up-front schema; it is designed for semi-structured data.
- Because it keeps a schema per record, a DynamicFrame is easy to restructure, tag and modify.
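The following plain-Python sketch makes the per-record schema idea concrete. It does not use the Glue API. It records which types each field takes across a few hypothetical records. A field that appears with more than one type is the case where a DynamicFrame's schema shows a *choice* type, which you can later settle with the ResolveChoice transform. The records and helper names are illustrative assumptions.

```python
from collections import defaultdict

def observed_types(records):
    """Map each field name to the set of Python type names seen across records."""
    types_by_field = defaultdict(set)
    for record in records:
        # Each record carries its own fields and value types, like a dynamic record
        for field, value in record.items():
            types_by_field[field].add(type(value).__name__)
    return dict(types_by_field)

def mixed_type_fields(records):
    """Fields whose type differs between records (where Glue would show a choice type)."""
    return sorted(f for f, t in observed_types(records).items() if len(t) > 1)

# Hypothetical events: 'speed' is a number in one record and a string in another
events = [
    {"track_id": 1, "speed": 4.5},
    {"track_id": 2, "speed": "fast"},
    {"track_id": 3},  # a record may also simply omit a field
]
print(mixed_type_fields(events))  # ['speed']
```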


#### Read More: https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-dynamic-frame.html

#### Execute Code 🔻


In [None]:

glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session


# Create a dynamic frame from the Glue catalog
- In this block we use GlueContext to create new DynamicFrames from tables in the Glue Data Catalog

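GlueContext provides several methods for creating DynamicFrames:
- create_dynamic_frame_from_rdd - from an Apache Spark RDD
- create_dynamic_frame_from_catalog - from a Glue Data Catalog table (the cell below uses the equivalent create_dynamic_frame.from_catalog form)
- create_dynamic_frame_from_options - from a connection such as S3, with options such as the path and file format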

#### Read More: https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html

#### Execute Code 🔻


In [None]:
raw_data = glueContext.create_dynamic_frame.from_catalog(database = "summitdb", table_name = "raw")

reference_data = glueContext.create_dynamic_frame.from_catalog(database = "summitdb", table_name = "reference_data")
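To read files without going through the Data Catalog, you can point `create_dynamic_frame.from_options` straight at S3. Below is a minimal sketch. The helper name and the example path are hypothetical, and so is the assumption that the raw files are JSON. Adjust all three to your bucket layout.

```python
def read_raw_from_s3(glue_context, s3_path, file_format="json"):
    """Create a DynamicFrame directly from files under an S3 prefix (no catalog lookup).

    Hypothetical helper: file_format="json" assumes the raw files are JSON.
    """
    return glue_context.create_dynamic_frame.from_options(
        connection_type="s3",
        # "recurse" also reads files in nested prefixes such as yyyy/mm/dd/hh
        connection_options={"paths": [s3_path], "recurse": True},
        format=file_format,
    )

# Usage inside this notebook (the path is a placeholder):
# raw_from_s3 = read_raw_from_s3(glueContext, "s3://yourname-datalake-demo-bucket/data/raw/")
```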

# View schema
- In this step we view the schema of the dynamic frame
- printSchema( ) – Prints the schema of the underlying DataFrame.

#### Execute Code 🔻

In [None]:
raw_data.printSchema()

In [None]:
reference_data.printSchema()

# Count records
- In this step we count the number of records in each DynamicFrame
- count( ) – Returns the number of rows in the underlying DataFrame

#### Execute Code 🔻

In [None]:
print('raw_data (Count) = ' + str(raw_data.count()))
print('reference_data (Count) = ' + str(reference_data.count()))

# Show sample records
- Convert the DynamicFrame to a Spark DataFrame with toDF() and call show() to display a sample of its records
- Here we display the first 5 records of each frame


#### Execute Code 🔻

In [None]:
raw_data.toDF().show(5)

In [None]:
reference_data.toDF().show(5)

# Using Spark SQL to explore data

- In Glue, you can leverage Spark's SQL engine to run SQL queries over your data
- To query a DynamicFrame with SQL, convert it to a Spark DataFrame with toDF(), register the DataFrame as a temporary view, and run the query with spark.sql(). If you need Glue transforms or writers afterwards, convert the result back to a DynamicFrame with DynamicFrame.fromDF()
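You can wrap that round trip in a small helper. This is a sketch. `run_sql_on_dynamic_frame` is a hypothetical name. It relies on the GlueContext's `spark_session` and on `DynamicFrame.fromDF(dataframe, glue_ctx, name)` from the awsglue library. The import sits inside the function, so defining the helper does not itself require awsglue.

```python
def run_sql_on_dynamic_frame(glue_context, dyf, view_name, query, result_name):
    """Query a DynamicFrame with Spark SQL and return the result as a DynamicFrame."""
    # Imported here so the helper can be defined without awsglue installed
    from awsglue.dynamicframe import DynamicFrame

    # DynamicFrame -> Spark DataFrame, registered as a temporary SQL view
    dyf.toDF().createOrReplaceTempView(view_name)
    # Run the query through the SparkSession that backs the GlueContext
    result_df = glue_context.spark_session.sql(query)
    # Spark DataFrame -> DynamicFrame, so Glue transforms and writers can use it
    return DynamicFrame.fromDF(result_df, glue_context, result_name)

# Usage in this notebook:
# running_dyf = run_sql_on_dynamic_frame(
#     glueContext, raw_data, "temp_raw_data",
#     "select * from temp_raw_data where activity_type = 'Running'", "running_dyf")
```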

### Spark SQL - Filtering & Counting - activity_type = Running
- In this block, we will filter & count the number of events with activity_type = Running

#### Execute Code 🔻

In [None]:
# Register raw_data as a temporary view so Spark SQL can query it

raw_data.toDF().createOrReplaceTempView("temp_raw_data")

# Run a SQL statement that keeps only events with activity_type = 'Running'
runningDF = spark.sql("select * from temp_raw_data where activity_type = 'Running'")
print("Running (count) : " + str(runningDF.count()))

runningDF.show(5)


### Spark SQL - Filtering & Counting - activity_type = Working
- In this block, we will filter & count the number of events with activity_type = Working

#### Execute Code 🔻

In [None]:
# Run a SQL statement that keeps only events with activity_type = 'Working'
workingDF = spark.sql("select * from temp_raw_data where activity_type = 'Working'")
print("Working (count) : " + str(workingDF.count()))

workingDF.show(5)


### Glue Transforms - Filtering & Counting - activity_type = Running
- Now, let's perform the same operation using Glue's built-in transforms
- We will use the **Filter** transform
- Filter - Selects records from a DynamicFrame and returns a filtered DynamicFrame.
- You specify a function, such as a Python lambda, which determines whether a record is output (the function returns True) or not (the function returns False).
- Here, our function keeps records where activity_type == 'Running'

#### Read More: https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-transforms-filter.html#aws-glue-api-crawler-pyspark-transforms-filter-example

#### Execute Code 🔻

In [None]:

# Keep only records whose activity_type is 'Running'
def filter_function(dynamicRecord):
    return dynamicRecord['activity_type'] == 'Running'

runningDF = Filter.apply(frame = raw_data, f = filter_function)

print("Running (count) : " + str(runningDF.count()))
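The Running and Working filters differ only in the value they compare against. One option is a small predicate factory, so both cells share one function. This is a sketch. `make_activity_filter` is a hypothetical name, and plain dicts stand in for dynamic records when the predicate is checked locally.

```python
def make_activity_filter(activity):
    """Build a predicate for Filter.apply that keeps records with the given activity_type."""
    def keep(record):
        # Fields are read with record['field'], as in filter_function above
        return record['activity_type'] == activity
    return keep

# Usage in the notebook:
# runningDF = Filter.apply(frame = raw_data, f = make_activity_filter('Running'))
# workingDF = Filter.apply(frame = raw_data, f = make_activity_filter('Working'))

# Plain dicts stand in for dynamic records when checking the predicate locally
is_running = make_activity_filter('Running')
print(is_running({'activity_type': 'Running'}), is_running({'activity_type': 'Working'}))  # True False
```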

### Glue Transforms - Filtering & Counting - activity_type = Working (Using Python lambda expressions)
- Small anonymous functions can be created with the lambda keyword.
- Lambda functions can be used wherever function objects are required. They are syntactically restricted to a single expression. 
- Example: This function returns the sum of its two arguments: lambda a, b: a+b.
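The snippet below runs the lambda from the text and a Working predicate like the one in the next cell. The predicate is applied with Python's built-in `filter()` to a few hypothetical records, which are plain dicts standing in for dynamic records.

```python
records = [
    {"track_id": 1, "activity_type": "Running"},
    {"track_id": 2, "activity_type": "Working"},
    {"track_id": 3, "activity_type": "Working"},
]

# The example from the text: a lambda that returns the sum of its two arguments
add = lambda a, b: a + b
print(add(2, 3))  # 5

# The same kind of predicate the next cell passes to Filter.apply
working = list(filter(lambda x: x["activity_type"] == "Working", records))
print([r["track_id"] for r in working])  # [2, 3]
```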

#### Execute Code 🔻

In [None]:

workingDF = Filter.apply(frame = raw_data, f = lambda x:x['activity_type']=='Working')

print("Working (count) : " + str(workingDF.count()))

### Glue Transforms - Joining two DynamicFrames
- Join performs an equality join on two DynamicFrames.
- This transform accepts the following arguments:
 - frame1 – The first DynamicFrame to join
 - frame2 – The second DynamicFrame to join
 - keys1 – The keys to join on for the first frame
 - keys2 – The keys to join on for the second frame
- In our case we will join these two frames: **raw_data** & **reference_data**
- We will join the two frames on the column **track_id**

#### Read More: https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-transforms-join.html

#### Execute Code 🔻

In [None]:

joined_data = Join.apply(frame1 = raw_data, frame2 = reference_data, keys1 = 'track_id', keys2 = 'track_id')
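The following plain-Python sketch shows what an equality join on track_id means. It is an inner join over lists of dicts: each raw record is paired with every reference record that has the same track_id, and raw records without a match are dropped. Treat it as an illustration that assumes inner-join semantics, not as Glue's implementation. The sample records and the track_name field are hypothetical.

```python
from collections import defaultdict

def equality_join(left, right, left_key, right_key):
    """Inner equality join of two lists of dict records (illustration only)."""
    # Index the right-hand records by their join-key value
    index = defaultdict(list)
    for rec in right:
        index[rec[right_key]].append(rec)
    # Emit one merged record for every matching (left, right) pair
    joined = []
    for left_rec in left:
        for right_rec in index.get(left_rec[left_key], []):
            joined.append({**left_rec, **right_rec})
    return joined

raw = [
    {"track_id": 1, "activity_type": "Running"},
    {"track_id": 2, "activity_type": "Working"},
    {"track_id": 3, "activity_type": "Running"},  # no reference row, so it is dropped
]
reference = [
    {"track_id": 1, "track_name": "Track A"},
    {"track_id": 2, "track_name": "Track B"},
]
for row in equality_join(raw, reference, "track_id", "track_id"):
    print(row)
```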


### View schema
- In this step we view the schema of the dynamic frame
- printSchema( ) – Prints the schema of the underlying DataFrame.

#### Execute Code 🔻

In [None]:
joined_data.printSchema()

###### Cleaning up the joined_data DynamicFrame
- Besides the columns we are interested in, the joined frame contains the partition columns partition_0 to partition_3
- These come from the yyyy/mm/dd/hh directory structure that Amazon Kinesis Data Firehose uses when writing files to S3; the crawler exposes each directory level as a partition column
- We will use Glue's built-in **DropFields** transform to drop the partition columns

###### Read more about AWS Glue transforms here : https://docs.aws.amazon.com/glue/latest/dg/built-in-transforms.html

#### Execute Code 🔻

In [None]:

joined_data_clean = DropFields.apply(frame = joined_data, paths = ['partition_0','partition_1','partition_2','partition_3'])
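The cell above hardcodes four partition columns. If the directory depth ever changes, you could derive the list from the frame's column names instead. This is a sketch. `partition_columns` is a hypothetical helper, and the usage line assumes you read the names from the Spark DataFrame's `columns` attribute.

```python
import re

# Crawler-generated partition columns look like partition_0, partition_1, ...
PARTITION_COLUMN = re.compile(r"partition_\d+")

def partition_columns(column_names):
    """Return the partition_N columns from a list of column names."""
    return [name for name in column_names if PARTITION_COLUMN.fullmatch(name)]

# Usage in the notebook:
# joined_data_clean = DropFields.apply(frame = joined_data,
#                                      paths = partition_columns(joined_data.toDF().columns))
print(partition_columns(["track_id", "activity_type", "partition_0", "partition_1", "partition_2", "partition_3"]))
```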



### View schema after DropFields transform
#### Execute Code 🔻

In [None]:
joined_data_clean.printSchema()

###### Sample data

In [None]:
joined_data_clean.toDF().show(5)

# Final step of the transform - Writing transformed data to S3
- In this step we use Glue's write_dynamic_frame functionality to write the transformed data to S3
- We store the transformed data in a separate directory, in Parquet format
- Make sure you change the S3 bucket name **yourname-datalake-demo-bucket** to your own bucket name


---
- Why Parquet?
 - Apache Parquet is a columnar storage format that is optimized for fast retrieval of data and is used in AWS analytical applications.
 - Columnar storage formats have the following characteristics that make them suitable for use with Athena:
  - Compression by column, with the compression algorithm selected for the column data type, saves storage space in Amazon S3 and reduces disk space and I/O during query processing.
  - Predicate pushdown in Parquet and ORC enables queries to fetch only the blocks they need, improving query performance.
  - When a query obtains specific column values from your data, it uses statistics from data block predicates, such as max/min values, to determine whether to read or skip the block.
  - Splitting of data in Parquet allows analytics tools to split the reading of data across multiple readers and increase parallelism during query processing.
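The min/max statistics point above can be shown with a simplified model. Each data block (a row group, in Parquet terms) stores the minimum and maximum of a column. A reader skips any block whose range cannot satisfy the predicate. This plain-Python sketch shows only that reasoning. It is not how Athena or a Parquet library is implemented, and the statistics values are made up.

```python
from dataclasses import dataclass

@dataclass
class BlockStats:
    """Min/max statistics for one column in one data block (row group)."""
    block_id: int
    min_value: int
    max_value: int

def blocks_to_read(stats, low, high):
    """Ids of blocks that may contain rows with low <= value <= high."""
    # A block is skipped only when its min/max range proves no row can match
    return [s.block_id for s in stats if s.max_value >= low and s.min_value <= high]

row_groups = [BlockStats(0, 1, 10), BlockStats(1, 11, 20), BlockStats(2, 21, 30)]
print(blocks_to_read(row_groups, 12, 15))  # [1]
```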
 
#### Execute Code 🔻

In [None]:
try:
    datasink = glueContext.write_dynamic_frame.from_options(
        frame = joined_data_clean, connection_type = "s3",
        connection_options = {"path": "s3://yourname-datalake-demo-bucket/data/processed-data/"},
        format = "parquet")
    print('Transformed data written to S3')
except Exception as ex:
    print('Something went wrong')
    print(ex)
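One way to avoid writing to the placeholder bucket by accident is to build the output path with a small check. This is a sketch. `processed_data_path` and the example bucket name are hypothetical, and the helper validates only against the lab's placeholder name, not against S3 bucket naming rules.

```python
PLACEHOLDER_BUCKET = "yourname-datalake-demo-bucket"

def processed_data_path(bucket_name):
    """Build the processed-data S3 path, refusing the lab's placeholder bucket name."""
    if not bucket_name or bucket_name == PLACEHOLDER_BUCKET:
        raise ValueError("Replace the placeholder with your own S3 bucket name")
    return f"s3://{bucket_name}/data/processed-data/"

# Usage (bucket name is hypothetical):
# connection_options = {"path": processed_data_path("jane-datalake-demo-bucket")}
print(processed_data_path("jane-datalake-demo-bucket"))
```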

# Using boto3 to run & automate AWS Glue 

- Boto3 is the AWS SDK for Python. It enables Python developers to create, configure, and manage AWS services. Boto3 provides an easy-to-use, object-oriented API, as well as low-level access to AWS services.


# Add transformed data set to glue catalog
- Now that you have written your transformed data to S3, we need to add it to the Glue Data Catalog so you can query it using Athena
- This cell can take close to 60 seconds to run; do not stop its execution


#### Execute Code 🔻

In [None]:

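# Adjust region_name if your lab resources are in a different region
glueclient = boto3.client('glue', region_name='us-east-1')

# Start the crawler that adds the processed data to the Glue Data Catalog
response = glueclient.start_crawler(Name='summitcrawler')

print('---')

# Poll until the crawler has finished and is back in the READY state.
# (Waiting only for the short-lived STOPPING state can loop forever if a poll misses it.)
crawler_state = ''
while crawler_state != 'READY':
    time.sleep(5)
    response = glueclient.get_crawler(Name='summitcrawler')
    crawler_state = str(response['Crawler']['State'])

print('Crawler : Stopped')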
print('---')
time.sleep(3)
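If you run crawlers from your own scripts, it helps to wrap the polling loop in a reusable function with a timeout that also reports how the crawl ended. This sketch assumes the `get_crawler` response shape used above plus the `Crawler['LastCrawl']['Status']` field (for example SUCCEEDED or FAILED). The function name, the defaults and the injectable `sleep` parameter are our own choices.

```python
import time

def wait_for_crawler(glue_client, crawler_name, poll_seconds=5, timeout_seconds=600, sleep=time.sleep):
    """Poll a Glue crawler until it is READY again and return the status of its last crawl."""
    waited = 0
    while waited < timeout_seconds:
        # Sleep first so a just-started crawler has time to leave the READY state
        sleep(poll_seconds)
        waited += poll_seconds
        crawler = glue_client.get_crawler(Name=crawler_name)["Crawler"]
        if crawler["State"] == "READY":
            # LastCrawl describes the most recent run, e.g. Status SUCCEEDED or FAILED
            return crawler.get("LastCrawl", {}).get("Status")
    raise TimeoutError(f"Crawler {crawler_name} did not finish within {timeout_seconds}s")

# Usage in this notebook, right after start_crawler:
# status = wait_for_crawler(glueclient, 'summitcrawler')
# print('Last crawl status:', status)
```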


# Use Boto3 to view the list of tables in the summitdb database

#### Execute Code 🔻

In [None]:

print('** summitdb has the following tables **')
response = glueclient.get_tables(
    DatabaseName='summitdb',
)

for table in response['TableList']:
    print(table['Name'])
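`get_tables` returns results in pages, so a database with many tables may not fit in a single response. One call is enough for the small lab database. For larger databases, the sketch below follows every page using boto3's paginator for `get_tables`. The helper name is hypothetical.

```python
def list_table_names(glue_client, database_name):
    """Return every table name in a Glue database, following pagination."""
    paginator = glue_client.get_paginator("get_tables")
    names = []
    for page in paginator.paginate(DatabaseName=database_name):
        names.extend(table["Name"] for table in page["TableList"])
    return names

# Usage in this notebook:
# print('\n'.join(list_table_names(glueclient, 'summitdb')))
```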



# 😎
=========================

# 👍 You did an AWSome job today with the lab ...! 

### If you wish to take this notebook & its output back home, you can download / export it

### In Jupyter's menu bar:
- Click: **File**
 - Download As: Notebook (.ipynb) (you can re-import it into Jupyter in the future)
 - Download As: HTML (shows code + results in an easy to read format)


# NEXT Steps: Go back to the lab guide

=========================