{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Lab 2 - Spark SQL\n", "This lab will show you how to work with SparkSQL. It's meant to be self-guided, but don't hesitate to ask your presentor for help. " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 1 - Getting started: Create a [SQL Context](https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html#pyspark.sql.SQLContext)\n", "
\n", "
\n", "
\n", "
\n", "

\n", " \n", " Hint 1\n", "

\n", "
\n", "
\n", "
SQLContext is not included by default. You need to import it from pyspark.sql
\n", "
\n", "
\n", "
\n", "
\n", "

\n", " \n", " Hint 2\n", "

\n", "
\n", "
\n", "
SQLContext() takes a single parameter which is the current Spark context.
\n", "
\n", "
\n", "
\n", "
\n", "

\n", " \n", " Hint 3\n", "

\n", "
\n", "
\n", "
Type:
\n", "\n", "from pyspark.sql import SQLContext
\n", "sqlContext = SQLContext(sc)
\n", "
\n", "
\n", "
\n" ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "collapsed": true }, "outputs": [], "source": [ "#Import the SparkSQL library and connect to the current Spark context\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 2 - Download a JSON Recordset to work with\n", "Let's download the data, we can run commands on the console of the server (or docker image) that the notebook environment is using. To do so we simply put a \"!\" in front of the command that we want to run. For example:\n", "\n", "!pwd\n", "\n", "To get the data we will download a file to the environment. Simple run these two commands, the first just ensures that the file is removed if it exists:\n", "\n", "!rm world_bank.json.gz -f
\n", "!wget https://raw.githubusercontent.com/bradenrc/sparksql_pot/master/world_bank.json.gz

\n", "
\n", "
\n", "
\n", "

\n", " \n", " Advanced Optional 1\n", "

\n", "
\n", "
\n", "
Comment out the rm statement i.e. #!rm and re-run this section. What is the name of the downloaded file?
\n", "
\n", "
\n", "
\n", "
\n", "

\n", " \n", " Advanced Optional 2\n", "

\n", "
\n", "
\n", "
Add !ls to see all the files currently in storage. Try running !mkdir testdir
\n", "
\n", "
\n", "
\n", "
\n", "

\n", " \n", " Advanced Optional 3\n", "

\n", "
\n", "
\n", "
Clean up all added files/directories. Use !rmdir to remove a directory.
\n", "
\n", "
\n", "
\n" ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "collapsed": false }, "outputs": [], "source": [ "#Download file here\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 3 - Create a Dataframe \n", "
\n", "Use the SQLContext you created earlier to read the World Bank json data - world_bank.json.gz and return it as a Dataframe

\n", "
\n", "
\n", "
\n", "

\n", " \n", " Hint 1\n", "

\n", "
\n", "
\n", "
Use the read variable in SQLContext to return a Dataframe reader
\n", "
\n", "
\n", "
\n", "
\n", "

\n", " \n", " Hint 2\n", "

\n", "
\n", "
\n", "
Use the json() method in Dataframe to read the file. Note that the method handles a gzipped file format.
\n", "
\n", "
\n", "
\n", "
\n", "

\n", " \n", " Hint 3\n", "

\n", "
\n", "
\n", "
To create the Dataframe type:
\n", "\n", "example1_df = sqlContext.read.json(\"world_bank.json.gz\")
\n", "
\n", "
\n", "
\n", "
\n", "

\n", " \n", " Advanced Optional\n", "

\n", "
\n", "
\n", "
Obtain the same result by using textFile() to read the file as RDD and then convert to a Dataframe
\n", "
\n", "
\n", "
\n" ] }, { "cell_type": "code", "execution_count": 3, "metadata": { "collapsed": false }, "outputs": [], "source": [ "#Create the Dataframe here:\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ " ## Step 3.1 - Show the Dataframe schema\n", "
\n", "
\n", "
\n", "
\n", "

\n", " \n", " Hint 1\n", "

\n", "
\n", "
\n", "

We can look at the schema with this command:

\n", "\n", "Type:
\n", "example1_df.printSchema()
\n", "
\n", "
\n", "
\n", "
\n", "

\n", " \n", " Advanced Optional 1\n", "

\n", "
\n", "
\n", "
Get the dataframe columns. Try using command-completion (use TAB after the .) to obtain the list of possible methods/values
\n", "
\n", "
\n", "
\n", "
\n", "

\n", " \n", " Advanced Optional 2\n", "

\n", "
\n", "
\n", "
Convert the dataframe back to JSON and print the first value
\n", "
\n", "
\n", "
" ] }, { "cell_type": "code", "execution_count": 4, "metadata": { "collapsed": false }, "outputs": [], "source": [ "#Print out the schema\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 3.2 - Using the Dataframe\n", "
\n", "Dataframes are a subset of RDDs and can be similarly transformed. You can map and filter them.\n", "
Take a look at the first two rows of data using the [take()](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=take#pyspark.sql.DataFrame.take) function.
\n", "
\n", "
\n", "
\n", "

\n", " \n", " Hint 1\n", "

\n", "
\n", "
\n", "
example1_df.take(2)
\n", "
\n", "
\n", "
\n", "
\n", "

\n", " \n", " Advanced Optional 1\n", "

\n", "
\n", "
\n", "
take() returns data as an RDD list of Row objects. show() prints the objects to the console. What is the default number of rows displayed?
\n", "
\n", "
\n", "
\n", "
\n", "

\n", " \n", " Advanced Optional 2\n", "

\n", "
\n", "
\n", "
Save the table as a parquet table. Use !ls to confirm it was saved. Use a DataFrameWriter. What did you see when you ran the ls command?
\n", "
\n", "
\n", "
\n", " \n", "\n" ] }, { "cell_type": "code", "execution_count": 5, "metadata": { "collapsed": false }, "outputs": [], "source": [ "#Use take on the DataFrame to pull out 2 rows\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 4 - Register a Temp Table\n", "
\n", "SQL works on tables. Currently we have data in a dataframe, but we have no table identifier for it. Thus, we want to create a temporary table reference that refers to this dataframe.\n", "
\n", "
\n", "
\n", "
\n", "

\n", " \n", " Hint 1\n", "

\n", "
\n", "
\n", "
The function is: DataframeObject.registerTempTable(\"name_of_table\")
\n", "Create a table named \"world_bank\"
\n", "
\n", "
\n", "
\n", "
\n", "

\n", " \n", " Hint 2\n", "

\n", "
\n", "
\n", "
example1_df.registerTempTable(\"world_bank\")
\n", "
\n", "
\n", "
\n", "
\n", "

\n", " \n", " Advanced Optional 1\n", "

\n", "
\n", "
\n", "
Use the tables() method in SQLContext to list all tables and their state. Extra Hint: show()
\n", "
\n", "
\n", "
\n", "
\n", "

\n", " \n", " Advanced Optional 2\n", "

\n", "
\n", "
\n", "
Try creating a second temporary table on the same dataframe. What does tables() return?
\n", "
\n", "
\n", "
\n", "
\n", "

\n", " \n", " Advanced Optional 3\n", "

\n", "
\n", "
\n", "
Drop the additional temp table. What does tables() return?
\n", "
\n", "
\n", "
\n", "\n" ] }, { "cell_type": "code", "execution_count": 6, "metadata": { "collapsed": false }, "outputs": [], "source": [ "#Create the table to be referenced via SparkSQL\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 5 - Writing SQL Statements\n", "
\n", "Write SQL statements to return two rows from the world_bank table.\n", "
\n", "
\n", "
\n", "
\n", "

\n", " \n", " Hint 1\n", "

\n", "
\n", "
\n", "
Use the sql() method on your SQLContext
\n", "
\n", "
\n", "
\n", "
\n", "

\n", " \n", " Hint 2\n", "

\n", "
\n", "
\n", "
Use limit (i.e. limit 2) within your SQL statement to limit the number of rows returned. Use show() to display the values.
\n", "
\n", "
\n", "
\n", "
\n", "

\n", " \n", " Hint 3\n", "

\n", "
\n", "
\n", "
Type:
\n", " sqlContext.sql(\"select * from world_bank limit 2\").show()
\n", "
\n", "
\n", "
\n" ] }, { "cell_type": "code", "execution_count": 7, "metadata": { "collapsed": false }, "outputs": [], "source": [ "#Use SQL to query the table and print the output\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Step 5.1 - Writing SQL Statements\n", "
\n", "Try writing the next three sections yourself first. Each hint contains the solution for that section. We provide this here because this is more SQL than Spark and not everyone is familar with SQL. Nor is this an SQL class!\n", "

\n", "
\n", "
\n", "
\n", "

\n", " \n", " Hint 1\n", "

\n", "
\n", "
\n", "
sqlContext.sql(\"select * from world_bank limit 2\").toPandas()
\n", "
\n", "
\n", "
\n", "
\n", "

\n", " \n", " Hint 2\n", "

\n", "
\n", "
\n", "
sqlContext.sql(\"select regionname, count(*) as regioncount from world_bank group by regionname order by regioncount desc\").toPandas()
\n", "
\n", "
\n", "
\n", "
\n", "

\n", " \n", " Hint 3\n", "

\n", "
\n", "
\n", "
sqlContext.sql(\"select sector.Name from world_bank limit 5\").toPandas()
\n", "
\n", "
\n", "
" ] }, { "cell_type": "code", "execution_count": 8, "metadata": { "collapsed": false }, "outputs": [], "source": [ "#Extra credit, take the DataFrame you created with the two records and convert it into a Pandas DataFrame\n", "\n" ] }, { "cell_type": "code", "execution_count": 9, "metadata": { "collapsed": false }, "outputs": [], "source": [ "# Now calculate a simple count based on a group, for example \"regionname\". Return the regionname and a count of the values for that regionname. \n", "\n" ] }, { "cell_type": "code", "execution_count": 10, "metadata": { "collapsed": false }, "outputs": [], "source": [ "# With JSON data you can reference the nested data. \n", "# If you look at the Schema above you can see that sector.Name is a nested column.\n", "# Select that column and limit to a reasonable output (say five rows)\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 6 - Creating Simple Graphs\n", "
\n", "Create some simple graphs using the [matplotlib](http://matplotlib.org/1.5.3/index.html) and [numpy](http://www.numpy.org/) libraries\n", "
\n", "The \"%matplotlib inline\" statement is used to ensure that graphs are drawn within the notebook instead of popping up as separate windows.\n", "\n", "Make SURE you actually run this cell!" ] }, { "cell_type": "code", "execution_count": 44, "metadata": { "collapsed": true }, "outputs": [], "source": [ "# Load the libraries\n", "%matplotlib inline \n", "import matplotlib.pyplot as plt, numpy as np" ] }, { "cell_type": "markdown", "metadata": { "collapsed": false }, "source": [ "### Step 6.1 - Create the SQL data\n", "Write the sql statement and look at the data, remember to add .toPandas() for a formatted display. An easier option is to create a variable and set it to the SQL statement.\n", "#### First create a SQL statement that is a reasonable number of items\n", "For example, you can count the number of projects (rows) by countryname\n", "
or in other words: \n", "
count(*), countryname from table group by countryname

\n", "\n", "
\n", "
\n", "
\n", "

\n", " \n", " Hint 1\n", "

\n", "
\n", "
\n", "
Type:
\n", "query = \"select count(*) as Count, countryname from world_bank group by countryname\"
\n", "
\n", "
\n", "
\n", "
\n", "

\n", " \n", " Hint 2\n", "

\n", "
\n", "
\n", "
Type:
\n", " chart1_df = sqlContext.sql(query).toPandas()
\n", "print chart1_df
\n", "
\n", "
\n", "
\n", "
\n", "

\n", " \n", " Advanced Optional 1\n", "

\n", "
\n", "
\n", "
Printing the result isn't as nicely formatted. What command gives you a nicely formatted output? Use that instead.
\n", "
\n", "
\n", "
\n" ] }, { "cell_type": "code", "execution_count": 11, "metadata": { "collapsed": false }, "outputs": [], "source": [ "# create the query to obtain the number of projects by countryname, save to a variable and print that variable\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Step 6.2 - Create charts based on the SQL data\n", "
\n", "Here we wish to create a chart based on the SQL data we just obtained. Python is an excellent choice when you need to create charts because of the variety and power of the charting libraries available. The one we are using here is for Pandas. Specifically the plot() method. Documentation can be found here.\n", "\n", "
\n", "
\n", "
\n", "

\n", " \n", " Hint 1\n", "

\n", "
\n", "
\n", "
Type:
\n", " chart1_df.plot(kind='bar', x='countryname', y='Count', figsize=(12, 5))
\n", "
\n", "
\n", "
\n", "
\n", "

\n", " \n", " Advanced Optional 1\n", "

\n", "
\n", "
\n", "
The table contains too much data. Change the SQL statement to return a smaller group of values like 30 or 40
\n", "
\n", "
\n", "
\n", "
\n", "

\n", " \n", " Advanced Optional 2\n", "

\n", "
\n", "
\n", "
Looking at the Pandas plot() documentation try other styles of plotting. Look here for ideas.
\n", "
\n", "
\n", "
\n" ] }, { "cell_type": "code", "execution_count": 12, "metadata": { "collapsed": false }, "outputs": [], "source": [ "# Now take the variable (or same sql statement) and use the method:\n", "# .plot(kind='bar', x='countryname', y='Count', figsize=(12, 5)) to plot a graph\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 7 - Creating a DataFrame\n", "
\n", "Not all data comes with a defined (or derivable) schema like JSON. Sometimes we have the data first and then need to create a schema for it.
\n", "Try adding a schema to an RDD to create a DataFrame.
\n", "First, you need to create an RDD. This can be done with a loop or as\n", "seen in the instructor's example, or more simply by assigning values to an array.\n", "

" ] }, { "cell_type": "code", "execution_count": 74, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "[[1, 1, 1], [2, 2, 2], [3, 3, 3], ['4a', '4a', '4a'], [5, 5, 5]]" ] }, "execution_count": 74, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Default array defined below. Feel free to change as desired.\n", "array=[[1,1,1],[2,2,2],[3,3,3],[\"4a\",\"4a\",\"4a\"],[5,5,5]]\n", "my_rdd = sc.parallelize(array)\n", "my_rdd.collect()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Step 7.1 - Second, you need to add a schema to the RDD you created in the previous code block.\n", "Use first the StructField method, following these steps:
\n", "1- Define your schema columns as a string
\n", "2- Build the schema object using StructField
\n", "3- Apply the schema object to the RDD
\n", "\n", "Note: The cell below is missing some code and will not run properly until you add in some missing parts.\n", "

\n", "
\n", "
\n", "
\n", "

\n", " \n", " Hint 1\n", "

\n", "
\n", "
\n", "
The schema string is simply the space-separated list of column names (i.e. \"var1 var2 var3\")
\n", "
\n", "
\n", "
\n", "
\n", "

\n", " \n", " Hint 2\n", "

\n", "
\n", "
\n", "
The type should be IntegerType or StringType. Note that because we are applying this is a loop *everything* will be an Integer or String
\n", "
\n", "
\n", "
\n", "
\n", "

\n", " \n", " Hint 3\n", "

\n", "
\n", "
\n", "
Use the RDD you created above and apply the schema to it i.e.
\n", " schemaExample = sqlContext.createDataFrame(my_rdd, schema)\n", "
\n", "
\n", "
\n", "
\n", "
\n", "

\n", " \n", " Hint 4\n", "

\n", "
\n", "
\n", "
We really don't need to tell you a name to use for your temp table do we?
\n", "
\n", "
\n", "
\n" ] }, { "cell_type": "code", "execution_count": 75, "metadata": { "collapsed": true }, "outputs": [], "source": [ "from pyspark.sql.types import *\n", "\n", "# The schema is encoded in a string. Complete the string below\n", "schemaString = \"var1 var2 var3\"\n", "\n", "# MissingType() should be either StringType() or IntegerType(). Please replace as required.\n", "fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]\n", "schema = StructType(fields)\n", "\n", "# Apply the schema to the RDD.\n", "schemaExample = sqlContext.createDataFrame(my_rdd, schema)\n", "\n", "# Register the DataFrame as a table. Add table name below as parameter to registerTempTable.\n", "schemaExample.registerTempTable(\"myRDDTempTable\")\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Step 7.2 - Thirdly, write some SQL statements to verify that you successfully added a schema to your RDD\n", "
\n", "
\n", "
\n", "
\n", "

\n", " \n", " Hint 1\n", "

\n", "
\n", "
\n", "
sqlContext.sql(\"select * from myRDDTempTable\").toPandas()
\n", "
\n", "
\n", "
\n", "
\n", "

\n", " \n", " Advanced Optional 1\n", "

\n", "
\n", "
\n", "
What is the type is changed to IntegerType (or StringType). Any change in the results?
\n", "
\n", "
\n", "
\n", "
\n", "

\n", " \n", " Advanced Optional 2\n", "

\n", "
\n", "
\n", "
Try to do some specific queries on the data (i.e.)
\n", " sqlContext.sql(\"select * from myRDDTempTable where var3 > 2\").toPandas()
\n", " Does this work regardless of the data type (i.e. IntegerType or StringType)?
\n", " What if you change some of the input values (i.e. change all 4s to \"4a\")
\n", "
\n", "
\n", "
" ] }, { "cell_type": "code", "execution_count": 13, "metadata": { "collapsed": false }, "outputs": [], "source": [ "#Run some SQL statements on your newly created DataFrame and display the output\n", "#sqlContext.sql(\"select * from myRDDTempTable\").toPandas()\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 8\n", "### Reading from an external data source\n", "If you have time, this is a good example to show you how to read from other datasources.

\n", "In a different browser tab, create a dashDB service, add credentials and come back to this notebook.
If you are using Data Science Experience, you need to log into Bluemix and create a dashDB instance. The login and password should be the same as for DSE.
\n", "Each dashDB instance in Bluemix is created with a \"GOSALES\" set of tables which we can reuse for the purpose of this example. (You can create your own tables if you wish...)

Replace the Xs in the cell below with proper credentials and verify access to dashDB tables.

\n", "You can read from any database that you can connect to through jdbc. Here is the [documentation](http://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases)\n", "

\n", "
\n", "
\n", "
\n", "

\n", " \n", " Hint 1\n", "

\n", "
\n", "
\n", "
To connect to a general dashDB instance:
\n", " url=\"\"
\n", "user=\"\"
\n", "password=\"\"
\n", "connection=\"jdbc:db2://\" + url + \":50000/BLUDB:user=\" + user + \";password=\" + password + \";\"
\n", "
\n", "
\n", "
\n", "
\n", "

\n", " \n", " Advanced Optional\n", "

\n", "
\n", "
\n", "
Create your own dashDB instance in Bluemix and connect to it
\n", "
\n", "
\n", "
\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "url=\"\"\n", "user=\"\"\n", "password=\"\"\n", "connection=\"jdbc:db2://\" + url + \":50000/BLUDB:user=\" + user + \";password=\" + password + \";\"\n", "\n", "salesDF = sqlContext.read.format('jdbc').\\\n", " options(url=connection,\\\n", " dbtable='GOSALES.BRANCH').load()\n", "salesDF.show()\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 2 with Spark 1.6", "language": "python", "name": "python2" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 2 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython2", "version": "2.7.11" } }, "nbformat": 4, "nbformat_minor": 0 }