{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# PySpark Demo Notebook 4\n", "\n", "## Contents\n", "\n", "1. [Read CSV-Format File](#Read-CSV-Format-File)\n", "2. [Run PostgreSQL Script](#Run-PostgreSQL-Script)\n", "3. [Load PostgreSQL Data](#Run-PostgreSQL-Script)\n", "4. [Create a New Record](#Create-a-New-Record)\n", "5. [Append Record to Database Table](#Append-Record-to-Database-Table)\n", "6. [Overwrite Data to Database Table](#Overwrite-Data-to-Database-Table)\n", "7. [Analyze and Graph Data with BokehJS](#Analyze-and-Graph-Data-with-BokehJS)\n", "9. [Read and Write Data to Parquet](#Read-and-Write-Data-to-Parquet)\n", "\n", "## Background\n", "\n", "_Prepared by: [Gary A. Stafford](https://twitter.com/GaryStafford) \n", "Associated article: [Getting Started with Data Analytics using Jupyter Notebooks, PySpark, and Docker](https://wp.me/p1RD28-6Fj)_" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Read CSV-Format File\n", "Read CSV-format data file into a Spark DataFrame." ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "ExecuteTime": { "end_time": "2019-12-06T03:45:29.626016Z", "start_time": "2019-12-06T03:45:29.416677Z" }, "pycharm": { "is_executing": false } }, "outputs": [], "source": [ "from pyspark.sql import SparkSession\n", "from pyspark.sql.types import StructType, StructField, StringType, IntegerType" ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "ExecuteTime": { "end_time": "2019-12-06T03:45:34.343019Z", "start_time": "2019-12-06T03:45:29.628615Z" }, "pycharm": { "is_executing": false } }, "outputs": [], "source": [ "spark = SparkSession \\\n", " .builder \\\n", " .appName('04_notebook') \\\n", " .config('spark.driver.extraClassPath', 'postgresql-42.2.10.jar') \\\n", " .getOrCreate()" ] }, { "cell_type": "code", "execution_count": 3, "metadata": { "ExecuteTime": { "end_time": "2019-12-06T03:45:34.354352Z", "start_time": "2019-12-06T03:45:34.346118Z" }, "pycharm": { "is_executing": false } }, "outputs": [], "source": [ "bakery_schema = StructType([\n", " StructField('date', StringType(), True),\n", " StructField('time', StringType(), True),\n", " StructField('transaction', IntegerType(), True),\n", " StructField('item', StringType(), True)\n", "])" ] }, { "cell_type": "code", "execution_count": 4, "metadata": { "ExecuteTime": { "end_time": "2019-12-06T03:45:36.751894Z", "start_time": "2019-12-06T03:45:34.357930Z" }, "pycharm": { "is_executing": false } }, "outputs": [], "source": [ "df1 = spark.read \\\n", " .format('csv') \\\n", " .option('header', 'true') \\\n", " .load('BreadBasket_DMS.csv', schema=bakery_schema)" ] }, { "cell_type": "code", "execution_count": 5, "metadata": { "ExecuteTime": { "end_time": "2019-12-06T03:45:39.950006Z", "start_time": "2019-12-06T03:45:36.753623Z" }, "pycharm": { "is_executing": false } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "DataFrame rows: 21293\n", "DataFrame schema: DataFrame[date: string, time: string, transaction: int, item: string]\n", "+----------+--------+-----------+-------------+\n", "|date |time |transaction|item |\n", "+----------+--------+-----------+-------------+\n", "|2016-10-30|09:58:11|1 |Bread |\n", "|2016-10-30|10:05:34|2 |Scandinavian |\n", "|2016-10-30|10:05:34|2 |Scandinavian |\n", "|2016-10-30|10:07:57|3 |Hot chocolate|\n", "|2016-10-30|10:07:57|3 |Jam |\n", "|2016-10-30|10:07:57|3 |Cookies |\n", "|2016-10-30|10:08:41|4 |Muffin |\n", "|2016-10-30|10:13:03|5 |Coffee |\n", "|2016-10-30|10:13:03|5 |Pastry |\n", "|2016-10-30|10:13:03|5 |Bread |\n", "+----------+--------+-----------+-------------+\n", "only showing top 10 rows\n", "\n" ] } ], "source": [ "print('DataFrame rows: %d' % df1.count())\n", "print('DataFrame schema: %s' % df1)\n", "df1.show(10, False)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Run PostgreSQL Script\n", "Run the sql script to create the database schema and import data from CSV file." ] }, { "cell_type": "code", "execution_count": 6, "metadata": { "ExecuteTime": { "end_time": "2019-12-06T03:45:40.268164Z", "start_time": "2019-12-06T03:45:39.954014Z" }, "pycharm": { "is_executing": false }, "scrolled": true }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "DROP TABLE IF EXISTS \"transactions\"\n", "\n", "DROP SEQUENCE IF EXISTS transactions_id_seq\n", "\n", "CREATE SEQUENCE transactions_id_seq INCREMENT 1 MINVALUE 1 MAXVALUE 2147483647 START 1 CACHE 1\n", "\n", "\n", "CREATE TABLE \"public\".\"transactions\"\n", "(\n", " \"id\" integer DEFAULT nextval('transactions_id_seq') NOT NULL,\n", " \"date\" character varying(10) NOT NULL,\n", " \"time\" character varying(8) NOT NULL,\n", " \"transaction\" integer NOT NULL,\n", " \"item\" character varying(50) NOT NULL\n", ") WITH (oids = false)\n", "\n", "Row count: 21293\n" ] } ], "source": [ "%run -i '03_load_sql.py'" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Load PostgreSQL Data\n", "Load the PostgreSQL 'transactions' table's contents into a Spark DataFrame." ] }, { "cell_type": "code", "execution_count": 7, "metadata": { "ExecuteTime": { "end_time": "2019-12-06T03:45:40.276617Z", "start_time": "2019-12-06T03:45:40.270872Z" }, "pycharm": { "is_executing": false } }, "outputs": [], "source": [ "properties = {\n", " 'driver': 'org.postgresql.Driver',\n", " 'url': 'jdbc:postgresql://postgres:5432/bakery',\n", " 'user': 'postgres',\n", " 'password': 'postgres1234',\n", " 'dbtable': 'transactions',\n", "}" ] }, { "cell_type": "code", "execution_count": 8, "metadata": { "ExecuteTime": { "end_time": "2019-12-06T03:45:40.600010Z", "start_time": "2019-12-06T03:45:40.278813Z" }, "pycharm": { "is_executing": false } }, "outputs": [], "source": [ "df2 = spark.read \\\n", " .format('jdbc') \\\n", " .option('driver', properties['driver']) \\\n", " .option('url', properties['url']) \\\n", " .option('user', properties['user']) \\\n", " .option('password', properties['password']) \\\n", " .option('dbtable', properties['dbtable']) \\\n", " .load()" ] }, { "cell_type": "code", "execution_count": 9, "metadata": { "ExecuteTime": { "end_time": "2019-12-06T03:45:40.979256Z", "start_time": "2019-12-06T03:45:40.602206Z" }, "pycharm": { "is_executing": false }, "scrolled": true }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "DataFrame rows: 21293\n", "DataFrame schema: DataFrame[date: string, time: string, transaction: int, item: string]\n", "+---+----------+--------+-----------+-------------+\n", "|id |date |time |transaction|item |\n", "+---+----------+--------+-----------+-------------+\n", "|1 |2016-10-30|09:58:11|1 |Bread |\n", "|2 |2016-10-30|10:05:34|2 |Scandinavian |\n", "|3 |2016-10-30|10:05:34|2 |Scandinavian |\n", "|4 |2016-10-30|10:07:57|3 |Hot chocolate|\n", "|5 |2016-10-30|10:07:57|3 |Jam |\n", "|6 |2016-10-30|10:07:57|3 |Cookies |\n", "|7 |2016-10-30|10:08:41|4 |Muffin |\n", "|8 |2016-10-30|10:13:03|5 |Coffee |\n", "|9 |2016-10-30|10:13:03|5 |Pastry |\n", "|10 |2016-10-30|10:13:03|5 |Bread |\n", "+---+----------+--------+-----------+-------------+\n", "only showing top 10 rows\n", "\n" ] } ], "source": [ "print('DataFrame rows: %d' % df1.count())\n", "print('DataFrame schema: %s' % df1)\n", "df2.show(10, False)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create a New Record\n", "Create a new bakery record and load into a Spark DataFrame." ] }, { "cell_type": "code", "execution_count": 10, "metadata": { "ExecuteTime": { "end_time": "2019-12-06T03:45:41.359691Z", "start_time": "2019-12-06T03:45:40.980775Z" }, "pycharm": { "is_executing": false } }, "outputs": [], "source": [ "data = [('2016-10-30', '10:13:27', 2, 'Pastry')]\n", "df3 = spark.createDataFrame(data, bakery_schema)" ] }, { "cell_type": "code", "execution_count": 11, "metadata": { "ExecuteTime": { "end_time": "2019-12-06T03:45:42.269428Z", "start_time": "2019-12-06T03:45:41.363867Z" }, "pycharm": { "is_executing": false } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "DataFrame rows: 1\n", "DataFrame schema: DataFrame[date: string, time: string, transaction: int, item: string]\n", "+----------+--------+-----------+------+\n", "|date |time |transaction|item |\n", "+----------+--------+-----------+------+\n", "|2016-10-30|10:13:27|2 |Pastry|\n", "+----------+--------+-----------+------+\n", "\n" ] } ], "source": [ "print('DataFrame rows: %d' % df3.count())\n", "print('DataFrame schema: %s' % df3)\n", "df3.show(10, False)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Append Record to Database Table\n", "Append the contents of the DataFrame to the bakery PostgreSQL database's 'transactions' table." ] }, { "cell_type": "code", "execution_count": 12, "metadata": { "ExecuteTime": { "end_time": "2019-12-06T03:45:42.600551Z", "start_time": "2019-12-06T03:45:42.270651Z" }, "pycharm": { "is_executing": false } }, "outputs": [], "source": [ "df3.write \\\n", " .format('jdbc') \\\n", " .option('driver', properties['driver']) \\\n", " .option('url', properties['url']) \\\n", " .option('user', properties['user']) \\\n", " .option('password', properties['password']) \\\n", " .option('dbtable', properties['dbtable']) \\\n", " .mode('append') \\\n", " .save()" ] }, { "cell_type": "code", "execution_count": 13, "metadata": { "ExecuteTime": { "end_time": "2019-12-06T03:45:42.748246Z", "start_time": "2019-12-06T03:45:42.602245Z" }, "pycharm": { "is_executing": false } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "DataFrame rows: 21294\n" ] } ], "source": [ "# should now contain one additional row of data\n", "print('DataFrame rows: %d' % df2.count())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Overwrite Data to Database Table\n", "Overwrite the contents of the CSV file-based DataFrame to the 'transactions' table." ] }, { "cell_type": "code", "execution_count": 14, "metadata": { "ExecuteTime": { "end_time": "2019-12-06T03:45:44.441462Z", "start_time": "2019-12-06T03:45:42.750237Z" }, "pycharm": { "is_executing": false } }, "outputs": [], "source": [ "df1.write \\\n", " .format('jdbc') \\\n", " .option('driver', properties['driver']) \\\n", " .option('url', properties['url']) \\\n", " .option('user', properties['user']) \\\n", " .option('password', properties['password']) \\\n", " .option('dbtable', properties['dbtable']) \\\n", " .option('truncate', 'true') \\\n", " .mode('overwrite') \\\n", " .save()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Analyze and Graph Data with BokehJS\n", "Perform some simple analysis of the bakery data and plot the results with [BokehJS](https://docs.bokeh.org/en/latest/index.html).\n", "### Business Questions\n", "1. What are the busiest days of the week?\n", "2. What are the busiest times of the day?\n", "3. What are the top selling bakery items?\n", "4. How many items do customers usually buy?" ] }, { "cell_type": "code", "execution_count": 15, "metadata": { "ExecuteTime": { "end_time": "2019-12-06T03:45:44.682664Z", "start_time": "2019-12-06T03:45:44.445670Z" }, "pycharm": { "is_executing": false } }, "outputs": [ { "data": { "text/html": [ "\n", "
" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "application/javascript": [ "\n", "(function(root) {\n", " function now() {\n", " return new Date();\n", " }\n", "\n", " var force = true;\n", "\n", " if (typeof root._bokeh_onload_callbacks === \"undefined\" || force === true) {\n", " root._bokeh_onload_callbacks = [];\n", " root._bokeh_is_loading = undefined;\n", " }\n", "\n", " var JS_MIME_TYPE = 'application/javascript';\n", " var HTML_MIME_TYPE = 'text/html';\n", " var EXEC_MIME_TYPE = 'application/vnd.bokehjs_exec.v0+json';\n", " var CLASS_NAME = 'output_bokeh rendered_html';\n", "\n", " /**\n", " * Render data to the DOM node\n", " */\n", " function render(props, node) {\n", " var script = document.createElement(\"script\");\n", " node.appendChild(script);\n", " }\n", "\n", " /**\n", " * Handle when an output is cleared or removed\n", " */\n", " function handleClearOutput(event, handle) {\n", " var cell = handle.cell;\n", "\n", " var id = cell.output_area._bokeh_element_id;\n", " var server_id = cell.output_area._bokeh_server_id;\n", " // Clean up Bokeh references\n", " if (id != null && id in Bokeh.index) {\n", " Bokeh.index[id].model.document.clear();\n", " delete Bokeh.index[id];\n", " }\n", "\n", " if (server_id !== undefined) {\n", " // Clean up Bokeh references\n", " var cmd = \"from bokeh.io.state import curstate; print(curstate().uuid_to_server['\" + server_id + \"'].get_sessions()[0].document.roots[0]._id)\";\n", " cell.notebook.kernel.execute(cmd, {\n", " iopub: {\n", " output: function(msg) {\n", " var id = msg.content.text.trim();\n", " if (id in Bokeh.index) {\n", " Bokeh.index[id].model.document.clear();\n", " delete Bokeh.index[id];\n", " }\n", " }\n", " }\n", " });\n", " // Destroy server and session\n", " var cmd = \"import bokeh.io.notebook as ion; ion.destroy_server('\" + server_id + \"')\";\n", " cell.notebook.kernel.execute(cmd);\n", " }\n", " }\n", "\n", " /**\n", " * Handle when a new output is added\n", " */\n", " function handleAddOutput(event, handle) {\n", " var output_area = handle.output_area;\n", " var output = handle.output;\n", "\n", " // limit handleAddOutput to display_data with EXEC_MIME_TYPE content only\n", " if ((output.output_type != \"display_data\") || (!output.data.hasOwnProperty(EXEC_MIME_TYPE))) {\n", " return\n", " }\n", "\n", " var toinsert = output_area.element.find(\".\" + CLASS_NAME.split(' ')[0]);\n", "\n", " if (output.metadata[EXEC_MIME_TYPE][\"id\"] !== undefined) {\n", " toinsert[toinsert.length - 1].firstChild.textContent = output.data[JS_MIME_TYPE];\n", " // store reference to embed id on output_area\n", " output_area._bokeh_element_id = output.metadata[EXEC_MIME_TYPE][\"id\"];\n", " }\n", " if (output.metadata[EXEC_MIME_TYPE][\"server_id\"] !== undefined) {\n", " var bk_div = document.createElement(\"div\");\n", " bk_div.innerHTML = output.data[HTML_MIME_TYPE];\n", " var script_attrs = bk_div.children[0].attributes;\n", " for (var i = 0; i < script_attrs.length; i++) {\n", " toinsert[toinsert.length - 1].firstChild.setAttribute(script_attrs[i].name, script_attrs[i].value);\n", " }\n", " // store reference to server id on output_area\n", " output_area._bokeh_server_id = output.metadata[EXEC_MIME_TYPE][\"server_id\"];\n", " }\n", " }\n", "\n", " function register_renderer(events, OutputArea) {\n", "\n", " function append_mime(data, metadata, element) {\n", " // create a DOM node to render to\n", " var toinsert = this.create_output_subarea(\n", " metadata,\n", " CLASS_NAME,\n", " EXEC_MIME_TYPE\n", " );\n", " this.keyboard_manager.register_events(toinsert);\n", " // Render to node\n", " var props = {data: data, metadata: metadata[EXEC_MIME_TYPE]};\n", " render(props, toinsert[toinsert.length - 1]);\n", " element.append(toinsert);\n", " return toinsert\n", " }\n", "\n", " /* Handle when an output is cleared or removed */\n", " events.on('clear_output.CodeCell', handleClearOutput);\n", " events.on('delete.Cell', handleClearOutput);\n", "\n", " /* Handle when a new output is added */\n", " events.on('output_added.OutputArea', handleAddOutput);\n", "\n", " /**\n", " * Register the mime type and append_mime function with output_area\n", " */\n", " OutputArea.prototype.register_mime_type(EXEC_MIME_TYPE, append_mime, {\n", " /* Is output safe? */\n", " safe: true,\n", " /* Index of renderer in `output_area.display_order` */\n", " index: 0\n", " });\n", " }\n", "\n", " // register the mime type if in Jupyter Notebook environment and previously unregistered\n", " if (root.Jupyter !== undefined) {\n", " var events = require('base/js/events');\n", " var OutputArea = require('notebook/js/outputarea').OutputArea;\n", "\n", " if (OutputArea.prototype.mime_types().indexOf(EXEC_MIME_TYPE) == -1) {\n", " register_renderer(events, OutputArea);\n", " }\n", " }\n", "\n", " \n", " if (typeof (root._bokeh_timeout) === \"undefined\" || force === true) {\n", " root._bokeh_timeout = Date.now() + 5000;\n", " root._bokeh_failed_load = false;\n", " }\n", "\n", " var NB_LOAD_WARNING = {'data': {'text/html':\n", " \"\\n\"+\n", " \"BokehJS does not appear to have successfully loaded. If loading BokehJS from CDN, this \\n\"+\n", " \"may be due to a slow or bad network connection. Possible fixes:\\n\"+\n", " \"
\\n\"+\n", " \"\\n\"+\n",
" \"from bokeh.resources import INLINE\\n\"+\n",
" \"output_notebook(resources=INLINE)\\n\"+\n",
" \"
\\n\"+\n",
" \"\\n\"+\n \"BokehJS does not appear to have successfully loaded. If loading BokehJS from CDN, this \\n\"+\n \"may be due to a slow or bad network connection. Possible fixes:\\n\"+\n \"
\\n\"+\n \"\\n\"+\n \"from bokeh.resources import INLINE\\n\"+\n \"output_notebook(resources=INLINE)\\n\"+\n \"
\\n\"+\n \"