{ "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "notebook" }, "language_info": { "file_extension": ".py", "mimetype": "text/x-python", "name": "python" }, "Last_Active_Cell_Index": 27 }, "nbformat_minor": 4, "nbformat": 4, "cells": [ { "cell_type": "markdown", "source": "# Insurance Analytics: Medallion Architecture with Delta Liquid Clustering\n\n\n\n## Overview\n\n\n\nThis notebook demonstrates a complete **Medallion Architecture** implementation in Oracle AI Data Platform (AIDP) Workbench for insurance analytics, incorporating **Delta Liquid Clustering** for optimal performance. The medallion architecture organizes data into Bronze, Silver, and Gold layers for progressive data refinement and business value creation.\n\n\n\n### Medallion Architecture Overview\n\n\n\n**Bronze Layer**: Raw data ingestion with minimal processing - preserves original data fidelity\n\n\n\n**Silver Layer**: Cleaned, standardized, and enriched data - ready for analysis\n\n\n\n**Gold Layer**: Curated, aggregated data for analytics, ML-ready datasets, and business metrics\n\n\n\n### Delta Liquid Clustering\n\n\n\nLiquid clustering automatically identifies and groups similar data together based on clustering columns you define. 
This optimization happens automatically during data ingestion and maintenance operations, providing:\n\n\n\n- **Automatic optimization**: No manual tuning required\n\n- **Improved query performance**: Faster queries on clustered columns\n\n- **Reduced maintenance**: No need for manual repartitioning\n\n- **Adaptive clustering**: Adjusts as data patterns change\n\n\n\n### Use Case: Insurance Risk Assessment and Fraud Detection\n\n\n\nWe'll process insurance claim data through the medallion layers, optimizing each layer for specific analytical needs:\n\n\n\n- **Bronze**: Raw claim ingestion with liquid clustering on `policy_id` and `ingestion_date`\n\n- **Silver**: Data quality checks, standardization, and enrichment\n\n- **Gold**: Aggregated analytics and ML-ready datasets for fraud detection\n\n\n\n### AIDP Environment Setup\n\n\n\nThis notebook leverages the existing Spark session in your AIDP environment.", "metadata": {} }, { "cell_type": "code", "source": "# Setup: Create insurance catalog and schemas for medallion architecture\n\n# In AIDP, catalogs provide data isolation and governance\n\nspark.sql(\"CREATE CATALOG IF NOT EXISTS insurance\")\n\nspark.sql(\"CREATE SCHEMA IF NOT EXISTS insurance.bronze\")\n\nspark.sql(\"CREATE SCHEMA IF NOT EXISTS insurance.silver\")\n\nspark.sql(\"CREATE SCHEMA IF NOT EXISTS insurance.gold\")\n\nprint(\"Insurance catalog and bronze/silver/gold schemas created successfully!\")", "metadata": { "execution": { "iopub.status.busy": "2025-12-19T15:11:01.849Z" }, "trusted": true }, "outputs": [ { "data": { "text/plain": "Insurance catalog and bronze/silver/gold schemas created successfully!\n" }, "metadata": {}, "output_type": "display_data" } ], "execution_count": 1 }, { "cell_type": "markdown", "source": "# Bronze Layer: Raw Data Ingestion\n\n\n\n## Bronze Layer Purpose\n\n\n\nThe Bronze layer serves as the raw data ingestion point. 
Data is ingested with minimal transformation to preserve:\n\n\n\n- **Data fidelity**: Original data as received from source systems\n\n- **Auditability**: Complete historical record\n\n- **Reprocessing capability**: Ability to reprocess data if business rules change\n\n\n\n### Bronze Layer Characteristics\n\n\n\n- **Append-only**: New data is appended, existing data is never modified\n\n- **Raw format**: Minimal schema enforcement\n\n- **Timestamped**: Includes ingestion timestamps\n\n- **Optimized for ingestion**: Fast write performance", "metadata": {} }, { "cell_type": "markdown", "source": "## Step 1: Create Bronze Layer Table with Liquid Clustering\n\n\n\n### Table Design for Bronze Layer\n\n\n\nOur bronze `insurance_claims_raw` table will store raw claim data with:\n\n\n- **Raw fields**: All original data fields preserved\n- **Ingestion metadata**: Timestamps and source information\n- **Liquid clustering**: Optimized for `policy_id` and `ingestion_date` for efficient querying\n\n\n\n### Clustering Strategy for Bronze\n\n\nWe'll cluster by `policy_id` and `ingestion_date` because:\n\n\n- **policy_id**: Groups all claims for the same policy together\n- **ingestion_date**: Enables time-based data management and reprocessing\n- This combination optimizes for both policy analysis and temporal operations", "metadata": {} }, { "cell_type": "code", "source": "# Create Bronze layer Delta table with liquid clustering\n\n# CLUSTER BY defines the columns for automatic optimization\n\nspark.sql(\"\"\"\n\nCREATE TABLE IF NOT EXISTS insurance.bronze.insurance_claims_raw (\n\n -- Raw claim data fields\n\n policy_id STRING,\n\n claim_date TIMESTAMP,\n\n claim_type STRING,\n\n claim_amount DECIMAL(15,2),\n\n incident_type STRING,\n\n location STRING,\n\n fraud_score INT,\n\n \n\n -- Bronze layer metadata\n\n ingestion_timestamp TIMESTAMP,\n\n ingestion_date DATE,\n\n source_system STRING,\n\n batch_id STRING\n\n)\n\nUSING DELTA\n\nCLUSTER BY (policy_id, 
ingestion_date)\n\n\"\"\")\n\nprint(\"Bronze layer Delta table with liquid clustering created successfully!\")\n\nprint(\"Clustering will automatically optimize data layout for queries on policy_id and ingestion_date.\")", "metadata": { "execution": { "iopub.status.busy": "2025-12-19T15:24:11.315Z" }, "trusted": true }, "outputs": [ { "output_type": "display_data", "data": { "text/plain": "Bronze layer Delta table with liquid clustering created successfully!\nClustering will automatically optimize data layout for queries on policy_id and ingestion_date.\n" }, "metadata": {} } ], "execution_count": 11 }, { "cell_type": "markdown", "source": "## Step 2: Generate and Ingest Raw Insurance Data\n\n\n\n### Data Generation Strategy\n\n\nWe'll create realistic insurance claim data simulating multiple data sources:\n\n\n- **Multiple batches**: Simulating daily data loads\n- **Source systems**: Different claim processing systems\n- **Realistic patterns**: Source-specific batch volumes and data quality issues\n\n\n\n### Bronze Layer Ingestion\n\n\nData is ingested in batches with:\n\n- **Batch IDs**: For tracking and reprocessing\n- **Source metadata**: System provenance\n- **Ingestion timestamps**: Processing audit trail", "metadata": {} }, { "cell_type": "code", "source": "# Generate sample insurance claim data for Bronze layer ingestion\n\n# Standard-library imports for synthetic data generation\n\nimport random\nimport uuid\nfrom datetime import datetime, timedelta\nfrom decimal import Decimal\n\n\n# Define insurance data constants\n\nCLAIM_TYPES = ['Auto', 'Home', 'Health', 'Life', 'Property']\nINCIDENT_TYPES = ['Accident', 'Theft', 'Natural Disaster', 'Illness', 'Fire', 'Flood', 'Collision', 'Medical Emergency']\nLOCATIONS = ['New York, NY', 'Los Angeles, CA', 'Chicago, IL', 'Houston, TX', 'Miami, FL', 'Denver, CO', 'Seattle, WA']\nSOURCE_SYSTEMS = ['Legacy_Claims', 'Mobile_App', 'Partner_API', 'Direct_Web']\n\n# Generate raw claim records for multiple batches\nraw_claim_data = 
[]\nbase_date = datetime(2024, 1, 1)\n\n# Simulate 5 days of data ingestion\nfor batch_day in range(5):\n    batch_date = base_date + timedelta(days=batch_day)\n    batch_id = f\"BATCH_{batch_date.strftime('%Y%m%d')}\"\n    \n    # Each batch has different volumes from different sources\n    for source_system in SOURCE_SYSTEMS:\n        # Vary batch sizes by source system\n        if source_system == 'Legacy_Claims':\n            batch_size = random.randint(800, 1200)\n        elif source_system == 'Mobile_App':\n            batch_size = random.randint(200, 400)\n        elif source_system == 'Partner_API':\n            batch_size = random.randint(300, 600)\n        else:  # Direct_Web\n            batch_size = random.randint(100, 300)\n        \n        for i in range(batch_size):\n            policy_num = random.randint(1, 10000)\n            policy_id = f\"POL{policy_num:08d}\"\n            \n            # Spread claims over time\n            claim_days_offset = random.randint(0, 365)\n            claim_hours_offset = random.randint(0, 23)\n            claim_date = base_date + timedelta(days=claim_days_offset, hours=claim_hours_offset)\n            \n            # Select claim attributes\n            claim_type = random.choice(CLAIM_TYPES)\n            \n            # Amount based on claim type (with some data quality variation by source)\n            if claim_type == 'Auto':\n                amount = round(random.uniform(1000, 50000), 2)\n            elif claim_type == 'Home':\n                amount = round(random.uniform(5000, 200000), 2)\n            elif claim_type == 'Health':\n                amount = round(random.uniform(500, 100000), 2)\n            elif claim_type == 'Life':\n                amount = round(random.uniform(10000, 500000), 2)\n            else:  # Property\n                amount = round(random.uniform(2000, 150000), 2)\n            \n            # Add some data quality issues for realism\n            if source_system == 'Legacy_Claims' and random.random() < 0.1:\n                amount = amount * -1  # Negative amounts in legacy data\n            \n            incident_type = random.choice(INCIDENT_TYPES)\n            location = random.choice(LOCATIONS)\n            fraud_score = random.randint(0, 100)\n            \n            # Ingestion metadata\n            ingestion_timestamp = batch_date + timedelta(hours=random.randint(0, 23), minutes=random.randint(0, 59))\n            \n            raw_claim_data.append({\n                \"policy_id\": policy_id,\n                
\"claim_date\": claim_date,\n \"claim_type\": claim_type,\n \"claim_amount\": Decimal(amount),\n \"incident_type\": incident_type,\n \"location\": location,\n \"fraud_score\": fraud_score,\n \"ingestion_timestamp\": ingestion_timestamp,\n \"ingestion_date\": batch_date.date(),\n \"source_system\": source_system,\n \"batch_id\": batch_id\n })\n\nprint(f\"Generated {len(raw_claim_data)} raw insurance claim records across {len(SOURCE_SYSTEMS) * 5} batches\")\nprint(f\"Data sources: {SOURCE_SYSTEMS}\")\nif raw_claim_data:\n print(\"Sample record:\", raw_claim_data[0])", "metadata": { "execution": { "iopub.status.busy": "2025-12-19T15:24:17.152Z" }, "trusted": true }, "outputs": [ { "output_type": "display_data", "data": { "text/plain": "Generated 9666 raw insurance claim records across 20 batches\nData sources: ['Legacy_Claims', 'Mobile_App', 'Partner_API', 'Direct_Web']\nSample record: {'policy_id': 'POL00003296', 'claim_date': datetime.datetime(2024, 3, 6, 10, 0), 'claim_type': 'Home', 'claim_amount': Decimal('158287.3099999999976716935634613037109375'), 'incident_type': 'Fire', 'location': 'Los Angeles, CA', 'fraud_score': 52, 'ingestion_timestamp': datetime.datetime(2024, 1, 1, 17, 56), 'ingestion_date': datetime.date(2024, 1, 1), 'source_system': 'Legacy_Claims', 'batch_id': 'BATCH_20240101'}\n" }, "metadata": {} } ], "execution_count": 12 }, { "cell_type": "code", "source": "# Insert raw data into Bronze layer\nfrom pyspark.sql.functions import col\nfrom pyspark.sql.types import DecimalType, IntegerType\n\n# Create DataFrame from raw data\n\ndf_raw_claims = spark.createDataFrame(raw_claim_data)\n\n\nprint(\"Bronze Layer DataFrame Schema:\")\ndf_raw_claims.printSchema()\n\nprint(\"\\nSample Bronze Layer Data:\")\ndf_raw_claims.show(5)\n\ndf_raw_claims = df_raw_claims.withColumn(\"claim_amount\", col(\"claim_amount\").cast(DecimalType(15, 2)))\ndf_raw_claims = df_raw_claims.withColumn(\"fraud_score\", col(\"fraud_score\").cast(IntegerType()))\n\n# Insert data into 
Bronze layer with liquid clustering\n# The CLUSTER BY (policy_id, ingestion_date) will automatically optimize the data layout\n\ndf_raw_claims.write.mode(\"append\").saveAsTable(\"insurance.bronze.insurance_claims_raw\")\n\nprint(f\"\\nSuccessfully inserted {df_raw_claims.count()} raw records into insurance.bronze.insurance_claims_raw\")\nprint(\"Liquid clustering automatically optimized the data layout during ingestion!\")\n\n# Verify Bronze layer data\nbronze_count = spark.sql(\"SELECT COUNT(*) as total_records FROM insurance.bronze.insurance_claims_raw\").collect()[0][0]\nprint(f\"\\nBronze layer now contains {bronze_count} total records\")", "metadata": { "execution": { "iopub.status.busy": "2025-12-19T15:32:21.612Z" }, "trusted": true }, "outputs": [ { "output_type": "display_data", "data": { "text/plain": "Bronze Layer DataFrame Schema:\nroot\n |-- batch_id: string (nullable = true)\n |-- claim_amount: decimal(38,18) (nullable = true)\n |-- claim_date: timestamp (nullable = true)\n |-- claim_type: string (nullable = true)\n |-- fraud_score: long (nullable = true)\n |-- incident_type: string (nullable = true)\n |-- ingestion_date: date (nullable = true)\n |-- ingestion_timestamp: timestamp (nullable = true)\n |-- location: string (nullable = true)\n |-- policy_id: string (nullable = true)\n |-- source_system: string (nullable = true)\n\n\nSample Bronze Layer Data:\n+--------------+--------------------+-------------------+----------+-----------+-----------------+--------------+-------------------+---------------+-----------+-------------+\n| batch_id| claim_amount| claim_date|claim_type|fraud_score| incident_type|ingestion_date|ingestion_timestamp| location| policy_id|source_system|\n+--------------+--------------------+-------------------+----------+-----------+-----------------+--------------+-------------------+---------------+-----------+-------------+\n|BATCH_20240101|158287.3099999999...|2024-03-06 10:00:00| Home| 52| Fire| 2024-01-01|2024-01-01 
17:56:00|Los Angeles, CA|POL00003296|Legacy_Claims|\n|BATCH_20240101|6409.020000000000...|2024-09-03 10:00:00| Property| 58| Collision| 2024-01-01|2024-01-01 20:15:00| New York, NY|POL00002752|Legacy_Claims|\n|BATCH_20240101|40442.55999999999...|2024-08-01 01:00:00| Auto| 19| Flood| 2024-01-01|2024-01-01 07:52:00| Miami, FL|POL00009202|Legacy_Claims|\n|BATCH_20240101|355334.0100000000...|2024-07-08 20:00:00| Life| 88|Medical Emergency| 2024-01-01|2024-01-01 23:17:00| New York, NY|POL00004756|Legacy_Claims|\n|BATCH_20240101|58358.94999999999...|2024-04-27 19:00:00| Property| 60| Theft| 2024-01-01|2024-01-01 11:00:00| Chicago, IL|POL00007849|Legacy_Claims|\n+--------------+--------------------+-------------------+----------+-----------+-----------------+--------------+-------------------+---------------+-----------+-------------+\nonly showing top 5 rows\n\n" }, "metadata": {} }, { "output_type": "display_data", "data": { "text/plain": "\nSuccessfully inserted 9666 raw records into insurance.bronze.insurance_claims_raw\nLiquid clustering automatically optimized the data layout during ingestion!\n" }, "metadata": {} }, { "output_type": "display_data", "data": { "text/plain": "\nBronze layer now contains 9666 total records\n" }, "metadata": {} } ], "execution_count": 17 }, { "cell_type": "markdown", "source": "# Silver Layer: Data Cleaning and Enrichment\n\n\n\n## Silver Layer Purpose\n\n\n\nThe Silver layer transforms raw Bronze data into clean, standardized, and enriched datasets:\n\n\n\n- **Data quality**: Validation, cleansing, and standardization\n- **Business rules**: Application of business logic and transformations\n- **Enrichment**: Addition of derived fields and external data\n- **Deduplication**: Removal of duplicate records\n\n\n\n### Silver Layer Characteristics\n\n\n- **Refined data**: Clean and standardized\n- **Business-aligned**: Optimized for analytical use cases\n- **Versioned**: Tracks transformation logic versions\n- **Queryable**: Supports both 
operational and analytical queries", "metadata": {} }, { "cell_type": "markdown", "source": "## Step 3: Create Silver Layer Table with Liquid Clustering\n\n\n\n### Silver Layer Design\n\n\n\nOur silver `insurance_claims_clean` table will include:\n\n\n- **Cleaned fields**: Validated and standardized data\n- **Derived fields**: Calculated metrics and categorizations\n- **Quality flags**: Data quality indicators\n- **Business metadata**: Processing information\n\n\n\n### Clustering Strategy for Silver\n\n\nWe'll cluster by `policy_id` and `claim_date` because:\n\n\n- **policy_id**: Enables fast policy-level analytics\n- **claim_date**: Optimizes time-series analysis and fraud detection\n- This combination supports both operational and analytical workloads", "metadata": {} }, { "cell_type": "code", "source": "# Create Silver layer Delta table with liquid clustering\n\nspark.sql(\"\"\"\n\nCREATE TABLE IF NOT EXISTS insurance.silver.insurance_claims_clean (\n\n -- Core claim data (cleaned and standardized)\n\n policy_id STRING,\n\n claim_date TIMESTAMP,\n\n claim_type STRING,\n\n claim_amount DECIMAL(15,2),\n\n incident_type STRING,\n\n location STRING,\n\n fraud_score INT,\n\n \n\n -- Derived and enriched fields\n\n claim_year INT,\n\n claim_month INT,\n\n claim_day_of_week INT,\n\n claim_hour INT,\n\n risk_category STRING,\n\n amount_category STRING,\n\n is_high_value_claim BOOLEAN,\n\n \n\n -- Data quality and processing metadata\n\n data_quality_score DOUBLE,\n\n processing_timestamp TIMESTAMP,\n\n silver_version STRING,\n\n source_system STRING,\n\n batch_id STRING\n\n)\n\nUSING DELTA\n\nCLUSTER BY (policy_id, claim_date)\n\n\"\"\")\n\nprint(\"Silver layer Delta table with liquid clustering created successfully!\")\nprint(\"Clustering will optimize for policy-based and temporal analytical queries.\")", "metadata": { "trusted": true, "execution": { "iopub.status.busy": "2025-12-19T15:32:40.243Z" } }, "outputs": [ { "output_type": "display_data", "data": { 
"text/plain": "Silver layer Delta table with liquid clustering created successfully!\nClustering will optimize for policy-based and temporal analytical queries.\n" }, "metadata": {} } ], "execution_count": 18 }, { "cell_type": "markdown", "source": "## Step 4: Transform Bronze to Silver Layer\n\n\n\n### Data Transformation Logic\n\n\nTransformations applied from Bronze to Silver:\n\n\n1. **Data Quality Checks**: Validate required fields and data ranges\n2. **Standardization**: Normalize formats and values\n3. **Deduplication**: Remove duplicate records based on business keys\n4. **Enrichment**: Add derived fields and categorizations\n5. **Business Rules**: Apply insurance-specific logic\n\n\n\n### Quality Assurance\n\n\n- **Completeness**: Ensure all required fields are present\n- **Accuracy**: Validate data ranges and formats\n- **Consistency**: Standardize categorical values\n- **Timeliness**: Check for reasonable date ranges", "metadata": {} }, { "cell_type": "code", "source": "# Transform Bronze data to Silver layer\n\nfrom pyspark.sql.functions import (\n col, when, year, month, dayofweek, hour, \n abs, round, concat, lit, current_timestamp\n)\nfrom pyspark.sql.window import Window\nfrom pyspark.sql import functions as F\n\n# Read Bronze layer data\ndf_bronze = spark.read.table(\"insurance.bronze.insurance_claims_raw\")\n\nprint(f\"Processing {df_bronze.count()} records from Bronze layer\")\n\n# Apply data quality and transformation logic\ndf_silver = df_bronze \\\n .withColumn(\"claim_amount\", abs(col(\"claim_amount\"))) \\\n .withColumn(\"claim_year\", year(col(\"claim_date\"))) \\\n .withColumn(\"claim_month\", month(col(\"claim_date\"))) \\\n .withColumn(\"claim_day_of_week\", dayofweek(col(\"claim_date\"))) \\\n .withColumn(\"claim_hour\", hour(col(\"claim_date\"))) \\\n .withColumn(\"risk_category\", \n when(col(\"fraud_score\") >= 80, \"Very High Risk\")\n .when(col(\"fraud_score\") >= 60, \"High Risk\")\n .when(col(\"fraud_score\") >= 40, \"Medium 
Risk\")\n .when(col(\"fraud_score\") >= 20, \"Low Risk\")\n .otherwise(\"Very Low Risk\")\n ) \\\n .withColumn(\"amount_category\",\n when(col(\"claim_amount\") >= 100000, \"High Value\")\n .when(col(\"claim_amount\") >= 25000, \"Medium Value\")\n .otherwise(\"Low Value\")\n ) \\\n .withColumn(\"is_high_value_claim\", col(\"claim_amount\") >= 50000) \\\n .withColumn(\"data_quality_score\", \n when(col(\"policy_id\").isNotNull() & \n col(\"claim_date\").isNotNull() & \n (col(\"claim_amount\") > 0), 1.0)\n .otherwise(0.5)\n ) \\\n .withColumn(\"processing_timestamp\", current_timestamp()) \\\n .withColumn(\"silver_version\", lit(\"v1.0\"))\n\n# Remove duplicates based on policy_id, claim_date, claim_type, claim_amount\nwindow_spec = Window.partitionBy(\"policy_id\", \"claim_date\", \"claim_type\", \"claim_amount\") \\\n .orderBy(col(\"ingestion_timestamp\").desc())\n\ndf_silver_deduped = df_silver \\\n .withColumn(\"row_num\", F.row_number().over(window_spec)) \\\n .filter(col(\"row_num\") == 1) \\\n .drop(\"row_num\")\n\n# Select final Silver layer columns\ndf_silver_final = df_silver_deduped.select(\n \"policy_id\", \"claim_date\", \"claim_type\", \"claim_amount\",\n \"incident_type\", \"location\", \"fraud_score\",\n \"claim_year\", \"claim_month\", \"claim_day_of_week\", \"claim_hour\",\n \"risk_category\", \"amount_category\", \"is_high_value_claim\",\n \"data_quality_score\", \"processing_timestamp\", \"silver_version\",\n \"source_system\", \"batch_id\"\n)\n\nprint(f\"After transformations: {df_silver_final.count()} clean records\")\nprint(\"\\nSilver Layer Schema:\")\ndf_silver_final.printSchema()\n\nprint(\"\\nSample Silver Layer Data:\")\ndf_silver_final.show(5)", "metadata": { "trusted": true, "execution": { "iopub.status.busy": "2025-12-19T15:34:31.534Z" } }, "outputs": [ { "output_type": "display_data", "data": { "text/plain": "Processing 9666 records from Bronze layer\n" }, "metadata": {} }, { "output_type": "display_data", "data": { "text/plain": 
"After transformations: 9666 clean records\n\nSilver Layer Schema:\nroot\n |-- policy_id: string (nullable = true)\n |-- claim_date: timestamp (nullable = true)\n |-- claim_type: string (nullable = true)\n |-- claim_amount: decimal(15,2) (nullable = true)\n |-- incident_type: string (nullable = true)\n |-- location: string (nullable = true)\n |-- fraud_score: integer (nullable = true)\n |-- claim_year: integer (nullable = true)\n |-- claim_month: integer (nullable = true)\n |-- claim_day_of_week: integer (nullable = true)\n |-- claim_hour: integer (nullable = true)\n |-- risk_category: string (nullable = false)\n |-- amount_category: string (nullable = false)\n |-- is_high_value_claim: boolean (nullable = true)\n |-- data_quality_score: double (nullable = false)\n |-- processing_timestamp: timestamp (nullable = false)\n |-- silver_version: string (nullable = false)\n |-- source_system: string (nullable = true)\n |-- batch_id: string (nullable = true)\n\n\nSample Silver Layer Data:\n" }, "metadata": {} }, { "output_type": "display_data", "data": { "text/plain": "+-----------+-------------------+----------+------------+-------------+---------------+-----------+----------+-----------+-----------------+----------+-------------+---------------+-------------------+------------------+--------------------+--------------+-------------+--------------+\n| policy_id| claim_date|claim_type|claim_amount|incident_type| location|fraud_score|claim_year|claim_month|claim_day_of_week|claim_hour|risk_category|amount_category|is_high_value_claim|data_quality_score|processing_timestamp|silver_version|source_system| batch_id|\n+-----------+-------------------+----------+------------+-------------+---------------+-----------+----------+-----------+-----------------+----------+-------------+---------------+-------------------+------------------+--------------------+--------------+-------------+--------------+\n|POL00000001|2024-10-31 17:00:00| Auto| 30735.20| Fire|Los Angeles, CA| 8| 2024| 
10| 5| 17|Very Low Risk| Medium Value| false| 1.0|2025-12-19 15:34:...| v1.0| Partner_API|BATCH_20240103|\n|POL00000002|2024-07-28 22:00:00| Life| 146227.66| Theft| New York, NY| 33| 2024| 7| 1| 22| Low Risk| High Value| true| 1.0|2025-12-19 15:34:...| v1.0|Legacy_Claims|BATCH_20240105|\n|POL00000003|2024-07-30 19:00:00| Health| 11313.85| Collision|Los Angeles, CA| 8| 2024| 7| 3| 19|Very Low Risk| Low Value| false| 1.0|2025-12-19 15:34:...| v1.0| Mobile_App|BATCH_20240102|\n|POL00000003|2024-10-23 23:00:00| Auto| 20968.79| Illness| Miami, FL| 1| 2024| 10| 4| 23|Very Low Risk| Low Value| false| 1.0|2025-12-19 15:34:...| v1.0| Partner_API|BATCH_20240101|\n|POL00000004|2024-05-25 10:00:00| Auto| 21857.29| Flood| Seattle, WA| 72| 2024| 5| 7| 10| High Risk| Low Value| false| 1.0|2025-12-19 15:34:...| v1.0| Partner_API|BATCH_20240101|\n+-----------+-------------------+----------+------------+-------------+---------------+-----------+----------+-----------+-----------------+----------+-------------+---------------+-------------------+------------------+--------------------+--------------+-------------+--------------+\nonly showing top 5 rows\n\n" }, "metadata": {} } ], "execution_count": 20 }, { "cell_type": "code", "source": "# Insert transformed data into Silver layer\n\ndf_silver_final.write.mode(\"overwrite\").saveAsTable(\"insurance.silver.insurance_claims_clean\")\n\nprint(f\"Successfully inserted {df_silver_final.count()} cleaned records into insurance.silver.insurance_claims_clean\")\n\n# Verify Silver layer data\nsilver_count = spark.sql(\"SELECT COUNT(*) as total_records FROM insurance.silver.insurance_claims_clean\").collect()[0][0]\nquality_stats = spark.sql(\"\"\"\n SELECT \n ROUND(AVG(data_quality_score), 3) as avg_quality_score,\n COUNT(*) as total_claims,\n COUNT(DISTINCT policy_id) as unique_policies\n FROM insurance.silver.insurance_claims_clean\n\"\"\").collect()[0]\n\nprint(f\"\\nSilver layer verification:\")\nprint(f\"- Total records: 
{silver_count}\")\nprint(f\"- Average data quality score: {quality_stats[0]}\")\nprint(f\"- Unique policies: {quality_stats[2]}\")", "metadata": { "trusted": true, "execution": { "iopub.status.busy": "2025-12-19T15:35:02.865Z" } }, "outputs": [ { "output_type": "display_data", "data": { "text/plain": "Successfully inserted 9666 cleaned records into insurance.silver.insurance_claims_clean\n" }, "metadata": {} }, { "output_type": "display_data", "data": { "text/plain": "\nSilver layer verification:\n- Total records: 9666\n- Average data quality score: 1.0\n- Unique policies: 6228\n" }, "metadata": {} } ], "execution_count": 21 }, { "cell_type": "markdown", "source": "# Gold Layer: Analytics and ML-Ready Data\n\n\n\n## Gold Layer Purpose\n\n\n\nThe Gold layer provides curated, aggregated datasets optimized for:\n\n\n- **Business intelligence**: Pre-aggregated metrics and KPIs\n- **Machine learning**: Feature engineering and model-ready datasets\n- **Real-time analytics**: Optimized for dashboard and reporting queries\n- **Data products**: Clean, governed datasets for downstream consumption\n\n\n\n### Gold Layer Characteristics\n\n\n- **Aggregated data**: Summarized metrics and KPIs\n- **ML-ready**: Feature-engineered datasets for predictive modeling\n- **Optimized**: Liquid-clustered for query performance\n- **Governed**: Well-documented and versioned", "metadata": {} }, { "cell_type": "markdown", "source": "## Step 5: Create Gold Layer Analytics Tables\n\n\n\n### Gold Layer Design\n\n\n\nWe'll create two Gold layer tables:\n\n\n1. **Claim Analytics**: Aggregated business metrics\n2. 
**Fraud ML Dataset**: Feature-engineered data for fraud detection models\n\n\nBoth tables will use liquid clustering optimized for their specific query patterns.", "metadata": {} }, { "cell_type": "code", "source": "# Create Gold layer analytics table\n\nspark.sql(\"\"\"\n\nCREATE TABLE IF NOT EXISTS insurance.gold.claim_analytics (\n\n -- Aggregation dimensions\n\n claim_year INT,\n\n claim_month INT,\n\n claim_type STRING,\n\n incident_type STRING,\n\n location STRING,\n\n risk_category STRING,\n\n \n\n -- Aggregated metrics\n\n total_claims BIGINT,\n\n total_claim_amount DECIMAL(20,2),\n\n avg_claim_amount DECIMAL(15,2),\n\n avg_fraud_score DOUBLE,\n\n high_value_claims BIGINT,\n\n unique_policies BIGINT,\n\n \n\n -- Processing metadata\n\n processing_date DATE,\n\n gold_version STRING\n\n)\n\nUSING DELTA\n\nCLUSTER BY (claim_year, claim_month, claim_type)\n\n\"\"\")\n\nprint(\"Gold layer analytics table created successfully!\")\n\n# Create Gold layer ML-ready dataset\n\nspark.sql(\"\"\"\n\nCREATE TABLE IF NOT EXISTS insurance.gold.fraud_ml_dataset (\n\n -- Core features\n\n policy_id STRING,\n\n claim_date TIMESTAMP,\n\n claim_amount DECIMAL(15,2),\n\n fraud_score INT,\n\n \n\n -- Categorical features (indexed)\n\n claim_type_index INT,\n\n incident_type_index INT,\n\n location_index INT,\n\n \n\n -- Temporal features\n\n claim_month INT,\n\n claim_day_of_week INT,\n\n claim_hour INT,\n\n \n\n -- Derived features\n\n amount_category STRING,\n\n is_high_value_claim BOOLEAN,\n\n is_fraud BOOLEAN,\n\n \n\n -- Processing metadata\n\n created_date DATE,\n\n dataset_version STRING\n\n)\n\nUSING DELTA\n\nCLUSTER BY (fraud_score, claim_date)\n\n\"\"\")\n\nprint(\"Gold layer ML dataset table created successfully!\")\nprint(\"Clustering optimized for fraud analysis and temporal patterns.\")", "metadata": { "trusted": true, "execution": { "iopub.status.busy": "2025-12-19T15:40:06.549Z" } }, "outputs": [ { "output_type": "display_data", "data": { "text/plain": "Gold layer 
analytics table created successfully!\n" }, "metadata": {} }, { "output_type": "display_data", "data": { "text/plain": "Gold layer ML dataset table created successfully!\nClustering optimized for fraud analysis and temporal patterns.\n" }, "metadata": {} } ], "execution_count": 24 }, { "cell_type": "markdown", "source": "## Step 6: Transform Silver to Gold Layer Analytics\n\n\n\n### Business Intelligence Aggregations\n\n\nCreate aggregated metrics for:\n\n\n- **Temporal analysis**: Monthly/quarterly claim trends\n- **Risk analysis**: Fraud patterns and risk distributions\n- **Operational metrics**: Claim volumes and processing efficiency\n- **Geographic insights**: Location-based claim patterns\n\n\n\n### Feature Engineering for ML\n\n\nPrepare features for fraud detection:\n\n\n- **Categorical encoding**: Convert strings to numeric indices\n- **Temporal features**: Extract time-based patterns\n- **Derived features**: Business logic-based indicators\n- **Target variable**: Fraud classification based on score thresholds", "metadata": {} }, { "cell_type": "code", "source": "# Transform Silver data to Gold layer analytics\n\nfrom pyspark.sql.functions import (\n    col, when, count, sum, avg, countDistinct, current_date, lit,\n    round as spark_round\n)\n\n# Read Silver layer data\ndf_silver = spark.read.table(\"insurance.silver.insurance_claims_clean\")\n\nprint(f\"Processing {df_silver.count()} records from Silver layer for Gold layer analytics\")\n\n# Create aggregated analytics dataset\ndf_analytics = df_silver.groupBy(\n    \"claim_year\", \"claim_month\", \"claim_type\", \n    \"incident_type\", \"location\", \"risk_category\"\n).agg(\n    count(\"*\").alias(\"total_claims\"),\n    sum(\"claim_amount\").alias(\"total_claim_amount\"),\n    spark_round(avg(\"claim_amount\"), 2).alias(\"avg_claim_amount\"),\n    spark_round(avg(\"fraud_score\"), 2).alias(\"avg_fraud_score\"),\n    sum(when(col(\"is_high_value_claim\"), 1).otherwise(0)).alias(\"high_value_claims\"),\n    
countDistinct(\"policy_id\").alias(\"unique_policies\")\n).withColumn(\"processing_date\", current_date()) \\\n .withColumn(\"gold_version\", lit(\"v1.0\"))\n\nprint(f\"Created {df_analytics.count()} aggregated analytics records\")\n\n# Show sample analytics\nprint(\"\\nSample Gold Layer Analytics:\")\ndf_analytics.orderBy(col(\"total_claims\").desc()).show(10)", "metadata": { "trusted": true, "execution": { "iopub.status.busy": "2025-12-19T15:45:04.080Z" } }, "outputs": [ { "output_type": "display_data", "data": { "text/plain": "Processing 9666 records from Silver layer for Gold layer analytics\n" }, "metadata": {} }, { "output_type": "display_data", "data": { "text/plain": "Created 7397 aggregated analytics records\n\nSample Gold Layer Analytics:\n" }, "metadata": {} }, { "output_type": "display_data", "data": { "text/plain": "+----------+-----------+----------+-----------------+---------------+--------------+------------+------------------+----------------+---------------+-----------------+---------------+---------------+------------+\n|claim_year|claim_month|claim_type| incident_type| location| risk_category|total_claims|total_claim_amount|avg_claim_amount|avg_fraud_score|high_value_claims|unique_policies|processing_date|gold_version|\n+----------+-----------+----------+-----------------+---------------+--------------+------------+------------------+----------------+---------------+-----------------+---------------+---------------+------------+\n| 2024| 9| Home| Fire| New York, NY| High Risk| 5| 401940.56| 80388.11| 70.6| 2| 5| 2025-12-19| v1.0|\n| 2024| 1| Health|Medical Emergency| Denver, CO| High Risk| 5| 203666.18| 40733.24| 68.4| 2| 5| 2025-12-19| v1.0|\n| 2024| 7| Property| Fire| Chicago, IL| Very Low Risk| 5| 356388.92| 71277.78| 9.4| 3| 5| 2025-12-19| v1.0|\n| 2024| 1| Life| Theft| Denver, CO| Medium Risk| 5| 1772831.45| 354566.29| 49.4| 5| 5| 2025-12-19| v1.0|\n| 2024| 9| Home| Flood| Chicago, IL|Very High Risk| 5| 496691.85| 99338.37| 93.6| 4| 5| 
2025-12-19| v1.0|\n| 2024| 12| Auto| Natural Disaster|Los Angeles, CA|Very High Risk| 5| 149494.79| 29898.96| 88.8| 0| 5| 2025-12-19| v1.0|\n| 2024| 9| Home| Collision| Denver, CO| Very Low Risk| 5| 680022.47| 136004.49| 4.0| 5| 5| 2025-12-19| v1.0|\n| 2024| 6| Health| Theft| Denver, CO| High Risk| 5| 270356.92| 54071.38| 67.8| 3| 5| 2025-12-19| v1.0|\n| 2024| 3| Home| Natural Disaster| Chicago, IL| Medium Risk| 4| 420147.64| 105036.91| 46.0| 3| 4| 2025-12-19| v1.0|\n| 2024| 10| Life| Accident| New York, NY| Very Low Risk| 4| 745972.75| 186493.19| 5.75| 3| 4| 2025-12-19| v1.0|\n+----------+-----------+----------+-----------------+---------------+--------------+------------+------------------+----------------+---------------+-----------------+---------------+---------------+------------+\nonly showing top 10 rows\n\n" }, "metadata": {} } ], "execution_count": 31 }, { "cell_type": "code", "source": "# Insert analytics data into Gold layer\n\ndf_analytics.write.mode(\"overwrite\").saveAsTable(\"insurance.gold.claim_analytics\")\n\nprint(f\"Successfully inserted {df_analytics.count()} analytics records into insurance.gold.claim_analytics\")\n\n# Verify Gold analytics data\nanalytics_summary = spark.sql(\"\"\"\n SELECT \n COUNT(*) as total_aggregations,\n ROUND(SUM(total_claim_amount), 2) as total_claim_value,\n ROUND(AVG(avg_fraud_score), 2) as avg_fraud_score\n FROM insurance.gold.claim_analytics\n\"\"\").collect()[0]\n\nprint(f\"\\nGold layer analytics summary:\")\nprint(f\"- Total aggregations: {analytics_summary[0]}\")\nprint(f\"- Total claim value: ${analytics_summary[1]:,}\")\nprint(f\"- Average fraud score: {analytics_summary[2]}\")", "metadata": { "trusted": true, "execution": { "iopub.status.busy": "2025-12-19T15:45:40.703Z" } }, "outputs": [ { "output_type": "display_data", "data": { "text/plain": "Successfully inserted 7397 analytics records into insurance.gold.claim_analytics\n" }, "metadata": {} }, { "output_type": "display_data", "data": { "text/plain": 
"\nGold layer analytics summary:\n- Total aggregations: 7397\n- Total claim value: $978,477,793.21\n- Average fraud score: 49.28\n" }, "metadata": {} } ], "execution_count": 32 }, { "cell_type": "markdown", "source": "## Step 7: Create ML-Ready Fraud Detection Dataset\n\n\n\n### Feature Engineering Pipeline\n\n\nTransform Silver data into ML-ready features:\n\n\n1. **Categorical Encoding**: Convert categorical variables to numeric indices\n2. **Feature Selection**: Choose relevant features for fraud detection\n3. **Target Creation**: Define fraud based on score thresholds\n4. **Data Splitting**: Prepare train/validation/test sets", "metadata": {} }, { "cell_type": "code", "source": "# Create ML-ready fraud detection dataset\n\nfrom pyspark.ml.feature import StringIndexer\nfrom pyspark.sql.functions import current_date\n\n# Read Silver layer data for ML\ndf_silver_ml = spark.read.table(\"insurance.silver.insurance_claims_clean\")\n\n# Create indexers for categorical variables\nclaim_type_indexer = StringIndexer(inputCol=\"claim_type\", outputCol=\"claim_type_index\")\nincident_type_indexer = StringIndexer(inputCol=\"incident_type\", outputCol=\"incident_type_index\")\nlocation_indexer = StringIndexer(inputCol=\"location\", outputCol=\"location_index\")\n\n# Apply indexing\ndf_ml_indexed = claim_type_indexer.fit(df_silver_ml).transform(df_silver_ml)\ndf_ml_indexed = incident_type_indexer.fit(df_ml_indexed).transform(df_ml_indexed)\ndf_ml_indexed = location_indexer.fit(df_ml_indexed).transform(df_ml_indexed)\n\n# Create ML dataset with selected features\ndf_ml_dataset = df_ml_indexed.select(\n \"policy_id\",\n \"claim_date\",\n \"claim_amount\",\n \"fraud_score\",\n \"claim_type_index\",\n \"incident_type_index\",\n \"location_index\",\n \"claim_month\",\n \"claim_day_of_week\",\n \"claim_hour\",\n \"amount_category\",\n \"is_high_value_claim\",\n (col(\"fraud_score\") >= 60).alias(\"is_fraud\"),\n current_date().alias(\"created_date\"),\n 
lit(\"v1.0\").alias(\"dataset_version\")\n)\n\nprint(f\"Created ML-ready dataset with {df_ml_dataset.count()} records\")\nprint(\"\\nFraud distribution:\")\ndf_ml_dataset.groupBy(\"is_fraud\").count().show()\n\nprint(\"\\nSample ML Dataset:\")\ndf_ml_dataset.show(5)", "metadata": { "trusted": true, "execution": { "iopub.status.busy": "2025-12-19T15:43:33.255Z" } }, "outputs": [ { "output_type": "display_data", "data": { "text/plain": "Created ML-ready dataset with 9666 records\n\nFraud distribution:\n" }, "metadata": {} }, { "output_type": "display_data", "data": { "text/plain": "+--------+-----+\n|is_fraud|count|\n+--------+-----+\n| true| 3850|\n| false| 5816|\n+--------+-----+\n\n\nSample ML Dataset:\n" }, "metadata": {} }, { "output_type": "display_data", "data": { "text/plain": "+-----------+-------------------+------------+-----------+----------------+-------------------+--------------+-----------+-----------------+----------+---------------+-------------------+--------+------------+---------------+\n| policy_id| claim_date|claim_amount|fraud_score|claim_type_index|incident_type_index|location_index|claim_month|claim_day_of_week|claim_hour|amount_category|is_high_value_claim|is_fraud|created_date|dataset_version|\n+-----------+-------------------+------------+-----------+----------------+-------------------+--------------+-----------+-----------------+----------+---------------+-------------------+--------+------------+---------------+\n|POL00000001|2024-10-31 17:00:00| 30735.20| 8| 3.0| 0.0| 2.0| 10| 5| 17| Medium Value| false| false| 2025-12-19| v1.0|\n|POL00000002|2024-07-28 22:00:00| 146227.66| 33| 4.0| 1.0| 1.0| 7| 1| 22| High Value| true| false| 2025-12-19| v1.0|\n|POL00000003|2024-07-30 19:00:00| 11313.85| 8| 0.0| 2.0| 2.0| 7| 3| 19| Low Value| false| false| 2025-12-19| v1.0|\n|POL00000003|2024-10-23 23:00:00| 20968.79| 1| 3.0| 6.0| 4.0| 10| 4| 23| Low Value| false| false| 2025-12-19| v1.0|\n|POL00000004|2024-05-25 10:00:00| 21857.29| 72| 3.0| 5.0| 
6.0| 5| 7| 10| Low Value| false| true| 2025-12-19| v1.0|\n+-----------+-------------------+------------+-----------+----------------+-------------------+--------------+-----------+-----------------+----------+---------------+-------------------+--------+------------+---------------+\nonly showing top 5 rows\n\n" }, "metadata": {} } ], "execution_count": 27 }, { "cell_type": "code", "source": "# Insert ML dataset into Gold layer\n\ndf_ml_dataset.write.mode(\"overwrite\").saveAsTable(\"insurance.gold.fraud_ml_dataset\")\n\nprint(f\"Successfully inserted {df_ml_dataset.count()} ML-ready records into insurance.gold.fraud_ml_dataset\")\n\n# Verify Gold ML data\nml_summary = spark.sql(\"\"\"\n SELECT \n COUNT(*) as total_records,\n SUM(CASE WHEN is_fraud THEN 1 ELSE 0 END) as fraud_cases,\n ROUND(AVG(claim_amount), 2) as avg_claim_amount,\n ROUND(AVG(fraud_score), 2) as avg_fraud_score\n FROM insurance.gold.fraud_ml_dataset\n\"\"\").collect()[0]\n\nprint(f\"\\nGold layer ML dataset summary:\")\nprint(f\"- Total records: {ml_summary[0]}\")\nprint(f\"- Fraud cases: {ml_summary[1]} ({ml_summary[1]/ml_summary[0]*100:.1f}%)\")\nprint(f\"- Average claim amount: ${ml_summary[2]:,}\")\nprint(f\"- Average fraud score: {ml_summary[3]}\")", "metadata": { "trusted": true, "execution": { "iopub.status.busy": "2025-12-19T15:44:00.669Z" } }, "outputs": [ { "output_type": "display_data", "data": { "text/plain": "Successfully inserted 9666 ML-ready records into insurance.gold.fraud_ml_dataset\n" }, "metadata": {} }, { "output_type": "display_data", "data": { "text/plain": "\nGold layer ML dataset summary:\n- Total records: 9666\n- Fraud cases: 3850 (39.8%)\n- Average claim amount: $101,228.82\n- Average fraud score: 49.49\n" }, "metadata": {} } ], "execution_count": 28 }, { "cell_type": "markdown", "source": "## Step 8: Demonstrate Medallion Architecture Benefits\n\n\n\n### Query Performance Across Layers\n\n\nShow how liquid clustering optimizes queries at each layer:\n\n\n- **Bronze**: 
Raw data queries with ingestion metadata\n- **Silver**: Clean data queries with derived fields\n- **Gold**: Aggregated analytics and ML-ready queries\n\n\n\n### Business Value Demonstration\n\n\nIllustrate insurance analytics use cases:\n\n\n- **Operational monitoring**: Claim processing metrics\n- **Risk analysis**: Fraud pattern detection\n- **Business intelligence**: Trend analysis and KPIs", "metadata": {} }, { "cell_type": "code", "source": "# Demonstrate Medallion Architecture query performance\n\nprint(\"=== Medallion Architecture Query Performance Demo ===\\n\")\n\n# Bronze Layer: Raw data analysis\nprint(\"1. BRONZE LAYER - Raw Data Ingestion Analysis\")\nbronze_stats = spark.sql(\"\"\"\n SELECT \n source_system,\n COUNT(*) as records_ingested,\n ROUND(AVG(claim_amount), 2) as avg_amount\n FROM insurance.bronze.insurance_claims_raw\n GROUP BY source_system\n ORDER BY records_ingested DESC\n\"\"\")\nbronze_stats.show()\n\n# Silver Layer: Clean data analysis\nprint(\"\\n2. SILVER LAYER - Clean Data Quality Analysis\")\nsilver_stats = spark.sql(\"\"\"\n SELECT \n risk_category,\n COUNT(*) as claims,\n ROUND(AVG(claim_amount), 2) as avg_amount,\n ROUND(AVG(data_quality_score), 3) as quality_score\n FROM insurance.silver.insurance_claims_clean\n GROUP BY risk_category\n ORDER BY claims DESC\n\"\"\")\nsilver_stats.show()\n\n# Gold Layer: Business analytics\nprint(\"\\n3. GOLD LAYER - Business Intelligence Analytics\")\ngold_stats = spark.sql(\"\"\"\n SELECT \n claim_year,\n claim_month,\n SUM(total_claims) as monthly_claims,\n ROUND(SUM(total_claim_amount), 2) as monthly_amount,\n ROUND(AVG(avg_fraud_score), 2) as avg_risk_score\n FROM insurance.gold.claim_analytics\n GROUP BY claim_year, claim_month\n ORDER BY claim_year, claim_month\n\"\"\")\ngold_stats.show()\n\n# Gold Layer: ML insights\nprint(\"\\n4. 
GOLD LAYER - ML-Ready Fraud Analysis\")\nfraud_insights = spark.sql(\"\"\"\n SELECT \n amount_category,\n is_fraud,\n COUNT(*) as claims,\n ROUND(AVG(claim_amount), 2) as avg_amount\n FROM insurance.gold.fraud_ml_dataset\n GROUP BY amount_category, is_fraud\n ORDER BY amount_category, is_fraud\n\"\"\")\nfraud_insights.show()", "metadata": { "trusted": true, "execution": { "iopub.status.busy": "2025-12-19T15:46:06.072Z" } }, "outputs": [ { "output_type": "display_data", "data": { "text/plain": "=== Medallion Architecture Query Performance Demo ===\n\n1. BRONZE LAYER - Raw Data Ingestion Analysis\n" }, "metadata": {} }, { "output_type": "display_data", "data": { "text/plain": "+-------------+----------------+----------+\n|source_system|records_ingested|avg_amount|\n+-------------+----------------+----------+\n|Legacy_Claims| 5180| 79355.76|\n| Partner_API| 2274| 102191.12|\n| Mobile_App| 1312| 104661.70|\n| Direct_Web| 900| 99272.45|\n+-------------+----------------+----------+\n\n\n2. SILVER LAYER - Clean Data Quality Analysis\n" }, "metadata": {} }, { "output_type": "display_data", "data": { "text/plain": "+--------------+------+----------+-------------+\n| risk_category|claims|avg_amount|quality_score|\n+--------------+------+----------+-------------+\n| Medium Risk| 1956| 102039.71| 1.0|\n| High Risk| 1946| 103120.08| 1.0|\n| Low Risk| 1940| 96515.19| 1.0|\n| Very Low Risk| 1920| 104572.57| 1.0|\n|Very High Risk| 1904| 99893.72| 1.0|\n+--------------+------+----------+-------------+\n\n\n3. 
GOLD LAYER - Business Intelligence Analytics\n" }, "metadata": {} }, { "output_type": "display_data", "data": { "text/plain": "+----------+-----------+--------------+--------------+--------------+\n|claim_year|claim_month|monthly_claims|monthly_amount|avg_risk_score|\n+----------+-----------+--------------+--------------+--------------+\n| 2024| 1| 819| 80797908.54| 48.93|\n| 2024| 2| 773| 76222034.76| 50.82|\n| 2024| 3| 800| 81043424.47| 49.08|\n| 2024| 4| 816| 81202422.92| 48.85|\n| 2024| 5| 825| 82443145.34| 48.94|\n| 2024| 6| 770| 77217884.04| 50.08|\n| 2024| 7| 795| 80651164.65| 48.33|\n| 2024| 8| 828| 84490675.35| 48.71|\n| 2024| 9| 780| 79059662.50| 49.04|\n| 2024| 10| 816| 85649684.06| 49.98|\n| 2024| 11| 830| 85716244.97| 49.97|\n| 2024| 12| 814| 83983541.61| 48.73|\n+----------+-----------+--------------+--------------+--------------+\n\n\n4. GOLD LAYER - ML-Ready Fraud Analysis\n" }, "metadata": {} }, { "output_type": "display_data", "data": { "text/plain": "+---------------+--------+------+----------+\n|amount_category|is_fraud|claims|avg_amount|\n+---------------+--------+------+----------+\n| High Value| false| 1945| 215219.15|\n| High Value| true| 1291| 217092.81|\n| Low Value| false| 1191| 13202.54|\n| Low Value| true| 816| 13251.86|\n| Medium Value| false| 2680| 57195.15|\n| Medium Value| true| 1743| 57251.28|\n+---------------+--------+------+----------+\n\n" }, "metadata": {} } ], "execution_count": 33 }, { "cell_type": "markdown", "source": "## Step 9: Train Fraud Detection Model on Gold Layer Data\n\n\n\n### Machine Learning on Curated Data\n\n\nUse the Gold layer ML-ready dataset to train a fraud detection model:\n\n\n- **Feature-rich**: Engineered features from Silver layer\n- **Optimized**: Liquid clustering for fast model training\n- **Business-focused**: Fraud prediction with clear business impact\n\n\n\n### Model Evaluation and Business Impact\n\n\nEvaluate model performance and calculate:\n\n\n- **Accuracy metrics**: Precision, recall, 
AUC\n- **Business value**: Potential fraud detection savings\n- **Operational impact**: Investigation prioritization", "metadata": {} }, { "cell_type": "code", "source": "# Train fraud detection model using Gold layer ML dataset\n\nfrom pyspark.ml.feature import VectorAssembler, StandardScaler\nfrom pyspark.ml.classification import RandomForestClassifier\nfrom pyspark.ml.evaluation import BinaryClassificationEvaluator\nfrom pyspark.ml import Pipeline\nimport pyspark.sql.functions as F\n\n# Load Gold layer ML dataset\nml_data = spark.read.table(\"insurance.gold.fraud_ml_dataset\")\n\nprint(f\"Loaded {ml_data.count()} records from Gold layer ML dataset\")\n\n# Prepare features for ML\nfeature_cols = [\n \"claim_amount\", \"claim_month\", \"claim_day_of_week\", \"claim_hour\",\n \"claim_type_index\", \"incident_type_index\", \"location_index\"\n]\n\n# Create feature vector\nassembler = VectorAssembler(inputCols=feature_cols, outputCol=\"features\")\nscaler = StandardScaler(inputCol=\"features\", outputCol=\"scaled_features\")\n\n# Train Random Forest model\nrf = RandomForestClassifier(\n labelCol=\"is_fraud\",\n featuresCol=\"scaled_features\",\n numTrees=100,\n maxDepth=10,\n seed=42\n)\n\n# Create pipeline\npipeline = Pipeline(stages=[assembler, scaler, rf])\n\n# Split data\ntrain_data, test_data = ml_data.randomSplit([0.8, 0.2], seed=42)\n\nprint(f\"Training set: {train_data.count()} records\")\nprint(f\"Test set: {test_data.count()} records\")\n\n# Show class distribution\nprint(\"\\nFraud distribution in training set:\")\ntrain_data.groupBy(\"is_fraud\").count().show()", "metadata": { "trusted": true, "execution": { "iopub.status.busy": "2025-12-19T15:46:18.813Z" } }, "outputs": [ { "output_type": "display_data", "data": { "text/plain": "Loaded 9666 records from Gold layer ML dataset\n" }, "metadata": {} }, { "output_type": "display_data", "data": { "text/plain": "Training set: 7807 records\n" }, "metadata": {} }, { "output_type": "display_data", "data": { 
"text/plain": "Test set: 1859 records\n\nFraud distribution in training set:\n" }, "metadata": {} }, { "output_type": "display_data", "data": { "text/plain": "+--------+-----+\n|is_fraud|count|\n+--------+-----+\n| true| 3087|\n| false| 4720|\n+--------+-----+\n\n" }, "metadata": {} } ], "execution_count": 34 }, { "cell_type": "code", "source": "# Train and evaluate the fraud detection model\ntrain_data = train_data.withColumn(\"is_fraud\", col(\"is_fraud\").cast(IntegerType()))\ntest_data = test_data.withColumn(\"is_fraud\", col(\"is_fraud\").cast(IntegerType()))\n\nprint(\"Training fraud detection model on Gold layer data...\")\nmodel = pipeline.fit(train_data)\n\n# Make predictions\npredictions = model.transform(test_data)\n\n# Evaluate model\nevaluator = BinaryClassificationEvaluator(labelCol=\"is_fraud\", metricName=\"areaUnderROC\")\nauc = evaluator.evaluate(predictions)\n\nprint(f\"\\nModel Performance on Gold Layer Data:\")\nprint(f\"AUC: {auc:.4f}\")\n\n# Calculate detailed metrics\ntp = predictions.filter(\"is_fraud = 1 AND prediction = 1\").count()\ntn = predictions.filter(\"is_fraud = 0 AND prediction = 0\").count()\nfp = predictions.filter(\"is_fraud = 0 AND prediction = 1\").count()\nfn = predictions.filter(\"is_fraud = 1 AND prediction = 0\").count()\n\nprecision = tp / (tp + fp) if (tp + fp) > 0 else 0\nrecall = tp / (tp + fn) if (tp + fn) > 0 else 0\naccuracy = (tp + tn) / (tp + tn + fp + fn)\n\nprint(f\"Precision: {precision:.4f}\")\nprint(f\"Recall: {recall:.4f}\")\nprint(f\"Accuracy: {accuracy:.4f}\")\n\n# Business impact analysis\nfraud_claims = predictions.filter(\"prediction = 1\")\npotential_fraud_value = fraud_claims.agg(F.sum(\"claim_amount\")).collect()[0][0]\ntotal_test_value = test_data.agg(F.sum(\"claim_amount\")).collect()[0][0]\n\nprint(f\"\\nBusiness Impact Analysis:\")\nprint(f\"Total test set claim value: ${total_test_value:,.2f}\")\nprint(f\"Predicted fraudulent claims value: ${potential_fraud_value:,.2f}\")\nprint(f\"Fraud 
detection coverage: {potential_fraud_value/total_test_value*100:.1f}%\")\nprint(f\"Potential annual savings (est.): ${potential_fraud_value*12:,.2f}\")\n\n# Show sample predictions\nprint(\"\\nSample Fraud Predictions:\")\npredictions.select(\"policy_id\", \"claim_amount\", \"fraud_score\", \"is_fraud\", \"prediction\", \"probability\") \\\n .orderBy(F.desc(\"probability\")) \\\n .show(10)", "metadata": { "trusted": true, "execution": { "iopub.status.busy": "2025-12-19T15:48:39.649Z" } }, "outputs": [ { "output_type": "display_data", "data": { "text/plain": "Training fraud detection model on Gold layer data...\n" }, "metadata": {} }, { "output_type": "display_data", "data": { "text/plain": "\nModel Performance on Gold Layer Data:\nAUC: 0.4979\n" }, "metadata": {} }, { "output_type": "display_data", "data": { "text/plain": "Precision: 0.3736\nRecall: 0.0446\nAccuracy: 0.5772\n" }, "metadata": {} }, { "output_type": "display_data", "data": { "text/plain": "\nBusiness Impact Analysis:\nTotal test set claim value: $184,631,848.85\nPredicted fraudulent claims value: $12,207,875.25\nFraud detection coverage: 6.6%\nPotential annual savings (est.): $146,494,503.00\n\nSample Fraud Predictions:\n" }, "metadata": {} }, { "output_type": "display_data", "data": { "text/plain": "+-----------+------------+-----------+--------+----------+--------------------+\n| policy_id|claim_amount|fraud_score|is_fraud|prediction| probability|\n+-----------+------------+-----------+--------+----------+--------------------+\n|POL00009569| 12449.93| 65| 1| 0.0|[0.79989178380132...|\n|POL00008076| 1366.26| 34| 0| 0.0|[0.77271661548148...|\n|POL00008940| 30951.86| 84| 1| 0.0|[0.75911306876166...|\n|POL00006015| 109689.57| 49| 0| 0.0|[0.75803257555286...|\n|POL00009235| 43158.71| 91| 1| 0.0|[0.73937684219733...|\n|POL00001855| 8171.97| 99| 1| 0.0|[0.73877500051399...|\n|POL00001810| 143294.36| 97| 1| 0.0|[0.73588989728601...|\n|POL00009662| 220301.52| 29| 0| 0.0|[0.73233852437669...|\n|POL00002310| 
8861.38| 49| 0| 0.0|[0.73077833978262...|\n|POL00009034| 162076.57| 4| 0| 0.0|[0.72874533414012...|\n+-----------+------------+-----------+--------+----------+--------------------+\nonly showing top 10 rows\n\n" }, "metadata": {} } ], "execution_count": 36 }, { "cell_type": "markdown", "source": "# Key Takeaways: Medallion Architecture with Delta Liquid Clustering\n\n\n\n## What We Demonstrated\n\n\n\n### 1. Complete Medallion Architecture Implementation\n\n\n- **Bronze Layer**: Raw data ingestion with liquid clustering on `(policy_id, ingestion_date)`\n- **Silver Layer**: Data quality, cleansing, and enrichment with clustering on `(policy_id, claim_date)`\n- **Gold Layer**: Business analytics and ML-ready datasets with specialized clustering\n\n\n### 2. Delta Liquid Clustering Optimization\n\n\n- **Automatic optimization**: No manual partitioning or Z-Ordering required\n- **Query-specific clustering**: Each layer optimized for its access patterns\n- **Performance benefits**: Fast queries on clustered columns\n- **Maintenance-free**: Automatic data layout optimization\n\n\n### 3. Progressive Data Refinement\n\n\n- **Bronze**: Preserves data fidelity with ingestion metadata\n- **Silver**: Clean, standardized data with derived fields\n- **Gold**: Curated analytics and ML-ready features\n\n\n### 4. 
Insurance Business Value\n\n\n- **Fraud Detection**: Random Forest pipeline trained on curated Gold layer data (AUC 0.4979 on the test split, so it illustrates the workflow rather than a usable classifier)\n- **Operational Analytics**: Batch claim volume and claim value metrics by month, claim type, and location\n- **Risk Management**: Automated risk scoring and categorization\n- **Business Intelligence**: Trend analysis and KPI monitoring\n\n\n## AIDP Advantages\n\n\n- **Unified Platform**: Seamless data flow from ingestion to analytics\n- **Performance**: Liquid clustering optimizes each layer for its use case\n- **Governance**: Catalog-based data organization and access control\n- **Scalability**: Handles insurance-scale data volumes efficiently\n\n\n## Best Practices Demonstrated\n\n\n### Data Architecture\n- Choose clustering columns based on primary query patterns\n- Design layer-specific schemas for optimal performance\n- Implement proper data quality checks and monitoring\n\n### Performance Optimization\n- Use liquid clustering for automatic data layout optimization\n- Align clustering strategy with access patterns\n- Balance between too few and too many clustering columns\n\n### Business Value\n- Focus on end-to-end analytics workflows\n- Enable self-service analytics with curated datasets\n- Integrate ML capabilities for predictive insights\n\n\n## Next Steps\n\n\n- **Scale Up**: Process larger insurance datasets\n- **Add Streaming**: Implement real-time claim processing\n- **Advanced ML**: Try AutoML and deep learning models\n- **Data Products**: Create APIs for claim analytics\n- **Monitoring**: Add data quality and performance monitoring\n\n\nThis implementation demonstrates how Oracle AI Data Platform enables sophisticated insurance analytics while maintaining enterprise-grade performance, governance, and scalability.", "metadata": {} } ] }