Nessie Iceberg/Hive SQL Demo with NBA Dataset
============================
This demo showcases how to use Nessie Python API along with Hive from Iceberg

Initialize PyHive
----------------------------------------------
To get started, we will first have to do a few setup steps that give us everything we need
to get started with Nessie. In case you're interested in the detailed setup steps for Hive, you can check out the [docs](https://projectnessie.org/tools/iceberg/hive/)

The Binder server has downloaded Hive, Hadoop and some data for us as well as started a Nessie server in the background. All we have to do is to connect to Hive session.

The below cell starts a local Hive session with parameters needed to configure Nessie. Each config option is followed by a comment explaining its purpose.

In [None]:
import os
import requests
from pyhive import hive
from pynessie import init

# where we will store our data
warehouse = "file://" + os.path.join(os.getcwd(), "nessie_warehouse")

# where our datasets are located
datasets_path = "file://" + os.path.join(os.path.dirname(os.getcwd()), "datasets")

nessie_client = init()


def create_namespace(ref: str, namespace: list[str]):
    hash = nessie_client.get_reference(ref).hash_
    # pynessie client has currently no code to create namespace, issue a plain REST request.
    response = requests.post(
        url=f"http://127.0.0.1:19120/api/v2/trees/{ref}@{hash}/history/commit",
        headers={"Accept": "application/json", "Content-Type": "application/json"},
        json={
            "commitMeta": {"message": "Create namespace nba"},
            "operations": [{"type": "PUT", "key": {"elements": namespace}, "content": {"type": "NAMESPACE"}}],
        },
    )
    if response.status_code != 200:
        raise Exception(f"Could not create namespace: HTTP {response.status_code} {response.reason}: {response.json()}")


def create_ref_catalog(ref: str):
    """
    Create a branch and switch the current ref to the created branch
    """
    default_branch = nessie_client.get_default_branch()
    if ref != default_branch:
        default_branch_hash = nessie_client.get_reference(default_branch).hash_
        nessie_client.create_branch(ref, ref=default_branch, hash_on_ref=default_branch_hash)
    return switch_ref_catalog(ref)


def switch_ref_catalog(ref: str):
    """
    Switch a branch. When we switch the branch via Hive, we will need to reconnect to Hive
    """
    # The important args below are:
    # catalog-impl: which Iceberg catalog to use, in this case we want NessieCatalog
    # uri: the location of the nessie server.
    # ref: the Nessie ref/branch we want to use (defaults to main)
    # warehouse: the location this catalog should store its data
    return hive.connect(
        "localhost",
        configuration={
            "iceberg.catalog.dev_catalog.catalog-impl": "org.apache.iceberg.nessie.NessieCatalog",
            "iceberg.catalog.dev_catalog.uri": "http://localhost:19120/api/v1",
            "iceberg.catalog.dev_catalog.ref": ref,
            "iceberg.catalog.dev_catalog.warehouse": warehouse,
        },
    ).cursor()


create_namespace("main", ["nba"])


print("\n\nHive running\n\n\n")

Solving Data Engineering problems with Nessie
============================

In this Demo we are a data engineer working at a fictional sports analytics blog. In order for the authors to write articles they have to have access to the relevant data. They need to be able to retrieve data quickly and be able to create charts with it.

We have been asked to collect and expose some information about basketball players. We have located some data sources and are now ready to start ingesting data into our data lakehouse. We will perform the ingestion steps on a Nessie branch to test and validate the data before exposing to the analysts.

Set up Nessie branches (via Nessie CLI)
----------------------------
Once all dependencies are configured, we can get started with ingesting our basketball data into `Nessie` with the following steps:

- Create a new branch named `dev`
- List all branches

It is worth mentioning that we don't have to explicitly create a `main` branch, since it's the default branch.

In [None]:
current_ref = create_ref_catalog("dev")

We have created the branch `dev` and we can see the branch with the Nessie `hash` its currently pointing to.

Below we list all branches. Note that the auto created `main` branch already exists and both branches point at the same empty `hash` initially

In [None]:
!nessie --verbose branch

Create tables under dev branch
-------------------------------------
Once we created the `dev` branch and verified that it exists, we can create some tables and add some data.

We create two tables under the `dev` branch:
- `salaries`
- `totals_stats`

These tables list the salaries per player per year and their stats per year.

To create the data we:

1. switch our branch context to dev
2. create the table
3. insert the data from an existing csv file. This csv file is already stored locally on the demo machine. A production use case would likely take feeds from official data sources

In [None]:
# Creating our demo schema
current_ref.execute("CREATE SCHEMA IF NOT EXISTS nba")

print("\nCreated schema nba\n")


print("\nCreating tables nba.salaries and nba.totals_stats....\n")

# Creating `salaries` table

current_ref.execute(
    f"""CREATE TABLE IF NOT EXISTS nba.salaries (Season STRING,
                Team STRING, Salary STRING, Player STRING)
                STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
                LOCATION '{warehouse}/nba/salaries'
                TBLPROPERTIES ('iceberg.catalog'='dev_catalog', 'write.format.default'='parquet',
                'iceberg.mr.in.memory.data.model'='GENERIC')"""
)

## We create a temporary table to load data into our target table since
## is not possible to load data directly from CSV into non-native table.
current_ref.execute(
    """CREATE TABLE IF NOT EXISTS nba.salaries_temp (Season STRING,
                Team STRING, Salary STRING, Player STRING)
                ROW FORMAT DELIMITED FIELDS TERMINATED BY ','"""
)

current_ref.execute(f'LOAD DATA LOCAL INPATH "{datasets_path}/nba/salaries.csv" OVERWRITE INTO TABLE nba.salaries_temp')
current_ref.execute("INSERT OVERWRITE TABLE nba.salaries SELECT * FROM nba.salaries_temp")

print("\nCreated and inserted data into table nba.salaries from dataset salaries\n")


# Creating `totals_stats` table

current_ref.execute(
    f"""CREATE TABLE IF NOT EXISTS nba.totals_stats (
                Season STRING, Age STRING, Team STRING, ORB STRING,
                DRB STRING, TRB STRING, AST STRING, STL STRING,
                BLK STRING, TOV STRING, PTS STRING, Player STRING, RSorPO STRING)
                STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
                LOCATION '{warehouse}/nba/totals_stats'
                TBLPROPERTIES ('iceberg.catalog'='dev_catalog', 'write.format.default'='parquet',
                'iceberg.mr.in.memory.data.model'='GENERIC')"""
)

## We create a temporary table to load data into our target table since
## is not possible to load data directly from CSV into non-native table.
current_ref.execute(
    """CREATE TABLE IF NOT EXISTS nba.totals_stats_temp (
                Season STRING, Age STRING, Team STRING, ORB STRING,
                DRB STRING, TRB STRING, AST STRING, STL STRING,
                BLK STRING, TOV STRING, PTS STRING, Player STRING, RSorPO STRING)
                ROW FORMAT DELIMITED FIELDS TERMINATED BY ','"""
)

current_ref.execute(
    f'LOAD DATA LOCAL INPATH "{datasets_path}/nba/totals_stats.csv" OVERWRITE INTO TABLE nba.totals_stats_temp'
)
current_ref.execute("INSERT OVERWRITE TABLE nba.totals_stats SELECT * FROM nba.totals_stats_temp")

print("\nCreated and inserted data into table nba.totals_stats from dataset totals_stats\n")

Now we count the rows in our tables to ensure they are the same number as the csv files. Unlike Spark and Flink demos, we can't use the notation of `table@branch` (see the github issue [here](https://github.com/projectnessie/nessie/issues/1985). Therefore, we just set Nessie ref settings through Hive setting `SET iceberg.catalog.{catalog}.ref = {branch}` whenever we want to work on a specific branch.

In [None]:
# We make sure we are still in dev branch
current_ref = switch_ref_catalog("dev")

print("\nCounting rows in nba.salaries\n")

# We count now
current_ref.execute("SELECT COUNT(*) FROM nba.salaries")
table_count = current_ref.fetchone()[0]

current_ref.execute("SELECT COUNT(*) FROM nba.salaries_temp")
csv_count = current_ref.fetchone()[0]
assert table_count == csv_count
print(table_count)

print("\nCounting rows in nba.totals_stats\n")

current_ref.execute("SELECT COUNT(*) FROM nba.totals_stats")
table_count = current_ref.fetchone()[0]

current_ref.execute("SELECT COUNT(*) FROM nba.totals_stats_temp")
csv_count = current_ref.fetchone()[0]
assert table_count == csv_count
print(table_count)

Check generated tables
----------------------------
Since we have been working solely on the `dev` branch, where we created 2 tables and added some data,
let's verify that the `main` branch was not altered by our changes.

In [None]:
!nessie content list

And on the `dev` branch we expect to see two tables

In [None]:
!nessie content list --ref dev

We can also verify that the `dev` and `main` branches point to different commits

In [None]:
!nessie --verbose branch

Dev promotion into main
-----------------------
Once we are done with our changes on the `dev` branch, we would like to merge those changes into `main`.
We merge `dev` into `main` via the command line `merge` command.
Both branches should be at the same revision after merging/promotion.

In [None]:
!nessie merge dev -b main --force

We can verify that the `main` branch now contains the expected tables and row counts.

The tables are now on `main` and ready for consumption by our blog authors and analysts!

In [None]:
!nessie --verbose branch

In [None]:
!nessie content list

In [None]:
# We switch to main branch
current_ref = switch_ref_catalog("main")

print("\nCounting rows in nba.salaries\n")

# We count now
current_ref.execute("SELECT COUNT(*) FROM nba.salaries")
table_count = current_ref.fetchone()[0]

current_ref.execute("SELECT COUNT(*) FROM nba.salaries_temp")
csv_count = current_ref.fetchone()[0]
assert table_count == csv_count
print(table_count)

print("\nCounting rows in nba.totals_stats\n")

current_ref.execute("SELECT COUNT(*) FROM nba.totals_stats")
table_count = current_ref.fetchone()[0]

current_ref.execute("SELECT COUNT(*) FROM nba.totals_stats_temp")
csv_count = current_ref.fetchone()[0]
assert table_count == csv_count
print(table_count)

Perform regular ETL on the new tables
-------------------
Our analysts are happy with the data and we want to now regularly ingest data to keep things up to date. Our first ETL job consists of the following:

1. Update the salaries table to add new data
2. We have decided the `Age` column isn't required in the `totals_stats` table so we will drop the column
3. We create a new table to hold information about the players appearances in all star games

As always we will do this work on a branch and verify the results. This ETL job can then be set up to run nightly with new stats and salary information.

In [None]:
current_ref = create_ref_catalog("etl")

In [None]:
# add some salaries for Kevin Durant
current_ref.execute(
    """INSERT INTO nba.salaries
                        VALUES ('2017-18', 'Golden State Warriors', '$25000000', 'Kevin Durant'),
                        ('2018-19', 'Golden State Warriors', '$30000000', 'Kevin Durant'),
                        ('2019-20', 'Brooklyn Nets', '$37199000', 'Kevin Durant'),
                        ('2020-21', 'Brooklyn Nets', '$39058950', 'Kevin Durant')"""
)

In [None]:
print("\nCreating table nba.allstar_games_stats\n")

# Creating `allstar_games_stats` table
current_ref.execute(
    f"""CREATE TABLE IF NOT EXISTS nba.allstar_games_stats (
                Season STRING, Age STRING, Team STRING, ORB STRING,
                TRB STRING, AST STRING, STL STRING, BLK STRING,
                TOV STRING, PF STRING, PTS STRING, Player STRING)
                STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
                LOCATION '{warehouse}/nba/allstar_games_stats'
                TBLPROPERTIES ('iceberg.catalog'='dev_catalog', 'write.format.default'='parquet',
                'iceberg.mr.in.memory.data.model'='GENERIC')"""
)

## We create a temporary table to load data into our target table since
## is not possible to load data directly from CSV into non-native table.
current_ref.execute(
    """CREATE TABLE IF NOT EXISTS nba.allstar_table_temp (
                Season STRING, Age STRING, Team STRING, ORB STRING, TRB STRING,
                AST STRING, STL STRING, BLK STRING,
                TOV STRING, PF STRING, PTS STRING, Player STRING)
                ROW FORMAT DELIMITED FIELDS TERMINATED BY ','"""
)

current_ref.execute(
    f'LOAD DATA LOCAL INPATH "{datasets_path}/nba/allstar_games_stats.csv" OVERWRITE INTO TABLE nba.allstar_table_temp'
)
current_ref.execute("INSERT OVERWRITE TABLE nba.allstar_games_stats SELECT * FROM nba.allstar_table_temp")

print("\nCreated and inserted data into table nba.allstar_table_temp from dataset allstar_games_stats\n")


print("\nCounting rows in nba.allstar_games_stats\n")

# Since we can't do 'table@branch'
current_ref = switch_ref_catalog("etl")
current_ref.execute("SELECT COUNT(*) FROM nba.allstar_games_stats")
print(current_ref.fetchone()[0])

We can verify that the new table isn't on the `main` branch but is present on the etl branch

In [None]:
# Since we have been working on the `etl` branch, the `allstar_games_stats` table is not on the `main` branch
!nessie content list

In [None]:
# We should see the new `allstar_games_stats` table on the `etl` branch
!nessie content list --ref etl

Now that we are happy with the data we can again merge it into `main`

In [None]:
!nessie merge etl -b main --force

Now lets verify that the changes exist on the `main` branch

In [None]:
!nessie content list

In [None]:
!nessie --verbose branch

In [None]:
# We switch to the main branch
current_ref = switch_ref_catalog("main")

print("\nCounting rows in nba.allstar_games_stats\n")

# We count now
current_ref.execute("SELECT COUNT(*) FROM nba.allstar_games_stats")
table_count = current_ref.fetchone()[0]

current_ref.execute("SELECT COUNT(*) FROM nba.allstar_table_temp")
csv_count = current_ref.fetchone()[0]
assert table_count == csv_count
print(table_count)

Create `experiment` branch
--------------------------------
As a data analyst we might want to carry out some experiments with some data, without affecting `main` in any way.
As in the previous examples, we can just get started by creating an `experiment` branch off of `main`
and carry out our experiment, which could consist of the following steps:
- drop `totals_stats` table
- add data to `salaries` table
- compare `experiment` and `main` tables

In [None]:
current_ref = create_ref_catalog("experiment")

In [None]:
# Drop the `totals_stats` table on the `experiment` branch
current_ref.execute("DROP TABLE nba.totals_stats")

In [None]:
# add some salaries for Dirk Nowitzki
current_ref.execute(
    """INSERT INTO nba.salaries VALUES
    ('2015-16', 'Dallas Mavericks', '$8333333', 'Dirk Nowitzki'),
    ('2016-17', 'Dallas Mavericks', '$25000000', 'Dirk Nowitzki'),
    ('2017-18', 'Dallas Mavericks', '$5000000', 'Dirk Nowitzki'),
    ('2018-19', 'Dallas Mavericks', '$5000000', 'Dirk Nowitzki')"""
)

In [None]:
# We should see the `salaries` and `allstar_games_stats` tables only (since we just dropped `totals_stats`)
!nessie content list --ref experiment

In [None]:
# `main` hasn't been changed and still has the `totals_stats` table
!nessie content list

Let's take a look at the contents of the `salaries` table on the `experiment` branch.

In [None]:
current_ref = switch_ref_catalog("experiment")

print("\nCounting rows in nba.salaries\n")

current_ref.execute("SELECT COUNT(*) FROM nba.salaries")
print(current_ref.fetchone()[0])

and compare to the contents of the `salaries` table on the `main` branch.

In [None]:
current_ref = switch_ref_catalog("main")

# the following INSERT is a workaround for https://github.com/apache/iceberg/pull/4509 until iceberg 0.13.2 is released
# add a single salary for Dirk Nowitzki (so we expect 3 less total rows)
current_ref.execute(
    """INSERT INTO nba.salaries VALUES
    ('2018-19', 'Dallas Mavericks', '$5000000', 'Dirk Nowitzki')"""
)

print("\nCounting rows in nba.salaries\n")

current_ref.execute("SELECT COUNT(*) FROM nba.salaries")
print(current_ref.fetchone()[0])

And finally lets clean up after ourselves

In [None]:
!nessie branch --delete dev
!nessie branch --delete etl
!nessie branch --delete experiment