{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "id": "16047145",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "/bin/python\n",
      "/usr/lib/jvm/java-21-openjdk\n"
     ]
    }
   ],
   "source": [
    "import os\n",
    "import sys\n",
    "\n",
    "# Point PySpark at a specific JDK; adjust this path for your machine.\n",
    "JAVA_HOME = \"/usr/lib/jvm/java-21-openjdk\"\n",
    "java_bin = JAVA_HOME + \"/bin\"\n",
    "\n",
    "os.environ[\"JAVA_HOME\"] = JAVA_HOME\n",
    "# Put the JDK's bin directory first on PATH exactly once, so re-running\n",
    "# this cell does not keep prepending duplicate entries.\n",
    "other_entries = [p for p in os.environ[\"PATH\"].split(\":\") if p != java_bin]\n",
    "os.environ[\"PATH\"] = \":\".join([java_bin] + other_entries)\n",
    "\n",
    "print(sys.executable)\n",
    "print(os.environ.get(\"JAVA_HOME\"))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "id": "00bc6543",
   "metadata": {},
   "outputs": [],
   "source": [
    "# All notebook imports live in this cell so a fresh kernel can run top to bottom.\n",
    "import pyspark\n",
    "from pyspark.sql import SparkSession\n",
    "from pyspark.sql import functions as F\n",
    "from pyspark.sql import types"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "id": "cd4a0f3d",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "WARNING: Using incubator modules: jdk.incubator.vector\n",
      "Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties\n",
      "Setting default log level to \"WARN\".\n",
      "To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n",
      "26/03/10 00:15:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n"
     ]
    }
   ],
   "source": [
    "# Local Spark session using every available core.\n",
    "spark = (\n",
    "    SparkSession.builder\n",
    "    .master(\"local[*]\")\n",
    "    .appName('test')\n",
    "    .getOrCreate()\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "id": "eb3e4c36",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "'4.1.1'"
      ]
     },
     "execution_count": 4,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "spark.version"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "id": "5236cebd",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "-rw-r--r--. 1 miko miko 68M 12-19 16:51 yellow_tripdata_2025-11.parquet\n"
     ]
    }
   ],
   "source": [
    "!ls -lh yellow_tripdata_2025-11.parquet"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "id": "0a3399a3",
   "metadata": {},
   "outputs": [],
   "source": [
    "# NOTE: this schema is kept for reference only. It is not passed to the\n",
    "# Parquet reader below, which takes the schema stored in the file itself.\n",
    "# It also does not list the Airport_fee and cbd_congestion_fee columns\n",
    "# that the loaded DataFrame reports.\n",
    "schema = types.StructType([\n",
    "    types.StructField('VendorID', types.IntegerType(), True),\n",
    "    types.StructField('tpep_pickup_datetime', types.TimestampType(), True),\n",
    "    types.StructField('tpep_dropoff_datetime', types.TimestampType(), True),\n",
    "    types.StructField('passenger_count', types.IntegerType(), True),\n",
    "    types.StructField('trip_distance', types.DoubleType(), True),\n",
    "    types.StructField('RatecodeID', types.IntegerType(), True),\n",
    "    types.StructField('store_and_fwd_flag', types.StringType(), True),\n",
    "    types.StructField('PULocationID', types.IntegerType(), True),\n",
    "    types.StructField('DOLocationID', types.IntegerType(), True),\n",
    "    types.StructField('payment_type', types.IntegerType(), True),\n",
    "    types.StructField('fare_amount', types.DoubleType(), True),\n",
    "    types.StructField('extra', types.DoubleType(), True),\n",
    "    types.StructField('mta_tax', types.DoubleType(), True),\n",
    "    types.StructField('tip_amount', types.DoubleType(), True),\n",
    "    types.StructField('tolls_amount', types.DoubleType(), True),\n",
    "    types.StructField('improvement_surcharge', types.DoubleType(), True),\n",
    "    types.StructField('total_amount', types.DoubleType(), True),\n",
    "    types.StructField('congestion_surcharge', types.DoubleType(), True)\n",
    "])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "id": "68bc8b72",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      " \r"
     ]
    }
   ],
   "source": [
    "# Read the downloaded month once and persist it as 4 partitions.\n",
    "# The raw frame gets its own name so `df` only ever means the\n",
    "# repartitioned copy loaded in the next cell.\n",
    "df_raw = spark.read.parquet('yellow_tripdata_2025-11.parquet')\n",
    "\n",
    "df_raw.repartition(4).write.parquet('homework_pq', mode='overwrite')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "58989b55",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Every query below runs against the repartitioned copy.\n",
    "df = spark.read.parquet('homework_pq')"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "48b01d2f",
   "metadata": {},
   "source": [
    "**Q3**: How many taxi trips were there on November 15?"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "id": "6c2500fd",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "162604"
      ]
     },
     "execution_count": 13,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Compare on the calendar date only; to_date drops the time-of-day part.\n",
    "# Filtering directly avoids overwriting the timestamp column with a date.\n",
    "(\n",
    "    df\n",
    "    .filter(F.to_date('tpep_pickup_datetime') == '2025-11-15')\n",
    "    .count()\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "id": "dd7ae60d",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Expose the trips to Spark SQL under a stable view name.\n",
    "df.createOrReplaceTempView('yellow_2025_11')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "id": "6d47c147",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------+\n",
      "|count(1)|\n",
      "+--------+\n",
      "|  162604|\n",
      "+--------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# Same count through Spark SQL, as a cross-check of the DataFrame result.\n",
    "spark.sql(\"\"\"\n",
    "SELECT\n",
    "    COUNT(1)\n",
    "FROM\n",
    "    yellow_2025_11\n",
    "WHERE\n",
    "    to_date(tpep_pickup_datetime) = '2025-11-15'\n",
    "\"\"\").show()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "ae3f533b",
   "metadata": {},
   "source": [
    "**Q4**: What is the length of the longest trip in the dataset in hours?"
] },
  {
   "cell_type": "code",
   "execution_count": 19,
   "id": "279d9161",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----------------+\n",
      "|max(duration_hrs)|\n",
      "+-----------------+\n",
      "|90.64666666666666|\n",
      "+-----------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "SECONDS_PER_HOUR = 3600\n",
    "\n",
    "# Trip duration in hours: difference of the two epoch-second timestamps.\n",
    "duration_hrs = (\n",
    "    F.unix_timestamp('tpep_dropoff_datetime') - F.unix_timestamp('tpep_pickup_datetime')\n",
    ") / SECONDS_PER_HOUR\n",
    "\n",
    "(\n",
    "    df\n",
    "    .withColumn('duration_hrs', duration_hrs)\n",
    "    .select(F.max('duration_hrs'))\n",
    "    .show()\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "0d10173a",
   "metadata": {},
   "source": [
    "**Q6**: Least frequent pickup location zone"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "id": "74b7f664",
   "metadata": {},
   "outputs": [],
   "source": [
    "# No schema is given or inferred here, so every lookup column is read as a string.\n",
    "df_zones = spark.read.csv('taxi_zone_lookup.csv', header=True)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 24,
   "id": "81642d3b",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "['LocationID', 'Borough', 'Zone', 'service_zone']"
      ]
     },
     "execution_count": 24,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df_zones.columns"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 25,
   "id": "4f460dda",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "['VendorID',\n",
       " 'tpep_pickup_datetime',\n",
       " 'tpep_dropoff_datetime',\n",
       " 'passenger_count',\n",
       " 'trip_distance',\n",
       " 'RatecodeID',\n",
       " 'store_and_fwd_flag',\n",
       " 'PULocationID',\n",
       " 'DOLocationID',\n",
       " 'payment_type',\n",
       " 'fare_amount',\n",
       " 'extra',\n",
       " 'mta_tax',\n",
       " 'tip_amount',\n",
       " 'tolls_amount',\n",
       " 'improvement_surcharge',\n",
       " 'total_amount',\n",
       " 'congestion_surcharge',\n",
       " 'Airport_fee',\n",
       " 'cbd_congestion_fee']"
      ]
     },
     "execution_count": 25,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df.columns"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 26,
   "id": "ad8f0101",
   "metadata": {},
   "outputs": [],
   "source": [
    "df_zones.createOrReplaceTempView('zones')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 28,
   "id": "6f738414",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------------+--------+\n",
      "|                Zone|count(1)|\n",
      "+--------------------+--------+\n",
      "|Governor's Island...|       1|\n",
      "|Eltingville/Annad...|       1|\n",
      "|       Arden Heights|       1|\n",
      "|       Port Richmond|       3|\n",
      "|       Rikers Island|       4|\n",
      "+--------------------+--------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# LEFT JOIN keeps trips whose PULocationID has no match in the lookup table;\n",
    "# those trips are grouped under a NULL Zone.\n",
    "# zones.LocationID is a string (CSV read without a schema), so the join\n",
    "# condition relies on Spark's implicit cast against the integer PULocationID.\n",
    "# Zones with equal counts come back in no guaranteed order.\n",
    "spark.sql(\"\"\"\n",
    "SELECT\n",
    "    pul.Zone,\n",
    "    COUNT(1)\n",
    "FROM\n",
    "    yellow_2025_11 y\n",
    "    LEFT JOIN zones pul ON y.PULocationID = pul.LocationID\n",
    "GROUP BY\n",
    "    pul.Zone\n",
    "ORDER BY\n",
    "    COUNT(1) ASC\n",
    "LIMIT 5\n",
    "\"\"\").show()"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.14.3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}