{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# 2.1 Structured Data Introduction\n",
    "\n",
    "This notebook demonstrates the basics of processing structured data with Spark SQL.\n",
    "\n",
    "Spark SQL is a higher-level API for working with structured data. The data are represented as `DataFrames`: table-like objects with columns and rows, conceptually similar to `pandas` or `R` data frames.\n",
    "\n",
    "`spark` is the main entry point for Spark SQL operations. It is an instance of `SparkSession`, and pyspark automatically creates one for you.\n",
    "\n",
    "Let's look at some ways to create and display `DataFrames`:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Inferred schema:\n",
      "root\n",
      " |-- id: long (nullable = true)\n",
      " |-- name: string (nullable = true)\n",
      " |-- price: double (nullable = true)\n",
      "\n",
      "+---+--------------+------+\n",
      "| id|          name| price|\n",
      "+---+--------------+------+\n",
      "|  1|      iPhone 6|1000.0|\n",
      "|  2|      iPhone 7|1200.0|\n",
      "|  2|Samsung Galaxy| 900.0|\n",
      "+---+--------------+------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# Import the `Row` class from pyspark.sql\n",
    "from pyspark.sql import Row\n",
    "\n",
    "# The most direct way to create a DataFrame is to\n",
    "# build it from a list of `Row`s\n",
    "\n",
    "# Create a few `Row`s describing product items\n",
    "\n",
    "item1 = Row(id=1, name=\"iPhone 6\", price=1000.00)\n",
    "item2 = Row(id=2, name=\"iPhone 7\", price=1200.00)\n",
    "item3 = Row(id=2, name=\"Samsung Galaxy\", price=900.00)\n",
    "\n",
    "# Create a `DataFrame` from the list of `Row`s\n",
    "itemsDF = spark.createDataFrame([item1, item2, item3])\n",
    "\n",
    "# Each `DataFrame` is associated with a `schema`, which defines the names\n",
    "# and types of the columns in the DataFrame.\n",
    "# By default `createDataFrame` infers the schema from the provided `Row`s,\n",
    "# but later we will see how to specify the schema explicitly.\n",
    "\n",
    "# Let's print out the schema\n",
    "\n",
    "print(\"Inferred schema:\")\n",
    "\n",
    "itemsDF.printSchema()\n",
    "\n",
    "# Display the DataFrame with the `show()` function\n",
    "\n",
    "itemsDF.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We can also define a reusable `Row` template with named fields and create rows from it:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- date: string (nullable = true)\n",
      " |-- product_id: long (nullable = true)\n",
      " |-- user_id: string (nullable = true)\n",
      " |-- rating: long (nullable = true)\n",
      " |-- comment: string (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# Create a `Review` row template with named fields\n",
    "Review = Row('date', 'product_id', 'user_id', 'rating', 'comment')\n",
    "\n",
    "# Create some reviews\n",
    "\n",
    "review1 = Review('2017-01-01', 1, 'jonh', 4, 'Very nice')\n",
    "review2 = Review('2017-01-02', 1, 'karl', 3, None)\n",
    "review3 = Review('2017-01-02', 1, 'adam', 5, 'Super')\n",
    "review4 = Review('2017-01-03', 2, 'greg', 3, None)\n",
    "\n",
    "# Create a `DataFrame` from the list of `Row`s with an inferred schema\n",
    "\n",
    "reviewsDF = spark.createDataFrame([review1, review2, review3, review4])\n",
    "\n",
    "# Print out the inferred schema\n",
    "reviewsDF.printSchema()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "If the inferred schema is not satisfactory, we can define an explicit one:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- date: string (nullable = true)\n",
      " |-- product_id: long (nullable = false)\n",
      " |-- user_id: string (nullable = false)\n",
      " |-- rating: integer (nullable = true)\n",
      " |-- text: string (nullable = true)\n",
      "\n"
     ]
    },
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>date</th>\n",
       "      <th>product_id</th>\n",
       "      <th>user_id</th>\n",
       "      <th>rating</th>\n",
       "      <th>text</th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>0</th>\n",
       "      <td>2017-01-01</td>\n",
       "      <td>1</td>\n",
       "      <td>jonh</td>\n",
       "      <td>4</td>\n",
       "      <td>Very nice</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>1</th>\n",
       "      <td>2017-01-02</td>\n",
       "      <td>1</td>\n",
       "      <td>karl</td>\n",
       "      <td>3</td>\n",
       "      <td>None</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>2</th>\n",
       "      <td>2017-01-02</td>\n",
       "      <td>1</td>\n",
       "      <td>adam</td>\n",
       "      <td>5</td>\n",
       "      <td>Super</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>3</th>\n",
       "      <td>2017-01-03</td>\n",
       "      <td>2</td>\n",
       "      <td>greg</td>\n",
       "      <td>3</td>\n",
       "      <td>None</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "DataFrame[date: string, product_id: bigint, user_id: string, rating: int, text: string]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType\n",
    "\n",
    "# Define an explicit schema with the column names, types and nullability.\n",
    "# Note that the last column is named `text` here (the rows above call it `comment`).\n",
    "reviewSchema = StructType([\n",
    "    StructField('date', StringType(), True),\n",
    "    StructField('product_id', LongType(), False),\n",
    "    StructField('user_id', StringType(), False),\n",
    "    StructField('rating', IntegerType(), True),\n",
    "    StructField('text', StringType(), True)\n",
    "])\n",
    "\n",
    "# Create a `DataFrame` from the list of `Row`s with the specified schema\n",
    "reviewsDF = spark.createDataFrame([review1, review2, review3, review4], schema=reviewSchema)\n",
    "reviewsDF.printSchema()\n",
    "\n",
    "# We can use `display` for a nicely formatted preview of the DataFrame content\n",
    "display(reviewsDF)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now let's have a look at some basic operations and ways of working with `DataFrames`:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "4"
      ]
     },
     "execution_count": 4,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# count the rows in the DataFrame\n",
    "reviewsDF.count()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>date</th>\n",
       "      <th>product_id</th>\n",
       "      <th>user_id</th>\n",
       "      <th>rating</th>\n",
       "      <th>text</th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>0</th>\n",
       "      <td>2017-01-02</td>\n",
       "      <td>1</td>\n",
       "      <td>karl</td>\n",
       "      <td>3</td>\n",
       "      <td>None</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>1</th>\n",
       "      <td>2017-01-01</td>\n",
       "      <td>1</td>\n",
       "      <td>jonh</td>\n",
       "      <td>4</td>\n",
       "      <td>Very nice</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>2</th>\n",
       "      <td>2017-01-02</td>\n",
       "      <td>1</td>\n",
       "      <td>adam</td>\n",
       "      <td>5</td>\n",
       "      <td>Super</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "DataFrame[date: string, product_id: bigint, user_id: string, rating: int, text: string]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "# We can use transformation operations to produce new `DataFrames`,\n",
    "# for example `filter` to include only rows for which column `product_id` has value 1\n",
    "# and `sort` to order the results by the `rating` column.\n",
    "# As was the case with RDDs, the transformations are lazy.\n",
    "\n",
    "filteredDF = reviewsDF.filter(reviewsDF.product_id == 1).sort(reviewsDF.rating)\n",
    "display(filteredDF)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>date</th>\n",
       "      <th>product_id</th>\n",
       "      <th>user_id</th>\n",
       "      <th>rating</th>\n",
       "      <th>text</th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>0</th>\n",
       "      <td>2017-01-01</td>\n",
       "      <td>1</td>\n",
       "      <td>jonh</td>\n",
       "      <td>4</td>\n",
       "      <td>Very nice</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "DataFrame[date: string, product_id: bigint, user_id: string, rating: int, text: string]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "from pyspark.sql.functions import col\n",
    "\n",
    "# We can also `filter` using complex expressions,\n",
    "# for example: reviews with `product_id` == 1 and `user_id` == \"jonh\"\n",
    "# (use `&` instead of `and`, and `|` instead of `or`).\n",
    "# Each comparison needs its own parentheses because `&` and `|`\n",
    "# bind more tightly than `==` in Python.\n",
    "\n",
    "# There are two ways to reference columns in expressions: either using the `col` function\n",
    "# with the column name (as below) or as an attribute of the DataFrame (as in the previous cell)\n",
    "\n",
    "filteredDF = reviewsDF.filter((col('product_id') == 1) & (col('user_id') == 'jonh'))\n",
    "display(filteredDF)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>product_id</th>\n",
       "      <th>user_id</th>\n",
       "      <th>logRating</th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>0</th>\n",
       "      <td>1</td>\n",
       "      <td>adam</td>\n",
       "      <td>1.609438</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>1</th>\n",
       "      <td>1</td>\n",
       "      <td>jonh</td>\n",
       "      <td>1.386294</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>2</th>\n",
       "      <td>1</td>\n",
       "      <td>karl</td>\n",
       "      <td>1.098612</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>3</th>\n",
       "      <td>2</td>\n",
       "      <td>greg</td>\n",
       "      <td>1.098612</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "DataFrame[product_id: bigint, user_id: string, logRating: double]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "from pyspark.sql.functions import desc, log\n",
    "\n",
    "# We can use `select` to choose columns or to create derived columns\n",
    "# and `alias` to rename them.\n",
    "\n",
    "# Here we create a derived column `logRating` as the natural logarithm of `rating`\n",
    "\n",
    "selectedDF = reviewsDF \\\n",
    "    .select(col('product_id'), col('user_id'), log(col('rating')).alias('logRating')) \\\n",
    "    .sort(desc('logRating'))\n",
    "display(selectedDF)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>product_id</th>\n",
       "      <th>avg(rating)</th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>0</th>\n",
       "      <td>1</td>\n",
       "      <td>4.0</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>1</th>\n",
       "      <td>2</td>\n",
       "      <td>3.0</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "DataFrame[product_id: bigint, avg(rating): double]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "# We can group the data by a column (or columns) and compute aggregate statistics for each group,\n",
    "# for example the average `rating` for each `product_id`:\n",
    "\n",
    "groupedDF = reviewsDF.groupBy(col('product_id')).avg('rating')\n",
    "display(groupedDF)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>date</th>\n",
       "      <th>product_id</th>\n",
       "      <th>user_id</th>\n",
       "      <th>rating</th>\n",
       "      <th>text</th>\n",
       "      <th>id</th>\n",
       "      <th>name</th>\n",
       "      <th>price</th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>0</th>\n",
       "      <td>2017-01-01</td>\n",
       "      <td>1</td>\n",
       "      <td>jonh</td>\n",
       "      <td>4</td>\n",
       "      <td>Very nice</td>\n",
       "      <td>1</td>\n",
       "      <td>iPhone 6</td>\n",
       "      <td>1000.0</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>1</th>\n",
       "      <td>2017-01-02</td>\n",
       "      <td>1</td>\n",
       "      <td>karl</td>\n",
       "      <td>3</td>\n",
       "      <td>None</td>\n",
       "      <td>1</td>\n",
       "      <td>iPhone 6</td>\n",
       "      <td>1000.0</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>2</th>\n",
       "      <td>2017-01-02</td>\n",
       "      <td>1</td>\n",
       "      <td>adam</td>\n",
       "      <td>5</td>\n",
       "      <td>Super</td>\n",
       "      <td>1</td>\n",
       "      <td>iPhone 6</td>\n",
       "      <td>1000.0</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>3</th>\n",
       "      <td>2017-01-03</td>\n",
       "      <td>2</td>\n",
       "      <td>greg</td>\n",
       "      <td>3</td>\n",
       "      <td>None</td>\n",
       "      <td>2</td>\n",
       "      <td>iPhone 7</td>\n",
       "      <td>1200.0</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>4</th>\n",
       "      <td>2017-01-03</td>\n",
       "      <td>2</td>\n",
       "      <td>greg</td>\n",
       "      <td>3</td>\n",
       "      <td>None</td>\n",
       "      <td>2</td>\n",
       "      <td>Samsung Galaxy</td>\n",
       "      <td>900.0</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "DataFrame[date: string, product_id: bigint, user_id: string, rating: int, text: string, id: bigint, name: string, price: double]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "# We can join two `DataFrames` together on a common key\n",
    "# (here `product_id` in the reviews matches `id` in the items)\n",
    "\n",
    "reviewsWithItemsDF = reviewsDF.join(itemsDF, itemsDF.id == reviewsDF.product_id)\n",
    "display(reviewsWithItemsDF)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "         date  product_id user_id  rating       text  id      name   price\n",
      "0  2017-01-01           1    jonh       4  Very nice   1  iPhone 6  1000.0\n",
      "1  2017-01-02           1    karl       3       None   1  iPhone 6  1000.0\n",
      "2  2017-01-02           1    adam       5      Super   1  iPhone 6  1000.0\n"
     ]
    }
   ],
   "source": [
    "# We can convert (small) Spark SQL `DataFrames` to `pandas` data frames.\n",
    "# `toPandas()` collects the rows to the driver, so we `limit` the result first.\n",
    "\n",
    "reviewsWithItemsPD = reviewsWithItemsDF.limit(3).toPandas()\n",
    "print(reviewsWithItemsPD)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# Finally, we can save a `DataFrame` in one of the supported formats,\n",
    "# for example `csv`.\n",
    "# Note that Spark writes a directory of part files rather than a single file.\n",
    "\n",
    "reviewsDF.write.csv('output/reviews.csv', mode='overwrite')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let's preview the results:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "total 32\n",
      "-rw-r--r--  1 szu004  staff   0 10 Jul 16:25 _SUCCESS\n",
      "-rw-r--r--  1 szu004  staff  30 10 Jul 16:25 part-00000-17ad680f-9ea3-4e5b-9147-fcea196092af.csv\n",
      "-rw-r--r--  1 szu004  staff  21 10 Jul 16:25 part-00001-17ad680f-9ea3-4e5b-9147-fcea196092af.csv\n",
      "-rw-r--r--  1 szu004  staff  26 10 Jul 16:25 part-00002-17ad680f-9ea3-4e5b-9147-fcea196092af.csv\n",
      "-rw-r--r--  1 szu004  staff  21 10 Jul 16:25 part-00003-17ad680f-9ea3-4e5b-9147-fcea196092af.csv\n",
      "\n",
      "Content:\n",
      "2017-01-01,1,jonh,4,Very nice\n",
      "2017-01-02,1,karl,3,\n",
      "2017-01-02,1,adam,5,Super\n",
      "2017-01-03,2,greg,3,\n"
     ]
    }
   ],
   "source": [
    "%%sh\n",
    "\n",
    "ls -l output/reviews.csv\n",
    "\n",
    "echo\n",
    "echo \"Content:\"\n",
    "\n",
    "cat output/reviews.csv/*.csv"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To find out more about Spark SQL and `DataFrames`, please see:\n",
    "\n",
    "* [Spark SQL Programming Guide](https://spark.apache.org/docs/latest/sql-programming-guide.html)\n",
    "* [SparkSQL API Documentation](http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame)\n",
    "\n",
    "You can now experiment by modifying pieces of the code.\n",
    "\n",
    "When you are done, and if you are running the notebook from your local machine, remember to *close the notebook* with `File/Close and Halt`."
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "PySpark",
   "language": "python",
   "name": "pyspark"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 2
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython2",
   "version": "2.7.11"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 1
}