{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# PySpark Demo Plotly Notebook\n",
"## Demo\n",
"1. Setup Spark\n",
"2. Load Kaggle Data\n",
"3. Transform Data with Spark SQL\n",
"4. Graph Data with Plotly\n",
"\n",
"_Prepared by: [Gary A. Stafford](https://twitter.com/GaryStafford) \n",
"Associated article: https://wp.me/p1RD28-61V_"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Setup Spark\n",
"Setup Spark SparkSession"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"! pip install pip --upgrade --quiet"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"from pyspark.sql import SparkSession\n",
"from pyspark.sql.types import StructType, StructField, StringType, IntegerType"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"spark = SparkSession \\\n",
" .builder \\\n",
" .appName('pyspark_demo_app') \\\n",
" .master(\"local[*]\") \\\n",
" .getOrCreate()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Load Kaggle Data\n",
"Load the Kaggle dataset from the CSV file, containing ~21K records, into a DataFrame"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----------+--------+-----------+-------------+\n",
"| date| time|transaction| item|\n",
"+----------+--------+-----------+-------------+\n",
"|2016-10-30|09:58:11| 1| Bread|\n",
"|2016-10-30|10:05:34| 2| Scandinavian|\n",
"|2016-10-30|10:05:34| 2| Scandinavian|\n",
"|2016-10-30|10:07:57| 3|Hot chocolate|\n",
"|2016-10-30|10:07:57| 3| Jam|\n",
"|2016-10-30|10:07:57| 3| Cookies|\n",
"|2016-10-30|10:08:41| 4| Muffin|\n",
"|2016-10-30|10:13:03| 5| Coffee|\n",
"|2016-10-30|10:13:03| 5| Pastry|\n",
"|2016-10-30|10:13:03| 5| Bread|\n",
"+----------+--------+-----------+-------------+\n",
"only showing top 10 rows\n",
"\n"
]
},
{
"data": {
"text/plain": [
"21293"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"bakery_schema = StructType([\n",
" StructField('date', StringType(), True),\n",
" StructField('time', StringType(), True),\n",
" StructField('transaction', IntegerType(), True),\n",
" StructField('item', StringType(), True)\n",
"])\n",
"\n",
"df_bakery1 = spark.read \\\n",
" .format(\"csv\") \\\n",
" .option(\"header\", \"true\") \\\n",
" .load(\"BreadBasket_DMS.csv\", schema=bakery_schema)\n",
"\n",
"df_bakery1.show(10)\n",
"df_bakery1.count()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Transform Data with Spark SQL\n",
"Transform the DataFrame's bakery data using Spark SQL"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----------+-----------+-------------+\n",
"| date|transaction| item|\n",
"+----------+-----------+-------------+\n",
"|2016-10-30| 1| Bread|\n",
"|2016-10-30| 2| Scandinavian|\n",
"|2016-10-30| 2| Scandinavian|\n",
"|2016-10-30| 3|Hot chocolate|\n",
"|2016-10-30| 3| Jam|\n",
"+----------+-----------+-------------+\n",
"only showing top 5 rows\n",
"\n"
]
},
{
"data": {
"text/plain": [
"20507"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df_bakery1.createOrReplaceTempView(\"bakery_table_tmp1\")\n",
"\n",
"df_bakery2 = spark.sql(\"SELECT date, transaction, item \" +\n",
" \"FROM bakery_table_tmp1 \" +\n",
" \"WHERE item NOT LIKE 'NONE'\" +\n",
" \"ORDER BY transaction\")\n",
"df_bakery2.show(5)\n",
"df_bakery2.count()"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----------+-----+\n",
"| date|count|\n",
"+----------+-----+\n",
"|2017-01-01| 1|\n",
"|2017-01-03| 87|\n",
"|2017-01-04| 76|\n",
"|2017-01-05| 95|\n",
"|2017-01-06| 84|\n",
"+----------+-----+\n",
"only showing top 5 rows\n",
"\n"
]
},
{
"data": {
"text/plain": [
"98"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df_bakery2.createOrReplaceTempView(\"bakery_table_tmp2\")\n",
"\n",
"df_bakery3 = spark.sql(\"SELECT date, count(*) as count \" +\n",
" \"FROM bakery_table_tmp2 \" +\n",
" \"WHERE date >= '2017-01-01' \" +\n",
" \"GROUP BY date \" +\n",
" \"ORDER BY date\")\n",
"df_bakery3.show(5)\n",
"df_bakery3.count()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Use Plotly to Visualize Data\n",
"Use [Plotly](https://plot.ly/python/) to create a chart showing bakery items sold over time \n",
"Demostrates linear fit and data smoothing:\n",
"* [Plotly Python Open Source Graphing Library](https://plot.ly/python/)\n",
"* [Smoothing in Python](https://plot.ly/python/smoothing/)\n",
"* [Linear Fit in Python](https://plot.ly/python/linear-fits/)"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"! pip install plotly chart-studio --upgrade --quiet"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [],
"source": [
"import chart_studio.tools\n",
"import chart_studio.plotly as py\n",
"import plotly.graph_objs as go\n",
"from numpy import arange\n",
"from scipy import stats, signal\n",
"import warnings\n",
"warnings.filterwarnings('ignore')"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [],
"source": [
"# *** UPDATE WITH YOUR CREDENTIALS FOR PLOTLY ***\n",
"chart_studio.tools.set_credentials_file(username='username_goes_here', api_key='api_key_goes_here')"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
" \n",
" "
],
"text/plain": [
""
]
},
"execution_count": 10,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df_bakery4 = df_bakery3.toPandas()\n",
"\n",
"# Generated linear fit\n",
"xi = arange(0, len(df_bakery4.index))\n",
"slope, intercept, r_value, p_value, std_err = stats.linregress(xi, df_bakery4['count'])\n",
"line = slope * xi + intercept\n",
"\n",
"layout = dict(title='2017 Bakery Sales',\n",
" xaxis=dict(\n",
" title='Month',\n",
" showgrid=True,\n",
" zeroline=True,\n",
" showline=True,\n",
" ticks='outside',\n",
" tickangle=45,\n",
" showticklabels=True),\n",
" yaxis=dict(\n",
" title='Items Sold/Day',\n",
" showgrid=True,\n",
" zeroline=True,\n",
" showline=True,\n",
" ticks='outside',\n",
" showticklabels=True))\n",
"\n",
"trace1 = go.Bar(x=df_bakery4['date'], y=df_bakery4['count'], name='Items Sold')\n",
"trace2 = go.Scatter(x=df_bakery4['date'], y=line, mode='lines', name='Linear Fit')\n",
"trace3 = go.Scatter(x=df_bakery4['date'], y=signal.savgol_filter(df_bakery4['count'], 53, 3),\n",
" mode='lines', name='Savitzky-Golay')\n",
"data = [trace1, trace2, trace3]\n",
"fig = dict(data=data, layout=layout)\n",
"py.iplot(fig, filename='jupyter-basic_bar.html')"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "conda_python3",
"language": "python",
"name": "conda_python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.5"
},
"pycharm": {
"stem_cell": {
"cell_type": "raw",
"metadata": {
"collapsed": false
},
"source": []
}
}
},
"nbformat": 4,
"nbformat_minor": 4
}