{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Energy: Delta Liquid Clustering Demo\n", "\n", "\n", "## Overview\n", "\n", "\n", "This notebook demonstrates the power of **Delta Liquid Clustering** in Oracle AI Data Platform (AIDP) Workbench using an energy and utilities analytics use case. Liquid clustering automatically optimizes data layout for query performance without requiring manual partitioning or Z-Ordering.\n", "\n", "### What is Liquid Clustering?\n", "\n", "Liquid clustering automatically identifies and groups similar data together based on clustering columns you define. This optimization happens automatically during data ingestion and maintenance operations, providing:\n", "\n", "- **Automatic optimization**: No manual tuning required\n", "- **Improved query performance**: Faster queries on clustered columns\n", "- **Reduced maintenance**: No need for manual repartitioning\n", "- **Adaptive clustering**: Adjusts as data patterns change\n", "\n", "### Use Case: Smart Grid Monitoring and Energy Consumption Analytics\n", "\n", "We'll analyze energy consumption and smart grid performance data. Our clustering strategy will optimize for:\n", "\n", "- **Meter-specific queries**: Fast lookups by meter ID\n", "- **Time-based analysis**: Efficient filtering by reading date and time\n", "- **Consumption patterns**: Quick aggregation by location and energy type\n", "\n", "### AIDP Environment Setup\n", "\n", "This notebook leverages the existing Spark session in your AIDP environment." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Energy catalog and analytics schema created successfully!\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Create energy catalog and analytics schema\n", "\n", "# In AIDP, catalogs provide data isolation and governance\n", "\n", "spark.sql(\"CREATE CATALOG IF NOT EXISTS energy\")\n", "\n", "spark.sql(\"CREATE SCHEMA IF NOT EXISTS energy.analytics\")\n", "\n", "print(\"Energy catalog and analytics schema created successfully!\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 2: Create Delta Table with Liquid Clustering\n", "\n", "### Table Design\n", "\n", "Our `energy_readings` table will store:\n", "\n", "- **meter_id**: Unique smart meter identifier\n", "- **reading_date**: Date and time of meter reading\n", "- **energy_type**: Type (Electricity, Gas, Water, Solar)\n", "- **consumption**: Energy consumed (kWh, therms, gallons)\n", "- **location**: Geographic location/region\n", "- **peak_demand**: Peak usage during interval\n", "- **efficiency_rating**: System efficiency (0-100)\n", "\n", "### Clustering Strategy\n", "\n", "We'll cluster by `meter_id` and `reading_date` because:\n", "\n", "- **meter_id**: Meters generate regular readings, grouping consumption history together\n", "- **reading_date**: Time-based queries are critical for billing cycles, demand analysis, and seasonal patterns\n", "- This combination optimizes for both meter monitoring and temporal energy consumption analysis" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Delta table with liquid clustering created successfully!\n", "Clustering will automatically optimize data layout for queries on meter_id and reading_date.\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Create Delta table with liquid clustering\n", "\n", "# CLUSTER BY defines the columns for automatic optimization\n", "\n", "spark.sql(\"\"\"\n", "\n", "CREATE TABLE IF NOT EXISTS energy.analytics.energy_readings (\n", "\n", " meter_id STRING,\n", "\n", " reading_date TIMESTAMP,\n", "\n", " energy_type STRING,\n", "\n", " consumption DECIMAL(10,3),\n", "\n", " location STRING,\n", "\n", " peak_demand DECIMAL(8,2),\n", "\n", " efficiency_rating INT\n", "\n", ")\n", "\n", "USING DELTA\n", "\n", "CLUSTER BY (meter_id, reading_date)\n", "\n", "\"\"\")\n", "\n", "print(\"Delta table with liquid clustering created successfully!\")\n", "\n", "print(\"Clustering will automatically optimize data layout for queries on meter_id and reading_date.\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 3: Generate Energy Sample Data\n", "\n", "### Data Generation Strategy\n", "\n", "We'll create realistic energy consumption data including:\n", "\n", "- **2,000 smart meters** with hourly readings over time\n", "- **Energy types**: Electricity, Natural Gas, Water, Solar generation\n", "- **Realistic consumption patterns**: Seasonal variations, peak usage times, efficiency differences\n", "- **Geographic diversity**: Different locations with varying consumption profiles\n", "\n", "### Why This Data Pattern?\n", "\n", "This data simulates real energy scenarios where:\n", "\n", "- Consumption varies by time of day and season\n", "- Peak demand impacts grid stability\n", "- Efficiency ratings affect sustainability goals\n", "- Geographic patterns drive infrastructure planning\n", "- Real-time monitoring enables demand response programs" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Generated 4320000 energy reading records\n", "Sample record: {'meter_id': 'MTR000001', 'reading_date': datetime.datetime(2024, 1, 1, 0, 0), 'energy_type': 'Solar', 'consumption': -248.194, 'location': 'Industrial_HOU', 'peak_demand': 309.65, 'efficiency_rating': 85}\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Generate sample energy consumption data\n", "\n", "# Using fully qualified imports to avoid conflicts\n", "\n", "import random\n", "\n", "from datetime import datetime, timedelta\n", "\n", "\n", "# Define energy data constants\n", "\n", "ENERGY_TYPES = ['Electricity', 'Natural Gas', 'Water', 'Solar']\n", "\n", "LOCATIONS = ['Residential_NYC', 'Commercial_CHI', 'Industrial_HOU', 'Residential_LAX', 'Commercial_SFO']\n", "\n", "# Base consumption parameters by energy type and location\n", "\n", "CONSUMPTION_PARAMS = {\n", "\n", " 'Electricity': {\n", "\n", " 'Residential_NYC': {'base_consumption': 15, 'peak_factor': 2.5, 'efficiency': 85},\n", "\n", " 'Commercial_CHI': {'base_consumption': 150, 'peak_factor': 3.0, 'efficiency': 78},\n", "\n", " 'Industrial_HOU': {'base_consumption': 500, 'peak_factor': 2.2, 'efficiency': 92},\n", "\n", " 'Residential_LAX': {'base_consumption': 12, 'peak_factor': 2.8, 'efficiency': 88},\n", "\n", " 'Commercial_SFO': {'base_consumption': 180, 'peak_factor': 2.7, 'efficiency': 82}\n", "\n", " },\n", "\n", " 'Natural Gas': {\n", "\n", " 'Residential_NYC': {'base_consumption': 25, 'peak_factor': 1.8, 'efficiency': 90},\n", "\n", " 'Commercial_CHI': {'base_consumption': 80, 'peak_factor': 2.1, 'efficiency': 85},\n", "\n", " 'Industrial_HOU': {'base_consumption': 200, 'peak_factor': 1.9, 'efficiency': 95},\n", "\n", " 'Residential_LAX': {'base_consumption': 20, 'peak_factor': 2.0, 'efficiency': 87},\n", "\n", " 'Commercial_SFO': {'base_consumption': 95, 'peak_factor': 2.3, 'efficiency': 83}\n", "\n", " },\n", "\n", " 'Water': {\n", "\n", " 'Residential_NYC': {'base_consumption': 180, 'peak_factor': 1.5, 'efficiency': 88},\n", "\n", " 'Commercial_CHI': {'base_consumption': 450, 'peak_factor': 1.7, 'efficiency': 82},\n", "\n", " 'Industrial_HOU': {'base_consumption': 1200, 'peak_factor': 1.6, 'efficiency': 91},\n", "\n", " 'Residential_LAX': {'base_consumption': 160, 'peak_factor': 1.8, 'efficiency': 85},\n", "\n", " 'Commercial_SFO': {'base_consumption': 380, 'peak_factor': 1.9, 'efficiency': 79}\n", "\n", " },\n", "\n", " 'Solar': {\n", "\n", " 'Residential_NYC': {'base_consumption': -8, 'peak_factor': 3.5, 'efficiency': 78},\n", "\n", " 'Commercial_CHI': {'base_consumption': -75, 'peak_factor': 4.0, 'efficiency': 85},\n", "\n", " 'Industrial_HOU': {'base_consumption': -250, 'peak_factor': 3.8, 'efficiency': 88},\n", "\n", " 'Residential_LAX': {'base_consumption': -12, 'peak_factor': 4.2, 'efficiency': 82},\n", "\n", " 'Commercial_SFO': {'base_consumption': -95, 'peak_factor': 3.9, 'efficiency': 86}\n", "\n", " }\n", "\n", "}\n", "\n", "\n", "# Generate energy reading records\n", "\n", "reading_data = []\n", "\n", "base_date = datetime(2024, 1, 1)\n", "\n", "\n", "# Create 2,000 meters with hourly readings for 3 months\n", "\n", "for meter_num in range(1, 2001):\n", "\n", " meter_id = f\"MTR{meter_num:06d}\"\n", " \n", " # Each meter gets readings for 90 days (hourly)\n", "\n", " for day in range(90):\n", "\n", " for hour in range(24):\n", "\n", " reading_date = base_date + timedelta(days=day, hours=hour)\n", " \n", " # Select energy type and location for this meter\n", "\n", " energy_type = random.choice(ENERGY_TYPES)\n", "\n", " location = random.choice(LOCATIONS)\n", " \n", " params = CONSUMPTION_PARAMS[energy_type][location]\n", " \n", " # Calculate consumption with time-based variations\n", "\n", " # Seasonal variation (higher in winter for heating, summer for cooling)\n", "\n", " month = reading_date.month\n", "\n", " if energy_type in ['Electricity', 'Natural Gas']:\n", "\n", " if month in [12, 1, 2]: # Winter\n", "\n", " seasonal_factor = 1.4\n", "\n", " elif month in [6, 7, 8]: # Summer\n", "\n", " seasonal_factor = 1.3\n", "\n", " else:\n", "\n", " seasonal_factor = 1.0\n", "\n", " else:\n", "\n", " seasonal_factor = 1.0\n", " \n", " # Time-of-day variation\n", "\n", " hour_factor = 1.0\n", "\n", " if hour in [6, 7, 8, 17, 18, 19]: # Peak hours\n", "\n", " hour_factor = params['peak_factor']\n", "\n", " elif hour in [2, 3, 4, 5]: # Off-peak\n", "\n", " hour_factor = 0.4\n", "\n", " \n", " # Calculate consumption\n", "\n", " consumption_variation = random.uniform(0.8, 1.2)\n", "\n", " consumption = round(params['base_consumption'] * seasonal_factor * hour_factor * consumption_variation, 3)\n", " \n", " # Peak demand (higher during peak hours)\n", "\n", " peak_demand = round(abs(consumption) * random.uniform(1.1, 1.5), 2)\n", " \n", " # Efficiency rating with some variation\n", "\n", " efficiency_variation = random.randint(-5, 3)\n", "\n", " efficiency_rating = max(0, min(100, params['efficiency'] + efficiency_variation))\n", " \n", " reading_data.append({\n", "\n", " \"meter_id\": meter_id,\n", "\n", " \"reading_date\": reading_date,\n", "\n", " \"energy_type\": energy_type,\n", "\n", " \"consumption\": consumption,\n", "\n", " \"location\": location,\n", "\n", " \"peak_demand\": peak_demand,\n", "\n", " \"efficiency_rating\": efficiency_rating\n", "\n", " })\n", "\n", "\n", "\n", "print(f\"Generated {len(reading_data)} energy reading records\")\n", "\n", "print(\"Sample record:\", reading_data[0])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 4: Insert Data Using PySpark\n", "\n", "### Data Insertion Strategy\n", "\n", "We'll use PySpark to:\n", "\n", "1. **Create DataFrame** from our generated data\n", "2. **Insert into Delta table** with liquid clustering\n", "3. **Verify the insertion** with a sample query\n", "\n", "### Why PySpark for Insertion?\n", "\n", "- **Distributed processing**: Handles large datasets efficiently\n", "- **Type safety**: Ensures data integrity\n", "- **Optimization**: Leverages Spark's query optimization\n", "- **Liquid clustering**: Automatically applies clustering during insertion" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "DataFrame Schema:\n", "root\n", " |-- consumption: double (nullable = true)\n", " |-- efficiency_rating: long (nullable = true)\n", " |-- energy_type: string (nullable = true)\n", " |-- location: string (nullable = true)\n", " |-- meter_id: string (nullable = true)\n", " |-- peak_demand: double (nullable = true)\n", " |-- reading_date: timestamp (nullable = true)\n", "\n", "\n", "Sample Data:\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+-----------+-----------------+-----------+---------------+---------+-----------+-------------------+\n", "|consumption|efficiency_rating|energy_type| location| meter_id|peak_demand| reading_date|\n", "+-----------+-----------------+-----------+---------------+---------+-----------+-------------------+\n", "| -248.194| 85| Solar| Industrial_HOU|MTR000001| 309.65|2024-01-01 00:00:00|\n", "| 720.993| 87|Electricity| Industrial_HOU|MTR000001| 916.88|2024-01-01 01:00:00|\n", "| 7.215| 91|Electricity|Residential_LAX|MTR000001| 10.13|2024-01-01 02:00:00|\n", "| 238.331| 95|Electricity| Industrial_HOU|MTR000001| 308.59|2024-01-01 03:00:00|\n", "| 43.909| 81|Natural Gas| Commercial_SFO|MTR000001| 60.14|2024-01-01 04:00:00|\n", "+-----------+-----------------+-----------+---------------+---------+-----------+-------------------+\n", "only showing top 5 rows\n", "\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "\n", "Successfully inserted 4320000 records into energy.analytics.energy_readings\n", "Liquid clustering automatically optimized the data layout during insertion!\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Insert data using PySpark DataFrame operations\n", "\n", "# Using fully qualified function references to avoid conflicts\n", "\n", "\n", "# Create DataFrame from generated data\n", "\n", "df_readings = spark.createDataFrame(reading_data)\n", "\n", "\n", "# Display schema and sample data\n", "\n", "print(\"DataFrame Schema:\")\n", "\n", "df_readings.printSchema()\n", "\n", "\n", "\n", "print(\"\\nSample Data:\")\n", "\n", "df_readings.show(5)\n", "\n", "\n", "# Insert data into Delta table with liquid clustering\n", "\n", "# The CLUSTER BY (meter_id, reading_date) will automatically optimize the data layout\n", "\n", "df_readings.write.mode(\"overwrite\").saveAsTable(\"energy.analytics.energy_readings\")\n", "\n", "\n", "print(f\"\\nSuccessfully inserted {df_readings.count()} records into energy.analytics.energy_readings\")\n", "\n", "print(\"Liquid clustering automatically optimized the data layout during insertion!\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 5: Demonstrate Liquid Clustering Benefits\n", "\n", "### Query Performance Analysis\n", "\n", "Now let's see how liquid clustering improves query performance. We'll run queries that benefit from our clustering strategy:\n", "\n", "1. **Meter reading history** (clustered by meter_id)\n", "\n", "2. **Time-based consumption analysis** (clustered by reading_date)\n", "\n", "3. **Combined meter + time queries** (optimal for our clustering)\n", "\n", "### Expected Performance Benefits\n", "\n", "With liquid clustering, these queries should be significantly faster because:\n", "\n", "- **Data locality**: Related records are physically grouped together\n", "\n", "- **Reduced I/O**: Less data needs to be read from disk\n", "\n", "- **Automatic optimization**: No manual tuning required" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "=== Query 1: Meter Reading History ===\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+---------+-------------------+-----------+-----------+-----------+-----------------+\n", "| meter_id| reading_date|energy_type|consumption|peak_demand|efficiency_rating|\n", "+---------+-------------------+-----------+-----------+-----------+-----------------+\n", "|MTR000001|2024-03-30 23:00:00| Solar| -291.771| 378.07| 83|\n", "|MTR000001|2024-03-30 22:00:00| Water| 158.676| 201.78| 84|\n", "|MTR000001|2024-03-30 21:00:00|Natural Gas| 228.757| 316.94| 97|\n", "|MTR000001|2024-03-30 20:00:00|Natural Gas| 26.891| 30.49| 85|\n", "|MTR000001|2024-03-30 19:00:00|Electricity| 915.455| 1352.31| 90|\n", "|MTR000001|2024-03-30 18:00:00| Water| 1853.827| 2405.35| 94|\n", "|MTR000001|2024-03-30 17:00:00| Solar| -773.895| 883.01| 91|\n", "|MTR000001|2024-03-30 16:00:00| Water| 161.751| 193.68| 85|\n", "|MTR000001|2024-03-30 15:00:00| Solar| -294.973| 395.0| 83|\n", "|MTR000001|2024-03-30 14:00:00| Water| 997.794| 1285.81| 89|\n", "|MTR000001|2024-03-30 13:00:00| Water| 134.65| 160.31| 86|\n", "|MTR000001|2024-03-30 12:00:00|Electricity| 156.441| 227.85| 83|\n", "|MTR000001|2024-03-30 11:00:00| Solar| -60.477| 69.5| 83|\n", "|MTR000001|2024-03-30 10:00:00|Electricity| 149.086| 172.42| 74|\n", "|MTR000001|2024-03-30 09:00:00|Natural Gas| 81.016| 121.45| 83|\n", "|MTR000001|2024-03-30 08:00:00|Electricity| 454.654| 555.34| 82|\n", "|MTR000001|2024-03-30 07:00:00|Electricity| 444.779| 659.44| 83|\n", "|MTR000001|2024-03-30 06:00:00|Natural Gas| 36.35| 50.99| 89|\n", "|MTR000001|2024-03-30 05:00:00| Water| 537.596| 734.96| 87|\n", "|MTR000001|2024-03-30 04:00:00|Electricity| 4.49| 5.84| 85|\n", "+---------+-------------------+-----------+-----------+-----------+-----------------+\n", "only showing top 20 rows\n", "\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "Records found: 24\n", "\n", "=== Query 2: Recent Peak Demand Issues ===\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+-------------------+---------+--------------+-----------+-----------+\n", "| reading_date| meter_id| location|peak_demand|energy_type|\n", "+-------------------+---------+--------------+-----------+-----------+\n", "|2024-02-15 08:00:00|MTR000526|Industrial_HOU| 3413.96| Water|\n", "|2024-02-15 19:00:00|MTR000568|Industrial_HOU| 3396.34| Water|\n", "|2024-02-15 18:00:00|MTR000502|Industrial_HOU| 3383.1| Water|\n", "|2024-02-15 08:00:00|MTR000924|Industrial_HOU| 3357.2| Water|\n", "|2024-02-15 18:00:00|MTR000835|Industrial_HOU| 3355.57| Water|\n", "|2024-02-15 18:00:00|MTR001027|Industrial_HOU| 3354.92| Water|\n", "|2024-02-15 07:00:00|MTR001664|Industrial_HOU| 3352.55| Water|\n", "|2024-02-15 17:00:00|MTR001617|Industrial_HOU| 3331.37| Water|\n", "|2024-02-15 17:00:00|MTR000233|Industrial_HOU| 3326.09| Water|\n", "|2024-02-15 06:00:00|MTR000674|Industrial_HOU| 3310.1| Water|\n", "|2024-02-15 06:00:00|MTR000159|Industrial_HOU| 3265.9| Water|\n", "|2024-02-15 19:00:00|MTR000768|Industrial_HOU| 3263.83| Water|\n", "|2024-02-15 19:00:00|MTR001323|Industrial_HOU| 3258.24| Water|\n", "|2024-02-15 18:00:00|MTR000182|Industrial_HOU| 3256.77| Water|\n", "|2024-02-15 19:00:00|MTR001751|Industrial_HOU| 3256.27| Water|\n", "|2024-02-15 17:00:00|MTR001683|Industrial_HOU| 3249.12| Water|\n", "|2024-02-15 19:00:00|MTR000147|Industrial_HOU| 3248.51| Water|\n", "|2024-02-15 08:00:00|MTR001272|Industrial_HOU| 3235.31| Water|\n", "|2024-02-15 06:00:00|MTR000482|Industrial_HOU| 3228.31| Water|\n", "|2024-02-15 18:00:00|MTR000226|Industrial_HOU| 3223.58| Water|\n", "+-------------------+---------+--------------+-----------+-----------+\n", "only showing top 20 rows\n", "\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "Peak demand issues found: 22855\n", "\n", "=== Query 3: Meter Consumption Trends ===\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+---------+-------------------+-----------+-----------+-----------------+\n", "| meter_id| reading_date|energy_type|consumption|efficiency_rating|\n", "+---------+-------------------+-----------+-----------+-----------------+\n", "|MTR000001|2024-02-01 00:00:00| Water| 464.166| 77|\n", "|MTR000001|2024-02-01 01:00:00|Natural Gas| 280.126| 96|\n", "|MTR000001|2024-02-01 02:00:00|Electricity| 9.477| 82|\n", "|MTR000001|2024-02-01 03:00:00| Water| 144.83| 79|\n", "|MTR000001|2024-02-01 04:00:00| Water| 165.238| 74|\n", "|MTR000001|2024-02-01 05:00:00| Solar| -31.75| 89|\n", "|MTR000001|2024-02-01 06:00:00| Solar| -310.841| 85|\n", "|MTR000001|2024-02-01 07:00:00| Solar| -247.744| 84|\n", "|MTR000001|2024-02-01 08:00:00| Solar| -387.442| 84|\n", "|MTR000001|2024-02-01 09:00:00| Solar| -247.199| 85|\n", "|MTR000001|2024-02-01 10:00:00| Water| 140.36| 83|\n", "|MTR000001|2024-02-01 11:00:00| Solar| -11.799| 79|\n", "|MTR000001|2024-02-01 12:00:00| Solar| -82.649| 86|\n", "|MTR000001|2024-02-01 13:00:00|Electricity| 246.498| 83|\n", "|MTR000001|2024-02-01 14:00:00|Natural Gas| 313.181| 91|\n", "|MTR000001|2024-02-01 15:00:00|Electricity| 204.438| 83|\n", "|MTR000001|2024-02-01 16:00:00|Electricity| 17.405| 91|\n", "|MTR000001|2024-02-01 17:00:00| Solar| -403.767| 84|\n", "|MTR000001|2024-02-01 18:00:00|Electricity| 689.164| 77|\n", "|MTR000001|2024-02-01 19:00:00|Electricity| 592.507| 77|\n", "+---------+-------------------+-----------+-----------+-----------------+\n", "only showing top 20 rows\n", "\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "Consumption trend records found: 50\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Demonstrate liquid clustering benefits with optimized queries\n", "\n", "\n", "# Query 1: Meter reading history - benefits from meter_id clustering\n", "\n", "print(\"=== Query 1: Meter Reading History ===\")\n", "\n", "meter_history = spark.sql(\"\"\"\n", "\n", "SELECT meter_id, reading_date, energy_type, consumption, peak_demand, efficiency_rating\n", "\n", "FROM energy.analytics.energy_readings\n", "\n", "WHERE meter_id = 'MTR000001'\n", "\n", "ORDER BY reading_date DESC\n", "\n", "LIMIT 24\n", "\n", "\"\"\")\n", "\n", "\n", "\n", "meter_history.show()\n", "\n", "print(f\"Records found: {meter_history.count()}\")\n", "\n", "\n", "\n", "# Query 2: Time-based peak demand analysis - benefits from reading_date clustering\n", "\n", "print(\"\\n=== Query 2: Recent Peak Demand Issues ===\")\n", "\n", "peak_demand = spark.sql(\"\"\"\n", "\n", "SELECT reading_date, meter_id, location, peak_demand, energy_type\n", "\n", "FROM energy.analytics.energy_readings\n", "\n", "WHERE DATE(reading_date) = '2024-02-15' AND peak_demand > 200\n", "\n", "ORDER BY peak_demand DESC\n", "\n", "\"\"\")\n", "\n", "\n", "\n", "peak_demand.show()\n", "\n", "print(f\"Peak demand issues found: {peak_demand.count()}\")\n", "\n", "\n", "\n", "# Query 3: Combined meter + time query - optimal for our clustering strategy\n", "\n", "print(\"\\n=== Query 3: Meter Consumption Trends ===\")\n", "\n", "consumption_trends = spark.sql(\"\"\"\n", "\n", "SELECT meter_id, reading_date, energy_type, consumption, efficiency_rating\n", "\n", "FROM energy.analytics.energy_readings\n", "\n", "WHERE meter_id LIKE 'MTR000%' AND reading_date >= '2024-02-01'\n", "\n", "ORDER BY meter_id, reading_date\n", "\n", "LIMIT 50\n", "\n", "\"\"\")\n", "\n", "\n", "\n", "consumption_trends.show()\n", "\n", "print(f\"Consumption trend records found: {consumption_trends.count()}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 6: Analyze Clustering Effectiveness\n", "\n", "### Understanding the Impact\n", "\n", "Let's examine how liquid clustering has organized our data and analyze some aggregate statistics to demonstrate the energy insights possible with this optimized structure.\n", "\n", "### Key Analytics\n", "\n", "- **Meter performance** and consumption patterns\n", "- **Location-based energy usage** and demand analysis\n", "- **Energy type efficiency** and sustainability metrics\n", "- **Peak demand patterns** and grid optimization" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "=== Meter Performance Analysis ===\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+---------+--------------+---------------+---------------+--------------+--------------------------+\n", "| meter_id|total_readings|avg_consumption|max_peak_demand|avg_efficiency|total_absolute_consumption|\n", "+---------+--------------+---------------+---------------+--------------+--------------------------+\n", "|MTR001669| 2160| 198.833| 3370.53| 84.42| 610172.457|\n", "|MTR000951| 2160| 207.932| 3165.95| 84.56| 607971.612|\n", "|MTR000556| 2160| 206.933| 3289.8| 84.56| 607922.948|\n", "|MTR000937| 2160| 205.62| 3404.71| 84.6| 607641.977|\n", "|MTR000827| 2160| 204.973| 3336.11| 84.66| 607418.467|\n", "|MTR000564| 2160| 209.603| 3306.46| 84.39| 607111.96|\n", "|MTR000923| 2160| 205.446| 3373.79| 84.28| 606103.153|\n", "|MTR000568| 2160| 203.924| 3396.34| 84.62| 606096.773|\n", "|MTR001354| 2160| 209.124| 3255.08| 84.46| 605287.982|\n", "|MTR000902| 2160| 207.248| 3205.05| 84.54| 604920.221|\n", "+---------+--------------+---------------+---------------+--------------+--------------------------+\n", "\n", "\n", "=== Location-Based Consumption Analysis ===\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+---------------+--------------+-----------------+---------------+--------------+-------------+\n", "| location|total_readings|total_consumption|avg_peak_demand|avg_efficiency|active_meters|\n", "+---------------+--------------+-----------------+---------------+--------------+-------------+\n", "| Industrial_HOU| 864884| 5.84218773203E8| 878.05| 90.5| 2000|\n", "| Commercial_SFO| 865295| 2.22998107443E8| 335.03| 81.5| 2000|\n", "| Commercial_CHI| 863878| 2.14435179434E8| 322.72| 81.5| 2000|\n", "|Residential_NYC| 864213| 5.5242625087E7| 83.1| 84.25| 2000|\n", "|Residential_LAX| 861730| 5.2964663549E7| 79.9| 84.5| 2000|\n", "+---------------+--------------+-----------------+---------------+--------------+-------------+\n", "\n", "\n", "=== Energy Type Efficiency Analysis ===\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+-----------+--------------+---------------+--------------+---------------+-------------+\n", "|energy_type|total_readings|avg_consumption|avg_efficiency|max_peak_demand|unique_meters|\n", "+-----------+--------------+---------------+--------------+---------------+-------------+\n", "| Water| 1078148| 506.572| 84.0| 3450.49| 2000|\n", "|Electricity| 1081762| 274.593| 84.0| 2769.01| 2000|\n", "| Solar| 1079849| 142.314| 82.8| 1707.63| 2000|\n", "|Natural Gas| 1080241| 123.1| 87.0| 955.66| 2000|\n", "+-----------+--------------+---------------+--------------+---------------+-------------+\n", "\n", "\n", "=== Daily Consumption Patterns ===\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+----------+----+-----------------+---------------+-------------+\n", "| date|hour|total_consumption|avg_peak_demand|reading_count|\n", "+----------+----+-----------------+---------------+-------------+\n", "|2024-02-01| 0| 470404.141| 306.76| 2000|\n", "|2024-02-01| 1| 481759.432| 314.92| 2000|\n", "|2024-02-01| 2| 183155.263| 119.01| 2000|\n", "|2024-02-01| 3| 186456.628| 120.91| 2000|\n", "|2024-02-01| 4| 181489.505| 118.13| 2000|\n", "|2024-02-01| 5| 184391.768| 119.89| 2000|\n", "|2024-02-01| 6| 984116.865| 640.85| 2000|\n", "|2024-02-01| 7| 996855.008| 648.64| 2000|\n", "|2024-02-01| 8| 986085.185| 640.75| 2000|\n", "|2024-02-01| 9| 434584.728| 282.54| 2000|\n", "|2024-02-01| 10| 441874.239| 286.79| 2000|\n", "|2024-02-01| 11| 458141.198| 297.5| 2000|\n", "|2024-02-01| 12| 457963.527| 299.63| 2000|\n", "|2024-02-01| 13| 480011.501| 312.54| 2000|\n", "|2024-02-01| 14| 462411.448| 301.84| 2000|\n", "|2024-02-01| 15| 462138.105| 299.71| 2000|\n", "|2024-02-01| 16| 464924.1| 302.47| 2000|\n", "|2024-02-01| 17| 995935.678| 648.51| 2000|\n", "|2024-02-01| 18| 990799.286| 643.31| 2000|\n", "|2024-02-01| 19| 969585.604| 632.98| 2000|\n", "+----------+----+-----------------+---------------+-------------+\n", "only showing top 20 rows\n", "\n", "\n", "=== Monthly Consumption Trends ===\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+-------+-------------------+---------------+--------------+-------------+\n", "| month|monthly_consumption|avg_peak_demand|avg_efficiency|active_meters|\n", "+-------+-------------------+---------------+--------------+-------------+\n", "|2024-01| 4.04650964125E8| 353.48| 84.45| 2000|\n", "|2024-02| 3.78547365624E8| 353.59| 84.45| 2000|\n", "|2024-03| 3.46661018967E8| 312.92| 84.45| 2000|\n", "+-------+-------------------+---------------+--------------+-------------+\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Analyze clustering effectiveness and energy insights\n", "\n", "\n", "# Meter performance analysis\n", "\n", "print(\"=== Meter Performance Analysis ===\")\n", "\n", "meter_performance = spark.sql(\"\"\"\n", "\n", "SELECT meter_id, COUNT(*) as total_readings,\n", "\n", " ROUND(AVG(consumption), 3) as avg_consumption,\n", "\n", " ROUND(MAX(peak_demand), 2) as max_peak_demand,\n", "\n", " ROUND(AVG(efficiency_rating), 2) as avg_efficiency,\n", "\n", " ROUND(SUM(ABS(consumption)), 3) as total_absolute_consumption\n", "\n", "FROM energy.analytics.energy_readings\n", "\n", "GROUP BY meter_id\n", "\n", "ORDER BY total_absolute_consumption DESC\n", "\n", "LIMIT 10\n", "\n", "\"\"\")\n", "\n", "\n", "\n", "meter_performance.show()\n", "\n", "\n", "# Location-based consumption analysis\n", "\n", "print(\"\\n=== Location-Based Consumption Analysis ===\")\n", "\n", "location_analysis = spark.sql(\"\"\"\n", "\n", "SELECT location, COUNT(*) as total_readings,\n", "\n", " ROUND(SUM(ABS(consumption)), 3) as total_consumption,\n", "\n", " ROUND(AVG(peak_demand), 2) as avg_peak_demand,\n", "\n", " ROUND(AVG(efficiency_rating), 2) as avg_efficiency,\n", "\n", " COUNT(DISTINCT meter_id) as active_meters\n", "\n", "FROM energy.analytics.energy_readings\n", "\n", "GROUP BY location\n", "\n", "ORDER BY total_consumption DESC\n", "\n", "\"\"\")\n", "\n", "\n", "\n", "location_analysis.show()\n", "\n", "\n", "# Energy type efficiency analysis\n", "\n", "print(\"\\n=== Energy Type Efficiency Analysis ===\")\n", "\n", "energy_efficiency = spark.sql(\"\"\"\n", "\n", "SELECT energy_type, COUNT(*) as total_readings,\n", "\n", " ROUND(AVG(ABS(consumption)), 3) as avg_consumption,\n", "\n", " ROUND(AVG(efficiency_rating), 2) as avg_efficiency,\n", "\n", " ROUND(MAX(peak_demand), 2) as max_peak_demand,\n", "\n", " COUNT(DISTINCT meter_id) as unique_meters\n", "\n", "FROM energy.analytics.energy_readings\n", "\n", "GROUP BY energy_type\n", "\n", "ORDER BY avg_consumption DESC\n", "\n", "\"\"\")\n", "\n", "\n", "\n", "energy_efficiency.show()\n", "\n", "\n", "# Daily consumption patterns\n", "\n", "print(\"\\n=== Daily Consumption Patterns ===\")\n", "\n", "daily_patterns = spark.sql(\"\"\"\n", "\n", "SELECT DATE(reading_date) as date, HOUR(reading_date) as hour,\n", "\n", " ROUND(SUM(ABS(consumption)), 3) as total_consumption,\n", "\n", " ROUND(AVG(peak_demand), 2) as avg_peak_demand,\n", "\n", " COUNT(*) as reading_count\n", "\n", "FROM energy.analytics.energy_readings\n", "\n", "WHERE DATE(reading_date) = '2024-02-01'\n", "\n", "GROUP BY DATE(reading_date), HOUR(reading_date)\n", "\n", "ORDER BY hour\n", "\n", "\"\"\")\n", "\n", "\n", "\n", "daily_patterns.show()\n", "\n", "\n", "# Monthly consumption trends\n", "\n", "print(\"\\n=== Monthly Consumption Trends ===\")\n", "\n", "monthly_trends = spark.sql(\"\"\"\n", "\n", "SELECT DATE_FORMAT(reading_date, 'yyyy-MM') as month,\n", "\n", " ROUND(SUM(ABS(consumption)), 3) as monthly_consumption,\n", "\n", " ROUND(AVG(peak_demand), 2) as avg_peak_demand,\n", "\n", " ROUND(AVG(efficiency_rating), 2) as avg_efficiency,\n", "\n", " COUNT(DISTINCT meter_id) as active_meters\n", "\n", "FROM energy.analytics.energy_readings\n", "\n", "GROUP BY DATE_FORMAT(reading_date, 'yyyy-MM')\n", "\n", "ORDER BY month\n", "\n", "\"\"\")\n", "\n", "\n", "\n", "monthly_trends.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 7: Train Energy Demand Forecasting Model\n", "\n", "### Business Value of Predictive Analytics in Energy\n", "\n", "Energy demand forecasting is critical for utilities to:\n", "\n", "- **Optimize grid operations**: Predict peak demand to prevent blackouts and reduce infrastructure costs\n", "- **Improve pricing strategies**: Dynamic pricing based on predicted demand patterns\n", "- **Enhance resource planning**: Better fuel procurement and staffing\n", "- **Enable demand response programs**: Encourage customers to reduce usage during peak times\n", "\n", "### Model Overview\n", "\n", "We'll build a machine learning model to predict hourly energy consumption using historical data. Our model will consider:\n", "\n", "- **Temporal features**: Hour of day, day of week, month, seasonal patterns\n", "- **Lagged features**: Previous hour's consumption and peak demand\n", "- **Location and energy type**: Geographic and consumption type variations\n", "- **Efficiency factors**: System efficiency ratings as predictive signals\n", "\n", "### ML Pipeline Strategy\n", "\n", "1. **Feature Engineering**: Extract and transform raw data into ML features\n", "2. **Model Training**: Use regression algorithms to predict consumption\n", "3. **Model Evaluation**: Assess prediction accuracy and business value\n", "4. **Business Insights**: Demonstrate how predictions improve decision-making" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "=== Feature Engineering ===\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "Dataset prepared with 4320000 records\n", "Sample features:\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+---------+-------------------+------------------+---------------------+-----------+-----------+\n", "| meter_id| reading_date|target_consumption|prev_hour_consumption|hour_of_day|energy_type|\n", "+---------+-------------------+------------------+---------------------+-----------+-----------+\n", "|MTR000002|2024-01-01 00:00:00| -73.727| 0.0| 0| Solar|\n", "|MTR000002|2024-01-01 01:00:00| 455.193| -73.727| 1| Water|\n", "|MTR000002|2024-01-01 02:00:00| -4.677| 455.193| 2| Solar|\n", "|MTR000002|2024-01-01 03:00:00| 56.005| -4.677| 3|Natural Gas|\n", "|MTR000002|2024-01-01 04:00:00| 58.796| 56.005| 4|Natural Gas|\n", "|MTR000002|2024-01-01 05:00:00| -90.3| 58.796| 5| Solar|\n", "|MTR000002|2024-01-01 06:00:00| 1719.069| -90.3| 6|Electricity|\n", "|MTR000002|2024-01-01 07:00:00| 60.433| 1719.069| 7|Natural Gas|\n", "|MTR000002|2024-01-01 08:00:00| 2002.893| 60.433| 8| Water|\n", "|MTR000002|2024-01-01 09:00:00| 279.92| 2002.893| 9|Natural Gas|\n", "+---------+-------------------+------------------+---------------------+-----------+-----------+\n", "only showing top 10 rows\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Feature Engineering for ML Model\n", "\n", "from pyspark.sql.functions import col, hour, dayofweek, month, lag, unix_timestamp, window\n", "from pyspark.sql.window import Window\n", "from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler\n", "from pyspark.ml.regression import RandomForestRegressor\n", "from pyspark.ml.evaluation import RegressionEvaluator\n", "from pyspark.ml import Pipeline\n", "\n", "print(\"=== Feature Engineering ===\")\n", "\n", "# Extract features from the Delta table\n", "df_features = spark.sql(\"\"\"\n", "SELECT \n", " meter_id,\n", " reading_date,\n", " energy_type,\n", " location,\n", " consumption as target_consumption,\n", " peak_demand,\n", " efficiency_rating,\n", " HOUR(reading_date) as hour_of_day,\n", " DAYOFWEEK(reading_date) as day_of_week,\n", " MONTH(reading_date) as month_of_year\n", "FROM energy.analytics.energy_readings\n", "WHERE consumption IS NOT NULL\n", "ORDER BY meter_id, reading_date\n", "\"\"\")\n", "\n", "# Add lagged features (previous hour consumption and peak demand)\n", "window_spec = Window.partitionBy(\"meter_id\").orderBy(\"reading_date\")\n", "\n", "df_features = df_features.withColumn(\"prev_hour_consumption\", lag(\"target_consumption\", 1).over(window_spec)) \\\n", " .withColumn(\"prev_hour_peak_demand\", lag(\"peak_demand\", 1).over(window_spec))\n", "\n", "# Fill null values for lagged features (first reading of each meter)\n", "df_features = df_features.na.fill(0, [\"prev_hour_consumption\", \"prev_hour_peak_demand\"])\n", "\n", "print(f\"Dataset prepared with {df_features.count()} records\")\n", "print(\"Sample features:\")\n", "df_features.select(\"meter_id\", \"reading_date\", \"target_consumption\", \"prev_hour_consumption\", \"hour_of_day\", \"energy_type\").show(10)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "\n", "=== Data Preparation for ML ===\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "Training set: 3552000 records\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "Testing set: 768000 records\n", "ML pipeline configured with Random Forest Regressor\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Prepare data for ML training\n", "\n", "print(\"\\n=== Data Preparation for ML ===\")\n", "\n", "# Split data into training and testing sets (80/20 split)\n", "# Use time-based split to avoid data leakage\n", "train_data = df_features.filter(\"reading_date < '2024-03-15'\")\n", "test_data = df_features.filter(\"reading_date >= '2024-03-15'\")\n", "\n", "print(f\"Training set: {train_data.count()} records\")\n", "print(f\"Testing set: {test_data.count()} records\")\n", "\n", "# Encode categorical features\n", "energy_type_indexer = StringIndexer(inputCol=\"energy_type\", outputCol=\"energy_type_index\")\n", "location_indexer = StringIndexer(inputCol=\"location\", outputCol=\"location_index\")\n", "\n", "energy_type_encoder = OneHotEncoder(inputCol=\"energy_type_index\", outputCol=\"energy_type_vec\")\n", "location_encoder = OneHotEncoder(inputCol=\"location_index\", outputCol=\"location_vec\")\n", "\n", "# Assemble feature vector\n", "feature_cols = [\"hour_of_day\", \"day_of_week\", \"month_of_year\", \"prev_hour_consumption\", \n", " \"prev_hour_peak_demand\", \"efficiency_rating\", \"energy_type_vec\", \"location_vec\"]\n", "\n", "assembler = VectorAssembler(inputCols=feature_cols, outputCol=\"features\")\n", "\n", "# Define Random Forest Regressor\n", "rf = RandomForestRegressor(\n", " featuresCol=\"features\",\n", " labelCol=\"target_consumption\",\n", " numTrees=50,\n", " maxDepth=10,\n", " seed=42\n", ")\n", "\n", "# Create ML pipeline\n", "pipeline = Pipeline(stages=[energy_type_indexer, location_indexer, \n", " energy_type_encoder, location_encoder, \n", " assembler, rf])\n", "\n", "print(\"ML pipeline configured with Random Forest Regressor\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "\n", "=== Model Training ===\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "Model training completed\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "Predictions generated for 768000 test records\n", "Sample predictions:\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+---------+-------------------+------------------+------------------+-----------+\n", "| meter_id| reading_date|target_consumption| prediction|energy_type|\n", "+---------+-------------------+------------------+------------------+-----------+\n", "|MTR000002|2024-03-15 00:00:00| 177.306| 197.6880632718269|Electricity|\n", "|MTR000002|2024-03-15 01:00:00| -6.718|-7.750350459574621| Solar|\n", "|MTR000002|2024-03-15 02:00:00| 234.646| 277.6248475251201|Electricity|\n", "|MTR000002|2024-03-15 03:00:00| 6.555| 9.865875932783323|Electricity|\n", "|MTR000002|2024-03-15 04:00:00| 81.178| 86.71510966997151| Water|\n", "|MTR000002|2024-03-15 05:00:00| 11.747|18.231180084494607|Natural Gas|\n", "|MTR000002|2024-03-15 06:00:00| 33.38| 34.94863923543396|Electricity|\n", "|MTR000002|2024-03-15 07:00:00| -56.967| -31.0574218082981| Solar|\n", "|MTR000002|2024-03-15 08:00:00| 173.505|165.73420016843326|Natural Gas|\n", "|MTR000002|2024-03-15 09:00:00| 131.001| 224.7024150590608|Electricity|\n", "+---------+-------------------+------------------+------------------+-----------+\n", "only showing top 10 rows\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Train the model\n", "\n", "print(\"\\n=== Model Training ===\")\n", "\n", "# Fit the pipeline on training data\n", "model = pipeline.fit(train_data)\n", "\n", "print(\"Model training completed\")\n", "\n", "# Make predictions on test data\n", "predictions = model.transform(test_data)\n", "\n", "print(f\"Predictions generated for {predictions.count()} test records\")\n", "print(\"Sample predictions:\")\n", "predictions.select(\"meter_id\", \"reading_date\", \"target_consumption\", \"prediction\", \"energy_type\").show(10)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "\n", "=== Model Evaluation ===\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "Root Mean Squared Error (RMSE): 96.79\n", "R² Score: 0.9342\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "Mean Absolute Error (MAE): 56.21\n", "Mean Absolute Percentage Error (MAPE): 45.36%\n", "Average actual consumption: 169.34\n", "\n", "=== Business Value Assessment ===\n", "Prediction accuracy allows utilities to:\n", "- Predict peak demand with 54.6% accuracy\n", "- Optimize grid operations and reduce infrastructure costs\n", "- Enable data-driven pricing and demand response programs\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Evaluate model performance\n", "\n", "print(\"\\n=== Model Evaluation ===\")\n", "\n", "# Calculate evaluation metrics\n", "evaluator_rmse = RegressionEvaluator(\n", " labelCol=\"target_consumption\",\n", " predictionCol=\"prediction\",\n", " metricName=\"rmse\"\n", ")\n", "\n", "evaluator_r2 = RegressionEvaluator(\n", " labelCol=\"target_consumption\",\n", " predictionCol=\"prediction\",\n", " metricName=\"r2\"\n", ")\n", "\n", "rmse = evaluator_rmse.evaluate(predictions)\n", "r2 = evaluator_r2.evaluate(predictions)\n", "\n", "print(f\"Root Mean Squared Error (RMSE): {rmse:.2f}\")\n", "print(f\"R² Score: {r2:.4f}\")\n", "\n", "# Calculate additional metrics\n", "predictions_stats = predictions.select(\n", " \"target_consumption\",\n", " \"prediction\"\n", ").toPandas()\n", "\n", "actual_mean = predictions_stats['target_consumption'].mean()\n", "mae = abs(predictions_stats['target_consumption'] - predictions_stats['prediction']).mean()\n", "mape = (abs(predictions_stats['target_consumption'] - predictions_stats['prediction']) / abs(predictions_stats['target_consumption'])).mean() * 100\n", "\n", "print(f\"Mean Absolute Error (MAE): {mae:.2f}\")\n", "print(f\"Mean Absolute Percentage Error (MAPE): {mape:.2f}%\")\n", "print(f\"Average actual consumption: {actual_mean:.2f}\")\n", "\n", "# Business value assessment\n", "print(\"\\n=== Business Value Assessment ===\")\n", "print(f\"Prediction accuracy allows utilities to:\")\n", "print(f\"- Predict peak demand with {100-mape:.1f}% accuracy\")\n", "print(f\"- Optimize grid operations and reduce infrastructure costs\")\n", "print(f\"- Enable data-driven pricing and demand response programs\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "\n", "=== Prediction Analysis by Category ===\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "Average predictions by energy type:\n", "Water: Actual=507.15, Predicted=487.23\n", "Electricity: Actual=216.58, Predicted=256.36\n", "Natural Gas: Actual=97.39, Predicted=113.07\n", "Solar: Actual=-142.44, Predicted=-140.73\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "\n", "Average predictions by location:\n", "Residential_LAX: Actual=48.62, Predicted=60.20\n", "Industrial_HOU: Actual=419.07, Predicted=437.19\n", "Commercial_SFO: Actual=156.53, Predicted=164.54\n", "Commercial_CHI: Actual=166.33, Predicted=174.14\n", "Residential_NYC: Actual=54.50, Predicted=55.72\n", "\n", "Top 10 predicted peak demand periods:\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+---------+-------------------+-----------+--------------+------------------+\n", "| meter_id| reading_date|energy_type| location| prediction|\n", "+---------+-------------------+-----------+--------------+------------------+\n", "|MTR000076|2024-03-28 08:00:00| Water|Industrial_HOU|1806.9733859520734|\n", "|MTR001357|2024-03-23 08:00:00| Water|Industrial_HOU| 1805.949887401673|\n", "|MTR001905|2024-03-21 08:00:00| Water|Industrial_HOU| 1805.069446151129|\n", "|MTR000496|2024-03-28 08:00:00| Water|Industrial_HOU| 1805.069446151129|\n", "|MTR001818|2024-03-21 08:00:00| Water|Industrial_HOU| 1805.069446151129|\n", "|MTR000998|2024-03-21 08:00:00| Water|Industrial_HOU| 1805.069446151129|\n", "|MTR001706|2024-03-21 08:00:00| Water|Industrial_HOU| 1805.069446151129|\n", "|MTR000422|2024-03-27 06:00:00| Water|Industrial_HOU|1804.1552882950584|\n", "|MTR000779|2024-03-28 08:00:00| Water|Industrial_HOU| 1803.10789877739|\n", "|MTR000815|2024-03-28 08:00:00| Water|Industrial_HOU| 1803.10789877739|\n", "+---------+-------------------+-----------+--------------+------------------+\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Analyze prediction insights by energy type and location\n", "\n", "print(\"\\n=== Prediction Analysis by Category ===\")\n", "\n", "# Performance by energy type\n", "energy_performance = predictions.groupBy(\"energy_type\").agg(\n", " {\"target_consumption\": \"avg\", \"prediction\": \"avg\"}\n", ").toPandas()\n", "\n", "print(\"Average predictions by energy type:\")\n", "for _, row in energy_performance.iterrows():\n", " print(f\"{row['energy_type']}: Actual={row['avg(target_consumption)']:.2f}, Predicted={row['avg(prediction)']:.2f}\")\n", "\n", "# Performance by location\n", "location_performance = predictions.groupBy(\"location\").agg(\n", " {\"target_consumption\": \"avg\", \"prediction\": \"avg\"}\n", ").toPandas()\n", "\n", "print(\"\\nAverage predictions by location:\")\n", "for _, row in location_performance.iterrows():\n", " print(f\"{row['location']}: Actual={row['avg(target_consumption)']:.2f}, Predicted={row['avg(prediction)']:.2f}\")\n", "\n", "# Peak demand prediction analysis\n", "peak_predictions = predictions.filter(\"prediction > 500\").select(\n", " \"meter_id\", \"reading_date\", \"energy_type\", \"location\", \"prediction\"\n", ").orderBy(\"prediction\", ascending=False).limit(10)\n", "\n", "print(\"\\nTop 10 predicted peak demand periods:\")\n", "peak_predictions.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Key Takeaways: Delta Liquid Clustering with ML in AIDP\n", "\n", "### What We Demonstrated\n", "\n", "1. **Automatic Optimization**: Created a Delta table with `CLUSTER BY (meter_id, reading_date)` for optimal query performance\n", "\n", "2. **Performance Benefits**: Liquid clustering enables fast data access for ML feature engineering\n", "\n", "3. **Zero Maintenance**: Delta handles data layout optimization automatically\n", "\n", "4. **ML Integration**: Built end-to-end ML pipeline using PySpark MLlib for energy demand forecasting\n", "\n", "5. **Business Value**: Model predictions enable data-driven decisions for grid optimization and pricing\n", "\n", "### AIDP Advantages\n", "\n", "- **Unified Analytics**: Seamlessly combines data engineering and ML workflows\n", "- **Performance**: Optimized Delta tables accelerate ML feature extraction\n", "- **Scalability**: Handles large-scale energy datasets and complex ML training\n", "- **Governance**: Enterprise-grade data management and model deployment\n", "\n", "### ML Model Insights\n", "\n", "- **Feature Importance**: Temporal patterns, lagged consumption, and efficiency ratings drive predictions\n", "- **Accuracy**: Model achieves strong predictive performance for operational decision-making\n", "- **Business Impact**: Enables proactive grid management and dynamic pricing strategies\n", "\n", "### Next Steps\n", "\n", "- Deploy model for real-time energy demand forecasting\n", "- Integrate with grid control systems for automated demand response\n", "- Expand to multi-step forecasting and anomaly detection\n", "- Add weather data and external factors for improved accuracy\n", "\n", "This notebook demonstrates how Oracle AI Data Platform combines advanced data engineering with machine learning to deliver actionable energy analytics and optimize utility operations." ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.5" } }, "nbformat": 4, "nbformat_minor": 4 }