{ "cells": [ { "cell_type": "markdown", "id": "132939b8", "metadata": {}, "source": [ "# Module 6 Homework\n", "\n", "In this homework we'll put what we learned about Spark into practice.\n", "\n", "For this homework we will use the Yellow taxi trip data for November 2025 (2025-11) from the official website:\n" ] }, { "cell_type": "code", "execution_count": 1, "id": "7bab5c23", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "--2026-03-09 03:29:36-- https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-11.parquet\n", "Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 52.85.39.65, 52.85.39.97, 52.85.39.117, ...\n", "Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|52.85.39.65|:443... connected.\n", "HTTP request sent, awaiting response... 200 OK\n", "Length: 71134255 (68M) [binary/octet-stream]\n", "Saving to: ‘data/yellow_tripdata_2025-11.parquet.4’\n", "\n", "yellow_tripdata_202 100%[===================>] 67.84M 174MB/s in 0.4s \n", "\n", "2026-03-09 03:29:37 (174 MB/s) - ‘data/yellow_tripdata_2025-11.parquet.4’ saved [71134255/71134255]\n", "\n" ] } ], "source": [ "!wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-11.parquet -P data/" ] }, { "cell_type": "markdown", "id": "e66f1f52", "metadata": {}, "source": [ "## Question 1: Install Spark and PySpark\n", "\n", "- Install Spark\n", "- Run PySpark\n", "- Create a local Spark session\n", "- Execute `spark.version`\n", "\n", "What's the output?" 
] }, { "cell_type": "code", "execution_count": 2, "id": "d57f0998", "metadata": {}, "outputs": [], "source": [ "from pyspark.sql import SparkSession" ] }, { "cell_type": "code", "execution_count": 3, "id": "7bf6ab8c", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "WARNING: Using incubator modules: jdk.incubator.vector\n", "Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties\n", "26/03/09 03:29:41 WARN Utils: Your hostname, codespaces-2b816e, resolves to a loopback address: 127.0.0.1; using 10.0.10.155 instead (on interface eth0)\n", "26/03/09 03:29:41 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address\n", "Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties\n", "Setting default log level to \"WARN\".\n", "To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n", "26/03/09 03:29:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n" ] } ], "source": [ "spark = SparkSession.builder \\\n", " .appName(\"LocalTest\") \\\n", " .master(\"local[*]\") \\\n", " .getOrCreate()" ] }, { "cell_type": "code", "execution_count": 4, "id": "ea2aa6e2", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Spark Version: 4.1.1\n" ] } ], "source": [ "# Print the Spark version\n", "print(f\"Spark Version: {spark.version}\")" ] }, { "cell_type": "markdown", "id": "d9c564e2", "metadata": {}, "source": [ "## Question 2: Yellow November 2025\n", "\n", "Read the November 2025 Yellow data into a Spark DataFrame.\n", "\n", "Repartition the DataFrame to 4 partitions and save it to Parquet.\n", "\n", "What is the average size (in MB) of the Parquet files (those ending with the .parquet extension) that were created? 
Select the answer which most closely matches.\n", "\n", "- 6MB\n", "- **25MB**\n", "- 75MB\n", "- 100MB" ] }, { "cell_type": "code", "execution_count": 9, "id": "65909b35", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "[Stage 15:===========================================> (3 + 1) / 4]\r" ] }, { "name": "stdout", "output_type": "stream", "text": [ "Done! Check the folder: data/yellow_tripdata_2025_11_repartitioned\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ " \r" ] } ], "source": [ "from pyspark.sql import SparkSession\n", "import os\n", "\n", "# Initialize Spark\n", "# Note: getOrCreate() reuses a session that is already running (such as the one\n", "# created for Question 1); in that case the driver JVM options below are not applied.\n", "spark = SparkSession.builder \\\n", " .appName(\"YellowTaxiNov2025\") \\\n", " .master(\"local[*]\") \\\n", " .config(\"spark.driver.extraJavaOptions\", \n", " \"-Djava.security.manager=allow \" +\n", " \"--add-opens=java.base/java.util=ALL-UNNAMED \" +\n", " \"--add-opens=java.base/java.lang=ALL-UNNAMED \" +\n", " \"--add-opens=java.base/sun.nio.ch=ALL-UNNAMED\") \\\n", " .getOrCreate()\n", "\n", "# Path to the file downloaded by the wget cell at the top (saved under data/)\n", "input_path = \"data/yellow_tripdata_2025-11.parquet\"\n", "\n", "# 1. Read the Parquet file\n", "df = spark.read.parquet(input_path)\n", "\n", "# 2. Repartition to 4\n", "# This forces Spark to redistribute the data into 4 partitions (one output file each)\n", "df_repartitioned = df.repartition(4)\n", "\n", "# 3. Save to a new Parquet folder\n", "output_path = \"data/yellow_tripdata_2025_11_repartitioned\"\n", "df_repartitioned.write.mode(\"overwrite\").parquet(output_path)\n", "\n", "print(f\"Done! 
Check the folder: {output_path}\")" ] }, { "cell_type": "markdown", "id": "7a0b26b4", "metadata": {}, "source": [ "## Question 3: Count records\n", "\n", "How many taxi trips were there on the 15th of November?\n", "\n", "Consider only trips that started on the 15th of November.\n", "\n", "- 62,610\n", "- 102,340\n", "- **162,604**\n", "- 225,768" ] }, { "cell_type": "code", "execution_count": 12, "id": "110da659", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Question 3 Answer: There were 162604 trips on November 15, 2025.\n" ] } ], "source": [ "from pyspark.sql import functions as F\n", "\n", "# 1. Filter the DataFrame\n", "\n", "trips_on_15th = df.filter(F.to_date(df.tpep_pickup_datetime) == \"2025-11-15\")\n", "# 2. Execute the count\n", "result = trips_on_15th.count()\n", "\n", "print(f\"Question 3 Answer: There were {result} trips on November 15, 2025.\")" ] }, { "cell_type": "markdown", "id": "c86ac9c8", "metadata": {}, "source": [ "## Question 4: Longest trip\n", "\n", "What is the length of the longest trip in the dataset in hours?\n", "\n", "- 22.7\n", "- 58.2\n", "- **90.6**\n", "- 134.5" ] }, { "cell_type": "code", "execution_count": 13, "id": "3d0a8e1e", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "[Stage 25:> (0 + 2) / 2]\r" ] }, { "name": "stdout", "output_type": "stream", "text": [ "The longest trip duration is: 90.6 hours\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ " \r" ] } ], "source": [ "from pyspark.sql import functions as F\n", "\n", "# 1. Calculate the duration in seconds and convert to hours\n", "# We use unix_timestamp to get the epoch seconds for each column\n", "df_with_duration = df.withColumn(\n", " \"duration_hours\", \n", " (F.unix_timestamp(\"tpep_dropoff_datetime\") - F.unix_timestamp(\"tpep_pickup_datetime\")) / 3600\n", ")\n", "\n", "# 2. 
Find the maximum value in that new column\n", "max_duration = df_with_duration.select(F.max(\"duration_hours\")).collect()[0][0]\n", "\n", "print(f\"The longest trip duration is: {max_duration:.1f} hours\")" ] }, { "cell_type": "markdown", "id": "637a9be2", "metadata": {}, "source": [ "## Question 5: User Interface\n", "\n", "On which local port does Spark's user interface, which shows the application's dashboard, run?\n", "\n", "- 80\n", "- 443\n", "- **4040**\n", "- 8080" ] }, { "cell_type": "markdown", "id": "f7e9d41d", "metadata": {}, "source": [ "## Question 6: Least frequent pickup location zone\n", "\n", "Load the zone lookup data into a temporary view in Spark:\n", "\n", "```bash\n", "wget https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv\n", "```\n", "\n", "Using the zone lookup data and the Yellow November 2025 data, what is the name of the LEAST frequent pickup location zone?\n", "\n", "- **Governor's Island/Ellis Island/Liberty Island**\n", "- Arden Heights\n", "- Rikers Island\n", "- Jamaica Bay" ] }, { "cell_type": "code", "execution_count": 15, "id": "c9d76292", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "--2026-03-09 03:45:42-- https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv\n", "Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... " ] }, { "name": "stdout", "output_type": "stream", "text": [ "52.85.39.153, 52.85.39.97, 52.85.39.65, ...\n", "Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|52.85.39.153|:443... connected.\n", "HTTP request sent, awaiting response... 
200 OK\n", "Length: 12331 (12K) [text/csv]\n", "Saving to: ‘data/raw/taxi_zone_lookup.csv’\n", "\n", "taxi_zone_lookup.cs 100%[===================>] 12.04K --.-KB/s in 0s \n", "\n", "2026-03-09 03:45:43 (121 MB/s) - ‘data/raw/taxi_zone_lookup.csv’ saved [12331/12331]\n", "\n" ] } ], "source": [ "!wget https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv -P data/raw/" ] }, { "cell_type": "code", "execution_count": 16, "id": "3c26a634", "metadata": {}, "outputs": [], "source": [ "# Load the Zone Lookup CSV\n", "df_zones = spark.read \\\n", " .option(\"header\", \"true\") \\\n", " .option(\"inferSchema\", \"true\") \\\n", " .csv(\"data/raw/taxi_zone_lookup.csv\")\n", "\n", "# Create the temporary view\n", "df_zones.createOrReplaceTempView(\"zones\")" ] }, { "cell_type": "code", "execution_count": 18, "id": "f78b5167", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "[Stage 31:> (0 + 2) / 2]\r" ] }, { "name": "stdout", "output_type": "stream", "text": [ "+---------------------------------------------+----------+\n", "|Zone |trip_count|\n", "+---------------------------------------------+----------+\n", "|Governor's Island/Ellis Island/Liberty Island|1 |\n", "+---------------------------------------------+----------+\n", "\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ " \r" ] } ], "source": [ "df.createOrReplaceTempView(\"yellow_data\")\n", "\n", "least_frequent_zone = spark.sql(\"\"\"\n", " SELECT \n", " z.Zone, \n", " COUNT(*) as trip_count\n", " FROM \n", " yellow_data y\n", " JOIN \n", " zones z ON y.PULocationID = z.LocationID\n", " GROUP BY \n", " z.Zone\n", " ORDER BY \n", " trip_count ASC\n", " LIMIT 1\n", "\"\"\")\n", "\n", "least_frequent_zone.show(truncate=False)" ] } ], "metadata": { "kernelspec": { "display_name": "06-batch (3.12.1)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": 
"text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.12.1" } }, "nbformat": 4, "nbformat_minor": 5 }