{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# PySpark Demo Plotly Notebook\n", "## Demo\n", "1. Setup Spark\n", "2. Load Kaggle Data\n", "3. Transform Data with Spark SQL\n", "4. Graph Data with Plotly\n", "\n", "_Prepared by: [Gary A. Stafford](https://twitter.com/GaryStafford) \n", "Associated article: https://wp.me/p1RD28-61V_" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Setup Spark\n", "Create the SparkSession" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "! pip install pip --upgrade --quiet" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "from pyspark.sql import SparkSession\n", "from pyspark.sql.types import StructType, StructField, StringType, IntegerType" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "spark = SparkSession \\\n", " .builder \\\n", " .appName('pyspark_demo_app') \\\n", " .master(\"local[*]\") \\\n", " .getOrCreate()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Load Kaggle Data\n", "Load the Kaggle dataset from the CSV file, containing ~21K records, into a DataFrame" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----------+--------+-----------+-------------+\n", "| date| time|transaction| item|\n", "+----------+--------+-----------+-------------+\n", "|2016-10-30|09:58:11| 1| Bread|\n", "|2016-10-30|10:05:34| 2| Scandinavian|\n", "|2016-10-30|10:05:34| 2| Scandinavian|\n", "|2016-10-30|10:07:57| 3|Hot chocolate|\n", "|2016-10-30|10:07:57| 3| Jam|\n", "|2016-10-30|10:07:57| 3| Cookies|\n", "|2016-10-30|10:08:41| 4| Muffin|\n", "|2016-10-30|10:13:03| 5| Coffee|\n", "|2016-10-30|10:13:03| 5| Pastry|\n", "|2016-10-30|10:13:03| 5| Bread|\n", "+----------+--------+-----------+-------------+\n", "only showing top 10 rows\n", "\n" ] }, { "data": { 
"text/plain": [ "21293" ] }, "execution_count": 4, "metadata": {}, "output_type": "execute_result" } ], "source": [ "bakery_schema = StructType([\n", " StructField('date', StringType(), True),\n", " StructField('time', StringType(), True),\n", " StructField('transaction', IntegerType(), True),\n", " StructField('item', StringType(), True)\n", "])\n", "\n", "df_bakery1 = spark.read \\\n", " .format(\"csv\") \\\n", " .option(\"header\", \"true\") \\\n", " .load(\"BreadBasket_DMS.csv\", schema=bakery_schema)\n", "\n", "df_bakery1.show(10)\n", "df_bakery1.count()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Transform Data with Spark SQL\n", "Transform the DataFrame's bakery data using Spark SQL" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----------+-----------+-------------+\n", "| date|transaction| item|\n", "+----------+-----------+-------------+\n", "|2016-10-30| 1| Bread|\n", "|2016-10-30| 2| Scandinavian|\n", "|2016-10-30| 2| Scandinavian|\n", "|2016-10-30| 3|Hot chocolate|\n", "|2016-10-30| 3| Jam|\n", "+----------+-----------+-------------+\n", "only showing top 5 rows\n", "\n" ] }, { "data": { "text/plain": [ "20507" ] }, "execution_count": 5, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df_bakery1.createOrReplaceTempView(\"bakery_table_tmp1\")\n", "\n", "# Note the trailing space in each concatenated SQL fragment,\n", "# so the statement does not collapse into 'NONE'ORDER BY\n", "df_bakery2 = spark.sql(\"SELECT date, transaction, item \" +\n", " \"FROM bakery_table_tmp1 \" +\n", " \"WHERE item NOT LIKE 'NONE' \" +\n", " \"ORDER BY transaction\")\n", "df_bakery2.show(5)\n", "df_bakery2.count()" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----------+-----+\n", "| date|count|\n", "+----------+-----+\n", "|2017-01-01| 1|\n", "|2017-01-03| 87|\n", "|2017-01-04| 76|\n", "|2017-01-05| 95|\n", "|2017-01-06| 84|\n", "+----------+-----+\n", "only showing top 5 rows\n", "\n" 
] }, { "data": { "text/plain": [ "98" ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df_bakery2.createOrReplaceTempView(\"bakery_table_tmp2\")\n", "\n", "df_bakery3 = spark.sql(\"SELECT date, count(*) as count \" +\n", " \"FROM bakery_table_tmp2 \" +\n", " \"WHERE date >= '2017-01-01' \" +\n", " \"GROUP BY date \" +\n", " \"ORDER BY date\")\n", "df_bakery3.show(5)\n", "df_bakery3.count()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Use Plotly to Visualize Data\n", "Use [Plotly](https://plot.ly/python/) to create a chart showing bakery items sold over time \n", "Demonstrates linear fit and data smoothing:\n", "* [Plotly Python Open Source Graphing Library](https://plot.ly/python/)\n", "* [Smoothing in Python](https://plot.ly/python/smoothing/)\n", "* [Linear Fit in Python](https://plot.ly/python/linear-fits/)" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "! pip install plotly chart-studio --upgrade --quiet" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [], "source": [ "import chart_studio.tools\n", "import chart_studio.plotly as py\n", "import plotly.graph_objs as go\n", "from numpy import arange\n", "from scipy import stats, signal\n", "import warnings\n", "warnings.filterwarnings('ignore')" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [], "source": [ "# *** UPDATE WITH YOUR CREDENTIALS FOR PLOTLY ***\n", "chart_studio.tools.set_credentials_file(username='username_goes_here', api_key='api_key_goes_here')" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", " \n", " " ], "text/plain": [ "" ] }, "execution_count": 10, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df_bakery4 = df_bakery3.toPandas()\n", "\n", "# Generate a linear fit of items sold per day\n", "xi = arange(0, len(df_bakery4.index))\n", "slope, intercept, r_value, p_value, 
std_err = stats.linregress(xi, df_bakery4['count'])\n", "line = slope * xi + intercept\n", "\n", "layout = dict(title='2017 Bakery Sales',\n", " xaxis=dict(\n", " title='Month',\n", " showgrid=True,\n", " zeroline=True,\n", " showline=True,\n", " ticks='outside',\n", " tickangle=45,\n", " showticklabels=True),\n", " yaxis=dict(\n", " title='Items Sold/Day',\n", " showgrid=True,\n", " zeroline=True,\n", " showline=True,\n", " ticks='outside',\n", " showticklabels=True))\n", "\n", "trace1 = go.Bar(x=df_bakery4['date'], y=df_bakery4['count'], name='Items Sold')\n", "trace2 = go.Scatter(x=df_bakery4['date'], y=line, mode='lines', name='Linear Fit')\n", "trace3 = go.Scatter(x=df_bakery4['date'], y=signal.savgol_filter(df_bakery4['count'], 53, 3),\n", " mode='lines', name='Savitzky-Golay')\n", "data = [trace1, trace2, trace3]\n", "fig = dict(data=data, layout=layout)\n", "py.iplot(fig, filename='jupyter-basic_bar.html')" ] } ], "metadata": { "kernelspec": { "display_name": "conda_python3", "language": "python", "name": "conda_python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.5" }, "pycharm": { "stem_cell": { "cell_type": "raw", "metadata": { "collapsed": false }, "source": [] } } }, "nbformat": 4, "nbformat_minor": 4 }