{ "cells": [ { "cell_type": "markdown", "id": "71414ff8", "metadata": {}, "source": [ "# Module 6 Homework\n", "\n", "In this homework we'll put what we learned about Spark in practice.\n", "\n", "For this homework we will be using the Yellow 2025-11 data from the official website:\n", "\n", "```bash\n", "wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-11.parquet\n", "```\n" ] }, { "cell_type": "markdown", "id": "6e7b46b3", "metadata": {}, "source": [ "\n", "## Question 1: Install Spark and PySpark\n", "\n", "- Install Spark\n", "- Run PySpark\n", "- Create a local spark session\n", "- Execute spark.version.\n", "\n", "What's the output?\n", "\n", "> [!NOTE]\n", "> To install PySpark follow this [guide](https://github.com/DataTalksClub/data-engineering-zoomcamp/blob/main/06-batch/setup/)\n", "\n", "**Answer:** Spark version: 4.1.1" ] }, { "cell_type": "code", "execution_count": 2, "id": "67ec651c", "metadata": {}, "outputs": [], "source": [ "import os" ] }, { "cell_type": "code", "execution_count": 1, "id": "71fe26eb", "metadata": {}, "outputs": [], "source": [ "hadoop_dir = \"C:/hadoop-3.3.6\"" ] }, { "cell_type": "code", "execution_count": 3, "id": "76f8754d", "metadata": {}, "outputs": [], "source": [ "os.environ['HADOOP_HOME'] = hadoop_dir\n", "os.environ['PATH'] += os.pathsep + hadoop_dir + \"/bin\"" ] }, { "cell_type": "code", "execution_count": 4, "id": "4239b182", "metadata": { "vscode": { "languageId": "plaintext" } }, "outputs": [], "source": [ "import pyspark\n", "from pyspark.sql import SparkSession" ] }, { "cell_type": "code", "execution_count": 5, "id": "e3dd369e", "metadata": { "scrolled": true, "vscode": { "languageId": "plaintext" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Spark version: 4.1.1\n" ] } ], "source": [ "spark = SparkSession.builder \\\n", " .master(\"local[*]\") \\\n", " .appName(\"taxi_homework\") \\\n", " .config(\"spark.hadoop.fs.file.impl\", 
\"org.apache.hadoop.fs.RawLocalFileSystem\") \\\n", " .getOrCreate()\n", "\n", "print(f\"Spark version: {spark.version}\")" ] }, { "cell_type": "code", "execution_count": 6, "id": "e4e37967", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Hadoop Home in JVM: C:/hadoop-3.3.6\n" ] } ], "source": [ "jvm_hadoop_home = spark._jvm.java.lang.System.getenv(\"HADOOP_HOME\")\n", "print(f\"Hadoop Home in JVM: {jvm_hadoop_home}\")" ] }, { "cell_type": "code", "execution_count": null, "id": "2412579b", "metadata": {}, "outputs": [], "source": [ "print(\"Hadoop Home in JVM now:\", spark._jvm.java.lang.System.getProperty(\"hadoop.home.dir\"))" ] }, { "cell_type": "markdown", "id": "0bbcd7c9", "metadata": {}, "source": [ "## Question 2: Yellow November 2025\n", "\n", "Read the November 2025 Yellow into a Spark Dataframe.\n", "\n", "Repartition the Dataframe to 4 partitions and save it to parquet.\n", "\n", "What is the average size of the Parquet (ending with .parquet extension) Files that were created (in MB)? 
Select the answer which most closely matches.\n", "\n", "- 6MB\n", "- 25MB\n", "- 75MB\n", "- 100MB\n", "\n", "**Answer:** 25MB" ] }, { "cell_type": "code", "execution_count": null, "id": "e98b609f", "metadata": {}, "outputs": [], "source": [ "from pyspark.sql import types  # needed for the schema definition below\n", "\n", "# Explicit schema for reference; the Parquet file already embeds its own schema\n", "yellow_schema = types.StructType([\n", " types.StructField(\"VendorID\", types.IntegerType(), True),\n", " types.StructField(\"tpep_pickup_datetime\", types.TimestampType(), True),\n", " types.StructField(\"tpep_dropoff_datetime\", types.TimestampType(), True),\n", " types.StructField(\"passenger_count\", types.LongType(), True),\n", " types.StructField(\"trip_distance\", types.DoubleType(), True),\n", " types.StructField(\"RatecodeID\", types.LongType(), True),\n", " types.StructField(\"store_and_fwd_flag\", types.StringType(), True),\n", " types.StructField(\"PULocationID\", types.IntegerType(), True),\n", " types.StructField(\"DOLocationID\", types.IntegerType(), True),\n", " types.StructField(\"payment_type\", types.LongType(), True),\n", " types.StructField(\"fare_amount\", types.DoubleType(), True),\n", " types.StructField(\"extra\", types.DoubleType(), True),\n", " types.StructField(\"mta_tax\", types.DoubleType(), True),\n", " types.StructField(\"tip_amount\", types.DoubleType(), True),\n", " types.StructField(\"tolls_amount\", types.DoubleType(), True),\n", " types.StructField(\"improvement_surcharge\", types.DoubleType(), True),\n", " types.StructField(\"total_amount\", types.DoubleType(), True),\n", " types.StructField(\"congestion_surcharge\", types.DoubleType(), True),\n", " types.StructField(\"Airport_fee\", types.DoubleType(), True),\n", " types.StructField(\"cbd_congestion_fee\", types.DoubleType(), True)\n", "])" ] }, { "cell_type": "code", "execution_count": 7, "id": "285da203", "metadata": {}, "outputs": [], "source": [ "df = spark.read.parquet('yellow_tripdata_2025-11.parquet')" ] }, { "cell_type": "code", "execution_count": 8, "id": "0e319054", "metadata": {}, "outputs": [ { "data": { "text/plain": [ 
"StructType([StructField('VendorID', IntegerType(), True), StructField('tpep_pickup_datetime', TimestampNTZType(), True), StructField('tpep_dropoff_datetime', TimestampNTZType(), True), StructField('passenger_count', LongType(), True), StructField('trip_distance', DoubleType(), True), StructField('RatecodeID', LongType(), True), StructField('store_and_fwd_flag', StringType(), True), StructField('PULocationID', IntegerType(), True), StructField('DOLocationID', IntegerType(), True), StructField('payment_type', LongType(), True), StructField('fare_amount', DoubleType(), True), StructField('extra', DoubleType(), True), StructField('mta_tax', DoubleType(), True), StructField('tip_amount', DoubleType(), True), StructField('tolls_amount', DoubleType(), True), StructField('improvement_surcharge', DoubleType(), True), StructField('total_amount', DoubleType(), True), StructField('congestion_surcharge', DoubleType(), True), StructField('Airport_fee', DoubleType(), True), StructField('cbd_congestion_fee', DoubleType(), True)])" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.schema" ] }, { "cell_type": "code", "execution_count": 9, "id": "9a08c032", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+\n", "|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|\n", 
"+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+\n", "| 7| 2025-11-01 00:13:25| 2025-11-01 00:13:25| 1| 1.68| 1| N| 43| 186| 1| 14.9| 0.0| 0.5| 1.5| 0.0| 1.0| 22.15| 2.5| 0.0| 0.75|\n", "| 2| 2025-11-01 00:49:07| 2025-11-01 01:01:22| 1| 2.28| 1| N| 142| 237| 1| 14.2| 1.0| 0.5| 4.99| 0.0| 1.0| 24.94| 2.5| 0.0| 0.75|\n", "| 1| 2025-11-01 00:07:19| 2025-11-01 00:20:41| 0| 2.7| 1| N| 163| 238| 1| 15.6| 4.25| 0.5| 4.27| 0.0| 1.0| 25.62| 2.5| 0.0| 0.75|\n", "| 2| 2025-11-01 00:00:00| 2025-11-01 01:01:03| 3| 12.87| 1| N| 138| 261| 1| 66.7| 6.0| 0.5| 0.0| 6.94| 1.0| 86.14| 2.5| 1.75| 0.75|\n", "| 1| 2025-11-01 00:18:50| 2025-11-01 00:49:32| 0| 8.4| 1| N| 138| 37| 2| 39.4| 7.75| 0.5| 0.0| 0.0| 1.0| 48.65| 0.0| 1.75| 0.0|\n", "+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+\n", "only showing top 5 rows\n" ] } ], "source": [ "df.show(5)" ] }, { "cell_type": "code", "execution_count": 10, "id": "4578fc90", "metadata": {}, "outputs": [], "source": [ "df.repartition(4).write.mode(\"overwrite\").parquet('yellow/2025/11')" ] }, { "cell_type": "markdown", "id": "5b91c521", "metadata": {}, "source": [ "## Question 3: Count records\n", "\n", "How many taxi trips were there on the 15th of November?\n", "\n", "Consider only trips that started on the 15th of November.\n", "\n", "- 62,610\n", "- 102,340\n", "- 162,604\n", "- 225,768\n", "\n", "**Answer:** 162,604\n" ] }, { "cell_type": "code", "execution_count": 11, "id": "a4195483", "metadata": {}, "outputs": [], "source": [ "from pyspark.sql import functions as 
F" ] }, { "cell_type": "code", "execution_count": 12, "id": "9ebd2f98", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Total trips on November 15th: 162604\n" ] } ], "source": [ "df_partitioned = spark.read.parquet('yellow/2025/11')\n", "\n", "trips_nov_15 = df_partitioned.filter(F.to_date(df_partitioned.tpep_pickup_datetime) == '2025-11-15')\n", "\n", "print(f\"Total trips on November 15th: {trips_nov_15.count()}\")" ] }, { "cell_type": "markdown", "id": "45cdd870", "metadata": {}, "source": [ "\n", "## Question 4: Longest trip\n", "\n", "What is the length of the longest trip in the dataset in hours?\n", "\n", "- 22.7\n", "- 58.2\n", "- 90.6\n", "- 134.5\n", "\n", "**Answer:** 90.6\n" ] }, { "cell_type": "code", "execution_count": 18, "id": "a4f9d515", "metadata": {}, "outputs": [], "source": [ "df_with_duration = df_partitioned.withColumn(\n", " 'trip_duration_hours', \n", " (F.col(\"tpep_dropoff_datetime\") - F.col(\"tpep_pickup_datetime\")).cast('long') / 3600\n", ")" ] }, { "cell_type": "code", "execution_count": 20, "id": "4609ae90", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "The longest trip duration is: 90.6 hours\n" ] } ], "source": [ "longest_trip = df_with_duration.select(F.max(\"trip_duration_hours\")).collect()[0][0]\n", "\n", "print(f\"The longest trip duration is: {longest_trip:.1f} hours\")" ] }, { "cell_type": "markdown", "id": "61457b90", "metadata": {}, "source": [ "## Question 5: User Interface\n", "\n", "Spark's User Interface which shows the application's dashboard runs on which local port?\n", "\n", "- 80\n", "- 443\n", "- 4040\n", "- 8080\n", "\n", "**Answer:** 4040" ] }, { "cell_type": "markdown", "id": "8e6ffd72", "metadata": {}, "source": [ "## Question 6: Least frequent pickup location zone\n", "\n", "Load the zone lookup data into a temp view in Spark:\n", "\n", "```bash\n", "wget https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv\n", "```\n", 
"\n", "Using the zone lookup data and the Yellow November 2025 data, what is the name of the LEAST frequent pickup location Zone?\n", "\n", "- Governor's Island/Ellis Island/Liberty Island\n", "- Arden Heights\n", "- Rikers Island\n", "- Jamaica Bay\n", "\n", "If multiple answers are correct, select any\n", "\n", "**Answer:** \n", "- Eltingville/Annadale/Prince's Bay\n", "- Governor's Island/Ellis Island/Liberty Island\n", "- Arden Heights\n" ] }, { "cell_type": "code", "execution_count": 22, "id": "854df312", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+\n", "|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|\n", "+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+\n", "| 2| 2025-11-02 08:11:08| 2025-11-02 08:15:21| 1| 1.24| 1| N| 186| 230| 1| 7.9| 0.0| 0.5| 2.53| 0.0| 1.0| 15.18| 2.5| 0.0| 0.75|\n", "| 2| 2025-11-06 14:01:48| 2025-11-06 14:25:53| 2| 1.84| 1| N| 164| 237| 2| 20.5| 0.0| 0.5| 0.0| 0.0| 1.0| 25.25| 2.5| 0.0| 0.75|\n", "| 2| 2025-11-07 16:53:08| 2025-11-07 17:10:10| 1| 1.15| 1| N| 186| 161| 1| 14.9| 2.5| 0.5| 0.04| 0.0| 1.0| 22.19| 2.5| 0.0| 0.75|\n", "| 2| 2025-11-09 10:55:05| 2025-11-09 10:58:57| 1| 0.55| 1| N| 68| 246| 1| 5.8| 0.0| 0.5| 2.11| 0.0| 1.0| 
12.66| 2.5| 0.0| 0.75|\n", "| 2| 2025-11-03 13:35:09| 2025-11-03 13:47:48| 1| 0.91| 1| N| 90| 164| 1| 12.1| 0.0| 0.5| 3.37| 0.0| 1.0| 20.22| 2.5| 0.0| 0.75|\n", "+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+\n", "only showing top 5 rows\n" ] } ], "source": [ "df_partitioned.show(5)" ] }, { "cell_type": "code", "execution_count": 23, "id": "236f8502", "metadata": {}, "outputs": [], "source": [ "df_zones = spark.read \\\n", " .option(\"header\", \"true\") \\\n", " .csv('taxi_zone_lookup.csv')" ] }, { "cell_type": "code", "execution_count": 38, "id": "be0bf07d", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----------+-------------+--------------------+------------+\n", "|LocationID| Borough| Zone|service_zone|\n", "+----------+-------------+--------------------+------------+\n", "| 1| EWR| Newark Airport| EWR|\n", "| 2| Queens| Jamaica Bay| Boro Zone|\n", "| 3| Bronx|Allerton/Pelham G...| Boro Zone|\n", "| 4| Manhattan| Alphabet City| Yellow Zone|\n", "| 5|Staten Island| Arden Heights| Boro Zone|\n", "+----------+-------------+--------------------+------------+\n", "only showing top 5 rows\n" ] } ], "source": [ "df_zones.show(5)" ] }, { "cell_type": "code", "execution_count": 34, "id": "0466f1fa", "metadata": {}, "outputs": [], "source": [ "location_counts = df_partitioned.groupBy(\"PULocationID\").count()" ] }, { "cell_type": "code", "execution_count": 35, "id": "8372a594", "metadata": {}, "outputs": [], "source": [ "least_frequent_zones = location_counts.join(\n", " df_zones, \n", " location_counts.PULocationID == df_zones.LocationID, \n", " how=\"inner\"\n", ")" ] }, { "cell_type": "code", "execution_count": 40, "id": "75bef66e", "metadata": {}, "outputs": [ { "name": "stdout", 
"output_type": "stream", "text": [ "+--------------------+-----+\n", "| Zone|count|\n", "+--------------------+-----+\n", "|Eltingville/Annad...| 1|\n", "|Governor's Island...| 1|\n", "| Arden Heights| 1|\n", "| Port Richmond| 3|\n", "| Green-Wood Cemetery| 4|\n", "| Great Kills| 4|\n", "| Rossville/Woodrow| 4|\n", "| Rikers Island| 4|\n", "| Jamaica Bay| 5|\n", "| Westerleigh| 12|\n", "+--------------------+-----+\n", "only showing top 10 rows\n" ] } ], "source": [ "least_frequent_zones.orderBy(\"count\", ascending=True).select(\"Zone\", \"count\").show(10)" ] }, { "cell_type": "markdown", "id": "74831b52", "metadata": {}, "source": [ "## Submitting the solutions\n", "\n", "- Form for submitting: https://courses.datatalks.club/de-zoomcamp-2026/homework/hw6\n", "- Deadline: See the website\n", "\n", "\n", "## Learning in Public\n", "\n", "We encourage everyone to share what they learned. This is called \"learning in public\".\n", "\n", "Read more about the benefits [here](https://alexeyondata.substack.com/p/benefits-of-learning-in-public-and).\n", "\n", "### Example post for LinkedIn\n", "\n", "```\n", "🚀 Week 6 of Data Engineering Zoomcamp by @DataTalksClub complete!\n", "\n", "Just finished Module 6 - Batch Processing with Spark. Learned how to:\n", "\n", "✅ Set up PySpark and create Spark sessions\n", "✅ Read and process Parquet files at scale\n", "✅ Repartition data for optimal performance\n", "✅ Analyze millions of taxi trips with DataFrames\n", "✅ Use Spark UI for monitoring jobs\n", "\n", "Processing 4M+ taxi trips with Spark - distributed computing is powerful! 
💪\n", "\n", "Here's my homework solution: \n", "\n", "Following along with this amazing free course - who else is learning data engineering?\n", "\n", "You can sign up here: https://github.com/DataTalksClub/data-engineering-zoomcamp/\n", "```\n", "\n", "### Example post for Twitter/X\n", "\n", "```\n", "⚡ Module 6 of Data Engineering Zoomcamp done!\n", "\n", "- Batch processing with Spark 🔥\n", "- PySpark & DataFrames\n", "- Parquet file optimization\n", "- Spark UI on port 4040\n", "\n", "My solution: \n", "\n", "Free course by @DataTalksClub: https://github.com/DataTalksClub/data-engineering-zoomcamp/\n", "```" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.11.5" } }, "nbformat": 4, "nbformat_minor": 5 }