{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Manufacturing: Medallion Architecture with Delta Liquid Clustering\n", "\n", "## Overview\n", "\n", "This notebook demonstrates the **Medallion Architecture** in Oracle AI Data Platform (AIDP) Workbench using Delta Liquid Clustering. The medallion architecture organizes data into three layers (Bronze, Silver, Gold) that progressively refine data quality and structure for different use cases.\n", "\n", "### What is the Medallion Architecture?\n", "\n", "- **Bronze Layer**: Raw, unprocessed data as ingested from source systems\n", "- **Silver Layer**: Cleaned, standardized, and enriched data\n", "- **Gold Layer**: Business-ready, aggregated data for analytics and reporting\n", "\n", "### What is Liquid Clustering?\n", "\n", "Liquid clustering automatically optimizes data layout for query performance without requiring manual partitioning or Z-Ordering.\n", "\n", "### Use Case: Manufacturing Analytics\n", "\n", "We'll process manufacturing production records through all three medallion layers, optimizing each with liquid clustering for equipment monitoring, quality control, and business intelligence.\n", "\n", "### AIDP Environment Setup\n", "\n", "This notebook leverages the existing Spark session in your AIDP environment." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Layer 1: Bronze - Raw Data Ingestion\n", "\n", "### Bronze Layer Purpose\n", "\n", "The bronze layer serves as the landing zone for raw data. 
Data is ingested in its original format with minimal processing:\n", "\n", "- **Raw format**: Data as received from source systems\n", "- **Minimal validation**: Basic schema enforcement\n", "- **Immutable**: Historical data preserved as-is\n", "- **Optimized for ingestion**: Fast write operations\n", "\n", "### Table Design\n", "\n", "Our bronze `production_records_bronze` table stores raw manufacturing data and is clustered by `production_date` and `machine_id`, so time-range and per-equipment queries can skip unrelated files without any manual partitioning." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Manufacturing catalog with Bronze, Silver, and Gold schemas created successfully!\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Create manufacturing catalog and schemas for medallion architecture\n", "\n", "# Bronze, Silver, and Gold schemas provide data isolation and governance\n", "\n", "spark.sql(\"CREATE CATALOG IF NOT EXISTS manufacturing\")\n", "\n", "spark.sql(\"CREATE SCHEMA IF NOT EXISTS manufacturing.bronze\")\n", "\n", "spark.sql(\"CREATE SCHEMA IF NOT EXISTS manufacturing.silver\")\n", "\n", "spark.sql(\"CREATE SCHEMA IF NOT EXISTS manufacturing.gold\")\n", "\n", "print(\"Manufacturing catalog with Bronze, Silver, and Gold schemas created successfully!\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Bronze layer Delta table with liquid clustering created successfully!\n", "Clustering optimizes for time-based queries and equipment monitoring.\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Create Bronze layer Delta table with liquid clustering\n", "\n", "# CLUSTER BY optimizes for time-based queries and equipment monitoring\n", "\n", "spark.sql(\"\"\"\n", "\n", "CREATE TABLE IF NOT EXISTS manufacturing.bronze.production_records_bronze (\n", "\n", " machine_id STRING,\n", "\n", " production_date TIMESTAMP,\n", "\n", " product_type 
STRING,\n", "\n", " units_produced INT,\n", "\n", " defect_count INT,\n", "\n", " production_line STRING,\n", "\n", " cycle_time DECIMAL(5,2),\n", "\n", " ingestion_timestamp TIMESTAMP,\n", "\n", " source_system STRING \n", "\n", ")\n", "\n", "USING DELTA\n", "\n", "CLUSTER BY (production_date, machine_id)\n", "\n", "\"\"\")\n", "\n", "print(\"Bronze layer Delta table with liquid clustering created successfully!\")\n", "print(\"Clustering optimizes for time-based queries and equipment monitoring.\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Generated 12137 raw production records for Bronze layer\n", "Data includes intentional quality issues for Silver layer processing demo\n", "Sample record: {'machine_id': 'MCH0001', 'production_date': datetime.datetime(2024, 8, 9, 7, 0), 'product_type': 'Automotive Parts', 'units_produced': 242, 'defect_count': 18, 'production_line': 'LINE_B', 'cycle_time': 10.78, 'source_system': 'manufacturing_scada'}\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Generate raw manufacturing production data for the Bronze layer (ingested in the next cell)\n", "\n", "# Standard-library imports only\n", "\n", "import random\n", "from datetime import datetime, timedelta\n", "\n", "# Define manufacturing data constants\n", "PRODUCT_TYPES = ['Electronics', 'Automotive Parts', 'Consumer Goods', 'Industrial Equipment']\n", "PRODUCTION_LINES = ['LINE_A', 'LINE_B', 'LINE_C', 'LINE_D', 'LINE_E']\n", "\n", "# Base production parameters by product type\n", "PRODUCTION_PARAMS = {\n", " 'Electronics': {'base_units': 500, 'defect_rate': 0.02, 'cycle_time': 2.5},\n", " 'Automotive Parts': {'base_units': 200, 'defect_rate': 0.05, 'cycle_time': 8.0},\n", " 'Consumer Goods': {'base_units': 800, 'defect_rate': 0.03, 'cycle_time': 1.8},\n", " 'Industrial Equipment': {'base_units': 50, 'defect_rate': 0.08, 'cycle_time': 25.0}\n", "}\n", "\n", "# Generate production records 
with some data quality issues (for Silver layer cleaning)\n", "production_data = []\n", "base_date = datetime(2024, 1, 1)\n", "\n", "# Create 200 machines with 30-90 production runs each\n", "for machine_num in range(1, 201):\n", " machine_id = f\"MCH{machine_num:04d}\"\n", " \n", " # Each machine gets 30-90 production runs over 12 months\n", " num_runs = random.randint(30, 90)\n", " \n", " for i in range(num_runs):\n", " # Spread production runs over 12 months (weekdays only, during shifts)\n", " days_offset = random.randint(0, 365)\n", " production_date = base_date + timedelta(days=days_offset)\n", " \n", " # Skip weekends\n", " while production_date.weekday() >= 5:\n", " production_date += timedelta(days=1)\n", " \n", " # Add shift timing (6 AM - 6 PM)\n", " hours_offset = random.randint(6, 18)\n", " production_date = production_date.replace(hour=hours_offset, minute=0, second=0, microsecond=0)\n", " \n", " # Select product type\n", " product_type = random.choice(PRODUCT_TYPES)\n", " params = PRODUCTION_PARAMS[product_type]\n", " \n", " # Calculate production with variability\n", " units_variation = random.uniform(0.7, 1.3)\n", " units_produced = int(params['base_units'] * units_variation)\n", " \n", " # Calculate defects\n", " defect_rate_variation = random.uniform(0.5, 2.0)\n", " actual_defect_rate = params['defect_rate'] * defect_rate_variation\n", " defect_count = int(units_produced * actual_defect_rate)\n", " \n", " # Calculate cycle time with variation\n", " cycle_time_variation = random.uniform(0.8, 1.4)\n", " cycle_time = round(params['cycle_time'] * cycle_time_variation, 2)\n", " \n", " # Select production line\n", " production_line = random.choice(PRODUCTION_LINES)\n", " \n", " # Add some data quality issues (nulls, invalid values) for Silver layer demo\n", " if random.random() < 0.05: # 5% chance of null values\n", " if random.choice([True, False]):\n", " units_produced = None\n", " else:\n", " cycle_time = None\n", " \n", " if random.random() < 0.02: 
# 2% chance of negative values\n", " defect_count = -abs(defect_count)\n", " \n", " production_data.append({\n", " \"machine_id\": machine_id,\n", " \"production_date\": production_date,\n", " \"product_type\": product_type,\n", " \"units_produced\": units_produced,\n", " \"defect_count\": defect_count,\n", " \"production_line\": production_line,\n", " \"cycle_time\": cycle_time,\n", " \"source_system\": \"manufacturing_scada\"\n", " })\n", "\n", "print(f\"Generated {len(production_data)} raw production records for Bronze layer\")\n", "print(\"Data includes intentional quality issues for Silver layer processing demo\")\n", "print(\"Sample record:\", production_data[0])" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Bronze Layer DataFrame Schema:\n", "root\n", " |-- cycle_time: double (nullable = true)\n", " |-- defect_count: long (nullable = true)\n", " |-- machine_id: string (nullable = true)\n", " |-- product_type: string (nullable = true)\n", " |-- production_date: timestamp (nullable = true)\n", " |-- production_line: string (nullable = true)\n", " |-- source_system: string (nullable = true)\n", " |-- units_produced: long (nullable = true)\n", "\n", "\n", "Sample Bronze Data:\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+----------+------------+----------+----------------+-------------------+---------------+-------------------+--------------+\n", "|cycle_time|defect_count|machine_id| product_type| production_date|production_line| source_system|units_produced|\n", "+----------+------------+----------+----------------+-------------------+---------------+-------------------+--------------+\n", "| 10.78| 18| MCH0001|Automotive Parts|2024-08-09 07:00:00| LINE_B|manufacturing_scada| 242|\n", "| NULL| 18| MCH0001| Consumer Goods|2024-10-01 06:00:00| LINE_A|manufacturing_scada| 830|\n", "| 1.57| 36| MCH0001| Consumer Goods|2024-07-23 11:00:00| 
LINE_B|manufacturing_scada| 961|\n", "| 2.44| 32| MCH0001| Consumer Goods|2024-06-20 13:00:00| LINE_A|manufacturing_scada| 763|\n", "| 9.89| 12| MCH0001|Automotive Parts|2024-05-07 16:00:00| LINE_B|manufacturing_scada| 217|\n", "+----------+------------+----------+----------------+-------------------+---------------+-------------------+--------------+\n", "only showing top 5 rows\n", "\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "\n", "Successfully ingested 12137 raw records into Bronze layer\n", "Liquid clustering automatically optimized the raw data layout during ingestion!\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Insert raw data into Bronze layer using PySpark\n", "\n", "# Create DataFrame from generated data\n", "df_bronze = spark.createDataFrame(production_data)\n", "\n", "# Display schema and sample data\n", "print(\"Bronze Layer DataFrame Schema:\")\n", "df_bronze.printSchema()\n", "\n", "print(\"\\nSample Bronze Data:\")\n", "df_bronze.show(5)\n", "\n", "# Insert data into Delta table with liquid clustering\n", "df_bronze.write.mode(\"overwrite\").saveAsTable(\"manufacturing.bronze.production_records_bronze\")\n", "\n", "print(f\"\\nSuccessfully ingested {df_bronze.count()} raw records into Bronze layer\")\n", "print(\"Liquid clustering automatically optimized the raw data layout during ingestion!\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Layer 2: Silver - Data Cleaning and Standardization\n", "\n", "### Silver Layer Purpose\n", "\n", "The silver layer transforms raw bronze data into clean, standardized, and enriched datasets:\n", "\n", "- **Data quality**: Remove duplicates, handle nulls, validate ranges\n", "- **Standardization**: Consistent formats, units, and naming\n", "- **Enrichment**: Add derived fields and business logic\n", "- **Optimization**: Clustering for analytical queries\n", "\n", "### Transformations Applied\n", "\n", "- Remove records with 
critical null values\n", "- Fix negative defect counts\n", "- Add calculated quality metrics\n", "- Standardize data types and formats\n", "- Add data quality flags" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Silver layer Delta table with liquid clustering created successfully!\n", "Clustering optimizes for machine-specific and time-based analytical queries.\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Create Silver layer Delta table with liquid clustering\n", "\n", "# Clustering optimized for analytical queries by machine and time\n", "\n", "spark.sql(\"\"\"\n", "\n", "CREATE TABLE IF NOT EXISTS manufacturing.silver.production_records_silver (\n", "\n", " machine_id STRING,\n", "\n", " production_date TIMESTAMP,\n", "\n", " product_type STRING,\n", "\n", " units_produced INT,\n", "\n", " defect_count INT,\n", "\n", " production_line STRING,\n", "\n", " cycle_time DECIMAL(5,2),\n", "\n", " defect_rate DECIMAL(5,2),\n", "\n", " hourly_production_rate DECIMAL(8,2),\n", "\n", " quality_grade STRING,\n", "\n", " processing_timestamp TIMESTAMP,\n", "\n", " data_quality_score INT\n", "\n", ")\n", "\n", "USING DELTA\n", "\n", "CLUSTER BY (machine_id, production_date)\n", "\n", "\"\"\")\n", "\n", "print(\"Silver layer Delta table with liquid clustering created successfully!\")\n", "print(\"Clustering optimizes for machine-specific and time-based analytical queries.\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Silver layer transformation completed: 12137 clean records\n", "\n", "Data quality improvements applied:\n", "- Removed records with null machine_id or production_date\n", "- Fixed negative defect counts\n", "- Added calculated quality metrics\n", "- Added data quality grading and scoring\n", "\n", "Silver Layer Sample Data:\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { 
"text/plain": [ "+----------+-------------------+----------------+--------------+------------+---------------+----------+-----------+----------------------+-------------+--------------------+------------------+\n", "|machine_id| production_date| product_type|units_produced|defect_count|production_line|cycle_time|defect_rate|hourly_production_rate|quality_grade|processing_timestamp|data_quality_score|\n", "+----------+-------------------+----------------+--------------+------------+---------------+----------+-----------+----------------------+-------------+--------------------+------------------+\n", "| MCH0001|2024-08-09 07:00:00|Automotive Parts| 242| 18| LINE_B| 10.78| 7.44| 1346.94| Fair|2025-12-19 23:26:...| 100|\n", "| MCH0001|2024-10-01 06:00:00| Consumer Goods| 830| 18| LINE_A| 1.0| 2.17| 49800.0| Good|2025-12-19 23:26:...| 100|\n", "| MCH0001|2024-07-23 11:00:00| Consumer Goods| 961| 36| LINE_B| 1.57| 3.75| 36726.11| Good|2025-12-19 23:26:...| 100|\n", "| MCH0001|2024-06-20 13:00:00| Consumer Goods| 763| 32| LINE_A| 2.44| 4.19| 18762.3| Good|2025-12-19 23:26:...| 100|\n", "| MCH0001|2024-05-07 16:00:00|Automotive Parts| 217| 12| LINE_B| 9.89| 5.53| 1316.48| Fair|2025-12-19 23:26:...| 100|\n", "+----------+-------------------+----------------+--------------+------------+---------------+----------+-----------+----------------------+-------------+--------------------+------------------+\n", "only showing top 5 rows\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Transform Bronze data to Silver layer with data quality improvements\n", "\n", "from pyspark.sql.functions import col, when, round, expr, current_timestamp\n", "\n", "# Read from Bronze layer\n", "df_silver = spark.table(\"manufacturing.bronze.production_records_bronze\")\n", "\n", "# Apply data quality transformations\n", "df_silver = df_silver \\\n", " .filter(col(\"machine_id\").isNotNull()) \\\n", " .filter(col(\"production_date\").isNotNull()) \\\n", " 
.withColumn(\"units_produced\", when(col(\"units_produced\").isNull(), 0).otherwise(col(\"units_produced\"))) \\\n", " .withColumn(\"defect_count\", when(col(\"defect_count\").isNull(), 0).otherwise(col(\"defect_count\"))) \\\n", " .withColumn(\"defect_count\", when(col(\"defect_count\") < 0, 0).otherwise(col(\"defect_count\"))) \\\n", " .withColumn(\"cycle_time\", when(col(\"cycle_time\").isNull(), 1.0).otherwise(col(\"cycle_time\"))) \\\n", " .withColumn(\"cycle_time\", when(col(\"cycle_time\") <= 0, 1.0).otherwise(col(\"cycle_time\")))\n", "\n", "# Add calculated fields\n", "df_silver = df_silver \\\n", " .withColumn(\"defect_rate\", \n", " round(when(col(\"units_produced\") > 0, col(\"defect_count\") * 100.0 / col(\"units_produced\")).otherwise(0), 2)) \\\n", " .withColumn(\"hourly_production_rate\", \n", " round(when(col(\"cycle_time\") > 0, col(\"units_produced\") * 60.0 / col(\"cycle_time\")).otherwise(0), 2)) \\\n", " .withColumn(\"quality_grade\",\n", " when(col(\"defect_rate\") <= 2, \"Excellent\")\n", " .when(col(\"defect_rate\") <= 5, \"Good\")\n", " .when(col(\"defect_rate\") <= 10, \"Fair\")\n", " .otherwise(\"Poor\")) \\\n", " .withColumn(\"processing_timestamp\", current_timestamp()) \\\n", " .withColumn(\"data_quality_score\", \n", " when(col(\"units_produced\").isNotNull() & col(\"defect_count\").isNotNull() & col(\"cycle_time\").isNotNull(), 100)\n", " .when(col(\"units_produced\").isNotNull() | col(\"defect_count\").isNotNull(), 75)\n", " .otherwise(50))\n", "\n", "# Select final columns for Silver layer\n", "df_silver = df_silver.select(\n", " \"machine_id\", \"production_date\", \"product_type\", \"units_produced\", \n", " \"defect_count\", \"production_line\", \"cycle_time\", \"defect_rate\",\n", " \"hourly_production_rate\", \"quality_grade\", \"processing_timestamp\", \"data_quality_score\"\n", ")\n", "\n", "print(f\"Silver layer transformation completed: {df_silver.count()} clean records\")\n", "print(\"\\nData quality improvements 
applied:\")\n", "print(\"- Removed records with null machine_id or production_date\")\n", "print(\"- Fixed negative defect counts\")\n", "print(\"- Added calculated quality metrics\")\n", "print(\"- Added data quality grading and scoring\")\n", "\n", "print(\"\\nSilver Layer Sample Data:\")\n", "df_silver.show(5)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Successfully saved cleaned and enriched data to Silver layer!\n", "Silver layer now contains standardized, quality-assured manufacturing data.\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Save cleaned data to Silver layer\n", "\n", "df_silver.write.mode(\"overwrite\").saveAsTable(\"manufacturing.silver.production_records_silver\")\n", "\n", "print(\"Successfully saved cleaned and enriched data to Silver layer!\")\n", "print(\"Silver layer now contains standardized, quality-assured manufacturing data.\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Layer 3: Gold - Business Analytics and Aggregations\n", "\n", "### Gold Layer Purpose\n", "\n", "The gold layer provides business-ready datasets optimized for analytics and reporting:\n", "\n", "- **Aggregations**: Pre-computed metrics and KPIs\n", "- **Business logic**: Domain-specific calculations and classifications\n", "- **Performance**: Optimized for dashboard and BI tool queries\n", "- **Governance**: Curated datasets with clear business definitions\n", "\n", "### Gold Tables Created\n", "\n", "1. **equipment_performance_gold**: Equipment-level KPIs and metrics\n", "2. **quality_analytics_gold**: Quality control and defect analysis\n", "3. 
**production_efficiency_gold**: Production line and efficiency metrics" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Gold layer tables created successfully!\n", "Three business-ready tables created for equipment performance, quality analytics, and production efficiency.\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Create Gold layer tables for business analytics\n", "\n", "# Equipment Performance Gold Table\n", "spark.sql(\"\"\"\n", "\n", "CREATE TABLE IF NOT EXISTS manufacturing.gold.equipment_performance_gold (\n", "\n", " machine_id STRING,\n", "\n", " total_production_runs INT,\n", "\n", " total_units_produced BIGINT,\n", "\n", " avg_daily_production DECIMAL(8,2),\n", "\n", " avg_defect_rate DECIMAL(5,2),\n", "\n", " avg_hourly_rate DECIMAL(8,2),\n", "\n", " performance_category STRING,\n", "\n", " reliability_score DECIMAL(5,2),\n", "\n", " last_production_date TIMESTAMP,\n", "\n", " days_since_last_run INT\n", "\n", ")\n", "\n", "USING DELTA\n", "\n", "CLUSTER BY (performance_category, machine_id)\n", "\n", "\"\"\")\n", "\n", "# Quality Analytics Gold Table\n", "spark.sql(\"\"\"\n", "\n", "CREATE TABLE IF NOT EXISTS manufacturing.gold.quality_analytics_gold (\n", "\n", " product_type STRING,\n", "\n", " production_line STRING,\n", "\n", " month_year STRING,\n", "\n", " total_production_runs INT,\n", "\n", " total_units BIGINT,\n", "\n", " total_defects BIGINT,\n", "\n", " avg_defect_rate DECIMAL(5,2),\n", "\n", " quality_trend STRING,\n", "\n", " defect_rate_percentile DECIMAL(5,2)\n", "\n", ")\n", "\n", "USING DELTA\n", "\n", "CLUSTER BY (product_type, month_year)\n", "\n", "\"\"\")\n", "\n", "# Production Efficiency Gold Table\n", "spark.sql(\"\"\"\n", "\n", "CREATE TABLE IF NOT EXISTS manufacturing.gold.production_efficiency_gold (\n", "\n", " production_line STRING,\n", "\n", " month_year STRING,\n", "\n", " active_machines INT,\n", "\n", " total_production_runs 
INT,\n", "\n", " total_units_produced BIGINT,\n", "\n", " avg_production_efficiency DECIMAL(5,2),\n", "\n", " line_utilization_rate DECIMAL(5,2),\n", "\n", " bottleneck_indicator STRING\n", "\n", ")\n", "\n", "USING DELTA\n", "\n", "CLUSTER BY (production_line, month_year)\n", "\n", "\"\"\")\n", "\n", "print(\"Gold layer tables created successfully!\")\n", "print(\"Three business-ready tables created for equipment performance, quality analytics, and production efficiency.\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Equipment Performance Gold table populated successfully!\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "Contains performance metrics for 200 machines.\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Populate Equipment Performance Gold Table\n", "\n", "# Import only the functions this cell uses; importing sum and max from\n", "# pyspark.sql.functions here would shadow Python's built-ins of the same name\n", "from pyspark.sql.functions import col, current_date, datediff\n", "\n", "equipment_gold = spark.sql(\"\"\"\n", "\n", "SELECT \n", "\n", " machine_id,\n", "\n", " COUNT(*) as total_production_runs,\n", "\n", " SUM(units_produced) as total_units_produced,\n", "\n", " ROUND(AVG(units_produced), 2) as avg_daily_production,\n", "\n", " ROUND(AVG(defect_rate), 2) as avg_defect_rate,\n", "\n", " ROUND(AVG(hourly_production_rate), 2) as avg_hourly_rate,\n", "\n", " MAX(production_date) as last_production_date,\n", "\n", " CASE \n", "\n", " WHEN AVG(defect_rate) <= 3 AND AVG(hourly_production_rate) >= 500 THEN 'High Performer'\n", "\n", " WHEN AVG(defect_rate) <= 7 AND AVG(hourly_production_rate) >= 300 THEN 'Good Performer'\n", "\n", " WHEN AVG(defect_rate) <= 12 THEN 'Needs Attention'\n", "\n", " ELSE 'Critical'\n", "\n", " END as performance_category,\n", "\n", " ROUND(100 - AVG(defect_rate), 2) as reliability_score\n", "\n", "FROM manufacturing.silver.production_records_silver\n", "\n", "GROUP BY machine_id\n", "\n", "\"\"\")\n", "\n", 
"# Add days since last run\n", "equipment_gold = equipment_gold.withColumn(\n", " \"days_since_last_run\", \n", " datediff(current_date(), col(\"last_production_date\"))\n", ")\n", "\n", "equipment_gold.write.mode(\"overwrite\").saveAsTable(\"manufacturing.gold.equipment_performance_gold\")\n", "\n", "print(\"Equipment Performance Gold table populated successfully!\")\n", "print(f\"Contains performance metrics for {equipment_gold.count()} machines.\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Quality Analytics Gold table populated successfully!\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "Contains quality metrics for 240 product-line-month combinations.\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Populate Quality Analytics Gold Table\n", "\n", "quality_gold = spark.sql(\"\"\"\n", "\n", "WITH monthly_quality AS (\n", "\n", " SELECT \n", "\n", " product_type,\n", "\n", " production_line,\n", "\n", " DATE_FORMAT(production_date, 'yyyy-MM') as month_year,\n", "\n", " COUNT(*) as total_production_runs,\n", "\n", " SUM(units_produced) as total_units,\n", "\n", " SUM(defect_count) as total_defects,\n", "\n", " ROUND(AVG(defect_rate), 2) as avg_defect_rate,\n", "\n", " LAG(AVG(defect_rate)) OVER (PARTITION BY product_type, production_line ORDER BY DATE_FORMAT(production_date, 'yyyy-MM')) as prev_month_rate\n", "\n", " FROM manufacturing.silver.production_records_silver\n", "\n", " GROUP BY product_type, production_line, DATE_FORMAT(production_date, 'yyyy-MM')\n", "\n", "),\n", "\n", "overall_stats AS (\n", "\n", " SELECT \n", "\n", " PERCENTILE_APPROX(avg_defect_rate, 0.5) as median_defect_rate\n", "\n", " FROM monthly_quality\n", "\n", ")\n", "\n", "SELECT \n", "\n", " mq.product_type,\n", "\n", " mq.production_line,\n", "\n", " mq.month_year,\n", "\n", " mq.total_production_runs,\n", "\n", " mq.total_units,\n", "\n", " 
mq.total_defects,\n", "\n", " mq.avg_defect_rate,\n", "\n", " CASE \n", "\n", " WHEN mq.prev_month_rate IS NULL THEN 'New'\n", "\n", " WHEN mq.avg_defect_rate < mq.prev_month_rate * 0.9 THEN 'Improving'\n", "\n", " WHEN mq.avg_defect_rate > mq.prev_month_rate * 1.1 THEN 'Declining'\n", "\n", " ELSE 'Stable'\n", "\n", " END as quality_trend,\n", "\n", " ROUND(\n", "\n", " PERCENT_RANK() OVER (ORDER BY mq.avg_defect_rate) * 100, 2\n", "\n", " ) as defect_rate_percentile\n", "\n", "FROM monthly_quality mq\n", "\n", "ORDER BY product_type, production_line, month_year\n", "\n", "\"\"\")\n", "\n", "quality_gold.write.mode(\"overwrite\").saveAsTable(\"manufacturing.gold.quality_analytics_gold\")\n", "\n", "print(\"Quality Analytics Gold table populated successfully!\")\n", "print(f\"Contains quality metrics for {quality_gold.count()} product-line-month combinations.\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Production Efficiency Gold table populated successfully!\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "Contains efficiency metrics for 60 production line-month combinations.\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Populate Production Efficiency Gold Table\n", "\n", "efficiency_gold = spark.sql(\"\"\"\n", "\n", "SELECT \n", "\n", " production_line,\n", "\n", " DATE_FORMAT(production_date, 'yyyy-MM') as month_year,\n", "\n", " COUNT(DISTINCT machine_id) as active_machines,\n", "\n", " COUNT(*) as total_production_runs,\n", "\n", " SUM(units_produced) as total_units_produced,\n", "\n", " ROUND(AVG(hourly_production_rate), 2) as avg_production_efficiency,\n", "\n", " ROUND(\n", "\n", " COUNT(*) * 100.0 / \n", "\n", " (COUNT(DISTINCT machine_id) * 30 * 24), 2 -- Assuming 30 days, 24 hours potential\n", "\n", " ) as line_utilization_rate,\n", "\n", " CASE \n", "\n", " WHEN AVG(hourly_production_rate) < 200 THEN 'Severe 
Bottleneck'\n", "\n", " WHEN AVG(hourly_production_rate) < 400 THEN 'Minor Bottleneck'\n", "\n", " WHEN AVG(hourly_production_rate) > 800 THEN 'Overutilized'\n", "\n", " ELSE 'Optimal'\n", "\n", " END as bottleneck_indicator\n", "\n", "FROM manufacturing.silver.production_records_silver\n", "\n", "GROUP BY production_line, DATE_FORMAT(production_date, 'yyyy-MM')\n", "\n", "ORDER BY production_line, month_year\n", "\n", "\"\"\")\n", "\n", "efficiency_gold.write.mode(\"overwrite\").saveAsTable(\"manufacturing.gold.production_efficiency_gold\")\n", "\n", "print(\"Production Efficiency Gold table populated successfully!\")\n", "print(f\"Contains efficiency metrics for {efficiency_gold.count()} production line-month combinations.\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Medallion Architecture Demonstration\n", "\n", "### Query Performance Across Layers\n", "\n", "Let's demonstrate how the medallion architecture serves different analytical needs with optimized queries at each layer." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "=== BRONZE LAYER: Raw Data Access ===\n", "Purpose: Historical audit trail and raw data exploration\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+----------+-------------------+--------------------+--------------+------------+-------------------+\n", "|machine_id| production_date| product_type|units_produced|defect_count| source_system|\n", "+----------+-------------------+--------------------+--------------+------------+-------------------+\n", "| MCH0001|2024-12-27 13:00:00|Industrial Equipment| 61| 4|manufacturing_scada|\n", "| MCH0001|2024-12-26 09:00:00|Industrial Equipment| 39| 2|manufacturing_scada|\n", "| MCH0001|2024-12-16 14:00:00| Consumer Goods| 1038| 16|manufacturing_scada|\n", "| MCH0001|2024-12-10 10:00:00| Consumer Goods| 853| 28|manufacturing_scada|\n", "| MCH0001|2024-12-09 11:00:00| Electronics| 590| 5|manufacturing_scada|\n", "+----------+-------------------+--------------------+--------------+------------+-------------------+\n", "\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+-------------+----------+----------------+\n", "|total_records|null_units|negative_defects|\n", "+-------------+----------+----------------+\n", "| 12137| 317| 247|\n", "+-------------+----------+----------------+\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Demonstrate Bronze layer: Raw data access\n", "\n", "print(\"=== BRONZE LAYER: Raw Data Access ===\")\n", "print(\"Purpose: Historical audit trail and raw data exploration\")\n", "\n", "bronze_sample = spark.sql(\"\"\"\n", "\n", "SELECT machine_id, production_date, product_type, units_produced, defect_count, source_system\n", "\n", "FROM manufacturing.bronze.production_records_bronze\n", "\n", "WHERE machine_id = 'MCH0001'\n", "\n", "ORDER BY production_date DESC\n", "\n", "LIMIT 5\n", "\n", 
"\"\"\")\n", "\n", "bronze_sample.show()\n", "\n", "bronze_stats = spark.sql(\"\"\"\n", "\n", "SELECT \n", "\n", " COUNT(*) as total_records,\n", "\n", " COUNT(CASE WHEN units_produced IS NULL THEN 1 END) as null_units,\n", "\n", " COUNT(CASE WHEN defect_count < 0 THEN 1 END) as negative_defects\n", "\n", "FROM manufacturing.bronze.production_records_bronze\n", "\n", "\"\"\")\n", "\n", "bronze_stats.show()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "\n", "=== SILVER LAYER: Clean Analytical Data ===\n", "Purpose: Standardized data for detailed analysis and ML features\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+----------+-------------------+--------------------+--------------+-----------+----------------------+-------------+------------------+\n", "|machine_id| production_date| product_type|units_produced|defect_rate|hourly_production_rate|quality_grade|data_quality_score|\n", "+----------+-------------------+--------------------+--------------+-----------+----------------------+-------------+------------------+\n", "| MCH0001|2024-12-27 13:00:00|Industrial Equipment| 61| 6.56| 111.35| Fair| 100|\n", "| MCH0001|2024-12-26 09:00:00|Industrial Equipment| 39| 5.13| 80.03| Fair| 100|\n", "| MCH0001|2024-12-16 14:00:00| Consumer Goods| 1038| 1.54| 62280.0| Excellent| 100|\n", "| MCH0001|2024-12-10 10:00:00| Consumer Goods| 853| 3.28| 28915.25| Good| 100|\n", "| MCH0001|2024-12-09 11:00:00| Electronics| 590| 0.85| 11568.63| Excellent| 100|\n", "+----------+-------------------+--------------------+--------------+-----------+----------------------+-------------+------------------+\n", "\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+-------------+-----+-----------------+\n", "|quality_grade|count|avg_quality_score|\n", "+-------------+-----+-----------------+\n", "| Good| 5298| 100.0|\n", "| Fair| 3680| 100.0|\n", "| 
Excellent| 2028| 100.0|\n", "| Poor| 1131| 100.0|\n", "+-------------+-----+-----------------+\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Demonstrate Silver layer: Clean analytical data\n", "\n", "print(\"\\n=== SILVER LAYER: Clean Analytical Data ===\")\n", "print(\"Purpose: Standardized data for detailed analysis and ML features\")\n", "\n", "silver_sample = spark.sql(\"\"\"\n", "\n", "SELECT machine_id, production_date, product_type, units_produced, defect_rate, \n", "\n", " hourly_production_rate, quality_grade, data_quality_score\n", "\n", "FROM manufacturing.silver.production_records_silver\n", "\n", "WHERE machine_id = 'MCH0001'\n", "\n", "ORDER BY production_date DESC\n", "\n", "LIMIT 5\n", "\n", "\"\"\")\n", "\n", "silver_sample.show()\n", "\n", "silver_quality = spark.sql(\"\"\"\n", "\n", "SELECT quality_grade, COUNT(*) as count, ROUND(AVG(data_quality_score), 2) as avg_quality_score\n", "\n", "FROM manufacturing.silver.production_records_silver\n", "\n", "GROUP BY quality_grade\n", "\n", "ORDER BY count DESC\n", "\n", "\"\"\")\n", "\n", "silver_quality.show()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "\n", "=== GOLD LAYER: Business Intelligence ===\n", "Purpose: Pre-aggregated KPIs for dashboards and executive reporting\n", "\n", "Equipment Performance Summary:\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+--------------------+-------------+---------------------+-------------+\n", "|performance_category|machine_count|avg_reliability_score|total_units_k|\n", "+--------------------+-------------+---------------------+-------------+\n", "| Good Performer| 200| 94.99| 4576.5|\n", "+--------------------+-------------+---------------------+-------------+\n", "\n", "\n", "Quality Analytics Summary:\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ 
"+--------------------+-------------+------------+---------------+-------------+\n", "| product_type|quality_trend|months_count|avg_defect_rate|total_units_k|\n", "+--------------------+-------------+------------+---------------+-------------+\n", "| Automotive Parts| Declining| 7| 6.06| 66.2|\n", "| Automotive Parts| Improving| 4| 5.27| 42.1|\n", "| Automotive Parts| New| 5| 5.59| 53.8|\n", "| Automotive Parts| Stable| 44| 5.77| 439.6|\n", "| Consumer Goods| Declining| 10| 3.81| 361.3|\n", "| Consumer Goods| Improving| 6| 3.31| 226.9|\n", "| Consumer Goods| New| 5| 3.64| 229.2|\n", "| Consumer Goods| Stable| 39| 3.5| 1526.6|\n", "| Electronics| Declining| 9| 2.37| 227.7|\n", "| Electronics| Improving| 8| 2.05| 161.0|\n", "| Electronics| New| 5| 2.32| 126.0|\n", "| Electronics| Stable| 38| 2.32| 971.8|\n", "|Industrial Equipment| Declining| 9| 8.96| 20.7|\n", "|Industrial Equipment| Improving| 4| 7.68| 9.1|\n", "|Industrial Equipment| New| 5| 8.26| 11.7|\n", "|Industrial Equipment| Stable| 42| 8.51| 102.9|\n", "+--------------------+-------------+------------+---------------+-------------+\n", "\n", "\n", "Production Efficiency Summary:\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+---------------+--------------------+------------+--------------------+-------------+\n", "|production_line|bottleneck_indicator|months_count|avg_utilization_rate|total_units_k|\n", "+---------------+--------------------+------------+--------------------+-------------+\n", "| LINE_A| Overutilized| 12| 0.23| 950.0|\n", "| LINE_B| Overutilized| 12| 0.23| 895.5|\n", "| LINE_C| Overutilized| 12| 0.23| 898.2|\n", "| LINE_D| Overutilized| 12| 0.22| 921.8|\n", "| LINE_E| Overutilized| 12| 0.22| 911.0|\n", "+---------------+--------------------+------------+--------------------+-------------+\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Demonstrate Gold layer: Business intelligence\n", "\n", "print(\"\\n=== GOLD LAYER: 
Business Intelligence ===\")\n", "print(\"Purpose: Pre-aggregated KPIs for dashboards and executive reporting\")\n", "\n", "# Equipment Performance Dashboard\n", "print(\"\\nEquipment Performance Summary:\")\n", "equipment_summary = spark.sql(\"\"\"\n", "\n", "SELECT performance_category, COUNT(*) as machine_count, \n", "\n", " ROUND(AVG(reliability_score), 2) as avg_reliability_score,\n", "\n", " ROUND(SUM(total_units_produced)/1000, 1) as total_units_k\n", "\n", "FROM manufacturing.gold.equipment_performance_gold\n", "\n", "GROUP BY performance_category\n", "\n", "ORDER BY machine_count DESC\n", "\n", "\"\"\")\n", "\n", "equipment_summary.show()\n", "\n", "# Quality Analytics Dashboard\n", "print(\"\\nQuality Analytics Summary:\")\n", "quality_summary = spark.sql(\"\"\"\n", "\n", "SELECT product_type, quality_trend, COUNT(*) as months_count,\n", "\n", " ROUND(AVG(avg_defect_rate), 2) as avg_defect_rate,\n", "\n", " ROUND(SUM(total_units)/1000, 1) as total_units_k\n", "\n", "FROM manufacturing.gold.quality_analytics_gold\n", "\n", "GROUP BY product_type, quality_trend\n", "\n", "ORDER BY product_type, quality_trend\n", "\n", "\"\"\")\n", "\n", "quality_summary.show()\n", "\n", "# Production Efficiency Dashboard\n", "print(\"\\nProduction Efficiency Summary:\")\n", "efficiency_summary = spark.sql(\"\"\"\n", "\n", "SELECT production_line, bottleneck_indicator, COUNT(*) as months_count,\n", "\n", " ROUND(AVG(line_utilization_rate), 2) as avg_utilization_rate,\n", "\n", " ROUND(SUM(total_units_produced)/1000, 1) as total_units_k\n", "\n", "FROM manufacturing.gold.production_efficiency_gold\n", "\n", "GROUP BY production_line, bottleneck_indicator\n", "\n", "ORDER BY production_line, bottleneck_indicator\n", "\n", "\"\"\")\n", "\n", "efficiency_summary.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Machine Learning: Predictive Maintenance with Silver Layer Data\n", "\n", "### ML Integration in Medallion Architecture\n", "\n", "The Silver 
layer's cleaned and enriched data provides a consistent foundation for machine learning models. We'll demonstrate predictive maintenance using equipment health indicators derived from production data.\n", "\n", "### Business Value of Predictive Maintenance\n", "\n", "- **Reduce downtime** through proactive equipment maintenance\n", "- **Optimize maintenance costs** by scheduling based on actual condition\n", "- **Improve asset utilization** and production reliability\n", "- **Enhance quality control** by preventing equipment-related defects\n", "\n", "### Model Approach\n", "\n", "We'll build a classification model to predict equipment maintenance risk levels using rolling statistics and production metrics from the Silver layer. The risk labels are derived from threshold rules on those same rolling statistics, so the model learns to reproduce the rules rather than to predict observed failures." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "=== Feature Engineering for Predictive Maintenance ===\n", "Using Silver layer data for ML model training\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "Dataset prepared with 12137 records for maintenance prediction\n", "Risk distribution:\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+----------------+-----+\n", "|maintenance_risk|count|\n", "+----------------+-----+\n", "| High| 26|\n", "| Low| 171|\n", "| Medium|11940|\n", "+----------------+-----+\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Feature Engineering for Predictive Maintenance using Silver layer data\n", "\n", "from pyspark.sql.functions import col, avg, stddev, count, when\n", "from pyspark.sql.window import Window\n", "from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder\n", "from pyspark.ml.classification import RandomForestClassifier\n", "from pyspark.ml.evaluation import MulticlassClassificationEvaluator\n", "from pyspark.ml import Pipeline\n", "\n", "print(\"=== Feature Engineering for Predictive Maintenance ===\")\n", 
"print(\"Using Silver layer data for ML model training\")\n", "\n", "# Read from Silver layer\n", "df_ml = spark.table(\"manufacturing.silver.production_records_silver\")\n", "\n", "# Add rolling statistics over the current record plus the 7 preceding records per machine.\n", "# rowsBetween(-7, 0) is a row-based window (up to 8 records), not a calendar 7-day range.\n", "window_spec_7d = Window.partitionBy(\"machine_id\").orderBy(\"production_date\").rowsBetween(-7, 0)\n", "\n", "df_health = df_ml.withColumn(\"rolling_avg_defect_rate\", avg(\"defect_rate\").over(window_spec_7d)) \\\n", " .withColumn(\"rolling_std_defect_rate\", stddev(\"defect_rate\").over(window_spec_7d)) \\\n", " .withColumn(\"rolling_avg_hourly_rate\", avg(\"hourly_production_rate\").over(window_spec_7d)) \\\n", " .withColumn(\"rolling_std_hourly_rate\", stddev(\"hourly_production_rate\").over(window_spec_7d)) \\\n", " .withColumn(\"production_count_7d\", count(\"*\").over(window_spec_7d))\n", "\n", "# Create maintenance risk labels from threshold rules on the rolling health indicators\n", "df_health = df_health.withColumn(\"maintenance_risk\", \n", " when((col(\"rolling_avg_defect_rate\") > 8.0) & (col(\"rolling_std_defect_rate\") > 3.0) & \n", " (col(\"rolling_avg_hourly_rate\") < 1000), \"High\")\n", " .when((col(\"rolling_avg_defect_rate\") > 5.0) | (col(\"rolling_std_hourly_rate\") > 2000), \"Medium\")\n", " .otherwise(\"Low\")\n", ")\n", "\n", "print(f\"Dataset prepared with {df_health.count()} records for maintenance prediction\")\n", "print(\"Risk distribution:\")\n", "df_health.groupBy(\"maintenance_risk\").count().show()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "\n", "=== Data Preparation for ML ===\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "Training set: 8327 records\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "Testing set: 3410 records\n", "ML pipeline configured for maintenance risk classification using Silver layer 
features\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Prepare data for ML training\n", "\n", "print(\"\\n=== Data Preparation for ML ===\")\n", "\n", "# Filter out records with insufficient history for reliable predictions\n", "df_ml_ready = df_health.filter(\"production_count_7d >= 3\")\n", "\n", "# Split data (70/30 split for training/testing)\n", "train_data, test_data = df_ml_ready.randomSplit([0.7, 0.3], seed=42)\n", "\n", "print(f\"Training set: {train_data.count()} records\")\n", "print(f\"Testing set: {test_data.count()} records\")\n", "\n", "# Encode categorical features\n", "product_indexer = StringIndexer(inputCol=\"product_type\", outputCol=\"product_type_index\")\n", "line_indexer = StringIndexer(inputCol=\"production_line\", outputCol=\"production_line_index\")\n", "risk_indexer = StringIndexer(inputCol=\"maintenance_risk\", outputCol=\"label\")\n", "\n", "product_encoder = OneHotEncoder(inputCol=\"product_type_index\", outputCol=\"product_type_vec\")\n", "line_encoder = OneHotEncoder(inputCol=\"production_line_index\", outputCol=\"production_line_vec\")\n", "\n", "# Assemble feature vector from Silver layer metrics\n", "feature_cols = [\n", " \"defect_rate\", \"hourly_production_rate\", \"rolling_avg_defect_rate\", \"rolling_std_defect_rate\",\n", " \"rolling_avg_hourly_rate\", \"rolling_std_hourly_rate\", \"production_count_7d\",\n", " \"product_type_vec\", \"production_line_vec\"\n", "]\n", "\n", "assembler = VectorAssembler(inputCols=feature_cols, outputCol=\"features\")\n", "\n", "# Define Random Forest Classifier for maintenance risk prediction\n", "rf = RandomForestClassifier(\n", " featuresCol=\"features\",\n", " labelCol=\"label\",\n", " numTrees=100,\n", " maxDepth=8,\n", " seed=42\n", ")\n", "\n", "# Create ML pipeline\n", "pipeline = Pipeline(stages=[\n", " product_indexer, line_indexer, risk_indexer,\n", " product_encoder, line_encoder,\n", " assembler, rf\n", "])\n", "\n", "print(\"ML pipeline configured for 
maintenance risk classification using Silver layer features\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "\n", "=== Model Training ===\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "Maintenance prediction model training completed using Silver layer data\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "Predictions generated for 3410 test records\n", "Sample predictions:\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+----------+-------------------+----------------+----------+\n", "|machine_id| production_date|maintenance_risk|prediction|\n", "+----------+-------------------+----------------+----------+\n", "| MCH0001|2024-01-15 13:00:00| Medium| 0.0|\n", "| MCH0001|2024-02-06 18:00:00| Medium| 0.0|\n", "| MCH0001|2024-03-04 12:00:00| Medium| 0.0|\n", "| MCH0001|2024-03-12 08:00:00| Medium| 0.0|\n", "| MCH0001|2024-03-19 16:00:00| Medium| 0.0|\n", "| MCH0001|2024-03-27 10:00:00| Medium| 0.0|\n", "| MCH0001|2024-03-27 13:00:00| Medium| 0.0|\n", "| MCH0001|2024-04-08 17:00:00| Medium| 0.0|\n", "| MCH0001|2024-04-12 12:00:00| Medium| 0.0|\n", "| MCH0001|2024-04-16 16:00:00| Medium| 0.0|\n", "+----------+-------------------+----------------+----------+\n", "only showing top 10 rows\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Train the predictive maintenance model\n", "\n", "print(\"\\n=== Model Training ===\")\n", "\n", "# Fit the pipeline on training data\n", "model = pipeline.fit(train_data)\n", "\n", "print(\"Maintenance prediction model training completed using Silver layer data\")\n", "\n", "# Make predictions on test data\n", "predictions = model.transform(test_data)\n", "\n", "print(f\"Predictions generated for {predictions.count()} test records\")\n", "print(\"Sample predictions:\")\n", "# prediction holds the StringIndexer label index (0.0 is the most frequent class), not the risk name\n", "predictions.select(\"machine_id\", \"production_date\", 
\"maintenance_risk\", \"prediction\").show(10)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "\n", "=== Model Evaluation ===\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "Accuracy: 0.9974\n", "F1 Score: 0.9963\n", "\n", "Confusion Matrix:\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+----------------+----------+-----+\n", "|maintenance_risk|prediction|count|\n", "+----------------+----------+-----+\n", "| High| 0.0| 6|\n", "| High| 1.0| 1|\n", "| Low| 0.0| 3|\n", "| Medium| 0.0| 3400|\n", "+----------------+----------+-----+\n", "\n", "\n", "=== Business Value Assessment ===\n", "Model accuracy of 99.7% enables:\n", "- Proactive maintenance scheduling\n", "- Reduced unplanned downtime\n", "- Optimized maintenance costs\n", "- Improved production reliability\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Evaluate model performance\n", "\n", "print(\"\\n=== Model Evaluation ===\")\n", "\n", "# Calculate evaluation metrics (accuracy and weighted F1).\n", "# With heavily imbalanced risk classes, both are dominated by the majority class,\n", "# so read them together with the per-class counts in the confusion matrix.\n", "evaluator_accuracy = MulticlassClassificationEvaluator(\n", " labelCol=\"label\",\n", " predictionCol=\"prediction\",\n", " metricName=\"accuracy\"\n", ")\n", "\n", "evaluator_f1 = MulticlassClassificationEvaluator(\n", " labelCol=\"label\",\n", " predictionCol=\"prediction\",\n", " metricName=\"f1\"\n", ")\n", "\n", "accuracy = evaluator_accuracy.evaluate(predictions)\n", "f1 = evaluator_f1.evaluate(predictions)\n", "\n", "print(f\"Accuracy: {accuracy:.4f}\")\n", "print(f\"F1 Score: {f1:.4f}\")\n", "\n", "# Show confusion matrix: true risk name versus predicted StringIndexer label index\n", "print(\"\\nConfusion Matrix:\")\n", "predictions.groupBy(\"maintenance_risk\", \"prediction\").count().orderBy(\"maintenance_risk\", \"prediction\").show()\n", "\n", "# Business value assessment\n", "print(\"\\n=== Business Value Assessment ===\")\n", "print(f\"Model accuracy of {accuracy:.1%} enables:\")\n", "print(\"- Proactive maintenance 
scheduling\")\n", "print(\"- Reduced unplanned downtime\")\n", "print(\"- Optimized maintenance costs\")\n", "print(\"- Improved production reliability\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "\n", "=== Maintenance Insights from ML Model ===\n", "Top 10 high-risk machines requiring immediate attention:\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+----------+-------------------+-----------------------+-----------------------+\n", "|machine_id| production_date|rolling_avg_defect_rate|rolling_avg_hourly_rate|\n", "+----------+-------------------+-----------------------+-----------------------+\n", "| MCH0007|2024-11-11 06:00:00| 9.401250000000001| 2783.59125|\n", "| MCH0014|2024-03-15 13:00:00| 9.291250000000002| 668.92875|\n", "| MCH0027|2024-01-19 08:00:00| 9.159999999999998| 2045.24|\n", "| MCH0082|2024-09-16 06:00:00| 9.113750000000001| 610.7524999999999|\n", "| MCH0097|2024-12-23 12:00:00| 9.07375| 1382.0325000000003|\n", "| MCH0177|2024-08-26 07:00:00| 9.0475| 1707.13375|\n", "| MCH0052|2024-08-19 11:00:00| 9.0375| 5644.615|\n", "| MCH0177|2024-08-26 13:00:00| 8.97875| 1553.9250000000002|\n", "| MCH0014|2024-03-13 10:00:00| 8.932500000000001| 555.5799999999999|\n", "| MCH0007|2024-10-25 12:00:00| 8.911249999999999| 4993.045|\n", "+----------+-------------------+-----------------------+-----------------------+\n", "\n", "\n", "Maintenance scheduling recommendations:\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+----------+--------------------------+\n", "|prediction|machines_needing_attention|\n", "+----------+--------------------------+\n", "| 0.0| 3409|\n", "| 1.0| 1|\n", "+----------+--------------------------+\n", "\n", "\n", "Top performing equipment (by efficiency) with ML risk assessment:\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ 
"+----------+------------------+-----------------+-----------------+--------------+\n", "|machine_id| avg_hourly_rate| avg_defect_rate|total_productions|avg_risk_score|\n", "+----------+------------------+-----------------+-----------------+--------------+\n", "| MCH0153|13458.653020833332|4.226322916666668| 8| 0.0|\n", "| MCH0130|12550.008083333332|4.684673076923077| 13| 0.0|\n", "| MCH0197| 12484.01575| 4.68025| 5| 0.0|\n", "| MCH0043|12440.022878787877|5.474015151515151| 11| 0.0|\n", "| MCH0187| 12378.54203125| 4.7940625| 8| 0.0|\n", "| MCH0195| 12242.75275|4.105357142857143| 10| 0.0|\n", "| MCH0058|12163.561197368419|5.200473684210526| 19| 0.0|\n", "| MCH0109|12156.647250000002|5.223433333333334| 15| 0.0|\n", "| MCH0181|12139.634579081636|5.593590561224488| 28| 0.0|\n", "| MCH0166|12090.145208333333|4.786458333333333| 6| 0.0|\n", "+----------+------------------+-----------------+-----------------+--------------+\n", "only showing top 10 rows\n", "\n", "\n", "=== ML Enhancement for Gold Layer ===\n", "The ML predictions could be added to the equipment_performance_gold table\n", "to provide predictive maintenance insights alongside historical performance data.\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Analyze maintenance insights and recommendations\n", "\n", "print(\"\\n=== Maintenance Insights from ML Model ===\")\n", "\n", "# Identify predicted high-risk machines. The prediction column holds StringIndexer\n", "# indices ordered by label frequency (0.0 is the most common class), so look up the\n", "# index assigned to \"High\" from the fitted risk_indexer stage instead of hardcoding 0.\n", "high_risk_index = float(model.stages[2].labels.index(\"High\"))\n", "high_risk_machines = predictions.filter(col(\"prediction\") == high_risk_index) \\\n", " .select(\"machine_id\", \"production_date\", \"rolling_avg_defect_rate\", \"rolling_avg_hourly_rate\") \\\n", " .orderBy(\"rolling_avg_defect_rate\", ascending=False) \\\n", " .limit(10)\n", "\n", "print(\"Top 10 high-risk machines requiring immediate attention:\")\n", "high_risk_machines.show()\n", "\n", "# Count test records per predicted class index (indices below 2); these are record counts, not distinct machines\n", "maintenance_schedule = predictions.filter(\"prediction < 2\") \\\n", " .groupBy(\"prediction\") \\\n", " 
.agg(count(\"machine_id\").alias(\"machines_needing_attention\")) \\\n", " .orderBy(\"prediction\")\n", "\n", "print(\"\\nMaintenance scheduling recommendations:\")\n", "maintenance_schedule.show()\n", "\n", "# Equipment utilization analysis with ML insights\n", "equipment_ml_analysis = predictions.groupBy(\"machine_id\") \\\n", " .agg(\n", " avg(\"rolling_avg_hourly_rate\").alias(\"avg_hourly_rate\"),\n", " avg(\"rolling_avg_defect_rate\").alias(\"avg_defect_rate\"),\n", " count(\"*\").alias(\"total_productions\"),\n", " avg(\"prediction\").alias(\"avg_risk_score\")\n", " ) \\\n", " .orderBy(\"avg_hourly_rate\", ascending=False)\n", "\n", "print(\"\\nTop performing equipment (by efficiency) with ML risk assessment:\")\n", "equipment_ml_analysis.show(10)\n", "\n", "# Demonstrate how ML predictions could enhance Gold layer\n", "print(\"\\n=== ML Enhancement for Gold Layer ===\")\n", "print(\"The ML predictions could be added to the equipment_performance_gold table\")\n", "print(\"to provide predictive maintenance insights alongside historical performance data.\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Key Takeaways: Medallion Architecture with Delta Liquid Clustering and ML\n", "\n", "### Architecture Benefits Demonstrated\n", "\n", "1. **Bronze Layer**: Preserves raw data integrity and provides audit trail\n", "2. **Silver Layer**: Ensures data quality and provides foundation for analytics and ML\n", "3. **Gold Layer**: Delivers fast, pre-aggregated analytics for business users\n", "4. 
**ML on Silver**: Uses Silver layer data for predictive maintenance and insights (a consumer of the medallion layers rather than a fourth layer)\n", "\n", "### Liquid Clustering Advantages\n", "\n", "- **Automatic optimization** at each layer for different query patterns\n", "- **Low-maintenance layout**: no manual partitioning or Z-Ordering to tune\n", "- **Performance benefits** for both ingestion and analytical queries\n", "- **ML-ready data** through optimized Silver layer clustering\n", "\n", "### AIDP Integration Benefits\n", "\n", "- **Unified platform** for data engineering, analytics, and ML\n", "- **Seamless progression** from raw data to predictive insights\n", "- **Governance** through catalog and schema isolation\n", "- **Scalability** for large-scale manufacturing datasets and ML training\n", "\n", "### Manufacturing Analytics Value\n", "\n", "- **Equipment monitoring** with performance categorization\n", "- **Quality control** with trend analysis and defect tracking\n", "- **Production optimization** with bottleneck identification\n", "- **Predictive maintenance** with ML-based risk assessment\n", "- **Business intelligence** through pre-computed KPIs\n", "\n", "### Next Steps\n", "\n", "- Integrate with real-time SCADA systems for Bronze layer ingestion\n", "- Add ML predictions to Gold layer tables for unified analytics\n", "- Build dashboards combining Gold layer KPIs with ML predictions\n", "- Implement automated alerting based on ML risk scores\n", "- Deploy model for real-time equipment monitoring\n", "- Extend to multi-plant analytics with federated queries\n", "\n", "This notebook demonstrates how the medallion architecture with Delta Liquid Clustering and integrated ML enables comprehensive, predictive manufacturing analytics in Oracle AI Data Platform." 
] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.5" } }, "nbformat": 4, "nbformat_minor": 4 }