{"nbformat":4,"nbformat_minor":0,"metadata":{"colab":{"name":"2022-01-08-eda-aws-athena.ipynb","provenance":[{"file_id":"https://github.com/osipov/smlbook/blob/master/ch3.ipynb","timestamp":1644512667682}],"collapsed_sections":[],"toc_visible":true},"kernelspec":{"name":"python3","display_name":"Python 3"}},"cells":[{"cell_type":"markdown","metadata":{"id":"77ZN-BAylPoU"},"source":["# Data exploration at scale with AWS Athena"]},{"cell_type":"markdown","metadata":{"id":"8HPLiT_xlliJ"},"source":["> Note: https://learning.oreilly.com/library/view/mlops-engineering-at/9781617297762/OEBPS/Text/03.htm"]},{"cell_type":"markdown","metadata":{"id":"nWyD1Bt9k26w"},"source":["## Upload the `BUCKET_ID` file\n","\n","Before proceeding, ensure that you have a backup copy of the `BUCKET_ID` file created in the [Chapter 2](https://colab.research.google.com/github/osipov/smlbook/blob/master/ch2.ipynb) notebook before proceeding. The contents of the `BUCKET_ID` file are reused later in this notebook and in the other notebooks.\n"]},{"cell_type":"code","metadata":{"id":"cwPOIYDdnXKN"},"source":["import os\n","from pathlib import Path\n","assert Path('BUCKET_ID').exists(), \"Place the BUCKET_ID file in the current directory before proceeding\"\n","\n","BUCKET_ID = Path('BUCKET_ID').read_text().strip()\n","os.environ['BUCKET_ID'] = BUCKET_ID\n","os.environ['BUCKET_ID']"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"GZ2rTEBfU20C"},"source":["## **OPTIONAL:** Download and install AWS CLI\n","\n","This is unnecessary if you have already installed AWS CLI in a preceding notebook."]},{"cell_type":"code","metadata":{"id":"ei0Vm3p9UkT1"},"source":["%%bash\n","curl \"https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip\" -o \"awscliv2.zip\"\n","unzip -o awscliv2.zip\n","sudo ./aws/install"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"1xoSKwf7U77e"},"source":["## Specify AWS credentials\n","\n","Modify the contents of the next cell to specify your AWS credentials as strings. \n","\n","If you see the following exception:\n","\n","`TypeError: str expected, not NoneType`\n","\n","It means that you did not specify the credentials correctly."]},{"cell_type":"code","metadata":{"id":"CaRjFdSoT-q1"},"source":["import os\n","# *** REPLACE None in the next 2 lines with your AWS key values ***\n","os.environ['AWS_ACCESS_KEY_ID'] = None\n","os.environ['AWS_SECRET_ACCESS_KEY'] = None"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"aAMFo90AVJuI"},"source":["## Confirm the credentials\n","\n","Run the next cell to validate your credentials."]},{"cell_type":"code","metadata":{"id":"VZqAz5PjS_f1"},"source":["%%bash\n","aws sts get-caller-identity"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"66DsruTZWERS"},"source":["If you have specified the correct credentials as values for the `AWS_ACCESS_KEY_ID` and the `AWS_SECRET_ACCESS_KEY` environment variables, then `aws sts get-caller-identity` used by the previous cell should have returned back the `UserId`, `Account` and the `Arn` for the credentials, resembling the following\n","\n","```\n","{\n"," \"UserId\": \"█████████████████████\",\n"," \"Account\": \"████████████\",\n"," \"Arn\": \"arn:aws:iam::████████████:user/█████████\"\n","}\n","```"]},{"cell_type":"markdown","metadata":{"id":"wywu4hC-WPxV"},"source":["## Specify the region\n","\n","Replace the `None` in the next cell with your AWS region name, for example `us-west-2`."]},{"cell_type":"code","metadata":{"id":"IowJTSN1e8B-"},"source":["# *** REPLACE None in the next line with your AWS region ***\n","os.environ['AWS_DEFAULT_REGION'] = None"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"ZwJSUTvlfSE0"},"source":["If you have specified the region correctly, the following cell should return back the region that you have specifies."]},{"cell_type":"code","metadata":{"id":"2CssvgRfUSu9"},"source":["%%bash\n","echo $AWS_DEFAULT_REGION"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"2RRQXy_AmfrD"},"source":["## Download a tiny sample\n","\n","Download a tiny sample of the dataset from https://gist.github.com/osipov/1fc0265f8f829d9d9eee8393657423a9 to a `trips_sample.csv` file which you are going to use to learn about using the Athena interface."]},{"cell_type":"code","metadata":{"id":"D23pmPM2p3Mk"},"source":["%%bash\n","wget -q https://gist.githubusercontent.com/osipov/1fc0265f8f829d9d9eee8393657423a9/raw/9957c1f09cdfa64f8b8d89cfec532a0e150d5178/trips_sample.csv\n","ls -ltr trips_sample.csv\n","cat trips_sample.csv"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"BrZqHUzLrHdQ"},"source":["Assuming the previous cell executed successfully, it should have returned the following result:\n","\n","```\n","-rw-r--r-- 1 root root 378 Nov 23 19:50 trips_sample.csv\n","fareamount_double,origin_block_latitude,origin_block_longitude,destination_block_latitude,destination_block_longitude\n","8.11,38.900769,-77.033644,38.912239,-77.036514\n","5.95,38.912609,-77.030788,38.906445,-77.023978\n","7.57,38.900773,-77.03655,38.896131,-77.024975\n","11.61,38.892101000000004,-77.044208,38.905969,-77.06564399999999\n","4.87,38.899615000000004,-76.980387,38.900638,-76.97023\n","```"]},{"cell_type":"markdown","metadata":{"id":"ifWi_kpRrSIp"},"source":["## Upload `trips_sample.csv` to your object storage bucket"]},{"cell_type":"code","metadata":{"id":"_t1IzWylptua"},"source":["%%bash\n","aws s3 cp trips_sample.csv s3://dc-taxi-$BUCKET_ID-$AWS_DEFAULT_REGION/samples/trips_sample.csv\n","aws s3 ls s3://dc-taxi-$BUCKET_ID-$AWS_DEFAULT_REGION/samples/trips_sample.csv"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"RaWojJMWuV0Z"},"source":["The output of the `aws s3 ls` command above should include the following, confirming that the file was uploaded successfully.\n","\n","```\n","2020-11-23 20:07:31 378 trips_sample.csv\n","```"]},{"cell_type":"markdown","metadata":{"id":"UtGQ6KAx3Yed"},"source":["## Create an Athena workgroup\n","\n","Create a `dc_taxi_athena_workgroup` for your Athena project, assuming one does not exist yet."]},{"cell_type":"code","metadata":{"id":"B9DhccSFxZ35"},"source":["%%bash\n","aws athena delete-work-group --work-group dc_taxi_athena_workgroup --recursive-delete-option 2> /dev/null\n","aws athena create-work-group --name dc_taxi_athena_workgroup \\\n","--configuration \"ResultConfiguration={OutputLocation=s3://dc-taxi-$BUCKET_ID-$AWS_DEFAULT_REGION/athena},EnforceWorkGroupConfiguration=false,PublishCloudWatchMetricsEnabled=false\""],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"Mcn4o2CrDbpW"},"source":["## Query Athena and Report Query Status"]},{"cell_type":"code","metadata":{"id":"iQP_USHQu2Kx"},"source":["%%bash\n","SQL=\"\n","CREATE EXTERNAL TABLE IF NOT EXISTS dc_taxi_db.dc_taxi_csv_sample_strings(\n"," fareamount STRING,\n"," origin_block_latitude STRING,\n"," origin_block_longitude STRING,\n"," destination_block_latitude STRING,\n"," destination_block_longitude STRING\n",")\n","ROW FORMAT DELIMITED FIELDS TERMINATED BY ','\n","LOCATION 's3://dc-taxi-$BUCKET_ID-$AWS_DEFAULT_REGION/samples/'\n","TBLPROPERTIES ('skip.header.line.count'='1');\"\n","\n","ATHENA_QUERY_ID=$(aws athena start-query-execution \\\n","--work-group dc_taxi_athena_workgroup \\\n","--query 'QueryExecutionId' \\\n","--output text \\\n","--query-string \"$SQL\")\n","\n","echo $SQL\n","\n","echo $ATHENA_QUERY_ID\n","until aws athena get-query-execution \\\n"," --query 'QueryExecution.Status.State' \\\n"," --output text \\\n"," --query-execution-id $ATHENA_QUERY_ID | grep -v \"RUNNING\";\n","do\n"," printf '.'\n"," sleep 1; \n","done"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"Zduq4AQbDlTp"},"source":["## Download and Preview a Utility Script to Query Athena\n","\n","The script is downloaded as `utils.sh` and is loaded in the upcoming cells using `source utils.sh` command."]},{"cell_type":"code","metadata":{"id":"-1A2IUgK62N2"},"source":["%%bash\n","wget -q https://raw.githubusercontent.com/osipov/smlbook/master/utils.sh\n","ls -l utils.sh"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"de1AFcAUDt3R"},"source":["## Output Athena Query to a Text Table"]},{"cell_type":"code","metadata":{"id":"siQ1_XcX8jn2"},"source":["%%bash\n","source utils.sh\n","SQL=\"\n","SELECT\n","\n","origin_block_latitude || ' , ' || origin_block_longitude\n"," AS origin,\n","\n","destination_block_latitude || ' , ' || destination_block_longitude\n"," AS destination\n","\n","FROM\n"," dc_taxi_db.dc_taxi_csv_sample_strings\n","\"\n","athena_query_to_table \"$SQL\" \"ResultSet.Rows[*].[Data[0].VarCharValue,Data[1].VarCharValue]\""],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"2U6ugJFND0ga"},"source":["## Output Athena Query to JSON for a Pandas DataFrame"]},{"cell_type":"code","metadata":{"id":"Ba8rMXvhBWdU"},"source":["%%bash\n","source utils.sh ; athena_query_to_pandas \"\"\"\n","SELECT\n","\n","origin_block_latitude || ' , ' || origin_block_longitude\n"," AS origin,\n","\n","destination_block_latitude || ' , ' || destination_block_longitude\n"," AS destination\n","\n","FROM\n"," dc_taxi_db.dc_taxi_csv_sample_strings\n","\"\"\""],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"5TmJx0tQD40z"},"source":["## Create a Utility Function to Read AWS CLI JSON as Pandas\n","\n","Note that the `utils.sh` script saves the output from Athena to `/tmp/awscli.json`"]},{"cell_type":"code","metadata":{"id":"VT20bpOrCVZ9"},"source":["import pandas as pd\n","def awscli_to_df():\n"," json_df = pd.read_json('/tmp/awscli.json')\n"," df = pd.DataFrame(json_df[0].tolist(), index = json_df.index, columns = json_df[0].tolist()[0]).drop(0, axis = 0)\n"," return df"],"execution_count":null,"outputs":[]},{"cell_type":"code","metadata":{"id":"MQi-CfOHAqgU"},"source":["awscli_to_df()"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"m0THrt-DOtdv"},"source":["## Apply Athena schema-on-read with columns as `DOUBLE`"]},{"cell_type":"code","metadata":{"id":"bScMRJ-L28J-"},"source":["%%bash\n","source utils.sh ; athena_query \"\n","CREATE EXTERNAL TABLE IF NOT EXISTS dc_taxi_db.dc_taxi_csv_sample_double(\n"," fareamount DOUBLE,\n"," origin_block_latitude DOUBLE,\n"," origin_block_longitude DOUBLE,\n"," destination_block_latitude DOUBLE,\n"," destination_block_longitude DOUBLE\n",")\n","ROW FORMAT DELIMITED FIELDS TERMINATED BY ','\n","LOCATION 's3://dc-taxi-$BUCKET_ID-$AWS_DEFAULT_REGION/samples/'\n","TBLPROPERTIES ('skip.header.line.count'='1');\n","\""],"execution_count":null,"outputs":[]},{"cell_type":"code","metadata":{"id":"hGIsmGgezora"},"source":["%%bash\n","source utils.sh ; athena_query_to_pandas \"\n","SELECT ROUND(MAX(fareamount) - MIN(fareamount), 2)\n","FROM dc_taxi_db.dc_taxi_csv_sample_double\n","\""],"execution_count":null,"outputs":[]},{"cell_type":"code","metadata":{"id":"K2kyee1VFAIx"},"source":["awscli_to_df()"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"Rp71QW-ZOj2Q"},"source":["## Explore 10 records from the DC taxi dataset"]},{"cell_type":"code","metadata":{"id":"ZV4CZ8Zd8wNL"},"source":["%%bash\n","source utils.sh ; athena_query_to_pandas \"\n","SELECT fareamount_double,\n"," origin_block_latitude_double,\n"," origin_block_longitude_double,\n"," destination_block_latitude_double,\n"," destination_block_longitude_double,\n"," origindatetime_tr\n","FROM dc_taxi_db.dc_taxi_parquet\n","LIMIT 10\n","\""],"execution_count":null,"outputs":[]},{"cell_type":"code","metadata":{"id":"_lhTTGCKFId5"},"source":["awscli_to_df()"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"W-6X9lnOOWHv"},"source":["## What is the number of the timestamps with NULL values?"]},{"cell_type":"code","metadata":{"id":"kIEuIU3tJMXZ"},"source":["%%bash\n","source utils.sh ; athena_query_to_pandas \"\n","SELECT\n"," (SELECT COUNT(*) FROM dc_taxi_db.dc_taxi_parquet) AS total,\n"," COUNT(*) AS null_origindate_time_total\n","FROM\n"," dc_taxi_db.dc_taxi_parquet\n","WHERE\n"," origindatetime_tr IS NULL\n","\""],"execution_count":null,"outputs":[]},{"cell_type":"code","metadata":{"id":"vmilocpPF5F4"},"source":["awscli_to_df()"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"Yt5PI2xyOLhQ"},"source":["## How many timestamps are un-parsable?"]},{"cell_type":"code","metadata":{"id":"y-iugDT4KZDW"},"source":["%%bash\n","source utils.sh ; athena_query_to_pandas \"\n","SELECT\n"," (SELECT COUNT(*) FROM dc_taxi_db.dc_taxi_parquet)\n"," - COUNT(DATE_PARSE(origindatetime_tr, '%m/%d/%Y %H:%i'))\n"," AS origindatetime_not_parsed\n","FROM\n"," dc_taxi_db.dc_taxi_parquet\n","WHERE\n"," origindatetime_tr IS NOT NULL;\n","\""],"execution_count":null,"outputs":[]},{"cell_type":"code","metadata":{"id":"qovgo1ToGByv"},"source":["awscli_to_df()"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"ZJOQp7FvN81f"},"source":["## How often are parts of the pick up location coordinate missing?"]},{"cell_type":"code","metadata":{"id":"SzuPPwdlRkQo"},"source":["%%bash\n","source utils.sh ; athena_query_to_pandas \"\n","SELECT\n"," ROUND(100.0 * COUNT(*) / (SELECT COUNT(*)\n"," FROM dc_taxi_db.dc_taxi_parquet), 2)\n","\n"," AS percentage_null,\n","\n"," (SELECT COUNT(*)\n"," FROM dc_taxi_db.dc_taxi_parquet\n"," WHERE origin_block_longitude_double IS NULL\n"," OR origin_block_latitude_double IS NULL)\n","\n"," AS either_null,\n","\n"," (SELECT COUNT(*)\n"," FROM dc_taxi_db.dc_taxi_parquet\n"," WHERE origin_block_longitude_double IS NULL\n"," AND origin_block_latitude_double IS NULL)\n","\n"," AS both_null\n","\n","FROM\n"," dc_taxi_db.dc_taxi_parquet\n","WHERE\n"," origin_block_longitude_double IS NULL\n"," OR origin_block_latitude_double IS NULL\n","\""],"execution_count":null,"outputs":[]},{"cell_type":"code","metadata":{"id":"i1-i7wx-R9Op"},"source":["awscli_to_df()"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"fJMCC1D0MgoL"},"source":["## How often are parts of the drop off coordinates missing?\n","\n","Repeat the previous analysis"]},{"cell_type":"code","metadata":{"id":"pWp6SMuySyXM"},"source":["%%bash\n","source utils.sh ; athena_query_to_pandas \"\n","SELECT\n"," ROUND(100.0 * COUNT(*) / (SELECT COUNT(*)\n"," FROM dc_taxi_db.dc_taxi_parquet), 2)\n","\n"," AS percentage_null,\n","\n"," (SELECT COUNT(*)\n"," FROM dc_taxi_db.dc_taxi_parquet\n"," WHERE destination_block_longitude_double IS NULL\n"," OR destination_block_latitude_double IS NULL)\n","\n"," AS either_null,\n","\n"," (SELECT COUNT(*)\n"," FROM dc_taxi_db.dc_taxi_parquet\n"," WHERE destination_block_longitude_double IS NULL\n"," AND destination_block_latitude_double IS NULL)\n","\n"," AS both_null\n","\n","FROM\n"," dc_taxi_db.dc_taxi_parquet\n","WHERE\n"," destination_block_longitude_double IS NULL\n"," OR destination_block_latitude_double IS NULL\n","\""],"execution_count":null,"outputs":[]},{"cell_type":"code","metadata":{"id":"os9ckvk4TQSd"},"source":["awscli_to_df()"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"FL-blOX9MZtV"},"source":[" ## Find the count and the fraction of the missing coordinates"]},{"cell_type":"code","metadata":{"id":"LosJk-AyTxO8"},"source":["%%bash\n","source utils.sh ; athena_query_to_pandas \"\n","SELECT\n"," COUNT(*)\n"," AS total,\n","\n"," ROUND(100.0 * COUNT(*) / (SELECT COUNT(*)\n"," FROM dc_taxi_db.dc_taxi_parquet), 2)\n"," AS percent\n","\n","FROM\n"," dc_taxi_db.dc_taxi_parquet\n","\n","WHERE\n"," origin_block_latitude_double IS NULL\n"," OR origin_block_longitude_double IS NULL\n"," OR destination_block_latitude_double IS NULL\n"," OR destination_block_longitude_double IS NULL\n","\""],"execution_count":null,"outputs":[]},{"cell_type":"code","metadata":{"id":"znSbval0T5OV"},"source":["awscli_to_df()"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"PN3LuLQkMVcs"},"source":["## Query for the values and quantities of fareamount_string that failed to parse as a double"]},{"cell_type":"code","metadata":{"id":"ydZ17NanUF4I"},"source":["%%bash\n","source utils.sh ; athena_query_to_pandas \"\n","SELECT\n"," fareamount_string,\n"," COUNT(fareamount_string) AS rows,\n"," ROUND(100.0 * COUNT(fareamount_string) /\n"," ( SELECT COUNT(*)\n"," FROM dc_taxi_db.dc_taxi_parquet), 2)\n","\n"," AS percent\n","FROM\n"," dc_taxi_db.dc_taxi_parquet\n","WHERE\n"," fareamount_double IS NULL\n"," AND fareamount_string IS NOT NULL\n","GROUP BY\n"," fareamount_string;\n","\""],"execution_count":null,"outputs":[]},{"cell_type":"code","metadata":{"id":"mhQDbvztULpN"},"source":["awscli_to_df()"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"2xWUyJ1hMLq6"},"source":["## Explore summary statistics of the `fareamount_double` column"]},{"cell_type":"code","metadata":{"id":"Glbl854gVSdD"},"source":["%%bash\n","source utils.sh ; athena_query_to_pandas \"\n","WITH\n","src AS (SELECT\n"," fareamount_double AS val\n"," FROM\n"," dc_taxi_db.dc_taxi_parquet),\n","\n","stats AS\n"," (SELECT\n"," MIN(val) AS min,\n"," APPROX_PERCENTILE(val, 0.25) AS q1,\n"," APPROX_PERCENTILE(val ,0.5) AS q2,\n"," APPROX_PERCENTILE(val, 0.75) AS q3,\n"," AVG(val) AS mean,\n"," STDDEV(val) AS std,\n"," MAX(val) AS max\n"," FROM\n"," src)\n","\n","SELECT\n"," DISTINCT min, q1, q2, q3, max\n","\n","FROM\n"," dc_taxi_db.dc_taxi_parquet, stats\n","\""],"execution_count":null,"outputs":[]},{"cell_type":"code","metadata":{"id":"lTHagOwHVaBf"},"source":["awscli_to_df()"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"wHlNB-JnMAmX"},"source":["## What percentage of fare amount values are null or below the minimum threshold?"]},{"cell_type":"code","metadata":{"id":"OFQcU6sVVqtc"},"source":["%%bash\n","source utils.sh ; athena_query_to_pandas \"\n","WITH\n","src AS (SELECT\n"," COUNT(*) AS total\n"," FROM\n"," dc_taxi_db.dc_taxi_parquet\n"," WHERE\n"," fareamount_double IS NOT NULL)\n","\n","SELECT\n"," ROUND(100.0 * COUNT(fareamount_double) / MIN(total), 2) AS percent\n","FROM\n"," dc_taxi_db.dc_taxi_parquet, src\n","WHERE\n"," fareamount_double < 3.25\n"," AND fareamount_double IS NOT NULL\n","\""],"execution_count":null,"outputs":[]},{"cell_type":"code","metadata":{"id":"T5WqqGHjVzAv"},"source":["awscli_to_df()"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"gfNCCzsILt6c"},"source":["## Compute summary statistics for the cases where the fareamount_string failed to parse"]},{"cell_type":"code","metadata":{"id":"8fnCYXMUV-Da"},"source":["%%bash\n","source utils.sh ; athena_query_to_pandas \"\n","SELECT\n"," fareamount_string,\n"," ROUND( MIN(mileage_double), 2) AS min,\n"," ROUND( APPROX_PERCENTILE(mileage_double, 0.25), 2) AS q1,\n"," ROUND( APPROX_PERCENTILE(mileage_double ,0.5), 2) AS q2,\n"," ROUND( APPROX_PERCENTILE(mileage_double, 0.75), 2) AS q3,\n"," ROUND( MAX(mileage_double), 2) AS max\n","FROM\n"," dc_taxi_db.dc_taxi_parquet\n","WHERE\n"," fareamount_string LIKE 'NULL'\n","GROUP BY\n"," fareamount_string\n","\""],"execution_count":null,"outputs":[]},{"cell_type":"code","metadata":{"id":"CJ5QxWdbWEFD"},"source":["awscli_to_df()"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"492StbYgLLf3"},"source":["## Figure out the lower left and upper right boundary locations\n","\n","Plugging the latitude and longitude coordinates reported by the query into OpenStreetMap ( https://www.openstreetmap.org/directions?engine=fossgis_osrm_car&route=38.8110%2C-77.1130%3B38.9950%2C-76.9100#map=11/38.9025/-77.0094 ) yields 21.13 miles or an estimate of $ \\$48.89 (21.13 * \\$2.16/mile + \\$3.25) $.\n"]},{"cell_type":"code","metadata":{"id":"u-IkoJL4Uf1N"},"source":["%%bash\n","source utils.sh ; athena_query_to_pandas \"\n","SELECT \n"," MIN(lat) AS lower_left_latitude,\n"," MIN(lon) AS lower_left_longitude,\n"," MAX(lat) AS upper_right_latitude,\n"," MAX(lon) AS upper_right_longitude\n","\n"," FROM (\n"," SELECT \n"," MIN(origin_block_latitude_double) AS lat,\n"," MIN(origin_block_longitude_double) AS lon \n"," FROM \"dc_taxi_db\".\"dc_taxi_parquet\" \n"," \n"," UNION\n","\n"," SELECT \n"," MIN(destination_block_latitude_double) AS lat, \n"," MIN(destination_block_longitude_double) AS lon \n"," FROM \"dc_taxi_db\".\"dc_taxi_parquet\" \n"," \n"," UNION\n","\n"," SELECT \n"," MAX(origin_block_latitude_double) AS lat, \n"," MAX(origin_block_longitude_double) AS lon \n"," FROM \"dc_taxi_db\".\"dc_taxi_parquet\" \n"," \n"," UNION\n","\n"," SELECT \n"," MAX(destination_block_latitude_double) AS lat, \n"," MAX(destination_block_longitude_double) AS lon \n"," FROM \"dc_taxi_db\".\"dc_taxi_parquet\"\n","\n",")\n","\""],"execution_count":null,"outputs":[]},{"cell_type":"code","metadata":{"id":"K4jRkxVhWZeu"},"source":["awscli_to_df()"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"gV-yM36xJpMK"},"source":["## Compute normally distributed averages of random samples"]},{"cell_type":"code","metadata":{"id":"N8SORcI7XFGd"},"source":["%%bash\n","source utils.sh ; athena_query_to_pandas \"\n","WITH dc_taxi AS \n","(SELECT *, \n"," origindatetime_tr \n"," || fareamount_string \n"," || origin_block_latitude_string \n"," || origin_block_longitude_string \n"," || destination_block_latitude_string \n"," || destination_block_longitude_string \n"," || mileage_string AS objectid\n","\n"," FROM \"dc_taxi_db\".\"dc_taxi_parquet\"\n","\n"," WHERE fareamount_double >= 3.25\n"," AND fareamount_double IS NOT NULL\n"," AND mileage_double > 0 )\n","\n","SELECT AVG(mileage_double) AS average_mileage\n","FROM dc_taxi\n","WHERE objectid IS NOT NULL\n","GROUP BY MOD( ABS( from_big_endian_64( xxhash64( to_utf8( objectid ) ) ) ), 1000)\n","\" ResultSet.Rows[*].[Data[].VarCharValue] 1000"],"execution_count":null,"outputs":[]},{"cell_type":"code","metadata":{"id":"lNf2VXJ6XFN_"},"source":["awscli_to_df()"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"knnL4gesLAiy"},"source":["## Visually confirm that the means of samples are normally distributed"]},{"cell_type":"code","metadata":{"id":"oDL4h5yiKHO_"},"source":["%matplotlib inline\n","import matplotlib.pyplot as plt\n","plt.figure(figsize = (12, 9))\n","\n","df = awscli_to_df()\n","df.average_mileage = df.average_mileage.astype(float)\n","df.average_mileage -= df.average_mileage.mean()\n","df.average_mileage /= df.average_mileage.std()\n","df.average_mileage.plot.hist(bins = 30);"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"7ccBsRpiJa6j"},"source":["## Produce a statistical upper bound estimate for the mileage"]},{"cell_type":"code","metadata":{"id":"f0NKqmy6toiR"},"source":["%%bash\n","source utils.sh ; athena_query_to_pandas \"\n","WITH dc_taxi AS \n","(SELECT *, \n"," origindatetime_tr \n"," || fareamount_string \n"," || origin_block_latitude_string \n"," || origin_block_longitude_string \n"," || destination_block_latitude_string \n"," || destination_block_longitude_string \n"," || mileage_string AS objectid\n","\n"," FROM \"dc_taxi_db\".\"dc_taxi_parquet\"\n","\n"," WHERE fareamount_double >= 3.25\n"," AND fareamount_double IS NOT NULL\n"," AND mileage_double > 0 ),\n","\n","dc_taxi_samples AS (\n"," SELECT AVG(mileage_double) AS average_mileage\n"," FROM dc_taxi\n"," WHERE objectid IS NOT NULL\n"," GROUP BY MOD( ABS( from_big_endian_64( xxhash64( to_utf8( objectid ) ) ) ) , 1000)\n",")\n","SELECT AVG(average_mileage) + 4 * STDDEV(average_mileage)\n","FROM dc_taxi_samples\n","\""],"execution_count":null,"outputs":[]},{"cell_type":"code","metadata":{"id":"LoQWBPVTt0P9"},"source":["awscli_to_df()"],"execution_count":null,"outputs":[]},{"cell_type":"code","metadata":{"id":"OJUb_ncZwust"},"source":["upper_mileage = 2.16 * awscli_to_df().mean().item() + 3.25\n","upper_mileage"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"4cKoV6ZOJSvb"},"source":["## Produce the final estimate for the upper bound on fare amount"]},{"cell_type":"code","metadata":{"id":"UslfD8rgxviR"},"source":["%%bash\n","source utils.sh ; athena_query_to_pandas \"\n","WITH dc_taxi AS \n","(SELECT *, \n"," origindatetime_tr \n"," || fareamount_string \n"," || origin_block_latitude_string \n"," || origin_block_longitude_string \n"," || destination_block_latitude_string \n"," || destination_block_longitude_string \n"," || mileage_string AS objectid\n","\n"," FROM \"dc_taxi_db\".\"dc_taxi_parquet\"\n","\n"," WHERE fareamount_double >= 3.25\n"," AND fareamount_double IS NOT NULL\n"," AND mileage_double > 0 ),\n","\n","dc_taxi_samples AS (\n"," SELECT AVG(fareamount_double) AS average_fareamount\n"," FROM dc_taxi\n"," WHERE objectid IS NOT NULL\n"," GROUP BY MOD( ABS( from_big_endian_64( xxhash64( to_utf8( objectid ) ) ) ) , 1000)\n",")\n","SELECT AVG(average_fareamount) + 4 * STDDEV(average_fareamount)\n","FROM dc_taxi_samples\n","\""],"execution_count":null,"outputs":[]},{"cell_type":"code","metadata":{"id":"0zl0aN90xvo9"},"source":["awscli_to_df()"],"execution_count":null,"outputs":[]},{"cell_type":"code","metadata":{"id":"_Ws1qkOJyJ6A"},"source":["upper_fareamount = awscli_to_df().mean().item()\n","upper_fareamount"],"execution_count":null,"outputs":[]},{"cell_type":"code","metadata":{"id":"HnRDFMN6yUmF"},"source":["means = [15.96, 29.19, 48.89, 560, 2,422.45]\n","sum(means) / len(means)"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"ZHWQVcIHJDuz"},"source":["## Check the percentage of the dataset above the upper fare amount bound"]},{"cell_type":"code","metadata":{"id":"jxNv5mAVyh7K"},"source":["%%bash\n","source utils.sh ; athena_query_to_pandas \"\n","SELECT\n"," 100.0 * COUNT(fareamount_double) / \n"," (SELECT COUNT(*) \n"," FROM dc_taxi_db.dc_taxi_parquet \n"," WHERE fareamount_double IS NOT NULL) AS percent\n","FROM\n"," dc_taxi_db.dc_taxi_parquet\n","WHERE (fareamount_double < 3.25 OR fareamount_double > 179.75)\n"," AND fareamount_double IS NOT NULL;\n","\""],"execution_count":null,"outputs":[]},{"cell_type":"code","metadata":{"id":"-L_WoWDAyjeR"},"source":["awscli_to_df()"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"ntK1nMfLI8wk"},"source":["## Produce final summary statistics"]},{"cell_type":"code","metadata":{"id":"FSHcmmTlyvvK"},"source":["%%bash\n","source utils.sh ; athena_query_to_pandas \"\n","WITH src AS (SELECT fareamount_double AS val\n"," FROM dc_taxi_db.dc_taxi_parquet\n"," WHERE fareamount_double IS NOT NULL\n"," AND fareamount_double >= 3.25\n"," AND fareamount_double <= 180.0),\n","stats AS\n"," (SELECT\n"," ROUND(MIN(val), 2) AS min,\n"," ROUND(APPROX_PERCENTILE(val, 0.25), 2) AS q1,\n"," ROUND(APPROX_PERCENTILE(val, 0.5), 2) AS q2,\n"," ROUND(APPROX_PERCENTILE(val, 0.75), 2) AS q3,\n"," ROUND(AVG(val), 2) AS mean,\n"," ROUND(STDDEV(val), 2) AS std,\n"," ROUND(MAX(val), 2) AS max\n"," FROM src)\n","SELECT min, q1, q2, q3, max, mean, std\n","FROM stats;\n","\""],"execution_count":null,"outputs":[]},{"cell_type":"code","metadata":{"id":"yDcKPEtPyv0c"},"source":["awscli_to_df()"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"8Rb-Rba7Iied"},"source":["## Check that minimum and maximum locations are within DC boundaries\n","\n","Using the SQL query and OpenStreetMap ( https://www.openstreetmap.org/directions?engine=fossgis_osrm_car&route=38.8106%2C-77.1134%3B38.9940%2C-76.9100#map=11/38.9025/-77.0210 ) check that the minimum and maximum coordinates for the origin latitude and longitude columns confirm that resulting pairs (38.81138, -77.113633) and (38.994217, -76.910012) as well as ( 38.994217,\t-76.910012) and (38.81138,\t-77.113633) (https://www.openstreetmap.org/directions?engine=fossgis_osrm_car&route=38.994217%2C-76.910012%3B38.81138%2C-77.113633#map=11/38.9025/-77.0210 ) are within DC boundaries. \n","\n","\n","\n"]},{"cell_type":"code","metadata":{"id":"EGxcZAdzzCxg"},"source":["%%bash\n","source utils.sh ; athena_query_to_pandas \"\n","SELECT\n"," MIN(origin_block_latitude_double) AS olat_min,\n"," MIN(origin_block_longitude_double) AS olon_min,\n"," MAX(origin_block_latitude_double) AS olat_max,\n"," MAX(origin_block_longitude_double) AS olon_max,\n"," MIN(destination_block_latitude_double) AS dlat_min,\n"," MIN(destination_block_longitude_double) AS dlon_min,\n"," MAX(destination_block_latitude_double) AS dlat_max,\n"," MAX(destination_block_longitude_double) AS dlon_max\n","FROM\n"," dc_taxi_db.dc_taxi_parquet\n","\""],"execution_count":null,"outputs":[]},{"cell_type":"code","metadata":{"id":"-zAPUH8yzCua"},"source":["awscli_to_df()"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"rQVARZpYD9gw"},"source":["## Use a PySpark job to create a VACUUMed dataset\n","\n","The next cell uses the Jupyter `%%writefile` magic to save the source code for the PySpark job to the `dctaxi_parquet_vacuum.py` file."]},{"cell_type":"code","metadata":{"id":"b6lJXTYXyN5U"},"source":["%%writefile dctaxi_parquet_vacuum.py\n","import sys\n","from awsglue.transforms import *\n","from awsglue.utils import getResolvedOptions\n","from pyspark.context import SparkContext\n","from awsglue.context import GlueContext\n","from awsglue.job import Job\n","\n","args = getResolvedOptions(sys.argv, ['JOB_NAME',\n"," 'BUCKET_SRC_PATH',\n"," 'BUCKET_DST_PATH',\n"," ])\n","\n","BUCKET_SRC_PATH = args['BUCKET_SRC_PATH']\n","BUCKET_DST_PATH = args['BUCKET_DST_PATH']\n","\n","sc = SparkContext()\n","glueContext = GlueContext(sc)\n","logger = glueContext.get_logger()\n","spark = glueContext.spark_session\n","\n","job = Job(glueContext)\n","job.init(args['JOB_NAME'], args)\n","\n","df = ( spark\n"," .read\n"," .parquet(f\"{BUCKET_SRC_PATH}\") )\n"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"sh4FbJVrzr44"},"source":["Apply the SQL query developed though the VACUUM-based analysis of the data to prepare a version of the dataset without any `NULL` values and with an interval applied to the `fareamount_double` column."]},{"cell_type":"code","metadata":{"id":"E0Upxn7oyScc"},"source":["%%writefile -a dctaxi_parquet_vacuum.py\n","\n","df.createOrReplaceTempView(\"dc_taxi_parquet\")\n","\n","query_df = spark.sql(\"\"\"\n","SELECT\n"," fareamount_double,\n"," origindatetime_tr,\n"," origin_block_latitude_double,\n"," origin_block_longitude_double,\n"," destination_block_latitude_double,\n"," destination_block_longitude_double \n","FROM \n"," dc_taxi_parquet \n","WHERE \n"," origindatetime_tr IS NOT NULL\n"," AND fareamount_double IS NOT NULL\n"," AND fareamount_double >= 3.25\n"," AND fareamount_double <= 180.0\n"," AND origin_block_latitude_double IS NOT NULL\n"," AND origin_block_longitude_double IS NOT NULL\n"," AND destination_block_latitude_double IS NOT NULL\n"," AND destination_block_longitude_double IS NOT NULL\n","\"\"\".replace('\\n', ''))\n","\n"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"T08Ergub2N9j"},"source":["Convert the original, `STRING` formatted `origindatetime_tr` column into a SQL `TIMESTAMP` column named `origindatetime_ts`. The conversion is needed to extract the year, month, day of the week (`dow`), and hour of the taxi trip as separate numeric, `INTEGER` columns for machine learning. Lastly, drop any records that are missing values (for example due to failed conversion), or are duplicated in the dataset."]},{"cell_type":"code","metadata":{"id":"D7AcexukD9EI"},"source":["%%writefile -a dctaxi_parquet_vacuum.py\n","\n","\n","#parse to check for valid value of the original timestamp\n","from pyspark.sql.functions import col, to_timestamp, dayofweek, year, month, hour\n","from pyspark.sql.types import IntegerType\n","\n","#convert the source timestamp into numeric data needed for machine learning\n","query_df = (query_df\n"," .withColumn(\"origindatetime_ts\", to_timestamp(\"origindatetime_tr\", \"dd/MM/yyyy HH:mm\"))\n"," .where(col(\"origindatetime_ts\").isNotNull())\n"," .drop(\"origindatetime_tr\")\n"," .withColumn( 'year_integer', year('origindatetime_ts').cast(IntegerType()) )\n"," .withColumn( 'month_integer', month('origindatetime_ts').cast(IntegerType()) )\n"," .withColumn( 'dow_integer', dayofweek('origindatetime_ts').cast(IntegerType()) )\n"," .withColumn( 'hour_integer', hour('origindatetime_ts').cast(IntegerType()) )\n"," .drop('origindatetime_ts') )\n","\n","#drop missing data and duplicates\n","query_df = ( query_df\n"," .dropna()\n"," .drop_duplicates() )\n","\n"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"ioxdeCsb39h3"},"source":["Persists the cleaned up dataset as a Parquet formatted dataset in the AWS S3 location specified by the `BUCKET_DST_PATH` parameter. The `save_stats_metadata` function computes summary statistics of the clean up dataset and saves the statistics as a single CSV file located in a AWS S3 subfolder named `.meta/stats` under the S3 location from the `BUCKET_DST_PATH` parameter.\n"]},{"cell_type":"code","metadata":{"id":"dygcsspizkU0"},"source":["%%writefile -a dctaxi_parquet_vacuum.py\n","\n","\n","(query_df\n"," .write\n"," .parquet(f\"{BUCKET_DST_PATH}\", mode=\"overwrite\"))\n","\n","def save_stats_metadata(df, dest, header = 'true', mode = 'overwrite'):\n"," return (df.describe()\n"," .coalesce(1)\n"," .write\n"," .option(\"header\", header)\n"," .csv(dest, mode = mode))\n","\n","save_stats_metadata(query_df, f\"{BUCKET_DST_PATH}/.meta/stats\")\n","\n","job.commit()\n"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"IAqxnQVKEHhb"},"source":["## Run and monitor the PySpark job\n","* **it should take about 5 minutes for the job to complete**\n","\n","Once the PySpark job completes successfully, the job execution status should change from `RUNNING` to `SUCCEEDED`.\n"]},{"cell_type":"code","metadata":{"id":"QO2lJ5rPEJ2_"},"source":["%%bash\n","source utils.sh\n","\n","PYSPARK_SRC_NAME=dctaxi_parquet_vacuum.py \\\n","PYSPARK_JOB_NAME=dc-taxi-parquet-vacuum-job \\\n","BUCKET_SRC_PATH=s3://dc-taxi-$BUCKET_ID-$AWS_DEFAULT_REGION/parquet \\\n","BUCKET_DST_PATH=s3://dc-taxi-$BUCKET_ID-$AWS_DEFAULT_REGION/parquet/vacuum \\\n","run_job"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"-td5JdiHEPhx"},"source":["In case of a successful completion, the last cell should have produced an output similar to the following:\n","\n","```\n","2021-06-01 23:34:56 1840 dctaxi_parquet_vacuum.py\n","{\n"," \"JobName\": \"dc-taxi-parquet-vacuum-job\"\n","}\n","{\n"," \"Name\": \"dc-taxi-parquet-vacuum-job\"\n","}\n","{\n"," \"JobRunId\": \"jr_59eee7f229f448b39286f1bd19428c9082aaf6bed232342cc05e68f9246d131e\"\n","}\n","Waiting for the job to finish...............SUCCEEDED\n","```\n","\n","Once the PySpark job completes successfully, the job execution status should change from `RUNNING` to `SUCCEEDED`. You can run the next cell to get the updated job status.\n"]},{"cell_type":"code","metadata":{"id":"SHBy543wESN9"},"source":["!aws glue get-job-runs --job-name dc-taxi-parquet-vacuum-job --output text --query 'JobRuns[0].JobRunState'"],"execution_count":null,"outputs":[]}]}