{ "cells": [ { "cell_type": "markdown", "id": "2d2164d5-cd71-4160-9a4e-c55fb6620708", "metadata": {}, "source": [ "# AIDP - Sales Data MERGE Operation - Complete Example\n", "\n", "## README: AIDP External Catalog - MERGE\n", "\n", "This notebook demonstrates a complete MERGE workflow into an ALH (Autonomous AI Lakehouse) database table using the AIDP framework.\n", "\n", "### Key Concepts:\n", "\n", "**Catalog User: `aidp_user`**\n", "- `aidp_user` is the database user used to create the external catalog in AIDP\n", "- This user has been granted access to the `coder` schema in the database\n", "- The `coder` schema contains tables that are managed by `aidp_user`\n", "\n", "**Table Location in AIDP:**\n", "- **Full Table Name**: `ext_private_catalog_aidp_user.coder.sales`\n", " - `ext_private_catalog_aidp_user`: External catalog reference for `aidp_user`\n", " - `coder`: Schema name\n", " - `sales`: Table name\n", "\n", "### Table Schema and Create Statement:\n", "\n", "**Please execute the following CREATE TABLE statement in your database (coder schema) before running this notebook:**\n", "\n", "```sql\n", "CREATE TABLE IF NOT EXISTS coder.sales (\n", " order_id NUMBER NOT NULL,\n", " customer_name VARCHAR2(100),\n", " product_name VARCHAR2(100),\n", " quantity NUMBER\n", ")\n", "```\n", "\n", "**Column Descriptions:**\n", "- `order_id` (NUMBER): Unique order identifier, used as the merge key for MERGE operations\n", "- `customer_name` (VARCHAR2): Name of the customer placing the order\n", "- `product_name` (VARCHAR2): Name of the product ordered\n", "- `quantity` (NUMBER): Quantity of the product ordered\n", "\n", "### Workflow Overview:\n", "1. Ensure the sales table exists in the coder schema (execute the CREATE TABLE statement above)\n", "2. Insert initial sample customer order records\n", "3. Create an update/insert DataFrame (simulating new orders and updates)\n", "4. Execute the MERGE operation to sync the DataFrame with the table\n", "5. 
Verify the final results by reading the table back\n", "\n", "### Merge Operation Details:\n", "- **Merge Mode**: `write.mode` is set to \"MERGE\"\n", "- **Merge Keys**: `write.merge.keys` is set to \"order_id\"\n", "- **Skip Object-Storage Staging**: `skip.oos.staging` is enabled for performance optimization\n", "- **Merge Staging Using Username**: `merge.staging.using.username` uses the current session username (aidp_user) for the staging table. Set this flag to true when merge staging must run as the logged-in user (aidp_user)." ] }, { "cell_type": "markdown", "id": "c1d806ed-27d8-4b8d-8cb2-821c62734996", "metadata": {}, "source": [ "## Step 1: Define Target Table" ] }, { "cell_type": "code", "execution_count": 36, "id": "533858f4-eaae-4f5e-8fde-4721b2be8a6d", "metadata": { "command_metadata": { "end_time": 1778780651560.2595, "start_time": 1778780650786.866 }, "execution": { "iopub.status.busy": "2026-05-14T17:44:14.057Z" }, "result_type": "result", "trusted": true, "type": "python" }, "outputs": [ { "data": { "text/plain": [ "Target Table: ext_private_catalog_aidp_user.coder.sales\n", "\n", "Catalog: ext_private_catalog_aidp_user\n", "Schema: coder\n", "Table: sales\n", "\n", "✓ Ensure the CREATE TABLE statement from the README has been executed in your database before proceeding.\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Define target table location\n", "catalog_name = \"ext_private_catalog_aidp_user\"\n", "schema_name = \"coder\"\n", "table_name = \"sales\"\n", "full_target_table = f\"{catalog_name}.{schema_name}.{table_name}\"\n", "\n", "print(f\"Target Table: {full_target_table}\")\n", "print(f\"\\nCatalog: {catalog_name}\")\n", "print(f\"Schema: {schema_name}\")\n", "print(f\"Table: {table_name}\")\n", "print(\"\\n✓ Ensure the CREATE TABLE statement from the README has been executed in your database before proceeding.\")" ] }, { "cell_type": "markdown", "id": "65aa6cb6-4439-47ea-922b-a6fd68f4530d", "metadata": {}, "source": [ "## Step 2: Create Initial DataFrame with Sample Data" ] }, { 
"cell_type": "code", "execution_count": 37, "id": "f49fc536-4ffc-4265-8631-627b1253b43c", "metadata": { "command_metadata": { "end_time": 1778780655415.946, "start_time": 1778780654228.7942 }, "execution": { "iopub.status.busy": "2026-05-14T17:44:16.579Z" }, "result_type": "result", "trusted": true, "type": "python" }, "outputs": [ { "data": { "text/plain": [ "\n", "======================================================================\n", "=== INITIAL TEST DATA (New Records to Insert) ===\n", "======================================================================\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "\n", "Total records: 4\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+--------+-------------+------------+--------+\n", "|order_id|customer_name|product_name|quantity|\n", "+--------+-------------+------------+--------+\n", "|1001 |John Smith |Laptop |2 |\n", "|1002 |Jane Doe |Monitor |4 |\n", "|1003 |Bob Johnson |Keyboard |5 |\n", "|1004 |Alice Brown |Mouse |10 |\n", "+--------+-------------+------------+--------+\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "from pyspark.sql.types import StructType, StructField, IntegerType, StringType\n", "\n", "# Define schema for sales data\n", "schema = StructType([\n", " StructField(\"order_id\", IntegerType(), False),\n", " StructField(\"customer_name\", StringType(), True),\n", " StructField(\"product_name\", StringType(), True),\n", " StructField(\"quantity\", IntegerType(), True)\n", "])\n", "\n", "# Create initial test data - Starting inventory\n", "initial_data = [\n", " (1001, \"John Smith\", \"Laptop\", 2),\n", " (1002, \"Jane Doe\", \"Monitor\", 4),\n", " (1003, \"Bob Johnson\", \"Keyboard\", 5),\n", " (1004, \"Alice Brown\", \"Mouse\", 10)\n", "]\n", "\n", "df_initial = spark.createDataFrame(initial_data, schema=schema)\n", "\n", "print(\"\\n\" + \"=\"*70)\n", "print(\"=== INITIAL TEST DATA (New 
Records to Insert) ===\")\n", "print(\"=\"*70)\n", "print(f\"\\nTotal records: {df_initial.count()}\")\n", "df_initial.show(truncate=False)" ] }, { "cell_type": "markdown", "id": "9417f700-6a3a-4df6-aa16-b4738c9a7bb2", "metadata": {}, "source": [ "## Step 3: Insert Initial Data into Sales Table" ] }, { "cell_type": "code", "execution_count": 38, "id": "f6de8974-3ef8-47b3-83bb-f046fa632be4", "metadata": { "command_metadata": { "end_time": 1778780705030.047, "start_time": 1778780656742.1416 }, "execution": { "iopub.status.busy": "2026-05-14T17:45:06.547Z" }, "result_type": "result", "trusted": true, "type": "python" }, "outputs": [ { "data": { "text/plain": [ "\n", "📝 Inserting initial sample data into target table...\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "✓ 4 records inserted into ext_private_catalog_aidp_user.coder.sales\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Insert initial data - Create temp view and use SQL INSERT\n", "print(\"\\n📝 Inserting initial sample data into target table...\")\n", "\n", "# Create temporary view from DataFrame\n", "df_initial.createOrReplaceTempView(\"temp_sales_data\")\n", "\n", "# Insert using SQL with temp view\n", "insert_sql = f\"\"\"\n", " INSERT INTO {full_target_table}\n", " SELECT * FROM temp_sales_data\n", "\"\"\"\n", "\n", "spark.sql(insert_sql)\n", "\n", "print(f\"✓ {df_initial.count()} records inserted into {full_target_table}\")" ] }, { "cell_type": "markdown", "id": "1cf26857-0c5b-4ab2-b864-766516db56ef", "metadata": {}, "source": [ "## Step 4: Verify Inserted Data" ] }, { "cell_type": "code", "execution_count": 39, "id": "2f671f97-8109-422b-badf-f25fdfee3e3f", "metadata": { "command_metadata": { "end_time": 1778780711487.1472, "start_time": 1778780706724.306 }, "execution": { "iopub.status.busy": "2026-05-14T17:45:12.745Z" }, "result_type": "result", "trusted": true, "type": "python" }, "outputs": [ { "data": { "text/plain": [ "\n", 
"======================================================================\n", "=== TABLE DATA AFTER INITIAL INSERT ===\n", "======================================================================\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "\n", "Total records: 4\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+---------------+-------------+------------+-------------+\n", "|order_id |customer_name|product_name|quantity |\n", "+---------------+-------------+------------+-------------+\n", "|1001.0000000000|John Smith |Laptop |2.0000000000 |\n", "|1002.0000000000|Jane Doe |Monitor |4.0000000000 |\n", "|1003.0000000000|Bob Johnson |Keyboard |5.0000000000 |\n", "|1004.0000000000|Alice Brown |Mouse |10.0000000000|\n", "+---------------+-------------+------------+-------------+\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Read the current data from the table\n", "df_current = spark.read.table(full_target_table)\n", "\n", "print(\"\\n\" + \"=\"*70)\n", "print(\"=== TABLE DATA AFTER INITIAL INSERT ===\")\n", "print(\"=\"*70)\n", "print(f\"\\nTotal records: {df_current.count()}\")\n", "df_current.orderBy(\"order_id\").show(truncate=False)" ] }, { "cell_type": "markdown", "id": "453af39c-fce1-42f4-93b8-5381c2a63c79", "metadata": {}, "source": [ "## Step 5: Create Update and Insert DataFrame for MERGE" ] }, { "cell_type": "code", "execution_count": 40, "id": "46997c14-bd1f-4e5f-bcb8-acb094f0f371", "metadata": { "command_metadata": { "end_time": 1778780714073.5872, "start_time": 1778780712909.8481 }, "execution": { "iopub.status.busy": "2026-05-14T17:45:15.443Z" }, "result_type": "result", "trusted": true, "type": "python" }, "outputs": [ { "data": { "text/plain": [ "\n", "======================================================================\n", "=== DATA PREPARED FOR MERGE OPERATION ===\n", "======================================================================\n", 
"\n", "This DataFrame contains:\n", " - UPDATED records: order_id 1001, 1002 (quantity and product changes)\n", " - NEW records: order_id 1005, 1006 (to be inserted)\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "\n", "Total records to merge: 4\n", "+--------+-------------+------------+--------+\n", "|order_id|customer_name|product_name|quantity|\n", "+--------+-------------+------------+--------+\n", "|1001 |John Smith |Laptop Pro |3 |\n", "|1002 |Jane Doe |Monitor 4K |5 |\n", "|1005 |David Wilson |Headphones |8 |\n", "|1006 |Emma Davis |Webcam |3 |\n", "+--------+-------------+------------+--------+\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Create data for MERGE operation\n", "# This includes:\n", "# - Updated records (order_id: 1001, 1002) with quantity changes\n", "# - New records (order_id: 1005, 1006) to be inserted\n", "\n", "merge_data = [\n", " (1001, \"John Smith\", \"Laptop Pro\", 3), # Update: Change product and quantity\n", " (1002, \"Jane Doe\", \"Monitor 4K\", 5), # Update: Change product and quantity\n", " (1005, \"David Wilson\", \"Headphones\", 8), # Insert: New order\n", " (1006, \"Emma Davis\", \"Webcam\", 3) # Insert: New order\n", "]\n", "\n", "df_merge_data = spark.createDataFrame(merge_data, schema=schema)\n", "\n", "print(\"\\n\" + \"=\"*70)\n", "print(\"=== DATA PREPARED FOR MERGE OPERATION ===\")\n", "print(\"=\"*70)\n", "print(\"\\nThis DataFrame contains:\")\n", "print(\" - UPDATED records: order_id 1001, 1002 (quantity and product changes)\")\n", "print(\" - NEW records: order_id 1005, 1006 (to be inserted)\")\n", "print(f\"\\nTotal records to merge: {df_merge_data.count()}\")\n", "df_merge_data.show(truncate=False)" ] }, { "cell_type": "markdown", "id": "e16c4641-47c8-4418-ad32-263ce4b7729b", "metadata": {}, "source": [ "## Step 6: Execute MERGE Operation" ] }, { "cell_type": "code", "execution_count": 41, "id": "5d09d31e-fc91-47a7-bda4-41861b1746b7", 
"metadata": { "command_metadata": { "end_time": 1778780719987.541, "start_time": 1778780715730.0835 }, "execution": { "iopub.status.busy": "2026-05-14T17:45:21.090Z" }, "result_type": "result", "trusted": true, "type": "python" }, "outputs": [ { "data": { "text/plain": [ "\n", "======================================================================\n", "šŸ”„ EXECUTING MERGE OPERATION\n", "======================================================================\n", "\n", "Target Table: ext_private_catalog_aidp_user.coder.sales\n", "Records to merge: 4\n", "\n", "MERGE Options:\n", " - write.mode: \"MERGE\"\n", " - write.merge.keys: \"order_id\" (order_id specific key constraint)\n", " - skip.oos.staging: \"true\" (skip object-storage Staging staging)\n", " - merge.staging.using.username: \"true\" (use session username)\n", "\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "āœ“ MERGE operation completed successfully\n", "āœ“ Updated 2 records (order_id: 1001, 1002)\n", "āœ“ Inserted 2 new records (order_id: 1005, 1006)\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Execute MERGE operation with AIDP-specific options\n", "print(\"\\n\" + \"=\"*70)\n", "print(\"šŸ”„ EXECUTING MERGE OPERATION\")\n", "print(\"=\"*70)\n", "print(f\"\\nTarget Table: {full_target_table}\")\n", "print(f\"Records to merge: {df_merge_data.count()}\")\n", "print(\"\\nMERGE Options:\")\n", "print(' - write.mode: \"MERGE\"')\n", "print(' - write.merge.keys: \"order_id\" (order_id specific key constraint)')\n", "print(' - skip.oos.staging: \"true\" (skip object-storage Staging staging)')\n", "print(' - merge.staging.using.username: \"true\" (use session username)\\n')\n", "\n", "# Execute MERGE operation using DataFrame write API\n", "df_merge_data.write \\\n", " .option(\"write.mode\", \"MERGE\") \\\n", " .option(\"write.merge.keys\", \"order_id\") \\\n", " .option(\"skip.oos.staging\", \"true\") \\\n", " 
.option(\"merge.staging.using.username\", \"true\") \\\n", " .insertInto(full_target_table)\n", "\n", "# insertInto() returns no row counts; the messages below state the expected outcome for this sample data\n", "print(\"✓ MERGE operation completed successfully\")\n", "print(\"✓ Updated 2 records (order_id: 1001, 1002)\")\n", "print(\"✓ Inserted 2 new records (order_id: 1005, 1006)\")" ] }, { "cell_type": "markdown", "id": "21ec0f53-df7c-44c7-a453-55e152069576", "metadata": {}, "source": [ "## Step 7: Verify MERGE Results" ] }, { "cell_type": "code", "execution_count": 42, "id": "b9034e72-0dd5-45c9-bcca-3ca4f8649f6e", "metadata": { "command_metadata": { "end_time": 1778780725511.737, "start_time": 1778780721271.7903 }, "execution": { "iopub.status.busy": "2026-05-14T17:45:26.619Z" }, "result_type": "result", "trusted": true, "type": "python" }, "outputs": [ { "data": { "text/plain": [ "\n", "======================================================================\n", "=== TABLE DATA AFTER MERGE OPERATION ===\n", "======================================================================\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "\n", "Total records: 6\n", "\n", "Expected Results:\n", " - order_id 1001: Product updated to 'Laptop Pro', quantity to 3\n", " - order_id 1002: Product updated to 'Monitor 4K', quantity to 5\n", " - order_id 1003: Keyboard (unchanged from initial insert)\n", " - order_id 1004: Mouse (unchanged from initial insert)\n", " - order_id 1005: NEW - David Wilson, Headphones, quantity 8\n", " - order_id 1006: NEW - Emma Davis, Webcam, quantity 3\n", "\n", "Actual Results:\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+---------------+-------------+------------+-------------+\n", "|order_id |customer_name|product_name|quantity |\n", "+---------------+-------------+------------+-------------+\n", "|1001.0000000000|John Smith |Laptop Pro |3.0000000000 |\n", "|1002.0000000000|Jane Doe |Monitor 4K |5.0000000000 |\n", "|1003.0000000000|Bob Johnson |Keyboard |5.0000000000 |\n", 
"|1004.0000000000|Alice Brown |Mouse |10.0000000000|\n", "|1005.0000000000|David Wilson |Headphones |8.0000000000 |\n", "|1006.0000000000|Emma Davis |Webcam |3.0000000000 |\n", "+---------------+-------------+------------+-------------+\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Read the updated data from the table\n", "df_after_merge = spark.read.table(full_target_table)\n", "\n", "print(\"\\n\" + \"=\"*70)\n", "print(\"=== TABLE DATA AFTER MERGE OPERATION ===\")\n", "print(\"=\"*70)\n", "print(f\"\\nTotal records: {df_after_merge.count()}\")\n", "print(\"\\nExpected Results:\")\n", "print(\" - order_id 1001: Product updated to 'Laptop Pro', quantity to 3\")\n", "print(\" - order_id 1002: Product updated to 'Monitor 4K', quantity to 5\")\n", "print(\" - order_id 1003: Keyboard (unchanged from initial insert)\")\n", "print(\" - order_id 1004: Mouse (unchanged from initial insert)\")\n", "print(\" - order_id 1005: NEW - David Wilson, Headphones, quantity 8\")\n", "print(\" - order_id 1006: NEW - Emma Davis, Webcam, quantity 3\")\n", "print(\"\\nActual Results:\")\n", "df_after_merge.orderBy(\"order_id\").show(truncate=False)" ] }, { "cell_type": "markdown", "id": "dd0c9dc0-5d6f-473d-9fdc-e8427264c53f", "metadata": {}, "source": [ "## Key Takeaways\n", "\n", "### Create Table - SQL Statement:\n", "```sql\n", "CREATE TABLE IF NOT EXISTS coder.sales (\n", " order_id NUMBER NOT NULL,\n", " customer_name VARCHAR2(100),\n", " product_name VARCHAR2(100),\n", " quantity NUMBER\n", ")\n", "```\n", "\n", "### Initial Data Insert - Using Temporary View and SQL:\n", "```python\n", "# Create temporary view from DataFrame\n", "df_initial.createOrReplaceTempView(\"temp_sales_data\")\n", "\n", "# Insert using SQL\n", "insert_sql = f\"\"\"\n", " INSERT INTO {full_target_table}\n", " SELECT * FROM temp_sales_data\n", "\"\"\"\n", "spark.sql(insert_sql)\n", "```\n", "\n", "### MERGE Operation Behavior:\n", "- **write.mode = \"MERGE\"**: Enables the 
MERGE operation mode for insert/update logic\n", "- **write.merge.keys = \"order_id\"**: Rows are matched on `order_id`; matching rows are updated and all other rows are inserted\n", "- **Every record in the DataFrame is either inserted into or updated in the target table**\n", "- **Atomic Operation**: All INSERT and UPDATE operations happen as a single transaction\n", "\n", "### AIDP-Specific Options:\n", "- **skip.oos.staging**: Skips object-storage staging for performance optimization\n", "- **merge.staging.using.username**: Uses the current session's username for staging operations\n", "\n", "### Spark DataFrame MERGE Syntax:\n", "```python\n", "df.write \\\n", " .option(\"write.mode\", \"MERGE\") \\\n", " .option(\"write.merge.keys\", \"order_id\") \\\n", " .option(\"skip.oos.staging\", \"true\") \\\n", " .option(\"merge.staging.using.username\", \"true\") \\\n", " .insertInto(full_target_table)\n", "```\n", "\n", "### Notebook Workflow:\n", "- **Step 1**: Define Target Table - Sets up table reference\n", "- **Step 2**: Create Initial DataFrame - Prepares sample data\n", "- **Step 3**: INSERT Initial Data - Inserts records via SQL\n", "- **Step 4**: Verify Inserted Data - Confirms insertion\n", "- **Step 5**: Create MERGE DataFrame - Prepares update/insert data\n", "- **Step 6**: Execute MERGE - Runs the MERGE operation\n", "- **Step 7**: Verify MERGE Results - Validates final state" ] } ], "metadata": { "Last_Active_Cell_Index": 14, "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "name": "python", "version": "3.8.0" } }, "nbformat": 4, "nbformat_minor": 5 }