{
"cells": [
{
"cell_type": "code",
"execution_count": 42,
"id": "d2932cf1-bcb2-4946-936d-836a3c4182e0",
"metadata": {},
"outputs": [],
"source": [
"import pyspark\n",
"from pyspark.sql import SparkSession\n",
"from datetime import datetime as dt"
]
},
{
"cell_type": "markdown",
"id": "d81b727a-d1f0-4f67-a761-c8ceb68e9166",
"metadata": {},
"source": [
"## Question 2 read parquet, repartition into 4 parquet files and write to local directory"
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "282689e4-63c9-42d9-9e7b-631d580d70c0",
"metadata": {},
"outputs": [],
"source": [
"spark = SparkSession.builder \\\n",
" .master(\"local[*]\") \\\n",
" .appName('test') \\\n",
" .getOrCreate()"
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "3556d500-4df7-4120-8bc5-5d0fd2e957d3",
"metadata": {},
"outputs": [],
"source": [
"## NOTE: the 'header' option applies only to CSV sources; it is ignored for Parquet,\n",
"    ## so the read is simplified to a plain parquet() call\n",
"df = spark.read.parquet('yellow_tripdata_2025-11.parquet')"
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "f5f5137d-6d2c-4253-a251-311506c1ac3b",
"metadata": {},
"outputs": [],
"source": [
"## repartition main parquet into 4 parquet files\n",
"df = df.repartition(4)"
]
},
{
"cell_type": "code",
"execution_count": 5,
"id": "aeab7ec9",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
" "
]
}
],
"source": [
"## Write repartitioned dataframe (4 partitions -> 4 parquet files) to a local subdirectory\n",
"    ## mode('overwrite') keeps this cell idempotent on a Restart & Run All;\n",
"    ## otherwise Spark raises AnalysisException when the path already exists\n",
"df.write.mode('overwrite').parquet('spark_hmwk/2025/11')"
]
},
{
"cell_type": "markdown",
"id": "0294f2e6-b575-4841-8bf7-df90ff7f86f0",
"metadata": {},
"source": [
"## Question 3: Count records\n",
"How many taxi trips were there on the 15th of November?\n",
"\n",
"Consider only trips that started on the 15th of November."
]
},
{
"cell_type": "code",
"execution_count": 11,
"id": "10b26628-bee3-4e7f-86c7-b3dd678bf4eb",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"[Stage 13:==========================================> (12 + 4) / 16]"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+\n",
"|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|\n",
"+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+\n",
"| 2| 2025-11-02 08:11:08| 2025-11-02 08:15:21| 1| 1.24| 1| N| 186| 230| 1| 7.9| 0.0| 0.5| 2.53| 0.0| 1.0| 15.18| 2.5| 0.0| 0.75|\n",
"| 2| 2025-11-06 14:01:48| 2025-11-06 14:25:53| 2| 1.84| 1| N| 164| 237| 2| 20.5| 0.0| 0.5| 0.0| 0.0| 1.0| 25.25| 2.5| 0.0| 0.75|\n",
"+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+\n",
"only showing top 2 rows\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
" "
]
}
],
"source": [
"df.show(2)"
]
},
{
"cell_type": "code",
"execution_count": 92,
"id": "0bd95479-0cd0-4472-8255-8d7bdf206287",
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"
\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" VendorID | \n",
" tpep_pickup_datetime | \n",
" tpep_dropoff_datetime | \n",
" passenger_count | \n",
" trip_distance | \n",
" RatecodeID | \n",
" store_and_fwd_flag | \n",
" PULocationID | \n",
" DOLocationID | \n",
" payment_type | \n",
" fare_amount | \n",
" extra | \n",
" mta_tax | \n",
" tip_amount | \n",
" tolls_amount | \n",
" improvement_surcharge | \n",
" total_amount | \n",
" congestion_surcharge | \n",
" Airport_fee | \n",
" cbd_congestion_fee | \n",
"
\n",
" \n",
" \n",
" \n",
" | 0 | \n",
" 2 | \n",
" 2025-11-07 15:04:17 | \n",
" 2025-11-07 15:39:15 | \n",
" 1 | \n",
" 7.3 | \n",
" 1 | \n",
" N | \n",
" 262 | \n",
" 127 | \n",
" 1 | \n",
" 38.7 | \n",
" 0.0 | \n",
" 0.5 | \n",
" 8.54 | \n",
" 0.0 | \n",
" 1.0 | \n",
" 51.24 | \n",
" 2.5 | \n",
" 0.0 | \n",
" 0.00 | \n",
"
\n",
" \n",
" | 1 | \n",
" 2 | \n",
" 2025-11-08 17:51:05 | \n",
" 2025-11-08 18:02:09 | \n",
" 1 | \n",
" 1.2 | \n",
" 1 | \n",
" N | \n",
" 114 | \n",
" 107 | \n",
" 1 | \n",
" 11.4 | \n",
" 0.0 | \n",
" 0.5 | \n",
" 3.23 | \n",
" 0.0 | \n",
" 1.0 | \n",
" 19.38 | \n",
" 2.5 | \n",
" 0.0 | \n",
" 0.75 | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
" VendorID tpep_pickup_datetime tpep_dropoff_datetime passenger_count \\\n",
"0 2 2025-11-07 15:04:17 2025-11-07 15:39:15 1 \n",
"1 2 2025-11-08 17:51:05 2025-11-08 18:02:09 1 \n",
"\n",
" trip_distance RatecodeID store_and_fwd_flag PULocationID DOLocationID \\\n",
"0 7.3 1 N 262 127 \n",
"1 1.2 1 N 114 107 \n",
"\n",
" payment_type fare_amount extra mta_tax tip_amount tolls_amount \\\n",
"0 1 38.7 0.0 0.5 8.54 0.0 \n",
"1 1 11.4 0.0 0.5 3.23 0.0 \n",
"\n",
" improvement_surcharge total_amount congestion_surcharge Airport_fee \\\n",
"0 1.0 51.24 2.5 0.0 \n",
"1 1.0 19.38 2.5 0.0 \n",
"\n",
" cbd_congestion_fee \n",
"0 0.00 \n",
"1 0.75 "
]
},
"execution_count": 92,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df.limit(2).toPandas() ## see Spark DataFrame in friendlier Pandas presentation"
]
},
{
"cell_type": "code",
"execution_count": 94,
"id": "b3fa3ffd-3e61-4c3e-9ebf-af36e0c1deef",
"metadata": {},
"outputs": [],
"source": [
"## Spark has to register PySpark DataFrame as a view or table before you can query it with SQL\n",
"df.createOrReplaceTempView(\"df\")\n",
"\n",
"## SELECT Query to add a date column from the timestamp column 'tpep_pickup_datetime'\n",
"df_date = spark.sql(\"\"\"\n",
" SELECT *,\n",
" TO_DATE(tpep_pickup_datetime) AS pickup_date\n",
" FROM df\n",
"\"\"\")\n",
"\n",
"## save queried table as a view to use in downstream queries\n",
"df_date.createOrReplaceTempView(\"df_date\")"
]
},
{
"cell_type": "code",
"execution_count": 95,
"id": "b44ca0de-6e8d-4e21-8924-b5c75c164c42",
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" VendorID | \n",
" tpep_pickup_datetime | \n",
" tpep_dropoff_datetime | \n",
" passenger_count | \n",
" trip_distance | \n",
" RatecodeID | \n",
" store_and_fwd_flag | \n",
" PULocationID | \n",
" DOLocationID | \n",
" payment_type | \n",
" ... | \n",
" extra | \n",
" mta_tax | \n",
" tip_amount | \n",
" tolls_amount | \n",
" improvement_surcharge | \n",
" total_amount | \n",
" congestion_surcharge | \n",
" Airport_fee | \n",
" cbd_congestion_fee | \n",
" pickup_date | \n",
"
\n",
" \n",
" \n",
" \n",
" | 0 | \n",
" 2 | \n",
" 2025-11-07 15:04:17 | \n",
" 2025-11-07 15:39:15 | \n",
" 1 | \n",
" 7.3 | \n",
" 1 | \n",
" N | \n",
" 262 | \n",
" 127 | \n",
" 1 | \n",
" ... | \n",
" 0.0 | \n",
" 0.5 | \n",
" 8.54 | \n",
" 0.0 | \n",
" 1.0 | \n",
" 51.24 | \n",
" 2.5 | \n",
" 0.0 | \n",
" 0.0 | \n",
" 2025-11-07 | \n",
"
\n",
" \n",
"
\n",
"
1 rows × 21 columns
\n",
"
"
],
"text/plain": [
" VendorID tpep_pickup_datetime tpep_dropoff_datetime passenger_count \\\n",
"0 2 2025-11-07 15:04:17 2025-11-07 15:39:15 1 \n",
"\n",
" trip_distance RatecodeID store_and_fwd_flag PULocationID DOLocationID \\\n",
"0 7.3 1 N 262 127 \n",
"\n",
" payment_type ... extra mta_tax tip_amount tolls_amount \\\n",
"0 1 ... 0.0 0.5 8.54 0.0 \n",
"\n",
" improvement_surcharge total_amount congestion_surcharge Airport_fee \\\n",
"0 1.0 51.24 2.5 0.0 \n",
"\n",
" cbd_congestion_fee pickup_date \n",
"0 0.0 2025-11-07 \n",
"\n",
"[1 rows x 21 columns]"
]
},
"execution_count": 95,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df_date.limit(1).toPandas() ## see format of the derived date column 'pickup_date'"
]
},
{
"cell_type": "code",
"execution_count": 35,
"id": "5bdc6fb2-e293-43ca-99c7-65874e1610ec",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----------+--------+\n",
"|pickup_date|count(1)|\n",
"+-----------+--------+\n",
"| 2025-11-15| 162604|\n",
"+-----------+--------+\n",
"\n"
]
}
],
"source": [
"## filter to pickups on 2025-11-15 and count the trips\n",
"    ## FROM clause uses the view 'df_date', which carries the derived pickup_date column\n",
"\n",
"spark.sql(\"\"\"\n",
"    SELECT pickup_date,\n",
"           COUNT(*)\n",
"    FROM df_date\n",
"    WHERE pickup_date = '2025-11-15'\n",
" GROUP BY 1\n",
"\"\"\").show()"
]
},
{
"cell_type": "markdown",
"id": "dffabd0c",
"metadata": {},
"source": [
"## Question 4\n",
"\n",
"What is the length of the longest trip in the dataset in hours?"
]
},
{
"cell_type": "code",
"execution_count": 29,
"id": "f05ecf62",
"metadata": {},
"outputs": [],
"source": [
"## 'header' is a CSV-only option and is ignored for Parquet, so it is dropped here\n",
"df = spark.read.parquet('spark_hmwk/2025/11')"
]
},
{
"cell_type": "code",
"execution_count": 6,
"id": "0de24755-a5ae-4b56-9ab0-a3450775808a",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+\n",
"|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|\n",
"+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+\n",
"| 2| 2025-11-07 15:04:17| 2025-11-07 15:39:15| 1| 7.3| 1| N| 262| 127| 1| 38.7| 0.0| 0.5| 8.54| 0.0| 1.0| 51.24| 2.5| 0.0| 0.0|\n",
"| 2| 2025-11-08 17:51:05| 2025-11-08 18:02:09| 1| 1.2| 1| N| 114| 107| 1| 11.4| 0.0| 0.5| 3.23| 0.0| 1.0| 19.38| 2.5| 0.0| 0.75|\n",
"+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+\n",
"only showing top 2 rows\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
" "
]
}
],
"source": [
"df.show(2)"
]
},
{
"cell_type": "markdown",
"id": "12f7a981-d81c-48d5-bfef-469ce48b6ace",
"metadata": {},
"source": [
"### Test Pandas calculations to derive total hours in test dataframe"
]
},
{
"cell_type": "code",
"execution_count": 96,
"id": "f813282d-fa12-41ac-bcaa-2f6b8dedd133",
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" VendorID | \n",
" tpep_pickup_datetime | \n",
" tpep_dropoff_datetime | \n",
" passenger_count | \n",
" trip_distance | \n",
" RatecodeID | \n",
" store_and_fwd_flag | \n",
" PULocationID | \n",
" DOLocationID | \n",
" payment_type | \n",
" fare_amount | \n",
" extra | \n",
" mta_tax | \n",
" tip_amount | \n",
" tolls_amount | \n",
" improvement_surcharge | \n",
" total_amount | \n",
" congestion_surcharge | \n",
" Airport_fee | \n",
" cbd_congestion_fee | \n",
"
\n",
" \n",
" \n",
" \n",
" | 0 | \n",
" 2 | \n",
" 2025-11-07 15:04:17 | \n",
" 2025-11-07 15:39:15 | \n",
" 1 | \n",
" 7.30 | \n",
" 1 | \n",
" N | \n",
" 262 | \n",
" 127 | \n",
" 1 | \n",
" 38.7 | \n",
" 0.0 | \n",
" 0.5 | \n",
" 8.54 | \n",
" 0.0 | \n",
" 1.0 | \n",
" 51.24 | \n",
" 2.5 | \n",
" 0.0 | \n",
" 0.00 | \n",
"
\n",
" \n",
" | 1 | \n",
" 2 | \n",
" 2025-11-08 17:51:05 | \n",
" 2025-11-08 18:02:09 | \n",
" 1 | \n",
" 1.20 | \n",
" 1 | \n",
" N | \n",
" 114 | \n",
" 107 | \n",
" 1 | \n",
" 11.4 | \n",
" 0.0 | \n",
" 0.5 | \n",
" 3.23 | \n",
" 0.0 | \n",
" 1.0 | \n",
" 19.38 | \n",
" 2.5 | \n",
" 0.0 | \n",
" 0.75 | \n",
"
\n",
" \n",
" | 2 | \n",
" 7 | \n",
" 2025-11-03 17:34:26 | \n",
" 2025-11-03 17:34:26 | \n",
" 1 | \n",
" 5.06 | \n",
" 1 | \n",
" N | \n",
" 162 | \n",
" 209 | \n",
" 1 | \n",
" 24.7 | \n",
" 0.0 | \n",
" 0.5 | \n",
" 6.39 | \n",
" 0.0 | \n",
" 1.0 | \n",
" 38.34 | \n",
" 2.5 | \n",
" 0.0 | \n",
" 0.75 | \n",
"
\n",
" \n",
" | 3 | \n",
" 2 | \n",
" 2025-11-06 15:03:28 | \n",
" 2025-11-06 15:27:07 | \n",
" 1 | \n",
" 3.33 | \n",
" 1 | \n",
" N | \n",
" 163 | \n",
" 24 | \n",
" 1 | \n",
" 23.3 | \n",
" 0.0 | \n",
" 0.5 | \n",
" 4.00 | \n",
" 0.0 | \n",
" 1.0 | \n",
" 32.05 | \n",
" 2.5 | \n",
" 0.0 | \n",
" 0.75 | \n",
"
\n",
" \n",
" | 4 | \n",
" 2 | \n",
" 2025-11-05 21:06:15 | \n",
" 2025-11-05 21:12:03 | \n",
" 1 | \n",
" 0.90 | \n",
" 1 | \n",
" N | \n",
" 189 | \n",
" 49 | \n",
" 1 | \n",
" 7.2 | \n",
" 1.0 | \n",
" 0.5 | \n",
" 1.94 | \n",
" 0.0 | \n",
" 1.0 | \n",
" 11.64 | \n",
" 0.0 | \n",
" 0.0 | \n",
" 0.00 | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
" VendorID tpep_pickup_datetime tpep_dropoff_datetime passenger_count \\\n",
"0 2 2025-11-07 15:04:17 2025-11-07 15:39:15 1 \n",
"1 2 2025-11-08 17:51:05 2025-11-08 18:02:09 1 \n",
"2 7 2025-11-03 17:34:26 2025-11-03 17:34:26 1 \n",
"3 2 2025-11-06 15:03:28 2025-11-06 15:27:07 1 \n",
"4 2 2025-11-05 21:06:15 2025-11-05 21:12:03 1 \n",
"\n",
" trip_distance RatecodeID store_and_fwd_flag PULocationID DOLocationID \\\n",
"0 7.30 1 N 262 127 \n",
"1 1.20 1 N 114 107 \n",
"2 5.06 1 N 162 209 \n",
"3 3.33 1 N 163 24 \n",
"4 0.90 1 N 189 49 \n",
"\n",
" payment_type fare_amount extra mta_tax tip_amount tolls_amount \\\n",
"0 1 38.7 0.0 0.5 8.54 0.0 \n",
"1 1 11.4 0.0 0.5 3.23 0.0 \n",
"2 1 24.7 0.0 0.5 6.39 0.0 \n",
"3 1 23.3 0.0 0.5 4.00 0.0 \n",
"4 1 7.2 1.0 0.5 1.94 0.0 \n",
"\n",
" improvement_surcharge total_amount congestion_surcharge Airport_fee \\\n",
"0 1.0 51.24 2.5 0.0 \n",
"1 1.0 19.38 2.5 0.0 \n",
"2 1.0 38.34 2.5 0.0 \n",
"3 1.0 32.05 2.5 0.0 \n",
"4 1.0 11.64 0.0 0.0 \n",
"\n",
" cbd_congestion_fee \n",
"0 0.00 \n",
"1 0.75 \n",
"2 0.75 \n",
"3 0.75 \n",
"4 0.00 "
]
},
"execution_count": 96,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"## see dataframe in user-friendly pandas view to plan out the queries to follow\n",
"    ## need to see columns and column types that are not as visible in the spark .show() ASCII presentation\n",
"dfx = df.limit(5).toPandas()\n",
"\n",
"dfx"
]
},
{
"cell_type": "code",
"execution_count": 97,
"id": "f256924a-4d36-459a-98dc-485515c0351b",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"VendorID int32\n",
"tpep_pickup_datetime datetime64[ns]\n",
"tpep_dropoff_datetime datetime64[ns]\n",
"passenger_count int64\n",
"trip_distance float64\n",
"RatecodeID int64\n",
"store_and_fwd_flag str\n",
"PULocationID int32\n",
"DOLocationID int32\n",
"payment_type int64\n",
"fare_amount float64\n",
"extra float64\n",
"mta_tax float64\n",
"tip_amount float64\n",
"tolls_amount float64\n",
"improvement_surcharge float64\n",
"total_amount float64\n",
"congestion_surcharge float64\n",
"Airport_fee float64\n",
"cbd_congestion_fee float64\n",
"dtype: object"
]
},
"execution_count": 97,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"dfx.dtypes ## dropoff and pickup columns are datetime columns"
]
},
{
"cell_type": "code",
"execution_count": 98,
"id": "8df5fa7b-ddf4-4776-a9ef-82c618b1b4fe",
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" VendorID | \n",
" tpep_pickup_datetime | \n",
" tpep_dropoff_datetime | \n",
" passenger_count | \n",
" trip_distance | \n",
" RatecodeID | \n",
" store_and_fwd_flag | \n",
" PULocationID | \n",
" DOLocationID | \n",
" payment_type | \n",
" ... | \n",
" improvement_surcharge | \n",
" total_amount | \n",
" congestion_surcharge | \n",
" Airport_fee | \n",
" cbd_congestion_fee | \n",
" timedelta | \n",
" day_hours | \n",
" hours | \n",
" minutes_hours | \n",
" total_hours | \n",
"
\n",
" \n",
" \n",
" \n",
" | 0 | \n",
" 2 | \n",
" 2025-11-07 15:04:17 | \n",
" 2025-11-07 15:39:15 | \n",
" 1 | \n",
" 7.30 | \n",
" 1 | \n",
" N | \n",
" 262 | \n",
" 127 | \n",
" 1 | \n",
" ... | \n",
" 1.0 | \n",
" 51.24 | \n",
" 2.5 | \n",
" 0.0 | \n",
" 0.00 | \n",
" 0 days 00:34:58 | \n",
" 0 | \n",
" 0 | \n",
" 0.566667 | \n",
" 0.566667 | \n",
"
\n",
" \n",
" | 1 | \n",
" 2 | \n",
" 2025-11-08 17:51:05 | \n",
" 2025-11-08 18:02:09 | \n",
" 1 | \n",
" 1.20 | \n",
" 1 | \n",
" N | \n",
" 114 | \n",
" 107 | \n",
" 1 | \n",
" ... | \n",
" 1.0 | \n",
" 19.38 | \n",
" 2.5 | \n",
" 0.0 | \n",
" 0.75 | \n",
" 0 days 00:11:04 | \n",
" 0 | \n",
" 0 | \n",
" 0.183333 | \n",
" 0.183333 | \n",
"
\n",
" \n",
" | 2 | \n",
" 7 | \n",
" 2025-11-03 17:34:26 | \n",
" 2025-11-03 17:34:26 | \n",
" 1 | \n",
" 5.06 | \n",
" 1 | \n",
" N | \n",
" 162 | \n",
" 209 | \n",
" 1 | \n",
" ... | \n",
" 1.0 | \n",
" 38.34 | \n",
" 2.5 | \n",
" 0.0 | \n",
" 0.75 | \n",
" 0 days 00:00:00 | \n",
" 0 | \n",
" 0 | \n",
" 0.000000 | \n",
" 0.000000 | \n",
"
\n",
" \n",
" | 3 | \n",
" 2 | \n",
" 2025-11-06 15:03:28 | \n",
" 2025-11-06 15:27:07 | \n",
" 1 | \n",
" 3.33 | \n",
" 1 | \n",
" N | \n",
" 163 | \n",
" 24 | \n",
" 1 | \n",
" ... | \n",
" 1.0 | \n",
" 32.05 | \n",
" 2.5 | \n",
" 0.0 | \n",
" 0.75 | \n",
" 0 days 00:23:39 | \n",
" 0 | \n",
" 0 | \n",
" 0.383333 | \n",
" 0.383333 | \n",
"
\n",
" \n",
" | 4 | \n",
" 2 | \n",
" 2025-11-05 21:06:15 | \n",
" 2025-11-05 21:12:03 | \n",
" 1 | \n",
" 0.90 | \n",
" 1 | \n",
" N | \n",
" 189 | \n",
" 49 | \n",
" 1 | \n",
" ... | \n",
" 1.0 | \n",
" 11.64 | \n",
" 0.0 | \n",
" 0.0 | \n",
" 0.00 | \n",
" 0 days 00:05:48 | \n",
" 0 | \n",
" 0 | \n",
" 0.083333 | \n",
" 0.083333 | \n",
"
\n",
" \n",
"
\n",
"
5 rows × 25 columns
\n",
"
"
],
"text/plain": [
" VendorID tpep_pickup_datetime tpep_dropoff_datetime passenger_count \\\n",
"0 2 2025-11-07 15:04:17 2025-11-07 15:39:15 1 \n",
"1 2 2025-11-08 17:51:05 2025-11-08 18:02:09 1 \n",
"2 7 2025-11-03 17:34:26 2025-11-03 17:34:26 1 \n",
"3 2 2025-11-06 15:03:28 2025-11-06 15:27:07 1 \n",
"4 2 2025-11-05 21:06:15 2025-11-05 21:12:03 1 \n",
"\n",
" trip_distance RatecodeID store_and_fwd_flag PULocationID DOLocationID \\\n",
"0 7.30 1 N 262 127 \n",
"1 1.20 1 N 114 107 \n",
"2 5.06 1 N 162 209 \n",
"3 3.33 1 N 163 24 \n",
"4 0.90 1 N 189 49 \n",
"\n",
" payment_type ... improvement_surcharge total_amount \\\n",
"0 1 ... 1.0 51.24 \n",
"1 1 ... 1.0 19.38 \n",
"2 1 ... 1.0 38.34 \n",
"3 1 ... 1.0 32.05 \n",
"4 1 ... 1.0 11.64 \n",
"\n",
" congestion_surcharge Airport_fee cbd_congestion_fee timedelta \\\n",
"0 2.5 0.0 0.00 0 days 00:34:58 \n",
"1 2.5 0.0 0.75 0 days 00:11:04 \n",
"2 2.5 0.0 0.75 0 days 00:00:00 \n",
"3 2.5 0.0 0.75 0 days 00:23:39 \n",
"4 0.0 0.0 0.00 0 days 00:05:48 \n",
"\n",
" day_hours hours minutes_hours total_hours \n",
"0 0 0 0.566667 0.566667 \n",
"1 0 0 0.183333 0.183333 \n",
"2 0 0 0.000000 0.000000 \n",
"3 0 0 0.383333 0.383333 \n",
"4 0 0 0.083333 0.083333 \n",
"\n",
"[5 rows x 25 columns]"
]
},
"execution_count": 98,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"## Calculate total trip hours: build the time delta, extract days/hours/minutes, and sum them\n",
"    ## NOTE(review): .dt.components drops the seconds part, so total_hours slightly\n",
"    ## under-counts; dfx['timedelta'].dt.total_seconds()/3600 would be exact\n",
"dfx['timedelta'] = dfx['tpep_dropoff_datetime'] - dfx['tpep_pickup_datetime']\n",
"dfx['day_hours'] = dfx['timedelta'].dt.components['days']*24\n",
"dfx['hours'] = dfx['timedelta'].dt.components['hours']\n",
"dfx['minutes_hours'] = dfx['timedelta'].dt.components['minutes']/60\n",
"dfx['total_hours'] = (dfx['day_hours'] + dfx.hours + dfx['minutes_hours'])\n",
"dfx"
]
},
{
"cell_type": "markdown",
"id": "188ff3fb-69d3-4164-9079-3b2adfda0db4",
"metadata": {},
"source": [
"## Process to take will be:\n",
"\n",
"1. register and create a temporary view of Spark Dataframe \"df\"\n",
"2. query the temporary view \"df\" and create a time delta column based on the dropoff and pickup datetime columns. Order by the time delta column descending and save the top 2 results into a pandas DataFrame\n",
"\n",
"3. use datetime package in pandas to create hourly columns as extracted by the time delta column (days in hours, hours, and minutes/hours). Calculate total hours with the newly created hourly columns."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "58c9b191-cf00-4255-9f02-11994f068688",
"metadata": {},
"outputs": [],
"source": [
"## NOTE(review): this import duplicates the one in the first cell and is not used below;\n",
"    ## it is safe to remove\n",
"from datetime import datetime as dt"
]
},
{
"cell_type": "code",
"execution_count": 60,
"id": "9fa964a1-5eaa-4ca6-a455-cf99bcb3b14b",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
" "
]
},
{
"data": {
"text/html": [
"\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" VendorID | \n",
" tpep_pickup_datetime | \n",
" tpep_dropoff_datetime | \n",
" passenger_count | \n",
" trip_distance | \n",
" RatecodeID | \n",
" store_and_fwd_flag | \n",
" PULocationID | \n",
" DOLocationID | \n",
" payment_type | \n",
" ... | \n",
" improvement_surcharge | \n",
" total_amount | \n",
" congestion_surcharge | \n",
" Airport_fee | \n",
" cbd_congestion_fee | \n",
" timedelta | \n",
" day_hours | \n",
" hours | \n",
" minutes_hours | \n",
" total_hours | \n",
"
\n",
" \n",
" \n",
" \n",
" | 0 | \n",
" 2 | \n",
" 2025-11-26 20:22:12 | \n",
" 2025-11-30 15:01:00 | \n",
" 1 | \n",
" 121.17 | \n",
" 4 | \n",
" N | \n",
" 265 | \n",
" 265 | \n",
" 2 | \n",
" ... | \n",
" 1.0 | \n",
" 889.10 | \n",
" 0.0 | \n",
" 0.0 | \n",
" 0.00 | \n",
" 3 days 18:38:48 | \n",
" 72 | \n",
" 18 | \n",
" 0.633333 | \n",
" 90.633333 | \n",
"
\n",
" \n",
" | 1 | \n",
" 2 | \n",
" 2025-11-27 04:22:41 | \n",
" 2025-11-30 09:19:35 | \n",
" 1 | \n",
" 1.08 | \n",
" 1 | \n",
" N | \n",
" 246 | \n",
" 48 | \n",
" 2 | \n",
" ... | \n",
" 1.0 | \n",
" 13.65 | \n",
" 2.5 | \n",
" 0.0 | \n",
" 0.75 | \n",
" 3 days 04:56:54 | \n",
" 72 | \n",
" 4 | \n",
" 0.933333 | \n",
" 76.933333 | \n",
"
\n",
" \n",
"
\n",
"
2 rows × 25 columns
\n",
"
"
],
"text/plain": [
" VendorID tpep_pickup_datetime tpep_dropoff_datetime passenger_count \\\n",
"0 2 2025-11-26 20:22:12 2025-11-30 15:01:00 1 \n",
"1 2 2025-11-27 04:22:41 2025-11-30 09:19:35 1 \n",
"\n",
" trip_distance RatecodeID store_and_fwd_flag PULocationID DOLocationID \\\n",
"0 121.17 4 N 265 265 \n",
"1 1.08 1 N 246 48 \n",
"\n",
" payment_type ... improvement_surcharge total_amount \\\n",
"0 2 ... 1.0 889.10 \n",
"1 2 ... 1.0 13.65 \n",
"\n",
" congestion_surcharge Airport_fee cbd_congestion_fee timedelta \\\n",
"0 0.0 0.0 0.00 3 days 18:38:48 \n",
"1 2.5 0.0 0.75 3 days 04:56:54 \n",
"\n",
" day_hours hours minutes_hours total_hours \n",
"0 72 18 0.633333 90.633333 \n",
"1 72 4 0.933333 76.933333 \n",
"\n",
"[2 rows x 25 columns]"
]
},
"execution_count": 60,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"## register and create temporary view of Spark Dataframe \"df\" \n",
"df.createOrReplaceTempView(\"df\")\n",
"\n",
"## Query the temporary view:\n",
"    ## create a timedelta column (dropoff minus pickup), ordered descending on 'timedelta'\n",
"    ## keep the top 2 rows (largest 'timedelta') and collect them into a pandas DataFrame\n",
"df1 = spark.sql(\"\"\"\n",
" SELECT *,\n",
" (tpep_dropoff_datetime - tpep_pickup_datetime) AS timedelta\n",
" FROM df\n",
" ORDER BY timedelta DESC\n",
"\"\"\").limit(2).toPandas()\n",
"\n",
"## Extract time delta components and calculate total hours (days*24 + hours + minutes/60)\n",
"    ## NOTE(review): seconds are dropped by .dt.components, so total_hours is a slight\n",
"    ## underestimate; df1['timedelta'].dt.total_seconds()/3600 would be exact\n",
"df1['day_hours'] = df1['timedelta'].dt.components['days']*24\n",
"df1['hours'] = df1['timedelta'].dt.components['hours']\n",
"df1['minutes_hours'] = df1['timedelta'].dt.components['minutes']/60\n",
"df1['total_hours'] = (df1['day_hours'] + df1.hours + df1['minutes_hours'])\n",
"\n",
"\n",
"df1"
]
},
{
"cell_type": "markdown",
"id": "4c33c41c-5579-4bda-bb85-bff53c8f7b4f",
"metadata": {},
"source": [
"## Question 6\n",
"\n",
"Load the zone lookup data into a temp view in Spark:\n",
"\n",
"Using the zone lookup data and the Yellow November 2025 data, what is the name of the LEAST frequent pickup location Zone?"
]
},
{
"cell_type": "code",
"execution_count": 65,
"id": "70672335-5732-46b9-b4fc-cc88671dd0e9",
"metadata": {},
"outputs": [],
"source": [
"## pyspark packages needed to declare your schema on read\n",
"from pyspark.sql import types"
]
},
{
"cell_type": "code",
"execution_count": 64,
"id": "972a8888-3292-4f04-9680-4e2b1b647996",
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" LocationID | \n",
" Borough | \n",
" Zone | \n",
" service_zone | \n",
"
\n",
" \n",
" \n",
" \n",
" | 0 | \n",
" 1 | \n",
" EWR | \n",
" Newark Airport | \n",
" EWR | \n",
"
\n",
" \n",
" | 1 | \n",
" 2 | \n",
" Queens | \n",
" Jamaica Bay | \n",
" Boro Zone | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
" LocationID Borough Zone service_zone\n",
"0 1 EWR Newark Airport EWR\n",
"1 2 Queens Jamaica Bay Boro Zone"
]
},
"execution_count": 64,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"## read in csv\n",
"    ## NOTE: without an explicit schema (or the inferSchema option), Spark reads every CSV column as StringType\n",
"    ## Parquet files embed their own schema, so this only matters for CSV\n",
"df_zone = spark.read \\\n",
" .option(\"header\", \"true\") \\\n",
" .csv('taxi_zone_lookup.csv')\n",
"\n",
"## see head(2) view of Spark DataFrame as a Pandas DataFrame\n",
" ## need to create structure with Schema to declare in Spark\n",
"df_zone.limit(2).toPandas()"
]
},
{
"cell_type": "code",
"execution_count": 67,
"id": "59bbdd86-70f5-4957-a1d6-11294e97ac43",
"metadata": {},
"outputs": [],
"source": [
"## read data into Spark DataFrame again with declared schema\n",
" ## schema created as structtype in VS Code\n",
"schema = types.StructType([\n",
" types.StructField('LocationID', types.IntegerType(), True),\n",
" types.StructField('Borough', types.StringType(), True),\n",
" types.StructField('Zone', types.StringType(), True),\n",
" types.StructField('service_zone', types.StringType(), True)\n",
"])"
]
},
{
"cell_type": "code",
"execution_count": 83,
"id": "4cb37ab9-f1fc-40e7-87ea-708bcc95028c",
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" LocationID | \n",
" Borough | \n",
" Zone | \n",
" service_zone | \n",
"
\n",
" \n",
" \n",
" \n",
" | 0 | \n",
" 1 | \n",
" EWR | \n",
" Newark Airport | \n",
" EWR | \n",
"
\n",
" \n",
" | 1 | \n",
" 2 | \n",
" Queens | \n",
" Jamaica Bay | \n",
" Boro Zone | \n",
"
\n",
" \n",
" | 2 | \n",
" 3 | \n",
" Bronx | \n",
" Allerton/Pelham Gardens | \n",
" Boro Zone | \n",
"
\n",
" \n",
" | 3 | \n",
" 4 | \n",
" Manhattan | \n",
" Alphabet City | \n",
" Yellow Zone | \n",
"
\n",
" \n",
" | 4 | \n",
" 5 | \n",
" Staten Island | \n",
" Arden Heights | \n",
" Boro Zone | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
" LocationID Borough Zone service_zone\n",
"0 1 EWR Newark Airport EWR\n",
"1 2 Queens Jamaica Bay Boro Zone\n",
"2 3 Bronx Allerton/Pelham Gardens Boro Zone\n",
"3 4 Manhattan Alphabet City Yellow Zone\n",
"4 5 Staten Island Arden Heights Boro Zone"
]
},
"execution_count": 83,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"##read in csv and declare schema on read\n",
"\n",
"df_zone = spark.read\\\n",
" .option(\"header\", \"true\")\\\n",
" .schema(schema)\\\n",
" .csv('taxi_zone_lookup.csv')\n",
"\n",
"## see sample of Spark DataFrame as a pandas dataframe\n",
"df_zone.limit(5).toPandas()"
]
},
{
"cell_type": "code",
"execution_count": 82,
"id": "d3f2729c-7072-4ed7-8a7b-891f75379bd4",
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" VendorID | \n",
" tpep_pickup_datetime | \n",
" tpep_dropoff_datetime | \n",
" passenger_count | \n",
" trip_distance | \n",
" RatecodeID | \n",
" store_and_fwd_flag | \n",
" PULocationID | \n",
" DOLocationID | \n",
" payment_type | \n",
" fare_amount | \n",
" extra | \n",
" mta_tax | \n",
" tip_amount | \n",
" tolls_amount | \n",
" improvement_surcharge | \n",
" total_amount | \n",
" congestion_surcharge | \n",
" Airport_fee | \n",
" cbd_congestion_fee | \n",
"
\n",
" \n",
" \n",
" \n",
" | 0 | \n",
" 2 | \n",
" 2025-11-07 15:04:17 | \n",
" 2025-11-07 15:39:15 | \n",
" 1 | \n",
" 7.30 | \n",
" 1 | \n",
" N | \n",
" 262 | \n",
" 127 | \n",
" 1 | \n",
" 38.7 | \n",
" 0.0 | \n",
" 0.5 | \n",
" 8.54 | \n",
" 0.0 | \n",
" 1.0 | \n",
" 51.24 | \n",
" 2.5 | \n",
" 0.0 | \n",
" 0.00 | \n",
"
\n",
" \n",
" | 1 | \n",
" 2 | \n",
" 2025-11-08 17:51:05 | \n",
" 2025-11-08 18:02:09 | \n",
" 1 | \n",
" 1.20 | \n",
" 1 | \n",
" N | \n",
" 114 | \n",
" 107 | \n",
" 1 | \n",
" 11.4 | \n",
" 0.0 | \n",
" 0.5 | \n",
" 3.23 | \n",
" 0.0 | \n",
" 1.0 | \n",
" 19.38 | \n",
" 2.5 | \n",
" 0.0 | \n",
" 0.75 | \n",
"
\n",
" \n",
" | 2 | \n",
" 7 | \n",
" 2025-11-03 17:34:26 | \n",
" 2025-11-03 17:34:26 | \n",
" 1 | \n",
" 5.06 | \n",
" 1 | \n",
" N | \n",
" 162 | \n",
" 209 | \n",
" 1 | \n",
" 24.7 | \n",
" 0.0 | \n",
" 0.5 | \n",
" 6.39 | \n",
" 0.0 | \n",
" 1.0 | \n",
" 38.34 | \n",
" 2.5 | \n",
" 0.0 | \n",
" 0.75 | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
" VendorID tpep_pickup_datetime tpep_dropoff_datetime passenger_count \\\n",
"0 2 2025-11-07 15:04:17 2025-11-07 15:39:15 1 \n",
"1 2 2025-11-08 17:51:05 2025-11-08 18:02:09 1 \n",
"2 7 2025-11-03 17:34:26 2025-11-03 17:34:26 1 \n",
"\n",
" trip_distance RatecodeID store_and_fwd_flag PULocationID DOLocationID \\\n",
"0 7.30 1 N 262 127 \n",
"1 1.20 1 N 114 107 \n",
"2 5.06 1 N 162 209 \n",
"\n",
" payment_type fare_amount extra mta_tax tip_amount tolls_amount \\\n",
"0 1 38.7 0.0 0.5 8.54 0.0 \n",
"1 1 11.4 0.0 0.5 3.23 0.0 \n",
"2 1 24.7 0.0 0.5 6.39 0.0 \n",
"\n",
" improvement_surcharge total_amount congestion_surcharge Airport_fee \\\n",
"0 1.0 51.24 2.5 0.0 \n",
"1 1.0 19.38 2.5 0.0 \n",
"2 1.0 38.34 2.5 0.0 \n",
"\n",
" cbd_congestion_fee \n",
"0 0.00 \n",
"1 0.75 \n",
"2 0.75 "
]
},
"execution_count": 82,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df.limit(3).toPandas() ##common column is PULocationID in \"df\" and LocationID in \"df_zone\""
]
},
{
"cell_type": "code",
"execution_count": 76,
"id": "ea37e197-202a-4358-a81f-dd867769e986",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"array([ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13,\n",
" 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26,\n",
" 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39,\n",
" 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52,\n",
" 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65,\n",
" 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78,\n",
" 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91,\n",
" 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104,\n",
" 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117,\n",
" 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130,\n",
" 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143,\n",
" 144, 145, 146, 147, 148, 149, 150, 151, 152, 153, 154, 155, 156,\n",
" 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 167, 168, 169,\n",
" 170, 171, 172, 173, 174, 175, 176, 177, 178, 179, 180, 181, 182,\n",
" 183, 184, 185, 186, 187, 188, 189, 190, 191, 192, 193, 194, 195,\n",
" 196, 197, 198, 199, 200, 201, 202, 203, 204, 205, 206, 207, 208,\n",
" 209, 210, 211, 212, 213, 214, 215, 216, 217, 218, 219, 220, 221,\n",
" 222, 223, 224, 225, 226, 227, 228, 229, 230, 231, 232, 233, 234,\n",
" 235, 236, 237, 238, 239, 240, 241, 242, 243, 244, 245, 246, 247,\n",
" 248, 249, 250, 251, 252, 253, 254, 255, 256, 257, 258, 259, 260,\n",
" 261, 262, 263, 264, 265], dtype=int32)"
]
},
"execution_count": 76,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"## verify df_zone has variety of LocationID\n",
"df_zone.count() ##verify that Spark DataFrame is small\n",
"\n",
"\n",
"##verified.. now see array of all locationID\n",
"\n",
"df_zone.toPandas().LocationID.unique() ##confirmed variety in LocationID b"
]
},
{
"cell_type": "markdown",
"id": "5866d6d1-8d60-4726-bd32-af827da8e144",
"metadata": {},
"source": [
"### Common Columns to JOIN on\n",
"\n",
"1. df_zone (smaller table) has column \"LocationID\"\n",
"\n",
"2. df (larger table) has column \"PULocationID\"\n",
"\n",
"NOTE: will use Pandas API Syntax"
]
},
{
"cell_type": "code",
"execution_count": 78,
"id": "9de69ec2-b397-47ea-bea0-f47613b8b378",
"metadata": {},
"outputs": [],
"source": [
"## Needed pyspark package to broadcast smaller table during join\n",
"from pyspark.sql.functions import broadcast"
]
},
{
"cell_type": "code",
"execution_count": 85,
"id": "b199c022-6357-4f05-a3d1-3a18ff0f3b12",
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" VendorID | \n",
" tpep_pickup_datetime | \n",
" tpep_dropoff_datetime | \n",
" passenger_count | \n",
" trip_distance | \n",
" RatecodeID | \n",
" store_and_fwd_flag | \n",
" PULocationID | \n",
" DOLocationID | \n",
" payment_type | \n",
" ... | \n",
" tolls_amount | \n",
" improvement_surcharge | \n",
" total_amount | \n",
" congestion_surcharge | \n",
" Airport_fee | \n",
" cbd_congestion_fee | \n",
" LocationID | \n",
" Borough | \n",
" Zone | \n",
" service_zone | \n",
"
\n",
" \n",
" \n",
" \n",
" | 0 | \n",
" 2 | \n",
" 2025-11-07 15:04:17 | \n",
" 2025-11-07 15:39:15 | \n",
" 1 | \n",
" 7.3 | \n",
" 1 | \n",
" N | \n",
" 262 | \n",
" 127 | \n",
" 1 | \n",
" ... | \n",
" 0.0 | \n",
" 1.0 | \n",
" 51.24 | \n",
" 2.5 | \n",
" 0.0 | \n",
" 0.00 | \n",
" 262 | \n",
" Manhattan | \n",
" Yorkville East | \n",
" Yellow Zone | \n",
"
\n",
" \n",
" | 1 | \n",
" 2 | \n",
" 2025-11-08 17:51:05 | \n",
" 2025-11-08 18:02:09 | \n",
" 1 | \n",
" 1.2 | \n",
" 1 | \n",
" N | \n",
" 114 | \n",
" 107 | \n",
" 1 | \n",
" ... | \n",
" 0.0 | \n",
" 1.0 | \n",
" 19.38 | \n",
" 2.5 | \n",
" 0.0 | \n",
" 0.75 | \n",
" 114 | \n",
" Manhattan | \n",
" Greenwich Village South | \n",
" Yellow Zone | \n",
"
\n",
" \n",
"
\n",
"
2 rows × 24 columns
\n",
"
"
],
"text/plain": [
" VendorID tpep_pickup_datetime tpep_dropoff_datetime passenger_count \\\n",
"0 2 2025-11-07 15:04:17 2025-11-07 15:39:15 1 \n",
"1 2 2025-11-08 17:51:05 2025-11-08 18:02:09 1 \n",
"\n",
" trip_distance RatecodeID store_and_fwd_flag PULocationID DOLocationID \\\n",
"0 7.3 1 N 262 127 \n",
"1 1.2 1 N 114 107 \n",
"\n",
" payment_type ... tolls_amount improvement_surcharge total_amount \\\n",
"0 1 ... 0.0 1.0 51.24 \n",
"1 1 ... 0.0 1.0 19.38 \n",
"\n",
" congestion_surcharge Airport_fee cbd_congestion_fee LocationID \\\n",
"0 2.5 0.0 0.00 262 \n",
"1 2.5 0.0 0.75 114 \n",
"\n",
" Borough Zone service_zone \n",
"0 Manhattan Yorkville East Yellow Zone \n",
"1 Manhattan Greenwich Village South Yellow Zone \n",
"\n",
"[2 rows x 24 columns]"
]
},
"execution_count": 85,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df_merge = df.join(\n",
" broadcast(df_zone), df.PULocationID == df_zone.LocationID, \"left\"\n",
")\n",
"\n",
"df_merge.limit(2).toPandas() ## see sample in Pandas DataFrame"
]
},
{
"cell_type": "code",
"execution_count": 99,
"id": "e9933704-de59-477e-a1ad-f185d31b8fe7",
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" Zone | \n",
" zone_count | \n",
"
\n",
" \n",
" \n",
" \n",
" | 0 | \n",
" Governor's Island/Ellis Island/Liberty Island | \n",
" 1 | \n",
"
\n",
" \n",
" | 1 | \n",
" Arden Heights | \n",
" 1 | \n",
"
\n",
" \n",
" | 2 | \n",
" Eltingville/Annadale/Prince's Bay | \n",
" 1 | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
" Zone zone_count\n",
"0 Governor's Island/Ellis Island/Liberty Island 1\n",
"1 Arden Heights 1\n",
"2 Eltingville/Annadale/Prince's Bay 1"
]
},
"execution_count": 99,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"## register df_merge Spark DataFrame and then query to find least used zone for pickup\n",
"\n",
"### ANSWER IS IN THIS OUTPUT\n",
"\n",
"df_merge.createOrReplaceTempView(\"df_merge\")\n",
"\n",
"## use Spark SQL \n",
" ## need COUNT(*) on PULocationID column from larger Spark DataFrame\n",
" ## ORDER BY ASCENDING\n",
"spark.sql(\"\"\"\n",
" SELECT Zone,\n",
" COUNT(*) AS zone_count\n",
" FROM df_merge\n",
" GROUP BY 1\n",
" ORDER BY zone_count\n",
"\"\"\").limit(3).toPandas()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.3"
}
},
"nbformat": 4,
"nbformat_minor": 5
}