{ "cells": [
{ "cell_type": "markdown", "metadata": {}, "source": [ "# AWS SUMMIT 2022\n", "## First Cloud Journey - Building a Datalake on AWS\n", "\n", "\n", "Take your time to read through the instructions provided in this notebook.\n", "\n", "###### Learning Objectives\n", "- Understand how to interactively author Glue ETL scripts using Glue Dev Endpoints & SageMaker notebooks\n", "- Use Boto3 to call Glue APIs for Glue administrative and operational activities\n", "\n", "\n", "**Execute the code blocks one cell at a time**" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "###### Import Libraries \n", "- In this notebook we will be using the following classes; here are some of the important ones:\n", "    - SparkContext - Main entry point for Spark functionality. A SparkContext represents the connection to a Spark cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.\n", "    - GlueContext - Wraps the Apache SparkSQL SQLContext object, and thereby provides mechanisms for interacting with the Apache Spark platform.\n", "    - boto3 - AWS's Python SDK; we will be using this library to make calls to AWS APIs.\n", "    - awsglue - AWS Glue's PySpark library, which provides the DynamicFrame classes and transforms used in this notebook.\n", " \n", " \n", "#### The data transform we will perform: join the raw data with the reference data, drop the partition columns, and write the result to S3 in Parquet format.\n", "\n", "#### Execute Code 🔻" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "\n", "import sys\n", "from awsglue.transforms import *\n", "from awsglue.utils import getResolvedOptions\n", "from pyspark.context import SparkContext\n", "from awsglue.context import GlueContext\n", "from awsglue.job import Job\n", "import boto3\n", "import time\n" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "# Exploring your raw dataset\n", "- In this step you will:\n", "    - Create a dynamic frame for your 'raw' table from the AWS Glue catalog\n", "    - Explore the schema of the datasets\n", "    - Count rows in the raw table\n", "    - View a sample of the data \n", "\n", "## Glue Dynamic Frames Basics\n", "\n", "- AWS Glue's DynamicFrame is a powerful data structure.\n", "- It provides a precise representation of the underlying semi-structured data, especially when dealing with columns or fields with varying types.\n", "- It also provides powerful primitives to deal with nesting and unnesting.\n", "- A dynamic record is a self-describing record: each record encodes its columns and types, so every record can have a schema that is unique from all others in the dynamic frame.\n", "- For ETL, something more dynamic than a rigid DataFrame was needed, hence the Glue DynamicFrame: an implementation of the DataFrame that relaxes the requirement of a rigid schema and is designed for semi-structured data.\n", "- It maintains a schema per record, and is easy to restructure, tag and modify.\n", "\n", "\n", "#### Read More : https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-dynamic-frame.html\n", "\n", "#### Execute Code 🔻\n" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "\n", "glueContext = GlueContext(SparkContext.getOrCreate())\n", "spark = glueContext.spark_session\n" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "# Create dynamic frame from Glue catalog\n", "- In this block we use the GlueContext to create new DynamicFrames from the Glue catalog\n", "\n", "Other ways to create DynamicFrames in Glue (a sketch of from_options appears after the next code cell):\n", "- create_dynamic_frame_from_rdd\n", "- create_dynamic_frame_from_catalog\n", "- create_dynamic_frame_from_options\n", "\n", "#### Read More: https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html\n", "\n", "#### Execute Code 🔻\n" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "raw_data = glueContext.create_dynamic_frame.from_catalog(database = \"summitdb\", table_name = \"raw\")\n", "\n", "reference_data = glueContext.create_dynamic_frame.from_catalog(database = \"summitdb\", table_name = \"reference_data\")" ] },
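{ "cell_type": "markdown", "metadata": {}, "source": [ "###### Optional: creating a DynamicFrame without the catalog (a sketch)\n", "- The cell above uses create_dynamic_frame.from_catalog; as a point of comparison, below is a minimal, commented-out sketch of create_dynamic_frame_from_options reading files straight from S3.\n", "- The S3 path and format shown are placeholders and are not part of this lab, so the code is left commented out - do not execute it as-is.\n" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Sketch only (placeholder path/format - not used in this lab):\n", "# create a DynamicFrame directly from files in S3, bypassing the Glue catalog.\n", "#\n", "# raw_from_s3 = glueContext.create_dynamic_frame_from_options(\n", "#     connection_type = \"s3\",\n", "#     connection_options = {\"paths\": [\"s3://your-bucket/your-raw-data-prefix/\"]},\n", "#     format = \"json\")\n", "# raw_from_s3.printSchema()\n" ] },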
{ "cell_type": "markdown", "metadata": {}, "source": [ "# View schema\n", "- In this step we view the schema of the dynamic frames\n", "- printSchema( ) – Prints the schema of the underlying DataFrame.\n", "\n", "#### Execute Code 🔻" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "raw_data.printSchema()" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "reference_data.printSchema()" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "# Count records\n", "- In this step we will count the number of records in each dynamic frame\n", "- count( ) – Returns the number of rows in the underlying DataFrame\n", "\n", "#### Execute Code 🔻" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print('raw_data (Count) = ' + str(raw_data.count()))\n", "print('reference_data (Count) = ' + str(reference_data.count()))" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "# Show sample records\n", "- Use the show() method to display a sample of records in the frame\n", "- Here we are showing the top 5 records of each frame\n", "\n", "\n", "#### Execute Code 🔻" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "raw_data.toDF().show(5)" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "reference_data.toDF().show(5)" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "# Using Spark SQL to explore data\n", "\n", "- In Glue, you can leverage Spark's SQL engine to run SQL queries over your data\n", "- If you have a DynamicFrame called my_dynamic_frame, you can convert it to a DataFrame with toDF(), issue a SQL query, and then convert the result back to a DynamicFrame with DynamicFrame.fromDF (the next code cell shows the first two steps, and a short sketch of the conversion back follows it)\n", "\n", "### Spark SQL - Filtering & Counting - activity_type = Running\n", "- In this block, we will filter & count the number of events with activity_type = Running\n", "\n", "#### Execute Code 🔻" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Adding raw_data as a temporary table in Spark's SQL context\n", "raw_data.toDF().createOrReplaceTempView(\"temp_raw_data\")\n", "\n", "# Running a SQL statement to filter on activity_type = 'Running'\n", "runningDF = spark.sql(\"select * from temp_raw_data where activity_type = 'Running'\")\n", "print(\"Running (count) : \" + str(runningDF.count()))\n", "\n", "runningDF.show(5)\n" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Spark SQL - Filtering & Counting - activity_type = Working\n", "- In this block, we will filter & count the number of events with activity_type = Working\n", "\n", "#### Execute Code 🔻" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Running a SQL statement to filter on activity_type = 'Working'\n", "workingDF = spark.sql(\"select * from temp_raw_data where activity_type = 'Working'\")\n", "print(\"Working (count) : \" + str(workingDF.count()))\n", "\n", "workingDF.show(5)\n" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Glue Transforms - Filtering & Counting - activity_type = Running\n", "- Now, let's perform the same operation using Glue's built-in transforms\n", "- We will use the **Filter** transform\n", "- Filter() - Selects records from a DynamicFrame and returns a filtered DynamicFrame.\n", "- You specify a function that determines whether a record is output (the function returns True) or not (the function returns False).\n", "- In this function, we are filtering on the condition activity_type == 'Running'\n", "\n", "#### Read More: https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-transforms-filter.html#aws-glue-api-crawler-pyspark-transforms-filter-example\n", "\n", "#### Execute Code 🔻" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "\n", "def filter_function(dynamicRecord):\n", "    # Keep only records whose activity_type is 'Running'\n", "    if dynamicRecord['activity_type'] == 'Running':\n", "        return True\n", "    else:\n", "        return False\n", "\n", "runningDF = Filter.apply(frame = raw_data, f = filter_function)\n", "\n", "print(\"Running (count) : \" + str(runningDF.count()))" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Glue Transforms - Filtering & Counting - activity_type = Working (using Python lambda expressions)\n", "- Small anonymous functions can be created with the lambda keyword.\n", "- Lambda functions can be used wherever function objects are required. They are syntactically restricted to a single expression.\n", "- Example: this function returns the sum of its two arguments: lambda a, b: a+b.\n", "\n", "#### Execute Code 🔻" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "\n", "workingDF = Filter.apply(frame = raw_data, f = lambda x: x['activity_type'] == 'Working')\n", "\n", "print(\"Working (count) : \" + str(workingDF.count()))" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Glue Transforms - Joining two DynamicFrames \n", "- Join performs an equality join on two DynamicFrames.\n", "- This transform accepts the following arguments:\n", "    - frame1 – The first DynamicFrame to join\n", "    - frame2 – The second DynamicFrame to join\n", "    - keys1 – The keys to join on for the first frame\n", "    - keys2 – The keys to join on for the second frame\n", "- In our case we will be joining these two frames: **raw_data** & **reference_data**\n", "- We will be joining them on the column **track_id**\n", "\n", "#### Read More: https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-transforms-join.html\n", "\n", "#### Execute Code 🔻" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "\n", "joined_data = Join.apply(raw_data, reference_data, 'track_id', 'track_id')\n" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### View schema\n", "- In this step we view the schema of the joined dynamic frame\n", "- printSchema( ) – Prints the schema of the underlying DataFrame.\n", "\n", "#### Execute Code 🔻" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "joined_data.printSchema()" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "###### Cleaning up the joined_data dynamic frame\n", "- In addition to the columns we are interested in, the joined frame contains partition columns\n", "- These were generated by Firehose to place the files in a yyyy/mm/dd/hh directory structure in S3\n", "- We will use Glue's built-in **DropFields** transform to drop the partition columns\n", "\n", "###### Read more about AWS Glue transforms here : https://docs.aws.amazon.com/glue/latest/dg/built-in-transforms.html\n", "\n", "#### Execute Code 🔻" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "\n", "joined_data_clean = DropFields.apply(frame = joined_data, paths = ['partition_0','partition_1','partition_2','partition_3'])\n", "\n" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### View schema after DropFields transform\n", "#### Execute Code 🔻" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "joined_data_clean.printSchema()" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "###### Sample data " ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "joined_data_clean.toDF().show(5)" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "# Final step of the transform - Writing transformed data to S3\n", "- In this step we will be using Glue's write_dynamic_frame functionality to write transformed data to S3\n", "- We will be storing the transformed data in a different directory & in Parquet format\n", "- Make sure you change the S3 bucket name **yourname-datalake-demo-bucket** to reflect your bucket name \n", "\n", "\n", "---\n", "- Why Parquet format?\n", "    - Apache Parquet is a columnar storage format that is optimized for fast retrieval of data and is used in AWS analytical applications.\n", "    - Columnar storage formats have the following characteristics that make them suitable for use with Athena:\n", "        - Compression by column, with the compression algorithm selected for the column data type, saves storage space in Amazon S3 and reduces disk space and I/O during query processing.\n", "        - Predicate pushdown in Parquet and ORC enables queries to fetch only the blocks they need, improving query performance.\n", "        - When a query obtains specific column values from your data, it uses statistics from data block predicates, such as max/min values, to determine whether to read or skip the block.\n", "        - Splitting of data in Parquet allows analytics tools to spread the reading of data across multiple readers, increasing parallelism during query processing.\n", " \n", "#### Execute Code 🔻" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "try:\n", "    datasink = glueContext.write_dynamic_frame.from_options(\n", "        frame = joined_data_clean, connection_type = \"s3\",\n", "        connection_options = {\"path\": \"s3://yourname-datalake-demo-bucket/data/processed-data/\"},\n", "        format = \"parquet\")\n", "    print('Transformed data written to S3')\n", "except Exception as ex:\n", "    print('Something went wrong')\n", "    print(ex)" ] },
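{ "cell_type": "markdown", "metadata": {}, "source": [ "### Optional check - reading the Parquet output back (a sketch)\n", "- This is an optional sanity check, not part of the original lab steps: it reads the Parquet files just written back into a DynamicFrame and counts the records\n", "- It assumes the write above succeeded; use the same bucket name you used in the previous cell\n", "\n", "#### Execute Code 🔻" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Optional sanity check (sketch): read the freshly written Parquet back from S3.\n", "# Use the same bucket name you used in the write step above.\n", "try:\n", "    check_data = glueContext.create_dynamic_frame.from_options(\n", "        connection_type = \"s3\",\n", "        connection_options = {\"paths\": [\"s3://yourname-datalake-demo-bucket/data/processed-data/\"]},\n", "        format = \"parquet\")\n", "    print('processed-data (Count) = ' + str(check_data.count()))\n", "except Exception as ex:\n", "    print('Could not read the processed data back')\n", "    print(ex)" ] },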
{ "cell_type": "markdown", "metadata": {}, "source": [ "# Using boto3 to run & automate AWS Glue \n", "\n", "- Boto3 is the AWS SDK for Python. It enables Python developers to create, configure, and manage AWS services. Boto3 provides an easy to use, object-oriented API, as well as low-level access to AWS services.\n", "\n", "\n", "# Add the transformed data set to the Glue catalog\n", "- Now that you have written your transformed data to S3, we need to add it to the Glue catalog so you can query it using Athena\n", "- This block can take close to 60 seconds to run; do not stop the execution\n", "\n", "\n", "#### Execute Code 🔻" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "\n", "glueclient = boto3.client('glue', region_name='us-east-1')\n", "\n", "response = glueclient.start_crawler(Name='summitcrawler')\n", "\n", "print('---')\n", "\n", "# Poll the crawler until it reaches the STOPPING state, i.e. its run has finished\n", "crawler_state = ''\n", "while (crawler_state != 'STOPPING'):\n", "    response = glueclient.get_crawler(Name='summitcrawler')\n", "    crawler_state = str(response['Crawler']['State'])\n", "    time.sleep(1)\n", "\n", "print('Crawler : Stopped')\n", "print('---')\n", "time.sleep(3)\n" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "# Use boto3 to view the list of tables in the summitdb database\n", "\n", "#### Execute Code 🔻" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "\n", "print('** summitdb has the following tables **')\n", "response = glueclient.get_tables(\n", "    DatabaseName='summitdb',\n", ")\n", "\n", "for table in response['TableList']:\n", "    print(table['Name'])\n", "\n" ] },
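{ "cell_type": "markdown", "metadata": {}, "source": [ "###### Optional: a little more table detail (a sketch)\n", "- The same get_tables response also carries each table's S3 location and column schema; the cell below is a small optional sketch that prints them for a quick sanity check\n", "\n", "#### Execute Code 🔻" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Optional sketch: print each table's storage location and column names\n", "# from the same get_tables response used above.\n", "response = glueclient.get_tables(DatabaseName='summitdb')\n", "\n", "for table in response['TableList']:\n", "    storage = table.get('StorageDescriptor', {})\n", "    columns = [col['Name'] for col in storage.get('Columns', [])]\n", "    print(table['Name'] + ' -> ' + storage.get('Location', 'n/a'))\n", "    print('    columns: ' + ', '.join(columns))\n" ] },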
{ "cell_type": "markdown", "metadata": {}, "source": [ " " ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "# 😎\n", "=========================" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "# 👍 You did an AWSome job today with the lab ...!\n", "\n", "### If you wish to take this notebook & its output back home - you can download / export it\n", "\n", "### In Jupyter's menu bar:\n", "- Click: **File**\n", "    - Download As: Notebook (.ipynb) (you can re-import it as a Jupyter notebook in the future)\n", "    - Download As: HTML (shows code + results in an easy to read format)\n" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "# NEXT Steps: Go back to the lab guide" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "=========================" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }
], "metadata": { "kernelspec": { "display_name": "Sparkmagic (PySpark)", "language": "", "name": "pysparkkernel" }, "language_info": { "codemirror_mode": { "name": "python", "version": 2 }, "mimetype": "text/x-python", "name": "pyspark", "pygments_lexer": "python2" } }, "nbformat": 4, "nbformat_minor": 2 }