{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Energy Analytics: Medallion Architecture with Delta Liquid Clustering Demo\n", "\n", "## Overview\n", "\n", "This notebook demonstrates the **Medallion Architecture** combined with **Delta Liquid Clustering** in Oracle AI Data Platform (AIDP) Workbench using an energy and utilities analytics use case.\n", "\n", "### What is Medallion Architecture?\n", "\n", "The Medallion Architecture is a data design pattern used to logically organize data in a lakehouse:\n", "\n", "- **Bronze Layer**: Raw data as ingested from sources\n", "- **Silver Layer**: Cleaned, validated, and enriched data\n", "- **Gold Layer**: Business-level aggregates and ML-ready features\n", "\n", "### What is Liquid Clustering?\n", "\n", "Liquid clustering is Delta Lake's alternative to Hive-style partitioning and Z-Ordering. You declare clustering columns with `CLUSTER BY`, and Delta groups rows with similar key values into the same files so that queries filtering on those columns can skip unrelated data.\n", "\n", "### Use Case: Smart Grid Monitoring and Energy Consumption Analytics\n", "\n", "We'll move energy consumption and smart grid performance data through the medallion layers, clustering each layer's tables on the columns its queries filter by.\n", "\n", "### AIDP Environment Setup\n", "\n", "This notebook uses the Spark session (`spark`) that already exists in your AIDP environment." 
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Setup: Create Energy Catalog and Schemas\n", "\n", "### Multi-Layer Schema Design\n", "\n", "We'll create separate schemas for each medallion layer to maintain data isolation and governance:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Create energy catalog with bronze, silver, and gold schemas\n", "# In AIDP, catalogs provide data isolation and governance\n", "\n", "spark.sql(\"CREATE CATALOG IF NOT EXISTS energy\")\n", "\n", "spark.sql(\"CREATE SCHEMA IF NOT EXISTS energy.bronze\")\n", "spark.sql(\"CREATE SCHEMA IF NOT EXISTS energy.silver\")\n", "spark.sql(\"CREATE SCHEMA IF NOT EXISTS energy.gold\")\n", "\n", "print(\"Energy catalog with bronze, silver, and gold schemas created successfully!\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 🥉 BRONZE LAYER: Raw Data Ingestion\n", "\n", "### Bronze Layer Principles\n", "\n", "- **Raw data storage**: Store data as-is from source systems\n", "- **Minimal processing**: No transformations or cleaning\n", "- **Append-only**: Historical record preservation (this demo overwrites the table on each run so the notebook can be re-run)\n", "- **Schema enforcement**: Basic structure validation\n", "\n", "### Table Design for Energy Readings\n", "\n", "Our `energy.bronze.energy_readings` table stores raw meter data:\n", "\n", "- **meter_id**: Raw meter identifier from source\n", "- **reading_date**: Timestamp as received\n", "- **energy_type**: Energy type string\n", "- **consumption**: Raw consumption value\n", "- **location**: Geographic location string\n", "- **peak_demand**: Peak usage value\n", "- **efficiency_rating**: Raw efficiency score\n", "- **ingestion_timestamp**: When data was ingested\n", "\n", "### Bronze Layer Clustering Strategy\n", "\n", "Cluster by `meter_id` and `reading_date` so that per-meter lookups and the date-range scans used for incremental processing read fewer files." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Create Bronze layer Delta table with liquid clustering\n", "# CLUSTER BY optimizes for incremental data ingestion and querying\n", "\n", "spark.sql(\"\"\"\n", "CREATE TABLE IF NOT EXISTS energy.bronze.energy_readings (\n", " meter_id STRING,\n", " reading_date TIMESTAMP,\n", " energy_type STRING,\n", " consumption DECIMAL(10,3),\n", " location STRING,\n", " peak_demand DECIMAL(8,2),\n", " efficiency_rating INT,\n", " ingestion_timestamp TIMESTAMP\n", ")\n", "USING DELTA\n", "CLUSTER BY (meter_id, reading_date)\n", "\"\"\")\n", "\n", "print(\"Bronze layer Delta table with liquid clustering created successfully!\")\n", "print(\"Ready to ingest raw energy meter data...\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Generate and Ingest Raw Energy Data\n", "\n", "#### Data Generation Strategy\n", "\n", "We'll simulate raw data from smart meters including realistic variations:\n", "\n", "- **Data quality issues**: Missing values, outliers, inconsistent formats\n", "- **Realistic patterns**: Seasonal variations, peak usage times\n", "- **Source diversity**: Different meter types and locations\n", "\n", "#### Why Raw Data Ingestion?\n", "\n", "The bronze layer preserves the original state of data for:\n", "\n", "- **Audit trails**: Complete historical record\n", "- **Reprocessing**: Ability to re-run transformations\n", "- **Data lineage**: Track data from source to insights" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Generate realistic raw energy consumption data with quality issues\n", "# This simulates data as it would come from various meter sources\n", "\n", "import random\n", "from datetime import datetime, timedelta\n", "\n", "# Define energy data constants\n", "ENERGY_TYPES = ['Electricity', 'Natural Gas', 'Water', 'Solar']\n", "LOCATIONS = ['Residential_NYC', 'Commercial_CHI', 
'Industrial_HOU', 'Residential_LAX', 'Commercial_SFO']\n", "\n", "# Base consumption parameters by energy type and location\n", "CONSUMPTION_PARAMS = {\n", " 'Electricity': {\n", " 'Residential_NYC': {'base_consumption': 15, 'peak_factor': 2.5, 'efficiency': 85},\n", " 'Commercial_CHI': {'base_consumption': 150, 'peak_factor': 3.0, 'efficiency': 78},\n", " 'Industrial_HOU': {'base_consumption': 500, 'peak_factor': 2.2, 'efficiency': 92},\n", " 'Residential_LAX': {'base_consumption': 12, 'peak_factor': 2.8, 'efficiency': 88},\n", " 'Commercial_SFO': {'base_consumption': 180, 'peak_factor': 2.7, 'efficiency': 82}\n", " },\n", " 'Natural Gas': {\n", " 'Residential_NYC': {'base_consumption': 25, 'peak_factor': 1.8, 'efficiency': 90},\n", " 'Commercial_CHI': {'base_consumption': 80, 'peak_factor': 2.1, 'efficiency': 85},\n", " 'Industrial_HOU': {'base_consumption': 200, 'peak_factor': 1.9, 'efficiency': 95},\n", " 'Residential_LAX': {'base_consumption': 20, 'peak_factor': 2.0, 'efficiency': 87},\n", " 'Commercial_SFO': {'base_consumption': 95, 'peak_factor': 2.3, 'efficiency': 83}\n", " },\n", " 'Water': {\n", " 'Residential_NYC': {'base_consumption': 180, 'peak_factor': 1.5, 'efficiency': 88},\n", " 'Commercial_CHI': {'base_consumption': 450, 'peak_factor': 1.7, 'efficiency': 82},\n", " 'Industrial_HOU': {'base_consumption': 1200, 'peak_factor': 1.6, 'efficiency': 91},\n", " 'Residential_LAX': {'base_consumption': 160, 'peak_factor': 1.8, 'efficiency': 85},\n", " 'Commercial_SFO': {'base_consumption': 380, 'peak_factor': 1.9, 'efficiency': 79}\n", " },\n", " 'Solar': {\n", " 'Residential_NYC': {'base_consumption': -8, 'peak_factor': 3.5, 'efficiency': 78},\n", " 'Commercial_CHI': {'base_consumption': -75, 'peak_factor': 4.0, 'efficiency': 85},\n", " 'Industrial_HOU': {'base_consumption': -250, 'peak_factor': 3.8, 'efficiency': 88},\n", " 'Residential_LAX': {'base_consumption': -12, 'peak_factor': 4.2, 'efficiency': 82},\n", " 'Commercial_SFO': {'base_consumption': -95, 
'peak_factor': 3.9, 'efficiency': 86}\n", "    }\n", "}\n", "\n", "# Generate raw energy reading records with data quality variations\n", "bronze_data = []\n", "base_date = datetime(2024, 1, 1)\n", "\n", "# Create 2,000 meters with hourly readings for 90 days (about 3 months)\n", "for meter_num in range(1, 2001):\n", "    meter_id = f\"MTR{meter_num:06d}\"\n", "\n", "    # Each meter gets readings for 90 days (hourly)\n", "    for day in range(90):\n", "        for hour in range(24):\n", "            reading_date = base_date + timedelta(days=day, hours=hour)\n", "            ingestion_timestamp = reading_date + timedelta(minutes=random.randint(0, 30))  # Simulated ingestion delay\n", "\n", "            # Pick an energy type and location at random for each reading\n", "            # (they are not fixed per meter in this simulation)\n", "            energy_type = random.choice(ENERGY_TYPES)\n", "            location = random.choice(LOCATIONS)\n", "\n", "            params = CONSUMPTION_PARAMS[energy_type][location]\n", "\n", "            # Calculate consumption with variations\n", "            month = reading_date.month\n", "            if energy_type in ['Electricity', 'Natural Gas']:\n", "                if month in [12, 1, 2]:  # Winter\n", "                    seasonal_factor = 1.4\n", "                elif month in [6, 7, 8]:  # Summer\n", "                    seasonal_factor = 1.3\n", "                else:\n", "                    seasonal_factor = 1.0\n", "            else:\n", "                seasonal_factor = 1.0\n", "\n", "            hour_factor = 1.0\n", "            if hour in [6, 7, 8, 17, 18, 19]:  # Peak hours\n", "                hour_factor = params['peak_factor']\n", "            elif hour in [2, 3, 4, 5]:  # Off-peak\n", "                hour_factor = 0.4\n", "\n", "            consumption_variation = random.uniform(0.8, 1.2)\n", "            consumption = round(params['base_consumption'] * seasonal_factor * hour_factor * consumption_variation, 3)\n", "\n", "            # Add data quality issues (simulating real-world raw data)\n", "            if random.random() < 0.02:  # 2% missing consumption\n", "                consumption = None\n", "            elif random.random() < 0.05:  # ~5% outliers\n", "                consumption = consumption * random.uniform(5, 10)\n", "\n", "            # Peak demand is missing whenever consumption is missing\n", "            peak_demand = round(abs(consumption) * random.uniform(1.1, 1.5), 2) if consumption is not None else None\n", "\n", 
"            efficiency_variation = random.randint(-5, 3)\n", "            efficiency_rating = max(0, min(100, params['efficiency'] + efficiency_variation))\n", "\n", "            # Occasional missing efficiency ratings\n", "            if random.random() < 0.03:\n", "                efficiency_rating = None\n", "\n", "            bronze_data.append({\n", "                \"meter_id\": meter_id,\n", "                \"reading_date\": reading_date,\n", "                \"energy_type\": energy_type,\n", "                \"consumption\": consumption,\n", "                \"location\": location,\n", "                \"peak_demand\": peak_demand,\n", "                \"efficiency_rating\": efficiency_rating,\n", "                \"ingestion_timestamp\": ingestion_timestamp\n", "            })\n", "\n", "print(f\"Generated {len(bronze_data)} raw energy reading records for bronze layer\")\n", "print(\"Sample raw record:\", bronze_data[0])" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Insert raw data into Bronze layer using PySpark\n", "# Bronze layer preserves data as-is from sources\n", "from pyspark.sql.functions import col\n", "from pyspark.sql.types import DecimalType, IntegerType\n", "\n", "# Create DataFrame from generated raw data\n", "df_bronze = spark.createDataFrame(bronze_data)\n", "\n", "# Display schema and sample data\n", "print(\"Bronze Layer DataFrame Schema:\")\n", "df_bronze.printSchema()\n", "\n", "print(\"\\nSample Raw Data (including quality issues):\")\n", "df_bronze.show(5)\n", "\n", "# Insert data into Bronze layer Delta table\n", "df_bronze.write.mode(\"overwrite\").saveAsTable(\"energy.bronze.energy_readings\")\n", "\n", "print(f\"\\nSuccessfully ingested {df_bronze.count()} raw records into bronze layer\")\n", "print(\"Data includes missing values and outliers, as would be found in raw sources\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 🥈 SILVER LAYER: Data Cleaning and Enrichment\n", "\n", "### Silver Layer Principles\n", "\n", "- **Data quality**: Clean, validate, and standardize data\n", "- **Business rules**: Apply transformations and 
enrichments\n", "- **Deduplication**: Remove duplicates and handle conflicts\n", "- **Schema evolution**: Consistent data structure\n", "\n", "### Silver Layer Transformations\n", "\n", "From the bronze table `energy.bronze.energy_readings`, we'll create `energy.silver.energy_readings` with:\n", "\n", "- **Data validation**: Remove invalid records, handle missing values\n", "- **Standardization**: Consistent formats and units\n", "- **Enrichment**: Add derived fields like consumption categories\n", "- **Quality metrics**: Data quality scores and validation flags\n", "\n", "### Silver Layer Clustering Strategy\n", "\n", "Maintain clustering by `meter_id` and `reading_date` for efficient processing and querying of cleaned data." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Create Silver layer Delta table with liquid clustering\n", "# Silver layer applies data quality rules and business transformations\n", "\n", "spark.sql(\"\"\"\n", "CREATE TABLE IF NOT EXISTS energy.silver.energy_readings (\n", "    meter_id STRING,\n", "    reading_date TIMESTAMP,\n", "    energy_type STRING,\n", "    consumption DECIMAL(10,3),\n", "    location STRING,\n", "    peak_demand DECIMAL(8,2),\n", "    efficiency_rating INT,\n", "    ingestion_timestamp TIMESTAMP,\n", "    -- Derived fields\n", "    consumption_category STRING,\n", "    is_peak_hour BOOLEAN,\n", "    seasonal_factor DECIMAL(3,2),\n", "    data_quality_score INT,\n", "    processing_timestamp TIMESTAMP\n", ")\n", "USING DELTA\n", "CLUSTER BY (meter_id, reading_date)\n", "\"\"\")\n", "\n", "print(\"Silver layer Delta table with liquid clustering created successfully!\")\n", "print(\"Ready to transform and clean bronze layer data...\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Transform bronze data to silver layer with cleaning and enrichment\n", "# This demonstrates data quality improvements and business logic application\n", "\n", "from pyspark.sql.functions import col, 
when, hour, month, current_timestamp, lit\n", "\n", "# Read bronze data\n", "df_bronze = spark.table(\"energy.bronze.energy_readings\")\n", "\n", "# Apply data cleaning and enrichment transformations.\n", "# The _raw_*_present helper columns record which values existed before\n", "# imputation, so data_quality_score reflects the raw record rather than\n", "# always evaluating to 100 after the nulls have been filled.\n", "df_silver = df_bronze \\\n", "    .filter(col(\"reading_date\").isNotNull()) \\\n", "    .filter(col(\"meter_id\").isNotNull()) \\\n", "    .withColumn(\"_raw_consumption_present\", col(\"consumption\").isNotNull()) \\\n", "    .withColumn(\"_raw_efficiency_present\", col(\"efficiency_rating\").isNotNull()) \\\n", "    .withColumn(\"consumption\",\n", "        when(col(\"consumption\").isNull(), 0)\n", "        .when(col(\"consumption\") > 10000, 0)  # Zero out extreme outliers\n", "        .otherwise(col(\"consumption\"))) \\\n", "    .withColumn(\"efficiency_rating\",\n", "        when(col(\"efficiency_rating\").isNull(), 75)  # Default efficiency\n", "        .when(col(\"efficiency_rating\") < 0, 0)\n", "        .when(col(\"efficiency_rating\") > 100, 100)\n", "        .otherwise(col(\"efficiency_rating\"))) \\\n", "    .withColumn(\"peak_demand\",\n", "        when(col(\"peak_demand\").isNull(), col(\"consumption\") * 1.2)\n", "        .otherwise(col(\"peak_demand\"))) \\\n", "    .withColumn(\"consumption_category\",\n", "        when(col(\"consumption\") <= 0, \"Generation\")  # Also catches readings imputed to 0 above\n", "        .when(col(\"consumption\") < 50, \"Low\")\n", "        .when(col(\"consumption\") < 200, \"Medium\")\n", "        .otherwise(\"High\")) \\\n", "    .withColumn(\"is_peak_hour\",\n", "        when(hour(col(\"reading_date\")).isin([6,7,8,17,18,19]), True)\n", "        .otherwise(False)) \\\n", "    .withColumn(\"seasonal_factor\",\n", "        when(month(col(\"reading_date\")).isin([12,1,2]), 1.4)\n", "        .when(month(col(\"reading_date\")).isin([6,7,8]), 1.3)\n", "        .otherwise(1.0)) \\\n", "    .withColumn(\"data_quality_score\",\n", "        when(col(\"_raw_consumption_present\") & col(\"_raw_efficiency_present\"), 100)\n", "        .when(col(\"_raw_consumption_present\"), 80)\n", "        .otherwise(60)) \\\n", "    .withColumn(\"processing_timestamp\", current_timestamp()) \\\n", "    .drop(\"_raw_consumption_present\", \"_raw_efficiency_present\")\n", "\n", "print(\"Silver layer transformations applied:\")\n", "print(\"- Null value handling\")\n", "print(\"- Outlier removal\")\n", "print(\"- Data standardization\")\n", "print(\"- Business rule enrichment\")\n", "print(\"- Quality 
scoring\")\n", "\n", "print(f\"\\nBronze records: {df_bronze.count()}\")\n", "print(f\"Silver records after cleaning: {df_silver.count()}\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Insert cleaned and enriched data into Silver layer\n", "\n", "df_silver.write.mode(\"overwrite\").saveAsTable(\"energy.silver.energy_readings\")\n", "\n", "print(\"Successfully transformed and loaded data into silver layer\")\n", "print(\"\\nSample silver layer data:\")\n", "spark.table(\"energy.silver.energy_readings\").select(\n", "    \"meter_id\", \"reading_date\", \"consumption\", \"consumption_category\",\n", "    \"is_peak_hour\", \"data_quality_score\"\n", ").show(10)\n", "\n", "# Data quality comparison (counts rows with a null in either column)\n", "bronze_nulls = df_bronze.filter(col(\"consumption\").isNull() | col(\"efficiency_rating\").isNull()).count()\n", "silver_nulls = df_silver.filter(col(\"consumption\").isNull() | col(\"efficiency_rating\").isNull()).count()\n", "\n", "# Guard against division by zero when the bronze data contains no nulls\n", "improvement = (bronze_nulls - silver_nulls) / bronze_nulls * 100 if bronze_nulls else 0.0\n", "\n", "print(\"\\nData Quality Improvement:\")\n", "print(f\"Bronze layer rows with nulls: {bronze_nulls}\")\n", "print(f\"Silver layer rows with nulls: {silver_nulls}\")\n", "print(f\"Quality improvement: {improvement:.1f}%\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 🥇 GOLD LAYER: Business Analytics and ML Features\n", "\n", "### Gold Layer Principles\n", "\n", "- **Business focus**: Aggregated metrics and KPIs\n", "- **Performance**: Optimized for reporting and analytics\n", "- **ML-ready**: Feature engineering for machine learning\n", "- **Governance**: Curated datasets for enterprise use\n", "\n", "### Gold Layer Tables\n", "\n", "From the silver table `energy.silver.energy_readings`, we'll create:\n", "\n", "1. **`energy.gold.meter_analytics`**: Aggregated meter-level analytics\n", "2. 
**`energy.gold.energy_forecasting_features`**: ML-ready features for demand forecasting\n", "\n", "### Gold Layer Clustering Strategy\n", "\n", "Each gold table is clustered for its own access pattern:\n", "\n", "- Meter analytics: Cluster by `meter_id` and `month`\n", "- Forecasting features: Cluster by `meter_id` and `reading_date`" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Create Gold layer tables for business analytics and ML\n", "\n", "# Meter analytics table - business KPIs\n", "spark.sql(\"\"\"\n", "CREATE TABLE IF NOT EXISTS energy.gold.meter_analytics (\n", "    meter_id STRING,\n", "    month STRING,\n", "    energy_type STRING,\n", "    location STRING,\n", "    total_consumption DECIMAL(12,3),\n", "    avg_consumption DECIMAL(10,3),\n", "    max_peak_demand DECIMAL(8,2),\n", "    avg_efficiency DECIMAL(5,2),\n", "    reading_count INT,\n", "    peak_hours_count INT,\n", "    data_quality_avg DECIMAL(5,2),\n", "    last_reading_date TIMESTAMP\n", ")\n", "USING DELTA\n", "CLUSTER BY (meter_id, month)\n", "\"\"\")\n", "\n", "# ML features table for energy demand forecasting\n", "spark.sql(\"\"\"\n", "CREATE TABLE IF NOT EXISTS energy.gold.energy_forecasting_features (\n", "    meter_id STRING,\n", "    reading_date TIMESTAMP,\n", "    energy_type STRING,\n", "    location STRING,\n", "    consumption DECIMAL(10,3),\n", "    -- Temporal features\n", "    hour_of_day INT,\n", "    day_of_week INT,\n", "    month_of_year INT,\n", "    is_weekend BOOLEAN,\n", "    -- Lagged features\n", "    prev_hour_consumption DECIMAL(10,3),\n", "    prev_day_consumption DECIMAL(10,3),\n", "    -- Statistical features\n", "    rolling_24h_avg DECIMAL(10,3),\n", "    rolling_7d_avg DECIMAL(10,3),\n", "    -- Other features\n", "    peak_demand DECIMAL(8,2),\n", "    efficiency_rating INT,\n", "    seasonal_factor DECIMAL(3,2),\n", "    consumption_category STRING\n", ")\n", "USING DELTA\n", "CLUSTER BY (meter_id, reading_date)\n", "\"\"\")\n", "\n", "print(\"Gold layer tables created successfully!\")\n", "print(\"- 
meter_analytics: Business KPIs and aggregations\")\n", "print(\"- energy_forecasting_features: ML-ready feature set\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Populate meter analytics gold table\n", "# Business-focused aggregations for reporting and dashboards\n", "\n", "# Import the functions module under an alias so that sum, max, and count\n", "# do not shadow Python's built-ins (the data generation cell relies on the\n", "# built-in max and min when it is re-run)\n", "from pyspark.sql import functions as F\n", "\n", "meter_analytics = spark.table(\"energy.silver.energy_readings\") \\\n", "    .groupBy(\"meter_id\", F.date_format(\"reading_date\", \"yyyy-MM\").alias(\"month\")) \\\n", "    .agg(\n", "        # last() is order-dependent; energy_type and location vary per reading in this demo data\n", "        F.last(\"energy_type\").alias(\"energy_type\"),\n", "        F.last(\"location\").alias(\"location\"),\n", "        F.sum(\"consumption\").alias(\"total_consumption\"),\n", "        F.avg(\"consumption\").alias(\"avg_consumption\"),\n", "        F.max(\"peak_demand\").alias(\"max_peak_demand\"),\n", "        F.avg(\"efficiency_rating\").alias(\"avg_efficiency\"),\n", "        F.count(\"*\").alias(\"reading_count\"),\n", "        F.sum(F.when(F.col(\"is_peak_hour\"), 1).otherwise(0)).alias(\"peak_hours_count\"),\n", "        F.avg(\"data_quality_score\").alias(\"data_quality_avg\"),\n", "        F.max(\"reading_date\").alias(\"last_reading_date\")\n", "    )\n", "\n", "meter_analytics.write.mode(\"overwrite\").saveAsTable(\"energy.gold.meter_analytics\")\n", "\n", "print(\"Meter analytics gold table populated\")\n", "print(\"\\nSample meter analytics:\")\n", "spark.table(\"energy.gold.meter_analytics\").select(\n", "    \"meter_id\", \"month\", \"total_consumption\", \"avg_consumption\",\n", "    \"max_peak_demand\", \"avg_efficiency\"\n", ").show(10)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Create ML-ready features for energy demand forecasting\n", "# Advanced feature engineering for predictive analytics\n", "\n", "from pyspark.sql.functions import avg, col, dayofweek, hour, lag, month, when\n", "from pyspark.sql.window import Window\n", "\n", "df_silver = spark.table(\"energy.silver.energy_readings\")\n", "\n", "# 
Create time-based features\n", "# dayofweek() returns 1 for Sunday through 7 for Saturday\n", "df_features = df_silver \\\n", "    .withColumn(\"hour_of_day\", hour(\"reading_date\")) \\\n", "    .withColumn(\"day_of_week\", dayofweek(\"reading_date\")) \\\n", "    .withColumn(\"month_of_year\", month(\"reading_date\")) \\\n", "    .withColumn(\"is_weekend\", when(col(\"day_of_week\").isin([1,7]), True).otherwise(False))\n", "\n", "# Add lagged features\n", "# Note: the rolling windows end at the current row, so the rolling averages\n", "# include the consumption value that the model is later asked to predict\n", "window_spec_1h = Window.partitionBy(\"meter_id\").orderBy(\"reading_date\")\n", "window_spec_24h = Window.partitionBy(\"meter_id\").orderBy(\"reading_date\").rowsBetween(-23, 0)\n", "window_spec_7d = Window.partitionBy(\"meter_id\").orderBy(\"reading_date\").rowsBetween(-167, 0)\n", "\n", "df_features = df_features \\\n", "    .withColumn(\"prev_hour_consumption\", lag(\"consumption\", 1).over(window_spec_1h)) \\\n", "    .withColumn(\"prev_day_consumption\", lag(\"consumption\", 24).over(window_spec_1h)) \\\n", "    .withColumn(\"rolling_24h_avg\", avg(\"consumption\").over(window_spec_24h)) \\\n", "    .withColumn(\"rolling_7d_avg\", avg(\"consumption\").over(window_spec_7d))\n", "\n", "# Select final feature set\n", "df_gold_features = df_features.select(\n", "    \"meter_id\", \"reading_date\", \"energy_type\", \"location\", \"consumption\",\n", "    \"hour_of_day\", \"day_of_week\", \"month_of_year\", \"is_weekend\",\n", "    \"prev_hour_consumption\", \"prev_day_consumption\",\n", "    \"rolling_24h_avg\", \"rolling_7d_avg\",\n", "    \"peak_demand\", \"efficiency_rating\", \"seasonal_factor\", \"consumption_category\"\n", ")\n", "\n", "df_gold_features.write.mode(\"overwrite\").saveAsTable(\"energy.gold.energy_forecasting_features\")\n", "\n", "print(\"Energy forecasting features gold table populated\")\n", "print(\"\\nSample ML features:\")\n", "spark.table(\"energy.gold.energy_forecasting_features\").select(\n", "    \"meter_id\", \"reading_date\", \"consumption\", \"prev_hour_consumption\",\n", "    \"rolling_24h_avg\", \"hour_of_day\", \"is_weekend\"\n", ").show(10)" ] }, { "cell_type": "markdown", 
"metadata": {}, "source": [ "## Liquid Clustering Performance Demonstration\n", "\n", "### Query Performance Across Medallion Layers\n", "\n", "The queries below filter on each table's clustering columns, which is the access pattern liquid clustering is designed for. They display results rather than timings:\n", "\n", "1. **Bronze**: Raw data scanning and filtering\n", "2. **Silver**: Cleaned data analytics\n", "3. **Gold**: Business intelligence and ML feature extraction\n", "\n", "### Performance Benefits\n", "\n", "Liquid clustering provides:\n", "\n", "- **No partition scheme to design**: Clustering columns are declared with `CLUSTER BY` instead of a fixed partition layout\n", "- **Data skipping**: Filters on clustering columns such as `meter_id` can skip files that hold other keys\n", "- **Co-located rows**: Rows with similar key values are stored together, which reduces the data read by per-meter queries\n", "- **Flexible keys**: Clustering columns can be changed as query patterns change" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Demonstrate liquid clustering benefits across medallion layers\n", "\n", "print(\"=== Liquid Clustering Performance Demonstration ===\\n\")\n", "\n", "# Bronze layer: Raw data exploration\n", "print(\"🥉 BRONZE LAYER - Raw Data Queries\")\n", "bronze_query = spark.sql(\"\"\"\n", "SELECT meter_id, reading_date, energy_type, consumption, peak_demand\n", "FROM energy.bronze.energy_readings\n", "WHERE meter_id = 'MTR000001'\n", "ORDER BY reading_date DESC\n", "LIMIT 10\n", "\"\"\")\n", "bronze_query.show()\n", "print(f\"Bronze query returned {bronze_query.count()} records\\n\")\n", "\n", "# Silver layer: Cleaned data analytics\n", "print(\"🥈 SILVER LAYER - Enhanced Analytics\")\n", "silver_query = spark.sql(\"\"\"\n", "SELECT meter_id, reading_date, consumption, consumption_category,\n", "       is_peak_hour, data_quality_score\n", "FROM energy.silver.energy_readings\n", "WHERE meter_id = 'MTR000001' AND consumption_category = 'High'\n", "ORDER BY reading_date DESC\n", "LIMIT 10\n", "\"\"\")\n", "silver_query.show()\n", "print(f\"Silver query returned {silver_query.count()} records\\n\")\n", "\n", "# Gold layer: Business intelligence\n", "print(\"🥇 GOLD LAYER - Business 
Analytics\")\n", "gold_analytics = spark.sql(\"\"\"\n", "SELECT meter_id, month, total_consumption, avg_consumption, \n", " max_peak_demand, avg_efficiency\n", "FROM energy.gold.meter_analytics\n", "WHERE meter_id LIKE 'MTR000%'\n", "ORDER BY meter_id, month\n", "LIMIT 15\n", "\"\"\")\n", "gold_analytics.show()\n", "print(f\"Gold analytics returned {gold_analytics.count()} records\\n\")\n", "\n", "# Gold layer: ML feature extraction\n", "print(\"🥇 GOLD LAYER - ML Feature Extraction\")\n", "gold_ml = spark.sql(\"\"\"\n", "SELECT meter_id, reading_date, consumption, prev_hour_consumption,\n", " rolling_24h_avg, hour_of_day, is_weekend\n", "FROM energy.gold.energy_forecasting_features\n", "WHERE meter_id = 'MTR000001'\n", "ORDER BY reading_date DESC\n", "LIMIT 10\n", "\"\"\")\n", "gold_ml.show()\n", "print(f\"ML features query returned {gold_ml.count()} records\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Energy Demand Forecasting Model (Gold Layer)\n", "\n", "### Business Value of Predictive Analytics\n", "\n", "Energy demand forecasting enables utilities to:\n", "\n", "- **Optimize grid operations**: Predict and prevent peak demand issues\n", "- **Improve pricing strategies**: Dynamic pricing based on predicted demand\n", "- **Enable demand response**: Encourage conservation during peak times\n", "- **Reduce infrastructure costs**: Better capacity planning\n", "\n", "### ML Pipeline Using Gold Layer Features\n", "\n", "We'll train a Random Forest model using our gold layer forecasting features to predict hourly energy consumption." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "=== Energy Demand Forecasting Model ===\n", "\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "ML dataset: 3504000 records after null removal\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "Training set: 2832000 records\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "Testing set: 672000 records\n", "\n", "Training Random Forest model...\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "Model training completed!\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "\n", "Generated predictions for 672000 test records\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "\n", "Model Performance:\n", "RMSE: 265.71\n", "R² Score: 0.8226\n", "\n", "Sample Predictions:\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+---------+-------------------+-----------+-------------------+-----------+\n", "| meter_id| reading_date|consumption| prediction|energy_type|\n", "+---------+-------------------+-----------+-------------------+-----------+\n", "|MTR000004|2024-03-01 00:00:00| -204.261|-193.54698996455508| Solar|\n", "|MTR000004|2024-03-01 01:00:00| 147.416| 128.95653049569464|Electricity|\n", "|MTR000004|2024-03-01 02:00:00| 68.205| 87.40272657988953| Water|\n", "|MTR000004|2024-03-01 03:00:00| 121.664| 115.12839032370157| Water|\n", "|MTR000004|2024-03-01 04:00:00| 555.751| 654.6475498955411| Water|\n", "|MTR000004|2024-03-01 05:00:00| 34.831| 31.40105961988671|Natural Gas|\n", "|MTR000004|2024-03-01 06:00:00| 825.667| 628.9479734113935| Water|\n", "|MTR000004|2024-03-01 07:00:00| 656.253| 603.6565043644263| Water|\n", "|MTR000004|2024-03-01 08:00:00| -309.556|-221.52657571314506| Solar|\n", "|MTR000004|2024-03-01 
09:00:00| -8.386| -32.25039479438769| Solar|\n", "+---------+-------------------+-----------+-------------------+-----------+\n", "only showing top 10 rows\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Train energy demand forecasting model using gold layer features\n", "# Demonstrates end-to-end ML pipeline with medallion architecture\n", "\n", "from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler\n", "from pyspark.ml.regression import RandomForestRegressor\n", "from pyspark.ml.evaluation import RegressionEvaluator\n", "from pyspark.ml import Pipeline\n", "from pyspark.sql.functions import col\n", "\n", "print(\"=== Energy Demand Forecasting Model ===\\n\")\n", "\n", "# Load gold layer ML features and filter out null values for training\n", "df_ml = spark.table(\"energy.gold.energy_forecasting_features\") \\\n", " .filter(col(\"prev_hour_consumption\").isNotNull()) \\\n", " .filter(col(\"reading_date\") < \"2024-03-15\") \\\n", " .na.drop() # Drop any remaining null values\n", "\n", "print(f\"ML dataset: {df_ml.count()} records after null removal\")\n", "\n", "# Split data for training and testing\n", "train_data = df_ml.filter(\"reading_date < '2024-03-01'\")\n", "test_data = df_ml.filter(\"reading_date >= '2024-03-01'\")\n", "\n", "print(f\"Training set: {train_data.count()} records\")\n", "print(f\"Testing set: {test_data.count()} records\")\n", "\n", "# Prepare features for ML\n", "feature_cols = [\n", " \"hour_of_day\", \"day_of_week\", \"month_of_year\", \"is_weekend\",\n", " \"prev_hour_consumption\", \"prev_day_consumption\", \n", " \"rolling_24h_avg\", \"rolling_7d_avg\",\n", " \"peak_demand\", \"efficiency_rating\", \"seasonal_factor\",\n", " \"energy_type_index\", \"location_index\", \"consumption_category_index\"\n", "]\n", "\n", "# Encode categorical variables\n", "energy_type_indexer = StringIndexer(inputCol=\"energy_type\", outputCol=\"energy_type_index\")\n", "location_indexer = 
StringIndexer(inputCol=\"location\", outputCol=\"location_index\")\n", "category_indexer = StringIndexer(inputCol=\"consumption_category\", outputCol=\"consumption_category_index\")\n", "\n", "# Create pipeline with handleInvalid=\"skip\" to handle any remaining nulls\n", "assembler = VectorAssembler(inputCols=feature_cols, outputCol=\"features\", handleInvalid=\"skip\")\n", "rf = RandomForestRegressor(featuresCol=\"features\", labelCol=\"consumption\", numTrees=50, seed=42)\n", "\n", "pipeline = Pipeline(stages=[energy_type_indexer, location_indexer, category_indexer, assembler, rf])\n", "\n", "# Train model\n", "print(\"\\nTraining Random Forest model...\")\n", "model = pipeline.fit(train_data)\n", "print(\"Model training completed!\")\n", "\n", "# Make predictions\n", "predictions = model.transform(test_data)\n", "print(f\"\\nGenerated predictions for {predictions.count()} test records\")\n", "\n", "# Evaluate model\n", "evaluator_rmse = RegressionEvaluator(labelCol=\"consumption\", predictionCol=\"prediction\", metricName=\"rmse\")\n", "evaluator_r2 = RegressionEvaluator(labelCol=\"consumption\", predictionCol=\"prediction\", metricName=\"r2\")\n", "\n", "rmse = evaluator_rmse.evaluate(predictions)\n", "r2 = evaluator_r2.evaluate(predictions)\n", "\n", "print(f\"\\nModel Performance:\")\n", "print(f\"RMSE: {rmse:.2f}\")\n", "print(f\"R² Score: {r2:.4f}\")\n", "\n", "# Show sample predictions\n", "print(\"\\nSample Predictions:\")\n", "predictions.select(\"meter_id\", \"reading_date\", \"consumption\", \"prediction\", \"energy_type\").show(10)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Key Takeaways: Medallion Architecture with Liquid Clustering\n", "\n", "### Architecture Benefits Demonstrated\n", "\n", "1. **Bronze Layer**: Raw data ingestion with data quality issues preserved\n", "2. **Silver Layer**: Data cleaning, validation, and business rule application\n", "3. 
**Gold Layer**: Business analytics and ML-ready feature engineering\n", "\n", "### Liquid Clustering Advantages\n", "\n", "- **Automatic optimization**: No manual partitioning or Z-Ordering required\n", "- **Query performance**: Fast aggregations across all medallion layers\n", "- **Storage efficiency**: Optimized data layout for each layer's access patterns\n", "- **Scalability**: Handles large-scale energy datasets efficiently\n", "\n", "### Business Value Delivered\n", "\n", "- **Data governance**: Clear data lineage from raw ingestion to business insights\n", "- **Analytics acceleration**: Fast queries for real-time dashboards and reporting\n", "- **ML readiness**: Feature engineering optimized for predictive modeling\n", "- **Operational efficiency**: Automated data quality and transformation pipelines\n", "\n", "### AIDP Platform Advantages\n", "\n", "- **Unified analytics**: Seamless data engineering and ML workflows\n", "- **Performance optimization**: Delta tables with liquid clustering\n", "- **Enterprise governance**: Multi-layer data organization\n", "- **Scalable processing**: Distributed computing for large energy datasets\n", "\n", "This notebook demonstrates how Oracle AI Data Platform combines medallion architecture principles with advanced Delta Lake features to deliver a complete data analytics solution for energy and utilities use cases." ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.5" } }, "nbformat": 4, "nbformat_minor": 4 }