{ "cells": [ { "cell_type": "markdown", "id": "3eab3ca4", "metadata": {}, "source": [ "## Module 6 Homework\n", "In this homework we'll put what we learned about Spark into practice.\n", "\n", "For this homework we will be using the Yellow taxi November 2025 data from the official website." ] }, { "cell_type": "markdown", "id": "f99b1c0a", "metadata": {}, "source": [ "## Question 1: Install Spark and PySpark\n", "- 4.1.1" ] }, { "cell_type": "code", "execution_count": 2, "id": "d08db467", "metadata": {}, "outputs": [], "source": [ "import pyspark\n", "from pyspark.sql import SparkSession\n", "from pyspark.sql import types" ] }, { "cell_type": "code", "execution_count": 5, "id": "dda6c87e", "metadata": {}, "outputs": [], "source": [ "spark = SparkSession.builder \\\n", " .master(\"local[*]\") \\\n", " .appName('test') \\\n", " .getOrCreate()" ] }, { "cell_type": "code", "execution_count": 6, "id": "4f08bcca", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "'4.1.1'" ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "spark.version" ] }, { "cell_type": "code", "execution_count": 7, "id": "abcbb7ee", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "--2026-03-02 15:02:49-- https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-11.parquet\n", "Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 2600:9000:2684:1800:b:20a5:b140:21, 2600:9000:2684:6800:b:20a5:b140:21, 2600:9000:2684:fe00:b:20a5:b140:21, ...\n", "Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|2600:9000:2684:1800:b:20a5:b140:21|:443... connected.\n", "HTTP request sent, awaiting response... 
200 OK\n", "Length: 71134255 (68M) [binary/octet-stream]\n", "Saving to: ‘yellow_tripdata_2025-11.parquet’\n", "\n", "yellow_tripdata_202 100%[===================>] 67.84M 22.1MB/s in 3.1s \n", "\n", "2026-03-02 15:02:52 (22.1 MB/s) - ‘yellow_tripdata_2025-11.parquet’ saved [71134255/71134255]\n", "\n" ] } ], "source": [ "!wget -nc https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-11.parquet" ] }, { "cell_type": "code", "execution_count": null, "id": "8bb4e27e", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ " \r" ] }, { "name": "stdout", "output_type": "stream", "text": [ "+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+\n", "|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|\n", "+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+\n", "| 7| 2025-11-01 00:13:25| 2025-11-01 00:13:25| 1| 1.68| 1| N| 43| 186| 1| 14.9| 0.0| 0.5| 1.5| 0.0| 1.0| 22.15| 2.5| 0.0| 0.75|\n", "| 2| 2025-11-01 00:49:07| 2025-11-01 01:01:22| 1| 2.28| 1| N| 142| 237| 1| 14.2| 1.0| 0.5| 4.99| 0.0| 1.0| 24.94| 2.5| 0.0| 0.75|\n", "| 1| 2025-11-01 00:07:19| 2025-11-01 00:20:41| 0| 2.7| 1| N| 163| 238| 1| 15.6| 4.25| 0.5| 4.27| 0.0| 1.0| 25.62| 2.5| 0.0| 0.75|\n", "| 2| 2025-11-01 00:00:00| 2025-11-01 01:01:03| 3| 12.87| 1| N| 138| 261| 
1| 66.7| 6.0| 0.5| 0.0| 6.94| 1.0| 86.14| 2.5| 1.75| 0.75|\n", "| 1| 2025-11-01 00:18:50| 2025-11-01 00:49:32| 0| 8.4| 1| N| 138| 37| 2| 39.4| 7.75| 0.5| 0.0| 0.0| 1.0| 48.65| 0.0| 1.75| 0.0|\n", "+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+\n", "only showing top 5 rows\n" ] } ], "source": [ "df = spark.read.parquet(\"yellow_tripdata_2025-11.parquet\")\n", "df.show(5) # check first 5 rows" ] }, { "cell_type": "markdown", "id": "d23b15cf", "metadata": {}, "source": [ "## Question 2: Yellow November 2025\n", "Read the November 2025 Yellow into a Spark Dataframe.\n", "\n", "Repartition the Dataframe to 4 partitions and save it to parquet.\n", "\n", "What is the average size of the Parquet (ending with .parquet extension) Files that were created (in MB)? 
Select the answer which most closely matches.\n", "\n", "\n", "- 25MB\n", "\n" ] }, { "cell_type": "code", "execution_count": 9, "id": "3e096c60", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ " \r" ] } ], "source": [ "df_repart = df.repartition(4)\n", "\n", "output_path = \"yellow_nov2025_repart\"\n", "df_repart.write.mode(\"overwrite\").parquet(output_path)" ] }, { "cell_type": "code", "execution_count": 10, "id": "05670f02", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Average Parquet file size: 25.33 MB\n" ] } ], "source": [ "import os\n", "\n", "folder = output_path\n", "parquet_files = [f for f in os.listdir(folder) if f.endswith(\".parquet\")]\n", "\n", "sizes = [os.path.getsize(os.path.join(folder, f)) for f in parquet_files] # bytes\n", "avg_size_mb = sum(sizes) / len(sizes) / (1024*1024)\n", "print(f\"Average Parquet file size: {avg_size_mb:.2f} MB\")" ] }, { "cell_type": "markdown", "id": "35dcb410", "metadata": {}, "source": [ "## Question 3: Count records\n", "How many taxi trips were there on the 15th of November?\n", "\n", "Consider only trips that started on the 15th of November.\n", "\n", "\n", "- 162,604\n" ] }, { "cell_type": "code", "execution_count": 11, "id": "52052cf2", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "[Stage 5:> (0 + 2) / 2]\r" ] }, { "name": "stdout", "output_type": "stream", "text": [ "Number of trips on 15th November: 162604\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ " \r" ] } ], "source": [ "from pyspark.sql.functions import col, to_date\n", "\n", "# Extract date only and filter\n", "df_15 = df.filter(to_date(col(\"tpep_pickup_datetime\")) == \"2025-11-15\")\n", "\n", "# Count the records\n", "trip_count = df_15.count()\n", "print(f\"Number of trips on 15th November: {trip_count}\")" ] }, { "cell_type": "markdown", "id": "b3a8a7ba", "metadata": {}, "source": [ "## Question 4: Longest 
trip\n", "What is the length of the longest trip in the dataset in hours?\n", "\n", "- 90.6\n" ] }, { "cell_type": "code", "execution_count": 14, "id": "044861e6", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "[Stage 8:> (0 + 2) / 2]\r" ] }, { "name": "stdout", "output_type": "stream", "text": [ "Longest trip duration: 90.65 hours\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ " \r" ] } ], "source": [ "from pyspark.sql.functions import col, unix_timestamp, max as spark_max\n", "\n", "# Make sure your DataFrame is loaded\n", "# df = spark.read.parquet(\"yellow_tripdata_2025-11.parquet\")\n", "\n", "# Optional: filter out invalid trips with dropoff before pickup\n", "df_valid = df.filter(col(\"tpep_dropoff_datetime\") > col(\"tpep_pickup_datetime\"))\n", "\n", "# Compute duration in hours\n", "df_with_duration = df_valid.withColumn(\n", " \"duration_hours\",\n", " (unix_timestamp(col(\"tpep_dropoff_datetime\")) - unix_timestamp(col(\"tpep_pickup_datetime\"))) / 3600\n", ")\n", "\n", "# Get the maximum duration\n", "max_duration = df_with_duration.agg(spark_max(\"duration_hours\")).collect()[0][0]\n", "print(f\"Longest trip duration: {max_duration:.2f} hours\")" ] }, { "cell_type": "markdown", "id": "02b431df", "metadata": {}, "source": [ "## Question 5: User Interface\n", "Spark's User Interface which shows the application's dashboard runs on which local port?\n", "\n", "\n", "- 4040\n" ] }, { "cell_type": "code", "execution_count": null, "id": "89a28cd7", "metadata": {}, "outputs": [], "source": [ "# Print the Spark UI Web URL\n", "print(spark.sparkContext.uiWebUrl)" ] }, { "cell_type": "markdown", "id": "30261f67", "metadata": {}, "source": [ "http://localhost:4040/jobs/" ] }, { "cell_type": "markdown", "id": "90b1f751", "metadata": {}, "source": [ "## Question 6: Least frequent pickup location zone\n", "Load the zone lookup data into a temp view in Spark:\n", "\n", "wget 
https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv\n", "Using the zone lookup data and the Yellow November 2025 data, what is the name of the LEAST frequent pickup location Zone?\n", "\n", "- Governor's Island/Ellis Island/Liberty Island\n" ] }, { "cell_type": "code", "execution_count": 18, "id": "1bede491", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "--2026-03-02 15:18:43-- https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv\n", "Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 2600:9000:2684:400:b:20a5:b140:21, 2600:9000:2684:ce00:b:20a5:b140:21, 2600:9000:2684:1400:b:20a5:b140:21, ...\n", "Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|2600:9000:2684:400:b:20a5:b140:21|:443... connected.\n", "HTTP request sent, awaiting response... 200 OK\n", "Length: 12331 (12K) [text/csv]\n", "Saving to: ‘taxi_zone_lookup.csv’\n", "\n", "taxi_zone_lookup.cs 100%[===================>] 12.04K --.-KB/s in 0s \n", "\n", "2026-03-02 15:18:43 (33.1 MB/s) - ‘taxi_zone_lookup.csv’ saved [12331/12331]\n", "\n" ] } ], "source": [ "!wget -nc https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv" ] }, { "cell_type": "code", "execution_count": 19, "id": "fc48c106", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ " \r" ] } ], "source": [ "from pyspark.sql import SparkSession\n", "from pyspark.sql.functions import col, count\n", "\n", "spark = SparkSession.builder \\\n", " .appName(\"YellowNov2025\") \\\n", " .master(\"local[*]\") \\\n", " .getOrCreate()\n", "\n", "spark.sparkContext.setLogLevel(\"ERROR\")\n", "\n", "# Load zones CSV\n", "zones_df = spark.read.csv(\"taxi_zone_lookup.csv\", header=True, inferSchema=True)\n", "\n", "# Create a temporary view (optional for SQL queries)\n", "zones_df.createOrReplaceTempView(\"zones\")" ] }, { "cell_type": "code", "execution_count": 20, "id": "3a29afe4", "metadata": {}, "outputs": 
[], "source": [ "# Count trips per pickup location\n", "pickup_counts = df.groupBy(\"PULocationID\").agg(count(\"*\").alias(\"trip_count\"))\n", "\n", "# Join with zones to get zone names\n", "pickup_with_zones = pickup_counts.join(\n", " zones_df,\n", " pickup_counts.PULocationID == zones_df.LocationID,\n", " how=\"left\"\n", ")\n", "\n", "# Select relevant columns\n", "pickup_with_zones = pickup_with_zones.select(\n", " \"Zone\", \"Borough\", \"trip_count\"\n", ")" ] }, { "cell_type": "code", "execution_count": 21, "id": "4bf49ffa", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "[Stage 14:==========================================================(2 + 0) / 2]\r" ] }, { "name": "stdout", "output_type": "stream", "text": [ "Least frequent pickup zone: Governor's Island/Ellis Island/Liberty Island with 1 trips\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ " \r" ] } ], "source": [ "least_pickup = pickup_with_zones.orderBy(col(\"trip_count\").asc()).first()\n", "print(f\"Least frequent pickup zone: {least_pickup['Zone']} with {least_pickup['trip_count']} trips\")" ] } ], "metadata": { "kernelspec": { "display_name": "week6 (3.13.3)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.13.3" } }, "nbformat": 4, "nbformat_minor": 5 }