{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# 2.1 Structured Data Introduction\n",
"\n",
"This notebook demonstrates the basic of processing of structured data with Spark SQL.\n",
"\n",
"Spark SQL is a higer level API for working with structured data. The data are represented in `DataFrames` - table like object with columns and rows conceptually similar to `panadas` or `R` data fames.\n",
"\n",
"\n",
"`spark` is the main entry point for SparkSQL related operations. It is an instance of SparkSession and pyspark automatically creates one for you.\n",
"\n",
"Let's look a some way to create and display `DataFrames`:"
]
},
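{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# In this notebook the `spark` SparkSession is created for you by the PySpark kernel.\n",
"# As a minimal sketch (assuming a plain Python environment rather than this kernel),\n",
"# you could create one yourself like this; the application name is just illustrative.\n",
"\n",
"from pyspark.sql import SparkSession\n",
"\n",
"spark = SparkSession.builder \\\n",
"    .appName(\"structured-data-intro\") \\\n",
"    .getOrCreate()"
]
},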
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Inferred schema:\n",
"root\n",
" |-- id: long (nullable = true)\n",
" |-- name: string (nullable = true)\n",
" |-- price: double (nullable = true)\n",
"\n",
"+---+--------------+------+\n",
"| id| name| price|\n",
"+---+--------------+------+\n",
"| 1| iPhone 6|1000.0|\n",
"| 2| iPhone 7|1200.0|\n",
"| 2|Samsung Galaxy| 900.0|\n",
"+---+--------------+------+\n",
"\n"
]
}
],
"source": [
"# import pyspark.sql classes and functions\n",
"from pyspark.sql import *\n",
"\n",
"\n",
"# The most direct way to create a DataFrame is to \n",
"# build is from a list of `Row`s\n",
"\n",
"# Create a few `Row`s describing product items \n",
"\n",
"item1 = Row(id=1, name=\"iPhone 6\", price=1000.00)\n",
"item2 = Row(id=2, name=\"iPhone 7\", price=1200.00)\n",
"item3 = Row(id=2, name=\"Samsung Galaxy\", price=900.00)\n",
"\n",
"# Create a `DataFrame` from the list `Row`\n",
"itemsDF = spark.createDataFrame([item1, item2, item3])\n",
"\n",
"# Each `DataFrame` is associate with a `schema` which defines names \n",
"# and types of columns in the DataFrame. \n",
"# `createDataFrame` by default infers the schema from the provided Rows \n",
"# but later we will see how to specify the schema explicitely.\n",
"\n",
"# Let's print out the schema\n",
"\n",
"print(\"Inferred schema:\")\n",
"\n",
"itemsDF.printSchema()\n",
"\n",
"# Display the DataFrame with the `show()` function\n",
"\n",
"itemsDF.show()"
]
},
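{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# A hedged alternative sketch: `createDataFrame` also accepts a list of plain tuples\n",
"# together with a list of column names. The data below simply mirrors the items above;\n",
"# the schema is again inferred from the values, and `itemsFromTuplesDF` is just an illustrative name.\n",
"\n",
"itemsFromTuplesDF = spark.createDataFrame(\n",
"    [(1, \"iPhone 6\", 1000.00), (2, \"iPhone 7\", 1200.00), (2, \"Samsung Galaxy\", 900.00)],\n",
"    [\"id\", \"name\", \"price\"])\n",
"\n",
"itemsFromTuplesDF.show()"
]
},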
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We can also define rows with specific types:"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"root\n",
" |-- date: string (nullable = true)\n",
" |-- product_id: long (nullable = true)\n",
" |-- user_id: string (nullable = true)\n",
" |-- rating: long (nullable = true)\n",
" |-- comment: string (nullable = true)\n",
"\n"
]
}
],
"source": [
"# Create a Review Row definition\n",
"Review = Row('date', 'product_id', 'user_id', 'rating', 'comment')\n",
"\n",
"# Create some Reviews\n",
"\n",
"review1 = Review('2017-01-01', 1, 'jonh', 4, 'Very nice')\n",
"review2 = Review('2017-01-02', 1, 'karl', 3, None)\n",
"review3 = Review('2017-01-02', 1, 'adam', 5, 'Super')\n",
"review4 = Review('2017-01-03', 2, 'greg', 3, None)\n",
"\n",
"# Create a `DataFrame` from the list Rows with infered schema\n",
"\n",
"reviewsDF = spark.createDataFrame([review1, review2, review3, review4])\n",
"\n",
"# Print out the inferred schema\n",
"reviewsDF.printSchema()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"If the inferred schema is not satisfactory we can define the explicit one: "
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"root\n",
" |-- date: string (nullable = true)\n",
" |-- product_id: long (nullable = false)\n",
" |-- user_id: string (nullable = false)\n",
" |-- rating: integer (nullable = true)\n",
" |-- text: string (nullable = true)\n",
"\n"
]
},
{
"data": {
"text/html": [
"
\n",
"
\n",
" \n",
" \n",
" | \n",
" date | \n",
" product_id | \n",
" user_id | \n",
" rating | \n",
" text | \n",
"
\n",
" \n",
" \n",
" \n",
" | 0 | \n",
" 2017-01-01 | \n",
" 1 | \n",
" jonh | \n",
" 4 | \n",
" Very nice | \n",
"
\n",
" \n",
" | 1 | \n",
" 2017-01-02 | \n",
" 1 | \n",
" karl | \n",
" 3 | \n",
" None | \n",
"
\n",
" \n",
" | 2 | \n",
" 2017-01-02 | \n",
" 1 | \n",
" adam | \n",
" 5 | \n",
" Super | \n",
"
\n",
" \n",
" | 3 | \n",
" 2017-01-03 | \n",
" 2 | \n",
" greg | \n",
" 3 | \n",
" None | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
"DataFrame[date: string, product_id: bigint, user_id: string, rating: int, text: string]"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"from pyspark.sql.types import *\n",
"\n",
"# Define the explicit schema with the column name, types and optionallity\n",
"reviewSchema = StructType([\n",
" StructField('date', StringType(), True),\n",
" StructField('product_id', LongType(), False),\n",
" StructField('user_id', StringType(), False),\n",
" StructField('rating', IntegerType(), True),\n",
" StructField('text', StringType(), True)\n",
"])\n",
"\n",
"# Create a `DataFrame` from the list Rows with specified schema\n",
"reviewsDF = spark.createDataFrame([review1, review2, review3, review4], schema=reviewSchema)\n",
"reviewsDF.printSchema()\n",
"\n",
"# We can use the `display` for a nicely formatted preview of the DataFrame content\n",
"display(reviewsDF)"
]
},
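{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# A hedged sketch: the `date` column above is still a plain string.\n",
"# If a real date type is needed, one option (assuming the 'yyyy-MM-dd' format used here)\n",
"# is to derive it with `to_date`; `reviewsWithDateDF` is just an illustrative name.\n",
"\n",
"from pyspark.sql.functions import to_date, col\n",
"\n",
"reviewsWithDateDF = reviewsDF.withColumn('date', to_date(col('date')))\n",
"reviewsWithDateDF.printSchema()"
]
},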
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now let's have a look at some basic operations and way of working with `DataFrames`:"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"4"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# count the rows in the DataFrame\n",
"reviewsDF.count()"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
"
\n",
" \n",
" \n",
" | \n",
" date | \n",
" product_id | \n",
" user_id | \n",
" rating | \n",
" text | \n",
"
\n",
" \n",
" \n",
" \n",
" | 0 | \n",
" 2017-01-02 | \n",
" 1 | \n",
" karl | \n",
" 3 | \n",
" None | \n",
"
\n",
" \n",
" | 1 | \n",
" 2017-01-01 | \n",
" 1 | \n",
" jonh | \n",
" 4 | \n",
" Very nice | \n",
"
\n",
" \n",
" | 2 | \n",
" 2017-01-02 | \n",
" 1 | \n",
" adam | \n",
" 5 | \n",
" Super | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
"DataFrame[date: string, product_id: bigint, user_id: string, rating: int, text: string]"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"# We can use transoformation operations to produce new `DataFrames`\n",
"# for example use `filter` to include only rows for which column `product_id` has value 1 \n",
"# and `sort` to oder the results by the `rating` column.\n",
"# As was the case with RDDs the transformations are `lazy`.\n",
"\n",
"filteredDF = reviewsDF.filter(reviewsDF.product_id == 1).sort(reviewsDF.rating)\n",
"display(filteredDF)"
]
},
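{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# A small sketch to illustrate the laziness mentioned above: `explain()` prints the\n",
"# query plan that Spark has built for `filteredDF` without actually executing it;\n",
"# the data is only processed when an action such as `show()` or `display()` runs.\n",
"\n",
"filteredDF.explain()"
]
},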
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
"
\n",
" \n",
" \n",
" | \n",
" date | \n",
" product_id | \n",
" user_id | \n",
" rating | \n",
" text | \n",
"
\n",
" \n",
" \n",
" \n",
" | 0 | \n",
" 2017-01-01 | \n",
" 1 | \n",
" jonh | \n",
" 4 | \n",
" Very nice | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
"DataFrame[date: string, product_id: bigint, user_id: string, rating: int, text: string]"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"from pyspark.sql.functions import col\n",
"\n",
"# We can also `filter' using complex expressions,\n",
"# for example: Reviews with `product_id` == 1 and `user_id` == \"jonh\"\n",
"# (Use `&` instead of `and` and `|` instead of or)\n",
"\n",
"# The are two ways to reference columns in expressions: either using the `col` function \n",
"# with the colum name (as below) or by referring to its DataFrame (above)\n",
"\n",
"filteredDF = reviewsDF.filter((col('product_id') == 1) & (col('user_id') == 'jonh'))\n",
"display(filteredDF)"
]
},
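{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# A hedged aside, not part of the original flow: `filter` also accepts a SQL\n",
"# expression string, which expresses the same condition as above.\n",
"# `filteredWithSqlDF` is just an illustrative name.\n",
"\n",
"filteredWithSqlDF = reviewsDF.filter(\"product_id = 1 AND user_id = 'jonh'\")\n",
"filteredWithSqlDF.show()"
]
},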
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
"
\n",
" \n",
" \n",
" | \n",
" product_id | \n",
" user_id | \n",
" logRating | \n",
"
\n",
" \n",
" \n",
" \n",
" | 0 | \n",
" 1 | \n",
" adam | \n",
" 1.609438 | \n",
"
\n",
" \n",
" | 1 | \n",
" 1 | \n",
" jonh | \n",
" 1.386294 | \n",
"
\n",
" \n",
" | 2 | \n",
" 1 | \n",
" karl | \n",
" 1.098612 | \n",
"
\n",
" \n",
" | 3 | \n",
" 2 | \n",
" greg | \n",
" 1.098612 | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
"DataFrame[product_id: bigint, user_id: string, logRating: double]"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"from pyspark.sql.functions import desc, log\n",
"\n",
"# We can use `select` choose the columns or to create derrived columns \n",
"# and `alias` to rename them.\n",
"\n",
"# Here we create a derrived column `logRating` from the log(`rating`)\n",
"\n",
"selectedDF = reviewsDF \\\n",
" .select(col('product_id'), col('user_id'), log(col('rating')).alias('logRating')) \\\n",
" .sort(desc('logRating'))\n",
"display(selectedDF)"
]
},
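{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# A hedged alternative sketch: `withColumn` adds (or replaces) a single derived column\n",
"# while keeping all the existing ones, which is often more convenient than listing\n",
"# every column in `select`. `withLogRatingDF` is just an illustrative name.\n",
"\n",
"from pyspark.sql.functions import col, log\n",
"\n",
"withLogRatingDF = reviewsDF.withColumn('logRating', log(col('rating')))\n",
"withLogRatingDF.show()"
]
},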
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
"
\n",
" \n",
" \n",
" | \n",
" product_id | \n",
" avg(rating) | \n",
"
\n",
" \n",
" \n",
" \n",
" | 0 | \n",
" 1 | \n",
" 4.0 | \n",
"
\n",
" \n",
" | 1 | \n",
" 2 | \n",
" 3.0 | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
"DataFrame[product_id: bigint, avg(rating): double]"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"# We can group the data by a colums (or columns) and compute aggregate statistics for each group\n",
"# for example the average `rating` for each `product_id`:\n",
"\n",
"grouppedDF = reviewsDF.groupBy(col('product_id')).avg('rating')\n",
"display(grouppedDF)"
]
},
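{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# A hedged sketch of computing several aggregates at once with `agg`;\n",
"# the aliases `avg_rating` and `n_reviews` are just illustrative names.\n",
"\n",
"from pyspark.sql.functions import avg, count\n",
"\n",
"statsDF = reviewsDF.groupBy('product_id').agg(\n",
"    avg('rating').alias('avg_rating'),\n",
"    count('*').alias('n_reviews'))\n",
"statsDF.show()"
]
},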
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
"
\n",
" \n",
" \n",
" | \n",
" date | \n",
" product_id | \n",
" user_id | \n",
" rating | \n",
" text | \n",
" id | \n",
" name | \n",
" price | \n",
"
\n",
" \n",
" \n",
" \n",
" | 0 | \n",
" 2017-01-01 | \n",
" 1 | \n",
" jonh | \n",
" 4 | \n",
" Very nice | \n",
" 1 | \n",
" iPhone 6 | \n",
" 1000.0 | \n",
"
\n",
" \n",
" | 1 | \n",
" 2017-01-02 | \n",
" 1 | \n",
" karl | \n",
" 3 | \n",
" None | \n",
" 1 | \n",
" iPhone 6 | \n",
" 1000.0 | \n",
"
\n",
" \n",
" | 2 | \n",
" 2017-01-02 | \n",
" 1 | \n",
" adam | \n",
" 5 | \n",
" Super | \n",
" 1 | \n",
" iPhone 6 | \n",
" 1000.0 | \n",
"
\n",
" \n",
" | 3 | \n",
" 2017-01-03 | \n",
" 2 | \n",
" greg | \n",
" 3 | \n",
" None | \n",
" 2 | \n",
" iPhone 7 | \n",
" 1200.0 | \n",
"
\n",
" \n",
" | 4 | \n",
" 2017-01-03 | \n",
" 2 | \n",
" greg | \n",
" 3 | \n",
" None | \n",
" 2 | \n",
" Samsung Galaxy | \n",
" 900.0 | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
"DataFrame[date: string, product_id: bigint, user_id: string, rating: int, text: string, id: bigint, name: string, price: double]"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"# We can joing two `DataFrames` together on a common column \n",
"# (here the `product_id`)\n",
"\n",
"reviewsWithItemsDF = reviewsDF.join(itemsDF, itemsDF.id == reviewsDF.product_id)\n",
"display(reviewsWithItemsDF)"
]
},
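{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# A hedged aside: `join` takes an optional join type as its third argument,\n",
"# e.g. 'left_outer' keeps every review even when no matching item exists.\n",
"# With this data the result happens to be the same as the inner join above.\n",
"\n",
"leftJoinedDF = reviewsDF.join(itemsDF, reviewsDF.product_id == itemsDF.id, 'left_outer')\n",
"leftJoinedDF.show()"
]
},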
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
" date product_id user_id rating text id name price\n",
"0 2017-01-01 1 jonh 4 Very nice 1 iPhone 6 1000.0\n",
"1 2017-01-02 1 karl 3 None 1 iPhone 6 1000.0\n",
"2 2017-01-02 1 adam 5 Super 1 iPhone 6 1000.0\n"
]
}
],
"source": [
"# we can convert (small) SparkSQL `DataFrames` to `pandas` data frames\n",
"\n",
"reviewsWithItemsPD = reviewsWithItemsDF.limit(3).toPandas()\n",
"print(reviewsWithItemsPD)"
]
},
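{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# A hedged sketch of the opposite direction: `createDataFrame` also accepts a\n",
"# `pandas` data frame, turning it back into a Spark `DataFrame`.\n",
"# `backToSparkDF` is just an illustrative name.\n",
"\n",
"backToSparkDF = spark.createDataFrame(reviewsWithItemsPD)\n",
"backToSparkDF.printSchema()"
]
},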
{
"cell_type": "code",
"execution_count": 11,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Finally we can save a `DataFrame` in one of the supported formats, \n",
"# for example `csv`\n",
"\n",
"reviewsDF.write.csv('output/reviews.csv', mode='overwrite')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's preview the results:"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"total 32\n",
"-rw-r--r-- 1 szu004 staff 0 10 Jul 16:25 _SUCCESS\n",
"-rw-r--r-- 1 szu004 staff 30 10 Jul 16:25 part-00000-17ad680f-9ea3-4e5b-9147-fcea196092af.csv\n",
"-rw-r--r-- 1 szu004 staff 21 10 Jul 16:25 part-00001-17ad680f-9ea3-4e5b-9147-fcea196092af.csv\n",
"-rw-r--r-- 1 szu004 staff 26 10 Jul 16:25 part-00002-17ad680f-9ea3-4e5b-9147-fcea196092af.csv\n",
"-rw-r--r-- 1 szu004 staff 21 10 Jul 16:25 part-00003-17ad680f-9ea3-4e5b-9147-fcea196092af.csv\n",
"\n",
"Content:\n",
"2017-01-01,1,jonh,4,Very nice\n",
"2017-01-02,1,karl,3,\n",
"2017-01-02,1,adam,5,Super\n",
"2017-01-03,2,greg,3,\n"
]
}
],
"source": [
"%%sh\n",
"\n",
"ls -l output/reviews.csv\n",
"\n",
"echo\n",
"echo \"Content:\"\n",
"\n",
"cat output/reviews.csv/*.csv"
]
},
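{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# A hedged sketch of reading the data back in; we reuse the `reviewSchema` defined\n",
"# above because the CSV files were written without a header row.\n",
"\n",
"reviewsBackDF = spark.read.csv('output/reviews.csv', schema=reviewSchema)\n",
"reviewsBackDF.show()"
]
},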
{
"cell_type": "markdown",
"metadata": {},
"source": [
"To find out more about SparkSQL and `DataFrames` please check the:\n",
"\n",
"* [Spark SQL Programming Guide](https://spark.apache.org/docs/latest/sql-programming-guide.html) \n",
"* [SparkSQL API Documentation](http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame)\n",
"\n",
"You can now play around modifying pieces of the code.\n",
"\n",
"When you are done and you are running off the local machine remember to *close the notebook* with `File/Close and Halt`."
]
}
],
"metadata": {
"kernelspec": {
"display_name": "PySpark",
"language": "python",
"name": "pyspark"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.11"
}
},
"nbformat": 4,
"nbformat_minor": 1
}