{ "metadata": { "pycharm": { "stem_cell": { "cell_type": "raw", "source": [], "metadata": { "collapsed": false } } }, "kernelspec": { "name": "glue_pyspark", "display_name": "Glue PySpark", "language": "python" }, "language_info": { "name": "Python_Glue_Session", "mimetype": "text/x-python", "codemirror_mode": { "name": "python", "version": 3 }, "pygments_lexer": "python3", "file_extension": ".py" } }, "nbformat_minor": 4, "nbformat": 4, "cells": [ { "cell_type": "markdown", "source": "# Analytics On AWS workshop\n\nTake your time to read through the instructions provided in this notebook.\n\n#### Learning Objectives\n- Understand how to interactively author Glue ETL scripts using Glue Studio & Jupyter notebooks\n- Use boto3 to call Glue APIs to do Glue administrative and operational activities\n\n**Note:** \n - **Execute the code blocks one cell at a time.**\n - **It's a good practice to keep saving the notebook at regular intervals while you work through it.** Read more about saving the notebook here: https://docs.aws.amazon.com/glue/latest/ug/notebook-getting-started.html#save-notebook", "metadata": {} }, { "cell_type": "markdown", "source": "# Initial configuration\n- Lets configure \"session idle timeout\", \"worker type\" and \"number of workers\" with the help of available magics. \n - **%idle_timeout**: The number of minutes of inactivity after which a session will timeout after a cell has been executed. \n - **%worker_type**: Type of workers supported by AWS Glue. Default is G.1X.\n - **%number_of_workers**: The number of workers of a defined worker_type that are allocated when a job runs.\n\n#### Read more about magics supported by AWS Glue interactive sessions for Jupyter here: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions-magics.html \n\n#### Execute Code »", "metadata": { "tags": [] } }, { "cell_type": "code", "source": "%idle_timeout 60\n%worker_type G.1X\n%number_of_workers 2", "metadata": { "trusted": true }, "execution_count": null, "outputs": [] }, { "cell_type": "markdown", "source": "#### Import Libraries \n- In this notebook we will be using the following classes, here are some of the important ones\n - SparkContext - Main entry point for Spark functionality. 
{ "cell_type": "markdown", "source": "#### Import Libraries\n- In this notebook we will be using the following classes; here are some of the important ones:\n - SparkContext - Main entry point for Spark functionality. A SparkContext represents the connection to a Spark cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.\n - GlueContext - Wraps the Apache SparkSQL SQLContext object, and thereby provides mechanisms for interacting with the Apache Spark platform.\n - boto3 - AWS's Python SDK; we will be using this library to make calls to AWS APIs.\n - awsglue - AWS's PySpark library that provides the needed Python packages and extends PySpark to support serverless ETL on AWS.\n\n#### Execute Code »", "metadata": { "tags": [] } },
{ "cell_type": "code", "source": "import sys\nfrom awsglue.transforms import *\nfrom awsglue.utils import getResolvedOptions\nfrom pyspark.context import SparkContext\nfrom awsglue.context import GlueContext\nfrom awsglue.job import Job\nimport boto3\nimport time", "metadata": { "trusted": true }, "execution_count": null, "outputs": [] },
{ "cell_type": "markdown", "source": "# Exploring your raw dataset\n\n- In this step you will:\n - Create a dynamic frame for your 'raw' table from the AWS Glue catalog\n - Explore the schema of the datasets\n - Count rows in the raw table\n - View a sample of the data\n\n## Glue Dynamic Frames Basics\n\n- AWS Glue's DynamicFrame is a powerful data structure.\n- DynamicFrames provide a precise representation of the underlying semi-structured data, especially when dealing with columns or fields with varying types.\n- They also provide powerful primitives to deal with nesting and unnesting.\n- A dynamic record is a self-describing record: each record encodes its columns and types, so every record can have a schema that is unique from all others in the dynamic frame.\n- For ETL, something more flexible than a rigid-schema DataFrame was needed, hence the Glue DynamicFrame: an implementation of a DataFrame that relaxes the requirement of having a rigid schema and is designed for semi-structured data.\n- It maintains a schema per record, and it is easy to restructure, tag and modify.\n\n#### Read More: https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-dynamic-frame.html\n\n#### Execute Code »\n", "metadata": {} },
{ "cell_type": "code", "source": "glueContext = GlueContext(SparkContext.getOrCreate())\nspark = glueContext.spark_session", "metadata": { "trusted": true }, "execution_count": null, "outputs": [] },
{ "cell_type": "markdown", "source": "# Create dynamic frame from Glue catalog\n- In this block we are using the GlueContext to create a new DynamicFrame from the Glue catalog\n\nOther ways to create DynamicFrames in Glue:\n- create_dynamic_frame_from_rdd\n- create_dynamic_frame_from_catalog\n- create_dynamic_frame_from_options\n\n#### Read More: https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html\n\n#### Execute Code »", "metadata": {} },
{ "cell_type": "code", "source": "raw_data = glueContext.create_dynamic_frame.from_catalog(database=\"analyticsworkshopdb\", table_name=\"raw\")\n\nreference_data = glueContext.create_dynamic_frame.from_catalog(database=\"analyticsworkshopdb\", table_name=\"reference_data\")", "metadata": { "trusted": true }, "execution_count": null, "outputs": [] },
{ "cell_type": "markdown", "source": "# View schema\n- In this step we view the schema of the dynamic frame\n- printSchema(): Prints the schema of the underlying DataFrame.\n\n#### Execute Code »", "metadata": {} },
{ "cell_type": "code", "source": "raw_data.printSchema()", "metadata": { "trusted": true }, "execution_count": null, "outputs": [] },
{ "cell_type": "code", "source": "reference_data.printSchema()", "metadata": { "trusted": true }, "execution_count": null, "outputs": [] },
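{ "cell_type": "markdown", "source": "#### Optional: resolving fields with mixed types\n- As described in the Glue Dynamic Frames Basics section above, every record in a DynamicFrame carries its own schema, so a field that arrives with different types across records surfaces as a \"choice\" type.\n- The next cell is a minimal, optional sketch of the **resolveChoice** transform, which settles such a field on a single type. Casting **track_id** to string here is only an illustration; if the field already has one consistent type, the call is effectively a no-op.", "metadata": {} },
{ "cell_type": "code", "source": "# Optional sketch: settle a potentially mixed-type field on a single type.\n# If track_id already has a consistent type, this is effectively a no-op.\nresolved_raw_data = raw_data.resolveChoice(specs=[('track_id', 'cast:string')])\nresolved_raw_data.printSchema()", "metadata": { "trusted": true }, "execution_count": null, "outputs": [] },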
{ "cell_type": "markdown", "source": "# Count records\n- In this step we will count the number of records in the dataframe\n- count(): Returns the number of rows in the underlying DataFrame\n\n#### Execute Code »", "metadata": {} },
{ "cell_type": "code", "source": "print(f'raw_data (count) = {raw_data.count()}')\nprint(f'reference_data (count) = {reference_data.count()}')", "metadata": { "trusted": true }, "execution_count": null, "outputs": [] },
{ "cell_type": "markdown", "source": "# Show sample records\n- The toDF() method converts a DynamicFrame to an Apache Spark DataFrame by converting DynamicRecords into DataFrame fields\n- Use the show() method to display a sample of records in the frame\n- Here we are showing the top 5 records in the DataFrame\n\n#### Execute Code »", "metadata": {} },
{ "cell_type": "code", "source": "raw_data.toDF().show(5)", "metadata": { "trusted": true }, "execution_count": null, "outputs": [] },
{ "cell_type": "code", "source": "reference_data.toDF().show(5)", "metadata": { "trusted": true }, "execution_count": null, "outputs": [] },
{ "cell_type": "markdown", "source": "# Using Spark SQL to explore data\n\n- In Glue, you can leverage Spark's SQL engine to run SQL queries over your data\n- If you have a DynamicFrame called my_dynamic_frame, you can use the following snippet to convert the DynamicFrame to a DataFrame, issue a SQL query, and then convert back to a DynamicFrame\n\n### Spark SQL - Filtering & Counting - activity_type = Running\n- In this block, we will filter & count the number of events with activity_type = Running\n\n#### Execute Code »", "metadata": {} },
{ "cell_type": "code", "source": "# Register raw_data as a temporary view in Spark's SQL context\nraw_data.toDF().createOrReplaceTempView(\"temp_raw_data\")\n\n# Run the SQL statement which filters records with activity_type = 'Running'\nrunningDF = spark.sql(\"select * from temp_raw_data where activity_type = 'Running'\")\nprint(f'Running (count): {runningDF.count()}')\n\nrunningDF.show(5)", "metadata": { "trusted": true }, "execution_count": null, "outputs": [] },
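{ "cell_type": "markdown", "source": "#### Optional: aggregating with Spark SQL\n- Rather than filtering one activity_type at a time, you can also run an aggregation over the temporary view to count every activity_type in a single query. This optional sketch uses only the **temp_raw_data** view registered in the previous cell.", "metadata": {} },
{ "cell_type": "code", "source": "# Optional sketch: count events for every activity_type in one query\nactivity_counts_df = spark.sql(\"select activity_type, count(*) as event_count from temp_raw_data group by activity_type order by event_count desc\")\n\nactivity_counts_df.show()", "metadata": { "trusted": true }, "execution_count": null, "outputs": [] },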
{ "cell_type": "markdown", "source": "### Spark SQL - Filtering & Counting - activity_type = Working\n- In this block, we will filter & count the number of events with activity_type = Working\n\n#### Execute Code »", "metadata": {} },
{ "cell_type": "code", "source": "# Run the SQL statement which filters records with activity_type = 'Working'\nworkingDF = spark.sql(\"select * from temp_raw_data where activity_type = 'Working'\")\nprint(f'Working (count): {workingDF.count()}')\n\nworkingDF.show(5)", "metadata": { "trusted": true }, "execution_count": null, "outputs": [] },
{ "cell_type": "markdown", "source": "### Glue Transforms - Filtering & Counting - activity_type = Running\n- Now, let's perform the same operation using Glue's built-in transforms\n- We will use the **Filter** transform\n- Filter() - Selects records from a DynamicFrame and returns a filtered DynamicFrame.\n- You specify a function, such as a lambda function, which determines whether a record is output (function returns true) or not (function returns false).\n- In this function, we are filtering on the condition activity_type == 'Running'\n\n#### Read More: https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-transforms-filter.html#aws-glue-api-crawler-pyspark-transforms-filter-example\n\n#### Execute Code »", "metadata": {} },
{ "cell_type": "code", "source": "def filter_function(dynamic_record):\n\tif dynamic_record['activity_type'] == 'Running':\n\t\treturn True\n\telse:\n\t\treturn False\n\nrunningDF = Filter.apply(frame=raw_data, f=filter_function)\n\nprint(f'Running (count): {runningDF.count()}')", "metadata": { "trusted": true }, "execution_count": null, "outputs": [] },
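{ "cell_type": "markdown", "source": "#### Optional: the same filter with the Spark DataFrame API\n- For comparison only, the optional sketch below expresses the same condition through the Spark DataFrame API by converting the DynamicFrame with toDF(). The count should match the result of the Glue Filter transform above.", "metadata": {} },
{ "cell_type": "code", "source": "# Optional sketch: equivalent filter using the Spark DataFrame API\nrunning_spark_df = raw_data.toDF().filter(\"activity_type = 'Running'\")\n\nprint(f'Running via DataFrame API (count): {running_spark_df.count()}')", "metadata": { "trusted": true }, "execution_count": null, "outputs": [] },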
{ "cell_type": "markdown", "source": "### Glue Transforms - Filtering & Counting - activity_type = Working (Using Python Lambda Expressions)\n- Small anonymous functions can be created with the lambda keyword.\n- Lambda functions can be used wherever function objects are required. They are syntactically restricted to a single expression.\n- Example: This function returns the sum of its two arguments: lambda a, b: a+b.\n\n#### Execute Code »", "metadata": {} },
{ "cell_type": "code", "source": "workingDF = Filter.apply(frame=raw_data, f=lambda x: x['activity_type'] == 'Working')\n\nprint(f'Working (count): {workingDF.count()}')", "metadata": { "trusted": true }, "execution_count": null, "outputs": [] },
{ "cell_type": "markdown", "source": "### Glue Transforms - Joining two DynamicFrames\n- Performs an equality join on two DynamicFrames.\n- This transform accepts the following arguments:\n - frame1: The first DynamicFrame to join\n - frame2: The second DynamicFrame to join\n - keys1: The keys to join on for the first frame\n - keys2: The keys to join on for the second frame\n- In our case we will be joining these two frames: **raw_data** & **reference_data**\n- We will be joining these two frames on the column **track_id**\n\n#### Read More: https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-transforms-join.html\n\n#### Execute Code »", "metadata": {} },
{ "cell_type": "code", "source": "joined_data = Join.apply(raw_data, reference_data, 'track_id', 'track_id')", "metadata": { "trusted": true }, "execution_count": null, "outputs": [] },
{ "cell_type": "markdown", "source": "### View schema\n- In this step we view the schema of the dynamic frame\n- printSchema(): Prints the schema of the underlying DataFrame.\n\n#### Execute Code »", "metadata": {} },
{ "cell_type": "code", "source": "joined_data.printSchema()", "metadata": { "trusted": true }, "execution_count": null, "outputs": [] },
{ "cell_type": "markdown", "source": "###### Cleaning up the joined_data DynamicFrame\n- Besides the columns we are interested in, the joined frame also contains the partition columns\n- These were generated by Kinesis Data Firehose when placing the files in a yyyy/mm/dd/hh directory structure in S3\n- We will use Glue's built-in **DropFields** transform to drop the partition columns\n\n#### Read more about AWS Glue transforms here: https://docs.aws.amazon.com/glue/latest/dg/built-in-transforms.html\n\n#### Execute Code »", "metadata": {} },
{ "cell_type": "code", "source": "joined_data_clean = DropFields.apply(frame=joined_data, paths=['partition_0','partition_1','partition_2','partition_3'])", "metadata": { "trusted": true }, "execution_count": null, "outputs": [] },
{ "cell_type": "markdown", "source": "### View schema after DropFields transform\n#### Execute Code »", "metadata": {} },
{ "cell_type": "code", "source": "joined_data_clean.printSchema()", "metadata": { "trusted": true }, "execution_count": null, "outputs": [] },
{ "cell_type": "markdown", "source": "###### Sample data", "metadata": {} },
{ "cell_type": "code", "source": "joined_data_clean.toDF().show(5)", "metadata": { "trusted": true }, "execution_count": null, "outputs": [] },
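{ "cell_type": "markdown", "source": "#### Optional: sanity-check the record counts\n- Before writing the data out, it can be useful to compare the joined, cleaned-up frame against the raw frame. Since the join is on track_id, the joined count is expected to be close to the raw count as long as every track_id has a matching row in the reference data. This check is optional.", "metadata": {} },
{ "cell_type": "code", "source": "# Optional sanity check: compare record counts before writing to S3\nprint(f'raw_data (count) = {raw_data.count()}')\nprint(f'joined_data_clean (count) = {joined_data_clean.count()}')", "metadata": { "trusted": true }, "execution_count": null, "outputs": [] },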
{ "cell_type": "markdown", "source": "# Final step of the transform - Writing transformed data to S3\n- In this step we will be using Glue's write_dynamic_frame functionality to write transformed data to S3\n- We will be storing the transformed data in a different directory & in Parquet format\n- Make sure you change the S3 bucket name **yourname-analytics-workshop-bucket** to reflect your bucket name\n\n---\n- Why Parquet format?\n - Apache Parquet is a columnar storage format that is optimized for fast retrieval of data and is used in AWS analytical applications.\n - Columnar storage formats have the following characteristics that make them suitable for use with Athena:\n - Compression by column, with the compression algorithm selected for the column data type, to save storage space in Amazon S3 and reduce disk space and I/O during query processing.\n - Predicate pushdown in Parquet and ORC enables queries to fetch only the blocks they need, improving query performance. When a query obtains specific column values from your data, it uses statistics from data block predicates, such as max/min values, to determine whether to read or skip the block.\n - Splitting of data in Parquet allows analytics tools to split the reading of data across multiple readers, increasing parallelism during query processing.\n\n#### Execute Code »", "metadata": {} },
{ "cell_type": "code", "source": "try:\n    datasink = glueContext.write_dynamic_frame.from_options(\n        frame=joined_data_clean, connection_type=\"s3\",\n        connection_options={\"path\": \"s3://yourname-analytics-workshop-bucket/data/processed-data/\"},\n        format=\"parquet\")\n    print('Transformed data written to S3')\nexcept Exception as ex:\n    print('Something went wrong')\n    print(ex)", "metadata": { "trusted": true }, "execution_count": null, "outputs": [] },
{ "cell_type": "markdown", "source": "# Using boto3 to run & automate AWS Glue\n\n- Boto3 is the AWS SDK for Python. It enables Python developers to create, configure, and manage AWS services. Boto3 provides an easy-to-use, object-oriented API, as well as low-level access to AWS services.\n\n# Add the transformed dataset to the Glue catalog\n- Now that you have written your transformed data to S3, we need to add it to the Glue catalog so you can query it using Athena\n- This block of code takes close to 60 seconds to run; do not terminate or stop the execution\n\n#### Execute Code »", "metadata": {} },
{ "cell_type": "code", "source": "glueclient = boto3.client('glue', region_name='us-east-1')\n\nresponse = glueclient.start_crawler(Name='AnalyticsworkshopCrawler')\n\nprint('---')\n\n# Wait for the crawler run to finish (its state transitions to STOPPING)\ncrawler_state = None\nwhile crawler_state != 'STOPPING':\n    response = glueclient.get_crawler(Name='AnalyticsworkshopCrawler')\n    crawler_state = str(response['Crawler']['State'])\n    time.sleep(1)\n\nprint('Crawler Stopped')\nprint('---')\ntime.sleep(3)", "metadata": { "trusted": true }, "execution_count": null, "outputs": [] },
{ "cell_type": "markdown", "source": "# Use boto3 to view the list of tables in the analyticsworkshopdb database\n\n#### Execute Code »", "metadata": {} },
{ "cell_type": "code", "source": "print('** analyticsworkshopdb has the following tables **')\nresponse = glueclient.get_tables(\n    DatabaseName='analyticsworkshopdb',\n)\n\nfor table in response['TableList']:\n    print(table['Name'])", "metadata": { "trusted": true }, "execution_count": null, "outputs": [] },
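{ "cell_type": "markdown", "source": "#### Optional: inspect the crawled table's schema with boto3\n- As an optional follow-up, you can call the Glue **get_table** API to look at the columns the crawler registered for the processed data. The table name used below (processed_data) is an assumption based on the S3 prefix; if the crawler created the table under a different name, pick the correct one from the list printed by the previous cell.", "metadata": {} },
{ "cell_type": "code", "source": "# Optional sketch: inspect the columns of the newly crawled table.\n# 'processed_data' is an assumed table name - adjust it if your crawler\n# registered the table under a different name.\nresponse = glueclient.get_table(\n    DatabaseName='analyticsworkshopdb',\n    Name='processed_data'\n)\n\nfor column in response['Table']['StorageDescriptor']['Columns']:\n    print(f\"{column['Name']}: {column['Type']}\")", "metadata": { "trusted": true }, "execution_count": null, "outputs": [] },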
{ "cell_type": "markdown", "source": "#### Bonus Knowledge\n\n - After you have finished developing your notebook, you can save the job and then run it. You can find the script in the **Script** tab. Any magics you added to the notebook will be stripped away and won't be saved as part of the script of the generated AWS Glue job. AWS Glue Studio will auto-add a job.commit() to the end of the script generated from the notebook contents. For more information about running jobs, see [Start a job run](https://docs.aws.amazon.com/glue/latest/ug/managing-jobs-chapter.html#start-jobs).\n - You can schedule this job to run at an hourly, daily, weekly, monthly or custom (cron expression) frequency under the **Schedules** tab.\n - You can integrate your job with Git version control systems such as AWS CodeCommit and GitHub. Read more about it [here](https://docs.aws.amazon.com/glue/latest/ug/edit-job-add-source-control-integration.html).", "metadata": {} },
{ "cell_type": "markdown", "source": "# \n=========================", "metadata": {} },
{ "cell_type": "markdown", "source": "### If you wish to take this notebook and its output back home, you can download / export it using the **Download Notebook** option.\n", "metadata": { "jp-MarkdownHeadingCollapsed": true, "tags": [] } },
{ "cell_type": "markdown", "source": "# NEXT Steps: Go back to the lab guide", "metadata": {} },
{ "cell_type": "markdown", "source": "=========================", "metadata": {} }
] }