{ "cells": [ { "cell_type": "markdown", "id": "be77ee3e", "metadata": {}, "source": [ "## Dataset\n", "\n", "For this homework we will use the Yellow Taxi trip data for November 2025 (2025-11), downloaded from the official website:\n", "\n", "```bash\n", "wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-11.parquet\n", "```" ] }, { "cell_type": "markdown", "id": "07727bd4", "metadata": {}, "source": [ "## Question 1: Install Spark and PySpark\n", "\n", "- Install Spark\n", "- Run PySpark\n", "- Create a local Spark session\n", "- Execute `spark.version`.\n", "\n", "What's the output?" ] }, { "cell_type": "code", "execution_count": 1, "id": "d21d9ec8", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "26/03/07 22:20:23 WARN Utils: Your hostname, Khangs-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.1.37 instead (on interface en0)\n", "26/03/07 22:20:23 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address\n", "Setting default log level to \"WARN\".\n", "To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n", "26/03/07 22:20:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... 
using builtin-java classes where applicable\n" ] }, { "data": { "text/plain": [ "'3.5.5'" ] }, "execution_count": 1, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import pyspark\n", "from pyspark.sql import SparkSession\n", "spark = SparkSession.builder \\\n", " .master(\"local[*]\")\\\n", " .appName(\"HW06\")\\\n", " .getOrCreate()\n", "\n", "spark.version" ] }, { "cell_type": "code", "execution_count": 5, "id": "d133aca8", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+\n", "|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|\n", "+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+\n", "| 7| 2025-11-01 00:13:25| 2025-11-01 00:13:25| 1| 1.68| 1| N| 43| 186| 1| 14.9| 0.0| 0.5| 1.5| 0.0| 1.0| 22.15| 2.5| 0.0| 0.75|\n", "| 2| 2025-11-01 00:49:07| 2025-11-01 01:01:22| 1| 2.28| 1| N| 142| 237| 1| 14.2| 1.0| 0.5| 4.99| 0.0| 1.0| 24.94| 2.5| 0.0| 0.75|\n", "| 1| 2025-11-01 00:07:19| 2025-11-01 00:20:41| 0| 2.7| 1| N| 163| 238| 1| 15.6| 4.25| 0.5| 4.27| 0.0| 1.0| 25.62| 2.5| 0.0| 0.75|\n", "| 2| 2025-11-01 00:00:00| 2025-11-01 01:01:03| 3| 12.87| 1| N| 138| 261| 1| 66.7| 6.0| 0.5| 0.0| 6.94| 1.0| 86.14| 2.5| 1.75| 0.75|\n", "| 1| 2025-11-01 00:18:50| 2025-11-01 
00:49:32| 0| 8.4| 1| N| 138| 37| 2| 39.4| 7.75| 0.5| 0.0| 0.0| 1.0| 48.65| 0.0| 1.75| 0.0|\n", "| 2| 2025-11-01 00:21:11| 2025-11-01 00:31:39| 1| 0.85| 1| N| 90| 100| 2| 10.7| 1.0| 0.5| 0.0| 0.0| 1.0| 16.45| 2.5| 0.0| 0.75|\n", "| 2| 2025-11-01 00:07:31| 2025-11-01 00:25:44| 1| 3.01| 1| N| 142| 170| 1| 19.1| 1.0| 0.5| 1.0| 0.0| 1.0| 25.85| 2.5| 0.0| 0.75|\n", "| 2| 2025-11-01 00:46:52| 2025-11-01 01:38:55| 3| 3.82| 1| N| 237| 144| 1| 42.2| 1.0| 0.5| 9.59| 0.0| 1.0| 57.54| 2.5| 0.0| 0.75|\n", "| 2| 2025-11-01 00:56:59| 2025-11-01 01:02:05| 1| 0.89| 1| N| 162| 161| 2| 7.2| 1.0| 0.5| 0.0| 0.0| 1.0| 12.95| 2.5| 0.0| 0.75|\n", "| 2| 2025-11-01 00:10:43| 2025-11-01 00:39:25| 3| 2.28| 1| N| 234| 162| 1| 24.0| 1.0| 0.5| 8.93| 0.0| 1.0| 38.68| 2.5| 0.0| 0.75|\n", "| 1| 2025-11-01 00:00:03| 2025-11-01 00:42:25| 2| 3.3| 1| N| 158| 88| 1| 35.9| 4.25| 0.5| 2.35| 0.0| 1.0| 44.0| 2.5| 0.0| 0.75|\n", "| 1| 2025-11-01 00:43:53| 2025-11-01 00:56:46| 2| 1.5| 1| N| 88| 148| 1| 12.8| 4.25| 0.5| 1.0| 0.0| 1.0| 19.55| 2.5| 0.0| 0.75|\n", "| 1| 2025-11-01 00:58:02| 2025-11-01 01:32:36| 2| 4.7| 1| N| 148| 236| 1| 32.4| 4.25| 0.5| 9.5| 0.0| 1.0| 47.65| 2.5| 0.0| 0.75|\n", "| 2| 2025-11-01 00:52:48| 2025-11-01 01:23:18| 1| 5.61| 1| N| 87| 255| 1| 33.1| 1.0| 0.5| 0.0| 0.0| 1.0| 38.85| 2.5| 0.0| 0.75|\n", "| 1| 2025-11-01 00:05:53| 2025-11-01 00:58:03| 1| 3.9| 1| N| 231| 43| 1| 40.8| 4.25| 0.5| 0.0| 0.0| 1.0| 46.55| 2.5| 0.0| 0.75|\n", "| 2| 2025-11-01 00:13:44| 2025-11-01 00:19:30| 2| 1.14| 1| N| 141| 262| 1| 7.9| 1.0| 0.5| 2.0| 0.0| 1.0| 14.9| 2.5| 0.0| 0.0|\n", "| 1| 2025-11-01 00:03:18| 2025-11-01 00:06:48| 2| 0.6| 1| N| 238| 24| 1| 5.1| 1.0| 0.5| 1.52| 0.0| 1.0| 9.12| 0.0| 0.0| 0.0|\n", "| 1| 2025-11-01 00:19:55| 2025-11-01 00:45:37| 1| 4.3| 1| N| 236| 147| 1| 24.7| 1.0| 0.5| 2.0| 0.0| 1.0| 29.2| 0.0| 0.0| 0.0|\n", "| 1| 2025-11-01 00:45:55| 2025-11-01 01:11:30| 1| 3.0| 1| N| 231| 137| 1| 24.0| 4.25| 0.5| 3.0| 0.0| 1.0| 32.75| 2.5| 0.0| 0.75|\n", "| 2| 2025-11-01 00:11:12| 2025-11-01 
00:15:02| 1| 0.69| 1| N| 237| 237| 2| 6.5| 1.0| 0.5| 0.0| 0.0| 1.0| 11.5| 2.5| 0.0| 0.0|\n", "+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+\n", "only showing top 20 rows\n", "\n" ] } ], "source": [ "sf = spark.read.parquet('yellow_tripdata_2025-11.parquet')\n", "sf.show()" ] }, { "cell_type": "code", "execution_count": 13, "id": "68b202b0", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- VendorID: integer (nullable = true)\n", " |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)\n", " |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)\n", " |-- passenger_count: long (nullable = true)\n", " |-- trip_distance: double (nullable = true)\n", " |-- RatecodeID: long (nullable = true)\n", " |-- store_and_fwd_flag: string (nullable = true)\n", " |-- PULocationID: integer (nullable = true)\n", " |-- DOLocationID: integer (nullable = true)\n", " |-- payment_type: long (nullable = true)\n", " |-- fare_amount: double (nullable = true)\n", " |-- extra: double (nullable = true)\n", " |-- mta_tax: double (nullable = true)\n", " |-- tip_amount: double (nullable = true)\n", " |-- tolls_amount: double (nullable = true)\n", " |-- improvement_surcharge: double (nullable = true)\n", " |-- total_amount: double (nullable = true)\n", " |-- congestion_surcharge: double (nullable = true)\n", " |-- Airport_fee: double (nullable = true)\n", " |-- cbd_congestion_fee: double (nullable = true)\n", "\n" ] } ], "source": [ "sf.printSchema()" ] }, { "cell_type": "markdown", "id": "d8fc7586", "metadata": {}, "source": [ "## Question 2: Yellow November 2025\n", "\n", "Read the November 2025 Yellow into a Spark Dataframe.\n", "\n", "Repartition the Dataframe to 4 partitions and save it to 
parquet.\n", "\n", "What is the average size of the Parquet (ending with .parquet extension) Files that were created (in MB)? Select the answer which most closely matches.\n", "\n", "- 6MB\n", "- 25MB\n", "- 75MB\n", "- 100MB" ] }, { "cell_type": "code", "execution_count": null, "id": "5003486c", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ " \r" ] } ], "source": [ "sf.repartition(4).write.parquet('file_repartitioned',mode=\"overwrite\")" ] }, { "cell_type": "code", "execution_count": 8, "id": "dcaf7642", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Number of Parquet files: 4\n", "Average size: 24.42 MB\n" ] } ], "source": [ "import os\n", "\n", "folder_path = 'file_repartitioned'\n", "\n", "# Get all .parquet files inside the directory\n", "parquet_files = [f for f in os.listdir(folder_path) if f.endswith('.parquet')]\n", "\n", "total_size_bytes = sum(os.path.getsize(os.path.join(folder_path, f)) for f in parquet_files)\n", "num_files = len(parquet_files)\n", "\n", "if num_files > 0:\n", " avg_size_mb = (total_size_bytes / num_files) / (1024 * 1024)\n", " print(f\"Number of Parquet files: {num_files}\")\n", " print(f\"Average size: {avg_size_mb:.2f} MB\")\n", "else:\n", " print(\"No Parquet files found.\")\n" ] }, { "cell_type": "markdown", "id": "1cd802d5", "metadata": {}, "source": [ "## Question 3: Count records\n", "\n", "How many taxi trips were there on the 15th of November?\n", "\n", "Consider only trips that started on the 15th of November.\n", "\n", "- 62,610\n", "- 102,340\n", "- 162,604\n", "- 225,768" ] }, { "cell_type": "code", "execution_count": 25, "id": "c9cd1d46", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "162604" ] }, "execution_count": 25, "metadata": {}, "output_type": "execute_result" } ], "source": [ "sf.filter((sf['tpep_pickup_datetime'] >= '2025-11-15') & (sf['tpep_pickup_datetime'] < '2025-11-16')).count()" ] }, { "cell_type": "markdown", "id": 
"d0e96e9c", "metadata": {}, "source": [ "## Question 4: Longest trip\n", "\n", "What is the length of the longest trip in the dataset in hours?\n", "\n", "- 22.7\n", "- 58.2\n", "- 90.6\n", "- 134.5" ] }, { "cell_type": "code", "execution_count": 39, "id": "1e67174c", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------------------+---------------------+--------------+\n", "|tpep_pickup_datetime|tpep_dropoff_datetime|duration_hours|\n", "+--------------------+---------------------+--------------+\n", "| 2025-11-26 20:22:12| 2025-11-30 15:01:00| 90.6|\n", "| 2025-11-27 04:22:41| 2025-11-30 09:19:35| 76.9|\n", "| 2025-11-03 10:42:55| 2025-11-06 14:55:45| 76.2|\n", "| 2025-11-07 11:23:22| 2025-11-10 08:40:41| 69.3|\n", "| 2025-11-18 17:12:47| 2025-11-21 12:17:37| 67.1|\n", "+--------------------+---------------------+--------------+\n", "only showing top 5 rows\n", "\n" ] } ], "source": [ "from pyspark.sql import functions as F\n", "\n", "sf = sf.withColumn(\n", " \"duration_hours\",\n", " F.round( \n", " (F.unix_timestamp(\"tpep_dropoff_datetime\") - \n", " F.unix_timestamp(\"tpep_pickup_datetime\")) / 3600,1))\n", "sf.select(['tpep_pickup_datetime', 'tpep_dropoff_datetime', 'duration_hours'])\\\n", " .orderBy('duration_hours',ascending=False).show(5)" ] }, { "cell_type": "markdown", "id": "b118e9ff", "metadata": {}, "source": [ "## Question 5: User Interface\n", "\n", "On which local port does Spark's user interface (the application dashboard) run?\n", "\n", "- 80\n", "- 443\n", "- 4040\n", "- 8080" ] }, { "cell_type": "code", "execution_count": 40, "id": "d5d65fce", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "'http://192.168.1.37:4040'" ] }, "execution_count": 40, "metadata": {}, "output_type": "execute_result" } ], "source": [ "spark.sparkContext.uiWebUrl" ] }, { "cell_type": "markdown", "id": "66fcbbf0", "metadata": {}, "source": [ "## Question 6: Least frequent pickup location zone\n", "\n", "Load 
the zone lookup data into a temp view in Spark:\n", "\n", "```bash\n", "wget https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv\n", "```\n", "\n", "Using the zone lookup data and the Yellow November 2025 data, what is the name of the LEAST frequent pickup location Zone?\n", "\n", "- Governor's Island/Ellis Island/Liberty Island\n", "- Arden Heights\n", "- Rikers Island\n", "- Jamaica Bay" ] }, { "cell_type": "code", "execution_count": 50, "id": "9b2b513d", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----------+-------------+--------------------+------------+\n", "|LocationID| Borough| Zone|service_zone|\n", "+----------+-------------+--------------------+------------+\n", "| 1| EWR| Newark Airport| EWR|\n", "| 2| Queens| Jamaica Bay| Boro Zone|\n", "| 3| Bronx|Allerton/Pelham G...| Boro Zone|\n", "| 4| Manhattan| Alphabet City| Yellow Zone|\n", "| 5|Staten Island| Arden Heights| Boro Zone|\n", "+----------+-------------+--------------------+------------+\n", "only showing top 5 rows\n", "\n" ] } ], "source": [ "taxi = spark.read.csv('taxi_zone_lookup.csv', header=True, inferSchema=True)\n", "taxi.createOrReplaceTempView('taxi_zone_lookup')\n", "taxi.show(5)" ] }, { "cell_type": "code", "execution_count": 54, "id": "f929d1c8", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------------------+------------+\n", "| Zone|count_pickup|\n", "+--------------------+------------+\n", "|Governor's Island...| 1|\n", "|Eltingville/Annad...| 1|\n", "| Arden Heights| 1|\n", "| Port Richmond| 3|\n", "| Rikers Island| 4|\n", "+--------------------+------------+\n", "only showing top 5 rows\n", "\n" ] } ], "source": [ "location = sf.select('PULocationID')\n", "location.join(taxi, location['PULocationID'] == taxi['LocationID'])\\\n", " .groupby('Zone').count()\\\n", " .withColumnRenamed('count', 'count_pickup')\\\n", " .orderBy('count_pickup').show(5)" ] }, { "cell_type": "markdown", "id": "3f9a1c7e", "metadata": {}, "source": [ "**Note:** the least frequent pickup zone is a three-way tie. The first three rows above each have exactly one pickup, and two of them (Governor's Island/Ellis Island/Liberty Island and Arden Heights) are both answer options.\n", "\n", "Two caveats when reading this result:\n", "\n", "- `orderBy('count_pickup')` defines no order among equal counts, so which tied zone is listed first is not guaranteed to be stable across runs.\n", "- The join is an inner join, so only zones that appear in the trip data are ranked; zones with zero pickups are not considered at all.\n", "\n", "**TODO:** add `Zone` as a secondary sort key and call `show(truncate=False)` so the full names of all tied zones are visible." ] } ], "metadata": { "kernelspec": { 
"display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.11.5" } }, "nbformat": 4, "nbformat_minor": 5 }