In [None]:
%session_id_prefix iceberg-sql-
%glue_version 3.0
%idle_timeout 60
%connections 
%%configure 
{
 "--conf": "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
}

In [None]:
# --- Iceberg catalog / database / table coordinates -------------------------
catalog_name = "glue_catalog"
database_name = "iceberg_sql"
table_name = "product"

# --- S3 warehouse location ---------------------------------------------------
# Both values are blank here; with both empty, warehouse_path evaluates to
# "s3:///", so set them before running the cells that touch S3.
bucket_name = ""
bucket_prefix = ""
warehouse_path = "s3://{}/{}".format(bucket_name, bucket_prefix)

# --- Locking -----------------------------------------------------------------
# Name handed to the catalog's `lock.table` setting when the session is built.
dynamodb_table = "myGlueLockTable"

## Initialize SparkSession

In [None]:
from pyspark.sql import SparkSession
# All Iceberg catalog options share this key prefix.
catalog_conf_prefix = f"spark.sql.catalog.{catalog_name}"

# Settings are applied to the builder in the order listed (dicts keep insertion order).
# NOTE(review): the lock manager class name is tied to the Iceberg release in
# use — confirm it matches the Iceberg version attached to this session.
spark_conf = {
    "spark.sql.warehouse.dir": warehouse_path,
    catalog_conf_prefix: "org.apache.iceberg.spark.SparkCatalog",
    f"{catalog_conf_prefix}.warehouse": warehouse_path,
    f"{catalog_conf_prefix}.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
    f"{catalog_conf_prefix}.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
    f"{catalog_conf_prefix}.lock-impl": "org.apache.iceberg.aws.glue.DynamoLockManager",
    f"{catalog_conf_prefix}.lock.table": dynamodb_table,
    "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
}

session_builder = SparkSession.builder
for conf_key, conf_value in spark_conf.items():
    session_builder = session_builder.config(conf_key, conf_value)

spark = session_builder.getOrCreate()

## Clean up existing resources

In [None]:
# Drop the target table if a previous run left it behind; the later CREATE TABLE
# statement has no IF NOT EXISTS, so it relies on the table being absent.
query = f"""
DROP TABLE IF EXISTS {catalog_name}.{database_name}.{table_name}
"""
spark.sql(query)

## Create Iceberg table with sample data

In [None]:
from pyspark.sql import Row
import time

# Single timestamp (epoch seconds, float) shared by every seed row.
ut = time.time()

# Column order of each seed tuple below; `updated_at` is appended to every row.
product_fields = ('product_id', 'product_name', 'price', 'category')
product_seed = [
    ('00001', 'Heater', 250, 'Electronics'),
    ('00002', 'Thermostat', 400, 'Electronics'),
    ('00003', 'Television', 600, 'Electronics'),
    ('00004', 'Blender', 100, 'Electronics'),
    ('00005', 'USB charger', 50, 'Electronics'),
]

# Same list-of-dicts shape as before: the four product fields followed by updated_at.
product = [dict(zip(product_fields, values), updated_at=ut) for values in product_seed]

df_products = spark.createDataFrame([Row(**record) for record in product])

In [None]:
# Register the sample DataFrame as a temp view so SQL statements can select from it.
products_view_name = f"tmp_{table_name}"
df_products.createOrReplaceTempView(products_view_name)

In [None]:
# Create the database if it is missing.
# NOTE(review): the name is not qualified with catalog_name, so this runs against
# the session's current catalog, while the table statements address
# {catalog_name}.{database_name} — confirm both resolve to the same catalog.
query = f"""
CREATE DATABASE IF NOT EXISTS {database_name}
"""
spark.sql(query)

In [None]:
# Create the Iceberg table from the tmp_{table_name} temp view (CREATE TABLE ... AS SELECT).
# There is no IF NOT EXISTS here, so the table must not already exist; the
# DROP TABLE IF EXISTS cell earlier in the notebook takes care of that on re-runs.
query = f"""
CREATE TABLE {catalog_name}.{database_name}.{table_name}
USING iceberg
AS SELECT * FROM tmp_{table_name}
"""
spark.sql(query)

In [None]:
%%sql
-- Make iceberg_sql the current database for the unqualified statements that follow.
-- The name is hard-coded; it mirrors database_name from the configuration cell.
USE iceberg_sql

In [None]:
%%sql
-- List the tables visible in the current database (selected by the preceding USE cell).
SHOW TABLES

## Read from Iceberg table

In [None]:
%%sql
-- Read back the freshly created table.
-- Catalog, database and table names are hard-coded; they mirror catalog_name,
-- database_name and table_name from the configuration cell.
SELECT * FROM glue_catalog.iceberg_sql.product

## Upsert records into Iceberg table

In [None]:
# Fresh timestamp (epoch seconds, float) so the upserted rows are distinguishable
# from the seed rows.
ut = time.time()

# One row whose product_id already exists in the seed data and one whose does not.
product_updates = [
    dict(product_id='00001', product_name='Heater', price=400, category='Electronics', updated_at=ut),  # Update
    dict(product_id='00006', product_name='Chair', price=50, category='Furniture', updated_at=ut),  # Insert
]

df_product_updates = spark.createDataFrame([Row(**record) for record in product_updates])

In [None]:
# Register the update feed as a temp view so the MERGE statement can read it.
updates_view_name = f"tmp_{table_name}_updates"
df_product_updates.createOrReplaceTempView(updates_view_name)

In [None]:
# Upsert the staged rows into the Iceberg table, keyed on product_id.
#  - Matched rows take every non-key column from the update feed. Refreshing only
#    updated_at would silently discard the new price carried by the staged
#    "Update" row (Heater: 250 -> 400), so the table would keep the stale value.
#  - Unmatched rows are inserted unchanged.
query = f"""
MERGE INTO {catalog_name}.{database_name}.{table_name} AS t
USING (SELECT * FROM tmp_{table_name}_updates) AS u
ON t.product_id = u.product_id
WHEN MATCHED THEN UPDATE SET
    t.product_name = u.product_name,
    t.price = u.price,
    t.category = u.category,
    t.updated_at = u.updated_at
WHEN NOT MATCHED THEN INSERT *
"""
spark.sql(query)

In [None]:
%%sql
-- Inspect the table after the MERGE cell.
-- Names are hard-coded; they mirror catalog_name, database_name and table_name
-- from the configuration cell.
SELECT * FROM glue_catalog.iceberg_sql.product

## Delete records

In [None]:
%%sql
-- Delete the row(s) whose product_name is 'Blender'.
-- Uses the standard `=` operator and a single-quoted string literal: double
-- quotes denote identifiers under ANSI-style quoting, so the single-quoted form
-- is the portable way to write a string value.
-- Names are hard-coded; they mirror catalog_name, database_name and table_name
-- from the configuration cell.
DELETE FROM glue_catalog.iceberg_sql.product WHERE product_name = 'Blender'

In [None]:
%%sql
-- Inspect the table after the DELETE cell.
-- Names are hard-coded; they mirror catalog_name, database_name and table_name
-- from the configuration cell.
SELECT * FROM glue_catalog.iceberg_sql.product

## View History and Snapshots

In [None]:
%%sql
-- Query the table's `history` metadata table (the join cell further down reads
-- made_current_at, snapshot_id and is_current_ancestor from it).
SELECT * FROM glue_catalog.iceberg_sql.product.history

In [None]:
%%sql
-- Query the table's `snapshots` metadata table (the join cell further down reads
-- snapshot_id, operation and summary from it).
SELECT * FROM glue_catalog.iceberg_sql.product.snapshots

In [None]:
%%sql
-- Pair each history entry with its snapshot (joined on snapshot_id) to show
-- which operation produced it, ordered by the time it became current.
SELECT
    h.made_current_at,
    s.operation,
    h.snapshot_id,
    h.is_current_ancestor,
    s.summary["spark.app.id"]
FROM glue_catalog.iceberg_sql.product.history h
JOIN glue_catalog.iceberg_sql.product.snapshots s
    ON h.snapshot_id = s.snapshot_id
ORDER BY made_current_at

## Stop Session

In [None]:
%stop_session