{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Manipulating data in PySpark\n", "> In this chapter, you'll learn about the pyspark.sql module, which provides optimized data queries to your Spark session. This is the Summary of lecture \"Introduction to PySpark\", via datacamp.\n", "\n", "- toc: true \n", "- badges: true\n", "- comments: true\n", "- author: Chanseok Kang\n", "- categories: [Python, Datacamp, PySpark]\n", "- image: images/spark.png " ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "import pyspark\n", "import numpy as np\n", "import pandas as pd" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Creating columns\n", "In this chapter, you'll learn how to use the methods defined by Spark's `DataFrame` class to perform common data operations.\n", "\n", "Let's look at performing column-wise operations. In Spark you can do this using the `.withColumn()` method, which takes two arguments. First, a string with the name of your new column, and second the new column itself.\n", "\n", "The new column must be an object of class `Column`. Creating one of these is as easy as extracting a column from your DataFrame using `df.colName`.\n", "\n", "Updating a Spark DataFrame is somewhat different than working in pandas because the Spark DataFrame is immutable. This means that it can't be changed, and so columns can't be updated in place.\n", "\n", "Thus, all these methods return a new DataFrame. To overwrite the original DataFrame you must reassign the returned DataFrame using the method like so:\n", "\n", "```\n", "df = df.withColumn(\"newCol\", df.oldCol + 1)\n", "```\n", "\n", "The above code creates a DataFrame with the same columns as df plus a new column, `newCol`, where every entry is equal to the corresponding entry from `oldCol`, plus one.\n", "\n", "To overwrite an existing column, just pass the name of the column as the first argument!" 
] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "from pyspark.sql import SparkSession\n", "\n", "spark = (SparkSession\n", " .builder\n", " .appName(\"flights\")\n", " .getOrCreate())\n", "\n", "# Path to data set\n", "csv_file = \"./dataset/flights_small.csv\"\n", "\n", "# Read and create a temporary view\n", "# Infer schema (note that for larger files you \n", "# may want to specify the schema)\n", "flights = (spark.read.format(\"csv\")\n", " .option(\"inferSchema\", \"true\")\n", " .option(\"header\", \"true\")\n", " .load(csv_file))\n", "flights.createOrReplaceTempView(\"flights\")" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[Table(name='flights', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]\n" ] } ], "source": [ "# Print the tables in the catalog\n", "print(spark.catalog.listTables())" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+\n", "|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|\n", "+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+\n", "|2014| 12| 8| 658| -7| 935| -5| VX| N846VA| 1780| SEA| LAX| 132| 954| 6| 58|\n", "|2014| 1| 22| 1040| 5| 1505| 5| AS| N559AS| 851| SEA| HNL| 360| 2677| 10| 40|\n", "|2014| 3| 9| 1443| -2| 1652| 2| VX| N847VA| 755| SEA| SFO| 111| 679| 14| 43|\n", "|2014| 4| 9| 1705| 45| 1839| 34| WN| N360SW| 344| PDX| SJC| 83| 569| 17| 5|\n", "|2014| 3| 9| 754| -1| 1015| 1| AS| N612AS| 522| SEA| BUR| 127| 937| 7| 54|\n", "|2014| 1| 15| 1037| 7| 1352| 2| WN| N646SW| 48| PDX| DEN| 121| 991| 10| 37|\n", "|2014| 7| 2| 847| 42| 1041| 51| WN| N422WN| 1520| PDX| OAK| 90| 543| 8| 47|\n", "|2014| 5| 12| 1655| -5| 1842| -18| VX| N361VA| 755| SEA| SFO| 98| 679| 16| 55|\n", "|2014| 4| 19| 1236| -4| 1508| -7| AS| N309AS| 490| SEA| SAN| 135| 1050| 12| 36|\n", "|2014| 11| 19| 1812| -3| 2352| -4| AS| N564AS| 26| SEA| ORD| 198| 1721| 18| 12|\n", "|2014| 11| 8| 1653| -2| 1924| -1| AS| N323AS| 448| SEA| LAX| 130| 954| 16| 53|\n", "|2014| 8| 3| 1120| 0| 1415| 2| AS| N305AS| 656| SEA| PHX| 154| 1107| 11| 20|\n", "|2014| 10| 30| 811| 21| 1038| 29| AS| N433AS| 608| SEA| LAS| 127| 867| 8| 11|\n", "|2014| 11| 12| 2346| -4| 217| -28| AS| N765AS| 121| SEA| ANC| 183| 1448| 23| 46|\n", "|2014| 10| 31| 1314| 89| 1544| 111| AS| N713AS| 306| SEA| SFO| 129| 679| 13| 14|\n", "|2014| 1| 29| 2009| 3| 2159| 9| UA| N27205| 1458| PDX| SFO| 90| 550| 20| 9|\n", "|2014| 12| 17| 2015| 50| 2150| 41| AS| N626AS| 368| SEA| SMF| 76| 605| 20| 15|\n", "|2014| 8| 11| 1017| -3| 1613| -7| WN| N8634A| 827| SEA| MDW| 216| 1733| 10| 17|\n", "|2014| 1| 13| 2156| -9| 607| -15| AS| N597AS| 24| SEA| BOS| 290| 2496| 21| 56|\n", "|2014| 6| 5| 1733| -12| 1945| -10| OO| N215AG| 3488| PDX| BUR| 111| 817| 17| 33|\n", "+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+\n", "only showing top 20 rows\n", "\n" ] } ], "source": [ "# Create the DataFrame flights\n", "flights = spark.table('flights')\n", "\n", "# Show the head\n", "flights.show()\n", "\n", "# Add duration_hrs\n", "flights = flights.withColumn('duration_hrs', flights.air_time / 60)" ] }, { 
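"cell_type": "markdown", "metadata": {}, "source": [ "Because `flights` is registered as a temporary view, the same data can also be queried with plain SQL through `spark.sql()`. A quick sketch (not one of the original exercises):\n", "\n", "```python\n", "# Query the temporary view and display a few rows\n", "spark.sql(\"SELECT origin, dest, air_time FROM flights LIMIT 5\").show()\n", "```" ] }, {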
"cell_type": "markdown", "metadata": {}, "source": [ "## SQL in a nutshell\n", "As you move forward, it will help to have a basic understanding of SQL. \n", "\n", "A SQL query returns a table derived from one or more tables contained in a database.\n", "\n", "Every SQL query is made up of commands that tell the database what you want to do with the data. The two commands that every query has to contain are `SELECT` and `FROM`.\n", "\n", "The `SELECT` command is followed by the columns you want in the resulting table.\n", "\n", "The `FROM` command is followed by the name of the table that contains those columns. The minimal SQL query is:\n", "```sql\n", "SELECT * FROM my_table;\n", "```\n", "The `*` selects all columns, so this returns the entire table named `my_table`.\n", "\n", "Similar to .withColumn(), you can do column-wise computations within a SELECT statement. For example,\n", "```sql\n", "SELECT origin, dest, air_time / 60 FROM flights;\n", "```\n", "returns a table with the origin, destination, and duration in hours for each flight.\n", "\n", "Another commonly used command is `WHERE`. This command filters the rows of the table based on some logical condition you specify. The resulting table contains the rows where your condition is true. For example, if you had a table of students and grades you could do:\n", "```sql\n", "SELECT * FROM students\n", "WHERE grade = 'A';\n", "```\n", "to select all the columns and the rows containing information about students who got As." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "So to extract the table of tail numbers and destimations for flights that lasted more than 10 hours is\n", "```sql\n", "SELECT dest, tail_num FROM flights WHERE air_time > 600;\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## SQL in a nutshell (2)\n", "Another common database task is aggregation. That is, reducing your data by breaking it into chunks and summarizing each chunk.\n", "\n", "This is done in SQL using the `GROUP BY` command. This command breaks your data into groups and applies a function from your `SELECT` statement to each group.\n", "\n", "For example, if you wanted to count the number of flights from each of two origin destinations, you could use the query\n", "```sql\n", "SELECT COUNT(*) FROM flights\n", "GROUP BY origin;\n", "```\n", "\n", "`GROUP BY origin` tells SQL that you want the output to have a row for each unique value of the `origin` column. The `SELECT` statement selects the values you want to populate each of the columns. Here, we want to `COUNT()` every row in each of the groups.\n", "\n", "It's possible to `GROUP BY` more than one column. When you do this, the resulting table has a row for every combination of the unique values in each column. The following query counts the number of flights from SEA and PDX to every destination airport:\n", "```sql\n", "SELECT origin, dest, COUNT(*) FROM flights\n", "GROUP BY origin, dest;\n", "```\n", "The output will have a row for every combination of the values in `origin` and `dest` (i.e. a row listing each origin and destination that a flight flew to). There will also be a column with the `COUNT()` of all the rows in each group." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Filtering Data\n", "Now that you have a bit of SQL know-how under your belt, it's easier to talk about the analogous operations using Spark DataFrames.\n", "\n", "Let's take a look at the `.filter()` method. 
"cell_type": "markdown", "metadata": {}, "source": [ "## Filtering Data\n", "Now that you have a bit of SQL know-how under your belt, it's easier to talk about the analogous operations using Spark DataFrames.\n", "\n", "Let's take a look at the `.filter()` method. As you might suspect, this is the Spark counterpart of SQL's `WHERE` clause. The `.filter()` method takes either a string containing an expression that could follow the `WHERE` clause of a SQL query, or a Spark Column of boolean (True/False) values.\n", "\n", "For example, the following two expressions will produce the same output:\n", "```python\n", "flights.filter(\"air_time > 120\").show()\n", "flights.filter(flights.air_time > 120).show()\n", "```\n", "Notice that in the first case, we pass a string to `.filter()`. In SQL, we would write this filtering task as `SELECT * FROM flights WHERE air_time > 120`. Spark's `.filter()` can accept any expression that could go in the `WHERE` clause of a SQL query (in this case, `\"air_time > 120\"`), as long as it is passed as a string. Also notice that we do not reference the name of the table in the string -- just as we wouldn't in the SQL query.\n", "\n", "In the second case, we actually pass a column of boolean values to `.filter()`. Remember that `flights.air_time > 120` returns a column of boolean values that has `True` in place of those records in `flights.air_time` that are over 120, and `False` otherwise." ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------------+\n", "|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute| duration_hrs|\n", "+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------------+\n", "|2014| 1| 22| 1040| 5| 1505| 5| AS| N559AS| 851| SEA| HNL| 360| 2677| 10| 40| 6.0|\n", "|2014| 4| 19| 1236| -4| 1508| -7| AS| N309AS| 490| SEA| SAN| 135| 1050| 12| 36| 2.25|\n", "|2014| 11| 19| 1812| -3| 2352| -4| AS| N564AS| 26| SEA| ORD| 198| 1721| 18| 12| 3.3|\n", "|2014| 8| 3| 1120| 0| 1415| 2| AS| N305AS| 656| SEA| PHX| 154| 1107| 11| 20| 2.566666666666667|\n", "|2014| 11| 12| 2346| -4| 217| -28| AS| N765AS| 121| SEA| ANC| 183| 1448| 23| 46| 3.05|\n", "|2014| 8| 11| 1017| -3| 1613| -7| WN| N8634A| 827| SEA| MDW| 216| 1733| 10| 17| 3.6|\n", "|2014| 1| 13| 2156| -9| 607| -15| AS| N597AS| 24| SEA| BOS| 290| 2496| 21| 56| 4.833333333333333|\n", "|2014| 9| 26| 610| -5| 1523| 65| US| N127UW| 616| SEA| PHL| 293| 2378| 6| 10| 4.883333333333334|\n", "|2014| 12| 4| 954| -6| 1348| -17| HA| N395HA| 29| SEA| OGG| 333| 2640| 9| 54| 5.55|\n", "|2014| 6| 4| 1115| 0| 1346| -3| AS| N461AS| 488| SEA| SAN| 133| 1050| 11| 15| 2.216666666666667|\n", "|2014| 6| 26| 2054| -1| 2318| -6| B6| N590JB| 907| SEA| ANC| 179| 1448| 20| 54|2.9833333333333334|\n", "|2014| 6| 7| 1823| -7| 2112| -28| AS| N512AS| 815| SEA| LIH| 335| 2701| 18| 23| 5.583333333333333|\n", "|2014| 4| 30| 801| 1| 1757| 90| AS| N407AS| 18| SEA| MCO| 342| 2554| 8| 1| 5.7|\n", "|2014| 11| 29| 905| 155| 1655| 170| DL| N824DN| 1598| SEA| ATL| 229| 2182| 9| 5| 3.816666666666667|\n", "|2014| 6| 2| 2222| 7| 55| 15| AS| N402AS| 99| SEA| ANC| 190| 1448| 22| 22|3.1666666666666665|\n", "|2014| 11| 15| 1034| -6| 1414| -26| AS| N589AS| 794| SEA| ABQ| 139| 1180| 10| 34| 2.316666666666667|\n", "|2014| 10| 20| 1328| -1| 1949| 4| UA| N68805| 1212| SEA| IAH| 228| 1874| 13| 28| 3.8|\n", "|2014| 12| 16| 1500| 0| 1906| 19| US| N662AW| 500| SEA| PHX| 151| 1107| 15| 0|2.5166666666666666|\n", "|2014| 11| 19| 1319| -6| 1821| -14| DL| N309US| 2164| PDX| MSP| 169| 
1426| 13| 19| 2.816666666666667|\n", "|2014| 5| 21| 515| 0| 757| 0| US| N172US| 593| SEA| PHX| 143| 1107| 5| 15|2.3833333333333333|\n", "+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------------+\n", "only showing top 20 rows\n", "\n", "+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------------+\n", "|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute| duration_hrs|\n", "+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------------+\n", "|2014| 1| 22| 1040| 5| 1505| 5| AS| N559AS| 851| SEA| HNL| 360| 2677| 10| 40| 6.0|\n", "|2014| 4| 19| 1236| -4| 1508| -7| AS| N309AS| 490| SEA| SAN| 135| 1050| 12| 36| 2.25|\n", "|2014| 11| 19| 1812| -3| 2352| -4| AS| N564AS| 26| SEA| ORD| 198| 1721| 18| 12| 3.3|\n", "|2014| 8| 3| 1120| 0| 1415| 2| AS| N305AS| 656| SEA| PHX| 154| 1107| 11| 20| 2.566666666666667|\n", "|2014| 11| 12| 2346| -4| 217| -28| AS| N765AS| 121| SEA| ANC| 183| 1448| 23| 46| 3.05|\n", "|2014| 8| 11| 1017| -3| 1613| -7| WN| N8634A| 827| SEA| MDW| 216| 1733| 10| 17| 3.6|\n", "|2014| 1| 13| 2156| -9| 607| -15| AS| N597AS| 24| SEA| BOS| 290| 2496| 21| 56| 4.833333333333333|\n", "|2014| 9| 26| 610| -5| 1523| 65| US| N127UW| 616| SEA| PHL| 293| 2378| 6| 10| 4.883333333333334|\n", "|2014| 12| 4| 954| -6| 1348| -17| HA| N395HA| 29| SEA| OGG| 333| 2640| 9| 54| 5.55|\n", "|2014| 6| 4| 1115| 0| 1346| -3| AS| N461AS| 488| SEA| SAN| 133| 1050| 11| 15| 2.216666666666667|\n", "|2014| 6| 26| 2054| -1| 2318| -6| B6| N590JB| 907| SEA| ANC| 179| 1448| 20| 54|2.9833333333333334|\n", "|2014| 6| 7| 1823| -7| 2112| -28| AS| N512AS| 815| SEA| LIH| 335| 2701| 18| 23| 5.583333333333333|\n", "|2014| 4| 30| 801| 1| 1757| 90| AS| N407AS| 18| SEA| MCO| 342| 2554| 8| 1| 5.7|\n", "|2014| 11| 29| 905| 155| 1655| 170| DL| N824DN| 1598| SEA| ATL| 229| 2182| 9| 5| 3.816666666666667|\n", "|2014| 6| 2| 2222| 7| 55| 15| AS| N402AS| 99| SEA| ANC| 190| 1448| 22| 22|3.1666666666666665|\n", "|2014| 11| 15| 1034| -6| 1414| -26| AS| N589AS| 794| SEA| ABQ| 139| 1180| 10| 34| 2.316666666666667|\n", "|2014| 10| 20| 1328| -1| 1949| 4| UA| N68805| 1212| SEA| IAH| 228| 1874| 13| 28| 3.8|\n", "|2014| 12| 16| 1500| 0| 1906| 19| US| N662AW| 500| SEA| PHX| 151| 1107| 15| 0|2.5166666666666666|\n", "|2014| 11| 19| 1319| -6| 1821| -14| DL| N309US| 2164| PDX| MSP| 169| 1426| 13| 19| 2.816666666666667|\n", "|2014| 5| 21| 515| 0| 757| 0| US| N172US| 593| SEA| PHX| 143| 1107| 5| 15|2.3833333333333333|\n", "+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------------+\n", "only showing top 20 rows\n", "\n" ] } ], "source": [ "# Filter flights by passing a string\n", "long_flights1 = flights.filter(\"distance > 1000\")\n", "\n", "# Filter flights by passing a column of boolean values\n", "long_flights2 = flights.filter(flights.distance > 1000)\n", "\n", "# Print the data to check they're equal\n", "long_flights1.show()\n", "long_flights2.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Selecting\n", "The Spark variant of SQL's `SELECT` is the `.select()` method. This method takes multiple arguments - one for each column you want to select. 
These arguments can either be the column name as a string (one for each column) or a column object (using the `df.colName` syntax). When you pass a column object, you can perform operations like addition or subtraction on the column to change the data contained in it, much like inside `.withColumn()`.\n", "\n", "The difference between the `.select()` and `.withColumn()` methods is that `.select()` returns only the columns you specify, while `.withColumn()` returns all the columns of the DataFrame in addition to the one you defined. It's often a good idea to drop columns you don't need at the beginning of an operation so that you're not dragging around extra data as you're wrangling. In this case, you would use `.select()` and not `.withColumn()`." ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "# Select the first set of columns\n", "selected1 = flights.select(\"tailnum\", \"origin\", \"dest\")\n", "\n", "# Select the second set of columns\n", "temp = flights.select(flights.origin, flights.dest, flights.carrier)\n", "\n", "# Define first filter\n", "filterA = flights.origin == \"SEA\"\n", "\n", "# Define second filter\n", "filterB = flights.dest == \"PDX\"\n", "\n", "# Filter the data, first by filterA then by filterB\n", "selected2 = temp.filter(filterA).filter(filterB)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Selecting II\n", "Similar to SQL, you can also use the `.select()` method to perform column-wise operations. When you're selecting a column using the `df.colName` notation, you can perform any column operation and the `.select()` method will return the transformed column. For example,\n", "\n", "```python\n", "flights.select(flights.air_time/60)\n", "```\n", "\n", "returns a column of flight durations in hours instead of minutes. You can also use the `.alias()` method to rename a column you're selecting. So if you wanted to `.select()` the column `duration_hrs` (which isn't in your DataFrame), you could do\n", "\n", "```python\n", "flights.select((flights.air_time/60).alias(\"duration_hrs\"))\n", "```\n", "\n", "The equivalent Spark DataFrame method `.selectExpr()` takes SQL expressions as a string:\n", "```python\n", "flights.selectExpr(\"air_time/60 as duration_hrs\")\n", "```\n", "with the SQL `as` keyword being equivalent to the `.alias()` method. To select multiple columns, you can pass multiple strings." ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "# Define avg_speed\n", "avg_speed = (flights.distance / (flights.air_time / 60)).alias(\"avg_speed\")\n", "\n", "# Select the correct columns\n", "speed1 = flights.select(\"origin\", \"dest\", \"tailnum\", avg_speed)\n", "\n", "# Create the same table using a SQL expression\n", "speed2 = flights.selectExpr(\"origin\", \"dest\", \"tailnum\", \"distance / (air_time/60) as avg_speed\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Aggregating\n", "All of the common aggregation methods, like `.min()`, `.max()`, and `.count()`, are `GroupedData` methods. These are created by calling the `.groupBy()` DataFrame method. You'll learn exactly what that means in a few exercises. For now, all you have to do to use these functions is call that method on your DataFrame. 
For example, to find the minimum value of a column, `col`, in a DataFrame, `df`, you could do\n", "```python\n", "df.groupBy().min(\"col\").show()\n", "```\n", "This creates a `GroupedData` object (so you can use the `.min()` method), then finds the minimum value in `col`, and returns it as a DataFrame.\n", "\n", "Now you're ready to do some aggregating of your own!" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "> Note: `air_time` and `dep_delay` are read in as strings here, so they need to be cast to integers before the numeric aggregations below." ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- year: integer (nullable = true)\n", " |-- month: integer (nullable = true)\n", " |-- day: integer (nullable = true)\n", " |-- dep_time: string (nullable = true)\n", " |-- dep_delay: string (nullable = true)\n", " |-- arr_time: string (nullable = true)\n", " |-- arr_delay: string (nullable = true)\n", " |-- carrier: string (nullable = true)\n", " |-- tailnum: string (nullable = true)\n", " |-- flight: integer (nullable = true)\n", " |-- origin: string (nullable = true)\n", " |-- dest: string (nullable = true)\n", " |-- air_time: string (nullable = true)\n", " |-- distance: integer (nullable = true)\n", " |-- hour: string (nullable = true)\n", " |-- minute: string (nullable = true)\n", " |-- duration_hrs: double (nullable = true)\n", "\n" ] } ], "source": [ "flights.printSchema()" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [], "source": [ "from pyspark.sql.types import IntegerType\n", "flights = flights.withColumn(\"air_time\", flights[\"air_time\"].cast(IntegerType()))\n", "flights = flights.withColumn(\"dep_delay\", flights[\"dep_delay\"].cast(IntegerType()))" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------------+\n", "|min(distance)|\n", "+-------------+\n", "| 106|\n", "+-------------+\n", "\n", "+-------------+\n", "|max(air_time)|\n", "+-------------+\n", "| 409|\n", "+-------------+\n", "\n" ] } ], "source": [ "# Find the shortest flight from PDX in terms of distance\n", "flights.filter(flights.origin == 'PDX').groupBy().min('distance').show()\n", "\n", "# Find the longest flight from SEA in terms of air time\n", "flights.filter(flights.origin == 'SEA').groupBy().max('air_time').show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Aggregating II\n", "To get you familiar with more of the built-in aggregation methods, here are a few more exercises involving the `flights` table!" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+------------------+\n", "| avg(air_time)|\n", "+------------------+\n", "|188.20689655172413|\n", "+------------------+\n", "\n", "+------------------+\n", "| sum(duration_hrs)|\n", "+------------------+\n", "|25289.600000000126|\n", "+------------------+\n", "\n" ] } ], "source": [ "# Average duration of Delta flights\n", "flights.filter(flights.carrier == 'DL').filter(flights.origin == 'SEA').groupBy().avg('air_time').show()\n", "\n", "# Total hours in the air\n", "flights.withColumn('duration_hrs', flights.air_time / 60).groupBy().sum('duration_hrs').show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Grouping and Aggregating I\n", "Part of what makes aggregating so powerful is the addition of groups. 
PySpark has a whole class devoted to grouped data frames: `pyspark.sql.GroupedData`, which you saw in the last two exercises.\n", "\n", "You've learned how to create a grouped DataFrame by calling the `.groupBy()` method on a DataFrame with no arguments.\n", "\n", "Now you'll see that when you pass the name of one or more columns in your DataFrame to the `.groupBy()` method, the aggregation methods behave like when you use a `GROUP BY` statement in a SQL query!" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------+-----+\n", "|tailnum|count|\n", "+-------+-----+\n", "| N442AS| 38|\n", "| N102UW| 2|\n", "| N36472| 4|\n", "| N38451| 4|\n", "| N73283| 4|\n", "| N513UA| 2|\n", "| N954WN| 5|\n", "| N388DA| 3|\n", "| N567AA| 1|\n", "| N516UA| 2|\n", "| N927DN| 1|\n", "| N8322X| 1|\n", "| N466SW| 1|\n", "| N6700| 1|\n", "| N607AS| 45|\n", "| N622SW| 4|\n", "| N584AS| 31|\n", "| N914WN| 4|\n", "| N654AW| 2|\n", "| N336NW| 1|\n", "+-------+-----+\n", "only showing top 20 rows\n", "\n", "+------+------------------+\n", "|origin| avg(air_time)|\n", "+------+------------------+\n", "| SEA| 160.4361496051259|\n", "| PDX|137.11543248288737|\n", "+------+------------------+\n", "\n" ] } ], "source": [ "# Group by tailnum\n", "by_plane = flights.groupBy('tailnum')\n", "\n", "# Number of flights each plane made\n", "by_plane.count().show()\n", "\n", "# Group by origin\n", "by_origin = flights.groupBy(\"origin\")\n", "\n", "# Average duration of flights from PDX and SEA\n", "by_origin.avg('air_time').show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Grouping and Aggregating II\n", "In addition to the `GroupedData` methods you've already seen, there is also the `.agg()` method. This method lets you pass an aggregate column expression that uses any of the aggregate functions from the `pyspark.sql.functions` submodule.\n", "\n", "This submodule contains many useful functions for computing things like standard deviations. All the aggregation functions in this submodule take the name of a column in a `GroupedData` table." 
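, "\n", "As a small, hedged sketch using the columns from the exercises above, `.agg()` also accepts several aggregate expressions at once, so one pass can compute multiple summaries per group:\n", "\n", "```python\n", "import pyspark.sql.functions as F\n", "\n", "# Mean and standard deviation of departure delay for each origin\n", "flights.groupBy('origin').agg(\n", "    F.avg('dep_delay').alias('mean_delay'),\n", "    F.stddev('dep_delay').alias('sd_delay')\n", ").show()\n", "```"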
] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-----+----+-------------------+\n", "|month|dest| avg(dep_delay)|\n", "+-----+----+-------------------+\n", "| 4| PHX| 1.6833333333333333|\n", "| 1| RDM| -1.625|\n", "| 5| ONT| 3.5555555555555554|\n", "| 7| OMA| -6.5|\n", "| 8| MDW| 7.45|\n", "| 6| DEN| 5.418181818181818|\n", "| 5| IAD| -4.0|\n", "| 12| COS| -1.0|\n", "| 11| ANC| 7.529411764705882|\n", "| 5| AUS| -0.75|\n", "| 5| COS| 11.666666666666666|\n", "| 2| PSP| 0.6|\n", "| 4| ORD|0.14285714285714285|\n", "| 10| DFW| 18.176470588235293|\n", "| 10| DCA| -1.5|\n", "| 8| JNU| 18.125|\n", "| 11| KOA| -1.0|\n", "| 10| OMA|-0.6666666666666666|\n", "| 6| ONT| 9.625|\n", "| 3| MSP| 3.2|\n", "+-----+----+-------------------+\n", "only showing top 20 rows\n", "\n", "+-----+----+----------------------+\n", "|month|dest|stddev_samp(dep_delay)|\n", "+-----+----+----------------------+\n", "| 4| PHX| 15.003380033491737|\n", "| 1| RDM| 8.830749846821778|\n", "| 5| ONT| 18.895178691342874|\n", "| 7| OMA| 2.1213203435596424|\n", "| 8| MDW| 14.467659032985843|\n", "| 6| DEN| 13.536905534420026|\n", "| 5| IAD| 3.8078865529319543|\n", "| 12| COS| 1.4142135623730951|\n", "| 11| ANC| 18.604716401245316|\n", "| 5| AUS| 4.031128874149275|\n", "| 5| COS| 33.38163167571851|\n", "| 2| PSP| 4.878524367060187|\n", "| 4| ORD| 11.593882803741764|\n", "| 10| DFW| 45.53019017606675|\n", "| 10| DCA| 0.7071067811865476|\n", "| 8| JNU| 40.79368823727514|\n", "| 11| KOA| 1.8708286933869707|\n", "| 10| OMA| 5.8594652770823155|\n", "| 6| ONT| 25.98316762829351|\n", "| 3| MSP| 21.556779370817555|\n", "+-----+----+----------------------+\n", "only showing top 20 rows\n", "\n" ] } ], "source": [ "import pyspark.sql.functions as F\n", "\n", "# Group by month and dest\n", "by_month_dest = flights.groupBy('month', 'dest')\n", "\n", "# Average departure delay by month and destination\n", "by_month_dest.avg('dep_delay').show()\n", "\n", "# Standard deviation of departure delay\n", "by_month_dest.agg(F.stddev('dep_delay')).show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Joining\n", "Another very common data operation is the join. Joins are a whole topic unto themselves, so in this course we'll just look at simple joins.\n", "\n", "A join will combine two different tables along a column that they share. This column is called the key. Examples of keys here include the `tailnum` and `carrier` columns from the `flights` table.\n", "\n", "For example, suppose that you want to know more information about the plane that flew a flight than just the tail number. This information isn't in the `flights` table because the same plane flies many different flights over the course of two years, so including this information in every row would result in a lot of duplication. To avoid this, you'd have a second table that has only one row for each plane and whose columns list all the information about the plane, including its tail number. You could call this table `planes`.\n", "\n", "When you join the `flights` table to this table of airplane information, you're adding all the columns from the `planes` table to the `flights` table. 
To fill these columns with information, you'll look at the tail number from the `flights` table and find the matching one in the `planes` table, and then use that row to fill out all the new columns.\n", "\n", "Now you'll have a much bigger table than before, but now every row has all information about the plane that flew that flight!\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Joining II\n", "In PySpark, joins are performed using the DataFrame method `.join()`. This method takes three arguments. The first is the second DataFrame that you want to join with the first one. The second argument, `on`, is the name of the key column(s) as a string. The names of the key column(s) must be the same in each table. The third argument, `how`, specifies the kind of join to perform. In this course we'll always use the value `how=\"leftouter\"`." ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [], "source": [ "airports = (spark.read.format(\"csv\")\n", " .option(\"inferSchema\", \"true\")\n", " .option(\"header\", \"true\")\n", " .load('./dataset/airports.csv'))\n", "airports.createOrReplaceTempView(\"airports\")" ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+--------------------+----------------+-----------------+----+---+---+\n", "|faa| name| lat| lon| alt| tz|dst|\n", "+---+--------------------+----------------+-----------------+----+---+---+\n", "|04G| Lansdowne Airport| 41.1304722| -80.6195833|1044| -5| A|\n", "|06A|Moton Field Munic...| 32.4605722| -85.6800278| 264| -5| A|\n", "|06C| Schaumburg Regional| 41.9893408| -88.1012428| 801| -6| A|\n", "|06N| Randall Airport| 41.431912| -74.3915611| 523| -5| A|\n", "|09J|Jekyll Island Air...| 31.0744722| -81.4277778| 11| -4| A|\n", "|0A9|Elizabethton Muni...| 36.3712222| -82.1734167|1593| -4| A|\n", "|0G6|Williams County A...| 41.4673056| -84.5067778| 730| -5| A|\n", "|0G7|Finger Lakes Regi...| 42.8835647| -76.7812318| 492| -5| A|\n", "|0P2|Shoestring Aviati...| 39.7948244| -76.6471914|1000| -5| U|\n", "|0S9|Jefferson County ...| 48.0538086| -122.8106436| 108| -8| A|\n", "|0W3|Harford County Ai...| 39.5668378| -76.2024028| 409| -5| A|\n", "|10C| Galt Field Airport| 42.4028889| -88.3751111| 875| -6| U|\n", "|17G|Port Bucyrus-Craw...| 40.7815556| -82.9748056|1003| -5| A|\n", "|19A|Jackson County Ai...| 34.1758638| -83.5615972| 951| -4| U|\n", "|1A3|Martin Campbell F...| 35.0158056| -84.3468333|1789| -4| A|\n", "|1B9| Mansfield Municipal| 42.0001331| -71.1967714| 122| -5| A|\n", "|1C9|Frazier Lake Airpark|54.0133333333333|-124.768333333333| 152| -8| A|\n", "|1CS|Clow Internationa...| 41.6959744| -88.1292306| 670| -6| U|\n", "|1G3| Kent State Airport| 41.1513889| -81.4151111|1134| -4| A|\n", "|1OH| Fortman Airport| 40.5553253| -84.3866186| 885| -5| U|\n", "+---+--------------------+----------------+-----------------+----+---+---+\n", "only showing top 20 rows\n", "\n", "None\n", "+----+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+--------+--------+----+------+------------------+--------------------+---------+-----------+----+---+---+\n", "|dest|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|air_time|distance|hour|minute| duration_hrs| name| lat| lon| alt| tz|dst|\n", 
"+----+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+--------+--------+----+------+------------------+--------------------+---------+-----------+----+---+---+\n", "| LAX|2014| 12| 8| 658| -7| 935| -5| VX| N846VA| 1780| SEA| 132| 954| 6| 58| 2.2| Los Angeles Intl|33.942536|-118.408075| 126| -8| A|\n", "| HNL|2014| 1| 22| 1040| 5| 1505| 5| AS| N559AS| 851| SEA| 360| 2677| 10| 40| 6.0| Honolulu Intl|21.318681|-157.922428| 13|-10| N|\n", "| SFO|2014| 3| 9| 1443| -2| 1652| 2| VX| N847VA| 755| SEA| 111| 679| 14| 43| 1.85| San Francisco Intl|37.618972|-122.374889| 13| -8| A|\n", "| SJC|2014| 4| 9| 1705| 45| 1839| 34| WN| N360SW| 344| PDX| 83| 569| 17| 5|1.3833333333333333|Norman Y Mineta S...| 37.3626|-121.929022| 62| -8| A|\n", "| BUR|2014| 3| 9| 754| -1| 1015| 1| AS| N612AS| 522| SEA| 127| 937| 7| 54|2.1166666666666667| Bob Hope|34.200667|-118.358667| 778| -8| A|\n", "| DEN|2014| 1| 15| 1037| 7| 1352| 2| WN| N646SW| 48| PDX| 121| 991| 10| 37|2.0166666666666666| Denver Intl|39.861656|-104.673178|5431| -7| A|\n", "| OAK|2014| 7| 2| 847| 42| 1041| 51| WN| N422WN| 1520| PDX| 90| 543| 8| 47| 1.5|Metropolitan Oakl...|37.721278|-122.220722| 9| -8| A|\n", "| SFO|2014| 5| 12| 1655| -5| 1842| -18| VX| N361VA| 755| SEA| 98| 679| 16| 55|1.6333333333333333| San Francisco Intl|37.618972|-122.374889| 13| -8| A|\n", "| SAN|2014| 4| 19| 1236| -4| 1508| -7| AS| N309AS| 490| SEA| 135| 1050| 12| 36| 2.25| San Diego Intl|32.733556|-117.189667| 17| -8| A|\n", "| ORD|2014| 11| 19| 1812| -3| 2352| -4| AS| N564AS| 26| SEA| 198| 1721| 18| 12| 3.3| Chicago Ohare Intl|41.978603| -87.904842| 668| -6| A|\n", "| LAX|2014| 11| 8| 1653| -2| 1924| -1| AS| N323AS| 448| SEA| 130| 954| 16| 53|2.1666666666666665| Los Angeles Intl|33.942536|-118.408075| 126| -8| A|\n", "| PHX|2014| 8| 3| 1120| 0| 1415| 2| AS| N305AS| 656| SEA| 154| 1107| 11| 20| 2.566666666666667|Phoenix Sky Harbo...|33.434278|-112.011583|1135| -7| N|\n", "| LAS|2014| 10| 30| 811| 21| 1038| 29| AS| N433AS| 608| SEA| 127| 867| 8| 11|2.1166666666666667| Mc Carran Intl|36.080056| -115.15225|2141| -8| A|\n", "| ANC|2014| 11| 12| 2346| -4| 217| -28| AS| N765AS| 121| SEA| 183| 1448| 23| 46| 3.05|Ted Stevens Ancho...|61.174361|-149.996361| 152| -9| A|\n", "| SFO|2014| 10| 31| 1314| 89| 1544| 111| AS| N713AS| 306| SEA| 129| 679| 13| 14| 2.15| San Francisco Intl|37.618972|-122.374889| 13| -8| A|\n", "| SFO|2014| 1| 29| 2009| 3| 2159| 9| UA| N27205| 1458| PDX| 90| 550| 20| 9| 1.5| San Francisco Intl|37.618972|-122.374889| 13| -8| A|\n", "| SMF|2014| 12| 17| 2015| 50| 2150| 41| AS| N626AS| 368| SEA| 76| 605| 20| 15|1.2666666666666666| Sacramento Intl|38.695417|-121.590778| 27| -8| A|\n", "| MDW|2014| 8| 11| 1017| -3| 1613| -7| WN| N8634A| 827| SEA| 216| 1733| 10| 17| 3.6| Chicago Midway Intl|41.785972| -87.752417| 620| -6| A|\n", "| BOS|2014| 1| 13| 2156| -9| 607| -15| AS| N597AS| 24| SEA| 290| 2496| 21| 56| 4.833333333333333|General Edward La...|42.364347| -71.005181| 19| -5| A|\n", "| BUR|2014| 6| 5| 1733| -12| 1945| -10| OO| N215AG| 3488| PDX| 111| 817| 17| 33| 1.85| Bob Hope|34.200667|-118.358667| 778| -8| A|\n", "+----+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+--------+--------+----+------+------------------+--------------------+---------+-----------+----+---+---+\n", "only showing top 20 rows\n", "\n", "None\n" ] } ], "source": [ "# Examine the data\n", "print(airports.show())\n", "\n", "# rename the faa column\n", "airports = airports.withColumnRenamed('faa', 'dest')\n", "\n", "# Joint the 
DataFrames\n", "flights_with_airports = flights.join(airports, on='dest', how='leftouter')\n", "\n", "# Examine the new DataFrame\n", "print(flights_with_airports.show())" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.6" } }, "nbformat": 4, "nbformat_minor": 4 }