{"nbformat":4,"nbformat_minor":0,"metadata":{"colab":{"name":"2022-01-08-serverless-ml.ipynb","provenance":[{"file_id":"https://github.com/osipov/smlbook/blob/master/ch2.ipynb","timestamp":1644511876857}],"collapsed_sections":[],"toc_visible":true},"kernelspec":{"name":"python3","display_name":"Python 3"}},"cells":[{"cell_type":"markdown","metadata":{"id":"77ZN-BAylPoU"},"source":["# Serverless Machine Learning in Action"]},{"cell_type":"markdown","source":["> Note: https://livebook.manning.com/book/serverless-machine-learning-in-action/chapter-2?a_aid=osipov&a_bid=fa913283&"],"metadata":{"id":"AQ7LHTvvjE-t"}},{"cell_type":"markdown","metadata":{"id":"NCaMbSI-OFtB"},"source":["## Download the DC taxi dataset\n","\n","`gdown` is a Python utility for downloading files stored in Google Drive. The bash script in the following cell iterates through a collection of Google Drive identifiers that match files `taxi_2015.zip` through `taxi_2019.zip` stored in a shared Google Drive. This script uses these files instead of the original files from https://opendata.dc.gov/search?categories=transportation&q=taxi&type=document%20link since the originals cannot be easily downloaded using a bash script.\n","\n","* **the next cell should take about a minute to finish**"]},{"cell_type":"code","metadata":{"id":"_dnRoxG2J-BY"},"source":["%%bash\n","pip install gdown\n","for ID in '1yF2hYrVjAZ3VPFo1dDkN80wUV2eEq65O'\\\n"," '1Z7ZVi79wKEbnc0FH3o0XKHGUS8MQOU6R'\\\n"," '1I_uraLKNbGPe3IeE7FUa9gPfwBHjthu4'\\\n"," '1MoY3THytktrxam6_hFSC8kW1DeHZ8_g1'\\\n"," '1balxhT6Qh_WDp4wq4OsG40iwqFa86QgW'\n","do\n","\n"," gdown --id $ID\n","\n","done"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"dcuxUE1oP5zB"},"source":["## Unzip the dataset\n","\n","The script in the following cell unzips the downloaded dataset files to the `dctaxi` subdirectory in the current directory of the notebook. The `-o` flag used by the `unzip` command overwrites existing files in case you execute the next cell more than once.\n","\n","* **the next cell should take about 3 minutes to complete**"]},{"cell_type":"code","metadata":{"id":"RqJ9uEg_LDR5"},"source":["%%bash\n","\n","mkdir -p dctaxi\n","\n","for YEAR in '2015' \\\n"," '2016' \\\n"," '2017' \\\n"," '2018' \\\n"," '2019'\n","do\n","\n"," unzip -o taxi_$YEAR.zip -d dctaxi\n"," \n","done"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"kvYQMYZSQjpK"},"source":["## Report on the disk space used by the dataset files\n","\n","The next cell reports on the disk usage (`du`) by the files from the DC taxi dataset. All of the files in the dataset have the `taxi_` prefix. Since the entire output of the `du` command lists the disk usage of all of the files, the `tail` command is used to limit the output to just the last line. You can remove the `tail` command (in other words, leave just `du -cha taxi_*.txt` in the next cell) if you wish to report on the disk usage by the individual files in the dataset.\n","\n","For reference, the entire report on disk usage is also available as a Github Gist here: https://gist.github.com/osipov/032505a9c7e7388a2384f893be9e0681"]},{"cell_type":"code","metadata":{"id":"5TXVXaanNUJE"},"source":["!du -cha --block-size=1MB dctaxi/taxi_*.txt | tail -n 1"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"AJrzrRFaSejc"},"source":["## Scan the dataset documentation\n","\n","The dataset includes a `README_DC_Taxicab_trip.txt` file with a brief documentation about the dataset contents. 
Run the next cell and take a moment to review the documentation, focusing on the schema used by the dataset."]},{"cell_type":"code","metadata":{"id":"OKbux5ecLSCK"},"source":["%%bash\n","cat dctaxi/README_DC_Taxicab_trip.txt"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"gCyrbSReS55v"},"source":["## Preview the dataset\n","\n","Run the next cell to confirm that the dataset consists of pipe (`|`) separated values organized according to the schema described by the documentation. The `taxi_2015_09.txt` file used in the next cell was picked arbitrarily, just to illustrate the dataset."]},{"cell_type":"code","metadata":{"id":"lCm6l_DLS9VZ"},"source":["!head dctaxi/taxi_2015_09.txt"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"GZ2rTEBfU20C"},"source":["## Download and install AWS CLI"]},{"cell_type":"code","metadata":{"id":"ei0Vm3p9UkT1"},"source":["%%bash\n","curl \"https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip\" -o \"awscliv2.zip\"\n","unzip -o awscliv2.zip\n","sudo ./aws/install"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"1xoSKwf7U77e"},"source":["## Specify AWS credentials\n","\n","Modify the contents of the next cell to specify your AWS credentials as strings. \n","\n","If you see the following exception:\n","\n","`TypeError: str expected, not NoneType`\n","\n","It means that you did not specify the credentials correctly."]},{"cell_type":"code","metadata":{"id":"CaRjFdSoT-q1"},"source":["import os\n","# *** REPLACE None in the next 2 lines with your AWS key values ***\n","os.environ['AWS_ACCESS_KEY_ID'] = None\n","os.environ['AWS_SECRET_ACCESS_KEY'] = None"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"aAMFo90AVJuI"},"source":["## Confirm the credentials\n","\n","Run the next cell to validate your credentials."]},{"cell_type":"code","metadata":{"id":"VZqAz5PjS_f1"},"source":["%%bash\n","aws sts get-caller-identity"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"66DsruTZWERS"},"source":["If you have specified the correct credentials as values for the `AWS_ACCESS_KEY_ID` and the `AWS_SECRET_ACCESS_KEY` environment variables, then `aws sts get-caller-identity` used by the previous cell should have returned the `UserId`, `Account`, and `Arn` for the credentials, resembling the following:\n","\n","```\n","{\n"," \"UserId\": \"█████████████████████\",\n"," \"Account\": \"████████████\",\n"," \"Arn\": \"arn:aws:iam::████████████:user/█████████\"\n","}\n","```"]},{"cell_type":"markdown","metadata":{"id":"wywu4hC-WPxV"},"source":["## Specify the region\n","\n","Replace the `None` in the next cell with your AWS region name, for example `us-west-2`."]},{"cell_type":"code","metadata":{"id":"IowJTSN1e8B-"},"source":["# *** REPLACE None in the next line with your AWS region ***\n","os.environ['AWS_DEFAULT_REGION'] = None"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"ZwJSUTvlfSE0"},"source":["If you have specified the region correctly, the following cell should echo back the region that you specified."]},{"cell_type":"code","metadata":{"id":"2CssvgRfUSu9"},"source":["%%bash\n","echo $AWS_DEFAULT_REGION"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"_s0XszRAi4sO"},"source":["## Create unique bucket ID\n","\n","Use the bash `$RANDOM` pseudo-random number generator and the first 32 characters of the `md5sum` output to produce a unique bucket 
ID."]},{"cell_type":"code","metadata":{"id":"R99zeeQkfmak"},"source":["BUCKET_ID = !echo $(echo $RANDOM | md5sum | cut -c -32)\n","os.environ['BUCKET_ID'] = next(iter(BUCKET_ID))\n","os.environ['BUCKET_ID']"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"9KPk03iKkQnq"},"source":["## Save a backup copy of the `BUCKET_ID`\n","\n","The next cell saves the contents of the `BUCKET_ID` environment variable to a `BUCKET_ID` file as a backup."]},{"cell_type":"code","metadata":{"id":"UiKvBqAof63B"},"source":["val = os.environ['BUCKET_ID']\n","%store val > BUCKET_ID\n","!cat BUCKET_ID"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"GSR-WujrkVhF"},"source":["## Download the `BUCKET_ID` file\n","\n","Ensure that you have a backup copy of the `BUCKET_ID` file created by the previous cell before proceeding. The contents of the `BUCKET_ID` file are going to be reused later in this notebook and in the other notebooks.\n"]},{"cell_type":"markdown","metadata":{"id":"2RRQXy_AmfrD"},"source":["## Create an AWS bucket"]},{"cell_type":"code","metadata":{"id":"C4OrMK8bmjUM"},"source":["%%bash\n","aws s3api create-bucket --bucket dc-taxi-$BUCKET_ID-$AWS_DEFAULT_REGION --create-bucket-configuration LocationConstraint=$AWS_DEFAULT_REGION"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"jRg9sMTFmqEV"},"source":["If the previous cell executed successfully, then it should have produced an output resembling the following:\n","\n","```\n","{\n"," \"Location\": \"http:/dc-taxi-████████████████████████████████-█████████.s3.amazonaws.com/\"\n","}\n","```\n","\n","You can return back the name of the bucket by running the following cell:"]},{"cell_type":"code","metadata":{"id":"VMekJlTGm-11"},"source":["!echo s3://dc-taxi-$BUCKET_ID-$AWS_DEFAULT_REGION"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"4CJ2C81YnIO6"},"source":["You can also use the AWS CLI `list-buckets` command to print out all the buckets that exist in your AWS account, however the printed names will not show the `s3://` prefix:"]},{"cell_type":"code","metadata":{"id":"FM6SjdPBkuwH"},"source":["!aws s3api list-buckets"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"IlTfJ8S5ruJU"},"source":["## Upload the DC taxi dataset to AWS S3\n","\n","Synchronize the contents of the `dctaxi` directory (where you unzipped the dataset) to the `csv` folder in the S3 bucket you just created. \n","\n","* **the next cell should take about 4 minutes to complete**"]},{"cell_type":"code","metadata":{"id":"BQ3JWPb5kPBU"},"source":["%%bash\n","aws s3 sync \\\n"," --exclude 'README*' \\\n"," dctaxi/ s3://dc-taxi-$BUCKET_ID-$AWS_DEFAULT_REGION/csv/"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"m6cK-rVvsrQx"},"source":["## Confirm a successful copy\n","\n","You can check whether the `aws sync` command completed successfully, by listing the contents of the newly created bucket. 
Run the following cell:"]},{"cell_type":"code","metadata":{"id":"yD0AuIsbrmhR"},"source":["!aws s3 ls --recursive --summarize --human-readable s3://dc-taxi-$BUCKET_ID-$AWS_DEFAULT_REGION/csv/ | tail -n 2"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"Cwjo2RU0tCkB"},"source":["which should have returned\n","\n","```\n","Total Objects: 54\n"," Total Size: 11.2 GiB\n","```\n","\n","if the dataset was copied to S3 successfully.\n","\n","As before you can remove the `tail -n 2` part in the previous cell to report the entire contents of the `csv` folder on S3."]},{"cell_type":"markdown","metadata":{"id":"H8BgQUXCufMo"},"source":["## Create AWS role and policy to allow Glue to access the S3 bucket"]},{"cell_type":"code","metadata":{"id":"JJNiedyRspHT"},"source":["%%bash\n","aws iam detach-role-policy --role-name AWSGlueServiceRole-dc-taxi --policy-arn arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole && \\\n","aws iam delete-role-policy --role-name AWSGlueServiceRole-dc-taxi --policy-name GlueBucketPolicy && \\\n","aws iam delete-role --role-name AWSGlueServiceRole-dc-taxi\n","\n","aws iam create-role --path \"/service-role/\" --role-name AWSGlueServiceRole-dc-taxi --assume-role-policy-document '{\n"," \"Version\": \"2012-10-17\",\n"," \"Statement\": [\n"," {\n"," \"Effect\": \"Allow\",\n"," \"Principal\": {\n"," \"Service\": \"glue.amazonaws.com\"\n"," },\n"," \"Action\": \"sts:AssumeRole\"\n"," }\n"," ]\n","}'\n","\n","aws iam attach-role-policy --role-name AWSGlueServiceRole-dc-taxi --policy-arn arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole\n","\n","aws iam put-role-policy --role-name AWSGlueServiceRole-dc-taxi --policy-name GlueBucketPolicy --policy-document '{\n"," \"Version\": \"2012-10-17\",\n"," \"Statement\": [\n"," {\n"," \"Effect\": \"Allow\",\n"," \"Action\": [\n"," \"s3:*\"\n"," ],\n"," \"Resource\": [\n"," \"arn:aws:s3:::dc-taxi-'$BUCKET_ID'-'$AWS_DEFAULT_REGION'/*\"\n"," ]\n"," }\n"," ]\n","}'"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"aZgCUjxAxAu2"},"source":["## Create an AWS Glue database"]},{"cell_type":"code","metadata":{"id":"joKA9y_9wLtH"},"source":["%%bash\n","aws glue delete-database --name dc_taxi_db 2> /dev/null\n","\n","aws glue create-database --database-input '{\n"," \"Name\": \"dc_taxi_db\"\n","}'\n","\n","aws glue get-database --name 'dc_taxi_db'"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"GFmG7ra7xlaD"},"source":["## Create an AWS Glue crawler\n","\n","Save the results of crawling the S3 bucket with the DC taxi dataset to the AWS Glue database created in the previous cell."]},{"cell_type":"code","metadata":{"id":"n-BMVq29xXi0"},"source":["%%bash\n","aws glue delete-crawler --name dc-taxi-csv-crawler 2> /dev/null\n","\n","aws glue create-crawler \\\n"," --name dc-taxi-csv-crawler \\\n"," --database-name dc_taxi_db \\\n"," --table-prefix dc_taxi_ \\\n"," --role $( aws iam get-role \\\n"," --role-name AWSGlueServiceRole-dc-taxi \\\n"," --query 'Role.Arn' \\\n"," --output text ) \\\n"," --targets '{\n"," \"S3Targets\": [\n"," {\n"," \"Path\": \"s3://dc-taxi-'$BUCKET_ID'-'$AWS_DEFAULT_REGION'/csv\"\n"," }]\n","}'\n","\n","aws glue start-crawler --name dc-taxi-csv-crawler"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"nS7INrVvyHL8"},"source":["## Check the status of the AWS Glue crawler\n","\n","When running this notebook, you need to re-run the next cell to get updates on crawler status. 
It should cycle through `STARTING`, `RUNNING`, `STOPPING`, and `READY`. \n","\n","It will take the crawler about a minute to finish crawling the DC taxi dataset."]},{"cell_type":"code","metadata":{"id":"m_XxAWuDx66T"},"source":["%%bash\n","aws glue get-crawler --name dc-taxi-csv-crawler --query 'Crawler.State' --output text"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"I_DisBrCIQaL"},"source":["Poll the crawler state every minute to wait for it to finish."]},{"cell_type":"code","metadata":{"id":"i5PfPixmKlVs"},"source":["%%bash\n","printf \"Waiting for crawler to finish...\"\n","until echo \"$(aws glue get-crawler --name dc-taxi-csv-crawler --query 'Crawler.State' --output text)\" | grep -q \"READY\"; do\n"," sleep 60\n"," printf \"...\"\n","done\n","printf \"done\\n\""],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"RvmEu4JiyNPq"},"source":["## Find out the last known status of the AWS Glue crawler"]},{"cell_type":"code","metadata":{"id":"Nm30IriYx_yH"},"source":["!aws glue get-crawler --name dc-taxi-csv-crawler --query 'Crawler.LastCrawl'"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"I5CH23_WyZNS"},"source":["## Describe the table created by the AWS Glue crawler"]},{"cell_type":"code","metadata":{"id":"xHPAzHfGyLzF"},"source":["!aws glue get-table --database-name dc_taxi_db --name dc_taxi_csv"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"osbkOyeDLHvi"},"source":["## Create a PySpark job to convert CSV to Parquet\n","\n","The next cell uses the Jupyter `%%writefile` magic to save the source code for the PySpark job to the `dctaxi_csv_to_parquet.py` file."]},{"cell_type":"code","metadata":{"id":"880hw7aeKqXB"},"source":["%%writefile dctaxi_csv_to_parquet.py\n","import sys\n","from awsglue.transforms import *\n","from awsglue.utils import getResolvedOptions\n","from pyspark.context import SparkContext\n","from awsglue.context import GlueContext\n","from awsglue.job import Job\n","\n","args = getResolvedOptions(sys.argv, ['JOB_NAME',\n"," 'BUCKET_SRC_PATH',\n"," 'BUCKET_DST_PATH',\n","\t\t\t\t\t\t\t\t\t 'DST_VIEW_NAME'])\n","\n","BUCKET_SRC_PATH = args['BUCKET_SRC_PATH']\n","BUCKET_DST_PATH = args['BUCKET_DST_PATH']\n","DST_VIEW_NAME = args['DST_VIEW_NAME']\n","\n","sc = SparkContext()\n","glueContext = GlueContext(sc)\n","logger = glueContext.get_logger()\n","spark = glueContext.spark_session\n","\n","job = Job(glueContext)\n","job.init(args['JOB_NAME'], args)\n","\n","df = ( spark.read.format(\"csv\")\n","\t\t.option(\"header\", True)\n","\t\t.option(\"inferSchema\", True)\n"," .option(\"multiLine\", True) \n","\t\t.option(\"delimiter\", \"|\")\n","\t\t.load(f\"{BUCKET_SRC_PATH}\") )\n","\n","df.createOrReplaceTempView(f\"{DST_VIEW_NAME}\")\n","\n","query_df = spark.sql(f\"\"\"\n","SELECT \n","\n"," CAST(fareamount AS DOUBLE) AS fareamount_double,\n"," CAST(fareamount AS STRING) AS fareamount_string,\n","\n"," CAST(origindatetime_tr AS STRING) AS origindatetime_tr,\n","\n"," CAST(origin_block_latitude AS DOUBLE) AS origin_block_latitude_double,\n"," CAST(origin_block_latitude AS STRING) AS origin_block_latitude_string,\n","\n"," CAST(origin_block_longitude AS DOUBLE) AS origin_block_longitude_double,\n"," CAST(origin_block_longitude AS STRING) AS origin_block_longitude_string,\n","\n"," CAST(destination_block_latitude AS DOUBLE) AS destination_block_latitude_double,\n"," CAST(destination_block_latitude AS STRING) AS 
destination_block_latitude_string,\n","\n"," CAST(destination_block_longitude AS DOUBLE) AS destination_block_longitude_double,\n"," CAST(destination_block_longitude AS STRING) AS destination_block_longitude_string,\n","\n"," CAST(mileage AS DOUBLE) AS mileage_double,\n"," CAST(mileage AS STRING) AS mileage_string \n","\n","FROM {DST_VIEW_NAME}\"\"\".replace('\\n', ''))\n","\n","query_df.write.parquet(f\"{BUCKET_DST_PATH}\", mode=\"overwrite\")\n","\n","job.commit()"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"b5kw70zRLskj"},"source":["## Copy the PySpark job code to the S3 bucket"]},{"cell_type":"code","metadata":{"id":"sy84Mn4YLsDH"},"source":["%%bash\n","aws s3 cp dctaxi_csv_to_parquet.py s3://dc-taxi-$BUCKET_ID-$AWS_DEFAULT_REGION/glue/\n","aws s3 ls s3://dc-taxi-$BUCKET_ID-$AWS_DEFAULT_REGION/glue/dctaxi_csv_to_parquet.py"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"m2LA3Y2IgM9p"},"source":["## Create and start the PySpark job"]},{"cell_type":"code","metadata":{"id":"8djrkqIFL1qY"},"source":["%%bash\n","aws glue delete-job --job-name dc-taxi-csv-to-parquet-job 2> /dev/null\n","\n","aws glue create-job \\\n"," --name dc-taxi-csv-to-parquet-job \\\n"," --role $(aws iam get-role --role-name AWSGlueServiceRole-dc-taxi --query 'Role.Arn' --output text) \\\n"," --default-arguments '{\"--TempDir\":\"s3://dc-taxi-'$BUCKET_ID'-'$AWS_DEFAULT_REGION'/glue/\"}' \\\n"," --command '{\n"," \"ScriptLocation\": \"s3://dc-taxi-'$BUCKET_ID'-'$AWS_DEFAULT_REGION'/glue/dctaxi_csv_to_parquet.py\",\n"," \"Name\": \"glueetl\",\n"," \"PythonVersion\": \"3\"\n"," }' \\\n"," --glue-version \"2.0\"\n","\n","aws glue start-job-run \\\n"," --job-name dc-taxi-csv-to-parquet-job \\\n"," --arguments='--BUCKET_SRC_PATH=\"'$(\n"," echo s3://dc-taxi-$BUCKET_ID-$AWS_DEFAULT_REGION/csv/*.txt\n"," )'\",\n"," --BUCKET_DST_PATH=\"'$(\n"," echo s3://dc-taxi-$BUCKET_ID-$AWS_DEFAULT_REGION/parquet/\n"," )'\",\n"," --DST_VIEW_NAME=\"dc_taxi_csv\"'"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"zx1NQZYLgdel"},"source":["In case of a successful completion, the last cell should have produced an output similar to the following:\n","\n","```\n","{\n"," \"Name\": \"dc-taxi-csv-to-parquet-job\"\n","}\n","{\n"," \"JobRunId\": \"jr_925ab8ea6e5bdd64d4491c6f641bcc58f5c7b0140edcdba9896052c70d3675fe\"\n","}\n","```"]},{"cell_type":"markdown","metadata":{"id":"FYrM2iS9gnmX"},"source":["## Monitor the job execution\n","\n","* **it should take about 3 minutes for the job to complete**\n","\n","Once the PySpark job completes successfully, the job execution status should change from `RUNNING` to `SUCCEEDED`. 
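If you prefer to check on the job from Python, the next cell is a minimal `boto3` sketch (it assumes `boto3` is available and reuses the credentials and region set earlier); it prints the state of the most recent run of the job and, if the run failed, its error message."]},{"cell_type":"code","metadata":{},"source":["# a minimal boto3 sketch: print the state of the most recent run of the\n","# dc-taxi-csv-to-parquet-job and, if the run failed, its error message\n","# (assumes boto3 is installed and the AWS credentials/region are set as above)\n","import boto3\n","\n","glue = boto3.client('glue')\n","runs = glue.get_job_runs(JobName='dc-taxi-csv-to-parquet-job', MaxResults=1)\n","\n","latest = runs['JobRuns'][0]\n","print(f\"State: {latest['JobRunState']}\")\n","if latest['JobRunState'] == 'FAILED':\n","    print(f\"Error: {latest.get('ErrorMessage', 'n/a')}\")"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{},"source":["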
You can re-run the next cell to get the updated job status."]},{"cell_type":"code","metadata":{"id":"oYwrYdVFL4WH"},"source":["!aws glue get-job-runs --job-name dc-taxi-csv-to-parquet-job --output text --query 'JobRuns[0].JobRunState'"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"U2iFXtBuKB9i"},"source":["Poll the job status every minute to wait for it to finish."]},{"cell_type":"code","metadata":{"id":"HDNKiOukOgco"},"source":["%%bash\n","printf \"Waiting for the job to finish...\"\n","while echo \"$(aws glue get-job-runs --job-name dc-taxi-csv-to-parquet-job --query 'JobRuns[0].JobRunState')\" | grep -q -E \"STARTING|RUNNING|STOPPING\"; do\n"," sleep 60\n"," printf \"...\"\n","done\n","aws glue get-job-runs --job-name dc-taxi-csv-to-parquet-job --output text --query 'JobRuns[0].JobRunState'"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"MYxJwg-cgu3a"},"source":["## Confirm the CSV to Parquet conversion job completed successfully\n","\n"]},{"cell_type":"code","metadata":{"id":"9uoI1v1Jgszl"},"source":["!aws s3 ls --recursive --summarize --human-readable s3://dc-taxi-$BUCKET_ID-$AWS_DEFAULT_REGION/parquet/ | tail -n 2"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"ZJguy-X4hoKi"},"source":["Assuming the Parquet files have been correctly created in the S3 bucket, the previous cell should output the following:\n","\n","```\n","Total Objects: 53\n"," Total Size: 941.3 MiB\n","```"]},{"cell_type":"markdown","metadata":{"id":"3xxR3fpLiIQc"},"source":["## Crawl the Parquet dataset"]},{"cell_type":"code","metadata":{"id":"F_wYQ0GXg2q7"},"source":["%%bash\n","aws glue delete-crawler --name dc-taxi-parquet-crawler 2> /dev/null\n","\n","aws glue create-crawler --name dc-taxi-parquet-crawler --database-name dc_taxi_db --table-prefix dc_taxi_ --role `aws iam get-role --role-name AWSGlueServiceRole-dc-taxi --query 'Role.Arn' --output text` --targets '{\n"," \"S3Targets\": [\n"," {\n"," \"Path\": \"s3://dc-taxi-'$BUCKET_ID'-'$AWS_DEFAULT_REGION'/parquet/\"\n"," }]\n","}'\n","\n","aws glue start-crawler --name dc-taxi-parquet-crawler"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"rjpuQT4Tijp_"},"source":["## Monitor the Parquet crawler status\n","\n","* **the crawler should take about 2 minutes to finish**\n","\n","Once done, the crawler should return to a `READY` state."]},{"cell_type":"code","metadata":{"id":"97lxt-GHiUaI"},"source":["!aws glue get-crawler --name dc-taxi-parquet-crawler --query 'Crawler.State' --output text"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"thUt_02jKmRS"},"source":["Poll the crawler status every 10 seconds to wait for it to finish."]},{"cell_type":"code","metadata":{"id":"Ys-tvL_xO3ly"},"source":["%%bash\n","printf \"Waiting for crawler to finish...\"\n","until echo \"$(aws glue get-crawler --name dc-taxi-parquet-crawler --query 'Crawler.State' --output text)\" | grep -q \"READY\"; do\n"," sleep 10\n"," printf \"...\"\n","done\n","aws glue get-crawler --name dc-taxi-parquet-crawler --query 'Crawler.State' --output text"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"8omIXmOojBlA"},"source":["## Confirm that the crawler successfully created the `dc_taxi_parquet` table\n","\n","If the crawler completed successfully, the number of records in the `dc_taxi_parquet` table as reported by the following command should be equal to 
`53173692`"]},{"cell_type":"code","metadata":{"id":"3Id27YhNihfz"},"source":["!aws glue get-table --database-name dc_taxi_db --name dc_taxi_parquet --query \"Table.Parameters.recordCount\" --output text"],"execution_count":null,"outputs":[]}]}