{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Education: Medallion Architecture with Delta Liquid Clustering Demo\n", "\n", "\n", "\n", "## Overview\n", "\n", "\n", "\n", "This notebook demonstrates a complete **Medallion Architecture** implementation in Oracle AI Data Platform (AIDP) Workbench for education analytics. The medallion architecture provides a structured approach to data processing:\n", "\n", "- **Bronze Layer**: Raw data ingestion and storage\n", "- **Silver Layer**: Cleaned, validated, and enriched data\n", "- **Gold Layer**: Analytics-ready data with aggregations and ML insights\n", "\n", "We'll use **Delta Liquid Clustering** throughout to automatically optimize data layout for query performance without manual tuning.\n", "\n", "### What is Medallion Architecture?\n", "\n", "Medallion Architecture organizes data processing into layers that progressively refine data quality and structure:\n", "\n", "- **Bronze**: Raw, unprocessed data as received from sources\n", "- **Silver**: Cleaned, standardized, and enriched data with business rules applied\n", "- **Gold**: Aggregated, analytics-ready data optimized for business intelligence and ML\n", "\n", "### Use Case: Student Performance Analytics and Learning Management\n", "\n", "We'll build an end-to-end education analytics pipeline that:\n", "\n", "- Ingests raw student assessment data\n", "- Cleans and validates the data\n", "- Creates analytics-ready aggregations\n", "- Trains ML models for performance prediction\n", "\n", "### AIDP Environment Setup\n", "\n", "This notebook leverages the existing Spark session in your AIDP environment." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Setup: Create Education Catalog and Schemas" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Education catalog with bronze, silver, and gold schemas created successfully!\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Create education catalog and layer-specific schemas\n", "# In AIDP, catalogs provide data isolation and governance\n", "\n", "spark.sql(\"CREATE CATALOG IF NOT EXISTS education\")\n", "\n", "# Bronze layer: Raw data\n", "spark.sql(\"CREATE SCHEMA IF NOT EXISTS education.bronze\")\n", "\n", "# Silver layer: Cleaned and processed data\n", "spark.sql(\"CREATE SCHEMA IF NOT EXISTS education.silver\")\n", "\n", "# Gold layer: Analytics and ML-ready data\n", "spark.sql(\"CREATE SCHEMA IF NOT EXISTS education.gold\")\n", "\n", "print(\"Education catalog with bronze, silver, and gold schemas created successfully!\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Bronze Layer: Raw Data Ingestion\n", "\n", "## Overview\n", "\n", "The Bronze layer stores raw data exactly as received from source systems, without any transformation. This preserves data integrity and allows for:\n", "\n", "- **Audit trails**: Complete historical record of all data received\n", "- **Reprocessing**: Ability to re-run transformations if business rules change\n", "- **Data lineage**: Clear traceability from source to final analytics\n", "\n", "## Bronze Table Design\n", "\n", "Our `bronze_student_assessments` table will store raw assessment data with:\n", "\n", "- Raw data fields as received from the Learning Management System (LMS)\n", "- Ingestion timestamps for data freshness tracking\n", "- Source system metadata\n", "- No data validation or cleansing at this layer" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Bronze layer table created successfully!\n", "Liquid clustering will optimize data layout for student_id and ingestion time queries.\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Create Bronze layer table for raw student assessment data\n", "# CLUSTER BY optimizes for ingestion patterns and audit queries\n", "\n", "spark.sql(\"\"\"\n", "CREATE TABLE IF NOT EXISTS education.bronze.student_assessments_raw (\n", " -- Raw data fields as received from LMS\n", " raw_student_id STRING,\n", " raw_assessment_date STRING,\n", " raw_subject STRING,\n", " raw_score STRING,\n", " raw_grade_level STRING,\n", " raw_completion_time STRING,\n", " raw_engagement_score STRING,\n", " \n", " -- Ingestion metadata\n", " ingestion_timestamp TIMESTAMP,\n", " source_system STRING,\n", " batch_id STRING,\n", " \n", " -- Raw JSON payload for full fidelity\n", " raw_payload STRING\n", ")\n", "USING DELTA\n", "CLUSTER BY (raw_student_id, ingestion_timestamp)\n", "\"\"\")\n", "\n", "print(\"Bronze layer table created successfully!\")\n", "print(\"Liquid clustering will optimize data layout for student_id and ingestion time queries.\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Generated 67535 raw student assessment records\n", "Sample raw record (with potential data quality issues): {'raw_student_id': 'STU000001', 'raw_assessment_date': '2024-04-05', 'raw_subject': 'English', 'raw_score': '187.20373006255488', 'raw_grade_level': '3rd Grade', 'raw_completion_time': '89.16', 'raw_engagement_score': '90', 'ingestion_timestamp': datetime.datetime(2025, 12, 20, 0, 48, 32, 590515), 'source_system': 'LMS_System_A', 'batch_id': 'BATCH_20251220_004832', 'raw_payload': '{\"student_id\": \"STU000001\", \"assessment_date\": \"2024-04-05\", \"subject\": \"English\", \"score\": \"187.20373006255488\", \"grade_level\": \"3rd Grade\", \"completion_time\": \"89.16\", \"engagement_score\": \"90\", \"source_metadata\": {\"export_timestamp\": \"2025-12-20T00:48:32.590495\", \"data_quality_score\": 0.7309072167760401}}'}\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Generate realistic raw education data (simulating LMS export)\n", "# This represents data as it might come from various source systems\n", "\n", "import random\n", "import json\n", "from datetime import datetime, timedelta\n", "\n", "# Define education data constants\n", "SUBJECTS = ['Math', 'English', 'Science', 'History', 'Art', 'Physical Education']\n", "GRADE_LEVELS = ['Kindergarten', '1st Grade', '2nd Grade', '3rd Grade', '4th Grade', '5th Grade', \n", " '6th Grade', '7th Grade', '8th Grade', '9th Grade', '10th Grade', '11th Grade', '12th Grade']\n", "\n", "# Simulate different source systems with varying data quality\n", "SOURCE_SYSTEMS = ['LMS_System_A', 'Assessment_Platform_B', 'School_Database_C']\n", "\n", "# Generate raw assessment records (some with data quality issues)\n", "raw_assessment_data = []\n", "base_date = datetime(2024, 1, 1)\n", "batch_id = f\"BATCH_{datetime.now().strftime('%Y%m%d_%H%M%S')}\"\n", "\n", "# Create 3,000 students with 15-30 assessments each\n", "for student_num in range(1, 3001):\n", " student_id = f\"STU{student_num:06d}\"\n", " \n", " # Assign grade level\n", " grade_level = random.choice(GRADE_LEVELS)\n", " \n", " # Each student gets 15-30 assessments over 12 months\n", " num_assessments = random.randint(15, 30)\n", " \n", " for i in range(num_assessments):\n", " # Spread assessments over 12 months\n", " days_offset = random.randint(0, 365)\n", " assessment_date = base_date + timedelta(days=days_offset)\n", " \n", " # Select subject\n", " subject = random.choice(SUBJECTS)\n", " \n", " # Generate realistic scores with some data quality issues\n", " base_score = random.uniform(50, 100)\n", " \n", " # Simulate data quality issues (10% of records)\n", " has_quality_issue = random.random() < 0.1\n", " \n", " if has_quality_issue:\n", " # Introduce various data quality problems\n", " quality_issues = [\n", " lambda: str(random.choice(['', 'NULL', 'N/A'])), # Missing values\n", " lambda: str(random.uniform(150, 200)), # Out of range scores\n", " lambda: f\"{random.randint(1,12)}th Grade\", # Inconsistent grade format\n", " lambda: str(random.uniform(-10, 50)), # Negative or invalid scores\n", " ]\n", " score = random.choice(quality_issues)()\n", " else:\n", " score = str(round(base_score, 2))\n", " \n", " # Generate other fields with occasional quality issues\n", " completion_time = str(round(random.uniform(20, 120), 2)) if random.random() > 0.05 else ''\n", " engagement_score = str(random.randint(0, 100)) if random.random() > 0.05 else 'INVALID'\n", " \n", " # Select source system\n", " source_system = random.choice(SOURCE_SYSTEMS)\n", " \n", " # Create raw JSON payload (simulating source system export)\n", " raw_payload = json.dumps({\n", " \"student_id\": student_id,\n", " \"assessment_date\": assessment_date.strftime('%Y-%m-%d'),\n", " \"subject\": subject,\n", " \"score\": score,\n", " \"grade_level\": grade_level,\n", " \"completion_time\": completion_time,\n", " \"engagement_score\": engagement_score,\n", " \"source_metadata\": {\n", " \"export_timestamp\": datetime.now().isoformat(),\n", " \"data_quality_score\": random.uniform(0.7, 1.0)\n", " }\n", " })\n", " \n", " raw_assessment_data.append({\n", " \"raw_student_id\": student_id,\n", " \"raw_assessment_date\": assessment_date.strftime('%Y-%m-%d'),\n", " \"raw_subject\": subject,\n", " \"raw_score\": score,\n", " \"raw_grade_level\": grade_level,\n", " \"raw_completion_time\": completion_time,\n", " \"raw_engagement_score\": engagement_score,\n", " \"ingestion_timestamp\": datetime.now(),\n", " \"source_system\": source_system,\n", " \"batch_id\": batch_id,\n", " \"raw_payload\": raw_payload\n", " })\n", "\n", "print(f\"Generated {len(raw_assessment_data)} raw student assessment records\")\n", "print(\"Sample raw record (with potential data quality issues):\", raw_assessment_data[0])" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Bronze Layer DataFrame Schema:\n", "root\n", " |-- batch_id: string (nullable = true)\n", " |-- ingestion_timestamp: timestamp (nullable = true)\n", " |-- raw_assessment_date: string (nullable = true)\n", " |-- raw_completion_time: string (nullable = true)\n", " |-- raw_engagement_score: string (nullable = true)\n", " |-- raw_grade_level: string (nullable = true)\n", " |-- raw_payload: string (nullable = true)\n", " |-- raw_score: string (nullable = true)\n", " |-- raw_student_id: string (nullable = true)\n", " |-- raw_subject: string (nullable = true)\n", " |-- source_system: string (nullable = true)\n", "\n", "\n", "Sample Raw Data (Bronze Layer):\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+--------------------+--------------------+-------------------+-------------------+--------------------+---------------+--------------------+------------------+--------------+-----------+--------------------+\n", "| batch_id| ingestion_timestamp|raw_assessment_date|raw_completion_time|raw_engagement_score|raw_grade_level| raw_payload| raw_score|raw_student_id|raw_subject| source_system|\n", "+--------------------+--------------------+-------------------+-------------------+--------------------+---------------+--------------------+------------------+--------------+-----------+--------------------+\n", "|BATCH_20251220_00...|2025-12-20 00:48:...| 2024-04-05| 89.16| 90| 3rd Grade|{\"student_id\": \"S...|187.20373006255488| STU000001| English| LMS_System_A|\n", "|BATCH_20251220_00...|2025-12-20 00:48:...| 2024-09-10| 53.93| 87| 3rd Grade|{\"student_id\": \"S...| 85.18| STU000001| Art| School_Database_C|\n", "|BATCH_20251220_00...|2025-12-20 00:48:...| 2024-09-11| 91.76| 7| 3rd Grade|{\"student_id\": \"S...| 70.55| STU000001| Math| LMS_System_A|\n", "|BATCH_20251220_00...|2025-12-20 00:48:...| 2024-08-09| 81.52| 62| 3rd Grade|{\"student_id\": \"S...| 71.72| STU000001| English|Assessment_Platfo...|\n", "|BATCH_20251220_00...|2025-12-20 00:48:...| 2024-03-03| 86.99| 1| 3rd Grade|{\"student_id\": \"S...| 55.79| STU000001| Math|Assessment_Platfo...|\n", "+--------------------+--------------------+-------------------+-------------------+--------------------+---------------+--------------------+------------------+--------------+-----------+--------------------+\n", "only showing top 5 rows\n", "\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "\n", "Successfully inserted 67535 raw records into Bronze layer\n", "Data is stored exactly as received - no transformations applied.\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Insert raw data into Bronze layer\n", "# Using PySpark for distributed processing and type safety\n", "\n", "# Create DataFrame from generated raw data\n", "df_raw_assessments = spark.createDataFrame(raw_assessment_data)\n", "\n", "# Display schema and sample data\n", "print(\"Bronze Layer DataFrame Schema:\")\n", "df_raw_assessments.printSchema()\n", "\n", "print(\"\\nSample Raw Data (Bronze Layer):\")\n", "df_raw_assessments.show(5)\n", "\n", "# Insert data into Bronze table with liquid clustering\n", "# This preserves the raw data exactly as received\n", "df_raw_assessments.write.mode(\"overwrite\").saveAsTable(\"education.bronze.student_assessments_raw\")\n", "\n", "print(f\"\\nSuccessfully inserted {df_raw_assessments.count()} raw records into Bronze layer\")\n", "print(\"Data is stored exactly as received - no transformations applied.\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "=== Bronze Layer: Raw Data Inspection ===\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "Records with potential data quality issues:\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+--------------+-------------------+------------------+-----------------+---------------+--------------------+\n", "|raw_student_id|raw_assessment_date| raw_subject| raw_score|raw_grade_level| source_system|\n", "+--------------+-------------------+------------------+-----------------+---------------+--------------------+\n", "| STU002227| 2024-11-17|Physical Education|38.43922011268124| Kindergarten|Assessment_Platfo...|\n", "| STU002228| 2024-03-30| Math| 66.05| 1st Grade| LMS_System_A|\n", "| STU002229| 2024-12-16| English| NULL| 11th Grade|Assessment_Platfo...|\n", "| STU002229| 2024-10-16| Math| | 11th Grade|Assessment_Platfo...|\n", "| STU002230| 2024-05-30| Math| 6th Grade| Kindergarten|Assessment_Platfo...|\n", "| STU002230| 2024-06-23| History| NULL| Kindergarten| School_Database_C|\n", "| STU002230| 2024-09-02| English| | Kindergarten|Assessment_Platfo...|\n", "| STU002230| 2024-12-02|Physical Education| 87.65| Kindergarten| School_Database_C|\n", "| STU002230| 2024-03-02| English| 93.39| Kindergarten|Assessment_Platfo...|\n", "| STU002230| 2024-07-01|Physical Education| 5th Grade| Kindergarten|Assessment_Platfo...|\n", "+--------------+-------------------+------------------+-----------------+---------------+--------------------+\n", "\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "Total raw records in Bronze layer: 67535\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Demonstrate Bronze layer querying\n", "# Show raw data with potential quality issues\n", "\n", "print(\"=== Bronze Layer: Raw Data Inspection ===\")\n", "\n", "# Query raw data to show data quality issues\n", "raw_data_sample = spark.sql(\"\"\"\n", "SELECT raw_student_id, raw_assessment_date, raw_subject, \n", " raw_score, raw_grade_level, source_system\n", "FROM education.bronze.student_assessments_raw\n", "WHERE raw_score NOT REGEXP '^[0-9]+\\\\.?[0-9]*$'\n", " OR raw_score = '' \n", " OR raw_engagement_score NOT REGEXP '^[0-9]+$' \n", "LIMIT 10\n", "\"\"\")\n", "\n", "print(\"Records with potential data quality issues:\")\n", "raw_data_sample.show()\n", "\n", "print(f\"Total raw records in Bronze layer: {spark.table('education.bronze.student_assessments_raw').count()}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Silver Layer: Data Cleansing and Standardization\n", "\n", "## Overview\n", "\n", "The Silver layer transforms raw Bronze data into clean, standardized, and enriched datasets. Key activities:\n", "\n", "- **Data validation**: Remove or correct invalid data\n", "- **Standardization**: Normalize formats and values\n", "- **Enrichment**: Add derived fields and business logic\n", "- **Deduplication**: Remove duplicate records\n", "- **Schema enforcement**: Apply consistent data types\n", "\n", "## Silver Table Design\n", "\n", "Our `silver_student_assessments` table will store cleaned data with:\n", "\n", "- Validated and standardized fields\n", "- Derived metrics (e.g., performance categories)\n", "- Data quality scores\n", "- Business rule validations" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Silver layer table created successfully!\n", "Optimized for student-specific and time-based analytical queries.\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Create Silver layer table for cleaned and validated data\n", "# CLUSTER BY optimizes for analytical queries and student tracking\n", "\n", "spark.sql(\"\"\"\n", "CREATE TABLE IF NOT EXISTS education.silver.student_assessments_clean (\n", " -- Standardized fields\n", " student_id STRING,\n", " assessment_date DATE,\n", " subject STRING,\n", " score DECIMAL(5,2),\n", " grade_level STRING,\n", " completion_time DECIMAL(6,2),\n", " engagement_score INT,\n", " \n", " -- Derived and enriched fields\n", " performance_category STRING, -- High, Medium, Low\n", " engagement_category STRING, -- High, Medium, Low\n", " is_valid_score BOOLEAN,\n", " is_valid_engagement BOOLEAN,\n", " \n", " -- Metadata\n", " source_system STRING,\n", " batch_id STRING,\n", " processing_timestamp TIMESTAMP,\n", " data_quality_score DECIMAL(3,2),\n", " \n", " -- Bronze layer reference for lineage\n", " bronze_batch_id STRING\n", ")\n", "USING DELTA\n", "CLUSTER BY (student_id, assessment_date)\n", "\"\"\")\n", "\n", "print(\"Silver layer table created successfully!\")\n", "print(\"Optimized for student-specific and time-based analytical queries.\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Transformed 67240 clean records for Silver layer\n", "Applied data validation, standardization, and enrichment.\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Transform Bronze data to Silver layer\n", "# Apply data cleansing, validation, and enrichment\n", "from pyspark.sql import functions as F\n", "\n", "from pyspark.sql.functions import col, when, regexp_replace, to_date, udf\n", "from pyspark.sql.types import BooleanType, StringType\n", "\n", "# Read Bronze data\n", "bronze_df = spark.table(\"education.bronze.student_assessments_raw\")\n", "\n", "# Define data cleansing functions\n", "def validate_score(score_str):\n", " \"\"\"Validate and clean score values\"\"\"\n", " if not score_str or score_str in ['', 'NULL', 'N/A', 'INVALID']:\n", " return None\n", " try:\n", " score = float(score_str)\n", " return score if 0 <= score <= 100 else None\n", " except:\n", " return None\n", "\n", "def validate_engagement(engagement_str):\n", " \"\"\"Validate and clean engagement scores\"\"\"\n", " if not engagement_str or engagement_str in ['', 'NULL', 'N/A', 'INVALID']:\n", " return None\n", " try:\n", " score = int(float(engagement_str))\n", " return score if 0 <= score <= 100 else None\n", " except:\n", " return None\n", "\n", "def standardize_grade_level(grade_str):\n", " \"\"\"Standardize grade level formats\"\"\"\n", " if not grade_str:\n", " return None\n", " \n", " # Handle various formats\n", " grade_str = grade_str.strip()\n", " \n", " # Convert \"Xth Grade\" to \"Xth Grade\"\n", " if grade_str.endswith('th Grade') or grade_str.endswith('st Grade') or grade_str.endswith('nd Grade') or grade_str.endswith('rd Grade'):\n", " return grade_str\n", " \n", " # Convert \"Grade X\" to \"Xth Grade\"\n", " if grade_str.startswith('Grade '):\n", " grade_num = grade_str.replace('Grade ', '')\n", " try:\n", " num = int(grade_num)\n", " if num == 1:\n", " return \"1st Grade\"\n", " elif num == 2:\n", " return \"2nd Grade\"\n", " elif num == 3:\n", " return \"3rd Grade\"\n", " else:\n", " return f\"{num}th Grade\"\n", " except:\n", " return grade_str\n", " \n", " return grade_str\n", "\n", "# Register UDFs\n", "validate_score_udf = udf(validate_score)\n", "validate_engagement_udf = udf(validate_engagement)\n", "standardize_grade_udf = udf(standardize_grade_level)\n", "\n", "# Transform Bronze to Silver\n", "silver_df = bronze_df.withColumn(\"student_id\", col(\"raw_student_id\")) \\\n", " .withColumn(\"assessment_date\", to_date(col(\"raw_assessment_date\"))) \\\n", " .withColumn(\"subject\", col(\"raw_subject\")) \\\n", " .withColumn(\"score\", validate_score_udf(col(\"raw_score\"))) \\\n", " .withColumn(\"grade_level\", standardize_grade_udf(col(\"raw_grade_level\"))) \\\n", " .withColumn(\"completion_time\", \n", " when(col(\"raw_completion_time\").rlike(\"^[0-9]+\\\\.?[0-9]*$\"), \n", " col(\"raw_completion_time\").cast(\"decimal(6,2)\")).otherwise(None)) \\\n", " .withColumn(\"engagement_score\", validate_engagement_udf(col(\"raw_engagement_score\"))) \\\n", " .withColumn(\"performance_category\",\n", " when(col(\"score\") >= 85, \"High\")\n", " .when(col(\"score\") >= 70, \"Medium\")\n", " .when(col(\"score\") < 70, \"Low\")\n", " .otherwise(\"Unknown\")) \\\n", " .withColumn(\"engagement_category\",\n", " when(col(\"engagement_score\") >= 80, \"High\")\n", " .when(col(\"engagement_score\") >= 60, \"Medium\")\n", " .when(col(\"engagement_score\") < 60, \"Low\")\n", " .otherwise(\"Unknown\")) \\\n", " .withColumn(\"is_valid_score\", col(\"score\").isNotNull()) \\\n", " .withColumn(\"is_valid_engagement\", col(\"engagement_score\").isNotNull()) \\\n", " .withColumn(\"source_system\", col(\"source_system\")) \\\n", " .withColumn(\"batch_id\", col(\"batch_id\")) \\\n", " .withColumn(\"processing_timestamp\", F.current_timestamp()) \\\n", " .withColumn(\"data_quality_score\", \n", " ((when(col(\"is_valid_score\"), 1).otherwise(0)) + \n", " (when(col(\"is_valid_engagement\"), 1).otherwise(0))) / 2.0) \\\n", " .withColumn(\"bronze_batch_id\", col(\"batch_id\")) \\\n", " .select(\"student_id\", \"assessment_date\", \"subject\", \"score\", \"grade_level\", \n", " \"completion_time\", \"engagement_score\", \"performance_category\", \n", " \"engagement_category\", \"is_valid_score\", \"is_valid_engagement\", \n", " \"source_system\", \"batch_id\", \"processing_timestamp\", \n", " \"data_quality_score\", \"bronze_batch_id\")\n", "\n", "# Filter out records with no valid data\n", "silver_df = silver_df.filter(\"is_valid_score = true OR is_valid_engagement = true\")\n", "\n", "print(f\"Transformed {silver_df.count()} clean records for Silver layer\")\n", "print(\"Applied data validation, standardization, and enrichment.\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Silver Layer DataFrame Schema:\n", "root\n", " |-- student_id: string (nullable = true)\n", " |-- assessment_date: date (nullable = true)\n", " |-- subject: string (nullable = true)\n", " |-- score: string (nullable = true)\n", " |-- grade_level: string (nullable = true)\n", " |-- completion_time: decimal(6,2) (nullable = true)\n", " |-- engagement_score: string (nullable = true)\n", " |-- performance_category: string (nullable = false)\n", " |-- engagement_category: string (nullable = false)\n", " |-- is_valid_score: boolean (nullable = false)\n", " |-- is_valid_engagement: boolean (nullable = false)\n", " |-- source_system: string (nullable = true)\n", " |-- batch_id: string (nullable = true)\n", " |-- processing_timestamp: timestamp (nullable = false)\n", " |-- data_quality_score: double (nullable = true)\n", " |-- bronze_batch_id: string (nullable = true)\n", "\n", "\n", "Sample Cleaned Data (Silver Layer):\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+----------+---------------+------------------+-----------------+------------+---------------+----------------+--------------------+-------------------+--------------+-------------------+--------------------+--------------------+--------------------+------------------+--------------------+\n", "|student_id|assessment_date| subject| score| grade_level|completion_time|engagement_score|performance_category|engagement_category|is_valid_score|is_valid_engagement| source_system| batch_id|processing_timestamp|data_quality_score| bronze_batch_id|\n", "+----------+---------------+------------------+-----------------+------------+---------------+----------------+--------------------+-------------------+--------------+-------------------+--------------------+--------------------+--------------------+------------------+--------------------+\n", "| STU002226| 2024-11-22| History| 66.58| 9th Grade| 84.15| 4| Low| Low| true| true|Assessment_Platfo...|BATCH_20251220_00...|2025-12-20 00:50:...| 1.0|BATCH_20251220_00...|\n", "| STU002227| 2024-05-05| Math| 50.85|Kindergarten| 118.86| 24| Low| Low| true| true| LMS_System_A|BATCH_20251220_00...|2025-12-20 00:50:...| 1.0|BATCH_20251220_00...|\n", "| STU002227| 2024-11-03| History| 65.44|Kindergarten| 89.92| 48| Low| Low| true| true|Assessment_Platfo...|BATCH_20251220_00...|2025-12-20 00:50:...| 1.0|BATCH_20251220_00...|\n", "| STU002227| 2024-11-17|Physical Education|38.43922011268124|Kindergarten| 77.98| NULL| Low| Unknown| true| false|Assessment_Platfo...|BATCH_20251220_00...|2025-12-20 00:50:...| 0.5|BATCH_20251220_00...|\n", "| STU002227| 2024-05-15| Science| 73.1|Kindergarten| 48.18| 13| Medium| Low| true| true| School_Database_C|BATCH_20251220_00...|2025-12-20 00:50:...| 1.0|BATCH_20251220_00...|\n", "+----------+---------------+------------------+-----------------+------------+---------------+----------------+--------------------+-------------------+--------------+-------------------+--------------------+--------------------+--------------------+------------------+--------------------+\n", "only showing top 5 rows\n", "\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "\n", "Successfully inserted 67240 cleaned records into Silver layer\n", "\n", "Data Quality Statistics:\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+--------------+-------------------+-----+\n", "|is_valid_score|is_valid_engagement|count|\n", "+--------------+-------------------+-----+\n", "| true| false| 3081|\n", "| true| true|59184|\n", "| false| true| 4975|\n", "+--------------+-------------------+-----+\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Insert cleaned data into Silver layer\n", "\n", "# Display sample of cleaned data\n", "print(\"Silver Layer DataFrame Schema:\")\n", "silver_df.printSchema()\n", "\n", "print(\"\\nSample Cleaned Data (Silver Layer):\")\n", "silver_df.show(5)\n", "\n", "# Insert into Silver table\n", "silver_df.write.mode(\"overwrite\").saveAsTable(\"education.silver.student_assessments_clean\")\n", "\n", "print(f\"\\nSuccessfully inserted {silver_df.count()} cleaned records into Silver layer\")\n", "\n", "# Show data quality improvements\n", "quality_stats = silver_df.groupBy(\"is_valid_score\", \"is_valid_engagement\").count()\n", "print(\"\\nData Quality Statistics:\")\n", "quality_stats.show()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "=== Silver Layer: Cleaned Data Analytics ===\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+--------------------+-------------------+------------+---------+--------------+-----------------+\n", "|performance_category|engagement_category|record_count|avg_score|avg_engagement|avg_quality_score|\n", "+--------------------+-------------------+------------+---------+--------------+-----------------+\n", "| High| High| 3626| 92.47| 90.09| 100.0|\n", "| High| Low| 10255| 92.5| 29.45| 100.0|\n", "| High| Medium| 3447| 92.46| 69.42| 100.0|\n", "| High| Unknown| 925| 92.53| NULL| 50.0|\n", "| Low| High| 5111| 58.11| 89.91| 100.0|\n", "| Low| Low| 14478| 58.07| 29.72| 100.0|\n", "| Low| Medium| 4774| 57.95| 69.42| 100.0|\n", "| Low| Unknown| 1268| 58.17| NULL| 50.0|\n", "| Medium| High| 3753| 77.54| 90.03| 100.0|\n", "| Medium| Low| 10349| 77.56| 29.78| 100.0|\n", "| Medium| Medium| 3391| 77.43| 69.24| 100.0|\n", "| Medium| Unknown| 888| 77.48| NULL| 50.0|\n", "| Unknown| High| 1064| NULL| 90.16| 50.0|\n", "| Unknown| Low| 2910| NULL| 30.18| 50.0|\n", "| Unknown| Medium| 1001| NULL| 69.69| 50.0|\n", "+--------------------+-------------------+------------+---------+--------------+-----------------+\n", "\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "\n", "Grade Level Standardization:\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+------------+-----+\n", "| grade_level|count|\n", "+------------+-----+\n", "| 10th Grade| 5059|\n", "| 11th Grade| 5288|\n", "| 12th Grade| 4932|\n", "| 1st Grade| 5150|\n", "| 2nd Grade| 5351|\n", "| 3rd Grade| 5059|\n", "| 4th Grade| 4419|\n", "| 5th Grade| 5544|\n", "| 6th Grade| 5272|\n", "| 7th Grade| 5208|\n", "| 8th Grade| 5364|\n", "| 9th Grade| 5372|\n", "|Kindergarten| 5222|\n", "+------------+-----+\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Demonstrate Silver layer analytical capabilities\n", "# Show cleaned data benefits\n", "\n", "print(\"=== Silver Layer: Cleaned Data Analytics ===\")\n", "\n", "# Query performance by category\n", "performance_analysis = spark.sql(\"\"\"\n", "SELECT performance_category, engagement_category, \n", " COUNT(*) as record_count,\n", " ROUND(AVG(score), 2) as avg_score,\n", " ROUND(AVG(engagement_score), 2) as avg_engagement,\n", " ROUND(AVG(data_quality_score * 100), 2) as avg_quality_score\n", "FROM education.silver.student_assessments_clean\n", "GROUP BY performance_category, engagement_category\n", "ORDER BY performance_category, engagement_category\n", "\"\"\")\n", "\n", "performance_analysis.show()\n", "\n", "# Show grade level standardization\n", "grade_standardization = spark.sql(\"\"\"\n", "SELECT grade_level, COUNT(*) as count\n", "FROM education.silver.student_assessments_clean\n", "GROUP BY grade_level\n", "ORDER BY grade_level\n", "\"\"\")\n", "\n", "print(\"\\nGrade Level Standardization:\")\n", "grade_standardization.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Gold Layer: Analytics and Machine Learning\n", "\n", "## Overview\n", "\n", "The Gold layer provides analytics-ready data optimized for business intelligence and machine learning. Key activities:\n", "\n", "- **Aggregations**: Pre-computed metrics and KPIs\n", "- **Machine Learning**: Feature engineering and model training\n", "- **Business Intelligence**: Dashboard-ready datasets\n", "- **Performance Optimization**: Denormalized for fast queries\n", "\n", "## Gold Tables\n", "\n", "- `student_performance_aggregates`: Aggregated student metrics\n", "- `subject_performance_analytics`: Subject-level analytics\n", "- `student_performance_predictions`: ML predictions for intervention planning" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Gold layer tables created successfully!\n", "Optimized for analytical queries and ML feature serving.\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Create Gold layer tables for analytics and ML\n", "\n", "# Student performance aggregates\n", "spark.sql(\"\"\"\n", "CREATE TABLE IF NOT EXISTS education.gold.student_performance_aggregates (\n", " student_id STRING,\n", " grade_level STRING,\n", " total_assessments INT,\n", " avg_score DECIMAL(5,2),\n", " score_stddev DECIMAL(5,2),\n", " avg_engagement DECIMAL(5,2),\n", " avg_completion_time DECIMAL(6,2),\n", " subjects_attempted INT,\n", " active_months INT,\n", " performance_trend DECIMAL(5,2),\n", " engagement_trend DECIMAL(5,2),\n", " overall_performance_category STRING,\n", " risk_level STRING,\n", " last_assessment_date DATE,\n", " processing_timestamp TIMESTAMP\n", ")\n", "USING DELTA\n", "CLUSTER BY (grade_level, overall_performance_category)\n", "\"\"\")\n", "\n", "# Subject performance analytics\n", "spark.sql(\"\"\"\n", "CREATE TABLE IF NOT EXISTS education.gold.subject_performance_analytics (\n", " subject STRING,\n", " grade_level STRING,\n", " total_assessments INT,\n", " avg_score DECIMAL(5,2),\n", " avg_engagement DECIMAL(5,2),\n", " avg_completion_time DECIMAL(6,2),\n", " unique_students INT,\n", " performance_distribution STRING,\n", " difficulty_rating DECIMAL(3,2),\n", " processing_timestamp TIMESTAMP\n", ")\n", "USING DELTA\n", "CLUSTER BY (subject, grade_level)\n", "\"\"\")\n", "\n", "# ML predictions table\n", "spark.sql(\"\"\"\n", "CREATE TABLE IF NOT EXISTS education.gold.student_performance_predictions (\n", " student_id STRING,\n", " predicted_performance_category STRING,\n", " prediction_probability DECIMAL(5,4),\n", " risk_score DECIMAL(5,4),\n", " recommended_intervention STRING,\n", " model_version STRING,\n", " prediction_timestamp TIMESTAMP,\n", " features_used STRING\n", ")\n", "USING DELTA\n", "CLUSTER BY (student_id, prediction_timestamp)\n", "\"\"\")\n", "\n", "print(\"Gold layer tables created successfully!\")\n", "print(\"Optimized for analytical queries and ML feature serving.\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Created performance aggregates for 3000 students\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "Student performance aggregates inserted into Gold layer\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Create student performance aggregates\n", "\n", "student_aggregates = spark.sql(\"\"\"\n", "SELECT \n", " student_id,\n", " grade_level,\n", " COUNT(*) as total_assessments,\n", " ROUND(AVG(score), 2) as avg_score,\n", " ROUND(STDDEV(score), 2) as score_stddev,\n", " ROUND(AVG(engagement_score), 2) as avg_engagement,\n", " ROUND(AVG(completion_time), 2) as avg_completion_time,\n", " COUNT(DISTINCT subject) as subjects_attempted,\n", " COUNT(DISTINCT DATE_FORMAT(assessment_date, 'yyyy-MM')) as active_months,\n", " -- Performance trend (recent vs earlier)\n", " ROUND(\n", " AVG(CASE WHEN assessment_date >= '2024-07-01' THEN score END) - \n", " AVG(CASE WHEN assessment_date < '2024-07-01' THEN score END), \n", " 2\n", " ) as performance_trend,\n", " -- Engagement trend\n", " ROUND(\n", " AVG(CASE WHEN assessment_date >= '2024-07-01' THEN engagement_score END) - \n", " AVG(CASE WHEN assessment_date < '2024-07-01' THEN engagement_score END), \n", " 2\n", " ) as engagement_trend,\n", " -- Overall category\n", " CASE \n", " WHEN AVG(score) >= 85 THEN 'High Performer'\n", " WHEN AVG(score) >= 70 THEN 'Medium Performer'\n", " ELSE 'Low Performer'\n", " END as overall_performance_category,\n", " -- Risk level based on multiple factors\n", " CASE \n", " WHEN AVG(score) < 70 AND AVG(engagement_score) < 60 THEN 'High Risk'\n", " WHEN AVG(score) < 75 OR AVG(engagement_score) < 65 THEN 'Medium Risk'\n", " ELSE 'Low Risk'\n", " END as risk_level,\n", " MAX(assessment_date) as last_assessment_date,\n", " CURRENT_TIMESTAMP() as processing_timestamp\n", "FROM education.silver.student_assessments_clean\n", "GROUP BY student_id, grade_level\n", "\"\"\")\n", "\n", "# Handle null values in trend calculations\n", "student_aggregates = student_aggregates.fillna(0, subset=['performance_trend', 'engagement_trend'])\n", "\n", "print(f\"Created performance aggregates for {student_aggregates.count()} students\")\n", "\n", "# Insert into Gold layer\n", "student_aggregates.write.mode(\"overwrite\").saveAsTable(\"education.gold.student_performance_aggregates\")\n", "\n", "print(\"Student performance aggregates inserted into Gold layer\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Created analytics for 78 subject-grade combinations\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "Subject performance analytics inserted into Gold layer\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Create subject performance analytics\n", "\n", "subject_analytics = spark.sql(\"\"\"\n", "SELECT \n", " subject,\n", " grade_level,\n", " COUNT(*) as total_assessments,\n", " ROUND(AVG(score), 2) as avg_score,\n", " ROUND(AVG(engagement_score), 2) as avg_engagement,\n", " ROUND(AVG(completion_time), 2) as avg_completion_time,\n", " COUNT(DISTINCT student_id) as unique_students,\n", " -- Performance distribution\n", " CONCAT(\n", " 'High: ', CAST(COUNT(CASE WHEN performance_category = 'High' THEN 1 END) AS STRING), ', ',\n", " 'Medium: ', CAST(COUNT(CASE WHEN performance_category = 'Medium' THEN 1 END) AS STRING), ', ',\n", " 'Low: ', CAST(COUNT(CASE WHEN performance_category = 'Low' THEN 1 END) AS STRING)\n", " ) as performance_distribution,\n", " -- Difficulty rating (inverse of average score)\n", " ROUND((100 - AVG(score)) / 20, 2) as difficulty_rating,\n", " CURRENT_TIMESTAMP() as processing_timestamp\n", "FROM education.silver.student_assessments_clean\n", "GROUP BY subject, grade_level\n", "ORDER BY subject, grade_level\n", "\"\"\")\n", "\n", "print(f\"Created analytics for {subject_analytics.count()} subject-grade combinations\")\n", "\n", "# Insert into Gold layer\n", "subject_analytics.write.mode(\"overwrite\").saveAsTable(\"education.gold.subject_performance_analytics\")\n", "\n", "print(\"Subject performance analytics inserted into Gold layer\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "=== Gold Layer: Analytics Dashboard Data ===\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+----------------------------+-----------+-------------+---------+--------------+\n", "|overall_performance_category| risk_level|student_count|avg_score|avg_engagement|\n", "+----------------------------+-----------+-------------+---------+--------------+\n", "| High Performer|Medium Risk| 2| 85.28| 50.33|\n", "| Low Performer| High Risk| 382| 68.03| 49.53|\n", "| Low Performer|Medium Risk| 22| 67.91| 62.34|\n", "| Medium Performer| Low Risk| 19| 77.82| 67.5|\n", "| Medium Performer|Medium Risk| 2575| 74.78| 50.14|\n", "+----------------------------+-----------+-------------+---------+--------------+\n", "\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "\n", "Subject Difficulty Analysis:\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+------------------+---------+--------------+\n", "| subject|avg_score|avg_difficulty|\n", "+------------------+---------+--------------+\n", "| Science| 73.55| 1.32|\n", "|Physical Education| 73.95| 1.3|\n", "| Art| 73.96| 1.3|\n", "| Math| 73.96| 1.3|\n", "| English| 73.99| 1.3|\n", "| History| 74.02| 1.3|\n", "+------------------+---------+--------------+\n", "\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "\n", "Students Needing Intervention:\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+----------+------------+---------+--------------+----------+-----------------+----------------+\n", "|student_id| grade_level|avg_score|avg_engagement|risk_level|performance_trend|engagement_trend|\n", "+----------+------------+---------+--------------+----------+-----------------+----------------+\n", "| STU000045| 11th Grade| 59.83| 45.28| High Risk| 14.64| 20.83|\n", "| STU001660| 6th Grade| 60.19| 43.61| High Risk| -1.62| -9.61|\n", "| STU002952| 10th Grade| 61.69| 46.0| High Risk| 1.46| -11.38|\n", "| STU000141| 3rd Grade| 61.94| 57.45| High Risk| -0.65| -7.48|\n", "| STU001770| 5th Grade| 62.71| 40.79| High Risk| -4.2| 19.19|\n", "| STU000401| 11th Grade| 63.19| 38.88| High Risk| -1.53| 23.27|\n", "| STU000861|Kindergarten| 63.3| 41.69| High Risk| -2.98| 7.83|\n", "| STU000802| 5th Grade| 63.35| 59.9| High Risk| -3.57| -3.05|\n", "| STU002835| 9th Grade| 63.61| 36.31| High Risk| 6.12| -15.13|\n", "| STU000792| 3rd Grade| 63.65| 44.47| High Risk| 7.29| 5.14|\n", "+----------+------------+---------+--------------+----------+-----------------+----------------+\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Demonstrate Gold layer analytics capabilities\n", "\n", "print(\"=== Gold Layer: Analytics Dashboard Data ===\")\n", "\n", "# Student performance overview\n", "performance_overview = spark.sql(\"\"\"\n", "SELECT overall_performance_category, risk_level, \n", " COUNT(*) as student_count,\n", " ROUND(AVG(avg_score), 2) as avg_score,\n", " ROUND(AVG(avg_engagement), 2) as avg_engagement\n", "FROM education.gold.student_performance_aggregates\n", "GROUP BY overall_performance_category, risk_level\n", "ORDER BY overall_performance_category, risk_level\n", "\"\"\")\n", "\n", "performance_overview.show()\n", "\n", "# Subject difficulty analysis\n", "subject_difficulty = spark.sql(\"\"\"\n", "SELECT subject, ROUND(AVG(avg_score), 2) as avg_score, \n", " ROUND(AVG(difficulty_rating), 2) as avg_difficulty\n", "FROM education.gold.subject_performance_analytics\n", "GROUP BY subject\n", "ORDER BY avg_difficulty DESC\n", "\"\"\")\n", "\n", "print(\"\\nSubject Difficulty Analysis:\")\n", "subject_difficulty.show()\n", "\n", "# At-risk students for intervention\n", "at_risk_students = spark.sql(\"\"\"\n", "SELECT student_id, grade_level, avg_score, avg_engagement, risk_level,\n", " performance_trend, engagement_trend\n", "FROM education.gold.student_performance_aggregates\n", "WHERE risk_level IN ('High Risk', 'Medium Risk')\n", "ORDER BY avg_score ASC\n", "LIMIT 10\n", "\"\"\")\n", "\n", "print(\"\\nStudents Needing Intervention:\")\n", "at_risk_students.show()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Prepared ML features for 3000 students\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+--------------+-----+\n", "|high_performer|count|\n", "+--------------+-----+\n", "| 1| 1162|\n", "| 0| 1838|\n", "+--------------+-----+\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Train student performance prediction model\n", "# This is the ML component moved to Gold layer\n", "\n", "from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler\n", "from pyspark.ml.classification import RandomForestClassifier\n", "from pyspark.ml.evaluation import BinaryClassificationEvaluator\n", "from pyspark.ml import Pipeline\n", "import pyspark.sql.functions as F\n", "\n", "# Create ML features from Gold layer aggregates\n", "ml_features = spark.sql(\"\"\"\n", "SELECT \n", " student_id,\n", " total_assessments,\n", " avg_score,\n", " score_stddev,\n", " avg_engagement,\n", " avg_completion_time,\n", " subjects_attempted,\n", " active_months,\n", " performance_trend,\n", " engagement_trend,\n", " grade_level,\n", " -- Target: High performance (score >= 75)\n", " CASE WHEN avg_score >= 75 THEN 1 ELSE 0 END as high_performer,\n", " -- Risk score for intervention planning\n", " CASE \n", " WHEN risk_level = 'High Risk' THEN 0.9\n", " WHEN risk_level = 'Medium Risk' THEN 0.6\n", " ELSE 0.2\n", " END as risk_score\n", "FROM education.gold.student_performance_aggregates\n", "\"\"\")\n", "\n", "# Fill null values\n", "ml_features = ml_features.fillna(0, subset=['score_stddev', 'performance_trend', 'engagement_trend'])\n", "\n", "print(f\"Prepared ML features for {ml_features.count()} students\")\n", "ml_features.groupBy(\"high_performer\").count().show()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Training set: 2451 students\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "Test set: 549 students\n", "Training student performance prediction model...\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "\n", "Model Performance - AUC: 0.9999\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+----------+---------+--------------+--------------+----------+--------------------+\n", "|student_id|avg_score|avg_engagement|high_performer|prediction| probability|\n", "+----------+---------+--------------+--------------+----------+--------------------+\n", "| STU000003| 79.26| 47.81| 1| 1.0|[0.00238709677419...|\n", "| STU000007| 78.33| 55.05| 1| 1.0| [0.0,1.0]|\n", "| STU000009| 77.46| 41.0| 1| 1.0| [0.0,1.0]|\n", "| STU000014| 75.39| 46.42| 1| 1.0| [0.0,1.0]|\n", "| STU000020| 73.89| 48.07| 0| 0.0| [1.0,0.0]|\n", "| STU000024| 72.0| 59.65| 0| 0.0|[0.97654411764705...|\n", "| STU000030| 71.41| 55.63| 0| 0.0| [1.0,0.0]|\n", "| STU000036| 73.99| 54.08| 0| 0.0|[0.99306451612903...|\n", "| STU000046| 73.72| 56.4| 0| 0.0|[0.97955752212389...|\n", "| STU000047| 77.49| 42.73| 1| 1.0| [0.0,1.0]|\n", "+----------+---------+--------------+--------------+----------+--------------------+\n", "only showing top 10 rows\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Feature engineering and model training\n", "\n", "# Create indexers for categorical features\n", "grade_indexer = StringIndexer(inputCol=\"grade_level\", outputCol=\"grade_level_index\")\n", "\n", "# Assemble features for the model\n", "feature_cols = [\"total_assessments\", \"avg_score\", \"score_stddev\", \"avg_engagement\", \n", " \"avg_completion_time\", \"subjects_attempted\", \"active_months\", \n", " \"performance_trend\", \"engagement_trend\", \"grade_level_index\"]\n", "\n", "assembler = VectorAssembler(\n", " inputCols=feature_cols,\n", " outputCol=\"features\"\n", ")\n", "\n", "# Scale features\n", "scaler = StandardScaler(inputCol=\"features\", outputCol=\"scaled_features\")\n", "\n", "# Create and train the model\n", "rf = RandomForestClassifier(\n", " labelCol=\"high_performer\", \n", " featuresCol=\"scaled_features\",\n", " numTrees=100,\n", " maxDepth=10,\n", " seed=42\n", ")\n", "\n", "# Create pipeline\n", "pipeline = Pipeline(stages=[grade_indexer, assembler, scaler, rf])\n", "\n", "# Split data\n", "train_data, test_data = ml_features.randomSplit([0.8, 0.2], seed=42)\n", "\n", "print(f\"Training set: {train_data.count()} students\")\n", "print(f\"Test set: {test_data.count()} students\")\n", "\n", "# Train the model\n", "print(\"Training student performance prediction model...\")\n", "model = pipeline.fit(train_data)\n", "\n", "# Make predictions\n", "predictions = model.transform(test_data)\n", "\n", "# Evaluate the model\n", "evaluator = BinaryClassificationEvaluator(labelCol=\"high_performer\", metricName=\"areaUnderROC\")\n", "auc = evaluator.evaluate(predictions)\n", "\n", "print(f\"\\nModel Performance - AUC: {auc:.4f}\")\n", "\n", "# Show prediction results\n", "predictions.select(\"student_id\", \"avg_score\", \"avg_engagement\", \"high_performer\", \"prediction\", \"probability\").show(10)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Stored 549 ML predictions in Gold layer\n", "\n", "Sample ML Predictions:\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+----------+------------------------------+----------------------+----------+------------------------+-------------+--------------------+--------------------+\n", "|student_id|predicted_performance_category|prediction_probability|risk_score|recommended_intervention|model_version|prediction_timestamp| features_used|\n", "+----------+------------------------------+----------------------+----------+------------------------+-------------+--------------------+--------------------+\n", "| STU000003| High Performer| 0.9976129032258065| 0.6| Advanced Learning...| v1.0|2025-12-20 01:02:...|total_assessments...|\n", "| STU000007| High Performer| 1.0| 0.6| Advanced Learning...| v1.0|2025-12-20 01:02:...|total_assessments...|\n", "| STU000009| High Performer| 1.0| 0.6| Advanced Learning...| v1.0|2025-12-20 01:02:...|total_assessments...|\n", "| STU000014| High Performer| 1.0| 0.6| Advanced Learning...| v1.0|2025-12-20 01:02:...|total_assessments...|\n", "| STU000020| Low Performer| 1.0| 0.6| Academic Support ...| v1.0|2025-12-20 01:02:...|total_assessments...|\n", "+----------+------------------------------+----------------------+----------+------------------------+-------------+--------------------+--------------------+\n", "only showing top 5 rows\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Generate predictions and store in Gold layer\n", "\n", "from pyspark.ml.functions import vector_to_array\n", "\n", "# Convert probability vector to array for proper indexing\n", "predictions_with_prob_array = predictions.withColumn(\"prob_array\", vector_to_array(\"probability\"))\n", "\n", "# Create predictions DataFrame\n", "model_predictions = predictions_with_prob_array.select(\n", " \"student_id\",\n", " when(col(\"prediction\") == 1, \"High Performer\").otherwise(\"Low Performer\").alias(\"predicted_performance_category\"),\n", " (when(col(\"prediction\") == 1, col(\"prob_array\")[1]).otherwise(col(\"prob_array\")[0])).alias(\"prediction_probability\"),\n", " \"risk_score\",\n", " when(col(\"prediction\") == 0, \"Academic Support Program\").otherwise(\"Advanced Learning Program\").alias(\"recommended_intervention\"),\n", " F.lit(\"v1.0\").alias(\"model_version\"),\n", " F.current_timestamp().alias(\"prediction_timestamp\"),\n", " F.lit(\",\".join(feature_cols)).alias(\"features_used\")\n", ")\n", "\n", "# Insert predictions into Gold layer\n", "model_predictions.write.mode(\"overwrite\").saveAsTable(\"education.gold.student_performance_predictions\")\n", "\n", "print(f\"Stored {model_predictions.count()} ML predictions in Gold layer\")\n", "\n", "# Show sample predictions\n", "print(\"\\nSample ML Predictions:\")\n", "model_predictions.show(5)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "=== Feature Importance for Student Performance Prediction ===\n", "total_assessments: 0.0063\n", "avg_score: 0.9327\n", "score_stddev: 0.0294\n", "avg_engagement: 0.0054\n", "avg_completion_time: 0.0058\n", "subjects_attempted: 0.0006\n", "active_months: 0.0026\n", "performance_trend: 0.0065\n", "engagement_trend: 0.0056\n", "grade_level_index: 0.0052\n", "\n", "=== ML-Driven Business Impact Analysis ===\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+------------------------+-------------+--------------+\n", "|recommended_intervention|student_count|avg_confidence|\n", "+------------------------+-------------+--------------+\n", "| Academic Support ...| 331| 0.9831|\n", "| Advanced Learning...| 218| 0.9943|\n", "+------------------------+-------------+--------------+\n", "\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "\n", "Estimated Intervention Program:\n", "Students identified for intervention: 382\n", "Total intervention cost: $191,000\n", "Expected benefit: $955,000\n", "Intervention program ROI: 400.0%\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "\n", "Model Performance Metrics:\n", "Accuracy: 0.9945\n", "Precision: 1.0000\n", "Recall: 0.9864\n", "AUC: 0.9999\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Model interpretation and business insights\n", "\n", "# Feature importance\n", "rf_model = model.stages[-1]\n", "feature_importance = rf_model.featureImportances\n", "feature_names = feature_cols\n", "\n", "print(\"=== Feature Importance for Student Performance Prediction ===\")\n", "for name, importance in zip(feature_names, feature_importance):\n", " print(f\"{name}: {importance:.4f}\")\n", "\n", "# Business impact analysis\n", "print(\"\\n=== ML-Driven Business Impact Analysis ===\")\n", "\n", "# Calculate intervention recommendations\n", "intervention_stats = spark.sql(\"\"\"\n", "SELECT recommended_intervention, COUNT(*) as student_count,\n", " ROUND(AVG(prediction_probability), 4) as avg_confidence\n", "FROM education.gold.student_performance_predictions\n", "GROUP BY recommended_intervention\n", "\"\"\")\n", "\n", "intervention_stats.show()\n", "\n", "# Calculate potential ROI\n", "high_risk_count = spark.sql(\"\"\"\n", "SELECT COUNT(*) as high_risk_count\n", "FROM education.gold.student_performance_aggregates\n", "WHERE risk_level = 'High Risk'\n", "\"\"\").collect()[0][0]\n", "\n", "intervention_cost_per_student = 500\n", "intervention_effectiveness = 0.25\n", "avg_student_value = 10000\n", "\n", "total_intervention_cost = high_risk_count * intervention_cost_per_student\n", "expected_benefit = high_risk_count * intervention_effectiveness * avg_student_value\n", "intervention_roi = (expected_benefit - total_intervention_cost) / total_intervention_cost * 100 if total_intervention_cost > 0 else 0\n", "\n", "print(f\"\\nEstimated Intervention Program:\")\n", "print(f\"Students identified for intervention: {high_risk_count}\")\n", "print(f\"Total intervention cost: ${total_intervention_cost:,}\")\n", "print(f\"Expected benefit: ${expected_benefit:,.0f}\")\n", "print(f\"Intervention program ROI: {intervention_roi:.1f}%\")\n", "\n", "# Model accuracy metrics\n", "accuracy = predictions.filter(\"high_performer = prediction\").count() / predictions.count()\n", "precision = predictions.filter(\"prediction = 1 AND high_performer = 1\").count() / predictions.filter(\"prediction = 1\").count() if predictions.filter(\"prediction = 1\").count() > 0 else 0\n", "recall = predictions.filter(\"prediction = 1 AND high_performer = 1\").count() / predictions.filter(\"high_performer = 1\").count() if predictions.filter(\"high_performer = 1\").count() > 0 else 0\n", "\n", "print(f\"\\nModel Performance Metrics:\")\n", "print(f\"Accuracy: {accuracy:.4f}\")\n", "print(f\"Precision: {precision:.4f}\")\n", "print(f\"Recall: {recall:.4f}\")\n", "print(f\"AUC: {auc:.4f}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Key Takeaways: Medallion Architecture + ML in AIDP\n", "\n", "## What We Demonstrated\n", "\n", "1. **Bronze Layer**: Raw data ingestion preserving data fidelity with potential quality issues\n", "2. **Silver Layer**: Data cleansing, validation, standardization, and enrichment\n", "3. **Gold Layer**: Analytics-ready aggregations and ML predictions for business insights\n", "4. **Delta Liquid Clustering**: Automatic optimization across all layers for query performance\n", "5. **Machine Learning Integration**: Student performance prediction with business impact analysis\n", "\n", "## Medallion Architecture Benefits\n", "\n", "- **Data Governance**: Clear data lineage from Bronze to Gold\n", "- **Reusability**: Each layer serves different use cases\n", "- **Performance**: Optimized clustering strategies per layer\n", "- **Maintainability**: Independent layer updates and refreshes\n", "- **Scalability**: Handles data volume growth at each stage\n", "\n", "## AIDP Advantages\n", "\n", "- **Unified Analytics**: Seamless data transformation pipeline\n", "- **ML Integration**: Built-in ML capabilities with Spark MLlib\n", "- **Performance**: Delta Liquid Clustering for automatic optimization\n", "- **Governance**: Catalog and schema isolation\n", "- **Scalability**: Distributed processing with Spark\n", "\n", "## Education Business Impact\n", "\n", "1. **Early Intervention**: ML identifies at-risk students before performance declines\n", "2. **Personalized Learning**: Performance predictions enable tailored educational approaches\n", "3. **Resource Optimization**: Data-driven allocation of academic support\n", "4. **Academic Excellence**: Improved student outcomes through predictive analytics\n", "5. **Educational Equity**: Proactive support ensures all students receive appropriate intervention\n", "\n", "## Best Practices\n", "\n", "1. **Layer Optimization**: Choose clustering columns based on query patterns per layer\n", "2. **Data Quality**: Implement validation rules early in the Silver layer\n", "3. **Incremental Processing**: Design for incremental updates as data grows\n", "4. **Monitoring**: Track data quality and ML model performance over time\n", "5. **Governance**: Maintain clear documentation and data lineage\n", "\n", "## Next Steps\n", "\n", "- Implement incremental data processing pipelines\n", "- Add real-time data ingestion for immediate interventions\n", "- Integrate with learning management systems\n", "- Deploy ML models for production scoring\n", "- Build dashboards for real-time academic monitoring\n", "- Expand to multi-year longitudinal analysis\n", "\n", "This notebook demonstrates how Oracle AI Data Platform enables sophisticated education analytics through the Medallion Architecture pattern, combining data engineering best practices with machine learning for actionable business insights." ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.5" } }, "nbformat": 4, "nbformat_minor": 4 }