{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Transportation Analytics: Medallion Architecture Demo with Delta Liquid Clustering\n", "\n", "## Overview\n", "\n", "This notebook demonstrates the **Medallion Architecture** in Oracle AI Data Platform (AIDP) Workbench using a transportation and logistics analytics use case. The medallion architecture organizes data into Bronze, Silver, and Gold layers, and each layer's tables use Delta Liquid Clustering to organize their data files.\n", "\n", "### What is Medallion Architecture?\n", "\n", "- **Bronze Layer**: Raw, unprocessed data as ingested from source systems\n", "- **Silver Layer**: Cleaned, enriched, and transformed data with business logic applied\n", "- **Gold Layer**: Aggregated, business-ready data optimized for analytics and ML\n", "\n", "### What is Liquid Clustering?\n", "\n", "Liquid clustering stores rows with similar values in the clustering columns you define (`CLUSTER BY`) in the same data files, so queries that filter on those columns can skip files that cannot contain matching rows. Clustering is applied incrementally, typically when `OPTIMIZE` runs on the table; depending on the platform, some writes may also be clustered at ingestion time.\n", "\n", "### Use Case: Fleet Management and Route Optimization\n", "\n", "We'll analyze transportation fleet operations and logistics data across all three medallion layers and use machine learning for predictive maintenance.\n", "\n", "### AIDP Environment Setup\n", "\n", "This notebook uses the existing Spark session (`spark`) provided by your AIDP environment, so no session setup is required." 
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 1: Create Transportation Catalog and Schemas\n", "\n", "### Medallion Schema Structure\n", "\n", "We'll create separate schemas for each medallion layer:\n", "- `transportation.bronze`: Raw data storage\n", "- `transportation.silver`: Cleaned and enriched data\n", "- `transportation.gold`: Business analytics and ML-ready data" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Create transportation catalog and medallion schemas\n", "\n", "# In AIDP, catalogs provide data isolation and governance\n", "\n", "spark.sql(\"CREATE CATALOG IF NOT EXISTS transportation\")\n", "\n", "spark.sql(\"CREATE SCHEMA IF NOT EXISTS transportation.bronze\")\n", "spark.sql(\"CREATE SCHEMA IF NOT EXISTS transportation.silver\")\n", "spark.sql(\"CREATE SCHEMA IF NOT EXISTS transportation.gold\")\n", "\n", "print(\"Transportation catalog and medallion schemas created successfully!\")\n", "print(\"- Bronze: Raw data ingestion\")\n", "print(\"- Silver: Cleaned and enriched data\")\n", "print(\"- Gold: Analytics and ML-ready data\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 2: Bronze Layer - Raw Data Ingestion\n", "\n", "### Bronze Layer Design\n", "\n", "The bronze layer stores raw data exactly as received, with minimal transformations:\n", "\n", "- **Data quality checks**: Basic validation\n", "- **Partitioning/Clustering**: Optimized for ingestion patterns\n", "- **Retention**: Raw data preserved for reprocessing\n", "- **Schema**: Flexible to accommodate source system changes" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Create Bronze Layer: Raw fleet trips table\n", "\n", "# CLUSTER BY vehicle_id for efficient ingestion and vehicle-based queries\n", "\n", "spark.sql(\"\"\"\n", "CREATE TABLE IF NOT EXISTS transportation.bronze.fleet_trips_raw (\n", " vehicle_id STRING,\n", " trip_date 
TIMESTAMP,\n", " route_id STRING,\n", " distance DECIMAL(8,2),\n", " duration DECIMAL(6,2),\n", " fuel_consumed DECIMAL(6,2),\n", " load_factor INT,\n", " ingestion_timestamp TIMESTAMP,\n", " source_system STRING\n", ")\n", "USING DELTA\n", "CLUSTER BY (vehicle_id)\n", "\"\"\")\n", "\n", "print(\"Bronze layer table created successfully!\")\n", "print(\"Liquid clustering by vehicle_id optimizes for ingestion patterns.\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Generated 20077 raw fleet trip records (simulating IoT sensor data)\n", "Raw data includes noise, outliers, and potential missing values as would be expected from sensors.\n", "Vehicle profiles: High performers (30%), Medium performers (50%), Low performers (20%)\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Generate and ingest raw transportation fleet data\n", "\n", "# Using fully qualified imports to avoid conflicts\n", "\n", "import random\n", "from datetime import datetime, timedelta\n", "\n", "# Vehicle performance profiles to create more interesting data\n", "VEHICLE_PROFILES = {\n", " 'high_performer': {\n", " 'fuel_multiplier': 0.8, # Better fuel efficiency\n", " 'efficiency_variation': 0.1, # Low variation\n", " 'load_factor_bonus': 10,\n", " 'maintenance_risk': 0.05, # 5% chance of issues\n", " 'trip_frequency': 'high'\n", " },\n", " 'medium_performer': {\n", " 'fuel_multiplier': 1.0, # Average efficiency\n", " 'efficiency_variation': 0.3, # Medium variation\n", " 'load_factor_bonus': 0,\n", " 'maintenance_risk': 0.3, # 30% chance of issues\n", " 'trip_frequency': 'medium'\n", " },\n", " 'low_performer': {\n", " 'fuel_multiplier': 1.4, # Poor fuel efficiency\n", " 'efficiency_variation': 0.5, # High variation\n", " 'load_factor_bonus': -5,\n", " 'maintenance_risk': 0.7, # 70% chance of issues\n", " 'trip_frequency': 'low'\n", " }\n", "}\n", "\n", "# Base trip parameters by route type (simulating raw 
sensor data with some noise)\n", "TRIP_PARAMS = {\n", " 'Urban Delivery': {'avg_distance': 45, 'avg_duration': 120, 'avg_fuel': 8.5, 'load_factor': 85},\n", " 'Long-haul': {'avg_distance': 450, 'avg_duration': 480, 'avg_fuel': 65.0, 'load_factor': 92},\n", " 'Local Transport': {'avg_distance': 120, 'avg_duration': 180, 'avg_fuel': 15.2, 'load_factor': 78},\n", " 'Express Delivery': {'avg_distance': 80, 'avg_duration': 90, 'avg_fuel': 12.8, 'load_factor': 95}\n", "}\n", "\n", "# Route types (keys of TRIP_PARAMS) and route IDs sampled for each trip\n", "ROUTE_TYPES = list(TRIP_PARAMS.keys())\n", "ROUTES = ['RT_NYC_MAN_001', 'RT_LAX_SFO_002', 'RT_CHI_DET_003', 'RT_HOU_DAL_004', 'RT_MIA_ORL_005']\n", "\n", "# Generate raw fleet trip records (simulating IoT sensor data)\n", "raw_trip_data = []\n", "base_date = datetime(2024, 1, 1)\n", "\n", "# Create 500 vehicles with different performance profiles\n", "for vehicle_num in range(1, 501):\n", " vehicle_id = f\"VH{vehicle_num:04d}\"\n", " \n", " # Assign vehicle to performance profile (weighted distribution)\n", " profile_weights = [0.3, 0.5, 0.2] # 30% high, 50% medium, 20% low performers\n", " profile_name = random.choices(list(VEHICLE_PROFILES.keys()), weights=profile_weights)[0]\n", " profile = VEHICLE_PROFILES[profile_name]\n", " \n", " # Determine number of trips based on profile\n", " if profile['trip_frequency'] == 'high':\n", " num_trips = random.randint(45, 60)\n", " elif profile['trip_frequency'] == 'medium':\n", " num_trips = random.randint(30, 45)\n", " else: # low\n", " num_trips = random.randint(20, 30)\n", " \n", " for i in range(num_trips):\n", " # Spread trips over 12 months\n", " days_offset = random.randint(0, 365)\n", " trip_date = base_date + timedelta(days=days_offset)\n", " \n", " # Add realistic timing (more trips during business hours)\n", " hour_weights = [1, 1, 1, 1, 1, 3, 8, 10, 12, 10, 8, 6, 8, 9, 8, 7, 6, 5, 3, 2, 2, 1, 1, 1]\n", " hours_offset = random.choices(range(24), weights=hour_weights)[0]\n", " trip_date = trip_date.replace(hour=hours_offset, minute=random.randint(0, 59), second=0, microsecond=0)\n", " \n", " # Select route type\n", " route_type = random.choice(ROUTE_TYPES)\n", " params = 
TRIP_PARAMS[route_type]\n", " \n", " # Apply vehicle profile adjustments\n", " base_fuel = params['avg_fuel'] * profile['fuel_multiplier']\n", " base_load_factor = params['load_factor'] + profile['load_factor_bonus']\n", " \n", " # Calculate trip metrics with vehicle-specific variation\n", " distance_variation = random.uniform(0.7, 1.4) # Moderate variation\n", " distance = round(params['avg_distance'] * distance_variation, 2)\n", " \n", " duration_variation = random.uniform(0.8, 1.6) # Moderate variation\n", " duration = round(params['avg_duration'] * duration_variation, 2)\n", " \n", " fuel_variation = random.uniform(0.8, 1.3) # Vehicle performance variation\n", " fuel_consumed = round(base_fuel * fuel_variation, 2)\n", " \n", " load_factor_variation = random.randint(-8, 8) # Smaller variation\n", " load_factor = max(0, min(100, base_load_factor + load_factor_variation))\n", " \n", " # Select specific route\n", " route_id = random.choice(ROUTES)\n", " \n", " # Add some raw data characteristics (nulls, duplicates possible)\n", " if random.random() < 0.02: # 2% chance of missing fuel data\n", " fuel_consumed = None\n", " \n", " raw_trip_data.append({\n", " \"vehicle_id\": vehicle_id,\n", " \"trip_date\": trip_date,\n", " \"route_id\": route_id,\n", " \"distance\": distance,\n", " \"duration\": duration,\n", " \"fuel_consumed\": fuel_consumed,\n", " \"load_factor\": load_factor,\n", " \"source_system\": \"fleet_sensor\"\n", " })\n", "\n", "print(f\"Generated {len(raw_trip_data)} raw fleet trip records (simulating IoT sensor data)\")\n", "print(\"Raw data includes noise, outliers, and potential missing values as would be expected from sensors.\")\n", "print(f\"Vehicle profiles: High performers (30%), Medium performers (50%), Low performers (20%)\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Bronze Layer DataFrame Schema:\n", "root\n", " |-- distance: double (nullable = true)\n", " |-- duration: 
double (nullable = true)\n", " |-- fuel_consumed: double (nullable = true)\n", " |-- load_factor: long (nullable = true)\n", " |-- route_id: string (nullable = true)\n", " |-- source_system: string (nullable = true)\n", " |-- trip_date: timestamp (nullable = true)\n", " |-- vehicle_id: string (nullable = true)\n", "\n", "\n", "Sample Raw Data:\n", "+--------+--------+-------------+-----------+--------------+-------------+-------------------+----------+\n", "|distance|duration|fuel_consumed|load_factor| route_id|source_system| trip_date|vehicle_id|\n", "+--------+--------+-------------+-----------+--------------+-------------+-------------------+----------+\n", "| 52.92| 104.41| 6.96| 98|RT_NYC_MAN_001| fleet_sensor|2024-08-26 14:18:00| VH0001|\n", "| 139.76| 273.15| 15.31| 86|RT_MIA_ORL_005| fleet_sensor|2024-04-29 07:36:00| VH0001|\n", "| 56.5| 121.74| 8.77| 100|RT_MIA_ORL_005| fleet_sensor|2024-09-27 14:17:00| VH0001|\n", "| 72.19| 123.67| 12.59| 99|RT_HOU_DAL_004| fleet_sensor|2024-05-07 07:30:00| VH0001|\n", "| 591.1| 652.81| 59.73| 100|RT_LAX_SFO_002| fleet_sensor|2024-08-14 07:42:00| VH0001|\n", "+--------+--------+-------------+-----------+--------------+-------------+-------------------+----------+\n", "only showing top 5 rows\n", "\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "\n", "Successfully ingested 20077 raw records into Bronze layer\n", "Bronze layer preserves raw data with all its imperfections for auditability.\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Insert raw data into Bronze layer\n", "\n", "# Create DataFrame from generated raw data\n", "df_raw_trips = spark.createDataFrame(raw_trip_data)\n", "\n", "# Display schema and sample data\n", "print(\"Bronze Layer DataFrame Schema:\")\n", "df_raw_trips.printSchema()\n", "\n", "print(\"\\nSample Raw Data:\")\n", "df_raw_trips.show(5)\n", "\n", "# Insert data into Bronze table\n", 
"df_raw_trips.write.mode(\"overwrite\").saveAsTable(\"transportation.bronze.fleet_trips_raw\")\n", "\n", "print(f\"\\nSuccessfully ingested {df_raw_trips.count()} raw records into Bronze layer\")\n", "print(\"Bronze layer preserves raw data with all its imperfections for auditability.\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 3: Silver Layer - Data Cleaning and Enrichment\n", "\n", "### Silver Layer Design\n", "\n", "The silver layer transforms raw data into a clean, enriched, business-ready format:\n", "\n", "- **Data Quality**: Validation, deduplication, outlier treatment\n", "- **Business Logic**: Enrichment with derived metrics\n", "- **Standardization**: Consistent formats and units\n", "- **Optimization**: Clustering for analytical query patterns" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Create Silver Layer: Cleaned and enriched fleet trips\n", "\n", "# CLUSTER BY (vehicle_id, trip_date) for queries that filter by vehicle and/or trip date range\n", "\n", "spark.sql(\"\"\"\n", "CREATE TABLE IF NOT EXISTS transportation.silver.fleet_trips_clean (\n", " vehicle_id STRING,\n", " trip_date TIMESTAMP,\n", " route_id STRING,\n", " distance DECIMAL(8,2),\n", " duration DECIMAL(6,2),\n", " fuel_consumed DECIMAL(6,2),\n", " load_factor INT,\n", " fuel_efficiency DECIMAL(6,2), \n", " trip_speed DECIMAL(6,2), \n", " is_valid BOOLEAN, \n", " processing_timestamp TIMESTAMP\n", ")\n", "USING DELTA\n", "CLUSTER BY (vehicle_id, trip_date)\n", "\"\"\")\n", "\n", "print(\"Silver layer table created successfully!\")\n", "print(\"Clustering by (vehicle_id, trip_date) optimizes for analytical queries.\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Processing 20077 records from Bronze layer\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "After cleaning: 16135 valid records\n", "\n", "Data Quality Summary:\n" ] }, 
"metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+----------+-------------------+--------+---------------+----------+--------+\n", "|vehicle_id| trip_date|distance|fuel_efficiency|trip_speed|is_valid|\n", "+----------+-------------------+--------+---------------+----------+--------+\n", "| VH0002|2024-01-21 07:27:00| 152.8| 11.62| 51.69| true|\n", "| VH0015|2024-06-23 06:08:00| 447.69| 5.48| 58.69| true|\n", "| VH0022|2024-03-21 17:26:00| 80.66| 7.84| 35.71| true|\n", "| VH0022|2024-06-16 20:52:00| 57.04| 8.36| 22.71| true|\n", "| VH0033|2024-08-28 14:10:00| 138.74| 6.71| 36.35| true|\n", "+----------+-------------------+--------+---------------+----------+--------+\n", "only showing top 5 rows\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Process Bronze to Silver: Clean and enrich the data\n", "from pyspark.sql import functions as F\n", "\n", "# Read from Bronze layer\n", "bronze_data = spark.table(\"transportation.bronze.fleet_trips_raw\")\n", "\n", "print(f\"Processing {bronze_data.count()} records from Bronze layer\")\n", "\n", "# Silver layer transformations\n", "# Note: a trip with imputed fuel gets fuel_efficiency = distance / (0.25 * distance) = 4.0,\n", "# so the BETWEEN 5 AND 50 filter below drops it; in effect, trips missing fuel data are excluded.\n", "silver_data = bronze_data \\\n", " .filter(\"vehicle_id IS NOT NULL\") \\\n", " .filter(\"trip_date IS NOT NULL\") \\\n", " .filter(\"distance > 0\") \\\n", " .filter(\"duration > 0\") \\\n", " .withColumn(\"fuel_consumed\", \n", " F.when(F.col(\"fuel_consumed\").isNull(), \n", " F.col(\"distance\") * 0.25).otherwise(F.col(\"fuel_consumed\"))) \\\n", " .withColumn(\"fuel_efficiency\", F.round(F.col(\"distance\") / F.col(\"fuel_consumed\"), 2)) \\\n", " .withColumn(\"trip_speed\", F.round(F.col(\"distance\") / (F.col(\"duration\") / 60), 2)) \\\n", " .withColumn(\"is_valid\", F.lit(True)) \\\n", " .filter(\"fuel_efficiency BETWEEN 5 AND 50\") \\\n", " .filter(\"trip_speed BETWEEN 10 AND 120\") \\\n", " .dropDuplicates([\"vehicle_id\", \"trip_date\"]) # Remove duplicates\n", "\n", "print(f\"After cleaning: {silver_data.count()} valid 
records\")\n", "\n", "# Show data quality improvements\n", "print(\"\\nData Quality Summary:\")\n", "silver_data.select(\"vehicle_id\", \"trip_date\", \"distance\", \"fuel_efficiency\", \"trip_speed\", \"is_valid\").show(5)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Successfully processed and stored 16135 cleaned records in Silver layer\n", "Silver layer provides validated, enriched data for downstream analytics.\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Insert cleaned data into Silver layer\n", "\n", "silver_data.write.mode(\"overwrite\").saveAsTable(\"transportation.silver.fleet_trips_clean\")\n", "\n", "print(f\"Successfully processed and stored {silver_data.count()} cleaned records in Silver layer\")\n", "print(\"Silver layer provides validated, enriched data for downstream analytics.\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 4: Gold Layer - Business Analytics and ML-Ready Data\n", "\n", "### Gold Layer Design\n", "\n", "The gold layer provides aggregated, business-ready data optimized for:\n", "\n", "- **Analytics**: Pre-computed aggregates and KPIs\n", "- **ML Training**: Feature engineering and model-ready datasets\n", "- **Reporting**: Business metrics and performance indicators\n", "- **Optimization**: Clustering for specific analytical workloads" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Create Gold Layer: Vehicle performance analytics\n", "\n", "# CLUSTER BY vehicle_id for vehicle-centric analytics and ML\n", "\n", "spark.sql(\"\"\"\n", "CREATE TABLE IF NOT EXISTS transportation.gold.vehicle_performance (\n", " vehicle_id STRING,\n", " total_trips INT,\n", " total_distance DECIMAL(10,2),\n", " total_duration DECIMAL(10,2),\n", " total_fuel DECIMAL(10,2),\n", " avg_fuel_efficiency DECIMAL(6,2),\n", " avg_load_factor DECIMAL(6,2),\n", " efficiency_stddev 
DECIMAL(6,2),\n", " routes_used INT,\n", " active_days INT,\n", " avg_daily_trips DECIMAL(6,2),\n", " maintenance_score DECIMAL(6,2),\n", " last_trip_date TIMESTAMP,\n", " created_at TIMESTAMP \n", ")\n", "USING DELTA\n", "CLUSTER BY (vehicle_id)\n", "\"\"\")\n", "\n", "print(\"Gold layer vehicle performance table created successfully!\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Gold layer route analytics table created successfully!\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Create Gold Layer: Route performance analytics\n", "\n", "spark.sql(\"\"\"\n", "CREATE TABLE IF NOT EXISTS transportation.gold.route_analytics (\n", " route_id STRING,\n", " total_trips INT,\n", " avg_distance DECIMAL(8,2),\n", " avg_duration DECIMAL(8,2),\n", " avg_speed DECIMAL(6,2),\n", " avg_load_factor DECIMAL(6,2),\n", " avg_fuel_efficiency DECIMAL(6,2),\n", " total_distance DECIMAL(12,2),\n", " total_fuel DECIMAL(10,2),\n", " vehicles_used INT,\n", " efficiency_rank INT,\n", " created_at TIMESTAMP\n", ")\n", "USING DELTA\n", "CLUSTER BY (route_id)\n", "\"\"\")\n", "\n", "print(\"Gold layer route analytics table created successfully!\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Created performance metrics for 500 vehicles\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+----------+-----------+--------------+--------------+----------+-------------------+---------------+-----------------+-----------+-----------+---------------+-------------------+-----------------+\n", "|vehicle_id|total_trips|total_distance|total_duration|total_fuel|avg_fuel_efficiency|avg_load_factor|efficiency_stddev|routes_used|active_days|avg_daily_trips| last_trip_date|maintenance_score|\n", 
"+----------+-----------+--------------+--------------+----------+-------------------+---------------+-----------------+-----------+-----------+---------------+-------------------+-----------------+\n", "| VH0387| 49| 9798.69| 13893.5| 1166.11| 8.63| 95.29| 2.32| 5| 48| 1.02|2024-12-28 15:31:00| 0.0|\n", "| VH0083| 44| 7702.21| 11508.6| 937.15| 8.12| 94.93| 2.29| 5| 41| 1.07|2024-12-28 08:33:00| 0.0|\n", "| VH0351| 24| 6125.9| 8056.64| 884.95| 6.81| 87.08| 1.16| 5| 24| 1.0|2024-12-27 06:28:00| 1.0|\n", "| VH0180| 45| 9112.96| 12895.82| 1066.07| 8.39| 96.6| 2.35| 5| 41| 1.1|2024-12-24 07:04:00| 0.0|\n", "| VH0108| 54| 10470.65| 14247.71| 1161.8| 8.81| 93.7| 2.49| 5| 51| 1.06|2024-12-22 21:21:00| 0.0|\n", "+----------+-----------+--------------+--------------+----------+-------------------+---------------+-----------------+-----------+-----------+---------------+-------------------+-----------------+\n", "only showing top 5 rows\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Process Silver to Gold: Create vehicle performance aggregates\n", "\n", "# Read from Silver layer\n", "silver_trips = spark.table(\"transportation.silver.fleet_trips_clean\")\n", "\n", "# Create vehicle performance aggregates\n", "vehicle_performance = silver_trips.groupBy(\"vehicle_id\").agg(\n", " F.count(\"*\").alias(\"total_trips\"),\n", " F.round(F.sum(\"distance\"), 2).alias(\"total_distance\"),\n", " F.round(F.sum(\"duration\"), 2).alias(\"total_duration\"),\n", " F.round(F.sum(\"fuel_consumed\"), 2).alias(\"total_fuel\"),\n", " F.round(F.avg(\"fuel_efficiency\"), 2).alias(\"avg_fuel_efficiency\"),\n", " F.round(F.avg(\"load_factor\"), 2).alias(\"avg_load_factor\"),\n", " F.round(F.stddev(\"fuel_efficiency\"), 2).alias(\"efficiency_stddev\"),\n", " F.countDistinct(\"route_id\").alias(\"routes_used\"),\n", " F.countDistinct(F.date_format(\"trip_date\", \"yyyy-MM-dd\")).alias(\"active_days\"),\n", " F.round(F.count(\"*\") / 
F.countDistinct(F.date_format(\"trip_date\", \"yyyy-MM-dd\")), 2).alias(\"avg_daily_trips\"),\n", " F.max(\"trip_date\").alias(\"last_trip_date\")\n", ").withColumn(\n", " # Rule-based label: 1.0 if any threshold below is exceeded, else 0.0\n", " \"maintenance_score\",\n", " F.when(\n", " (F.col(\"total_distance\") > 80000) | # Very high cumulative distance\n", " (F.col(\"avg_fuel_efficiency\") < 8) | # Poor average fuel efficiency\n", " (F.col(\"efficiency_stddev\") > 4), # Erratic fuel efficiency across trips\n", " F.lit(1.0)\n", " ).otherwise(F.lit(0.0))\n", ")\n", "\n", "print(f\"Created performance metrics for {vehicle_performance.count()} vehicles\")\n", "vehicle_performance.show(5)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Created analytics for 5 routes\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+--------------+-----------+------------+------------+---------+---------------+-------------------+--------------+----------+-------------+---------------+\n", "| route_id|total_trips|avg_distance|avg_duration|avg_speed|avg_load_factor|avg_fuel_efficiency|total_distance|total_fuel|vehicles_used|efficiency_rank|\n", "+--------------+-----------+------------+------------+---------+---------------+-------------------+--------------+----------+-------------+---------------+\n", "|RT_LAX_SFO_002| 3210| 200.28| 273.02| 41.79| 90.95| 7.97| 642898.98| 82176.9| 492| 1|\n", "|RT_HOU_DAL_004| 3241| 199.0| 271.76| 41.57| 90.76| 7.96| 644968.9| 82156.58| 485| 2|\n", "|RT_MIA_ORL_005| 3239| 193.26| 264.67| 41.6| 91.01| 7.9| 625966.89| 80728.67| 486| 3|\n", "|RT_CHI_DET_003| 3204| 203.44| 275.12| 41.76| 91.14| 7.88| 651824.98| 83899.3| 486| 4|\n", "|RT_NYC_MAN_001| 3241| 203.28| 277.44| 41.57| 91.23| 7.87| 658841.15| 85153.97| 482| 5|\n", "+--------------+-----------+------------+------------+---------+---------------+-------------------+--------------+----------+-------------+---------------+\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": 
[ "# Process Silver to Gold: Create route analytics\n", "from pyspark.sql.window import Window\n", "\n", "route_analytics = silver_trips.groupBy(\"route_id\").agg(\n", " F.count(\"*\").alias(\"total_trips\"),\n", " F.round(F.avg(\"distance\"), 2).alias(\"avg_distance\"),\n", " F.round(F.avg(\"duration\"), 2).alias(\"avg_duration\"),\n", " F.round(F.avg(\"trip_speed\"), 2).alias(\"avg_speed\"),\n", " F.round(F.avg(\"load_factor\"), 2).alias(\"avg_load_factor\"),\n", " F.round(F.avg(\"fuel_efficiency\"), 2).alias(\"avg_fuel_efficiency\"),\n", " F.round(F.sum(\"distance\"), 2).alias(\"total_distance\"),\n", " F.round(F.sum(\"fuel_consumed\"), 2).alias(\"total_fuel\"),\n", " F.countDistinct(\"vehicle_id\").alias(\"vehicles_used\")\n", ").withColumn(\n", " \"efficiency_rank\",\n", " F.row_number().over(\n", " Window.orderBy(F.desc(\"avg_fuel_efficiency\"))\n", " )\n", ")\n", "\n", "print(f\"Created analytics for {route_analytics.count()} routes\")\n", "route_analytics.orderBy(\"efficiency_rank\").show()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Gold layer tables populated successfully!\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "- Vehicle performance: 500 records\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "- Route analytics: 5 records\n", "Gold layer provides optimized analytics and ML-ready data.\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Insert Gold layer data\n", "\n", "vehicle_performance.write.mode(\"overwrite\").saveAsTable(\"transportation.gold.vehicle_performance\")\n", "route_analytics.write.mode(\"overwrite\").saveAsTable(\"transportation.gold.route_analytics\")\n", "\n", "print(\"Gold layer tables populated successfully!\")\n", "print(f\"- Vehicle performance: {vehicle_performance.count()} records\")\n", "print(f\"- Route analytics: {route_analytics.count()} records\")\n", 
"print(\"Gold layer provides optimized analytics and ML-ready data.\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 5: Demonstrate Medallion Architecture Benefits\n", "\n", "### Query Patterns Across Layers\n", "\n", "Each layer serves a different analytical need: Bronze supports auditing of raw ingested data, Silver supports detailed analysis of clean trips, and Gold serves pre-aggregated KPIs. Liquid clustering helps most when a query filters on a table's clustering columns (for example, `vehicle_id` and `trip_date` in Silver), because files that cannot match the filter can be skipped. The queries below illustrate these access patterns; they are not a performance benchmark." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "=== Bronze Layer: Raw Data Investigation ===\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+-------------+-------------+------------+------------+\n", "|source_system|total_records|missing_fuel|avg_distance|\n", "+-------------+-------------+------------+------------+\n", "| fleet_sensor| 20077| 400| 183.37|\n", "+-------------+-------------+------------+------------+\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Bronze Layer: Raw data audit and investigation\n", "\n", "print(\"=== Bronze Layer: Raw Data Investigation ===\")\n", "bronze_audit = spark.sql(\"\"\"\n", "SELECT \n", " source_system,\n", " COUNT(*) as total_records,\n", " COUNT(CASE WHEN fuel_consumed IS NULL THEN 1 END) as missing_fuel,\n", " ROUND(AVG(distance), 2) as avg_distance\n", "FROM transportation.bronze.fleet_trips_raw\n", "GROUP BY source_system\n", "\"\"\")\n", "bronze_audit.show()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "=== Silver Layer: Clean Data Analytics ===\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+----------+----------------+----------------+--------------+---------+\n", "|vehicle_id|trips_this_month|monthly_distance|avg_efficiency|avg_speed|\n", "+----------+----------------+----------------+--------------+---------+\n", "| VH0022| 9| 2630.95| 8.62| 46.02|\n", "| VH0458| 7| 2361.6| 8.59| 41.74|\n", "| VH0215| 11| 2289.59| 
7.95| 41.32|\n", "| VH0264| 6| 2177.42| 8.62| 53.75|\n", "| VH0214| 6| 2085.41| 8.61| 48.32|\n", "| VH0050| 5| 1928.98| 10.47| 58.37|\n", "| VH0205| 6| 1892.79| 8.07| 48.4|\n", "| VH0489| 6| 1858.77| 10.88| 40.32|\n", "| VH0471| 3| 1834.45| 12.44| 60.1|\n", "| VH0493| 5| 1830.21| 9.14| 42.22|\n", "+----------+----------------+----------------+--------------+---------+\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Silver Layer: Clean analytical queries\n", "# Filter on a trip_date range rather than DATE_FORMAT(trip_date, ...), so the predicate applies\n", "# directly to the clustering column and file-level data skipping can take effect\n", "\n", "print(\"=== Silver Layer: Clean Data Analytics ===\")\n", "silver_analysis = spark.sql(\"\"\"\n", "SELECT \n", " vehicle_id,\n", " COUNT(*) as trips_this_month,\n", " ROUND(SUM(distance), 2) as monthly_distance,\n", " ROUND(AVG(fuel_efficiency), 2) as avg_efficiency,\n", " ROUND(AVG(trip_speed), 2) as avg_speed\n", "FROM transportation.silver.fleet_trips_clean\n", "WHERE trip_date >= TIMESTAMP '2024-06-01' AND trip_date < TIMESTAMP '2024-07-01'\n", "GROUP BY vehicle_id\n", "ORDER BY monthly_distance DESC\n", "LIMIT 10\n", "\"\"\")\n", "silver_analysis.show()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "=== Gold Layer: Business KPIs ===\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+--------------+--------------------+--------------------+---------------------+----------------------------+\n", "|total_vehicles|fleet_avg_efficiency|total_fleet_distance|fleet_avg_load_factor|vehicles_needing_maintenance|\n", "+--------------+--------------------+--------------------+---------------------+----------------------------+\n", "| 500| 7.52| 3224501.0| 88.76| 329|\n", "+--------------+--------------------+--------------------+---------------------+----------------------------+\n", "\n", "\n", "=== Top Performing Routes ===\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+--------------+-----------+-------------------+---------------+\n", "| 
route_id|total_trips|avg_fuel_efficiency|efficiency_rank|\n", "+--------------+-----------+-------------------+---------------+\n", "|RT_LAX_SFO_002| 3210| 7.97| 1|\n", "|RT_HOU_DAL_004| 3241| 7.96| 2|\n", "|RT_MIA_ORL_005| 3239| 7.9| 3|\n", "|RT_CHI_DET_003| 3204| 7.88| 4|\n", "|RT_NYC_MAN_001| 3241| 7.87| 5|\n", "+--------------+-----------+-------------------+---------------+\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Gold Layer: Business intelligence and KPIs\n", "\n", "print(\"=== Gold Layer: Business KPIs ===\")\n", "gold_kpis = spark.sql(\"\"\"\n", "SELECT \n", " COUNT(*) as total_vehicles,\n", " ROUND(AVG(avg_fuel_efficiency), 2) as fleet_avg_efficiency,\n", " ROUND(SUM(total_distance), 0) as total_fleet_distance,\n", " ROUND(AVG(avg_load_factor), 2) as fleet_avg_load_factor,\n", " COUNT(CASE WHEN maintenance_score = 1 THEN 1 END) as vehicles_needing_maintenance\n", "FROM transportation.gold.vehicle_performance\n", "\"\"\")\n", "gold_kpis.show()\n", "\n", "print(\"\\n=== Top Performing Routes ===\")\n", "top_routes = spark.sql(\"\"\"\n", "SELECT route_id, total_trips, avg_fuel_efficiency, efficiency_rank\n", "FROM transportation.gold.route_analytics\n", "ORDER BY efficiency_rank\n", "LIMIT 5\n", "\"\"\")\n", "top_routes.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 6: Machine Learning on Gold Layer Data\n", "\n", "### Predictive Maintenance Model\n", "\n", "We use the Gold layer's vehicle performance data to train a predictive maintenance model.\n", "\n", "**Note:** `maintenance_score` is not an observed failure label. Step 4 derives it from a fixed rule on `total_distance`, `avg_fuel_efficiency`, and `efficiency_stddev`, and all three columns are also model features. The classifier can therefore reproduce the rule almost exactly (expect an AUC close to 1.0), so treat this step as a demonstration of training on Gold-layer data rather than evidence of real predictive power." 
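] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Before training, a quick sanity check on the label. The cell below is a sketch that recomputes the Step 4 labeling rule directly from Gold-layer columns that the model also receives as features (`total_distance`, `avg_fuel_efficiency`, `efficiency_stddev`). It assumes the `transportation.gold.vehicle_performance` table written in Step 4 and the notebook's existing `spark` session. If it reports zero mismatches, `maintenance_score` is fully determined by the model's inputs, so a near-perfect AUC reflects the rule rather than real-world failure prediction." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Sanity check (sketch): can maintenance_score be recomputed exactly from feature columns?\n", "from pyspark.sql import functions as F\n", "\n", "# Same thresholds as the Step 4 rule; a null comparison falls through to 0.0, as it did there\n", "rule_label = F.when(\n", "    (F.col(\"total_distance\") > 80000)\n", "    | (F.col(\"avg_fuel_efficiency\") < 8)\n", "    | (F.col(\"efficiency_stddev\") > 4),\n", "    F.lit(1.0)\n", ").otherwise(F.lit(0.0))\n", "\n", "gold_vehicles = spark.table(\"transportation.gold.vehicle_performance\")\n", "mismatches = gold_vehicles.filter(rule_label != F.col(\"maintenance_score\")).count()\n", "print(f\"Vehicles where the rule and maintenance_score disagree: {mismatches}\")" 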
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Loading 500 vehicle records from Gold layer for ML training\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "Training set: 426 vehicles\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "Test set: 74 vehicles\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+-----------------+-----+\n", "|maintenance_score|count|\n", "+-----------------+-----+\n", "| 0.0| 147|\n", "| 1.0| 279|\n", "+-----------------+-----+\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Prepare Gold layer data for ML training\n", "\n", "from pyspark.ml.feature import VectorAssembler, StandardScaler\n", "from pyspark.ml.classification import RandomForestClassifier\n", "from pyspark.ml.evaluation import BinaryClassificationEvaluator\n", "from pyspark.ml import Pipeline\n", "import pyspark.sql.functions as F\n", "\n", "# Read vehicle performance data from Gold layer\n", "vehicle_data = spark.table(\"transportation.gold.vehicle_performance\")\n", "\n", "print(f\"Loading {vehicle_data.count()} vehicle records from Gold layer for ML training\")\n", "\n", "# Prepare features for maintenance prediction\n", "feature_cols = [\n", " \"total_trips\", \"total_distance\", \"total_duration\", \"total_fuel\",\n", " \"avg_fuel_efficiency\", \"avg_load_factor\", \"efficiency_stddev\",\n", " \"routes_used\", \"active_days\", \"avg_daily_trips\"\n", "]\n", "\n", "# Split data for training\n", "train_data, test_data = vehicle_data.randomSplit([0.8, 0.2], seed=42)\n", "\n", "print(f\"Training set: {train_data.count()} vehicles\")\n", "print(f\"Test set: {test_data.count()} vehicles\")\n", "\n", "# Show maintenance distribution\n", "train_data.groupBy(\"maintenance_score\").count().show()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { 
"data": { "text/plain": [ "Training predictive maintenance model...\n" ] }, "metadata": {}, "output_type": "display_data" },
{ "data": { "text/plain": [ "Model AUC: 1.0000\n" ] }, "metadata": {}, "output_type": "display_data" },
{ "data": { "text/plain": [
"+----------+--------------+-------------------+-----------------+----------+-----------+\n",
"|vehicle_id|total_distance|avg_fuel_efficiency|maintenance_score|prediction|probability|\n",
"+----------+--------------+-------------------+-----------------+----------+-----------+\n",
"|    VH0003|       8485.22|               8.46|              0.0|       0.0|  [1.0,0.0]|\n",
"|    VH0007|       1812.16|               6.24|              1.0|       1.0|  [0.0,1.0]|\n",
"|    VH0009|      10061.57|               8.19|              0.0|       0.0|  [1.0,0.0]|\n",
"|    VH0014|       2822.83|               6.21|              1.0|       1.0|  [0.0,1.0]|\n",
"|    VH0020|       4554.17|               5.85|              1.0|       1.0|  [0.0,1.0]|\n",
"|    VH0024|       6996.05|               7.36|              1.0|       1.0|  [0.0,1.0]|\n",
"|    VH0030|      11055.55|               8.75|              0.0|       0.0|  [1.0,0.0]|\n",
"|    VH0036|       7073.37|               7.25|              1.0|       1.0|[0.04,0.96]|\n",
"|    VH0046|       5595.72|               7.03|              1.0|       1.0|  [0.0,1.0]|\n",
"|    VH0047|       6312.05|               6.95|              1.0|       1.0|  [0.0,1.0]|\n",
"+----------+--------------+-------------------+-----------------+----------+-----------+\n",
"only showing top 10 rows\n",
"\n" ] }, "metadata": {}, "output_type": "display_data" } ],
"source": [
"# Train predictive maintenance model\n",
"\n",
"# Feature engineering pipeline: combine the numeric feature columns into one vector\n",
"assembler = VectorAssembler(\n",
"    inputCols=feature_cols,\n",
"    outputCol=\"features\"\n",
")\n",
"\n",
"# StandardScaler is optional here: Random Forest splits are not affected by feature scaling,\n",
"# but keeping the stage makes it easy to swap in a scale-sensitive model later\n",
"scaler = StandardScaler(inputCol=\"features\", outputCol=\"scaled_features\")\n",
"\n",
"# Random Forest classifier for the binary maintenance label\n",
"rf = RandomForestClassifier(\n",
"    labelCol=\"maintenance_score\",\n",
"    featuresCol=\"scaled_features\",\n",
"    numTrees=100,\n",
"    maxDepth=10\n",
")\n",
"\n",
"# Create and fit the pipeline\n",
"pipeline = Pipeline(stages=[assembler, scaler, rf])\n",
"\n",
"print(\"Training predictive maintenance model...\")\n",
"model = pipeline.fit(train_data)\n",
"\n",
"# Score the test set; cache the result because it is reused by several metrics below\n",
"predictions = model.transform(test_data).cache()\n",
"\n",
"# Evaluate the model (areaUnderROC is computed from the classifier's rawPrediction column)\n",
"evaluator = BinaryClassificationEvaluator(labelCol=\"maintenance_score\", metricName=\"areaUnderROC\")\n",
"auc = evaluator.evaluate(predictions)\n",
"\n",
"print(f\"Model AUC: {auc:.4f}\")\n",
"\n",
"# Show sample predictions\n",
"predictions.select(\n",
"    \"vehicle_id\", \"total_distance\", \"avg_fuel_efficiency\",\n",
"    \"maintenance_score\", \"prediction\", \"probability\"\n",
").show(10)"
] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [
{ "data": { "text/plain": [ "=== Feature Importance for Maintenance Prediction ===\n", "total_trips: 0.2166\n", "total_distance: 0.0010\n", "total_duration: 0.0156\n", "total_fuel: 0.0003\n", "avg_fuel_efficiency: 0.4895\n", "avg_load_factor: 0.1330\n", "efficiency_stddev: 0.0198\n", "routes_used: 0.0000\n", "active_days: 0.1237\n", "avg_daily_trips: 0.0005\n" ] }, "metadata": {}, "output_type": "display_data" },
{ "data": { "text/plain": [ "\n", "=== Business Impact Analysis ===\n", "Total test vehicles: 74\n", "Vehicles predicted to need maintenance: 51\n", "Percentage flagged for maintenance: 68.9%\n", "\n", "Estimated cost per maintenance event: $2,500\n", "Potential annual savings from preventive maintenance: $76,500\n" ] }, "metadata": {}, "output_type": "display_data" },
{ "data": { "text/plain": [ "\n", "Model Performance:\n", "Accuracy: 0.9865\n", "Precision: 0.9804\n", "Recall: 1.0000\n", "AUC: 1.0000\n" ] }, "metadata": {}, "output_type": "display_data" } ],
"source": [
"# Model interpretation and business impact\n",
"\n",
"# Feature importance: one weight per input column (Spark normalizes the weights to sum to 1)\n",
"rf_model = model.stages[-1]\n",
"feature_importance = rf_model.featureImportances.toArray()\n",
"\n",
"print(\"=== Feature Importance for Maintenance Prediction ===\")\n",
"for name, importance in zip(feature_cols, feature_importance):\n",
"    print(f\"{name}: {importance:.4f}\")\n",
"\n",
"# Business impact analysis on the test set\n",
"maintenance_predictions = predictions.filter(\"prediction = 1\")\n",
"vehicles_needing_maintenance = maintenance_predictions.count()\n",
"total_test_vehicles = test_data.count()\n",
"\n",
"print(\"\\n=== Business Impact Analysis ===\")\n",
"print(f\"Total test vehicles: {total_test_vehicles}\")\n",
"print(f\"Vehicles predicted to need maintenance: {vehicles_needing_maintenance}\")\n",
"print(f\"Percentage flagged for maintenance: {(vehicles_needing_maintenance/total_test_vehicles)*100:.1f}%\")\n",
"\n",
"# Illustrative cost savings for the flagged test-set vehicles; both constants are assumed figures\n",
"avg_maintenance_cost = 2500  # assumed cost per maintenance event (USD)\n",
"preventive_maintenance_savings = 0.6  # assumed fraction of that cost avoided by acting early\n",
"potential_savings = vehicles_needing_maintenance * avg_maintenance_cost * preventive_maintenance_savings\n",
"\n",
"print(f\"\\nEstimated cost per maintenance event: ${avg_maintenance_cost:,}\")\n",
"print(f\"Potential annual savings from preventive maintenance: ${potential_savings:,.0f}\")\n",
"\n",
"# Model performance metrics from confusion-matrix counts (each count() runs a Spark job,\n",
"# so every count is computed once and reused)\n",
"n_predictions = predictions.count()\n",
"correct = predictions.filter(\"maintenance_score = prediction\").count()\n",
"true_positives = predictions.filter(\"prediction = 1 AND maintenance_score = 1\").count()\n",
"actual_positives = predictions.filter(\"maintenance_score = 1\").count()\n",
"\n",
"accuracy = correct / n_predictions if n_predictions > 0 else 0.0\n",
"precision = true_positives / vehicles_needing_maintenance if vehicles_needing_maintenance > 0 else 0.0\n",
"recall = true_positives / actual_positives if actual_positives > 0 else 0.0\n",
"\n",
"print(\"\\nModel Performance:\")\n",
"print(f\"Accuracy: {accuracy:.4f}\")\n",
"print(f\"Precision: {precision:.4f}\")\n",
"print(f\"Recall: {recall:.4f}\")\n",
"print(f\"AUC: {auc:.4f}\")"
] },
{ "cell_type": "markdown", "metadata": {}, "source": [
"## Key Takeaways: Medallion Architecture + Liquid Clustering + ML in AIDP\n",
"\n",
"### What We Demonstrated\n",
"\n",
"1. **Medallion Architecture**: Three-layer data organization (Bronze → Silver → Gold)\n",
"   - Bronze: Raw data ingestion with realistic vehicle performance profiles\n",
"   - Silver: Cleaned, enriched data with business logic\n",
"   - Gold: Aggregated analytics and ML-ready datasets\n",
"\n",
"2. **Liquid Clustering**: Automatic optimization at each layer\n",
"   - Bronze: Clustered by `vehicle_id` for ingestion patterns\n",
"   - Silver: Clustered by `(vehicle_id, trip_date)` for analytical queries\n",
"   - Gold: Clustered by entity keys to match each table's query workload\n",
"\n",
"3. **Progressive Data Quality**: Each layer improves data quality\n",
"   - Bronze preserves raw data with vehicle performance variations\n",
"   - Silver applies validation and enrichment\n",
"   - Gold provides business-ready aggregates\n",
"\n",
"4. **ML Integration**: End-to-end ML pipeline using Gold layer data\n",
"   - Feature engineering on aggregated metrics\n",
"   - Predictive maintenance model with interpretable feature importances\n",
"   - Illustrative business impact estimate based on assumed maintenance costs\n",
"\n",
"### AIDP Advantages\n",
"\n",
"- **Unified Platform**: Ingestion, transformation, and ML training run in the same Spark session\n",
"- **Governance**: Catalog and schema isolation by layer\n",
"- **Performance**: Liquid clustering optimizes each layer's access patterns\n",
"- **Scalability**: Transformations run on Spark, so they can scale out with data volume\n",
"- **Integration**: Spark ML models train directly on Gold layer tables\n",
"\n",
"### Business Benefits for Transportation\n",
"\n",
"1. **Data Quality**: Progressive improvement from raw to refined\n",
"2. **Analytical Flexibility**: Different layers serve different use cases\n",
"3. **Performance**: Optimized queries at each layer\n",
"4. **ML Readiness**: Gold layer provides training-ready features\n",
"5. **Cost Reduction**: Predictive maintenance can help prevent breakdowns\n",
"6. **Operational Excellence**: Data-driven fleet management\n",
"\n",
"### Best Practices for Medallion Architecture\n",
"\n",
"1. **Layer Purpose**: Keep each layer focused on its role\n",
"2. **Clustering Strategy**: Choose clustering columns that match the most common query filters\n",
"3. **Data Quality**: Implement validation progressively\n",
"4. **Schema Evolution**: Allow flexibility in Bronze, standardize in Silver/Gold\n",
"5. **Monitoring**: Track data quality and performance metrics\n",
"6. **ML Integration**: Use Gold layer tables as the source of model training features\n",
"\n",
"### Next Steps\n",
"\n",
"- Implement real-time data ingestion into the Bronze layer\n",
"- Add more sophisticated Silver layer transformations\n",
"- Create additional Gold layer aggregations and KPIs\n",
"- Deploy ML models for real-time predictions\n",
"- Integrate with fleet management systems\n",
"- Scale to production workloads with AIDP\n",
"\n",
"This notebook shows how Oracle AI Data Platform supports the medallion pattern, liquid clustering, and ML training within a single governed catalog."
] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.5" } }, "nbformat": 4, "nbformat_minor": 4 }