{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Financial Services: Medallion Architecture Demo\n", "\n", "## Overview\n", "\n", "This notebook demonstrates a **Medallion Architecture** implementation in Oracle AI Data Platform (AIDP) Workbench using a financial services analytics use case. The medallion architecture organizes data into three layers:\n", "\n", "- **Bronze Layer**: Raw data ingestion and storage\n", "- **Silver Layer**: Cleaned, validated, and structured data\n", "- **Gold Layer**: Aggregated, analytics-ready data with ML models\n", "\n", "### What is Medallion Architecture?\n", "\n", "The medallion architecture provides a structured approach to data processing:\n", "\n", "- **Bronze**: Raw, unprocessed data as ingested\n", "- **Silver**: Cleansed, validated, and enriched data\n", "- **Gold**: Business-ready data for analytics and ML\n", "\n", "### Use Case: Transaction Fraud Detection and Customer Analytics\n", "\n", "We'll analyze financial transaction records from a bank across all three layers, culminating in ML-powered fraud detection.\n", "\n", "### AIDP Environment Setup\n", "\n", "This notebook leverages the existing Spark session in your AIDP environment." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Setup: Create Financial Services Catalog and Medallion Schemas\n", "\n", "### Catalog and Schema Design\n", "\n", "We'll create:\n", "- `finance.bronze`: Raw transaction data\n", "- `finance.silver`: Cleaned and validated transactions\n", "- `finance.gold`: Analytics and ML-ready data\n", "\n", "This structure provides data isolation and governance across layers." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Financial services catalog and medallion schemas created successfully!\n", "- finance.bronze: Raw transaction data\n", "- finance.silver: Cleaned and validated data\n", "- finance.gold: Analytics and ML-ready data\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Create financial services catalog and medallion schemas\n", "\n", "spark.sql(\"CREATE CATALOG IF NOT EXISTS finance\")\n", "\n", "spark.sql(\"CREATE SCHEMA IF NOT EXISTS finance.bronze\")\n", "spark.sql(\"CREATE SCHEMA IF NOT EXISTS finance.silver\")\n", "spark.sql(\"CREATE SCHEMA IF NOT EXISTS finance.gold\")\n", "\n", "print(\"Financial services catalog and medallion schemas created successfully!\")\n", "print(\"- finance.bronze: Raw transaction data\")\n", "print(\"- finance.silver: Cleaned and validated data\")\n", "print(\"- finance.gold: Analytics and ML-ready data\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Bronze Layer: Raw Data Ingestion\n", "\n", "### Bronze Layer Design\n", "\n", "The bronze layer stores raw transaction data as ingested, with minimal processing. 
We'll use Delta tables with liquid clustering for optimal performance.\n", "\n", "### Table: `account_transactions_bronze`\n", "\n", "- Raw transaction records with all original fields\n", "- Liquid clustering on `account_id` and `transaction_date`\n", "- Preserves data integrity and auditability" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Bronze layer table created successfully!\n", "Liquid clustering will automatically optimize data layout for account_id and transaction_date queries.\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Create Bronze Layer Delta table with liquid clustering\n", "\n", "spark.sql(\"\"\"\n", "CREATE TABLE IF NOT EXISTS finance.bronze.account_transactions_bronze (\n", " account_id STRING,\n", " transaction_date TIMESTAMP,\n", " transaction_type STRING,\n", " amount DECIMAL(15,2),\n", " merchant_category STRING,\n", " location STRING,\n", " risk_score INT,\n", " ingestion_timestamp TIMESTAMP\n", ")\n", "USING DELTA\n", "CLUSTER BY (account_id, transaction_date)\n", "\"\"\")\n", "\n", "print(\"Bronze layer table created successfully!\")\n", "print(\"Liquid clustering will automatically optimize data layout for account_id and transaction_date queries.\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Generated 150236 raw account transaction records for Bronze layer\n", "Sample record: {'account_id': 'ACC00000001', 'transaction_date': datetime.datetime(2024, 12, 3, 8, 0), 'transaction_type': 'ATM', 'amount': -414.05, 'merchant_category': 'Restaurant', 'location': 'ATM', 'risk_score': 25}\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Generate sample financial transaction data for Bronze layer\n", "\n", "import random\n", "from datetime import datetime, timedelta\n", "\n", "# Define financial data constants\n", "TRANSACTION_TYPES = ['Deposit', 'Withdrawal', 
'Transfer', 'Payment', 'ATM']\n", "MERCHANT_CATEGORIES = ['Retail', 'Restaurant', 'Online', 'Utilities', 'Entertainment', 'Groceries', 'Healthcare', 'Transportation']\n", "LOCATIONS = ['New York, NY', 'Los Angeles, CA', 'Chicago, IL', 'Houston, TX', 'Miami, FL', 'Online', 'ATM']\n", "\n", "# Generate account transaction records\n", "transaction_data = []\n", "base_date = datetime(2024, 1, 1)\n", "\n", "# Create 5,000 accounts with 10-50 transactions each\n", "for account_num in range(1, 5001):\n", "    account_id = f\"ACC{account_num:08d}\"\n", "    \n", "    # Each account gets 10-50 transactions over 12 months\n", "    num_transactions = random.randint(10, 50)\n", "    \n", "    for i in range(num_transactions):\n", "        # Spread transactions over 12 months with realistic timing\n", "        days_offset = random.randint(0, 365)\n", "        hours_offset = random.randint(0, 23)\n", "        transaction_date = base_date + timedelta(days=days_offset, hours=hours_offset)\n", "        \n", "        # Select transaction type\n", "        transaction_type = random.choice(TRANSACTION_TYPES)\n", "        \n", "        # Amount based on transaction type\n", "        if transaction_type in ['Deposit', 'Transfer']:\n", "            amount = round(random.uniform(100, 10000), 2)\n", "        elif transaction_type == 'ATM':\n", "            amount = round(random.uniform(20, 500), 2) * -1\n", "        else:\n", "            amount = round(random.uniform(10, 2000), 2) * -1\n", "        \n", "        # Select merchant category and location\n", "        merchant_category = random.choice(MERCHANT_CATEGORIES)\n", "        if transaction_type == 'ATM':\n", "            location = 'ATM'\n", "        elif merchant_category == 'Online':\n", "            location = 'Online'\n", "        else:\n", "            location = random.choice(LOCATIONS)\n", "        \n", "        # Risk score (0-100, higher = more suspicious)\n", "        risk_score = random.randint(0, 100)\n", "        \n", "        transaction_data.append({\n", "            \"account_id\": account_id,\n", "            \"transaction_date\": transaction_date,\n", "            \"transaction_type\": transaction_type,\n", "            \"amount\": amount,\n", "            \"merchant_category\": merchant_category,\n", " 
\"location\": location,\n", " \"risk_score\": risk_score\n", " })\n", "\n", "print(f\"Generated {len(transaction_data)} raw account transaction records for Bronze layer\")\n", "print(\"Sample record:\", transaction_data[0])" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Bronze Layer DataFrame Schema:\n", "root\n", " |-- account_id: string (nullable = true)\n", " |-- amount: double (nullable = true)\n", " |-- location: string (nullable = true)\n", " |-- merchant_category: string (nullable = true)\n", " |-- risk_score: long (nullable = true)\n", " |-- transaction_date: timestamp (nullable = true)\n", " |-- transaction_type: string (nullable = true)\n", "\n", "\n", "Sample Bronze Data:\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+-----------+--------+-----------+-----------------+----------+-------------------+----------------+\n", "| account_id| amount| location|merchant_category|risk_score| transaction_date|transaction_type|\n", "+-----------+--------+-----------+-----------------+----------+-------------------+----------------+\n", "|ACC00000001| -414.05| ATM| Restaurant| 25|2024-12-03 08:00:00| ATM|\n", "|ACC00000001|-1275.77| ATM| Transportation| 25|2024-12-25 17:00:00| Withdrawal|\n", "|ACC00000001| -40.76| ATM| Transportation| 87|2024-05-16 03:00:00| ATM|\n", "|ACC00000001| -501.45|Houston, TX| Restaurant| 74|2024-07-23 09:00:00| Withdrawal|\n", "|ACC00000001| -298.66| ATM| Transportation| 52|2024-06-18 02:00:00| ATM|\n", "+-----------+--------+-----------+-----------------+----------+-------------------+----------------+\n", "only showing top 5 rows\n", "\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "\n", "Successfully inserted 150236 raw records into Bronze layer\n", "Data is now available for Silver layer processing.\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Insert raw data into 
Bronze layer\n", "\n", "# Create DataFrame from generated data\n", "df_bronze = spark.createDataFrame(transaction_data)\n", "\n", "# Display schema and sample data\n", "print(\"Bronze Layer DataFrame Schema:\")\n", "df_bronze.printSchema()\n", "\n", "print(\"\\nSample Bronze Data:\")\n", "df_bronze.show(5)\n", "\n", "# Insert data into Bronze table\n", "df_bronze.write.mode(\"overwrite\").saveAsTable(\"finance.bronze.account_transactions_bronze\")\n", "\n", "print(f\"\\nSuccessfully inserted {df_bronze.count()} raw records into Bronze layer\")\n", "print(\"Data is now available for Silver layer processing.\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Silver Layer: Data Cleaning and Validation\n", "\n", "### Silver Layer Design\n", "\n", "The silver layer provides cleaned, validated, and enriched data. We'll:\n", "\n", "- Remove invalid records\n", "- Standardize data formats\n", "- Add data quality metrics\n", "- Enrich with derived fields\n", "\n", "### Table: `account_transactions_silver`\n", "\n", "- Cleaned transaction data with validation flags\n", "- Enhanced with temporal features\n", "- Ready for analytical processing" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Silver layer table created successfully!\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Create Silver Layer Delta table\n", "\n", "spark.sql(\"\"\"\n", "CREATE TABLE IF NOT EXISTS finance.silver.account_transactions_silver (\n", " account_id STRING,\n", " transaction_date TIMESTAMP,\n", " transaction_type STRING,\n", " amount DECIMAL(15,2),\n", " merchant_category STRING,\n", " location STRING,\n", " risk_score INT,\n", " month INT,\n", " day_of_week INT,\n", " hour INT,\n", " is_valid BOOLEAN,\n", " data_quality_score DOUBLE,\n", " processed_timestamp TIMESTAMP\n", ")\n", "USING DELTA\n", "CLUSTER BY (account_id, transaction_date)\n", "\"\"\")\n", "\n", "print(\"Silver layer table 
created successfully!\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Read 150236 records from Bronze layer\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "After validation: 150236 valid records\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "Filtered out 0 invalid records\n", "\n", "Sample Silver Layer Data:\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+-----------+-------------------+----------------+--------+----------+--------+-------------------+\n", "| account_id| transaction_date|transaction_type| amount|risk_score|is_valid| data_quality_score|\n", "+-----------+-------------------+----------------+--------+----------+--------+-------------------+\n", "|ACC00001217|2024-11-07 14:00:00| Withdrawal| -834.63| 38| true| 0.734|\n", "|ACC00001217|2024-07-19 09:00:00| Deposit| 1955.05| 19| true| 0.867|\n", "|ACC00001217|2024-03-27 19:00:00| Withdrawal|-1649.43| 52| true| 0.6359999999999999|\n", "|ACC00001217|2024-11-24 12:00:00| Deposit| 7660.27| 52| true| 0.6359999999999999|\n", "|ACC00001217|2024-12-27 05:00:00| Deposit| 3054.73| 76| true|0.46799999999999997|\n", "+-----------+-------------------+----------------+--------+----------+--------+-------------------+\n", "only showing top 5 rows\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Process Bronze data to Silver layer\n", "\n", "from pyspark.sql.functions import col, when, month, dayofweek, hour, lit\n", "\n", "# Read from Bronze layer\n", "bronze_df = spark.table(\"finance.bronze.account_transactions_bronze\")\n", "\n", "print(f\"Read {bronze_df.count()} records from Bronze layer\")\n", "\n", "# Data validation and cleaning\n", "silver_df = bronze_df \\\n", " .withColumn(\"month\", month(col(\"transaction_date\"))) \\\n", " .withColumn(\"day_of_week\", dayofweek(col(\"transaction_date\"))) 
\\\n", " .withColumn(\"hour\", hour(col(\"transaction_date\"))) \\\n", " .withColumn(\"is_valid\", \n", " when((col(\"amount\").isNotNull()) & \n", " (col(\"account_id\").isNotNull()) & \n", " (col(\"transaction_date\").isNotNull()), True).otherwise(False)) \\\n", " .withColumn(\"data_quality_score\", \n", " when(col(\"is_valid\"), \n", " (lit(1.0) - (col(\"risk_score\") / lit(100.0))) * lit(0.7) + lit(0.3)).otherwise(0.0))\n", "\n", "# Filter out invalid records\n", "valid_silver_df = silver_df.filter(col(\"is_valid\") == True)\n", "\n", "print(f\"After validation: {valid_silver_df.count()} valid records\")\n", "print(f\"Filtered out {bronze_df.count() - valid_silver_df.count()} invalid records\")\n", "\n", "# Show sample cleaned data\n", "print(\"\\nSample Silver Layer Data:\")\n", "valid_silver_df.select(\"account_id\", \"transaction_date\", \"transaction_type\", \"amount\", \"risk_score\", \"is_valid\", \"data_quality_score\").show(5)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Successfully inserted 150236 cleaned records into Silver layer\n", "Data is now validated, enriched, and ready for Gold layer analytics.\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Insert cleaned data into Silver layer\n", "\n", "valid_silver_df.write.mode(\"overwrite\").saveAsTable(\"finance.silver.account_transactions_silver\")\n", "\n", "print(f\"Successfully inserted {valid_silver_df.count()} cleaned records into Silver layer\")\n", "print(\"Data is now validated, enriched, and ready for Gold layer analytics.\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Gold Layer: Analytics and ML-Ready Data\n", "\n", "### Gold Layer Design\n", "\n", "The gold layer provides business-ready analytics and ML features. 
We'll create:\n", "\n", "- Aggregated analytics tables\n", "- ML-ready feature engineering\n", "- Fraud detection model training\n", "\n", "### Tables in Gold Layer\n", "\n", "- `transaction_analytics_gold`: Aggregated transaction metrics\n", "- `fraud_detection_model_gold`: ML-ready features for fraud detection" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Gold layer analytics table created successfully!\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Create Gold Layer Analytics Table\n", "\n", "spark.sql(\"\"\"\n", "CREATE TABLE IF NOT EXISTS finance.gold.transaction_analytics_gold (\n", " account_id STRING,\n", " month_year STRING,\n", " total_transactions BIGINT,\n", " total_amount DECIMAL(15,2),\n", " avg_amount DECIMAL(15,2),\n", " avg_risk_score DOUBLE,\n", " high_risk_transactions BIGINT,\n", " transaction_types MAP<STRING, BIGINT>,\n", " merchant_categories MAP<STRING, BIGINT>,\n", " created_timestamp TIMESTAMP\n", ")\n", "USING DELTA\n", "CLUSTER BY (account_id, month_year)\n", "\"\"\")\n", "\n", "print(\"Gold layer analytics table created successfully!\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Gold layer ML features table created successfully!\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Create Gold Layer ML Features Table\n", "\n", "spark.sql(\"\"\"\n", "CREATE TABLE IF NOT EXISTS finance.gold.fraud_detection_model_gold (\n", " account_id STRING,\n", " transaction_date TIMESTAMP,\n", " transaction_type STRING,\n", " amount DECIMAL(15,2),\n", " merchant_category STRING,\n", " location STRING,\n", " risk_score INT,\n", " month INT,\n", " day_of_week INT,\n", " hour INT,\n", " is_fraud INT,\n", " created_timestamp TIMESTAMP\n", ")\n", "USING DELTA\n", "CLUSTER BY (account_id, transaction_date)\n", "\"\"\")\n", "\n", "print(\"Gold layer ML features table created successfully!\")" ] }, { 
"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Generated 52825 monthly analytics records\n", "\n", "Sample Gold Analytics:\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+-----------+----------+------------------+------------------+------------------+----------------------+\n", "| account_id|month_year|total_transactions| total_amount| avg_risk_score|high_risk_transactions|\n", "+-----------+----------+------------------+------------------+------------------+----------------------+\n", "|ACC00001246| 2024-07| 3| 2394.27|52.333333333333336| 1|\n", "|ACC00001259| 2024-08| 2| 7260.81| 4.5| 0|\n", "|ACC00001309| 2024-06| 5| 18283.59| 45.8| 2|\n", "|ACC00001347| 2024-04| 3| 5311.36|40.333333333333336| 0|\n", "|ACC00001347| 2024-01| 4|-950.8899999999999| 59.0| 3|\n", "+-----------+----------+------------------+------------------+------------------+----------------------+\n", "only showing top 5 rows\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Generate Gold Layer Analytics from Silver Data\n", "\n", "from pyspark.sql.functions import date_format, collect_list, map_from_entries, struct, sum as sum_func, avg, count, when\n", "\n", "# Read Silver data\n", "silver_df = spark.table(\"finance.silver.account_transactions_silver\")\n", "\n", "# Create monthly aggregations\n", "analytics_df = silver_df \\\n", " .withColumn(\"month_year\", date_format(col(\"transaction_date\"), \"yyyy-MM\")) \\\n", " .groupBy(\"account_id\", \"month_year\") \\\n", " .agg(\n", " count(\"*\").alias(\"total_transactions\"),\n", " sum_func(\"amount\").alias(\"total_amount\"),\n", " avg(\"amount\").alias(\"avg_amount\"),\n", " avg(\"risk_score\").alias(\"avg_risk_score\"),\n", " sum_func(when(col(\"risk_score\") > 60, 1).otherwise(0)).alias(\"high_risk_transactions\")\n", " )\n", "\n", "# Add transaction type and merchant category distributions\n", "type_dist = silver_df 
\\\n", " .withColumn(\"month_year\", date_format(col(\"transaction_date\"), \"yyyy-MM\")) \\\n", " .groupBy(\"account_id\", \"month_year\", \"transaction_type\") \\\n", " .agg(count(\"*\").alias(\"count\")) \\\n", " .groupBy(\"account_id\", \"month_year\") \\\n", " .agg(collect_list(struct(\"transaction_type\", \"count\")).alias(\"transaction_types_list\"))\n", "\n", "merchant_dist = silver_df \\\n", " .withColumn(\"month_year\", date_format(col(\"transaction_date\"), \"yyyy-MM\")) \\\n", " .groupBy(\"account_id\", \"month_year\", \"merchant_category\") \\\n", " .agg(count(\"*\").alias(\"count\")) \\\n", " .groupBy(\"account_id\", \"month_year\") \\\n", " .agg(collect_list(struct(\"merchant_category\", \"count\")).alias(\"merchant_categories_list\"))\n", "\n", "# Join aggregations\n", "gold_analytics = analytics_df \\\n", " .join(type_dist, [\"account_id\", \"month_year\"], \"left\") \\\n", " .join(merchant_dist, [\"account_id\", \"month_year\"], \"left\") \\\n", " .withColumn(\"transaction_types\", map_from_entries(col(\"transaction_types_list\"))) \\\n", " .withColumn(\"merchant_categories\", map_from_entries(col(\"merchant_categories_list\"))) \\\n", " .drop(\"transaction_types_list\", \"merchant_categories_list\")\n", "\n", "print(f\"Generated {gold_analytics.count()} monthly analytics records\")\n", "print(\"\\nSample Gold Analytics:\")\n", "gold_analytics.select(\"account_id\", \"month_year\", \"total_transactions\", \"total_amount\", \"avg_risk_score\", \"high_risk_transactions\").show(5)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Successfully inserted 52825 analytics records into Gold layer\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Insert analytics into Gold layer\n", "\n", "gold_analytics.write.mode(\"overwrite\").saveAsTable(\"finance.gold.transaction_analytics_gold\")\n", "\n", "print(f\"Successfully inserted {gold_analytics.count()} analytics records 
into Gold layer\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Prepared 150236 records for ML feature engineering\n", "Fraud distribution:\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+--------+-----+\n", "|is_fraud|count|\n", "+--------+-----+\n", "| 1|59577|\n", "| 0|90659|\n", "+--------+-----+\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Prepare ML Features for Gold Layer\n", "\n", "# Read Silver data for ML\n", "ml_data = silver_df.withColumn(\"is_fraud\", when(col(\"risk_score\") > 60, 1).otherwise(0))\n", "\n", "print(f\"Prepared {ml_data.count()} records for ML feature engineering\")\n", "print(\"Fraud distribution:\")\n", "ml_data.groupBy(\"is_fraud\").count().show()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Successfully inserted 150236 ML-ready records into Gold layer\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Insert into Gold layer\n", "\n", "# For simplicity, we'll store the features without full vectorization in the table\n", "# In practice, you'd include properly scaled features\n", "ml_gold_df = ml_data.select(\n", " \"account_id\", \"transaction_date\", \"transaction_type\", \"amount\", \n", " \"merchant_category\", \"location\", \"risk_score\", \"month\", \n", " \"day_of_week\", \"hour\", \"is_fraud\"\n", ")\n", "\n", "ml_gold_df.write.mode(\"overwrite\").saveAsTable(\"finance.gold.fraud_detection_model_gold\")\n", "\n", "print(f\"Successfully inserted {ml_gold_df.count()} ML-ready records into Gold layer\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Gold Layer: ML Model Training and Evaluation\n", "\n", "### Fraud Detection Model\n", "\n", "Now we'll train a Random Forest model using the ML-ready data from the Gold layer to predict fraudulent transactions." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Loaded 150236 records from Gold layer for ML training\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+--------+-----+\n", "|is_fraud|count|\n", "+--------+-----+\n", "| 1|59577|\n", "| 0|90659|\n", "+--------+-----+\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Load Gold layer ML data for training\n", "\n", "from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler\n", "from pyspark.ml.classification import RandomForestClassifier\n", "from pyspark.ml.evaluation import BinaryClassificationEvaluator\n", "from pyspark.ml import Pipeline\n", "import pyspark.sql.functions as F\n", "\n", "# Load data from Gold layer\n", "gold_ml_data = spark.table(\"finance.gold.fraud_detection_model_gold\")\n", "\n", "print(f\"Loaded {gold_ml_data.count()} records from Gold layer for ML training\")\n", "gold_ml_data.groupBy(\"is_fraud\").count().show()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Training set: 120258 records\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "Test set: 29978 records\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Feature engineering pipeline\n", "\n", "# Create indexers for categorical variables\n", "transaction_type_indexer = StringIndexer(inputCol=\"transaction_type\", outputCol=\"transaction_type_index\")\n", "merchant_category_indexer = StringIndexer(inputCol=\"merchant_category\", outputCol=\"merchant_category_index\")\n", "location_indexer = StringIndexer(inputCol=\"location\", outputCol=\"location_index\")\n", "\n", "# Assemble features\n", "assembler = VectorAssembler(\n", " inputCols=[\"amount\", \"month\", \"day_of_week\", \"hour\", \n", " \"transaction_type_index\", \"merchant_category_index\", 
\"location_index\"],\n", " outputCol=\"features\"\n", ")\n", "\n", "# Scale features\n", "scaler = StandardScaler(inputCol=\"features\", outputCol=\"scaled_features\")\n", "\n", "# Create and train the model\n", "rf = RandomForestClassifier(\n", " labelCol=\"is_fraud\", \n", " featuresCol=\"scaled_features\",\n", " numTrees=100,\n", " maxDepth=10\n", ")\n", "\n", "# Create pipeline\n", "pipeline = Pipeline(stages=[transaction_type_indexer, merchant_category_indexer, location_indexer, assembler, scaler, rf])\n", "\n", "# Split data\n", "train_data, test_data = gold_ml_data.randomSplit([0.8, 0.2], seed=42)\n", "\n", "print(f\"Training set: {train_data.count()} records\")\n", "print(f\"Test set: {test_data.count()} records\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Training fraud detection model...\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "Model AUC: 0.5077\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+-----------+--------+----------+--------+----------+--------------------+\n", "| account_id| amount|risk_score|is_fraud|prediction| probability|\n", "+-----------+--------+----------+--------+----------+--------------------+\n", "|ACC00001217| 8519.66| 32| 0| 0.0|[0.58610508814702...|\n", "|ACC00001217| -479.98| 59| 0| 0.0|[0.61198712376148...|\n", "|ACC00001217|-1732.56| 20| 0| 0.0|[0.63416382394754...|\n", "|ACC00001217| -1741.8| 17| 0| 0.0|[0.64010742470468...|\n", "|ACC00001217| -139.08| 52| 0| 0.0|[0.60456059677813...|\n", "|ACC00001217| 2030.16| 60| 0| 0.0|[0.60494424132128...|\n", "|ACC00001217| -485.66| 56| 0| 0.0|[0.58573687275789...|\n", "|ACC00001218|-1464.53| 52| 0| 0.0|[0.64161189482873...|\n", "|ACC00001218| 3339.41| 35| 0| 0.0|[0.59683919284296...|\n", "|ACC00001218| 5661.1| 26| 0| 0.0|[0.61682637643500...|\n", "+-----------+--------+----------+--------+----------+--------------------+\n", "only 
showing top 10 rows\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Train the fraud detection model\n", "\n", "print(\"Training fraud detection model...\")\n", "model = pipeline.fit(train_data)\n", "\n", "# Make predictions\n", "predictions = model.transform(test_data)\n", "\n", "# Evaluate the model\n", "evaluator = BinaryClassificationEvaluator(labelCol=\"is_fraud\", metricName=\"areaUnderROC\")\n", "auc = evaluator.evaluate(predictions)\n", "\n", "print(f\"Model AUC: {auc:.4f}\")\n", "\n", "# Show prediction results\n", "predictions.select(\"account_id\", \"amount\", \"risk_score\", \"is_fraud\", \"prediction\", \"probability\").show(10)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "+--------+----------+-----+\n", "|is_fraud|prediction|count|\n", "+--------+----------+-----+\n", "| 1| 0.0|11954|\n", "| 0| 0.0|18020|\n", "| 1| 1.0| 1|\n", "| 0| 1.0| 3|\n", "+--------+----------+-----+\n", "\n", "\n", "=== Feature Importance ===\n", "amount: 0.2039\n", "month: 0.1645\n", "day_of_week: 0.1137\n", "hour: 0.2013\n", "transaction_type: 0.0601\n", "merchant_category: 0.1366\n", "location: 0.1200\n", "\n", "=== Business Impact Analysis ===\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "Total test transactions: 29978\n", "Transactions flagged as high-risk: 4\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "Percentage flagged: 0.0%\n", "Total amount of flagged transactions: $7,527.98\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "\n", "Model Performance:\n", "Accuracy: 0.6011\n", "Precision: 0.2500\n", "Recall: 0.0001\n", "AUC: 0.5077\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Model evaluation and business insights\n", "\n", "# Calculate confusion matrix\n", "confusion_matrix = predictions.groupBy(\"is_fraud\", 
\"prediction\").count()\n", "confusion_matrix.show()\n", "\n", "# Feature importance\n", "rf_model = model.stages[-1]\n", "feature_importance = rf_model.featureImportances\n", "feature_names = [\"amount\", \"month\", \"day_of_week\", \"hour\", \"transaction_type\", \"merchant_category\", \"location\"]\n", "\n", "print(\"\\n=== Feature Importance ===\")\n", "for name, importance in zip(feature_names, feature_importance):\n", " print(f\"{name}: {importance:.4f}\")\n", "\n", "# Business impact analysis\n", "print(\"\\n=== Business Impact Analysis ===\")\n", "\n", "# Calculate potential savings from fraud detection\n", "fraud_predictions = predictions.filter(\"prediction = 1\")\n", "high_risk_transactions = fraud_predictions.count()\n", "total_flagged_amount = fraud_predictions.agg(F.sum(F.abs(\"amount\"))).collect()[0][0] or 0\n", "\n", "total_test_amount = test_data.agg(F.sum(F.abs(\"amount\"))).collect()[0][0] or 0\n", "\n", "print(f\"Total test transactions: {test_data.count()}\")\n", "print(f\"Transactions flagged as high-risk: {high_risk_transactions}\")\n", "print(f\"Percentage flagged: {(high_risk_transactions/test_data.count())*100:.1f}%\")\n", "print(f\"Total amount of flagged transactions: ${total_flagged_amount:,.2f}\")\n", "\n", "# Accuracy metrics\n", "accuracy = predictions.filter(\"is_fraud = prediction\").count() / predictions.count()\n", "precision = predictions.filter(\"prediction = 1 AND is_fraud = 1\").count() / predictions.filter(\"prediction = 1\").count() if predictions.filter(\"prediction = 1\").count() > 0 else 0\n", "recall = predictions.filter(\"prediction = 1 AND is_fraud = 1\").count() / predictions.filter(\"is_fraud = 1\").count() if predictions.filter(\"is_fraud = 1\").count() > 0 else 0\n", "\n", "print(f\"\\nModel Performance:\")\n", "print(f\"Accuracy: {accuracy:.4f}\")\n", "print(f\"Precision: {precision:.4f}\")\n", "print(f\"Recall: {recall:.4f}\")\n", "print(f\"AUC: {auc:.4f}\")" ] }, { "cell_type": "markdown", "metadata": {}, 
"source": [ "## Query Examples Across Medallion Layers\n", "\n", "### Bronze Layer Queries\n", "Raw data access for audit and debugging" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "=== Bronze Layer: Raw Transaction Data ===\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+-----------+-------------------+----------------+--------+----------+\n", "| account_id| transaction_date|transaction_type| amount|risk_score|\n", "+-----------+-------------------+----------------+--------+----------+\n", "|ACC00000001|2024-12-25 17:00:00| Withdrawal|-1275.77| 25|\n", "|ACC00000001|2024-12-03 08:00:00| ATM| -414.05| 25|\n", "|ACC00000001|2024-10-28 03:00:00| Transfer| 5309.08| 50|\n", "|ACC00000001|2024-10-15 14:00:00| Deposit| 987.68| 71|\n", "|ACC00000001|2024-10-05 06:00:00| Deposit| 9044.44| 53|\n", "+-----------+-------------------+----------------+--------+----------+\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Bronze Layer: Raw data queries\n", "\n", "print(\"=== Bronze Layer: Raw Transaction Data ===\")\n", "bronze_sample = spark.sql(\"\"\"\n", "SELECT account_id, transaction_date, transaction_type, amount, risk_score\n", "FROM finance.bronze.account_transactions_bronze\n", "WHERE account_id = 'ACC00000001'\n", "ORDER BY transaction_date DESC\n", "LIMIT 5\n", "\"\"\")\n", "bronze_sample.show()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "=== Silver Layer: Validated Transaction Data ===\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+-----------+-------------------+----------------+--------+----------+--------+------------------+\n", "| account_id| transaction_date|transaction_type| amount|risk_score|is_valid|data_quality_score|\n", 
"+-----------+-------------------+----------------+--------+----------+--------+------------------+\n", "|ACC00000001|2024-12-25 17:00:00| Withdrawal|-1275.77| 25| true| 0.825|\n", "|ACC00000001|2024-12-03 08:00:00| ATM| -414.05| 25| true| 0.825|\n", "|ACC00000001|2024-10-28 03:00:00| Transfer| 5309.08| 50| true|0.6499999999999999|\n", "|ACC00000001|2024-10-15 14:00:00| Deposit| 987.68| 71| true| 0.503|\n", "|ACC00000001|2024-10-05 06:00:00| Deposit| 9044.44| 53| true| 0.629|\n", "+-----------+-------------------+----------------+--------+----------+--------+------------------+\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Silver Layer: Cleaned data queries\n", "\n", "print(\"=== Silver Layer: Validated Transaction Data ===\")\n", "silver_sample = spark.sql(\"\"\"\n", "SELECT account_id, transaction_date, transaction_type, amount, risk_score, is_valid, data_quality_score\n", "FROM finance.silver.account_transactions_silver\n", "WHERE account_id = 'ACC00000001' AND is_valid = true\n", "ORDER BY transaction_date DESC\n", "LIMIT 5\n", "\"\"\")\n", "silver_sample.show()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "=== Gold Layer: Account Analytics ===\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+-----------+----------+------------------+------------+--------------+----------------------+\n", "| account_id|month_year|total_transactions|total_amount|avg_risk_score|high_risk_transactions|\n", "+-----------+----------+------------------+------------+--------------+----------------------+\n", "|ACC00000001| 2024-12| 2| -1689.82| 25.0| 0|\n", "|ACC00000001| 2024-10| 3| 15341.2| 58.0| 1|\n", "|ACC00000001| 2024-08| 2| -3408.02| 51.0| 1|\n", "+-----------+----------+------------------+------------+--------------+----------------------+\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Gold Layer: Analytics 
queries\n", "\n", "print(\"=== Gold Layer: Account Analytics ===\")\n", "gold_sample = spark.sql(\"\"\"\n", "SELECT account_id, month_year, total_transactions, total_amount, avg_risk_score, high_risk_transactions\n", "FROM finance.gold.transaction_analytics_gold\n", "WHERE account_id = 'ACC00000001'\n", "ORDER BY month_year DESC\n", "LIMIT 3\n", "\"\"\")\n", "gold_sample.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Key Takeaways: Medallion Architecture in AIDP\n", "\n", "### What We Demonstrated\n", "\n", "1. **Bronze Layer**: Raw data ingestion with Delta liquid clustering\n", "2. **Silver Layer**: Data validation, cleaning, and enrichment\n", "3. **Gold Layer**: Analytics aggregation and ML model training\n", "4. **End-to-End Pipeline**: Complete medallion architecture in a single notebook\n", "\n", "### AIDP Advantages\n", "\n", "- **Unified Platform**: Seamless data flow between layers\n", "- **Governance**: Catalog and schema isolation\n", "- **Performance**: Optimized with liquid clustering\n", "- **ML Integration**: Built-in ML capabilities\n", "\n", "### Business Benefits\n", "\n", "1. **Data Quality**: Progressive improvement through layers\n", "2. **Analytics Ready**: Business-focused aggregations\n", "3. **ML Automation**: Fraud detection and risk assessment\n", "4. **Scalability**: Handles large financial datasets\n", "5. **Governance**: Audit trails and data lineage\n", "\n", "### Best Practices\n", "\n", "1. **Layer Isolation**: Keep raw data separate from processed data\n", "2. **Incremental Processing**: Build upon validated foundations\n", "3. **Business Alignment**: Gold layer matches business needs\n", "4. **Performance Optimization**: Use clustering strategically\n", "5. **ML Integration**: Include predictive analytics in gold layer\n", "\n", "This notebook demonstrates how Oracle AI Data Platform enables sophisticated financial services analytics with proper data architecture and governance." 
] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.5" } }, "nbformat": 4, "nbformat_minor": 4 }