{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# 2.1 Structured Data Introduction\n", "\n", "This notebook demonstrates the basics of processing structured data with Spark SQL.\n", "\n", "Spark SQL is a higher-level API for working with structured data. The data are represented as `DataFrames`: table-like objects with columns and rows, conceptually similar to `pandas` or `R` data frames.\n", "\n", "\n", "`spark` is the main entry point for Spark SQL related operations. It is an instance of `SparkSession`, and pyspark automatically creates one for you.\n", "\n", "Let's look at some ways to create and display `DataFrames`:" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Inferred schema:\n", "root\n", " |-- id: long (nullable = true)\n", " |-- name: string (nullable = true)\n", " |-- price: double (nullable = true)\n", "\n", "+---+--------------+------+\n", "| id| name| price|\n", "+---+--------------+------+\n", "| 1| iPhone 6|1000.0|\n", "| 2| iPhone 7|1200.0|\n", "| 2|Samsung Galaxy| 900.0|\n", "+---+--------------+------+\n", "\n" ] } ], "source": [ "# Import pyspark.sql classes and functions\n", "from pyspark.sql import *\n", "\n", "\n", "# The most direct way to create a DataFrame is to\n", "# build it from a list of `Row`s\n", "\n", "# Create a few `Row`s describing product items\n", "# (note that item3 reuses id=2 - we will see the effect in the join example later)\n", "\n", "item1 = Row(id=1, name=\"iPhone 6\", price=1000.00)\n", "item2 = Row(id=2, name=\"iPhone 7\", price=1200.00)\n", "item3 = Row(id=2, name=\"Samsung Galaxy\", price=900.00)\n", "\n", "# Create a `DataFrame` from the list of `Row`s\n", "itemsDF = spark.createDataFrame([item1, item2, item3])\n", "\n", "# Each `DataFrame` is associated with a `schema`, which defines the names\n", "# and types of the columns in the DataFrame.\n", "# `createDataFrame` by default infers the schema from the provided Rows,\n", "# but later we will see how to specify the schema explicitly.\n", "\n", "# Let's print out the schema\n", "\n", "print(\"Inferred schema:\")\n", "\n", "itemsDF.printSchema()\n", "\n", "# Display the DataFrame with the `show()` function\n", "\n", "itemsDF.show()" ] },
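{ "cell_type": "markdown", "metadata": {}, "source": [ "The `spark` session used above is created automatically by the pyspark kernel. When running Spark SQL from a standalone script there is no pre-built session, so you construct one yourself. A minimal sketch (the application name is an arbitrary example):" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from pyspark.sql import SparkSession\n", "\n", "# Build (or re-use) a SparkSession - the standard entry point in Spark 2.x\n", "spark = SparkSession.builder \\\n", "    .appName('structured-data-intro') \\\n", "    .getOrCreate()" ] },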
{ "cell_type": "markdown", "metadata": {}, "source": [ "We can also define rows with specific types:" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- date: string (nullable = true)\n", " |-- product_id: long (nullable = true)\n", " |-- user_id: string (nullable = true)\n", " |-- rating: long (nullable = true)\n", " |-- comment: string (nullable = true)\n", "\n" ] } ], "source": [ "# Create a Review Row definition\n", "Review = Row('date', 'product_id', 'user_id', 'rating', 'comment')\n", "\n", "# Create some Reviews\n", "\n", "review1 = Review('2017-01-01', 1, 'jonh', 4, 'Very nice')\n", "review2 = Review('2017-01-02', 1, 'karl', 3, None)\n", "review3 = Review('2017-01-02', 1, 'adam', 5, 'Super')\n", "review4 = Review('2017-01-03', 2, 'greg', 3, None)\n", "\n", "# Create a `DataFrame` from the list of Rows with an inferred schema\n", "\n", "reviewsDF = spark.createDataFrame([review1, review2, review3, review4])\n", "\n", "# Print out the inferred schema\n", "reviewsDF.printSchema()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "If the inferred schema is not satisfactory, we can define an explicit one:" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- date: string (nullable = true)\n", " |-- product_id: long (nullable = false)\n", " |-- user_id: string (nullable = false)\n", " |-- rating: integer (nullable = true)\n", " |-- text: string (nullable = true)\n", "\n" ] }, { "data": { "text/html": [ "<div>\n", "<table border=\"1\" class=\"dataframe\">\n", " <thead>\n", " <tr style=\"text-align: right;\">\n", " <th></th>\n", " <th>date</th>\n", " <th>product_id</th>\n", " <th>user_id</th>\n", " <th>rating</th>\n", " <th>text</th>\n", " </tr>\n", " </thead>\n", " <tbody>\n", " <tr>\n", " <th>0</th>\n", " <td>2017-01-01</td>\n", " <td>1</td>\n", " <td>jonh</td>\n", " <td>4</td>\n", " <td>Very nice</td>\n", " </tr>\n", " <tr>\n", " <th>1</th>\n", " <td>2017-01-02</td>\n", " <td>1</td>\n", " <td>karl</td>\n", " <td>3</td>\n", " <td>None</td>\n", " </tr>\n", " <tr>\n", " <th>2</th>\n", " <td>2017-01-02</td>\n", " <td>1</td>\n", " <td>adam</td>\n", " <td>5</td>\n", " <td>Super</td>\n", " </tr>\n", " <tr>\n", " <th>3</th>\n", " <td>2017-01-03</td>\n", " <td>2</td>\n", " <td>greg</td>\n", " <td>3</td>\n", " <td>None</td>\n", " </tr>\n", " </tbody>\n", "</table>\n", "</div>" ], "text/plain": [ "DataFrame[date: string, product_id: bigint, user_id: string, rating: int, text: string]" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "from pyspark.sql.types import *\n", "\n", "# Define an explicit schema with the column names, types and nullability\n", "reviewSchema = StructType([\n", " StructField('date', StringType(), True),\n", " StructField('product_id', LongType(), False),\n", " StructField('user_id', StringType(), False),\n", " StructField('rating', IntegerType(), True),\n", " StructField('text', StringType(), True)\n", "])\n", "\n", "# Create a `DataFrame` from the list of Rows with the specified schema\n", "reviewsDF = spark.createDataFrame([review1, review2, review3, review4], schema=reviewSchema)\n", "reviewsDF.printSchema()\n", "\n", "# We can use `display` for a nicely formatted preview of the DataFrame content\n", "display(reviewsDF)" ] },
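{ "cell_type": "markdown", "metadata": {}, "source": [ "Note that `date` is declared as a plain string. If you need real date semantics (comparisons, date arithmetic), you can convert the column. A minimal sketch, assuming the `yyyy-MM-dd` format used above (`reviewsWithDatesDF` is just an example name):" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from pyspark.sql.functions import col, to_date\n", "\n", "# Replace the string column with a proper DateType column\n", "reviewsWithDatesDF = reviewsDF.withColumn('date', to_date(col('date')))\n", "reviewsWithDatesDF.printSchema()" ] },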
{ "cell_type": "markdown", "metadata": {}, "source": [ "Now let's have a look at some basic operations and ways of working with `DataFrames`:" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "4" ] }, "execution_count": 4, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Count the rows in the DataFrame\n", "reviewsDF.count()" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "data": { "text/html": [ "<div>\n", "<table border=\"1\" class=\"dataframe\">\n", " <thead>\n", " <tr style=\"text-align: right;\">\n", " <th></th>\n", " <th>date</th>\n", " <th>product_id</th>\n", " <th>user_id</th>\n", " <th>rating</th>\n", " <th>text</th>\n", " </tr>\n", " </thead>\n", " <tbody>\n", " <tr>\n", " <th>0</th>\n", " <td>2017-01-02</td>\n", " <td>1</td>\n", " <td>karl</td>\n", " <td>3</td>\n", " <td>None</td>\n", " </tr>\n", " <tr>\n", " <th>1</th>\n", " <td>2017-01-01</td>\n", " <td>1</td>\n", " <td>jonh</td>\n", " <td>4</td>\n", " <td>Very nice</td>\n", " </tr>\n", " <tr>\n", " <th>2</th>\n", " <td>2017-01-02</td>\n", " <td>1</td>\n", " <td>adam</td>\n", " <td>5</td>\n", " <td>Super</td>\n", " </tr>\n", " </tbody>\n", "</table>\n", "</div>" ], "text/plain": [ "DataFrame[date: string, product_id: bigint, user_id: string, rating: int, text: string]" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# We can use transformation operations to produce new `DataFrames`,\n", "# for example `filter` to include only rows for which column `product_id` has value 1\n", "# and `sort` to order the results by the `rating` column.\n", "# As was the case with RDDs, the transformations are `lazy`.\n", "\n", "filteredDF = reviewsDF.filter(reviewsDF.product_id == 1).sort(reviewsDF.rating)\n", "display(filteredDF)" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "data": { "text/html": [ "<div>\n", "<table border=\"1\" class=\"dataframe\">\n", " <thead>\n", " <tr style=\"text-align: right;\">\n", " <th></th>\n", " <th>date</th>\n", " <th>product_id</th>\n", " <th>user_id</th>\n", " <th>rating</th>\n", " <th>text</th>\n", " </tr>\n", " </thead>\n", " <tbody>\n", " <tr>\n", " <th>0</th>\n", " <td>2017-01-01</td>\n", " <td>1</td>\n", " <td>jonh</td>\n", " <td>4</td>\n", " <td>Very nice</td>\n", " </tr>\n", " </tbody>\n", "</table>\n", "</div>" ], "text/plain": [ "DataFrame[date: string, product_id: bigint, user_id: string, rating: int, text: string]" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "from pyspark.sql.functions import col\n", "\n", "# We can also `filter` using complex expressions,\n", "# for example: reviews with `product_id` == 1 and `user_id` == \"jonh\"\n", "# (use `&` instead of `and`, and `|` instead of `or`)\n", "\n", "# There are two ways to reference columns in expressions: either using the `col` function\n", "# with the column name (as below) or through the parent DataFrame (as above)\n", "\n", "filteredDF = reviewsDF.filter((col('product_id') == 1) & (col('user_id') == 'jonh'))\n", "display(filteredDF)" ] },
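{ "cell_type": "markdown", "metadata": {}, "source": [ "As an aside, `filter` (and its alias `where`) also accepts an SQL expression string. A minimal sketch of the same filter as above:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# The same filter expressed as an SQL string\n", "filteredDF = reviewsDF.filter(\"product_id = 1 AND user_id = 'jonh'\")\n", "filteredDF.show()" ] },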
{ "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "data": { "text/html": [ "<div>\n", "<table border=\"1\" class=\"dataframe\">\n", " <thead>\n", " <tr style=\"text-align: right;\">\n", " <th></th>\n", " <th>product_id</th>\n", " <th>user_id</th>\n", " <th>logRating</th>\n", " </tr>\n", " </thead>\n", " <tbody>\n", " <tr>\n", " <th>0</th>\n", " <td>1</td>\n", " <td>adam</td>\n", " <td>1.609438</td>\n", " </tr>\n", " <tr>\n", " <th>1</th>\n", " <td>1</td>\n", " <td>jonh</td>\n", " <td>1.386294</td>\n", " </tr>\n", " <tr>\n", " <th>2</th>\n", " <td>1</td>\n", " <td>karl</td>\n", " <td>1.098612</td>\n", " </tr>\n", " <tr>\n", " <th>3</th>\n", " <td>2</td>\n", " <td>greg</td>\n", " <td>1.098612</td>\n", " </tr>\n", " </tbody>\n", "</table>\n", "</div>" ], "text/plain": [ "DataFrame[product_id: bigint, user_id: string, logRating: double]" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "from pyspark.sql.functions import desc, log\n", "\n", "# We can use `select` to choose columns or to create derived columns,\n", "# and `alias` to rename them.\n", "\n", "# Here we create a derived column `logRating` as log(`rating`)\n", "\n", "selectedDF = reviewsDF \\\n", " .select(col('product_id'), col('user_id'), log(col('rating')).alias('logRating')) \\\n", " .sort(desc('logRating'))\n", "display(selectedDF)" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "data": { "text/html": [ "<div>\n", "<table border=\"1\" class=\"dataframe\">\n", " <thead>\n", " <tr style=\"text-align: right;\">\n", " <th></th>\n", " <th>product_id</th>\n", " <th>avg(rating)</th>\n", " </tr>\n", " </thead>\n", " <tbody>\n", " <tr>\n", " <th>0</th>\n", " <td>1</td>\n", " <td>4.0</td>\n", " </tr>\n", " <tr>\n", " <th>1</th>\n", " <td>2</td>\n", " <td>3.0</td>\n", " </tr>\n", " </tbody>\n", "</table>\n", "</div>" ], "text/plain": [ "DataFrame[product_id: bigint, avg(rating): double]" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# We can group the data by a column (or columns) and compute aggregate statistics for each group,\n", "# for example the average `rating` for each `product_id`:\n", "\n", "groupedDF = reviewsDF.groupBy(col('product_id')).avg('rating')\n", "display(groupedDF)" ] },
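{ "cell_type": "markdown", "metadata": {}, "source": [ "`groupBy` can also compute several aggregates at once via `agg`. A minimal sketch, with `avgRating` and `numReviews` as arbitrary example names:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from pyspark.sql.functions import avg, count\n", "\n", "# Compute the average rating and the number of reviews per product\n", "statsDF = reviewsDF.groupBy('product_id').agg(\n", "    avg('rating').alias('avgRating'),\n", "    count('*').alias('numReviews'))\n", "statsDF.show()" ] },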
{ "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "data": { "text/html": [ "<div>\n", "<table border=\"1\" class=\"dataframe\">\n", " <thead>\n", " <tr style=\"text-align: right;\">\n", " <th></th>\n", " <th>date</th>\n", " <th>product_id</th>\n", " <th>user_id</th>\n", " <th>rating</th>\n", " <th>text</th>\n", " <th>id</th>\n", " <th>name</th>\n", " <th>price</th>\n", " </tr>\n", " </thead>\n", " <tbody>\n", " <tr>\n", " <th>0</th>\n", " <td>2017-01-01</td>\n", " <td>1</td>\n", " <td>jonh</td>\n", " <td>4</td>\n", " <td>Very nice</td>\n", " <td>1</td>\n", " <td>iPhone 6</td>\n", " <td>1000.0</td>\n", " </tr>\n", " <tr>\n", " <th>1</th>\n", " <td>2017-01-02</td>\n", " <td>1</td>\n", " <td>karl</td>\n", " <td>3</td>\n", " <td>None</td>\n", " <td>1</td>\n", " <td>iPhone 6</td>\n", " <td>1000.0</td>\n", " </tr>\n", " <tr>\n", " <th>2</th>\n", " <td>2017-01-02</td>\n", " <td>1</td>\n", " <td>adam</td>\n", " <td>5</td>\n", " <td>Super</td>\n", " <td>1</td>\n", " <td>iPhone 6</td>\n", " <td>1000.0</td>\n", " </tr>\n", " <tr>\n", " <th>3</th>\n", " <td>2017-01-03</td>\n", " <td>2</td>\n", " <td>greg</td>\n", " <td>3</td>\n", " <td>None</td>\n", " <td>2</td>\n", " <td>iPhone 7</td>\n", " <td>1200.0</td>\n", " </tr>\n", " <tr>\n", " <th>4</th>\n", " <td>2017-01-03</td>\n", " <td>2</td>\n", " <td>greg</td>\n", " <td>3</td>\n", " <td>None</td>\n", " <td>2</td>\n", " <td>Samsung Galaxy</td>\n", " <td>900.0</td>\n", " </tr>\n", " </tbody>\n", "</table>\n", "</div>" ], "text/plain": [ "DataFrame[date: string, product_id: bigint, user_id: string, rating: int, text: string, id: bigint, name: string, price: double]" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# We can join two `DataFrames` together on a common column\n", "# (here the `product_id`)\n", "\n", "# Note that because item2 and item3 share id=2, greg's review of product 2\n", "# matches both items and therefore appears twice in the result.\n", "\n", "reviewsWithItemsDF = reviewsDF.join(itemsDF, itemsDF.id == reviewsDF.product_id)\n", "display(reviewsWithItemsDF)" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ " date product_id user_id rating text id name price\n", "0 2017-01-01 1 jonh 4 Very nice 1 iPhone 6 1000.0\n", "1 2017-01-02 1 karl 3 None 1 iPhone 6 1000.0\n", "2 2017-01-02 1 adam 5 Super 1 iPhone 6 1000.0\n" ] } ], "source": [ "# We can convert (small) Spark SQL `DataFrames` to `pandas` data frames\n", "# (small, because `toPandas` collects all the data to the driver)\n", "\n", "reviewsWithItemsPD = reviewsWithItemsDF.limit(3).toPandas()\n", "print(reviewsWithItemsPD)" ] }, { "cell_type": "code", "execution_count": 11, "metadata": { "collapsed": true }, "outputs": [], "source": [ "# Finally, we can save a `DataFrame` in one of the supported formats,\n", "# for example `csv`\n", "\n", "reviewsDF.write.csv('output/reviews.csv', mode='overwrite')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's preview the results:" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "total 32\n", "-rw-r--r-- 1 szu004 staff 0 10 Jul 16:25 _SUCCESS\n", "-rw-r--r-- 1 szu004 staff 30 10 Jul 16:25 part-00000-17ad680f-9ea3-4e5b-9147-fcea196092af.csv\n", "-rw-r--r-- 1 szu004 staff 21 10 Jul 16:25 part-00001-17ad680f-9ea3-4e5b-9147-fcea196092af.csv\n", "-rw-r--r-- 1 szu004 staff 26 10 Jul 16:25 part-00002-17ad680f-9ea3-4e5b-9147-fcea196092af.csv\n", "-rw-r--r-- 1 szu004 staff 21 10 Jul 16:25 part-00003-17ad680f-9ea3-4e5b-9147-fcea196092af.csv\n", "\n", "Content:\n", "2017-01-01,1,jonh,4,Very nice\n", "2017-01-02,1,karl,3,\n", "2017-01-02,1,adam,5,Super\n", "2017-01-03,2,greg,3,\n" ] } ], "source": [ "%%sh\n", "\n", "ls -l output/reviews.csv\n", "\n", "echo\n", "echo \"Content:\"\n", "\n", "cat output/reviews.csv/*.csv" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To find out more about Spark SQL and `DataFrames`, please check:\n", "\n", "* [Spark SQL Programming Guide](https://spark.apache.org/docs/latest/sql-programming-guide.html)\n", "* [SparkSQL API Documentation](http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame)\n", "\n", "You can now play around, modifying pieces of the code.\n", "\n", "When you are done, and you are running on your local machine, remember to *close the notebook* with `File/Close and Halt`." ] } ], "metadata": { "kernelspec": { "display_name": "PySpark", "language": "python", "name": "pyspark" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 2 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython2", "version": "2.7.11" } }, "nbformat": 4, "nbformat_minor": 1 }