{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "Nessie Spark SQL Demo with NBA Dataset\n", "============================\n", "This demo showcases how to use Nessie Python API along with Spark3 from Iceberg\n", "\n", "Initialize Pyspark\n", "----------------------------------------------\n", "To get started, we will first have to do a few setup steps that give us everything we need\n", "to get started with Nessie. In case you're interested in the detailed setup steps for Spark, you can check out the [docs](https://projectnessie.org/tools/iceberg/spark/).\n", "\n", "The Binder server has downloaded spark and some data for us as well as started a Nessie server in the background. All we have to do is start Spark.\n", "\n", "The below cell starts a local Spark session with parameters needed to configure Nessie. Each config option is followed by a comment explaining its purpose." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [], "source": [ "import os\n", "import findspark\n", "from pyspark.sql import *\n", "from pyspark import SparkConf\n", "import pynessie\n", "\n", "findspark.init()\n", "pynessie_version = pynessie.__version__\n", "\n", "conf = SparkConf()\n", "# we need iceberg libraries and the nessie sql extensions\n", "conf.set(\n", " \"spark.jars.packages\",\n", " f\"org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:1.4.2,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.2_2.12:0.74.0\",\n", ")\n", "# ensure python <-> java interactions are w/ pyarrow\n", "conf.set(\"spark.sql.execution.pyarrow.enabled\", \"true\")\n", "# create catalog dev_catalog as an iceberg catalog\n", "conf.set(\"spark.sql.catalog.dev_catalog\", \"org.apache.iceberg.spark.SparkCatalog\")\n", "# tell the dev_catalog that its a Nessie catalog\n", "conf.set(\"spark.sql.catalog.dev_catalog.catalog-impl\", \"org.apache.iceberg.nessie.NessieCatalog\")\n", "# set the location for Nessie catalog to store data. Spark writes to this directory\n", "conf.set(\"spark.sql.catalog.dev_catalog.warehouse\", \"file://\" + os.getcwd() + \"/spark_warehouse/iceberg\")\n", "# set the location of the nessie server. In this demo its running locally. There are many ways to run it (see https://projectnessie.org/try/)\n", "conf.set(\"spark.sql.catalog.dev_catalog.uri\", \"http://localhost:19120/api/v1\")\n", "# default branch for Nessie catalog to work on\n", "conf.set(\"spark.sql.catalog.dev_catalog.ref\", \"main\")\n", "# use no authorization. Options are NONE AWS BASIC and aws implies running Nessie on a lambda\n", "conf.set(\"spark.sql.catalog.dev_catalog.auth_type\", \"NONE\")\n", "# enable the extensions for both Nessie and Iceberg\n", "conf.set(\n", " \"spark.sql.extensions\",\n", " \"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.projectnessie.spark.extensions.NessieSparkSessionExtensions\",\n", ")\n", "# finally, start up the Spark server\n", "spark = SparkSession.builder.config(conf=conf).getOrCreate()\n", "print(\"Spark Running\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Solving Data Engineering problems with Nessie\n", "============================\n", "\n", "In this Demo we are a data engineer working at a fictional sports analytics blog. In order for the authors to write articles they have to have access to the relevant data. They need to be able to retrieve data quickly and be able to create charts with it.\n", "\n", "We have been asked to collect and expose some information about basketball players. We have located some data sources and are now ready to start ingesting data into our data lakehouse. We will perform the ingestion steps on a Nessie branch to test and validate the data before exposing to the analysts." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Set up Nessie branches\n", "----------------------------\n", "Once all dependencies are configured, we can get started with ingesting our basketball data into `Nessie` with the following steps:\n", "\n", "- Create a new branch named `dev`\n", "- List all branches\n", "\n", "It is worth mentioning that we don't have to explicitly create a `main` branch, since it's the default branch." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Create the 'nba' namespace in Nessie\n", "spark.sql(\"CREATE NAMESPACE dev_catalog.nba\")\n", "\n", "# Create the 'dev' branch from 'main' branch\n", "spark.sql(\"CREATE BRANCH dev IN dev_catalog FROM main\").toPandas()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We have created the branch `dev` and we can see the branch with the Nessie `hash` its currently pointing to.\n", "\n", "Below we list all branches. Note that the auto created `main` branch already exists and both branches point at the same empty `hash` initially" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "spark.sql(\"LIST REFERENCES IN dev_catalog\").toPandas()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Create tables under dev branch\n", "-------------------------------------\n", "Once we created the `dev` branch and verified that it exists, we can create some tables and add some data.\n", "\n", "We create two tables under the `dev` branch:\n", "- `salaries`\n", "- `totals_stats`\n", "\n", "These tables list the salaries per player per year and their stats per year.\n", "\n", "To create the data we:\n", "\n", "1. switch our branch context to dev\n", "2. create the table\n", "3. insert the data from an existing csv file. This csv file is already stored locally on the demo machine. A production use case would likely take feeds from official data sources" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "spark.sql(\"USE REFERENCE dev IN dev_catalog\")\n", "\n", "# Creating `salaries` table\n", "spark.sql(\n", " \"\"\"CREATE TABLE IF NOT EXISTS dev_catalog.nba.salaries\n", " (Season STRING, Team STRING, Salary STRING, Player STRING) USING iceberg\"\"\"\n", ")\n", "\n", "spark.sql(\n", " \"\"\"CREATE OR REPLACE TEMPORARY VIEW salaries_table USING csv\n", " OPTIONS (path \"../datasets/nba/salaries.csv\", header true)\"\"\"\n", ")\n", "spark.sql(\"INSERT INTO dev_catalog.nba.salaries SELECT * FROM salaries_table\")\n", "\n", "# Creating `totals_stats` table\n", "spark.sql(\n", " \"\"\"CREATE TABLE IF NOT EXISTS dev_catalog.nba.totals_stats (\n", " Season STRING, Age STRING, Team STRING, ORB STRING, DRB STRING, TRB STRING, AST STRING, STL STRING,\n", " BLK STRING, TOV STRING, PTS STRING, Player STRING, RSorPO STRING)\n", " USING iceberg\"\"\"\n", ")\n", "spark.sql(\n", " \"\"\"CREATE OR REPLACE TEMPORARY VIEW stats_table USING csv\n", " OPTIONS (path \"../datasets/nba/totals_stats.csv\", header true)\"\"\"\n", ")\n", "spark.sql(\"INSERT INTO dev_catalog.nba.totals_stats SELECT * FROM stats_table\").toPandas()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we count the rows in our tables to ensure they are the same number as the csv files. Note we use the `table@branch` notation which overrides the context set by a `USE REFERENCE` command." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [], "source": [ "table_count = spark.sql(\"select count(*) from dev_catalog.nba.`salaries@dev`\").toPandas().values[0][0]\n", "csv_count = spark.sql(\"select count(*) from salaries_table\").toPandas().values[0][0]\n", "assert table_count == csv_count\n", "print(table_count)\n", "\n", "table_count = spark.sql(\"select count(*) from dev_catalog.nba.`totals_stats@dev`\").toPandas().values[0][0]\n", "csv_count = spark.sql(\"select count(*) from stats_table\").toPandas().values[0][0]\n", "assert table_count == csv_count\n", "print(table_count)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Check generated tables\n", "----------------------------\n", "Since we have been working solely on the `dev` branch, where we created 2 tables and added some data,\n", "let's verify that the `main` branch was not altered by our changes." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [], "source": [ "spark.sql(\"USE REFERENCE main IN dev_catalog\").toPandas()\n", "spark.sql(\"SHOW TABLES IN dev_catalog\").toPandas()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "And on the `dev` branch we expect to see two tables" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "spark.sql(\"USE REFERENCE dev IN dev_catalog\").toPandas()\n", "spark.sql(\"SHOW TABLES IN dev_catalog\").toPandas()" ] }, { "cell_type": "markdown", "metadata": { "pycharm": { "name": "#%% md\n" } }, "source": [ "We can also verify that the `dev` and `main` branches point to different commits" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "spark.sql(\"LIST REFERENCES IN dev_catalog\").toPandas()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Dev promotion into main\n", "-----------------------\n", "Once we are done with our changes on the `dev` branch, we would like to merge those changes into `main`.\n", "We merge `dev` into `main` via the Spark sql `merge` command.\n", "Both branches should be at the same revision after merging/promotion." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "spark.sql(\"MERGE BRANCH dev INTO main IN dev_catalog\").toPandas()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can verify that the `main` branch now contains the expected tables and row counts.\n", "\n", "The tables are now on `main` and ready for consumption by our blog authors and analysts!" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "spark.sql(\"LIST REFERENCES IN dev_catalog\").toPandas()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "spark.sql(\"USE REFERENCE main IN dev_catalog\").toPandas()\n", "spark.sql(\"SHOW TABLES IN dev_catalog\").toPandas()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "table_count = spark.sql(\"select count(*) from dev_catalog.nba.salaries\").toPandas().values[0][0]\n", "csv_count = spark.sql(\"select count(*) from salaries_table\").toPandas().values[0][0]\n", "assert table_count == csv_count\n", "print(table_count)\n", "\n", "table_count = spark.sql(\"select count(*) from dev_catalog.nba.totals_stats\").toPandas().values[0][0]\n", "csv_count = spark.sql(\"select count(*) from stats_table\").toPandas().values[0][0]\n", "assert table_count == csv_count\n", "print(table_count)" ] }, { "cell_type": "markdown", "metadata": { "pycharm": { "name": "#%% md\n" } }, "source": [ "Perform regular ETL on the new tables\n", "-------------------\n", "Our analysts are happy with the data and we want to now regularly ingest data to keep things up to date. Our first ETL job consists of the following:\n", "\n", "1. Update the salaries table to add new data\n", "2. We have decided the `Age` column isn't required in the `totals_stats` table so we will drop the column\n", "3. We create a new table to hold information about the players appearances in all star games\n", "\n", "As always we will do this work on a branch and verify the results. This ETL job can then be set up to run nightly with new stats and salary information." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "spark.sql(\"CREATE BRANCH etl IN dev_catalog FROM main\").toPandas()" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [], "source": [ "# add some salaries for Kevin Durant\n", "spark.sql(\"USE REFERENCE etl IN dev_catalog\")\n", "spark.sql(\n", " \"\"\"INSERT INTO dev_catalog.nba.salaries VALUES\n", " (\"2017-18\", \"Golden State Warriors\", \"$25000000\", \"Kevin Durant\"),\n", " (\"2018-19\", \"Golden State Warriors\", \"$30000000\", \"Kevin Durant\"),\n", " (\"2019-20\", \"Brooklyn Nets\", \"$37199000\", \"Kevin Durant\"),\n", " (\"2020-21\", \"Brooklyn Nets\", \"$39058950\", \"Kevin Durant\")\n", " \"\"\"\n", ").toPandas()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Dropping a column in the `totals_stats` table\n", "spark.sql(\"ALTER TABLE dev_catalog.nba.totals_stats DROP COLUMN Age\").toPandas()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Creating `allstar_games_stats` table and viewing the contents\n", "spark.sql(\n", " \"\"\"CREATE TABLE IF NOT EXISTS dev_catalog.nba.allstar_games_stats (\n", " Season STRING, Age STRING, Team STRING, ORB STRING, TRB STRING, AST STRING, STL STRING, BLK STRING,\n", " TOV STRING, PF STRING, PTS STRING, Player STRING)\n", " USING iceberg\"\"\"\n", ")\n", "spark.sql(\n", " \"\"\"CREATE OR REPLACE TEMPORARY VIEW allstar_table USING csv\n", " OPTIONS (path \"../datasets/nba/allstar_games_stats.csv\", header true)\"\"\"\n", ")\n", "spark.sql(\"INSERT INTO dev_catalog.nba.allstar_games_stats SELECT * FROM allstar_table\").toPandas()\n", "\n", "# notice how we view the data on the etl branch via @etl\n", "spark.sql(\"select count(*) from dev_catalog.nba.`allstar_games_stats@etl`\").toPandas()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can verify that the new table isn't on the `main` branch but is present on the etl branch" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "spark.sql(\"USE REFERENCE main IN dev_catalog\").toPandas()\n", "spark.sql(\"SHOW TABLES IN dev_catalog\").toPandas()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "spark.sql(\"USE REFERENCE etl IN dev_catalog\").toPandas()\n", "spark.sql(\"SHOW TABLES IN dev_catalog\").toPandas()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now that we are happy with the data we can again merge it into `main`" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [], "source": [ "spark.sql(\"MERGE BRANCH etl INTO main IN dev_catalog\").toPandas()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now lets verify that the changes exist on the `main` branch" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [], "source": [ "spark.sql(\"USE REFERENCE main IN dev_catalog\").toPandas()\n", "spark.sql(\"SHOW TABLES IN dev_catalog\").toPandas()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "spark.sql(\"LIST REFERENCES IN dev_catalog\").toPandas()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "table_count = spark.sql(\"select count(*) from dev_catalog.nba.allstar_games_stats\").toPandas().values[0][0]\n", "csv_count = spark.sql(\"select count(*) from allstar_table\").toPandas().values[0][0]\n", "assert table_count == csv_count\n", "print(table_count)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Create `experiment` branch\n", "--------------------------------\n", "As a data analyst we might want to carry out some experiments with some data, without affecting `main` in any way.\n", "As in the previous examples, we can just get started by creating an `experiment` branch off of `main`\n", "and carry out our experiment, which could consist of the following steps:\n", "- drop `totals_stats` table\n", "- add data to `salaries` table\n", "- compare `experiment` and `main` tables" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "spark.sql(\"CREATE BRANCH experiment IN dev_catalog FROM main\").toPandas()\n", "spark.sql(\"USE REFERENCE experiment IN dev_catalog\").toPandas()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Drop the `totals_stats` table on the `experiment` branch\n", "spark.sql(\"DROP TABLE dev_catalog.nba.totals_stats\").toPandas()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# add some salaries for Dirk Nowitzki\n", "spark.sql(\n", " \"\"\"INSERT INTO dev_catalog.nba.salaries VALUES\n", " (\"2015-16\", \"Dallas Mavericks\", \"$8333333\", \"Dirk Nowitzki\"),\n", " (\"2016-17\", \"Dallas Mavericks\", \"$25000000\", \"Dirk Nowitzki\"),\n", " (\"2017-28\", \"Dallas Mavericks\", \"$5000000\", \"Dirk Nowitzki\"),\n", " (\"2018-19\", \"Dallas Mavericks\", \"$5000000\", \"Dirk Nowitzki\")\n", " \"\"\"\n", ").toPandas()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "spark.sql(\"SHOW TABLES IN dev_catalog\").toPandas()" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [], "source": [ "spark.sql(\"USE REFERENCE main IN dev_catalog\").toPandas()\n", "spark.sql(\"SHOW TABLES IN dev_catalog\").toPandas()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's take a look at the contents of the `salaries` table on the `experiment` branch.\n", "Notice the use of the `nessie` catalog and the use of `@experiment` to view data on the `experiment` branch" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "spark.sql(\"select count(*) from dev_catalog.nba.`salaries@experiment`\").toPandas()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "and compare to the contents of the `salaries` table on the `main` branch. Notice that we didn't have to specify `@branchName` as it defaulted\n", "to the `main` branch" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [], "source": [ "spark.sql(\"select count(*) from dev_catalog.nba.salaries\").toPandas()" ] }, { "cell_type": "markdown", "metadata": { "pycharm": { "name": "#%% md\n" } }, "source": [ "And finally lets clean up after ourselves" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [], "source": [ "spark.sql(\"DROP BRANCH dev IN dev_catalog\")\n", "spark.sql(\"DROP BRANCH etl IN dev_catalog\")\n", "spark.sql(\"DROP BRANCH experiment IN dev_catalog\")" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.12" } }, "nbformat": 4, "nbformat_minor": 4 }