{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "a1c3e5f7-0b2d-4f68-9a1c-3e5f70b2d4f6",
   "metadata": {},
   "source": [
    "# Apache Iceberg on AWS Glue: SQL walkthrough\n",
    "\n",
    "This notebook creates an Iceberg table from a small set of sample product rows, then reads it, upserts records with `MERGE INTO`, deletes a record, and inspects the table's history and snapshots.\n",
    "\n",
    "**Before running:** set `bucket_name` and `bucket_prefix` in the configuration cell. Both are empty by default and are used to build the warehouse path."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "364cd143-d987-4fda-800e-b5dd0af97a79",
   "metadata": {},
   "outputs": [],
   "source": [
    "%session_id_prefix iceberg-sql-\n",
    "%glue_version 3.0\n",
    "%idle_timeout 60\n",
    "%connections \n",
    "%%configure \n",
    "{\n",
    "  \"--conf\": \"spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions\"\n",
    "}"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "fc76d99b-caf5-4663-bda0-1771abd8efdf",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Notebook-wide settings.\n",
    "# bucket_name and bucket_prefix are blank placeholders: fill them in before running,\n",
    "# because warehouse_path is built from them.\n",
    "# The %%sql cells further down hard-code glue_catalog.iceberg_sql.product, so keep\n",
    "# them in sync if catalog_name, database_name or table_name is changed here.\n",
    "catalog_name = \"glue_catalog\"\n",
    "bucket_name = \"\"\n",
    "bucket_prefix = \"\"\n",
    "database_name = \"iceberg_sql\"\n",
    "table_name = \"product\"\n",
    "warehouse_path = f\"s3://{bucket_name}/{bucket_prefix}\"\n",
    "# Passed to the catalog's lock.table setting when the SparkSession is built.\n",
    "dynamodb_table = 'myGlueLockTable'"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "223f52c4",
   "metadata": {},
   "source": [
    "## Initialize SparkSession"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "07d9de08-c473-4591-bd5a-ae7106d9addc",
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql import SparkSession\n",
    "\n",
    "# Register an Iceberg SparkCatalog under catalog_name, configured with GlueCatalog as\n",
    "# the catalog implementation, S3FileIO for file access, and a DynamoDB-backed lock\n",
    "# manager that uses dynamodb_table.\n",
    "spark = SparkSession.builder \\\n",
    "    .config(\"spark.sql.warehouse.dir\", warehouse_path) \\\n",
    "    .config(f\"spark.sql.catalog.{catalog_name}\", \"org.apache.iceberg.spark.SparkCatalog\") \\\n",
    "    .config(f\"spark.sql.catalog.{catalog_name}.warehouse\", warehouse_path) \\\n",
    "    .config(f\"spark.sql.catalog.{catalog_name}.catalog-impl\", \"org.apache.iceberg.aws.glue.GlueCatalog\") \\\n",
    "    .config(f\"spark.sql.catalog.{catalog_name}.io-impl\", \"org.apache.iceberg.aws.s3.S3FileIO\") \\\n",
    "    .config(f\"spark.sql.catalog.{catalog_name}.lock-impl\", \"org.apache.iceberg.aws.glue.DynamoLockManager\") \\\n",
    "    .config(f\"spark.sql.catalog.{catalog_name}.lock.table\", dynamodb_table) \\\n",
    "    .config(\"spark.sql.extensions\", \"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions\") \\\n",
    "    .getOrCreate()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "19d274a4-1610-43a6-a574-b18894a2f73b",
   "metadata": {},
   "source": [
    "## Clean up existing resources"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "3e8254c1-b5d6-4b03-92b3-ceffd90edd79",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Drop any table left over from a previous run so the notebook can be re-run from the top.\n",
    "query = f\"\"\"\n",
    "DROP TABLE IF EXISTS {catalog_name}.{database_name}.{table_name}\n",
    "\"\"\"\n",
    "spark.sql(query)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "5078b110-be78-43fe-8ffe-93cd013c7b0c",
   "metadata": {},
   "source": [
    "## Create Iceberg table with sample data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "fba92930-daf3-4b20-ae73-25b0b6c82d90",
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql import Row\n",
    "import time\n",
    "\n",
    "# One shared time.time() value (seconds since the epoch) stamps every seed row.\n",
    "ut = time.time()\n",
    "\n",
    "product = [\n",
    "    {'product_id': '00001', 'product_name': 'Heater', 'price': 250, 'category': 'Electronics', 'updated_at': ut},\n",
    "    {'product_id': '00002', 'product_name': 'Thermostat', 'price': 400, 'category': 'Electronics', 'updated_at': ut},\n",
    "    {'product_id': '00003', 'product_name': 'Television', 'price': 600, 'category': 'Electronics', 'updated_at': ut},\n",
    "    {'product_id': '00004', 'product_name': 'Blender', 'price': 100, 'category': 'Electronics', 'updated_at': ut},\n",
    "    {'product_id': '00005', 'product_name': 'USB charger', 'price': 50, 'category': 'Electronics', 'updated_at': ut}\n",
    "]\n",
    "\n",
    "df_products = spark.createDataFrame(Row(**x) for x in product)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "cb9bf071-a9f9-4a26-a362-f2b6250a6c82",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Expose the seed DataFrame to SQL so the CREATE TABLE ... AS SELECT below can read it.\n",
    "df_products.createOrReplaceTempView(f\"tmp_{table_name}\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "aa310535",
   "metadata": {},
   "outputs": [],
   "source": [
    "query = f\"\"\"\n",
    "CREATE DATABASE IF NOT EXISTS {database_name}\n",
    "\"\"\"\n",
    "spark.sql(query)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "a07c0a18-c73c-4393-ac79-427e4eef21c9",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Create the Iceberg table and load the seed rows in a single statement.\n",
    "query = f\"\"\"\n",
    "CREATE TABLE {catalog_name}.{database_name}.{table_name}\n",
    "USING iceberg\n",
    "AS SELECT * FROM tmp_{table_name}\n",
    "\"\"\"\n",
    "spark.sql(query)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "faee90d6-46c8-409a-8baa-bd20aa6e444d",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "USE iceberg_sql"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "1fdcedac",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "SHOW TABLES"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "48d42e63-fc96-4047-96fb-ad388f16a8bb",
   "metadata": {},
   "source": [
    "## Read from Iceberg table"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "a3f30f34-b750-4b65-b002-f342efe910de",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "SELECT * FROM glue_catalog.iceberg_sql.product"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "edc9062b-d134-4e78-879e-d9c51e85fdad",
   "metadata": {},
   "source": [
    "## Upsert records into Iceberg table"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "92624355-7e93-4c65-950c-3d0ccd0110b6",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Fresh timestamp so the upserted rows can be told apart from the seed rows.\n",
    "ut = time.time()\n",
    "\n",
    "product_updates = [\n",
    "    {'product_id': '00001', 'product_name': 'Heater', 'price': 400, 'category': 'Electronics', 'updated_at': ut}, # Update\n",
    "    {'product_id': '00006', 'product_name': 'Chair', 'price': 50, 'category': 'Furniture', 'updated_at': ut} # Insert\n",
    "]\n",
    "df_product_updates = spark.createDataFrame(Row(**x) for x in product_updates)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "06524e63-1ca4-4e91-8b48-4165e5bdca8b",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Expose the staged changes to SQL as the MERGE source.\n",
    "df_product_updates.createOrReplaceTempView(f\"tmp_{table_name}_updates\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "38a4e5b9-49b1-4b7c-9388-cb13f33c62c3",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Upsert: a matching product_id has every non-key column overwritten, so changed\n",
    "# values such as price are applied; an unmatched product_id is inserted as a new row.\n",
    "query = f\"\"\"\n",
    "MERGE INTO {catalog_name}.{database_name}.{table_name} AS t\n",
    "USING (SELECT * FROM tmp_{table_name}_updates) AS u\n",
    "ON t.product_id = u.product_id\n",
    "WHEN MATCHED THEN UPDATE SET\n",
    "    t.product_name = u.product_name,\n",
    "    t.price = u.price,\n",
    "    t.category = u.category,\n",
    "    t.updated_at = u.updated_at\n",
    "WHEN NOT MATCHED THEN INSERT *\n",
    "\"\"\"\n",
    "spark.sql(query)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "d62c4865-125f-4619-9f58-d2ec4bd4b562",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "SELECT * FROM glue_catalog.iceberg_sql.product"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "9e209781",
   "metadata": {},
   "source": [
    "## Delete records"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "ea9793ea",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "DELETE FROM glue_catalog.iceberg_sql.product WHERE product_name == \"Blender\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "214029da",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "SELECT * FROM glue_catalog.iceberg_sql.product"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "c546f7e4",
   "metadata": {},
   "source": [
    "## View History and Snapshots"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "15a368b3",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "SELECT * FROM glue_catalog.iceberg_sql.product.history"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "83b9e822",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "SELECT * FROM glue_catalog.iceberg_sql.product.snapshots"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "82c05698",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "SELECT h.made_current_at, s.operation, h.snapshot_id, h.is_current_ancestor, s.summary[\"spark.app.id\"]\n",
    "FROM glue_catalog.iceberg_sql.product.history h\n",
    "JOIN glue_catalog.iceberg_sql.product.snapshots s\n",
    "ON h.snapshot_id = s.snapshot_id\n",
    "ORDER BY made_current_at"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "98770309",
   "metadata": {},
   "source": [
    "## Stop Session"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "53ccca8e-1692-4da6-af55-9ab52d6fbc08",
   "metadata": {},
   "outputs": [],
   "source": [
    "%stop_session"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Glue PySpark",
   "language": "python",
   "name": "glue_pyspark"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}