{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Retail Analytics: Medallion Architecture Demo with Delta Liquid Clustering\n", "\n", "This notebook demonstrates the **Medallion Architecture** in Oracle AI Data Platform (AIDP) Workbench using a retail analytics use case with Delta Liquid Clustering for optimal performance.\n", "\n", "## Medallion Architecture Overview\n", "\n", "The Medallion Architecture organizes data into three layers:\n", "\n", "- **Bronze Layer**: Raw, unprocessed data as ingested from sources\n", "- **Silver Layer**: Cleaned, enriched, and standardized data\n", "- **Gold Layer**: Business-ready data with aggregations and ML features\n", "\n", "## Delta Liquid Clustering\n", "\n", "Liquid clustering automatically optimizes data layout for query performance without requiring manual partitioning or Z-Ordering.\n", "\n", "## Use Case: Customer Purchase Analytics with Churn Prediction\n", "\n", "We'll analyze customer purchase records from a retail company and build a churn prediction model using the medallion architecture.\n", "\n", "## AIDP Environment Setup\n", "\n", "This notebook leverages the existing Spark session in your AIDP environment." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 1: Create Retail Catalog and Medallion Schemas\n", "\n", "In AIDP, catalogs provide data isolation and governance. We'll create separate schemas for each medallion layer." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Retail catalog and medallion schemas (bronze, silver, gold) created successfully!\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Create retail catalog and medallion schemas\n", "# In AIDP, catalogs provide data isolation and governance\n", "\n", "spark.sql(\"CREATE CATALOG IF NOT EXISTS retail\")\n", "\n", "spark.sql(\"CREATE SCHEMA IF NOT EXISTS retail.bronze\")\n", "spark.sql(\"CREATE SCHEMA IF NOT EXISTS retail.silver\")\n", "spark.sql(\"CREATE SCHEMA IF NOT EXISTS retail.gold\")\n", "\n", "print(\"Retail catalog and medallion schemas (bronze, silver, gold) created successfully!\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Bronze Layer: Raw Data Ingestion\n", "\n", "### Table Design\n", "\n", "Our `customer_purchases_raw` table stores raw purchase data as ingested from source systems:\n", "\n", "- **customer_id**: Raw customer identifier\n", "- **purchase_date**: Raw purchase date\n", "- **product_id**: Raw product identifier\n", "- **product_category**: Raw category string\n", "- **purchase_amount**: Raw transaction amount\n", "- **store_id**: Raw store identifier\n", "- **payment_method**: Raw payment type\n", "- **ingestion_timestamp**: When data was ingested\n", "\n", "### Clustering Strategy\n", "\n", "We'll cluster by `customer_id` and `purchase_date` to optimize for:\n", "- Customer-specific queries\n", "- Time-based analysis\n", "- Purchase pattern analysis" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Bronze layer Delta table with liquid clustering created successfully!\n", "Clustering will automatically optimize data layout for queries on customer_id and purchase_date.\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Create Bronze layer Delta table with liquid clustering\n", "\n", "# CLUSTER BY defines 
the columns for automatic optimization\n", "\n", "spark.sql(\"\"\"\n", "CREATE TABLE IF NOT EXISTS retail.bronze.customer_purchases_raw (\n", "    customer_id STRING,\n", "    purchase_date DATE,\n", "    product_id STRING,\n", "    product_category STRING,\n", "    purchase_amount DECIMAL(10,2),\n", "    store_id STRING,\n", "    payment_method STRING,\n", "    ingestion_timestamp TIMESTAMP\n", ")\n", "USING DELTA\n", "CLUSTER BY (customer_id, purchase_date)\n", "\"\"\")\n", "\n", "print(\"Bronze layer Delta table with liquid clustering created successfully!\")\n", "print(\"Clustering will automatically optimize data layout for queries on customer_id and purchase_date.\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 3: Generate and Ingest Raw Retail Data\n", "\n", "### Data Generation Strategy\n", "\n", "We'll create synthetic raw retail purchase data including:\n", "\n", "- **1,000 customers** with 3-8 purchases each\n", "- **Product categories**: Electronics, Clothing, Home & Garden, Books, Sports\n", "- **Purchase dates**: Spread at random across 2024, with repeat purchases per customer\n", "- **Multiple stores**: Five retail locations\n", "- **Data quality issues**: About 5% of records get a missing value or an `Unknown` payment method (simulating real-world data)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Generated 5475 raw customer purchase records (with simulated data quality issues)\n", "Sample record: {'customer_id': 'CUST000001', 'purchase_date': datetime.date(2024, 1, 12), 'product_id': 'BOK003', 'product_category': 'Books', 'purchase_amount': 19.58, 'store_id': 'STORE_MIA_005', 'payment_method': 'Digital Wallet', 'ingestion_timestamp': datetime.datetime(2026, 1, 2, 20, 28, 11, 313848)}\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Generate sample retail purchase data with some data quality issues\n", "# Only standard-library modules are needed for data generation\n", "\n", "import random\n", "from 
datetime import datetime, timedelta\n", "\n", "# Define retail data constants\n", "PRODUCTS = {\n", " \"Electronics\": [\n", " (\"ELE001\", \"Smartphone\", 599.99),\n", " (\"ELE002\", \"Laptop\", 1299.99),\n", " (\"ELE003\", \"Headphones\", 149.99),\n", " (\"ELE004\", \"Smart TV\", 799.99),\n", " (\"ELE005\", \"Tablet\", 399.99)\n", " ],\n", " \"Clothing\": [\n", " (\"CLO001\", \"T-Shirt\", 19.99),\n", " (\"CLO002\", \"Jeans\", 79.99),\n", " (\"CLO003\", \"Jacket\", 129.99),\n", " (\"CLO004\", \"Sneakers\", 89.99),\n", " (\"CLO005\", \"Dress\", 59.99)\n", " ],\n", " \"Home & Garden\": [\n", " (\"HOM001\", \"Blender\", 79.99),\n", " (\"HOM002\", \"Coffee Maker\", 49.99),\n", " (\"HOM003\", \"Garden Tools Set\", 39.99),\n", " (\"HOM004\", \"Bedding Set\", 89.99),\n", " (\"HOM005\", \"Decorative Pillow\", 24.99)\n", " ],\n", " \"Books\": [\n", " (\"BOK001\", \"Fiction Novel\", 14.99),\n", " (\"BOK002\", \"Cookbook\", 24.99),\n", " (\"BOK003\", \"Biography\", 19.99),\n", " (\"BOK004\", \"Self-Help Book\", 16.99),\n", " (\"BOK005\", \"Children's Book\", 9.99)\n", " ],\n", " \"Sports\": [\n", " (\"SPO001\", \"Yoga Mat\", 29.99),\n", " (\"SPO002\", \"Dumbbells\", 49.99),\n", " (\"SPO003\", \"Running Shoes\", 119.99),\n", " (\"SPO004\", \"Basketball\", 24.99),\n", " (\"SPO005\", \"Tennis Racket\", 89.99)\n", " ]\n", "}\n", "\n", "STORES = [\"STORE_NYC_001\", \"STORE_LAX_002\", \"STORE_CHI_003\", \"STORE_HOU_004\", \"STORE_MIA_005\"]\n", "PAYMENT_METHODS = [\"Credit Card\", \"Debit Card\", \"Cash\", \"Digital Wallet\", \"Buy Now Pay Later\"]\n", "\n", "# Generate customer purchase records with some data quality issues\n", "purchase_data = []\n", "base_date = datetime(2024, 1, 1)\n", "\n", "# Create 1,000 customers with 3-8 purchases each\n", "for customer_num in range(1, 1001):\n", " customer_id = f\"CUST{customer_num:06d}\"\n", " \n", " # Each customer gets 3-8 purchases over 12 months\n", " num_purchases = random.randint(3, 8)\n", " \n", " for i in 
range(num_purchases):\n", "        # Spread purchases over 12 months\n", "        days_offset = random.randint(0, 365)\n", "        purchase_date = base_date + timedelta(days=days_offset)\n", " \n", "        # Select random category and product\n", "        category = random.choice(list(PRODUCTS.keys()))\n", "        product_id, product_name, base_price = random.choice(PRODUCTS[category])\n", " \n", "        # Add some price variation (±20%)\n", "        price_variation = random.uniform(0.8, 1.2)\n", "        purchase_amount = round(base_price * price_variation, 2)\n", " \n", "        # Select random store and payment method\n", "        store_id = random.choice(STORES)\n", "        payment_method = random.choice(PAYMENT_METHODS)\n", " \n", "        # Simulate data quality issues (5% chance)\n", "        if random.random() < 0.05:\n", "            # Introduce some missing or invalid data\n", "            if random.random() < 0.3:\n", "                purchase_amount = None  # Missing amount\n", "            elif random.random() < 0.5:\n", "                category = None  # Missing category (written to product_category below)\n", "            else:\n", "                payment_method = \"Unknown\"  # Invalid payment method\n", " \n", "        purchase_data.append({\n", "            \"customer_id\": customer_id,\n", "            \"purchase_date\": purchase_date.date(),\n", "            \"product_id\": product_id,\n", "            \"product_category\": category,\n", "            \"purchase_amount\": purchase_amount,\n", "            \"store_id\": store_id,\n", "            \"payment_method\": payment_method,\n", "            \"ingestion_timestamp\": datetime.now()\n", "        })\n", "\n", "print(f\"Generated {len(purchase_data)} raw customer purchase records (with simulated data quality issues)\")\n", "print(\"Sample record:\", purchase_data[0])" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Bronze Layer DataFrame Schema:\n", "root\n", " |-- customer_id: string (nullable = true)\n", " |-- ingestion_timestamp: timestamp (nullable = true)\n", " |-- payment_method: string (nullable = true)\n", " |-- product_category: string (nullable = true)\n", " |-- product_id: string (nullable = true)\n", " |-- purchase_amount: 
double (nullable = true)\n", " |-- purchase_date: date (nullable = true)\n", " |-- store_id: string (nullable = true)\n", "\n", "\n", "Sample Raw Data:\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+-----------+--------------------+-----------------+----------------+----------+---------------+-------------+-------------+\n", "|customer_id| ingestion_timestamp| payment_method|product_category|product_id|purchase_amount|purchase_date| store_id|\n", "+-----------+--------------------+-----------------+----------------+----------+---------------+-------------+-------------+\n", "| CUST000001|2026-01-02 20:28:...| Digital Wallet| Books| BOK003| 19.58| 2024-01-12|STORE_MIA_005|\n", "| CUST000001|2026-01-02 20:28:...| Credit Card| Clothing| CLO003| 132.31| 2024-01-09|STORE_HOU_004|\n", "| CUST000001|2026-01-02 20:28:...| Digital Wallet| Clothing| CLO005| 64.72| 2024-02-02|STORE_NYC_001|\n", "| CUST000001|2026-01-02 20:28:...|Buy Now Pay Later| Clothing| CLO003| 145.23| 2024-01-22|STORE_HOU_004|\n", "| CUST000001|2026-01-02 20:28:...| Digital Wallet| Sports| SPO001| 26.21| 2024-10-14|STORE_LAX_002|\n", "+-----------+--------------------+-----------------+----------------+----------+---------------+-------------+-------------+\n", "only showing top 5 rows\n", "\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "\n", "Successfully inserted 5475 raw records into retail.bronze.customer_purchases_raw\n", "Bronze layer now contains raw, unprocessed data with potential quality issues.\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Insert raw data into Bronze layer using PySpark\n", "\n", "# Create DataFrame from generated data\n", "df_purchases_raw = spark.createDataFrame(purchase_data)\n", "\n", "# Display schema and sample data\n", "print(\"Bronze Layer DataFrame Schema:\")\n", "df_purchases_raw.printSchema()\n", "\n", "print(\"\\nSample Raw Data:\")\n", 
"df_purchases_raw.show(5)\n", "\n", "# Insert data into Bronze table with liquid clustering\n", "df_purchases_raw.write.mode(\"overwrite\").saveAsTable(\"retail.bronze.customer_purchases_raw\")\n", "\n", "print(f\"\\nSuccessfully inserted {df_purchases_raw.count()} raw records into retail.bronze.customer_purchases_raw\")\n", "print(\"Bronze layer now contains raw, unprocessed data with potential quality issues.\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Silver Layer: Data Cleansing and Enrichment\n", "\n", "### Transformation Logic\n", "\n", "The Silver layer transforms raw Bronze data into:\n", "\n", "- **Cleaned data**: Handle missing values, standardize formats\n", "- **Enriched data**: Add derived features and business logic\n", "- **Validated data**: Apply data quality rules\n", "\n", "### Silver Table Design\n", "\n", "`customer_purchases_clean` table includes:\n", "\n", "- All original fields (cleaned)\n", "- Data quality flags\n", "- Derived features (e.g., purchase_amount_category)\n", "- Standardized categories" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Silver layer Delta table created successfully!\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Create Silver layer table for cleaned and enriched data\n", "\n", "spark.sql(\"\"\"\n", "CREATE TABLE IF NOT EXISTS retail.silver.customer_purchases_clean (\n", " customer_id STRING,\n", " purchase_date DATE,\n", " product_id STRING,\n", " product_category STRING,\n", " purchase_amount DECIMAL(10,2),\n", " store_id STRING,\n", " payment_method STRING,\n", " ingestion_timestamp TIMESTAMP,\n", " data_quality_score INT,\n", " purchase_amount_category STRING,\n", " is_high_value_customer BOOLEAN,\n", " processing_timestamp TIMESTAMP\n", ")\n", "USING DELTA\n", "CLUSTER BY (customer_id, purchase_date)\n", "\"\"\")\n", "\n", "print(\"Silver layer Delta table created successfully!\")" ] }, { 
"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Silver Layer Transformation Results:\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+-----------+-------------+----------------+---------------+------------------+------------------------+\n", "|customer_id|purchase_date|product_category|purchase_amount|data_quality_score|purchase_amount_category|\n", "+-----------+-------------+----------------+---------------+------------------+------------------------+\n", "| CUST000183| 2024-01-07| Sports| 44.05| 100| Low|\n", "| CUST000183| 2024-09-02| Clothing| 19.22| 100| Low|\n", "| CUST000183| 2024-07-06| Clothing| 55.25| 100| Low|\n", "| CUST000183| 2024-07-14| Electronics| 664.13| 100| High|\n", "| CUST000184| 2024-12-13| Sports| 56.46| 100| Low|\n", "| CUST000184| 2024-12-15| Home & Garden| 52.62| 100| Low|\n", "| CUST000184| 2024-05-03| Clothing| 128.43| 100| Medium|\n", "| CUST000185| 2024-03-28| Clothing| 83.71| 100| Low|\n", "| CUST000185| 2024-12-14| Clothing| 22.79| 100| Low|\n", "| CUST000185| 2024-08-22| Sports| 22.8| 100| Low|\n", "+-----------+-------------+----------------+---------------+------------------+------------------------+\n", "only showing top 10 rows\n", "\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "\n", "Successfully transformed and inserted 5475 cleaned records into retail.silver.customer_purchases_clean\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Transform Bronze data to Silver layer with cleansing and enrichment\n", "\n", "from pyspark.sql.functions import col, when, lit, current_timestamp, expr\n", "\n", "# Read from Bronze layer\n", "bronze_df = spark.table(\"retail.bronze.customer_purchases_raw\")\n", "\n", "# Apply data cleansing and enrichment\n", "silver_df = bronze_df.withColumn(\n", " \"purchase_amount\",\n", " when(col(\"purchase_amount\").isNull(), 
0.0).otherwise(col(\"purchase_amount\"))\n", ").withColumn(\n", "    \"product_category\",\n", "    when(col(\"product_category\").isNull(), \"Unknown\").otherwise(col(\"product_category\"))\n", ").withColumn(\n", "    \"payment_method\",\n", "    when(col(\"payment_method\") == \"Unknown\", \"Other\").otherwise(col(\"payment_method\"))\n", ").withColumn(\n", "    \"data_quality_score\",\n", "    when(\n", "        # Missing amounts were replaced with 0.0 above, so test for 0 rather than null\n", "        (col(\"purchase_amount\") != 0) & \n", "        (col(\"product_category\") != \"Unknown\") & \n", "        (col(\"payment_method\") != \"Other\"),\n", "        100\n", "    ).otherwise(75)\n", ").withColumn(\n", "    \"purchase_amount_category\",\n", "    when(col(\"purchase_amount\") >= 500, \"High\")\n", "    .when(col(\"purchase_amount\") >= 100, \"Medium\")\n", "    .otherwise(\"Low\")\n", ").withColumn(\n", "    \"is_high_value_customer\",\n", "    lit(False)  # Placeholder; customer value is derived in the Gold layer (customer_segment)\n", ").withColumn(\n", "    \"processing_timestamp\",\n", "    current_timestamp()\n", ")\n", "\n", "# Show transformation results\n", "print(\"Silver Layer Transformation Results:\")\n", "silver_df.select(\n", "    \"customer_id\", \"purchase_date\", \"product_category\", \n", "    \"purchase_amount\", \"data_quality_score\", \"purchase_amount_category\"\n", ").show(10)\n", "\n", "# Write to Silver layer\n", "silver_df.write.mode(\"overwrite\").saveAsTable(\"retail.silver.customer_purchases_clean\")\n", "\n", "print(f\"\\nSuccessfully transformed and inserted {silver_df.count()} cleaned records into retail.silver.customer_purchases_clean\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Gold Layer: Business Analytics and ML Features\n", "\n", "### Gold Layer Purpose\n", "\n", "The Gold layer provides:\n", "\n", "- **Aggregated metrics** for business reporting\n", "- **Customer analytics** with lifetime value calculations\n", "- **ML-ready features** for predictive modeling\n", "\n", "### Tables in Gold Layer\n", "\n", "1. `customer_analytics`: Aggregated customer metrics\n", "2. 
`sales_analytics`: Business performance metrics\n", "3. `churn_prediction_features`: ML features for churn modeling" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Gold layer tables created successfully!\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Create Gold layer tables for business analytics\n", "\n", "# Customer analytics table\n", "spark.sql(\"\"\"\n", "CREATE TABLE IF NOT EXISTS retail.gold.customer_analytics (\n", " customer_id STRING,\n", " total_purchases INT,\n", " total_spent DECIMAL(10,2),\n", " avg_purchase_value DECIMAL(10,2),\n", " purchase_variability DECIMAL(10,2),\n", " categories_purchased INT,\n", " stores_used INT,\n", " payment_methods_used INT,\n", " active_months INT,\n", " days_since_last_purchase INT,\n", " customer_tenure_days INT,\n", " avg_days_between_purchases DECIMAL(10,2),\n", " customer_segment STRING,\n", " lifetime_value DECIMAL(10,2),\n", " last_updated TIMESTAMP\n", ")\n", "USING DELTA\n", "CLUSTER BY (customer_segment, customer_id)\n", "\"\"\")\n", "\n", "# Sales analytics table\n", "spark.sql(\"\"\"\n", "CREATE TABLE IF NOT EXISTS retail.gold.sales_analytics (\n", " period STRING,\n", " total_transactions INT,\n", " total_revenue DECIMAL(10,2),\n", " avg_transaction_value DECIMAL(10,2),\n", " unique_customers INT,\n", " top_category STRING,\n", " top_store STRING,\n", " last_updated TIMESTAMP\n", ")\n", "USING DELTA\n", "CLUSTER BY (period)\n", "\"\"\")\n", "\n", "print(\"Gold layer tables created successfully!\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Customer analytics generated for 1000 customers\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+-----------+---------------+-----------+----------------+------------------------+\n", "|customer_id|total_purchases|total_spent|customer_segment|days_since_last_purchase|\n", 
"+-----------+---------------+-----------+----------------+------------------------+\n", "| CUST000184| 3| 237.51| Low Value| 383|\n", "| CUST000543| 4| 876.03| Medium Value| 503|\n", "| CUST000492| 3| 215.61| Low Value| 528|\n", "| CUST000217| 5| 272.42| Low Value| 427|\n", "| CUST000338| 8| 481.32| Low Value| 455|\n", "| CUST000435| 3| 867.26| Medium Value| 374|\n", "| CUST000406| 3| 1546.72| Medium Value| 370|\n", "| CUST000193| 3| 80.36| Low Value| 490|\n", "| CUST000331| 6| 651.72| Medium Value| 368|\n", "| CUST000422| 7| 580.87| Medium Value| 379|\n", "+-----------+---------------+-----------+----------------+------------------------+\n", "only showing top 10 rows\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Generate Gold layer customer analytics from Silver data\n", "\n", "from pyspark.sql.functions import count, sum, avg, stddev, countDistinct, max, min, datediff, current_date, round\n", "\n", "silver_df = spark.table(\"retail.silver.customer_purchases_clean\")\n", "\n", "# Calculate customer-level aggregations\n", "customer_analytics = silver_df.groupBy(\"customer_id\").agg(\n", " count(\"*\").alias(\"total_purchases\"),\n", " round(sum(\"purchase_amount\"), 2).alias(\"total_spent\"),\n", " round(avg(\"purchase_amount\"), 2).alias(\"avg_purchase_value\"),\n", " round(stddev(\"purchase_amount\"), 2).alias(\"purchase_variability\"),\n", " countDistinct(\"product_category\").alias(\"categories_purchased\"),\n", " countDistinct(\"store_id\").alias(\"stores_used\"),\n", " countDistinct(\"payment_method\").alias(\"payment_methods_used\"),\n", " countDistinct(expr(\"DATE_FORMAT(purchase_date, 'yyyy-MM')\")).alias(\"active_months\"),\n", " datediff(current_date(), max(\"purchase_date\")).alias(\"days_since_last_purchase\"),\n", " datediff(current_date(), min(\"purchase_date\")).alias(\"customer_tenure_days\"),\n", " round(avg(\"purchase_amount\"), 2).alias(\"lifetime_value\"), # Simplified CLV\n", " 
current_timestamp().alias(\"last_updated\")\n", ").withColumn(\n", "    \"customer_segment\",\n", "    when(col(\"total_spent\") >= 2000, \"High Value\")\n", "    .when(col(\"total_spent\") >= 500, \"Medium Value\")\n", "    .otherwise(\"Low Value\")\n", ").withColumn(\n", "    \"avg_days_between_purchases\",\n", "    # Approximation: divides tenure since the first purchase (measured to current_date())\n", "    # by the number of gaps, rather than averaging the actual gaps between purchase dates\n", "    when(col(\"total_purchases\") > 1, \n", "        round(col(\"customer_tenure_days\") / (col(\"total_purchases\") - 1), 2)\n", "    ).otherwise(col(\"customer_tenure_days\"))\n", ")\n", "\n", "# Write to Gold layer\n", "customer_analytics.write.mode(\"overwrite\").saveAsTable(\"retail.gold.customer_analytics\")\n", "\n", "print(f\"Customer analytics generated for {customer_analytics.count()} customers\")\n", "customer_analytics.select(\n", "    \"customer_id\", \"total_purchases\", \"total_spent\", \n", "    \"customer_segment\", \"days_since_last_purchase\"\n", ").show(10)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Sales analytics generated by period\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+-------+------------------+-------------+---------------------+----------------+--------------------+------------+-------------+\n", "| period|total_transactions|total_revenue|avg_transaction_value|unique_customers| last_updated|top_category| top_store|\n", "+-------+------------------+-------------+---------------------+----------------+--------------------+------------+-------------+\n", "|2024-09| 469| 82519.0| 175.95| 385|2026-01-02 20:30:...| Electronics|STORE_NYC_001|\n", "|2024-02| 399| 67535.15| 169.26| 325|2026-01-02 20:30:...| Electronics|STORE_NYC_001|\n", "|2024-08| 444| 68654.21| 154.63| 367|2026-01-02 20:30:...| Electronics|STORE_NYC_001|\n", "|2024-06| 457| 80212.57| 175.52| 367|2026-01-02 20:30:...| Electronics|STORE_NYC_001|\n", "|2024-12| 454| 86359.89| 190.22| 372|2026-01-02 20:30:...| Electronics|STORE_NYC_001|\n", "|2024-03| 456| 70601.22| 154.83| 377|2026-01-02 
20:30:...| Electronics|STORE_NYC_001|\n", "|2024-05| 522| 85645.91| 164.07| 430|2026-01-02 20:30:...| Electronics|STORE_NYC_001|\n", "|2024-11| 470| 86633.08| 184.33| 375|2026-01-02 20:30:...| Electronics|STORE_NYC_001|\n", "|2024-10| 452| 80140.87| 177.3| 360|2026-01-02 20:30:...| Electronics|STORE_NYC_001|\n", "|2024-07| 425| 70450.22| 165.77| 361|2026-01-02 20:30:...| Electronics|STORE_NYC_001|\n", "|2024-04| 440| 74152.53| 168.53| 355|2026-01-02 20:30:...| Electronics|STORE_NYC_001|\n", "|2024-01| 487| 87135.12| 178.92| 400|2026-01-02 20:30:...| Electronics|STORE_NYC_001|\n", "+-------+------------------+-------------+---------------------+----------------+--------------------+------------+-------------+\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Generate Gold layer sales analytics\n", "\n", "# Monthly sales analytics\n", "monthly_sales = silver_df.withColumn(\n", " \"period\", expr(\"DATE_FORMAT(purchase_date, 'yyyy-MM')\")\n", ").groupBy(\"period\").agg(\n", " count(\"*\").alias(\"total_transactions\"),\n", " round(sum(\"purchase_amount\"), 2).alias(\"total_revenue\"),\n", " round(avg(\"purchase_amount\"), 2).alias(\"avg_transaction_value\"),\n", " countDistinct(\"customer_id\").alias(\"unique_customers\"),\n", " current_timestamp().alias(\"last_updated\")\n", ").orderBy(\"period\")\n", "\n", "# Add top category and store for each period (simplified)\n", "category_sales = silver_df.withColumn(\n", " \"period\", expr(\"DATE_FORMAT(purchase_date, 'yyyy-MM')\")\n", ").groupBy(\"period\", \"product_category\").agg(\n", " sum(\"purchase_amount\").alias(\"category_revenue\")\n", ").orderBy(\"period\", col(\"category_revenue\").desc())\n", "\n", "# Get top category per period\n", "from pyspark.sql.window import Window\n", "window_spec = Window.partitionBy(\"period\").orderBy(col(\"category_revenue\").desc())\n", "top_categories = category_sales.withColumn(\n", " \"rank\", expr(\"row_number() over (partition by period order by 
category_revenue desc)\")\n", ").filter(\"rank = 1\").select(\"period\", \"product_category\")\n", "\n", "# Join with monthly sales\n", "sales_analytics = monthly_sales.join(\n", " top_categories, \"period\", \"left\"\n", ").withColumnRenamed(\"product_category\", \"top_category\").withColumn(\n", " \"top_store\", lit(\"STORE_NYC_001\") # Simplified - would calculate actual top store\n", ")\n", "\n", "# Write to Gold layer\n", "sales_analytics.write.mode(\"overwrite\").saveAsTable(\"retail.gold.sales_analytics\")\n", "\n", "print(\"Sales analytics generated by period\")\n", "sales_analytics.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 7: Machine Learning - Customer Churn Prediction\n", "\n", "### ML in Gold Layer\n", "\n", "Using the Gold layer customer analytics to train a churn prediction model:\n", "\n", "- **Target**: Predict customers at risk of churning\n", "- **Features**: Customer behavior metrics from Gold layer\n", "- **Model**: Random Forest Classifier\n", "- **Business Impact**: Enable proactive retention campaigns" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "ML dataset prepared with 1000 customer records\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+----------+-----+\n", "|churn_risk|count|\n", "+----------+-----+\n", "| 1| 1000|\n", "+----------+-----+\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Prepare ML features from Gold layer customer analytics\n", "\n", "from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler\n", "from pyspark.ml.classification import RandomForestClassifier\n", "from pyspark.ml.evaluation import BinaryClassificationEvaluator\n", "from pyspark.ml import Pipeline\n", "import pyspark.sql.functions as F\n", "\n", "# Read customer analytics from Gold layer\n", "customer_analytics_df = spark.table(\"retail.gold.customer_analytics\")\n", 
"\n", "# Caveat: days_since_last_purchase is measured from current_date() in the Gold layer, while the\n", "# sample purchases are all dated 2024. Run long after that, every customer exceeds the 60-day\n", "# threshold, so churn_risk can collapse to a single class (as in the recorded output).\n", 
"\n", "# Create churn risk label (business logic)\n", "ml_features_df = customer_analytics_df.withColumn(\n", " \"churn_risk\",\n", " when(\n", " (col(\"days_since_last_purchase\") > 60) | \n", " (col(\"total_purchases\") < 4) | \n", " (col(\"avg_purchase_value\") < 50),\n", " 1\n", " ).otherwise(0)\n", ")\n", "\n", "print(f\"ML dataset prepared with {ml_features_df.count()} customer records\")\n", "ml_features_df.groupBy(\"churn_risk\").count().show()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Training set: 838 customers\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "Test set: 162 customers\n", "Training customer churn prediction model...\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Feature engineering and model training\n", "\n", "# Select features for the model\n", "feature_cols = [\n", " \"total_purchases\", \"total_spent\", \"avg_purchase_value\", \"purchase_variability\", \n", " \"categories_purchased\", \"stores_used\", \"payment_methods_used\", \n", " \"active_months\", \"days_since_last_purchase\", \"customer_tenure_days\", \n", " \"avg_days_between_purchases\"\n", "]\n", "\n", "# Handle missing values\n", "ml_features_df = ml_features_df.fillna(30, subset=['avg_days_between_purchases'])\n", "ml_features_df = ml_features_df.fillna(0, subset=['purchase_variability'])\n", "\n", "# Assemble features\n", "assembler = VectorAssembler(\n", " inputCols=feature_cols,\n", " outputCol=\"features\"\n", ")\n", "\n", "# Scale features\n", "scaler = StandardScaler(inputCol=\"features\", outputCol=\"scaled_features\")\n", "\n", "# Create and train the model\n", "rf = RandomForestClassifier(\n", " labelCol=\"churn_risk\", \n", " featuresCol=\"scaled_features\",\n", " numTrees=100,\n", " maxDepth=10\n", ")\n", "\n", "# Create pipeline\n", "pipeline = Pipeline(stages=[assembler, scaler, rf])\n", "\n", "# Split data\n", "train_data, 
test_data = ml_features_df.randomSplit([0.8, 0.2], seed=42)\n", "\n", "print(f\"Training set: {train_data.count()} customers\")\n", "print(f\"Test set: {test_data.count()} customers\")\n", "\n", "# Train the model\n", "print(\"Training customer churn prediction model...\")\n", "model = pipeline.fit(train_data)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Model AUC: 1.0000\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+-----------+---------------+-----------+----------+----------+-----------+\n", "|customer_id|total_purchases|total_spent|churn_risk|prediction|probability|\n", "+-----------+---------------+-----------+----------+----------+-----------+\n", "| CUST000003| 5| 172.23| 1| 1.0| [0.0,1.0]|\n", "| CUST000007| 8| 364.11| 1| 1.0| [0.0,1.0]|\n", "| CUST000009| 8| 579.61| 1| 1.0| [0.0,1.0]|\n", "| CUST000014| 5| 1090.84| 1| 1.0| [0.0,1.0]|\n", "| CUST000020| 3| 258.81| 1| 1.0| [0.0,1.0]|\n", "| CUST000024| 8| 1555.7| 1| 1.0| [0.0,1.0]|\n", "| CUST000030| 7| 1542.91| 1| 1.0| [0.0,1.0]|\n", "| CUST000036| 8| 451.76| 1| 1.0| [0.0,1.0]|\n", "| CUST000046| 8| 349.46| 1| 1.0| [0.0,1.0]|\n", "| CUST000047| 5| 1306.6| 1| 1.0| [0.0,1.0]|\n", "+-----------+---------------+-----------+----------+----------+-----------+\n", "only showing top 10 rows\n", "\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+----------+----------+-----+\n", "|churn_risk|prediction|count|\n", "+----------+----------+-----+\n", "| 1| 1.0| 162|\n", "+----------+----------+-----+\n", "\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "\n", "Business Impact Analysis:\n", "Total test customers: 162\n", "Customers predicted to be at churn risk: 162\n", "Percentage flagged for retention: 100.0%\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "Average customer lifetime value: $857.03\n", 
"Potential revenue at risk: $138,838\n", "Estimated retention success rate: 35%\n", "Potential revenue saved: $48,593\n", "Retention program ROI: 1099.8%\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "\n", "Model Performance:\n", "Accuracy: 1.0000\n", "Precision: 1.0000\n", "Recall: 1.0000\n", "AUC: 1.0000\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Model evaluation and business insights\n", "\n", "# Make predictions\n", "predictions = model.transform(test_data)\n", "\n", "# Evaluate the model\n", "evaluator = BinaryClassificationEvaluator(labelCol=\"churn_risk\", metricName=\"areaUnderROC\")\n", "auc = evaluator.evaluate(predictions)\n", "\n", "print(f\"Model AUC: {auc:.4f}\")\n", "\n", "# Show prediction results\n", "predictions.select(\n", " \"customer_id\", \"total_purchases\", \"total_spent\", \"churn_risk\", \n", " \"prediction\", \"probability\"\n", ").show(10)\n", "\n", "# Calculate confusion matrix\n", "confusion_matrix = predictions.groupBy(\"churn_risk\", \"prediction\").count()\n", "confusion_matrix.show()\n", "\n", "# Business impact analysis\n", "churn_predictions = predictions.filter(\"prediction = 1\")\n", "customers_at_risk = churn_predictions.count()\n", "total_test_customers = test_data.count()\n", "\n", "print(f\"\\nBusiness Impact Analysis:\")\n", "print(f\"Total test customers: {total_test_customers}\")\n", "print(f\"Customers predicted to be at churn risk: {customers_at_risk}\")\n", "print(f\"Percentage flagged for retention: {(customers_at_risk/total_test_customers)*100:.1f}%\")\n", "\n", "# Calculate potential revenue impact\n", "avg_customer_value = test_data.agg(F.avg(\"total_spent\")).collect()[0][0] or 0\n", "potential_lost_revenue = customers_at_risk * avg_customer_value\n", "\n", "print(f\"Average customer lifetime value: ${avg_customer_value:,.2f}\")\n", "print(f\"Potential revenue at risk: ${potential_lost_revenue:,.0f}\")\n", "\n", "# Retention program value\n", 
"retention_success_rate = 0.35  # assumed share of at-risk customers the program retains\n", "avg_retention_cost = 25  # assumed outreach cost per at-risk customer, in dollars\n", "program_cost = customers_at_risk * avg_retention_cost\n", "saved_revenue = (customers_at_risk * retention_success_rate) * avg_customer_value\n", "retention_roi = (saved_revenue - program_cost) / program_cost * 100 if program_cost > 0 else 0\n", "\n", "print(f\"Estimated retention success rate: {retention_success_rate*100:.0f}%\")\n", "print(f\"Potential revenue saved: ${saved_revenue:,.0f}\")\n", "print(f\"Retention program ROI: {retention_roi:.1f}%\")\n", "\n", "# Model performance metrics: count each group once, since every count() runs a Spark job\n", "# If the confusion matrix shows a single class (as in the recorded output),\n", "# these metrics are trivially 1.0 and say little about model quality\n", "total_predictions = predictions.count()\n", "correct_predictions = predictions.filter(\"churn_risk = prediction\").count()\n", "true_positives = predictions.filter(\"prediction = 1 AND churn_risk = 1\").count()\n", "predicted_positives = customers_at_risk  # rows with prediction = 1, counted earlier in this cell\n", "actual_positives = predictions.filter(\"churn_risk = 1\").count()\n", "\n", "accuracy = correct_predictions / total_predictions if total_predictions > 0 else 0\n", "precision = true_positives / predicted_positives if predicted_positives > 0 else 0\n", "recall = true_positives / actual_positives if actual_positives > 0 else 0\n", "\n", "print(\"\\nModel Performance:\")\n", "print(f\"Accuracy: {accuracy:.4f}\")\n", "print(f\"Precision: {precision:.4f}\")\n", "print(f\"Recall: {recall:.4f}\")\n", "print(f\"AUC: {auc:.4f}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 8: Demonstrate Medallion Architecture Benefits\n", "\n", "### Query Performance Across Layers\n", "\n", "The queries below show how each layer serves a different analytical need: raw-data inspection in Bronze, cleaned and enriched records in Silver, and business aggregates in Gold." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "=== Bronze Layer: Raw Data Inspection ===\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+-----------+-------------+----------------+---------------+--------------------+-----------------+\n", "|customer_id|purchase_date|product_category|purchase_amount| ingestion_timestamp|data_quality_flag|\n", "+-----------+-------------+----------------+---------------+--------------------+-----------------+\n", "| CUST000001| 2024-01-09| Clothing| 132.31|2026-01-02 20:28:...| Valid|\n", "| CUST000001| 2024-01-12| Books| 19.58|2026-01-02 20:28:...| Valid|\n", "| CUST000001| 2024-01-22| Clothing| 145.23|2026-01-02 20:28:...| Valid|\n", "| CUST000001| 2024-02-02| Clothing| 64.72|2026-01-02 20:28:...| Valid|\n", "| CUST000001| 2024-10-14| Sports| 26.21|2026-01-02 20:28:...| Valid|\n", "+-----------+-------------+----------------+---------------+--------------------+-----------------+\n", "\n", "\n", "=== Silver Layer: Cleaned and Enriched Data ===\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+-----------+-------------+----------------+---------------+------------------+------------------------+--------------------+\n", "|customer_id|purchase_date|product_category|purchase_amount|data_quality_score|purchase_amount_category|processing_timestamp|\n", "+-----------+-------------+----------------+---------------+------------------+------------------------+--------------------+\n", "| CUST000001| 2024-01-09| Clothing| 132.31| 100| Medium|2026-01-02 20:28:...|\n", "| CUST000001| 2024-01-12| Books| 19.58| 100| Low|2026-01-02 20:28:...|\n", "| CUST000001| 2024-01-22| Clothing| 145.23| 100| Medium|2026-01-02 20:28:...|\n", "| CUST000001| 2024-02-02| Clothing| 64.72| 100| Low|2026-01-02 20:28:...|\n", "| CUST000001| 2024-10-14| Sports| 26.21| 100| Low|2026-01-02 20:28:...|\n", 
"+-----------+-------------+----------------+---------------+------------------+------------------------+--------------------+\n", "\n", "\n", "=== Gold Layer: Customer Analytics ===\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+-----------+---------------+-----------+----------------+------------------------+--------------+\n", "|customer_id|total_purchases|total_spent|customer_segment|days_since_last_purchase|lifetime_value|\n", "+-----------+---------------+-----------+----------------+------------------------+--------------+\n", "| CUST000747| 8| 4655.1| High Value| 406| 581.89|\n", "| CUST000263| 8| 4593.14| High Value| 369| 574.14|\n", "| CUST000280| 8| 4469.72| High Value| 410| 558.71|\n", "| CUST000128| 7| 3894.54| High Value| 382| 556.36|\n", "| CUST000808| 7| 3883.85| High Value| 373| 554.84|\n", "| CUST000289| 7| 3581.88| High Value| 415| 511.7|\n", "| CUST000992| 8| 3523.17| High Value| 537| 440.4|\n", "| CUST000800| 8| 3244.45| High Value| 400| 405.56|\n", "| CUST000053| 8| 3243.89| High Value| 409| 405.49|\n", "| CUST000865| 6| 3217.36| High Value| 367| 536.23|\n", "+-----------+---------------+-----------+----------------+------------------------+--------------+\n", "\n", "\n", "=== Gold Layer: Sales Analytics ===\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+-------+------------------+-------------+----------------+------------+\n", "| period|total_transactions|total_revenue|unique_customers|top_category|\n", "+-------+------------------+-------------+----------------+------------+\n", "|2024-01| 487| 87135.12| 400| Electronics|\n", "|2024-02| 399| 67535.15| 325| Electronics|\n", "|2024-03| 456| 70601.22| 377| Electronics|\n", "|2024-04| 440| 74152.53| 355| Electronics|\n", "|2024-05| 522| 85645.91| 430| Electronics|\n", "|2024-06| 457| 80212.57| 367| Electronics|\n", "|2024-07| 425| 70450.22| 361| Electronics|\n", "|2024-08| 444| 68654.21| 367| Electronics|\n", 
"|2024-09| 469| 82519.0| 385| Electronics|\n", "|2024-10| 452| 80140.87| 360| Electronics|\n", "|2024-11| 470| 86633.08| 375| Electronics|\n", "|2024-12| 454| 86359.89| 372| Electronics|\n", "+-------+------------------+-------------+----------------+------------+\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Demonstrate Bronze layer: Raw data inspection\n", "print(\"=== Bronze Layer: Raw Data Inspection ===\")\n", "spark.sql(\"\"\"\n", "SELECT customer_id, purchase_date, product_category, purchase_amount, \n", " ingestion_timestamp,\n", " CASE WHEN purchase_amount IS NULL THEN 'Missing Amount' \n", " WHEN product_category IS NULL THEN 'Missing Category'\n", " ELSE 'Valid' END as data_quality_flag\n", "FROM retail.bronze.customer_purchases_raw \n", "WHERE customer_id = 'CUST000001'\n", "ORDER BY purchase_date\n", "\"\"\").show()\n", "\n", "# Demonstrate Silver layer: Cleaned and enriched data\n", "print(\"\\n=== Silver Layer: Cleaned and Enriched Data ===\")\n", "spark.sql(\"\"\"\n", "SELECT customer_id, purchase_date, product_category, purchase_amount,\n", " data_quality_score, purchase_amount_category,\n", " processing_timestamp\n", "FROM retail.silver.customer_purchases_clean\n", "WHERE customer_id = 'CUST000001'\n", "ORDER BY purchase_date\n", "\"\"\").show()\n", "\n", "# Demonstrate Gold layer: Business analytics\n", "print(\"\\n=== Gold Layer: Customer Analytics ===\")\n", "spark.sql(\"\"\"\n", "SELECT customer_id, total_purchases, total_spent, customer_segment,\n", " days_since_last_purchase, lifetime_value\n", "FROM retail.gold.customer_analytics\n", "ORDER BY total_spent DESC\n", "LIMIT 10\n", "\"\"\").show()\n", "\n", "print(\"\\n=== Gold Layer: Sales Analytics ===\")\n", "spark.sql(\"\"\"\n", "SELECT period, total_transactions, total_revenue, unique_customers, top_category\n", "FROM retail.gold.sales_analytics\n", "ORDER BY period\n", "\"\"\").show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Key 
Takeaways: Medallion Architecture with Delta Liquid Clustering + ML\n", "\n", "### What We Demonstrated\n", "\n", "1. **Medallion Architecture**: Bronze (raw) → Silver (cleaned) → Gold (analytics/ML)\n", "2. **Delta Liquid Clustering**: Data layout optimized automatically in each layer, without manual partitioning or Z-Ordering\n", "3. **Data Quality Improvement**: Progressive cleansing and enrichment from layer to layer\n", "4. **Business Value**: Analytics and ML for customer churn prediction\n", "\n", "### Layer Benefits\n", "\n", "- **Bronze**: Preserves raw data as ingested and provides an audit trail\n", "- **Silver**: Clean, standardized data for consistent analysis\n", "- **Gold**: Business-ready insights and ML features\n", "\n", "### AIDP Advantages\n", "\n", "- **Unified Platform**: Seamless data flow between layers\n", "- **Performance**: Liquid clustering matches each layer's data layout to its query patterns\n", "- **Governance**: Schema isolation and data quality controls\n", "- **ML Integration**: Direct path from data to predictive models\n", "\n", "### Business Impact\n", "\n", "1. **Data Quality**: A systematic approach to cleaning and validating data\n", "2. **Analytics Efficiency**: Faster insights with optimized structures\n", "3. **ML Readiness**: Features engineered for predictive modeling\n", "4. **Customer Retention**: Customers flagged as churn risks can be targeted before they leave\n", "5. **Revenue Protection**: Retention spend can be weighed against the revenue at risk\n", "\n", "### Best Practices\n", "\n", "1. **Layer Progression**: Always maintain a clear Bronze → Silver → Gold flow\n", "2. **Data Quality**: Implement validation rules in the Silver layer\n", "3. **Clustering Strategy**: Choose clustering columns based on each layer's query patterns\n", "4. **Schema Evolution**: Plan for changing business requirements\n", "5. 
**Governance**: Maintain data lineage and quality metrics\n", "\n", "### Next Steps\n", "\n", "- Implement real-time data ingestion into the Bronze layer\n", "- Add more sophisticated data quality validations\n", "- Extend the ML models (for example, recommendation or lifetime value prediction)\n", "- Build automated pipelines for layer updates\n", "- Integrate with business intelligence tools\n", "\n", "This notebook shows how a Bronze → Silver → Gold pipeline with liquid clustering and a churn prediction model can be built end to end in Oracle AI Data Platform." ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "name": "python", "version": "3.8.5" } }, "nbformat": 4, "nbformat_minor": 4 }