{ "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "notebook" }, "language_info": { "file_extension": ".py", "mimetype": "text/x-python", "name": "python" }, "Last_Active_Cell_Index": 23 }, "nbformat_minor": 4, "nbformat": 4, "cells": [ { "cell_type": "markdown", "source": "# Delta Change Data Feed Demo\n\n## Overview\n\nThis notebook demonstrates **Delta Change Data Feed** in Oracle AI Data Platform (AIDP) Workbench. Change Data Feed allows you to capture and process row-level changes (inserts, updates, deletes) from Delta tables, enabling powerful use cases like:\n\n- **Change Data Capture (CDC)**: Track all changes for auditing and compliance\n- **Incremental Processing**: Process only changed data for efficiency\n- **Real-time Analytics**: Build streaming pipelines on table changes\n- **Data Replication**: Sync changes to downstream systems\n\n### What is Delta Change Data Feed?\n\nChange Data Feed is a Delta Lake feature that:\n- **Captures all changes**: Records inserts, updates, and deletes with timestamps\n- **Maintains history**: Keeps change history alongside current table state\n- **Enables efficient processing**: Allows reading only changed data\n- **Supports streaming**: Integrates with Spark Structured Streaming\n\n### Use Case: Customer Order Processing\n\nWe'll demonstrate change feed with an e-commerce order processing scenario:\n- Track customer orders and their lifecycle changes\n- Process order updates in real-time\n- Maintain audit trails for compliance\n- Enable incremental data pipelines\n\n### AIDP Environment Setup\n\nThis notebook leverages the existing Spark session in your AIDP environment.", "metadata": {} }, { "cell_type": "markdown", "source": "## Step 1: Create Catalog and Schema\n\n### Setup Strategy\n\nWe'll create a dedicated catalog and schema for our change feed demo:\n- **Catalog**: `retail` for data isolation\n- **Schema**: `orders` for order processing data\n\nThis follows AIDP best 
practices for data governance and organization.", "metadata": {} }, { "cell_type": "code", "source": "# Create retail catalog and orders schema\n# In AIDP, catalogs provide data isolation and governance\n\nspark.sql(\"CREATE CATALOG IF NOT EXISTS retail\")\nspark.sql(\"CREATE SCHEMA IF NOT EXISTS retail.orders\")\n\nprint(\"Retail catalog and orders schema created successfully!\")", "metadata": { "execution": { "iopub.status.busy": "2025-12-03T01:39:02.829Z" }, "trusted": true }, "outputs": [ { "data": { "text/plain": "Retail catalog and orders schema created successfully!\n" }, "metadata": {}, "output_type": "display_data" } ], "execution_count": 1 }, { "cell_type": "markdown", "source": "## Step 2: Create Delta Table with Change Data Feed Enabled\n\n### Table Design\n\nOur `orders` table will store:\n- **order_id**: Unique order identifier\n- **customer_id**: Customer who placed the order\n- **order_date**: Date the order was placed\n- **product_id**: Product being ordered\n- **quantity**: Quantity ordered\n- **unit_price**: Price per unit\n- **total_amount**: Total order amount\n- **status**: Order status (pending, confirmed, shipped, delivered; cancellations are modeled as deletes in this demo)\n- **last_updated**: Timestamp of last update\n\n### Enabling Change Data Feed\n\nTo enable change data feed, we use the table property:\n```sql\nTBLPROPERTIES ('delta.enableChangeDataFeed' = 'true')\n```\n\nWe set this property when creating the table. It can also be added to an existing table with `ALTER TABLE <table_name> SET TBLPROPERTIES (delta.enableChangeDataFeed = true)`, but changes are recorded only for commits made after it is enabled, so setting it at creation captures the table's full history.", "metadata": {} }, { "cell_type": "code", "source": "# Create Delta table with change data feed enabled\n# TBLPROPERTIES enables change data feed for capturing all subsequent changes\n\nspark.sql(\"\"\"\nCREATE TABLE IF NOT EXISTS retail.orders.orders (\n    order_id STRING,\n    customer_id STRING,\n    order_date DATE,\n    product_id STRING,\n    quantity INT,\n    unit_price DECIMAL(10,2),\n    total_amount DECIMAL(12,2),\n    status STRING,\n    last_updated TIMESTAMP\n)\nUSING DELTA TBLPROPERTIES (delta.enableChangeDataFeed = true)\n\"\"\")\n\nprint(\"Delta table with change data feed enabled created successfully!\")\nprint(\"All changes (inserts, updates, deletes) will now be captured automatically.\")", "metadata": { "execution": { "iopub.status.busy": "2025-12-03T01:39:15.384Z" }, "trusted": true }, "outputs": [ { "data": { "text/plain": "Delta table with change data feed enabled created successfully!\nAll changes (inserts, updates, deletes) will now be captured automatically.\n" }, "metadata": {}, "output_type": "display_data" } ], "execution_count": 1 }, { "cell_type": "markdown", "source": "## Step 3: Generate and Insert Initial Order Data\n\n### Data Generation Strategy\n\nWe'll create synthetic e-commerce order data:\n- **1,000 customers**, each with 1-5 orders (unseeded random values, so the total varies between runs; this run produced 3,030)\n- **8 electronics products** at fixed unit prices\n- **Random order statuses** (pending, confirmed, shipped, delivered) and update timestamps\n- **Order dates** spread across 2024, with `last_updated` up to 30 days after the order date\n\nThis simulates an e-commerce scenario where orders evolve over time.", "metadata": {} }, { "cell_type": "code", "source": "# Generate sample customer order data\n# Uses only the standard-library random and datetime modules\n\nimport random\nfrom datetime import datetime, timedelta\n\n# Define e-commerce data constants\nPRODUCTS = [\n    ('LAPTOP-001', 'Gaming Laptop', 1299.99),\n    ('PHONE-001', 'Smartphone', 799.99),\n    ('TABLET-001', 'Tablet', 499.99),\n    ('HEADPHONES-001', 'Wireless Headphones', 199.99),\n    ('MOUSE-001', 'Gaming Mouse', 79.99),\n    ('KEYBOARD-001', 'Mechanical Keyboard', 149.99),\n    ('MONITOR-001', '4K Monitor', 399.99),\n    ('SPEAKERS-001', 'Bluetooth Speakers', 129.99)\n]\n\nSTATUSES = ['pending', 'confirmed', 'shipped', 'delivered']\n\n# Generate order data\norder_data = []\nbase_date = datetime(2024, 1, 1)\n\n# Create 1,000 customers with 1-5 orders each\nfor customer_num in range(1, 1001):\n    customer_id = f\"CUST{customer_num:06d}\"\n    \n    # Each customer places 1-5 orders\n    num_orders = random.randint(1, 5)\n    \n    for order_num in range(1, num_orders + 1):\n        order_id = 
f\"ORDER{customer_num:06d}-{order_num:02d}\"\n \n # Random order date within the year\n days_offset = random.randint(0, 365)\n order_date = base_date + timedelta(days=days_offset)\n \n # Select random product\n product_id, product_name, unit_price = random.choice(PRODUCTS)\n \n # Random quantity (1-5)\n quantity = random.randint(1, 5)\n total_amount = round(unit_price * quantity, 2)\n \n # Random status\n status = random.choice(STATUSES)\n \n # Last updated timestamp (could be different from order date for status changes)\n update_offset = random.randint(0, 30) # Up to 30 days after order\n last_updated = order_date + timedelta(days=update_offset)\n \n order_data.append({\n \"order_id\": order_id,\n \"customer_id\": customer_id,\n \"order_date\": order_date.date(),\n \"product_id\": product_id,\n \"quantity\": int(quantity),\n \"unit_price\": float(unit_price),\n \"total_amount\": float(total_amount),\n \"status\": status,\n \"last_updated\": last_updated\n })\n\nprint(f\"Generated {len(order_data)} customer order records\")\nprint(\"Sample record:\", order_data[0])", "metadata": { "execution": { "iopub.status.busy": "2025-12-03T01:39:20.769Z" }, "trusted": true }, "outputs": [ { "data": { "text/plain": "Generated 3030 customer order records\nSample record: {'order_id': 'ORDER000001-01', 'customer_id': 'CUST000001', 'order_date': datetime.date(2024, 7, 21), 'product_id': 'TABLET-001', 'quantity': 2, 'unit_price': 499.99, 'total_amount': 999.98, 'status': 'delivered', 'last_updated': datetime.datetime(2024, 8, 8, 0, 0)}\n" }, "metadata": {}, "output_type": "display_data" } ], "execution_count": 1 }, { "cell_type": "code", "source": "# Insert initial data into the Delta table\n# This creates the first version of our table\nfrom pyspark.sql.functions import col\nfrom pyspark.sql.types import IntegerType, DecimalType\n\n# Create DataFrame from generated data\ndf_orders = spark.createDataFrame(order_data)\ndf_orders = df_orders.withColumn('quantity', 
col('quantity').cast(IntegerType()))\ndf_orders = df_orders.withColumn('total_amount', col('total_amount').cast(DecimalType(12,2)))\ndf_orders = df_orders.withColumn('unit_price', col('unit_price').cast(DecimalType(10,2)))\n\n# Display schema and sample data\nprint(\"DataFrame Schema:\")\ndf_orders.printSchema()\n\nprint(\"\\nSample Data:\")\ndf_orders.show(5)\n\n# Insert data into Delta table\n# This commit becomes version 1 of the table (version 0 is the CREATE TABLE commit)\ndf_orders.write.mode(\"append\").saveAsTable(\"retail.orders.orders\")\n\nprint(f\"\\nSuccessfully inserted {df_orders.count()} records into retail.orders.orders\")\nprint(\"Change data feed is now capturing this initial data load.\")", "metadata": { "execution": { "iopub.status.busy": "2025-12-03T01:39:30.228Z" }, "trusted": true }, "outputs": [ { "data": { "text/plain": "DataFrame Schema:\nroot\n |-- customer_id: string (nullable = true)\n |-- last_updated: timestamp (nullable = true)\n |-- order_date: date (nullable = true)\n |-- order_id: string (nullable = true)\n |-- product_id: string (nullable = true)\n |-- quantity: integer (nullable = true)\n |-- status: string (nullable = true)\n |-- total_amount: decimal(12,2) (nullable = true)\n |-- unit_price: decimal(10,2) (nullable = true)\n\n\nSample Data:\n+-----------+-------------------+----------+--------------+--------------+--------+---------+------------+----------+\n|customer_id| last_updated|order_date| order_id| product_id|quantity| status|total_amount|unit_price|\n+-----------+-------------------+----------+--------------+--------------+--------+---------+------------+----------+\n| CUST000001|2024-08-08 00:00:00|2024-07-21|ORDER000001-01| TABLET-001| 2|delivered| 999.98| 499.99|\n| CUST000001|2024-10-26 00:00:00|2024-10-15|ORDER000001-02| SPEAKERS-001| 2|confirmed| 259.98| 129.99|\n| CUST000001|2024-12-24 00:00:00|2024-12-11|ORDER000001-03| LAPTOP-001| 1| pending| 1299.99| 1299.99|\n| CUST000002|2024-09-30 00:00:00|2024-09-27|ORDER000002-01|HEADPHONES-001| 1| pending| 
199.99| 199.99|\n| CUST000002|2024-08-22 00:00:00|2024-07-31|ORDER000002-02| MONITOR-001| 5| pending| 1999.95| 399.99|\n+-----------+-------------------+----------+--------------+--------------+--------+---------+------------+----------+\nonly showing top 5 rows\n\n" }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": "\nSuccessfully inserted 3030 records into retail.orders.orders\nChange data feed is now capturing this initial data load.\n" }, "metadata": {}, "output_type": "display_data" } ], "execution_count": 1 }, { "cell_type": "markdown", "source": "## Step 4: Make Changes to Demonstrate Change Feed\n\n### Simulating Real-World Changes\n\nNow we'll simulate real e-commerce operations:\n1. **New orders**: Customers place additional orders\n2. **Status updates**: Orders progress through their lifecycle\n3. **Order cancellations**: Some orders are cancelled\n\nEach change will be captured by the change data feed.", "metadata": {} }, { "cell_type": "code", "source": "# Simulate new orders (INSERT operations)\nprint(\"=== Adding New Orders ===\")\n\nnew_orders = [\n {\n \"order_id\": \"ORDER01001-01\",\n \"customer_id\": \"CUST001001\",\n \"order_date\": datetime(2024, 12, 1).date(),\n \"product_id\": \"LAPTOP-001\",\n \"quantity\": 1,\n \"unit_price\": 1299.99,\n \"total_amount\": 1299.99,\n \"status\": \"pending\",\n \"last_updated\": datetime(2024, 12, 1, 10, 30)\n },\n {\n \"order_id\": \"ORDER01002-01\",\n \"customer_id\": \"CUST001002\",\n \"order_date\": datetime(2024, 12, 1).date(),\n \"product_id\": \"PHONE-001\",\n \"quantity\": 2,\n \"unit_price\": 799.99,\n \"total_amount\": 1599.98,\n \"status\": \"confirmed\",\n \"last_updated\": datetime(2024, 12, 1, 11, 15)\n }\n]\n\n# Insert new orders\ndf_new_orders = spark.createDataFrame(new_orders)\ndf_new_orders = df_new_orders.withColumn('quantity', col('quantity').cast(IntegerType()))\ndf_new_orders = df_new_orders.withColumn('total_amount', 
col('total_amount').cast(DecimalType(12,2)))\ndf_new_orders = df_new_orders.withColumn('unit_price', col('unit_price').cast(DecimalType(10,2)))\ndf_new_orders.write.mode(\"append\").saveAsTable(\"retail.orders.orders\")\nprint(\"Added 2 new orders\")", "metadata": { "execution": { "iopub.status.busy": "2025-12-03T01:39:39.680Z" }, "trusted": true }, "outputs": [ { "data": { "text/plain": "=== Adding New Orders ===\n" }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": "Added 2 new orders\n" }, "metadata": {}, "output_type": "display_data" } ], "execution_count": 1 }, { "cell_type": "code", "source": "# Simulate status updates (UPDATE operations)\nprint(\"=== Updating Order Statuses ===\")\n\n# Update some existing orders to new statuses\nspark.sql(\"\"\"\nUPDATE retail.orders.orders \nSET status = 'shipped', last_updated = '2024-12-02 14:30:00'\nWHERE order_id = 'ORDER000001-01'\n\"\"\")\n\nspark.sql(\"\"\"\nUPDATE retail.orders.orders \nSET status = 'delivered', last_updated = '2024-12-02 16:45:00'\nWHERE order_id = 'ORDER000002-01'\n\"\"\")\n\nspark.sql(\"\"\"\nUPDATE retail.orders.orders \nSET status = 'confirmed', last_updated = '2024-12-01 12:00:00'\nWHERE order_id IN ('ORDER000003-01', 'ORDER000004-01')\n\"\"\")\n\nprint(\"Updated order statuses (3 updates)\")", "metadata": { "execution": { "iopub.status.busy": "2025-12-03T01:40:01.350Z" }, "trusted": true }, "outputs": [ { "data": { "text/plain": "=== Updating Order Statuses ===\n" }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": "Updated order statuses (3 updates)\n" }, "metadata": {}, "output_type": "display_data" } ], "execution_count": 1 }, { "cell_type": "code", "source": "# Simulate order cancellations (DELETE operations)\nprint(\"=== Cancelling Orders (DELETE operations) ===\")\n\n# Cancel some orders (soft delete by setting status, or hard delete)\n# For demo purposes, we'll do hard deletes\nspark.sql(\"\"\"\nDELETE FROM retail.orders.orders 
\nWHERE order_id = 'ORDER000005-01'\n\"\"\")\n\nspark.sql(\"\"\"\nDELETE FROM retail.orders.orders \nWHERE order_id = 'ORDER000006-01'\n\"\"\")\n\nprint(\"Deleted 2 orders (simulating cancellations)\")\n\n# Check current table state\ncurrent_count = spark.sql(\"SELECT COUNT(*) as current_orders FROM retail.orders.orders\").collect()[0][0]\nprint(f\"Current total orders in table: {current_count}\")", "metadata": { "execution": { "iopub.status.busy": "2025-12-03T01:40:21.972Z" }, "trusted": true }, "outputs": [ { "data": { "text/plain": "=== Cancelling Orders (DELETE operations) ===\n" }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": "Deleted 2 orders (simulating cancellations)\n" }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": "Current total orders in table: 3030\n" }, "metadata": {}, "output_type": "display_data" } ], "execution_count": 1 }, { "cell_type": "code", "source": "# Now look at the history\nspark.sql(\"DESCRIBE HISTORY retail.orders.orders\").show()", "metadata": { "execution": { "iopub.status.busy": "2025-12-03T01:40:30.371Z" }, "trusted": true }, "outputs": [ { "data": { "text/plain": "+-------+-------------------+------+--------+------------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+\n|version| timestamp|userId|userName| operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend| operationMetrics|userMetadata| engineInfo|\n+-------+-------------------+------+--------+------------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+\n| 7|2025-12-03 01:40:14| NULL| NULL| DELETE|{predicate -> [\"(...|NULL| NULL| NULL| 6| Serializable| false|{numRemovedFiles ...| NULL|Apache-Spark/3.5....|\n| 6|2025-12-03 01:40:09| NULL| NULL| DELETE|{predicate -> [\"(...|NULL| NULL| NULL| 5| 
Serializable| false|{numRemovedFiles ...| NULL|Apache-Spark/3.5....|\n| 5|2025-12-03 01:39:57| NULL| NULL| UPDATE|{predicate -> [\"o...|NULL| NULL| NULL| 4| Serializable| false|{numRemovedFiles ...| NULL|Apache-Spark/3.5....|\n| 4|2025-12-03 01:39:52| NULL| NULL| UPDATE|{predicate -> [\"(...|NULL| NULL| NULL| 3| Serializable| false|{numRemovedFiles ...| NULL|Apache-Spark/3.5....|\n| 3|2025-12-03 01:39:47| NULL| NULL| UPDATE|{predicate -> [\"(...|NULL| NULL| NULL| 2| Serializable| false|{numRemovedFiles ...| NULL|Apache-Spark/3.5....|\n| 2|2025-12-03 01:39:36| NULL| NULL| WRITE|{mode -> Append, ...|NULL| NULL| NULL| 1| Serializable| true|{numFiles -> 3, n...| NULL|Apache-Spark/3.5....|\n| 1|2025-12-03 01:39:27| NULL| NULL| WRITE|{mode -> Append, ...|NULL| NULL| NULL| 0| Serializable| true|{numFiles -> 4, n...| NULL|Apache-Spark/3.5....|\n| 0|2025-12-03 01:39:10| NULL| NULL|CREATE TABLE|{partitionBy -> [...|NULL| NULL| NULL| NULL| Serializable| true| {}| NULL|Apache-Spark/3.5....|\n+-------+-------------------+------+--------+------------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+\n\n" }, "metadata": {}, "output_type": "display_data" } ], "execution_count": 1 }, { "cell_type": "markdown", "source": "## Step 5: Reading the Change Data Feed\n\n### How Change Feed Works\n\nEach change feed row contains:\n- **_change_type**: `insert`, `update_preimage`, `update_postimage`, or `delete` (every updated row yields both a pre-image row and a post-image row)\n- **_commit_version**: Delta table version of the commit that made the change\n- **_commit_timestamp**: When that commit was made\n- All table columns with their values at the time of the change\n\n### Reading Change Feed\n\nA batch read of the change feed needs a starting point, set with `startingVersion` (or `startingTimestamp`):\n```python\n(spark.read.format(\"delta\")\n    .option(\"readChangeFeed\", \"true\")\n    .option(\"startingVersion\", 0)\n    .table(\"table_name\"))\n```\n\nTo stop at a specific version, add `endingVersion`; both bounds are inclusive:\n```python\n.option(\"endingVersion\", 5)\n```", "metadata": {} }, { "cell_type": "code", 
"source": "# Read the complete change data feed from the beginning\nprint(\"=== Complete Change Data Feed ===\")\n\nchanges_df = spark.read.format(\"delta\") \\\n .option(\"readChangeFeed\", \"true\") \\\n .option(\"startingVersion\", 0) \\\n .table(\"retail.orders.orders\")\n\n# Show change feed schema\nprint(\"Change Feed Schema:\")\nchanges_df.printSchema()\n\n# Display all changes\nprint(\"\\nAll Changes:\")\nchanges_df.orderBy(\"_commit_version\", \"order_id\").show(20, truncate=False)", "metadata": { "execution": { "iopub.status.busy": "2025-12-03T01:40:39.819Z" }, "trusted": true }, "outputs": [ { "data": { "text/plain": "=== Complete Change Data Feed ===\n" }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": "Change Feed Schema:\nroot\n |-- order_id: string (nullable = true)\n |-- customer_id: string (nullable = true)\n |-- order_date: date (nullable = true)\n |-- product_id: string (nullable = true)\n |-- quantity: integer (nullable = true)\n |-- unit_price: decimal(10,2) (nullable = true)\n |-- total_amount: decimal(12,2) (nullable = true)\n |-- status: string (nullable = true)\n |-- last_updated: timestamp (nullable = true)\n |-- _change_type: string (nullable = true)\n |-- _commit_version: long (nullable = true)\n |-- _commit_timestamp: timestamp (nullable = true)\n\n\nAll Changes:\n" }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": "+--------------+-----------+----------+--------------+--------+----------+------------+---------+-------------------+------------+---------------+-----------------------+\n|order_id |customer_id|order_date|product_id |quantity|unit_price|total_amount|status |last_updated |_change_type|_commit_version|_commit_timestamp |\n+--------------+-----------+----------+--------------+--------+----------+------------+---------+-------------------+------------+---------------+-----------------------+\n|ORDER000001-01|CUST000001 |2024-07-21|TABLET-001 |2 |499.99 |999.98 
|delivered|2024-08-08 00:00:00|insert |1 |2025-12-03 01:39:27.062|\n|ORDER000001-02|CUST000001 |2024-10-15|SPEAKERS-001 |2 |129.99 |259.98 |confirmed|2024-10-26 00:00:00|insert |1 |2025-12-03 01:39:27.062|\n|ORDER000001-03|CUST000001 |2024-12-11|LAPTOP-001 |1 |1299.99 |1299.99 |pending |2024-12-24 00:00:00|insert |1 |2025-12-03 01:39:27.062|\n|ORDER000002-01|CUST000002 |2024-09-27|HEADPHONES-001|1 |199.99 |199.99 |pending |2024-09-30 00:00:00|insert |1 |2025-12-03 01:39:27.062|\n|ORDER000002-02|CUST000002 |2024-07-31|MONITOR-001 |5 |399.99 |1999.95 |pending |2024-08-22 00:00:00|insert |1 |2025-12-03 01:39:27.062|\n|ORDER000002-03|CUST000002 |2024-03-27|LAPTOP-001 |1 |1299.99 |1299.99 |shipped |2024-04-25 00:00:00|insert |1 |2025-12-03 01:39:27.062|\n|ORDER000003-01|CUST000003 |2024-06-24|LAPTOP-001 |4 |1299.99 |5199.96 |pending |2024-07-02 00:00:00|insert |1 |2025-12-03 01:39:27.062|\n|ORDER000004-01|CUST000004 |2024-09-27|PHONE-001 |1 |799.99 |799.99 |pending |2024-09-28 00:00:00|insert |1 |2025-12-03 01:39:27.062|\n|ORDER000004-02|CUST000004 |2024-08-12|PHONE-001 |5 |799.99 |3999.95 |confirmed|2024-08-18 00:00:00|insert |1 |2025-12-03 01:39:27.062|\n|ORDER000004-03|CUST000004 |2024-09-04|MONITOR-001 |4 |399.99 |1599.96 |delivered|2024-09-17 00:00:00|insert |1 |2025-12-03 01:39:27.062|\n|ORDER000004-04|CUST000004 |2024-01-27|LAPTOP-001 |5 |1299.99 |6499.95 |shipped |2024-01-30 00:00:00|insert |1 |2025-12-03 01:39:27.062|\n|ORDER000004-05|CUST000004 |2024-09-26|MOUSE-001 |2 |79.99 |159.98 |shipped |2024-10-08 00:00:00|insert |1 |2025-12-03 01:39:27.062|\n|ORDER000005-01|CUST000005 |2024-01-03|MONITOR-001 |3 |399.99 |1199.97 |confirmed|2024-01-07 00:00:00|insert |1 |2025-12-03 01:39:27.062|\n|ORDER000005-02|CUST000005 |2024-03-17|KEYBOARD-001 |5 |149.99 |749.95 |pending |2024-04-12 00:00:00|insert |1 |2025-12-03 01:39:27.062|\n|ORDER000005-03|CUST000005 |2024-04-12|MOUSE-001 |1 |79.99 |79.99 |shipped |2024-04-25 00:00:00|insert |1 |2025-12-03 
01:39:27.062|\n|ORDER000006-01|CUST000006 |2024-04-17|SPEAKERS-001 |5 |129.99 |649.95 |pending |2024-04-23 00:00:00|insert |1 |2025-12-03 01:39:27.062|\n|ORDER000006-02|CUST000006 |2024-02-07|MOUSE-001 |3 |79.99 |239.97 |pending |2024-02-22 00:00:00|insert |1 |2025-12-03 01:39:27.062|\n|ORDER000006-03|CUST000006 |2024-04-18|MOUSE-001 |4 |79.99 |319.96 |shipped |2024-05-18 00:00:00|insert |1 |2025-12-03 01:39:27.062|\n|ORDER000006-04|CUST000006 |2024-11-11|TABLET-001 |3 |499.99 |1499.97 |pending |2024-12-06 00:00:00|insert |1 |2025-12-03 01:39:27.062|\n|ORDER000007-01|CUST000007 |2024-01-16|MONITOR-001 |4 |399.99 |1599.96 |delivered|2024-02-07 00:00:00|insert |1 |2025-12-03 01:39:27.062|\n+--------------+-----------+----------+--------------+--------+----------+------------+---------+-------------------+------------+---------------+-----------------------+\nonly showing top 20 rows\n\n" }, "metadata": {}, "output_type": "display_data" } ], "execution_count": 1 }, { "cell_type": "code", "source": "# Analyze changes by type\nprint(\"=== Changes by Type ===\")\n\nchange_summary = changes_df.groupBy(\"_change_type\").count().orderBy(\"_change_type\")\nchange_summary.show()\n\n# Show INSERT operations\nprint(\"\\nINSERT Operations:\")\nchanges_df.filter(\"_change_type = 'insert'\").select(\"order_id\", \"customer_id\", \"status\", \"total_amount\", \"_commit_version\").show(10)\n\n# Show UPDATE operations (both pre and post images)\nprint(\"\\nUPDATE Operations:\")\nchanges_df.filter(\"_change_type LIKE 'update%'\").select(\"order_id\", \"status\", \"_change_type\", \"_commit_version\").show(10)\n\n# Show DELETE operations\nprint(\"\\nDELETE Operations:\")\nchanges_df.filter(\"_change_type = 'delete'\").select(\"order_id\", \"customer_id\", \"status\", \"_commit_version\").show()", "metadata": { "execution": { "iopub.status.busy": "2025-12-03T01:40:53.323Z" }, "trusted": true }, "outputs": [ { "data": { "text/plain": "=== Changes by Type ===\n" }, "metadata": {}, 
"output_type": "display_data" }, { "data": { "text/plain": "+----------------+-----+\n| _change_type|count|\n+----------------+-----+\n| delete| 2|\n| insert| 3032|\n|update_postimage| 4|\n| update_preimage| 4|\n+----------------+-----+\n\n\nINSERT Operations:\n" }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": "+--------------+-----------+---------+------------+---------------+\n| order_id|customer_id| status|total_amount|_commit_version|\n+--------------+-----------+---------+------------+---------------+\n|ORDER000248-04| CUST000248| shipped| 599.96| 1|\n|ORDER000248-05| CUST000248|confirmed| 1999.95| 1|\n|ORDER000249-01| CUST000249| pending| 999.95| 1|\n|ORDER000250-01| CUST000250|confirmed| 1999.96| 1|\n|ORDER000250-02| CUST000250|confirmed| 399.95| 1|\n|ORDER000251-01| CUST000251|delivered| 239.97| 1|\n|ORDER000251-02| CUST000251|delivered| 1299.99| 1|\n|ORDER000252-01| CUST000252| pending| 199.99| 1|\n|ORDER000252-02| CUST000252|delivered| 2399.97| 1|\n|ORDER000252-03| CUST000252|delivered| 1499.97| 1|\n+--------------+-----------+---------+------------+---------------+\nonly showing top 10 rows\n\n\nUPDATE Operations:\n" }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": "+--------------+---------+----------------+---------------+\n| order_id| status| _change_type|_commit_version|\n+--------------+---------+----------------+---------------+\n|ORDER000003-01| pending| update_preimage| 5|\n|ORDER000003-01|confirmed|update_postimage| 5|\n|ORDER000004-01| pending| update_preimage| 5|\n|ORDER000004-01|confirmed|update_postimage| 5|\n|ORDER000002-01| pending| update_preimage| 4|\n|ORDER000002-01|delivered|update_postimage| 4|\n|ORDER000001-01|delivered| update_preimage| 3|\n|ORDER000001-01| shipped|update_postimage| 3|\n+--------------+---------+----------------+---------------+\n\n\nDELETE Operations:\n" }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": 
"+--------------+-----------+---------+---------------+\n| order_id|customer_id| status|_commit_version|\n+--------------+-----------+---------+---------------+\n|ORDER000005-01| CUST000005|confirmed| 6|\n|ORDER000006-01| CUST000006| pending| 7|\n+--------------+-----------+---------+---------------+\n\n" }, "metadata": {}, "output_type": "display_data" } ], "execution_count": 1 }, { "cell_type": "markdown", "source": "## Step 6: Processing Changes with a Batch Job\n\n### Batch Processing Strategy\n\nFor batch processing of changes, we can:\n1. Read change feed from a specific version\n2. Process changes (e.g., send notifications, update downstream systems)\n3. Track the last processed version\n\n### Use Case: Order Status Notifications\n\nWe'll simulate a batch job that processes order status changes to send notifications.", "metadata": {} }, { "cell_type": "code", "source": "# Get the latest table version\nlatest_version = spark.sql(\"DESCRIBE HISTORY retail.orders.orders\").select(\"version\").collect()[0][0]\nprint(f\"Latest table version: {latest_version}\")\n\n# Simulate batch processing starting from version 1 (after initial load)\n# In a real job, you'd store the last processed version in a checkpoint\nlast_processed_version = 1\n\nprint(f\"Processing changes from version {last_processed_version} to {latest_version}\")\n\n# Read incremental changes\nincremental_changes = spark.read.format(\"delta\") \\\n .option(\"readChangeFeed\", \"true\") \\\n .option(\"startingVersion\", last_processed_version) \\\n .table(\"retail.orders.orders\")\n\nprint(f\"Found {incremental_changes.count()} changes to process\")", "metadata": { "execution": { "iopub.status.busy": "2025-12-03T01:41:04.786Z" }, "trusted": true }, "outputs": [ { "data": { "text/plain": "Latest table version: 7\nProcessing changes from version 1 to 7\n" }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": "Found 3042 changes to process\n" }, "metadata": {}, "output_type": 
"display_data" } ], "execution_count": 1 }, { "cell_type": "code", "source": "# Process changes - simulate order status notifications\nprint(\"=== Processing Order Status Changes ===\")\n\n# Filter for status updates (update operations)\nstatus_updates = incremental_changes.filter(\"_change_type = 'update_postimage'\") \\\n .select(\"order_id\", \"customer_id\", \"status\", \"_commit_timestamp\")\n\n# Simulate sending notifications for status changes\nnotifications_sent = []\n\nfor row in status_updates.collect():\n notification = {\n \"order_id\": row.order_id,\n \"customer_id\": row.customer_id,\n \"new_status\": row.status,\n \"notification_time\": row._commit_timestamp,\n \"message\": f\"Order {row.order_id} status changed to {row.status}\"\n }\n notifications_sent.append(notification)\n print(f\"Notification: {notification['message']}\")\n\nprint(f\"\\nProcessed {len(notifications_sent)} status update notifications\")\n\n# In a real implementation, you'd update the checkpoint\n# spark.sql(f\"UPDATE checkpoint_table SET last_version = {latest_version}\")\nprint(f\"Checkpoint updated to version {latest_version}\")", "metadata": { "execution": { "iopub.status.busy": "2025-12-03T01:41:12.231Z" }, "trusted": true }, "outputs": [ { "data": { "text/plain": "=== Processing Order Status Changes ===\n" }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": "Notification: Order ORDER000003-01 status changed to confirmed\nNotification: Order ORDER000004-01 status changed to confirmed\nNotification: Order ORDER000002-01 status changed to delivered\nNotification: Order ORDER000001-01 status changed to shipped\n\nProcessed 4 status update notifications\nCheckpoint updated to version 7\n" }, "metadata": {}, "output_type": "display_data" } ], "execution_count": 1 }, { "cell_type": "markdown", "source": "## Step 7: Real-time Change Processing with Structured Streaming\n\n### Streaming Change Feed\n\nFor real-time processing, we can use Structured 
Streaming with change feed:\n- Continuously process new changes as they arrive\n- Enable real-time dashboards and alerts\n- Integrate with streaming analytics platforms\n\n### Streaming Query Setup\n\nWe'll define a streaming read of the change feed and a per-batch handler. The query itself is not started in this demo.", "metadata": {} }, { "cell_type": "code", "source": "# Set up streaming change feed reader\nprint(\"=== Setting up Streaming Change Feed ===\")\n\n# Read change feed in streaming mode, starting after the latest version processed above\nstreaming_changes = spark.readStream.format(\"delta\") \\\n    .option(\"readChangeFeed\", \"true\") \\\n    .option(\"startingVersion\", latest_version + 1) \\\n    .table(\"retail.orders.orders\")\n\n# Define streaming processing logic\ndef process_batch(batch_df, batch_id):\n    \"\"\"Process one micro-batch of changes\"\"\"\n    print(f\"Processing batch {batch_id} with {batch_df.count()} changes\")\n    \n    # Count changes by type\n    change_counts = batch_df.groupBy(\"_change_type\").count()\n    change_counts.show()\n    \n    # Process new orders\n    new_orders = batch_df.filter(\"_change_type = 'insert'\")\n    if new_orders.count() > 0:\n        print(f\"New orders: {new_orders.count()}\")\n        new_orders.select(\"order_id\", \"customer_id\", \"total_amount\").show()\n    \n    # Process status updates\n    status_changes = batch_df.filter(\"_change_type = 'update_postimage'\")\n    if status_changes.count() > 0:\n        print(f\"Status changes: {status_changes.count()}\")\n        status_changes.select(\"order_id\", \"status\").show()\n\n# The query is not started here. To run it, attach the handler with foreachBatch\n# and a checkpoint location (placeholder path shown), for example:\n#   streaming_changes.writeStream.foreachBatch(process_batch).option(\"checkpointLocation\", \"<checkpoint-path>\").start()\nprint(\"Streaming DataFrame created (would start query in production)\")\nprint(\"Schema:\")\nstreaming_changes.printSchema()", "metadata": { "execution": { "iopub.status.busy": "2025-12-03T01:41:19.688Z" }, "trusted": true }, "outputs": [ { "data": { "text/plain": "=== Setting up Streaming Change Feed ===\n" }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": "Streaming DataFrame created (would start query in production)\nSchema:\nroot\n |-- order_id: string (nullable = true)\n |-- customer_id: string (nullable = true)\n |-- order_date: date (nullable = true)\n |-- product_id: string (nullable = true)\n |-- quantity: integer (nullable = true)\n |-- unit_price: decimal(10,2) (nullable = true)\n |-- total_amount: decimal(12,2) (nullable = true)\n |-- status: string (nullable = true)\n |-- last_updated: timestamp (nullable = true)\n |-- _change_type: string (nullable = true)\n |-- _commit_version: long (nullable = true)\n |-- _commit_timestamp: timestamp (nullable = true)\n\n" }, "metadata": {}, "output_type": "display_data" } ], "execution_count": 1 }, { "cell_type": "markdown", "source": "## Step 8: Change Feed Analytics and Insights\n\n### Analyzing Change Patterns\n\nLet's analyze the change data to understand:\n- Change frequency and patterns\n- Order lifecycle analytics\n- Data quality and audit insights\n\n### Key Metrics\n\n- Changes per commit date\n- Change type distribution\n- Current order status distribution\n- Status values over time (inserts and update post-images, ordered by commit timestamp)", "metadata": {} }, { "cell_type": "code", "source": "from pyspark.sql.functions import col, date_format\n\n# Analyze change patterns\nprint(\"=== Change Feed Analytics ===\")\n\n# Changes per commit date\nchanges_over_time = changes_df.groupBy(date_format(col(\"_commit_timestamp\"), \"yyyy-MM-dd\")).count().orderBy(date_format(col(\"_commit_timestamp\"), \"yyyy-MM-dd\"))\nchanges_over_time.show()\n\n# Change type distribution\nchange_types = changes_df.groupBy(\"_change_type\").count().orderBy(\"count\", ascending=False)\nprint(\"\\nChange Type Distribution:\")\nchange_types.show()\n\n# Current order status distribution (read from the table, not the change feed)\ncurrent_statuses = spark.sql(\"SELECT status, COUNT(*) as count FROM retail.orders.orders GROUP BY status\")\nprint(\"Current Order Status Distribution:\")\ncurrent_statuses.show()\n\n# Status history: inserts and update post-images, ordered by commit time\nchanges_df.createOrReplaceTempView(\"orders_v\")\nstatus_transitions = spark.sql(\"SELECT order_id, status, 
_commit_timestamp FROM orders_v WHERE _change_type IN ('insert', 'update_postimage') ORDER BY _commit_timestamp\")\nstatus_transitions.show()\n", "metadata": { "execution": { "iopub.status.busy": "2025-12-03T01:42:50.749Z" }, "trusted": true }, "outputs": [ { "data": { "text/plain": "=== Change Feed Analytics ===\n" }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": "+------------------------------------------+-----+\n|date_format(_commit_timestamp, yyyy-MM-dd)|count|\n+------------------------------------------+-----+\n| 2025-12-03| 3042|\n+------------------------------------------+-----+\n\n\nChange Type Distribution:\n" }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": "+----------------+-----+\n| _change_type|count|\n+----------------+-----+\n| insert| 3032|\n| update_preimage| 4|\n|update_postimage| 4|\n| delete| 2|\n+----------------+-----+\n\n" }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": "Current Order Status Distribution:\n" }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": "+---------+-----+\n| status|count|\n+---------+-----+\n| shipped| 725|\n|delivered| 790|\n| pending| 759|\n|confirmed| 756|\n+---------+-----+\n\n" }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": "+--------------+---------+--------------------+\n| order_id| status| _commit_timestamp|\n+--------------+---------+--------------------+\n|ORDER000248-04| shipped|2025-12-03 01:39:...|\n|ORDER000001-01|delivered|2025-12-03 01:39:...|\n|ORDER000248-05|confirmed|2025-12-03 01:39:...|\n|ORDER000001-02|confirmed|2025-12-03 01:39:...|\n|ORDER000249-01| pending|2025-12-03 01:39:...|\n|ORDER000001-03| pending|2025-12-03 01:39:...|\n|ORDER000250-01|confirmed|2025-12-03 01:39:...|\n|ORDER000002-01| pending|2025-12-03 01:39:...|\n|ORDER000250-02|confirmed|2025-12-03 01:39:...|\n|ORDER000002-02| pending|2025-12-03 
01:39:...|\n|ORDER000251-01|delivered|2025-12-03 01:39:...|\n|ORDER000002-03| shipped|2025-12-03 01:39:...|\n|ORDER000251-02|delivered|2025-12-03 01:39:...|\n|ORDER000003-01| pending|2025-12-03 01:39:...|\n|ORDER000252-01| pending|2025-12-03 01:39:...|\n|ORDER000004-01| pending|2025-12-03 01:39:...|\n|ORDER000252-02|delivered|2025-12-03 01:39:...|\n|ORDER000004-02|confirmed|2025-12-03 01:39:...|\n|ORDER000252-03|delivered|2025-12-03 01:39:...|\n|ORDER000004-03|delivered|2025-12-03 01:39:...|\n+--------------+---------+--------------------+\nonly showing top 20 rows\n\n" }, "metadata": {}, "output_type": "display_data" } ], "execution_count": 1 }, { "cell_type": "markdown", "source": "## Key Takeaways: Delta Change Data Feed in AIDP\n\n### What We Demonstrated\n\n1. **Change Capture**: Enabled change feed on a Delta table to automatically capture all changes\n2. **Change Types**: Demonstrated INSERT, UPDATE, and DELETE change capture\n3. **Change Reading**: Read complete change history and incremental changes\n4. **Batch Processing**: Processed changes in batch mode for downstream systems\n5. **Streaming Setup**: Defined a streaming change feed reader and a micro-batch handler for real-time processing\n6. **Analytics**: Analyzed change patterns for insights and monitoring\n\n### AIDP Advantages\n\n- **Unified Platform**: Change feed integrates seamlessly with other AIDP services\n- **Performance**: Efficient change capture with minimal overhead\n- **Governance**: Catalog and schema isolation for change data\n- **Streaming**: Native integration with Spark Structured Streaming\n- **Audit Trail**: Complete change history for compliance and debugging\n\n### Best Practices for Change Data Feed\n\n1. **Enable Early**: Changes are recorded only from the commit that enables the feed onward; set the property at creation time or on an existing table with `ALTER TABLE <table> SET TBLPROPERTIES (delta.enableChangeDataFeed = true)`\n2. **Version Management**: Track processed versions for incremental processing\n3. **Change Type Handling**: Handle each `_change_type` value (`insert`, `update_preimage`, `update_postimage`, `delete`) appropriately\n4. **Performance**: Use `startingVersion` to avoid reprocessing old changes\n5. **Monitoring**: Monitor change volume and processing latency\n\n### Production Considerations\n\n- **Checkpointing**: Store the last processed version reliably (for streaming queries, via a `checkpointLocation`)\n- **Error Handling**: Implement retry logic for failed processing\n- **Scaling**: Consider partitioning strategy for high-volume tables\n- **Retention**: Change data files follow the table's retention settings and are removed by `VACUUM`, so keep retention longer than your maximum processing lag\n- **Security**: Apply appropriate permissions for change feed access\n\n### Next Steps\n\n- Explore integration with AIDP's streaming analytics\n- Enable change feed on existing tables with `ALTER TABLE ... SET TBLPROPERTIES` (no recreation needed; only changes after enabling are captured)\n- Build end-to-end CDC pipelines to external systems\n- Monitor change feed performance at scale\n- Implement automated alerting for data quality issues\n\nThis notebook demonstrates how Oracle AI Data Platform makes change data capture accessible while maintaining enterprise-grade performance, governance, and reliability.", "metadata": {} } ] }