{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# PySpark Demo Notebook\n", "## Demo\n", "1. Run PostgreSQL Script\n", "2. Load PostgreSQL Data\n", "3. Create New Record\n", "4. Write New Record to PostgreSQL Table\n", "5. Load CSV Data File\n", "6. Write Data to PostgreSQL\n", "7. Analyze Data with Spark SQL\n", "8. Graph Data with BokehJS\n", "9. Read and Write Data to Parquet Format\n", "\n", "_Prepared by: [Gary A. Stafford](https://twitter.com/GaryStafford) \n", "Associated article: https://wp.me/p1RD28-61V_" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Run PostgreSQL Script\n", "Run the PostgreSQL sql script" ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "pycharm": { "is_executing": false } }, "outputs": [], "source": [ "! pip install psycopg2-binary --upgrade --quiet" ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "pycharm": { "is_executing": false } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "DROP TABLE IF EXISTS \"bakery_basket\"\n", "\n", "DROP SEQUENCE IF EXISTS bakery_basket_id_seq\n", "\n", "CREATE SEQUENCE bakery_basket_id_seq INCREMENT 1 MINVALUE 1 MAXVALUE 2147483647 START 1 CACHE 1\n", "\n", "\n", "CREATE TABLE \"public\".\"bakery_basket\"\n", "(\n", " \"id\" integer DEFAULT nextval('bakery_basket_id_seq') NOT NULL,\n", " \"date\" character varying(10) NOT NULL,\n", " \"time\" character varying(8) NOT NULL,\n", " \"transaction\" integer NOT NULL,\n", " \"item\" character varying(50) NOT NULL\n", ") WITH (oids = false)\n", "\n", "\n", "INSERT INTO \"bakery_basket\" (\"date\", \"time\", \"transaction\", \"item\", \"id\")\n", "VALUES ('2016-10-30', '09:58:11', 1, 'Bread', 1),\n", " ('2016-10-30', '10:05:34', 2, 'Scandinavian', 2),\n", " ('2016-10-30', '10:07:57', 3, 'Hot chocolate', 3)\n", "\n", "\n" ] } ], "source": [ "%run -i '03_load_sql.py'" ] }, { "cell_type": "code", "execution_count": 3, "metadata": { "pycharm": { "is_executing": false } }, 
"outputs": [], "source": [ "from pyspark.sql import SparkSession\n", "from pyspark.sql.functions import to_timestamp\n", "from pyspark.sql.types import StructType, StructField, StringType, IntegerType" ] }, { "cell_type": "code", "execution_count": 4, "metadata": { "pycharm": { "is_executing": false } }, "outputs": [], "source": [ "spark = SparkSession \\\n", " .builder \\\n", " .appName('pyspark_demo_app') \\\n", " .config('spark.driver.extraClassPath',\n", " 'postgresql-42.2.10.jar') \\\n", " .master(\"local[*]\") \\\n", " .getOrCreate()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Load PostgreSQL Data\n", "Load the PostgreSQL 'bakery_basket' table's contents into a DataFrame" ] }, { "cell_type": "code", "execution_count": 5, "metadata": { "pycharm": { "is_executing": false } }, "outputs": [], "source": [ "properties = {\n", " 'driver': 'org.postgresql.Driver',\n", " 'url': 'jdbc:postgresql://postgres:5432/demo',\n", " 'user': 'postgres',\n", " 'password': 'postgres1234',\n", " 'dbtable': 'bakery_basket',\n", "}\n", "\n", "df1 = spark.read \\\n", " .format('jdbc') \\\n", " .option('driver', properties['driver']) \\\n", " .option('url', properties['url']) \\\n", " .option('user', properties['user']) \\\n", " .option('password', properties['password']) \\\n", " .option('dbtable', properties['dbtable']) \\\n", " .load()" ] }, { "cell_type": "code", "execution_count": 6, "metadata": { "pycharm": { "is_executing": true }, "scrolled": true }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+----------+--------+-----------+-------------+\n", "| id| date| time|transaction| item|\n", "+---+----------+--------+-----------+-------------+\n", "| 1|2016-10-30|09:58:11| 1| Bread|\n", "| 2|2016-10-30|10:05:34| 2| Scandinavian|\n", "| 3|2016-10-30|10:07:57| 3|Hot chocolate|\n", "+---+----------+--------+-----------+-------------+\n", "\n", "CPU times: user 0 ns, sys: 0 ns, total: 0 ns\n", "Wall time: 3.98 s\n" ] } ], "source": [ 
"%%time\n", "df1.show(10)\n", "df1.count()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create New Record\n", "Create a new bakery record and load into a DataFrame" ] }, { "cell_type": "code", "execution_count": 7, "metadata": { "pycharm": { "is_executing": false } }, "outputs": [], "source": [ "data = [('2016-10-30', '10:13:27', 2, 'Pastry')]\n", "\n", "bakery_schema = StructType([\n", " StructField('date', StringType(), True),\n", " StructField('time', StringType(), True),\n", " StructField('transaction', IntegerType(), True),\n", " StructField('item', StringType(), True)\n", "])\n", "\n", "df2 = spark.createDataFrame(data, bakery_schema)" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----------+--------+-----------+------+\n", "| date| time|transaction| item|\n", "+----------+--------+-----------+------+\n", "|2016-10-30|10:13:27| 2|Pastry|\n", "+----------+--------+-----------+------+\n", "\n" ] }, { "data": { "text/plain": [ "1" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df2.show()\n", "df2.count()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Write New Record to PostgreSQL Table\n", "Append the contents of the DataFrame to the PostgreSQL 'bakery_basket' table" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [], "source": [ "df2.write \\\n", " .format('jdbc') \\\n", " .option('driver', properties['driver']) \\\n", " .option('url', properties['url']) \\\n", " .option('user', properties['user']) \\\n", " .option('password', properties['password']) \\\n", " .option('dbtable', properties['dbtable']) \\\n", " .mode('append') \\\n", " .save()" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+----------+--------+-----------+-------------+\n", "| id| date| 
time|transaction| item|\n", "+---+----------+--------+-----------+-------------+\n", "| 1|2016-10-30|09:58:11| 1| Bread|\n", "| 2|2016-10-30|10:05:34| 2| Scandinavian|\n", "| 3|2016-10-30|10:07:57| 3|Hot chocolate|\n", "| 1|2016-10-30|10:13:27| 2| Pastry|\n", "+---+----------+--------+-----------+-------------+\n", "\n" ] }, { "data": { "text/plain": [ "4" ] }, "execution_count": 10, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df1.show(10)\n", "df1.count()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Load CSV File Data\n", "Load the Kaggle dataset from the CSV file, containing ~21K records, into a DataFrame" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "-rw-r--r-- 1 garystafford users 694K Nov 10 2018 BreadBasket_DMS.csv\r\n" ] } ], "source": [ "! ls -lh *.csv" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [], "source": [ "df3 = spark.read \\\n", " .format(\"csv\") \\\n", " .option(\"header\", \"true\") \\\n", " .load(\"BreadBasket_DMS.csv\", schema=bakery_schema)" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----------+--------+-----------+-------------+\n", "| date| time|transaction| item|\n", "+----------+--------+-----------+-------------+\n", "|2016-10-30|09:58:11| 1| Bread|\n", "|2016-10-30|10:05:34| 2| Scandinavian|\n", "|2016-10-30|10:05:34| 2| Scandinavian|\n", "|2016-10-30|10:07:57| 3|Hot chocolate|\n", "|2016-10-30|10:07:57| 3| Jam|\n", "|2016-10-30|10:07:57| 3| Cookies|\n", "|2016-10-30|10:08:41| 4| Muffin|\n", "|2016-10-30|10:13:03| 5| Coffee|\n", "|2016-10-30|10:13:03| 5| Pastry|\n", "|2016-10-30|10:13:03| 5| Bread|\n", "+----------+--------+-----------+-------------+\n", "only showing top 10 rows\n", "\n" ] }, { "data": { "text/plain": [ "21293" ] }, "execution_count": 13, "metadata": {}, 
"output_type": "execute_result" } ], "source": [ "df3.show(10)\n", "df3.count()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Write Data to PostgreSQL\n", "Append the contents of the DataFrame to the PostgreSQL 'bakery_basket' table" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [], "source": [ "df3.write \\\n", " .format('jdbc') \\\n", " .option('driver', properties['driver']) \\\n", " .option('url', properties['url']) \\\n", " .option('user', properties['user']) \\\n", " .option('password', properties['password']) \\\n", " .option('dbtable', properties['dbtable']) \\\n", " .mode('append') \\\n", " .save()" ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+----------+--------+-----------+-------------+\n", "| id| date| time|transaction| item|\n", "+---+----------+--------+-----------+-------------+\n", "| 1|2016-10-30|09:58:11| 1| Bread|\n", "| 2|2016-10-30|10:05:34| 2| Scandinavian|\n", "| 3|2016-10-30|10:07:57| 3|Hot chocolate|\n", "| 1|2016-10-30|10:13:27| 2| Pastry|\n", "| 2|2016-10-30|09:58:11| 1| Bread|\n", "| 3|2016-10-30|10:05:34| 2| Scandinavian|\n", "| 4|2016-10-30|10:05:34| 2| Scandinavian|\n", "| 5|2016-10-30|10:07:57| 3|Hot chocolate|\n", "| 6|2016-10-30|10:07:57| 3| Jam|\n", "| 7|2016-10-30|10:07:57| 3| Cookies|\n", "+---+----------+--------+-----------+-------------+\n", "only showing top 10 rows\n", "\n" ] }, { "data": { "text/plain": [ "21297" ] }, "execution_count": 15, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df1.show(10)\n", "df1.count()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Analyze Data with Spark SQL\n", "Analyze the DataFrame's bakery data using Spark SQL" ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+----------+--------+-----------+-------------+\n", "| 
id| date| time|transaction| item|\n", "+---+----------+--------+-----------+-------------+\n", "| 1|2016-10-30|09:58:11| 1| Bread|\n", "| 2|2016-10-30|09:58:11| 1| Bread|\n", "| 2|2016-10-30|10:05:34| 2| Scandinavian|\n", "| 3|2016-10-30|10:05:34| 2| Scandinavian|\n", "| 4|2016-10-30|10:05:34| 2| Scandinavian|\n", "| 1|2016-10-30|10:13:27| 2| Pastry|\n", "| 3|2016-10-30|10:07:57| 3|Hot chocolate|\n", "| 6|2016-10-30|10:07:57| 3| Jam|\n", "| 5|2016-10-30|10:07:57| 3|Hot chocolate|\n", "| 7|2016-10-30|10:07:57| 3| Cookies|\n", "+---+----------+--------+-----------+-------------+\n", "only showing top 10 rows\n", "\n" ] }, { "data": { "text/plain": [ "21297" ] }, "execution_count": 16, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df1.createOrReplaceTempView(\"bakery_table\")\n", "df4 = spark.sql(\"SELECT * FROM bakery_table \" +\n", " \"ORDER BY transaction, date, time\")\n", "df4.show(10)\n", "df4.count()" ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----------+\n", "|item_count|\n", "+----------+\n", "| 95|\n", "+----------+\n", "\n", "+-------------+-----+\n", "| item|count|\n", "+-------------+-----+\n", "| Coffee| 5471|\n", "| Bread| 3326|\n", "| Tea| 1435|\n", "| Cake| 1025|\n", "| Pastry| 857|\n", "| Sandwich| 771|\n", df5 = spark.sql("SELECT COUNT(DISTINCT item) AS item_count FROM bakery_table")
# Display df5 as produced by the preceding COUNT(DISTINCT item) query.
df5.show()

# Ten most frequently occurring items in bakery_table, skipping rows whose
# item is exactly the string 'NONE' (the pattern has no wildcards).
# Adjacent string literals are joined at compile time into one query string.
# The result is bound to df5 because a later cell charts df5.toPandas().
df5 = spark.sql(
    "SELECT item, count(*) as count "
    "FROM bakery_table "
    "WHERE item NOT LIKE 'NONE' "
    "GROUP BY item ORDER BY count DESC "
    "LIMIT 10"
)
df5.show()
var docs_json = {"2afb794f-f7a6-4ec0-960b-b5475357da25":{"roots":{"references":[{"attributes":{"below":[{"id":"1013","type":"CategoricalAxis"}],"left":[{"id":"1017","type":"LinearAxis"}],"min_border":0,"plot_height":375,"plot_width":750,"renderers":[{"id":"1013","type":"CategoricalAxis"},{"id":"1016","type":"Grid"},{"id":"1017","type":"LinearAxis"},{"id":"1021","type":"Grid"},{"id":"1031","type":"BoxAnnotation"},{"id":"1042","type":"GlyphRenderer"}],"title":{"id":"1044","type":"Title"},"toolbar":{"id":"1029","type":"Toolbar"},"x_range":{"id":"1005","type":"FactorRange"},"x_scale":{"id":"1009","type":"CategoricalScale"},"y_range":{"id":"1007","type":"DataRange1d"},"y_scale":{"id":"1011","type":"LinearScale"}},"id":"1004","subtype":"Figure","type":"Plot"}],"root_ids":["1004"]},"title":"Bokeh Application","version":"1.0.4"}};
    var render_items = [{"docid":"2afb794f-f7a6-4ec0-960b-b5475357da25","roots":{"1004":"58be55e2-e250-4746-ad48-a75461200f88"}}]; Items Sold\",\"formatter\":{\"id\":\"1050\",\"type\":\"BasicTickFormatter\"},\"plot\":{\"id\":\"1004\",\"subtype\":\"Figure\",\"type\":\"Plot\"},\"ticker\":{\"id\":\"1018\",\"type\":\"BasicTicker\"}},\"id\":\"1017\",\"type\":\"LinearAxis\"},{\"attributes\":{\"callback\":null},\"id\":\"1007\",\"type\":\"DataRange1d\"},{\"attributes\":{},\"id\":\"1018\",\"type\":\"BasicTicker\"},{\"attributes\":{\"dimension\":1,\"plot\":{\"id\":\"1004\",\"subtype\":\"Figure\",\"type\":\"Plot\"},\"ticker\":{\"id\":\"1018\",\"type\":\"BasicTicker\"}},\"id\":\"1021\",\"type\":\"Grid\"},{\"attributes\":{\"active_drag\":\"auto\",\"active_inspect\":\"auto\",\"active_multi\":null,\"active_scroll\":\"auto\",\"active_tap\":\"auto\",\"tools\":[{\"id\":\"1022\",\"type\":\"PanTool\"},{\"id\":\"1023\",\"type\":\"WheelZoomTool\"},{\"id\":\"1024\",\"type\":\"BoxZoomTool\"},{\"id\":\"1025\",\"type\":\"SaveTool\"},{\"id\":\"1026\",\"type\":\"ResetTool\"},{\"id\":\"1027\",\"type\":\"HelpTool\"},{\"id\":\"1028\",\"type\":\"HoverTool\"}]},\"id\":\"1029\",\"type\":\"Toolbar\"},{\"attributes\":{},\"id\":\"1050\",\"type\":\"BasicTickFormatter\"},{\"attributes\":{},\"id\":\"1052\",\"type\":\"Selection\"},{\"attributes\":{\"data_source\":{\"id\":\"1002\",\"type\":\"ColumnDataSource\"},\"glyph\":{\"id\":\"1040\",\"type\":\"VBar\"},\"hover_glyph\":null,\"muted_glyph\":null,\"nonselection_glyph\":{\"id\":\"1041\",\"type\":\"VBar\"},\"selection_glyph\":null,\"view\":{\"id\":\"1043\",\"type\":\"CDSView\"}},\"id\":\"1042\",\"type\":\"GlyphRenderer\"},{\"attributes\":{},\"id\":\"1022\",\"type\":\"PanTool\"},{\"attributes\":{\"source\":{\"id\":\"1002\",\"type\":\"ColumnDataSource\"}},\"id\":\"1043\",\"type\":\"CDSView\"},{\"attributes\":{},\"id\":\"1023\",\"type\":\"WheelZoomTool\"},{\"attributes\":{\"plot\":null,\"text\":\"Top 10 Bakery 
Items\"},\"id\":\"1044\",\"type\":\"Title\"},{\"attributes\":{\"overlay\":{\"id\":\"1031\",\"type\":\"BoxAnnotation\"}},\"id\":\"1024\",\"type\":\"BoxZoomTool\"},{\"attributes\":{\"callback\":null,\"data\":{\"count\":[5471,3326,1435,1025,857,771,616,591,540,379],\"index\":[0,1,2,3,4,5,6,7,8,9],\"item\":[\"Coffee\",\"Bread\",\"Tea\",\"Cake\",\"Pastry\",\"Sandwich\",\"Medialuna\",\"Hot chocolate\",\"Cookies\",\"Brownie\"]},\"selected\":{\"id\":\"1052\",\"type\":\"Selection\"},\"selection_policy\":{\"id\":\"1053\",\"type\":\"UnionRenderers\"}},\"id\":\"1002\",\"type\":\"ColumnDataSource\"},{\"attributes\":{},\"id\":\"1025\",\"type\":\"SaveTool\"},{\"attributes\":{\"fill_alpha\":{\"value\":0.1},\"fill_color\":{\"value\":\"#1f77b4\"},\"line_alpha\":{\"value\":0.1},\"line_color\":{\"value\":\"#1f77b4\"},\"top\":{\"field\":\"count\"},\"width\":{\"value\":0.9},\"x\":{\"field\":\"item\"}},\"id\":\"1041\",\"type\":\"VBar\"},{\"attributes\":{},\"id\":\"1026\",\"type\":\"ResetTool\"},{\"attributes\":{},\"id\":\"1048\",\"type\":\"CategoricalTickFormatter\"},{\"attributes\":{},\"id\":\"1027\",\"type\":\"HelpTool\"},{\"attributes\":{\"callback\":null,\"factors\":[\"Coffee\",\"Bread\",\"Tea\",\"Cake\",\"Pastry\",\"Sandwich\",\"Medialuna\",\"Hot 
chocolate\",\"Cookies\",\"Brownie\"]},\"id\":\"1005\",\"type\":\"FactorRange\"},{\"attributes\":{\"callback\":null,\"tooltips\":[[\"item\",\"@item\"],[\"count\",\"@{count}{,}\"]]},\"id\":\"1028\",\"type\":\"HoverTool\"},{\"attributes\":{\"fill_color\":{\"field\":\"item\",\"transform\":{\"id\":\"1003\",\"type\":\"CategoricalColorMapper\"}},\"line_color\":{\"value\":\"#1f77b4\"},\"top\":{\"field\":\"count\"},\"width\":{\"value\":0.9},\"x\":{\"field\":\"item\"}},\"id\":\"1040\",\"type\":\"VBar\"},{\"attributes\":{},\"id\":\"1009\",\"type\":\"CategoricalScale\"},{\"attributes\":{\"bottom_units\":\"screen\",\"fill_alpha\":{\"value\":0.5},\"fill_color\":{\"value\":\"lightgrey\"},\"left_units\":\"screen\",\"level\":\"overlay\",\"line_alpha\":{\"value\":1.0},\"line_color\":{\"value\":\"black\"},\"line_dash\":[4,4],\"line_width\":{\"value\":2},\"plot\":null,\"render_mode\":\"css\",\"right_units\":\"screen\",\"top_units\":\"screen\"},\"id\":\"1031\",\"type\":\"BoxAnnotation\"},{\"attributes\":{},\"id\":\"1011\",\"type\":\"LinearScale\"},{\"attributes\":{\"axis_label\":\"Bakery Items\",\"formatter\":{\"id\":\"1048\",\"type\":\"CategoricalTickFormatter\"},\"plot\":{\"id\":\"1004\",\"subtype\":\"Figure\",\"type\":\"Plot\"},\"ticker\":{\"id\":\"1014\",\"type\":\"CategoricalTicker\"}},\"id\":\"1013\",\"type\":\"CategoricalAxis\"},{\"attributes\":{},\"id\":\"1053\",\"type\":\"UnionRenderers\"},{\"attributes\":{\"factors\":[\"Coffee\",\"Bread\",\"Tea\",\"Cake\",\"Pastry\",\"Sandwich\",\"Medialuna\",\"Hot chocolate\",\"Cookies\",\"Brownie\"],\"palette\":[\"#a6cee3\",\"#1f78b4\",\"#b2df8a\",\"#33a02c\",\"#fb9a99\",\"#e31a1c\",\"#fdbf6f\",\"#ff7f00\",\"#cab2d6\",\"#6a3d9a\",\"#ffff99\",\"#b15928\"]},\"id\":\"1003\",\"type\":\"CategoricalColorMapper\"},{\"attributes\":{},\"id\":\"1014\",\"type\":\"CategoricalTicker\"}],\"root_ids\":[\"1004\"]},\"title\":\"Bokeh Application\",\"version\":\"1.0.4\"}};\n", " var render_items = 
[{\"docid\":\"2afb794f-f7a6-4ec0-960b-b5475357da25\",\"roots\":{\"1004\":\"58be55e2-e250-4746-ad48-a75461200f88\"}}];\n", " root.Bokeh.embed.embed_items_notebook(docs_json, render_items); ColumnDataSource(data=df5.toPandas())\n", "\n", "tooltips = [('item', '@item'), ('count', '@{count}{,}')]\n", "\n", "items = source.data['item'].tolist()\n", "color_map = factor_cmap(field_name='item', palette=Paired12, factors=items)\n", "plot = figure(x_range=items, plot_width=750, plot_height=375, min_border=0, tooltips=tooltips)\n", "plot.vbar(x='item', bottom=0, top='count', source=source, width=0.9, fill_color=color_map)\n", "plot.title.text = 'Top 10 Bakery Items'\n", "plot.xaxis.axis_label = 'Bakery Items'\n", "plot.yaxis.axis_label = 'Total Items Sold'\n", "\n", "show(plot)" ] }, { "cell_type": "code", "execution_count": 19, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------------------+-----------+-------------+\n", "| timestamp|transaction| item|\n", "+-------------------+-----------+-------------+\n", "|2016-10-30 09:58:11| 1| Bread|\n", "|2016-10-30 09:58:11| 1| Bread|\n", "|2016-10-30 10:05:34| 2| Scandinavian|\n", "|2016-10-30 10:05:34| 2| Scandinavian|\n", "|2016-10-30 10:13:27| 2| Pastry|\n", "|2016-10-30 10:05:34| 2| Scandinavian|\n", "|2016-10-30 10:07:57| 3|Hot chocolate|\n", "|2016-10-30 10:07:57| 3| Jam|\n", "|2016-10-30 10:07:57| 3|Hot chocolate|\n", "|2016-10-30 10:07:57| 3| Cookies|\n", "+-------------------+-----------+-------------+\n", "only showing top 10 rows\n", "\n" ] }, { "data": { "text/plain": [ "20511" ] }, "execution_count": 19, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df6 = spark.sql(\"SELECT CONCAT(date,' ',time) as timestamp, transaction, item \" +\n", " \"FROM bakery_table \" +\n", " \"WHERE item NOT LIKE 'NONE'\" +\n", " \"ORDER BY transaction\"\n", " )\n", "df6.show(10)\n", "df6.count()" ] }, { "cell_type": "code", "execution_count": 20, "metadata": {}, "outputs": [ { 
"name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- timestamp: timestamp (nullable = true)\n", " |-- transaction: integer (nullable = true)\n", " |-- item: string (nullable = true)\n", "\n", "+-------------------+-----------+-------------+\n", "| timestamp|transaction| item|\n", "+-------------------+-----------+-------------+\n", "|2016-10-30 09:58:11| 1| Bread|\n", "|2016-10-30 09:58:11| 1| Bread|\n", "|2016-10-30 10:05:34| 2| Scandinavian|\n", "|2016-10-30 10:05:34| 2| Scandinavian|\n", "|2016-10-30 10:13:27| 2| Pastry|\n", "|2016-10-30 10:05:34| 2| Scandinavian|\n", "|2016-10-30 10:07:57| 3|Hot chocolate|\n", "|2016-10-30 10:07:57| 3| Jam|\n", "|2016-10-30 10:07:57| 3|Hot chocolate|\n", "|2016-10-30 10:07:57| 3| Cookies|\n", "+-------------------+-----------+-------------+\n", "only showing top 10 rows\n", "\n" ] }, { "data": { "text/plain": [ "20511" ] }, "execution_count": 20, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df7 = df6.withColumn('timestamp', to_timestamp(df6.timestamp, 'yyyy-MM-dd HH:mm:ss'))\n", "df7.printSchema()\n", "df7.show(10)\n", "df7.count()" ] }, { "cell_type": "code", "execution_count": 21, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------------------+-----------+--------------+\n", "| timestamp|transaction| item|\n", "+-------------------+-----------+--------------+\n", "|2017-04-09 15:04:24| 9684| Smoothies|\n", "|2017-04-09 14:57:06| 9683| Pastry|\n", "|2017-04-09 14:57:06| 9683| Coffee|\n", "|2017-04-09 14:32:58| 9682| Tea|\n", "|2017-04-09 14:32:58| 9682| Tacos/Fajita|\n", "|2017-04-09 14:32:58| 9682| Muffin|\n", "|2017-04-09 14:32:58| 9682| Coffee|\n", "|2017-04-09 14:30:09| 9681|Spanish Brunch|\n", "|2017-04-09 14:30:09| 9681| Truffles|\n", "|2017-04-09 14:30:09| 9681| Tea|\n", "+-------------------+-----------+--------------+\n", "only showing top 10 rows\n", "\n" ] }, { "data": { "text/plain": [ "18888" ] }, "execution_count": 21, "metadata": {}, 
"output_type": "execute_result" } ], "source": [ "df7.createOrReplaceTempView(\"bakery_table\")\n", "df8 = spark.sql(\"SELECT DISTINCT * \" +\n", " \"FROM bakery_table \" +\n", " \"WHERE item NOT LIKE 'NONE'\" +\n", " \"ORDER BY transaction DESC\"\n", " )\n", "df8.show(10)\n", "df8.count()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Read and Write Data to Parquet Format\n", "Read and write DataFrame data to Parquet format files" ] }, { "cell_type": "code", "execution_count": 22, "metadata": {}, "outputs": [], "source": [ "df8.write.parquet('output/bakery_parquet', mode='overwrite')" ] }, { "cell_type": "code", "execution_count": 23, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "total 0\r\n", "drwxr-xr-x 404 garystafford users 13K Jun 9 12:44 bakery_parquet\r\n" ] } ], "source": [ "! ls -lh output/" ] }, { "cell_type": "code", "execution_count": 24, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------------------+-----------+---------+\n", "| timestamp|transaction| item|\n", "+-------------------+-----------+---------+\n", "|2017-02-15 14:54:25| 6620| Cake|\n", "|2017-02-15 14:41:27| 6619| Bread|\n", "|2017-02-15 14:40:41| 6618| Coffee|\n", "|2017-02-15 14:40:41| 6618| Bread|\n", "|2017-02-15 14:23:16| 6617| Baguette|\n", "|2017-02-15 14:23:16| 6617| Coffee|\n", "|2017-02-15 14:23:16| 6617| Salad|\n", "|2017-02-15 14:23:16| 6617| Art Tray|\n", "|2017-02-15 14:23:16| 6617|Alfajores|\n", "|2017-02-15 14:16:26| 6616| Bread|\n", "+-------------------+-----------+---------+\n", "only showing top 10 rows\n", "\n" ] }, { "data": { "text/plain": [ "18888" ] }, "execution_count": 24, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df9 = spark.read.parquet('output/bakery_parquet')\n", "df9.show(10)\n", "df9.count()" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { 
"codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.3" }, "pycharm": { "stem_cell": { "cell_type": "raw", "metadata": { "collapsed": false }, "source": [] } } }, "nbformat": 4, "nbformat_minor": 2 }