{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Insurance: Delta Liquid Clustering Demo\n", "\n", "\n", "\n", "## Overview\n", "\n", "\n", "\n", "This notebook demonstrates the power of **Delta Liquid Clustering** in Oracle AI Data Platform (AIDP) Workbench using an insurance analytics use case. Liquid clustering automatically optimizes data layout for query performance without requiring manual partitioning or Z-Ordering.\n", "\n", "\n", "\n", "### What is Liquid Clustering?\n", "\n", "\n", "\n", "Liquid clustering automatically identifies and groups similar data together based on clustering columns you define. This optimization happens automatically during data ingestion and maintenance operations, providing:\n", "\n", "\n", "\n", "- **Automatic optimization**: No manual tuning required\n", "\n", "- **Improved query performance**: Faster queries on clustered columns\n", "\n", "- **Reduced maintenance**: No need for manual repartitioning\n", "\n", "- **Adaptive clustering**: Adjusts as data patterns change\n", "\n", "\n", "\n", "### Use Case: Insurance Risk Assessment and Fraud Detection\n", "\n", "\n", "\n", "We'll analyze insurance claim records from an insurance company. Our clustering strategy will optimize for:\n", "\n", "\n", "- **Policy-specific queries**: Fast lookups by policy ID\n", "\n", "- **Time-based analysis**: Efficient filtering by claim date\n", "\n", "- **Fraud pattern detection**: Quick aggregation by claim type and risk scores\n", "\n", "\n", "\n", "### AIDP Environment Setup\n", "\n", "\n", "\n", "This notebook leverages the existing Spark session in your AIDP environment." 
, "\n", "\n", "\n", "## Step 1: Create Catalog and Schema\n", "\n", "\n", "The next cell creates the `insurance` catalog and the `insurance.analytics` schema that hold the demo table."
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Create insurance catalog and analytics schema\n", "\n", "# In AIDP, catalogs provide data isolation and governance\n", "\n", "spark.sql(\"CREATE CATALOG IF NOT EXISTS insurance\")\n", "\n", "spark.sql(\"CREATE SCHEMA IF NOT EXISTS insurance.analytics\")\n", "\n", "print(\"Insurance catalog and analytics schema created successfully!\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 2: Create Delta Table with Liquid Clustering\n", "\n", "\n", "\n", "### Table Design\n", "\n", "\n", "\n", "Our `insurance_claims` table will store:\n", "\n", "\n", "- **policy_id**: Unique policy identifier\n", "\n", "- **claim_date**: Date and time of claim\n", "\n", "- **claim_type**: Type (Auto, Home, Health, Life, etc.)\n", "\n", "- **claim_amount**: Claim amount\n", "\n", "- **incident_type**: Type of incident (Accident, Theft, Natural Disaster, etc.)\n", "\n", "- **location**: Incident location\n", "\n", "- **fraud_score**: Fraud risk assessment (0-100)\n", "\n", "\n", "\n", "### Clustering Strategy\n", "\n", "\n", "We'll cluster by `policy_id` and `claim_date` because:\n", "\n", "\n", "- **policy_id**: Policies often have multiple claims over time, grouping policy history together\n", "\n", "- **claim_date**: Time-based queries are critical for fraud detection, seasonal analysis, and regulatory reporting\n", "\n", "- This combination optimizes for both policy analysis and temporal fraud pattern detection" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Delta table with liquid clustering created successfully!\n", "Clustering will automatically optimize data layout for queries on policy_id and claim_date.\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Create Delta table with liquid clustering\n", "\n", "# CLUSTER BY defines the columns for automatic optimization\n", "\n", 
"spark.sql(\"\"\"\n", "\n", "CREATE TABLE IF NOT EXISTS insurance.analytics.insurance_claims (\n", "\n", " policy_id STRING,\n", "\n", " claim_date TIMESTAMP,\n", "\n", " claim_type STRING,\n", "\n", " claim_amount DECIMAL(15,2),\n", "\n", " incident_type STRING,\n", "\n", " location STRING,\n", "\n", " fraud_score INT\n", "\n", ")\n", "\n", "USING DELTA\n", "\n", "CLUSTER BY (policy_id, claim_date)\n", "\n", "\"\"\")\n", "\n", "print(\"Delta table with liquid clustering created successfully!\")\n", "\n", "print(\"Clustering will automatically optimize data layout for queries on policy_id and claim_date.\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 3: Generate Insurance Sample Data\n", "\n", "\n", "\n", "### Data Generation Strategy\n", "\n", "\n", "We'll create realistic insurance claim data including:\n", "\n", "\n", "- **10,000 policies** with multiple claims over time\n", "\n", "- **Claim types**: Auto, Home, Health, Life, Property\n", "\n", "- **Realistic temporal patterns**: Seasonal claim patterns, accident spikes\n", "\n", "- **Incident types**: Accidents, theft, natural disasters, illnesses\n", "\n", "\n", "\n", "### Why This Data Pattern?\n", "\n", "\n", "This data simulates real insurance scenarios where:\n", "\n", "\n", "- Policies accumulate claims over time\n", "\n", "- Fraud patterns emerge in claim submissions\n", "\n", "- Seasonal events affect claim volumes\n", "\n", "- Risk scoring enables fraud prevention\n", "\n", "- Policy analysis drives underwriting decisions" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Generated 5513 insurance claim records\n", "Sample record: {'policy_id': 'POL00000001', 'claim_date': datetime.datetime(2024, 11, 27, 1, 0), 'claim_type': 'Life', 'claim_amount': 400797.58, 'incident_type': 'Flood', 'location': 'Chicago, IL', 'fraud_score': 28}\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Generate sample 
insurance claim data\n", "\n", "# Using fully qualified imports to avoid conflicts\n", "\n", "import random\n", "\n", "from datetime import datetime, timedelta\n", "\n", "\n", "# Define insurance data constants\n", "\n", "CLAIM_TYPES = ['Auto', 'Home', 'Health', 'Life', 'Property']\n", "\n", "INCIDENT_TYPES = ['Accident', 'Theft', 'Natural Disaster', 'Illness', 'Fire', 'Flood', 'Collision', 'Medical Emergency']\n", "\n", "LOCATIONS = ['New York, NY', 'Los Angeles, CA', 'Chicago, IL', 'Houston, TX', 'Miami, FL', 'Denver, CO', 'Seattle, WA']\n", "\n", "\n", "# Generate policy claim records\n", "\n", "claim_data = []\n", "\n", "base_date = datetime(2024, 1, 1)\n", "\n", "\n", "# Create 10,000 policies with 0-5 claims each\n", "\n", "for policy_num in range(1, 10001):\n", "\n", " policy_id = f\"POL{policy_num:08d}\"\n", " \n", " # Each policy gets 0-5 claims over 12 months (many policies have no claims)\n", "\n", " num_claims = random.choices([0, 1, 2, 3, 4, 5], weights=[0.7, 0.15, 0.08, 0.04, 0.02, 0.01])[0]\n", " \n", " for i in range(num_claims):\n", "\n", " # Spread claims over 12 months\n", "\n", " days_offset = random.randint(0, 365)\n", "\n", " hours_offset = random.randint(0, 23)\n", "\n", " claim_date = base_date + timedelta(days=days_offset, hours=hours_offset)\n", " \n", " # Select claim type\n", "\n", " claim_type = random.choice(CLAIM_TYPES)\n", " \n", " # Amount based on claim type\n", "\n", " if claim_type == 'Auto':\n", " amount = round(random.uniform(1000, 50000), 2)\n", " elif claim_type == 'Home':\n", " amount = round(random.uniform(5000, 200000), 2)\n", " elif claim_type == 'Health':\n", " amount = round(random.uniform(500, 100000), 2)\n", " elif claim_type == 'Life':\n", " amount = round(random.uniform(10000, 500000), 2)\n", " else: # Property\n", " amount = round(random.uniform(2000, 150000), 2)\n", " \n", " # Select incident type and location\n", " incident_type = random.choice(INCIDENT_TYPES)\n", " location = random.choice(LOCATIONS)\n", " \n", 
" # Fraud score (0-100, higher = more suspicious)\n", " fraud_score = random.randint(0, 100)\n", " \n", " claim_data.append({\n", " \"policy_id\": policy_id,\n", " \"claim_date\": claim_date,\n", " \"claim_type\": claim_type,\n", " \"claim_amount\": amount,\n", " \"incident_type\": incident_type,\n", " \"location\": location,\n", " \"fraud_score\": fraud_score\n", " })\n", "\n", "\n", "print(f\"Generated {len(claim_data)} insurance claim records\")\n", "if claim_data:\n", " print(\"Sample record:\", claim_data[0])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 4: Insert Data Using PySpark\n", "\n", "\n", "\n", "### Data Insertion Strategy\n", "\n", "\n", "We'll use PySpark to:\n", "\n", "\n", "1. **Create DataFrame** from our generated data\n", "\n", "2. **Insert into Delta table** with liquid clustering\n", "\n", "3. **Verify the insertion** with a sample query\n", "\n", "\n", "\n", "### Why PySpark for Insertion?\n", "\n", "\n", "- **Distributed processing**: Handles large datasets efficiently\n", "\n", "- **Type safety**: Ensures data integrity\n", "\n", "- **Optimization**: Leverages Spark's query optimization\n", "\n", "- **Liquid clustering**: Automatically applies clustering during insertion" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "DataFrame Schema:\n", "root\n", " |-- claim_amount: double (nullable = true)\n", " |-- claim_date: timestamp (nullable = true)\n", " |-- claim_type: string (nullable = true)\n", " |-- fraud_score: long (nullable = true)\n", " |-- incident_type: string (nullable = true)\n", " |-- location: string (nullable = true)\n", " |-- policy_id: string (nullable = true)\n", "\n", "\n", "Sample Data:\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+------------+-------------------+----------+-----------+----------------+-----------+-----------+\n", "|claim_amount| claim_date|claim_type|fraud_score| incident_type| 
location| policy_id|\n", "+------------+-------------------+----------+-----------+----------------+-----------+-----------+\n", "| 400797.58|2024-11-27 01:00:00| Life| 28| Flood|Chicago, IL|POL00000001|\n", "| 8105.93|2024-10-20 09:00:00| Auto| 8| Flood|Chicago, IL|POL00000005|\n", "| 74300.1|2024-01-01 07:00:00| Health| 98| Fire| Miami, FL|POL00000005|\n", "| 3397.77|2024-08-30 15:00:00| Auto| 16|Natural Disaster| Miami, FL|POL00000005|\n", "| 29543.32|2024-06-11 14:00:00| Health| 100| Accident| Miami, FL|POL00000005|\n", "+------------+-------------------+----------+-----------+----------------+-----------+-----------+\n", "only showing top 5 rows\n", "\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "\n", "Successfully inserted 5513 records into insurance.analytics.insurance_claims\n", "Liquid clustering automatically optimized the data layout during insertion!\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Insert data using PySpark DataFrame operations\n", "\n", "\n", "# Create DataFrame from generated data\n", "\n", "df_claims = spark.createDataFrame(claim_data)\n", "\n", "\n", "# Display schema and sample data\n", "\n", "print(\"DataFrame Schema:\")\n", "df_claims.printSchema()\n", "\n", "\n", "print(\"\\nSample Data:\")\n", "df_claims.show(5)\n", "\n", "# Write data into the Delta table; overwrite mode replaces any existing rows\n", "# Note: createDataFrame infers claim_amount as double and fraud_score as long (see the\n", "# printed schema), not the DECIMAL(15,2) and INT declared in Step 2\n", "# The table's CLUSTER BY (policy_id, claim_date) setting governs how Delta lays out the files\n", "\n", "df_claims.write.mode(\"overwrite\").saveAsTable(\"insurance.analytics.insurance_claims\")\n", "\n", "print(f\"\\nSuccessfully inserted {df_claims.count()} records into insurance.analytics.insurance_claims\")\n", "print(\"Liquid clustering automatically optimized the data layout during insertion!\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 5: Demonstrate Liquid Clustering Benefits\n", "\n", 
"\n", "\n", "### Query Performance Analysis\n", "\n", "\n", "Now let's see how liquid clustering improves query performance. We'll run queries that benefit from our clustering strategy:\n", "\n", "\n", "1. **Policy claim history** (clustered by policy_id)\n", "\n", "2. **Time-based fraud analysis** (clustered by claim_date)\n", "\n", "3. **Combined policy + time queries** (optimal for our clustering)\n", "\n", "\n", "\n", "### Expected Performance Benefits\n", "\n", "\n", "With liquid clustering, these queries should be significantly faster because:\n", "\n", "\n", "- **Data locality**: Related records are physically grouped together\n", "\n", "- **Reduced I/O**: Less data needs to be read from disk\n", "\n", "- **Automatic optimization**: No manual tuning required" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "=== Query 1: Policy Claim History ===\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+-----------+-------------------+----------+------------+-------------+\n", "| policy_id| claim_date|claim_type|claim_amount|incident_type|\n", "+-----------+-------------------+----------+------------+-------------+\n", "|POL00000001|2024-11-27 01:00:00| Life| 400797.58| Flood|\n", "+-----------+-------------------+----------+------------+-------------+\n", "\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "Records found: 1\n", "\n", "=== Query 2: High-Risk Claims Today ===\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+----------+---------+----------+------------+-----------+\n", "|claim_date|policy_id|claim_type|claim_amount|fraud_score|\n", "+----------+---------+----------+------------+-----------+\n", "+----------+---------+----------+------------+-----------+\n", "\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "High-risk claims found: 0\n", "\n", "=== 
Query 3: Policy Fraud Pattern Analysis ===\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+-----------+-------------------+----------+------------+-----------+\n", "| policy_id| claim_date|claim_type|claim_amount|fraud_score|\n", "+-----------+-------------------+----------+------------+-----------+\n", "|POL00000011|2024-12-11 18:00:00| Life| 16927.89| 96|\n", "|POL00000014|2024-08-09 15:00:00| Health| 22654.45| 22|\n", "|POL00000014|2024-11-03 06:00:00| Health| 32460.09| 13|\n", "|POL00000016|2024-10-28 11:00:00| Life| 76279.78| 38|\n", "+-----------+-------------------+----------+------------+-----------+\n", "\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "Pattern records found: 4\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Demonstrate liquid clustering benefits with optimized queries\n", "\n", "\n", "# Query 1: Policy claim history - benefits from policy_id clustering\n", "\n", "print(\"=== Query 1: Policy Claim History ===\")\n", "\n", "policy_history = spark.sql(\"\"\"\n", "\n", "SELECT policy_id, claim_date, claim_type, claim_amount, incident_type\n", "\n", "FROM insurance.analytics.insurance_claims\n", "\n", "WHERE policy_id = 'POL00000001'\n", "\n", "ORDER BY claim_date DESC\n", "\n", "\"\"\")\n", "\n", "policy_history.show()\n", "print(f\"Records found: {policy_history.count()}\")\n", "\n", "# Query 2: Time-based fraud analysis - benefits from claim_date clustering\n", "# Note: the sample data covers 2024 only, so filtering on CURRENT_DATE returns\n", "# no rows unless the notebook is run on a 2024 date. Wrapping claim_date in DATE()\n", "# may also limit file skipping; a range predicate on claim_date itself is usually safer.\n", "\n", "print(\"\\n=== Query 2: High-Risk Claims Today ===\")\n", "\n", "high_risk_today = spark.sql(\"\"\"\n", "\n", "SELECT claim_date, policy_id, claim_type, claim_amount, fraud_score\n", "\n", "FROM insurance.analytics.insurance_claims\n", "\n", "WHERE DATE(claim_date) = CURRENT_DATE AND fraud_score > 70\n", "\n", "ORDER BY fraud_score DESC, claim_date DESC\n", "\n", "\"\"\")\n", "\n", "high_risk_today.show()\n", "print(f\"High-risk claims found: {high_risk_today.count()}\")\n", 
"\n", "# Query 3: Combined policy + time query - optimal for our clustering strategy\n", "\n", "print(\"\\n=== Query 3: Policy Fraud Pattern Analysis ===\")\n", "\n", "fraud_patterns = spark.sql(\"\"\"\n", "\n", "SELECT policy_id, claim_date, claim_type, claim_amount, fraud_score\n", "\n", "FROM insurance.analytics.insurance_claims\n", "\n", "WHERE policy_id LIKE 'POL0000001%' AND claim_date >= '2024-06-01'\n", "\n", "ORDER BY policy_id, claim_date\n", "\n", "\"\"\")\n", "\n", "fraud_patterns.show()\n", "print(f\"Pattern records found: {fraud_patterns.count()}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 6: Analyze Clustering Effectiveness\n", "\n", "\n", "\n", "### Understanding the Impact\n", "\n", "\n", "Let's examine how liquid clustering has organized our data and analyze some aggregate statistics to demonstrate the insurance insights possible with this optimized structure.\n", "\n", "\n", "\n", "### Key Analytics\n", "\n", "\n", "- **Claim volume** by type and fraud patterns\n", "\n", "- **Policy risk analysis** and claim frequency\n", "\n", "- **Fraud detection metrics** and risk scoring effectiveness\n", "\n", "- **Incident type trends** and geographic patterns" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "=== Claim Analysis by Type ===\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+----------+------------+--------------+----------+---------------+\n", "|claim_type|total_claims| total_amount|avg_amount|avg_fraud_score|\n", "+----------+------------+--------------+----------+---------------+\n", "| Home| 1114| 1.099470446E8| 98695.73| 50.2|\n", "| Life| 1110|2.8647859694E8| 258088.83| 49.24|\n", "| Auto| 1102| 2.857177722E7| 25927.2| 50.84|\n", "| Property| 1098| 8.424141052E7| 76722.6| 50.93|\n", "| Health| 1089| 5.610814189E7| 51522.63| 48.25|\n", "+----------+------------+--------------+----------+---------------+\n", "\n", 
"\n", "=== Fraud Score Distribution ===\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+--------------+-----------+----------+\n", "| risk_category|claim_count|percentage|\n", "+--------------+-----------+----------+\n", "| Medium Risk| 1134| 20.57|\n", "| High Risk| 1114| 20.21|\n", "|Very High Risk| 1110| 20.13|\n", "| Very Low Risk| 1080| 19.59|\n", "| Low Risk| 1075| 19.50|\n", "+--------------+-----------+----------+\n", "\n", "\n", "=== Incident Type Analysis ===\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+-----------------+---------+-------------+--------------+\n", "| incident_type|incidents| total_claims|avg_fraud_risk|\n", "+-----------------+---------+-------------+--------------+\n", "| Flood| 683| 7.43306941E7| 50.2|\n", "| Collision| 706|7.284745742E7| 49.91|\n", "| Illness| 738|7.216841449E7| 50.81|\n", "| Accident| 653|7.152836838E7| 49.56|\n", "| Theft| 676|6.890431429E7| 48.4|\n", "|Medical Emergency| 703|6.881489943E7| 49.57|\n", "| Natural Disaster| 676|6.878878596E7| 51.25|\n", "| Fire| 678| 6.79640371E7| 49.36|\n", "+-----------------+---------+-------------+--------------+\n", "\n", "\n", "=== Monthly Claim Trends ===\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+-------+------+-------------+--------------------+---------------+\n", "| month|claims| total_amount|policies_with_claims|avg_fraud_score|\n", "+-------+------+-------------+--------------------+---------------+\n", "|2024-01| 464|5.042012088E7| 427| 50.93|\n", "|2024-02| 431|4.588194485E7| 411| 50.04|\n", "|2024-03| 459|5.150691829E7| 428| 50.67|\n", "|2024-04| 438|4.305181724E7| 410| 49.53|\n", "|2024-05| 463|4.742429218E7| 437| 50.43|\n", "|2024-06| 446| 4.71913826E7| 426| 50.07|\n", "|2024-07| 483|4.948759258E7| 453| 49.26|\n", "|2024-08| 459|4.587712833E7| 429| 50.6|\n", "|2024-09| 467|4.639055335E7| 449| 49.63|\n", "|2024-10| 458|4.661274666E7| 436| 
48.78|\n", "|2024-11| 447|4.389484181E7| 425| 48.68|\n", "|2024-12| 498| 4.76076324E7| 466| 50.08|\n", "+-------+------+-------------+--------------------+---------------+\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Analyze clustering effectiveness and insurance insights\n", "\n", "\n", "# Claim analysis by type\n", "\n", "print(\"=== Claim Analysis by Type ===\")\n", "\n", "claim_analysis = spark.sql(\"\"\"\n", "\n", "SELECT claim_type, COUNT(*) as total_claims,\n", "\n", " ROUND(SUM(claim_amount), 2) as total_amount,\n", "\n", " ROUND(AVG(claim_amount), 2) as avg_amount,\n", "\n", " ROUND(AVG(fraud_score), 2) as avg_fraud_score\n", "\n", "FROM insurance.analytics.insurance_claims\n", "\n", "GROUP BY claim_type\n", "\n", "ORDER BY total_claims DESC\n", "\n", "\"\"\")\n", "\n", "claim_analysis.show()\n", "\n", "# Fraud score distribution\n", "\n", "print(\"\\n=== Fraud Score Distribution ===\")\n", "\n", "fraud_distribution = spark.sql(\"\"\"\n", "\n", "SELECT \n", "\n", " CASE \n", "\n", " WHEN fraud_score >= 80 THEN 'Very High Risk'\n", "\n", " WHEN fraud_score >= 60 THEN 'High Risk'\n", "\n", " WHEN fraud_score >= 40 THEN 'Medium Risk'\n", "\n", " WHEN fraud_score >= 20 THEN 'Low Risk'\n", "\n", " ELSE 'Very Low Risk'\n", "\n", " END as risk_category,\n", "\n", " COUNT(*) as claim_count,\n", "\n", " ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 2) as percentage\n", "\n", "FROM insurance.analytics.insurance_claims\n", "\n", "GROUP BY \n", "\n", " CASE \n", "\n", " WHEN fraud_score >= 80 THEN 'Very High Risk'\n", "\n", " WHEN fraud_score >= 60 THEN 'High Risk'\n", "\n", " WHEN fraud_score >= 40 THEN 'Medium Risk'\n", "\n", " WHEN fraud_score >= 20 THEN 'Low Risk'\n", "\n", " ELSE 'Very Low Risk'\n", "\n", " END\n", "\n", "ORDER BY claim_count DESC\n", "\n", "\"\"\")\n", "\n", "fraud_distribution.show()\n", "\n", "# Incident type analysis\n", "\n", "print(\"\\n=== Incident Type Analysis ===\")\n", "\n", "incident_analysis = 
spark.sql(\"\"\"\n", "\n", "SELECT incident_type, COUNT(*) as incidents,\n", "\n", " ROUND(SUM(claim_amount), 2) as total_claims,\n", "\n", " ROUND(AVG(fraud_score), 2) as avg_fraud_risk\n", "\n", "FROM insurance.analytics.insurance_claims\n", "\n", "GROUP BY incident_type\n", "\n", "ORDER BY total_claims DESC\n", "\n", "\"\"\")\n", "\n", "incident_analysis.show()\n", "\n", "# Monthly claim trends\n", "\n", "print(\"\\n=== Monthly Claim Trends ===\")\n", "\n", "monthly_trends = spark.sql(\"\"\"\n", "\n", "SELECT DATE_FORMAT(claim_date, 'yyyy-MM') as month,\n", "\n", " COUNT(*) as claims,\n", "\n", " ROUND(SUM(claim_amount), 2) as total_amount,\n", "\n", " COUNT(DISTINCT policy_id) as policies_with_claims,\n", "\n", " ROUND(AVG(fraud_score), 2) as avg_fraud_score\n", "\n", "FROM insurance.analytics.insurance_claims\n", "\n", "GROUP BY DATE_FORMAT(claim_date, 'yyyy-MM')\n", "\n", "ORDER BY month\n", "\n", "\"\"\")\n", "\n", "monthly_trends.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 7: Train Insurance Fraud Detection Model\n", "\n", "\n", "\n", "### Machine Learning for Insurance Business Improvement\n", "\n", "\n", "Now we'll train a machine learning model to predict fraudulent insurance claims. 
A model of this kind can help insurance companies:\n", "\n", "\n", "- **Reduce fraud losses** by identifying suspicious claims\n", "\n", "- **Improve underwriting** with better risk assessment\n", "\n", "- **Automate claim processing** for low-risk claims\n", "\n", "- **Optimize investigations** by prioritizing high-risk claims\n", "\n", "\n", "### Model Approach\n", "\n", "\n", "We'll use a **Random Forest Classifier** to predict claim fraud based on:\n", "\n", "\n", "- Claim amount and type\n", "\n", "- Incident type and location\n", "\n", "- Temporal patterns (month, day of week, and hour of the claim)\n", "\n", "\n", "The `is_fraud` label is derived from the synthetic `fraud_score` (`fraud_score > 50`), which is generated at random and independently of every feature. Expect the model to perform near chance; the cells below illustrate the workflow, not a working detector.\n", "\n", "\n", "### Business Impact\n", "\n", "\n", "- **Fraud Detection**: Identify potentially fraudulent claims\n", "\n", "- **Cost Savings**: Reduce payout on fraudulent claims\n", "\n", "- **Efficiency**: Speed up legitimate claim processing\n", "\n", "- **Risk Management**: Better portfolio risk assessment" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Loaded 5513 records for ML training\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+--------+-----+\n", "|is_fraud|count|\n", "+--------+-----+\n", "| 1| 2714|\n", "| 0| 2799|\n", "+--------+-----+\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Prepare data for machine learning\n", "\n", "from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler\n", "from pyspark.ml.classification import RandomForestClassifier\n", "from pyspark.ml.evaluation import BinaryClassificationEvaluator\n", "from pyspark.ml import Pipeline\n", "import pyspark.sql.functions as F\n", "\n", "# Load data for ML\n", "# The is_fraud label is derived from the synthetic fraud_score (above 50 counts as fraud)\n", "ml_data = spark.sql(\"\"\"\n", "SELECT \n", " policy_id,\n", " claim_date,\n", " claim_type,\n", " claim_amount,\n", " incident_type,\n", " location,\n", " fraud_score,\n", " CASE WHEN fraud_score > 50 THEN 1 ELSE 0 END as is_fraud\n", "FROM insurance.analytics.insurance_claims\n", 
"\"\"\")\n", "\n", "print(f\"Loaded {ml_data.count()} records for ML training\")\n", "ml_data.groupBy(\"is_fraud\").count().show()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Training set: 4462 records\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "Test set: 1051 records\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Feature engineering\n", "\n", "# Extract temporal features\n", "ml_data = ml_data.withColumn(\"month\", F.month(\"claim_date\")) \\\n", " .withColumn(\"day_of_week\", F.dayofweek(\"claim_date\")) \\\n", " .withColumn(\"hour\", F.hour(\"claim_date\"))\n", "\n", "# Create indexers for categorical variables\n", "claim_type_indexer = StringIndexer(inputCol=\"claim_type\", outputCol=\"claim_type_index\")\n", "incident_type_indexer = StringIndexer(inputCol=\"incident_type\", outputCol=\"incident_type_index\")\n", "location_indexer = StringIndexer(inputCol=\"location\", outputCol=\"location_index\")\n", "\n", "# Assemble features\n", "assembler = VectorAssembler(\n", " inputCols=[\"claim_amount\", \"month\", \"day_of_week\", \"hour\", \n", " \"claim_type_index\", \"incident_type_index\", \"location_index\"],\n", " outputCol=\"features\"\n", ")\n", "\n", "# Scale features\n", "scaler = StandardScaler(inputCol=\"features\", outputCol=\"scaled_features\")\n", "\n", "# Create and train the model\n", "rf = RandomForestClassifier(\n", " labelCol=\"is_fraud\", \n", " featuresCol=\"scaled_features\",\n", " numTrees=100,\n", " maxDepth=10\n", ")\n", "\n", "# Create pipeline\n", "pipeline = Pipeline(stages=[claim_type_indexer, incident_type_indexer, location_indexer, assembler, scaler, rf])\n", "\n", "# Split data\n", "train_data, test_data = ml_data.randomSplit([0.8, 0.2], seed=42)\n", "\n", "print(f\"Training set: {train_data.count()} records\")\n", "print(f\"Test set: {test_data.count()} records\")" ] }, { "cell_type": "code", 
"execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Training fraud detection model...\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "Model AUC: 0.4755\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+-----------+------------+-----------+--------+----------+--------------------+\n", "| policy_id|claim_amount|fraud_score|is_fraud|prediction| probability|\n", "+-----------+------------+-----------+--------+----------+--------------------+\n", "|POL00001904| 4115.65| 52| 1| 0.0|[0.63190323912293...|\n", "|POL00001905| 95529.96| 40| 0| 1.0|[0.47798718719887...|\n", "|POL00001908| 316551.73| 93| 1| 0.0|[0.54588204876409...|\n", "|POL00001918| 58207.35| 61| 1| 0.0|[0.59752362590001...|\n", "|POL00001929| 48456.26| 64| 1| 0.0|[0.63309638679955...|\n", "|POL00001933| 70555.47| 83| 1| 0.0|[0.51046680088231...|\n", "|POL00001949| 45760.33| 53| 1| 0.0|[0.52839110150149...|\n", "|POL00001956| 60404.91| 42| 0| 1.0|[0.47152909208731...|\n", "|POL00001966| 80796.6| 13| 0| 1.0|[0.43347025130989...|\n", "|POL00001966| 22096.23| 93| 1| 0.0|[0.57927429736318...|\n", "+-----------+------------+-----------+--------+----------+--------------------+\n", "only showing top 10 rows\n", "\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+--------+----------+-----+\n", "|is_fraud|prediction|count|\n", "+--------+----------+-----+\n", "| 1| 0.0| 293|\n", "| 0| 0.0| 282|\n", "| 1| 1.0| 223|\n", "| 0| 1.0| 253|\n", "+--------+----------+-----+\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Train the model\n", "\n", "print(\"Training fraud detection model...\")\n", "model = pipeline.fit(train_data)\n", "\n", "# Make predictions\n", "predictions = model.transform(test_data)\n", "\n", "# Evaluate the model\n", "evaluator = BinaryClassificationEvaluator(labelCol=\"is_fraud\", metricName=\"areaUnderROC\")\n", "auc = 
evaluator.evaluate(predictions)\n", "\n", "print(f\"Model AUC: {auc:.4f}\")\n", "\n", "# Show prediction results\n", "predictions.select(\"policy_id\", \"claim_amount\", \"fraud_score\", \"is_fraud\", \"prediction\", \"probability\").show(10)\n", "\n", "# Calculate confusion matrix\n", "confusion_matrix = predictions.groupBy(\"is_fraud\", \"prediction\").count()\n", "confusion_matrix.show()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "=== Feature Importance ===\n", "claim_amount: 0.2152\n", "month: 0.1609\n", "day_of_week: 0.1014\n", "hour: 0.1899\n", "claim_type: 0.0757\n", "incident_type: 0.1327\n", "location: 0.1242\n", "\n", "=== Business Impact Analysis ===\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "Total test set claim amount: $112,316,036.12\n", "Predicted fraudulent claims amount: $45,858,556.86\n", "Potential fraud detection coverage: 40.8%\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "\n", "Model Performance:\n", "Accuracy: 0.4805\n", "Precision: 0.4685\n", "Recall: 0.4322\n", "AUC: 0.4755\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Model interpretation and business insights\n", "\n", "# Feature importance (approximate)\n", "rf_model = model.stages[-1]\n", "feature_importance = rf_model.featureImportances\n", "feature_names = [\"claim_amount\", \"month\", \"day_of_week\", \"hour\", \"claim_type\", \"incident_type\", \"location\"]\n", "\n", "print(\"=== Feature Importance ===\")\n", "for name, importance in zip(feature_names, feature_importance):\n", " print(f\"{name}: {importance:.4f}\")\n", "\n", "# Business impact analysis\n", "print(\"\\n=== Business Impact Analysis ===\")\n", "\n", "# Calculate potential savings\n", "fraud_predictions = predictions.filter(\"prediction = 1\")\n", "potential_savings = fraud_predictions.agg(F.sum(\"claim_amount\")).collect()[0][0]\n", "\n", 
"total_test_claims = test_data.agg(F.sum(\"claim_amount\")).collect()[0][0]\n", "\n", "print(f\"Total test set claim amount: ${total_test_claims:,.2f}\")\n", "print(f\"Predicted fraudulent claims amount: ${potential_savings:,.2f}\")\n", "print(f\"Potential fraud detection coverage: {(potential_savings/total_test_claims)*100:.1f}%\")\n", "\n", "# Accuracy metrics\n", "accuracy = predictions.filter(\"is_fraud = prediction\").count() / predictions.count()\n", "precision = predictions.filter(\"prediction = 1 AND is_fraud = 1\").count() / predictions.filter(\"prediction = 1\").count()\n", "recall = predictions.filter(\"prediction = 1 AND is_fraud = 1\").count() / predictions.filter(\"is_fraud = 1\").count()\n", "\n", "print(f\"\\nModel Performance:\")\n", "print(f\"Accuracy: {accuracy:.4f}\")\n", "print(f\"Precision: {precision:.4f}\")\n", "print(f\"Recall: {recall:.4f}\")\n", "print(f\"AUC: {auc:.4f}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Key Takeaways: Delta Liquid Clustering + ML in AIDP\n", "\n", "\n", "\n", "### What We Demonstrated\n", "\n", "\n", "1. **Automatic Optimization**: Created a table with `CLUSTER BY (policy_id, claim_date)` and let Delta automatically optimize data layout\n", "\n", "\n", "2. **Performance Benefits**: Queries on clustered columns (policy_id, claim_date) are significantly faster due to data locality\n", "\n", "\n", "3. **Zero Maintenance**: No manual partitioning, bucketing, or Z-Ordering required - Delta handles it automatically\n", "\n", "\n", "4. **Machine Learning Integration**: Trained a fraud detection model using the optimized data\n", "\n", "\n", "5. 
**Real-World Use Case**: Insurance analytics where fraud detection and risk assessment are critical\n", "\n", "\n", "### AIDP Advantages\n", "\n", "\n", "- **Unified Analytics**: Data layout optimization and ML training run in the same notebook and Spark session\n", "\n", "- **Governance**: Catalog and schema isolation for sensitive insurance data\n", "\n", "- **Performance**: Optimized for both analytical queries and ML training\n", "\n", "- **Scalability**: The same Spark code scales from this small sample to much larger claim tables\n", "\n", "\n", "### Business Benefits for Insurance\n", "\n", "\n", "With a model trained on real, labeled claims, this workflow can support:\n", "\n", "\n", "1. **Fraud Reduction**: Automated detection of suspicious claims\n", "\n", "2. **Cost Savings**: Reduced fraudulent payouts\n", "\n", "3. **Operational Efficiency**: Faster claim processing\n", "\n", "4. **Risk Management**: Better portfolio assessment\n", "\n", "5. **Customer Experience**: Quicker legitimate claim approvals\n", "\n", "\n", "### Best Practices for Insurance Analytics\n", "\n", "\n", "1. **Choose clustering columns** based on your most common query patterns\n", "\n", "2. **Use one to four columns**: Delta allows at most four clustering keys, and each extra key tends to weaken the locality of the others\n", "\n", "3. **Consider cardinality**: columns with many distinct values that appear often in filters work best\n", "\n", "4. **Monitor and adjust** as query patterns evolve\n", "\n", "5. **Combine with ML** for predictive analytics and automation\n", "\n", "\n", "### Next Steps\n", "\n", "\n", "- Explore other AIDP ML features like AutoML\n", "\n", "- Try liquid clustering with different column combinations\n", "\n", "- Scale up to larger insurance datasets\n", "\n", "- Replace the synthetic labels with real fraud outcomes before judging model quality\n", "\n", "- Integrate with real insurance systems and claims platforms\n", "\n", "- Deploy models for real-time fraud scoring\n", "\n", "\n", "This notebook demonstrates how Oracle AI Data Platform makes advanced insurance analytics accessible while maintaining enterprise-grade performance and governance." 
] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.5" } }, "nbformat": 4, "nbformat_minor": 4 }