{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Healthcare Analytics: Medallion Architecture Demo\n", "\n", "## Overview\n", "\n", "This notebook demonstrates a complete **Medallion Architecture** implementation in Oracle AI Data Platform (AIDP) Workbench using a healthcare analytics use case. The medallion architecture organizes data into three layers:\n", "\n", "- **Bronze Layer**: Raw ingested data, stored as-is from source systems\n", "- **Silver Layer**: Cleaned, standardized, and enriched data\n", "- **Gold Layer**: Business-ready aggregates and ML-ready datasets\n", "\n", "We'll use **Delta Liquid Clustering** throughout all layers to optimize query performance automatically.\n", "\n", "### Use Case: Patient Diagnosis Analytics with Medallion Architecture\n", "\n", "We'll process patient diagnosis records through the complete medallion pipeline, culminating in machine learning models for patient readmission prediction.\n", "\n", "### AIDP Environment Setup\n", "\n", "This notebook leverages the existing Spark session in your AIDP environment." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 1: Create Healthcare Catalog and Schemas\n", "\n", "### Medallion Schema Design\n", "\n", "- **bronze**: Raw data landing zone\n", "- **silver**: Cleaned and standardized data\n", "- **gold**: Business analytics and ML-ready data" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Healthcare catalog and medallion schemas (bronze, silver, gold) created successfully!\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Create healthcare catalog and medallion schemas\n", "\n", "spark.sql(\"CREATE CATALOG IF NOT EXISTS healthcare\")\n", "\n", "spark.sql(\"CREATE SCHEMA IF NOT EXISTS healthcare.bronze\")\n", "spark.sql(\"CREATE SCHEMA IF NOT EXISTS healthcare.silver\")\n", "spark.sql(\"CREATE SCHEMA IF NOT EXISTS healthcare.gold\")\n", "\n", "print(\"Healthcare catalog and medallion schemas (bronze, silver, gold) created successfully!\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Bronze Layer: Raw Data Ingestion\n", "\n", "### Bronze Layer Purpose\n", "\n", "The bronze layer serves as the raw data landing zone where data is ingested from source systems **as-is**, without any transformations. This preserves data integrity and enables:\n", "\n", "- **Data lineage**: Complete audit trail from source to consumption\n", "- **Reprocessing**: Ability to reprocess data if business rules change\n", "- **Compliance**: Raw data retention for regulatory requirements\n", "\n", "### Table Design: bronze.patient_diagnoses_raw\n", "\n", "Our bronze table stores raw patient diagnosis data with minimal structure:\n", "\n", "- **patient_id**: Raw patient identifier from source system\n", "- **diagnosis_date**: Raw date string (various formats possible)\n", "- **diagnosis_code**: Raw diagnosis code (may include formatting inconsistencies)\n", "- **diagnosis_description**: Raw description text\n", "- **severity_level**: Raw severity indicator\n", "- **treating_physician**: Raw physician identifier\n", "- **facility_id**: Raw facility identifier\n", "- **ingestion_timestamp**: When data was ingested\n", "\n", "### Clustering Strategy\n", "\n", "Cluster by `ingestion_timestamp` and `patient_id` to optimize for:\n", "- Time-based data processing and incremental loads\n", "- Patient-centric queries during data validation" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Bronze layer table created successfully!\n", "Clustering will optimize for time-based ingestion and patient-centric queries.\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Create Bronze Layer Delta table with liquid clustering\n", "\n", "spark.sql(\"\"\"\n", "CREATE TABLE IF NOT EXISTS healthcare.bronze.patient_diagnoses_raw (\n", " patient_id STRING,\n", " diagnosis_date STRING, -- Raw date string, various formats possible\n", " diagnosis_code STRING,\n", " diagnosis_description STRING,\n", " severity_level STRING,\n", " treating_physician STRING,\n", " facility_id STRING,\n", " ingestion_timestamp TIMESTAMP -- When data was ingested\n", ")\n", "USING DELTA\n", "CLUSTER BY (ingestion_timestamp, patient_id)\n", "\"\"\")\n", "\n", "print(\"Bronze layer table created successfully!\")\n", "print(\"Clustering will optimize for time-based ingestion and patient-centric queries.\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Generate and Ingest Raw Healthcare Data\n", "\n", "#### Raw Data Characteristics\n", "\n", "Raw data may contain:\n", "- **Inconsistent formatting**: Different date formats, case variations\n", "- **Missing values**: Null or empty fields\n", "- **Data quality issues**: Typos, duplicates, invalid codes\n", "- **Multiple sources**: Different systems with varying schemas\n", "\n", "#### Data Generation Strategy\n", "\n", "We'll simulate realistic raw healthcare data with some quality issues that would typically be found in bronze layer data." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Generated 4945 raw patient diagnosis records\n", "Raw data includes formatting inconsistencies, missing values, and data quality issues\n", "Sample raw record: {'patient_id': 'PAT0001', 'diagnosis_date': '2024/07/19', 'diagnosis_code': 'J45.909', 'diagnosis_description': 'Unspecified asthma, uncomplicated', 'severity_level': 'Medium', 'treating_physician': 'dr_johnson', 'facility_id': 'CLINIC001', 'ingestion_timestamp': datetime.datetime(2025, 12, 20, 0, 15, 15, 772467)}\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Generate realistic raw healthcare diagnosis data with quality issues\n", "\n", "import random\n", "from datetime import datetime, timedelta\n", "\n", "# Define healthcare data with some inconsistencies (bronze layer characteristics)\n", "DIAGNOSES_RAW = [\n", " (\"E11.9\", \"Type 2 diabetes mellitus without complications\", \"Medium\"),\n", " (\"e11.9\", \"type 2 diabetes mellitus without complications\", \"medium\"), # Case inconsistency\n", " (\"I10\", \"Essential hypertension\", \"High\"),\n", " (\"i10\", \"essential hypertension\", \"high\"), # Case inconsistency\n", " (\"J45.909\", \"Unspecified asthma, uncomplicated\", \"Medium\"),\n", " (\"M54.5\", \"Low back pain\", \"Low\"),\n", " (\"N39.0\", \"Urinary tract infection, site not specified\", \"Medium\"),\n", " (\"Z51.11\", \"Encounter for antineoplastic chemotherapy\", \"Critical\"),\n", " (\"I25.10\", \"Atherosclerotic heart disease of native coronary artery without angina pectoris\", \"High\"),\n", " (\"F41.9\", \"Anxiety disorder, unspecified\", \"Medium\"),\n", " (\"\", \"\", \"\"), # Some missing values\n", " (\"INVALID\", \"Invalid diagnosis\", \"Unknown\") # Invalid data\n", "]\n", "\n", "FACILITIES_RAW = [\"HOSP001\", \"hosp002\", \"CLINIC001\", \"clinic002\", \"URGENT001\", \"\"] # Case and missing\n", "PHYSICIANS_RAW = [\"DR_SMITH\", \"dr_johnson\", \"DR_WILLIAMS\", \"dr_brown\", \"DR_JONES\", \"dr_garcia\", \"\", \"UNKNOWN\"]\n", "\n", "# Different date formats to simulate raw data\n", "DATE_FORMATS = [\"%Y-%m-%d\", \"%m/%d/%Y\", \"%d-%b-%Y\", \"%Y/%m/%d\"]\n", "\n", "# Generate raw patient diagnosis records\n", "raw_patient_data = []\n", "base_date = datetime(2024, 1, 1)\n", "ingestion_time = datetime.now()\n", "\n", "# Create 1,000 patients with 2-8 diagnoses each, including some data quality issues\n", "for patient_num in range(1, 1001):\n", " patient_id = f\"PAT{patient_num:04d}\"\n", " \n", " # Each patient gets 2-8 diagnoses over 12 months\n", " num_diagnoses = random.randint(2, 8)\n", " \n", " for i in range(num_diagnoses):\n", " # Spread diagnoses over 12 months\n", " days_offset = random.randint(0, 365)\n", " diagnosis_date_obj = base_date + timedelta(days=days_offset)\n", " \n", " # Random date format to simulate raw data inconsistency\n", " date_format = random.choice(DATE_FORMATS)\n", " diagnosis_date_str = diagnosis_date_obj.strftime(date_format)\n", " \n", " # Select random diagnosis (including some with quality issues)\n", " diagnosis_code, description, severity = random.choice(DIAGNOSES_RAW)\n", " \n", " # Select random facility and physician (including inconsistencies)\n", " facility = random.choice(FACILITIES_RAW)\n", " physician = random.choice(PHYSICIANS_RAW)\n", " \n", " # Occasionally introduce missing values (bronze layer realism)\n", " if random.random() < 0.05: # 5% chance of missing data\n", " diagnosis_code = None if random.random() < 0.5 else diagnosis_code\n", " severity = None if random.random() < 0.5 else severity\n", " \n", " raw_patient_data.append({\n", " \"patient_id\": patient_id,\n", " \"diagnosis_date\": diagnosis_date_str,\n", " \"diagnosis_code\": diagnosis_code,\n", " \"diagnosis_description\": description,\n", " \"severity_level\": severity,\n", " \"treating_physician\": physician,\n", " \"facility_id\": facility,\n", " \"ingestion_timestamp\": ingestion_time\n", " })\n", "\n", "print(f\"Generated {len(raw_patient_data)} raw patient diagnosis records\")\n", "print(\"Raw data includes formatting inconsistencies, missing values, and data quality issues\")\n", "print(\"Sample raw record:\", raw_patient_data[0])" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Bronze Layer DataFrame Schema:\n", "root\n", " |-- diagnosis_code: string (nullable = true)\n", " |-- diagnosis_date: string (nullable = true)\n", " |-- diagnosis_description: string (nullable = true)\n", " |-- facility_id: string (nullable = true)\n", " |-- ingestion_timestamp: timestamp (nullable = true)\n", " |-- patient_id: string (nullable = true)\n", " |-- severity_level: string (nullable = true)\n", " |-- treating_physician: string (nullable = true)\n", "\n", "\n", "Sample Raw Data:\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+--------------+--------------+---------------------+-----------+--------------------+----------+--------------+------------------+\n", "|diagnosis_code|diagnosis_date|diagnosis_description|facility_id| ingestion_timestamp|patient_id|severity_level|treating_physician|\n", "+--------------+--------------+---------------------+-----------+--------------------+----------+--------------+------------------+\n", "| J45.909| 2024/07/19| Unspecified asthm...| CLINIC001|2025-12-20 00:15:...| PAT0001| Medium| dr_johnson|\n", "| E11.9| 08/22/2024| Type 2 diabetes m...| hosp002|2025-12-20 00:15:...| PAT0001| Medium| |\n", "| I10| 15-Jan-2024| Essential hyperte...| CLINIC001|2025-12-20 00:15:...| PAT0002| High| dr_brown|\n", "| F41.9| 16-Aug-2024| Anxiety disorder,...| CLINIC001|2025-12-20 00:15:...| PAT0002| Medium| DR_JONES|\n", "| J45.909| 03/11/2024| Unspecified asthm...| CLINIC001|2025-12-20 00:15:...| PAT0002| Medium| DR_SMITH|\n", "+--------------+--------------+---------------------+-----------+--------------------+----------+--------------+------------------+\n", "only showing top 5 rows\n", "\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "\n", "Successfully inserted 4945 raw records into healthcare.bronze.patient_diagnoses_raw\n", "Bronze layer preserves raw data as-is for auditability and reprocessing.\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Insert raw data into Bronze layer\n", "\n", "# Create DataFrame from raw generated data\n", "df_bronze = spark.createDataFrame(raw_patient_data)\n", "\n", "# Display schema and sample data\n", "print(\"Bronze Layer DataFrame Schema:\")\n", "df_bronze.printSchema()\n", "\n", "print(\"\\nSample Raw Data:\")\n", "df_bronze.show(5)\n", "\n", "# Insert data into Bronze Delta table\n", "df_bronze.write.mode(\"overwrite\").saveAsTable(\"healthcare.bronze.patient_diagnoses_raw\")\n", "\n", "print(f\"\\nSuccessfully inserted {df_bronze.count()} raw records into healthcare.bronze.patient_diagnoses_raw\")\n", "print(\"Bronze layer preserves raw data as-is for auditability and reprocessing.\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Silver Layer: Data Cleaning and Standardization\n", "\n", "### Silver Layer Purpose\n", "\n", "The silver layer transforms raw bronze data into clean, standardized, and enriched datasets suitable for analytics:\n", "\n", "- **Data Quality**: Cleansing, standardization, and validation\n", "- **Normalization**: Consistent formats, units, and naming conventions\n", "- **Enrichment**: Adding derived fields, lookups, and business rules\n", "- **Deduplication**: Removing duplicates and handling conflicts\n", "\n", "### Table Design: silver.patient_diagnoses_clean\n", "\n", "Our silver table includes cleaned and enriched fields:\n", "\n", "- **patient_id**: Standardized patient identifier\n", "- **diagnosis_date**: Properly formatted DATE type\n", "- **diagnosis_code**: Validated and standardized ICD-10 codes\n", "- **diagnosis_description**: Cleaned and standardized text\n", "- **severity_level**: Standardized severity categories\n", "- **treating_physician**: Validated physician identifiers\n", "- **facility_id**: Standardized facility codes\n", "- **is_valid_record**: Data quality flag\n", "- **processing_timestamp**: When record was processed\n", "\n", "### Clustering Strategy\n", "\n", "Cluster by `patient_id` and `diagnosis_date` for optimal query performance on:\n", "- Patient journey analysis\n", "- Time-based healthcare analytics\n", "- Physician and facility performance metrics" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Silver layer table created successfully!\n", "Clustering optimizes for patient-centric and time-based analytics.\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Create Silver Layer Delta table with liquid clustering\n", "\n", "spark.sql(\"\"\"\n", "CREATE TABLE IF NOT EXISTS healthcare.silver.patient_diagnoses_clean (\n", " patient_id STRING,\n", " diagnosis_date DATE,\n", " diagnosis_code STRING,\n", " diagnosis_description STRING,\n", " severity_level STRING,\n", " treating_physician STRING,\n", " facility_id STRING,\n", " is_valid_record BOOLEAN,\n", " data_quality_score DOUBLE,\n", " processing_timestamp TIMESTAMP\n", ")\n", "USING DELTA\n", "CLUSTER BY (patient_id, diagnosis_date)\n", "\"\"\")\n", "\n", "print(\"Silver layer table created successfully!\")\n", "print(\"Clustering optimizes for patient-centric and time-based analytics.\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Transform bronze data to silver layer with cleaning and standardization\n", "\n", "from pyspark.sql.functions import *\n", "from pyspark.sql.types import DateType\n", "\n", "# Read bronze data\n", "bronze_df = spark.table(\"healthcare.bronze.patient_diagnoses_raw\")\n", "\n", "# Data cleaning and standardization transformations\n", "silver_df = bronze_df.withColumn(\n", " \"diagnosis_date\",\n", " coalesce(\n", " to_date(\"diagnosis_date\", \"yyyy-MM-dd\"),\n", " to_date(\"diagnosis_date\", \"MM/dd/yyyy\"),\n", " to_date(\"diagnosis_date\", \"dd-MMM-yyyy\"),\n", " to_date(\"diagnosis_date\", \"yyyy/MM/dd\")\n", " )\n", ").withColumn(\n", " \"diagnosis_code\",\n", " upper(trim(\"diagnosis_code\"))\n", ").withColumn(\n", " \"diagnosis_description\",\n", " initcap(trim(\"diagnosis_description\"))\n", ").withColumn(\n", " \"severity_level\",\n", " when(upper(trim(\"severity_level\")).isin([\"CRITICAL\", \"HIGH\", \"MEDIUM\", \"LOW\"]), \n", " initcap(trim(\"severity_level\")))\n", " .otherwise(\"Unknown\")\n", ").withColumn(\n", " \"treating_physician\",\n", " when(trim(\"treating_physician\") != \"\", upper(trim(\"treating_physician\")))\n", " .otherwise(\"UNKNOWN\")\n", ").withColumn(\n", " \"facility_id\",\n", " when(trim(\"facility_id\") != \"\", upper(trim(\"facility_id\")))\n", " .otherwise(\"UNKNOWN\")\n", ").withColumn(\n", " \"is_valid_record\",\n", " (col(\"patient_id\").isNotNull()) & \n", " (col(\"diagnosis_date\").isNotNull()) & \n", " (col(\"diagnosis_code\").isNotNull()) &\n", " (length(trim(\"diagnosis_code\")) > 0)\n", ").withColumn(\n", " \"data_quality_score\",\n", " (when(col(\"patient_id\").isNotNull(), 0.2).otherwise(0) +\n", " when(col(\"diagnosis_date\").isNotNull(), 0.2).otherwise(0) +\n", " when(col(\"diagnosis_code\").isNotNull() & (length(trim(\"diagnosis_code\")) > 0), 0.2).otherwise(0) +\n", " when(col(\"severity_level\") != \"Unknown\", 0.2).otherwise(0) +\n", " when(col(\"treating_physician\") != \"UNKNOWN\", 0.2).otherwise(0))\n", ").withColumn(\n", " \"processing_timestamp\",\n", " current_timestamp()\n", ").filter(\n", " col(\"is_valid_record\") == True # Only keep valid records in silver layer\n", ")\n", "\n", "# Remove duplicates based on patient_id, diagnosis_date, diagnosis_code\n", "silver_df = silver_df.dropDuplicates([\"patient_id\", \"diagnosis_date\", \"diagnosis_code\"])\n", "\n", "# Insert cleaned data into silver layer\n", "silver_df.write.mode(\"overwrite\").saveAsTable(\"healthcare.silver.patient_diagnoses_clean\")\n", "\n", "print(f\"Successfully processed {silver_df.count()} clean records into healthcare.silver.patient_diagnoses_clean\")\n", "print(\"Silver layer provides standardized, validated, and enriched data for analytics.\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Validate silver layer data quality improvements\n", "\n", "print(\"=== Silver Layer Data Quality Validation ===\")\n", "\n", "# Compare bronze vs silver data quality\n", "bronze_count = spark.table(\"healthcare.bronze.patient_diagnoses_raw\").count()\n", "silver_count = spark.table(\"healthcare.silver.patient_diagnoses_clean\").count()\n", "\n", "print(f\"Bronze layer records: {bronze_count}\")\n", "print(f\"Silver layer records: {silver_count}\")\n", "print(f\"Data quality improvement: {((silver_count/bronze_count)*100):.1f}% valid records retained\")\n", "\n", "# Show data quality distribution\n", "quality_distribution = spark.table(\"healthcare.silver.patient_diagnoses_clean\").groupBy(\"data_quality_score\").count().orderBy(\"data_quality_score\")\n", "quality_distribution.show()\n", "\n", "# Sample cleaned records\n", "print(\"\\nSample Cleaned Records:\")\n", "spark.table(\"healthcare.silver.patient_diagnoses_clean\").show(5)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Gold Layer: Business Analytics and ML-Ready Data\n", "\n", "### Gold Layer Purpose\n", "\n", "The gold layer contains business-ready datasets optimized for:\n", "\n", "- **Analytics Dashboards**: Aggregated metrics and KPIs\n", "- **Reporting**: Standardized business views\n", "- **Machine Learning**: Feature-engineered datasets\n", "- **Downstream Applications**: Clean, fast-access data\n", "\n", "### Gold Layer Tables\n", "\n", "We'll create multiple gold tables:\n", "\n", "1. **gold.patient_summary**: Patient-level aggregates\n", "2. **gold.diagnosis_analytics**: Diagnosis patterns and trends\n", "3. **gold.facility_performance**: Healthcare facility metrics\n", "4. **gold.patient_readmission_features**: ML-ready features for readmission prediction\n", "\n", "### Clustering Strategies\n", "\n", "- Patient summary: Cluster by `patient_id`\n", "- Diagnosis analytics: Cluster by `diagnosis_code`, `month`\n", "- Facility performance: Cluster by `facility_id`, `month`\n", "- ML features: Cluster by `patient_id`" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Create Gold Layer tables with liquid clustering\n", "\n", "# Patient summary table\n", "spark.sql(\"\"\"\n", "CREATE TABLE IF NOT EXISTS healthcare.gold.patient_summary (\n", " patient_id STRING,\n", " total_diagnoses INT,\n", " unique_diagnoses INT,\n", " first_diagnosis_date DATE,\n", " last_diagnosis_date DATE,\n", " patient_tenure_days INT,\n", " avg_severity_score DOUBLE,\n", " facilities_used INT,\n", " physicians_seen INT,\n", " active_months INT,\n", " high_severity_flag BOOLEAN,\n", " complex_case_flag BOOLEAN,\n", " last_update_timestamp TIMESTAMP\n", ")\n", "USING DELTA\n", "CLUSTER BY (patient_id)\n", "\"\"\")\n", "\n", "# Diagnosis analytics table\n", "spark.sql(\"\"\"\n", "CREATE TABLE IF NOT EXISTS healthcare.gold.diagnosis_analytics (\n", " diagnosis_code STRING,\n", " diagnosis_description STRING,\n", " month STRING,\n", " diagnosis_count INT,\n", " unique_patients INT,\n", " avg_severity_score DOUBLE,\n", " critical_case_ratio DOUBLE,\n", " facility_count INT,\n", " physician_count INT\n", ")\n", "USING DELTA\n", "CLUSTER BY (diagnosis_code, month)\n", "\"\"\")\n", "\n", "# Facility performance table\n", "spark.sql(\"\"\"\n", "CREATE TABLE IF NOT EXISTS healthcare.gold.facility_performance (\n", " facility_id STRING,\n", " month STRING,\n", " total_diagnoses INT,\n", " unique_patients INT,\n", " unique_physicians INT,\n", " avg_severity_score DOUBLE,\n", " critical_case_count INT,\n", " patient_satisfaction_proxy DOUBLE,\n", " efficiency_score DOUBLE\n", ")\n", "USING DELTA\n", "CLUSTER BY (facility_id, month)\n", "\"\"\")\n", "\n", "# ML-ready readmission features table\n", "spark.sql(\"\"\"\n", "CREATE TABLE IF NOT EXISTS healthcare.gold.patient_readmission_features (\n", " patient_id STRING,\n", " total_diagnoses INT,\n", " unique_diagnoses INT,\n", " avg_severity_score DOUBLE,\n", " facilities_used INT,\n", " physicians_seen INT,\n", " active_months INT,\n", " days_since_last_visit INT,\n", " patient_tenure_days INT,\n", " avg_days_between_visits DOUBLE,\n", " high_visit_frequency BOOLEAN,\n", " complex_case BOOLEAN,\n", " high_severity_patient BOOLEAN,\n", " readmission_risk_label INT,\n", " feature_timestamp TIMESTAMP\n", ")\n", "USING DELTA\n", "CLUSTER BY (patient_id)\n", "\"\"\")\n", "\n", "print(\"Gold layer tables created successfully!\")\n", "print(\"Each table is optimized with liquid clustering for specific query patterns.\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Populate Gold Layer: Patient Summary\n", "\n", "from pyspark.sql.functions import *\n", "\n", "# Read silver layer data\n", "silver_df = spark.table(\"healthcare.silver.patient_diagnoses_clean\")\n", "\n", "# Create patient summary aggregates\n", "patient_summary_df = silver_df.groupBy(\"patient_id\").agg(\n", " count(\"*\").alias(\"total_diagnoses\"),\n", " countDistinct(\"diagnosis_code\").alias(\"unique_diagnoses\"),\n", " min(\"diagnosis_date\").alias(\"first_diagnosis_date\"),\n", " max(\"diagnosis_date\").alias(\"last_diagnosis_date\"),\n", " datediff(max(\"diagnosis_date\"), min(\"diagnosis_date\")).alias(\"patient_tenure_days\"),\n", " round(avg(\n", " when(col(\"severity_level\") == \"Critical\", 1.0)\n", " .when(col(\"severity_level\") == \"High\", 0.75)\n", " .when(col(\"severity_level\") == \"Medium\", 0.5)\n", " .otherwise(0.25)\n", " ), 3).alias(\"avg_severity_score\"),\n", " countDistinct(\"facility_id\").alias(\"facilities_used\"),\n", " countDistinct(\"treating_physician\").alias(\"physicians_seen\"),\n", " countDistinct(date_format(\"diagnosis_date\", \"yyyy-MM\")).alias(\"active_months\"),\n", " (avg(\n", " when(col(\"severity_level\") == \"Critical\", 1.0)\n", " .when(col(\"severity_level\") == \"Medium\", 0.5)\n", " .otherwise(0.25)\n", " ) > 0.6).alias(\"high_severity_flag\"),\n", " (countDistinct(\"diagnosis_code\") > 4).alias(\"complex_case_flag\"),\n", " current_timestamp().alias(\"last_update_timestamp\")\n", ").orderBy(\"patient_id\")\n", "\n", "# Insert into gold layer\n", "patient_summary_df.write.mode(\"overwrite\").saveAsTable(\"healthcare.gold.patient_summary\")\n", "\n", "print(f\"Created patient summaries for {patient_summary_df.count()} patients\")\n", "print(\"Patient summary provides consolidated view of patient healthcare journey.\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Populate Gold Layer: Diagnosis Analytics\n", "\n", "# Create diagnosis analytics by code and month\n", "diagnosis_analytics_df = silver_df.withColumn(\"month\", date_format(\"diagnosis_date\", \"yyyy-MM\"))\n", "diagnosis_analytics_df = diagnosis_analytics_df.groupBy(\"diagnosis_code\", \"diagnosis_description\", \"month\").agg(\n", " count(\"*\").alias(\"diagnosis_count\"),\n", " countDistinct(\"patient_id\").alias(\"unique_patients\"),\n", " round(avg(\n", " when(col(\"severity_level\") == \"Critical\", 1.0)\n", " .when(col(\"severity_level\") == \"High\", 0.75)\n", " .when(col(\"severity_level\") == \"Medium\", 0.5)\n", " .otherwise(0.25)\n", " ), 3).alias(\"avg_severity_score\"),\n", " round(avg(\n", " when(col(\"severity_level\") == \"Critical\", 1.0)\n", " .otherwise(0.0)\n", " ), 3).alias(\"critical_case_ratio\"),\n", " countDistinct(\"facility_id\").alias(\"facility_count\"),\n", " countDistinct(\"treating_physician\").alias(\"physician_count\")\n", ").orderBy(\"diagnosis_code\", \"month\")\n", "\n", "# Insert into gold layer\n", "diagnosis_analytics_df.write.mode(\"overwrite\").saveAsTable(\"healthcare.gold.diagnosis_analytics\")\n", "\n", "print(f\"Created diagnosis analytics for {diagnosis_analytics_df.count()} diagnosis-month combinations\")\n", "print(\"Diagnosis analytics enables healthcare trend analysis and resource planning.\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Populate Gold Layer: Facility Performance\n", "\n", "# Create facility performance metrics by facility and month\n", "facility_df = silver_df.withColumn(\"month\", date_format(\"diagnosis_date\", \"yyyy-MM\"))\n", "facility_performance_df = facility_df.groupBy(\"facility_id\", \"month\").agg(\n", " count(\"*\").alias(\"total_diagnoses\"),\n", " countDistinct(\"patient_id\").alias(\"unique_patients\"),\n", " countDistinct(\"treating_physician\").alias(\"unique_physicians\"),\n", " round(avg(\n", " when(col(\"severity_level\") == \"Critical\", 1.0)\n", " .when(col(\"severity_level\") == \"High\", 0.75)\n", " .when(col(\"severity_level\") == \"Medium\", 0.5)\n", " .otherwise(0.25)\n", " ), 3).alias(\"avg_severity_score\"),\n", " sum(when(col(\"severity_level\") == \"Critical\", 1).otherwise(0)).alias(\"critical_case_count\"),\n", " # Patient satisfaction proxy (inverse of severity and case complexity)\n", " round(1.0 - avg(\n", " when(col(\"severity_level\") == \"Critical\", 1.0)\n", " .when(col(\"severity_level\") == \"High\", 0.75)\n", " .when(col(\"severity_level\") == \"Medium\", 0.5)\n", " .otherwise(0.25)\n", " ), 3).alias(\"patient_satisfaction_proxy\"),\n", " # Efficiency score (diagnoses per physician)\n", " round(count(\"*\") / countDistinct(\"treating_physician\"), 2).alias(\"efficiency_score\")\n", ").orderBy(\"facility_id\", \"month\")\n", "\n", "# Insert into gold layer\n", "facility_performance_df.write.mode(\"overwrite\").saveAsTable(\"healthcare.gold.facility_performance\")\n", "\n", "print(f\"Created facility performance metrics for {facility_performance_df.count()} facility-month combinations\")\n", "print(\"Facility performance enables operational analytics and quality monitoring.\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Created ML-ready features for 998 patients\n", "ML features enable predictive analytics for patient readmission risk.\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Populate Gold Layer: ML-Ready Readmission Features\n", "\n", "# Use pure SQL to avoid window function issues - create temporary views first\n", "silver_df.createOrReplaceTempView(\"silver_diagnoses\")\n", "\n", "# Calculate patient-level features using SQL with proper subquery structure\n", "ml_features_sql = \"\"\"\n", "SELECT \n", " p.patient_id,\n", " p.total_diagnoses,\n", " p.unique_diagnoses,\n", " ROUND(p.avg_severity_score, 3) as avg_severity_score,\n", " p.facilities_used,\n", " p.physicians_seen,\n", " p.active_months,\n", " p.days_since_last_visit,\n", " p.patient_tenure_days,\n", " COALESCE(v.avg_days_between_visits, 30) as avg_days_between_visits,\n", " CASE WHEN p.total_diagnoses > 6 THEN 1 ELSE 0 END as high_visit_frequency,\n", " CASE WHEN p.unique_diagnoses > 4 THEN 1 ELSE 0 END as complex_case,\n", " CASE WHEN p.avg_severity_score > 0.6 THEN 1 ELSE 0 END as high_severity_patient,\n", " CASE WHEN \n", " p.total_diagnoses > 6 OR \n", " p.unique_diagnoses > 4 OR \n", " p.avg_severity_score > 0.6 OR\n", " p.facilities_used > 2\n", " THEN 1 ELSE 0 END as readmission_risk_label,\n", " CURRENT_TIMESTAMP() as feature_timestamp\n", "FROM (\n", " -- Patient-level aggregates\n", " SELECT \n", " patient_id,\n", " COUNT(*) as total_diagnoses,\n", " COUNT(DISTINCT diagnosis_code) as unique_diagnoses,\n", " AVG(CASE \n", " WHEN severity_level = 'Critical' THEN 1.0\n", " WHEN severity_level = 'High' THEN 0.75\n", " WHEN severity_level = 'Medium' THEN 0.5\n", " ELSE 0.25\n", " END) as avg_severity_score,\n", " COUNT(DISTINCT facility_id) as facilities_used,\n", " COUNT(DISTINCT treating_physician) as physicians_seen,\n", " COUNT(DISTINCT DATE_FORMAT(diagnosis_date, 'yyyy-MM')) as active_months,\n", " DATEDIFF(CURRENT_DATE(), MAX(diagnosis_date)) as days_since_last_visit,\n", " DATEDIFF(MAX(diagnosis_date), MIN(diagnosis_date)) as patient_tenure_days\n", " FROM silver_diagnoses\n", " GROUP BY patient_id\n", ") p\n", "LEFT JOIN (\n", " -- Average days between visits\n", " SELECT \n", " patient_id,\n", " ROUND(AVG(days_between_visits), 2) as avg_days_between_visits\n", " FROM (\n", " SELECT \n", " patient_id,\n", " diagnosis_date,\n", " LAG(diagnosis_date) OVER (PARTITION BY patient_id ORDER BY diagnosis_date) as prev_date,\n", " DATEDIFF(diagnosis_date, LAG(diagnosis_date) OVER (PARTITION BY patient_id ORDER BY diagnosis_date)) as days_between_visits\n", " FROM silver_diagnoses\n", " ) \n", " WHERE days_between_visits IS NOT NULL\n", " GROUP BY patient_id\n", ") v ON p.patient_id = v.patient_id\n", "\"\"\"\n", "\n", "# Execute the SQL query\n", "ml_features_df = spark.sql(ml_features_sql)\n", "\n", "# Insert into gold layer\n", "ml_features_df.write.mode(\"overwrite\").saveAsTable(\"healthcare.gold.patient_readmission_features\")\n", "\n", "print(f\"Created ML-ready features for {ml_features_df.count()} patients\")\n", "print(\"ML features enable predictive analytics for patient readmission risk.\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 7: Demonstrate Gold Layer Analytics\n", "\n", "### Gold Layer Benefits\n", "\n", "The gold layer provides optimized access to business-critical insights:\n", "\n", "- **Fast Analytics**: Pre-aggregated data for dashboards\n", "- **Consistent Reporting**: Standardized business metrics\n", "- **Predictive Insights**: ML-ready feature sets\n", "- **Operational Intelligence**: Real-time performance monitoring" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "=== Gold Layer Analytics Demonstration ===\n", "\n", "Patient Summary Analytics:\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+----------+---------------+----------------+------------------+------------------+-----------------+\n", "|patient_id|total_diagnoses|unique_diagnoses|avg_severity_score|high_severity_flag|complex_case_flag|\n", "+----------+---------------+----------------+------------------+------------------+-----------------+\n", "| PAT0001| 2| 2| 0.5| false| false|\n", "| PAT0002| 6| 5| 0.708| false| true|\n", "| PAT0003| 8| 5| 0.656| false| true|\n", "| PAT0004| 2| 2| 0.625| false| false|\n", "| PAT0005| 3| 3| 0.583| false| false|\n", "+----------+---------------+----------------+------------------+------------------+-----------------+\n", "only showing top 5 rows\n", "\n", "\n", "Diagnosis Analytics:\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+--------------+-------+---------------+---------------+-------------------+\n", "|diagnosis_code| month|diagnosis_count|unique_patients|critical_case_ratio|\n", "+--------------+-------+---------------+---------------+-------------------+\n", "| I10|2024-10| 89| 86| 0.0|\n", "| E11.9|2024-08| 88| 87| 0.0|\n", "| I10|2024-03| 73| 70| 0.0|\n", "| I10|2024-06| 72| 69| 0.0|\n", "| E11.9|2024-03| 70| 68| 0.0|\n", "+--------------+-------+---------------+---------------+-------------------+\n", "only showing top 5 rows\n", "\n", "\n", "Facility Performance:\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+-----------+-------+---------------+------------------+----------------+\n", "|facility_id| month|total_diagnoses|avg_severity_score|efficiency_score|\n", "+-----------+-------+---------------+------------------+----------------+\n", "| CLINIC001|2024-12| 78| 0.548| 11.14|\n", "| HOSP001|2024-05| 76| 0.576| 10.86|\n", "| URGENT001|2024-02| 75| 0.553| 10.71|\n", "| URGENT001|2024-10| 75| 0.6| 10.71|\n", "| CLINIC001|2024-03| 74| 0.544| 10.57|\n", "+-----------+-------+---------------+------------------+----------------+\n", "only showing top 5 rows\n", "\n", "\n", "ML Features Preview:\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+----------+---------------+------------------+----------------------+\n", "|patient_id|total_diagnoses|avg_severity_score|readmission_risk_label|\n", "+----------+---------------+------------------+----------------------+\n", "| PAT0523| 5| 0.400| 1|\n", "| PAT0080| 5| 0.700| 1|\n", "| PAT0804| 5| 0.450| 1|\n", "| PAT0394| 4| 0.500| 1|\n", "| PAT0320| 1| 0.500| 0|\n", "+----------+---------------+------------------+----------------------+\n", "only showing top 5 rows\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Demonstrate Gold Layer analytics capabilities\n", "\n", "print(\"=== Gold Layer Analytics Demonstration ===\")\n", "\n", "# Patient summary analytics\n", "print(\"\\nPatient Summary Analytics:\")\n", "patient_summary = spark.table(\"healthcare.gold.patient_summary\")\n", "patient_summary.select(\n", " \"patient_id\", \"total_diagnoses\", \"unique_diagnoses\", \n", " \"avg_severity_score\", \"high_severity_flag\", \"complex_case_flag\"\n", ").show(5)\n", "\n", "# Diagnosis analytics\n", "print(\"\\nDiagnosis Analytics:\")\n", "diagnosis_analytics = spark.table(\"healthcare.gold.diagnosis_analytics\")\n", "diagnosis_analytics.select(\n", " \"diagnosis_code\", \"month\", \"diagnosis_count\", \n", " \"unique_patients\", \"critical_case_ratio\"\n", ").orderBy(desc(\"diagnosis_count\")).show(5)\n", "\n", "# Facility performance\n", "print(\"\\nFacility Performance:\")\n", "facility_performance = spark.table(\"healthcare.gold.facility_performance\")\n", "facility_performance.select(\n", " \"facility_id\", \"month\", \"total_diagnoses\", \n", " \"avg_severity_score\", \"efficiency_score\"\n", ").orderBy(desc(\"total_diagnoses\")).show(5)\n", "\n", "# ML features preview\n", "print(\"\\nML Features Preview:\")\n", "ml_features = spark.table(\"healthcare.gold.patient_readmission_features\")\n", "ml_features.select(\n", " \"patient_id\", \"total_diagnoses\", \"avg_severity_score\", \n", " \"readmission_risk_label\"\n", ").show(5)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 8: Train Patient Readmission Prediction Model\n", "\n", "### Machine Learning on Gold Layer Data\n", "\n", "Now we'll train a machine learning model using the ML-ready features from the gold layer. This demonstrates how the medallion architecture enables:\n", "\n", "- **Feature Engineering**: Pre-computed in gold layer\n", "- **Scalable Training**: Optimized data access patterns\n", "- **Model Governance**: Versioned feature sets\n", "- **Business Impact**: Direct connection to operational data" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Training set: 837 patients\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "Test set: 161 patients\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Train patient readmission prediction model using gold layer features\n", "\n", "from pyspark.ml.feature import VectorAssembler, StandardScaler\n", "from pyspark.ml.classification import RandomForestClassifier\n", "from pyspark.ml.evaluation import BinaryClassificationEvaluator\n", "from pyspark.ml import Pipeline\n", "\n", "# Read ML-ready features from gold layer\n", "ml_features_df = spark.table(\"healthcare.gold.patient_readmission_features\")\n", "\n", "# Prepare features for model training\n", "feature_cols = [\n", " \"total_diagnoses\", \"unique_diagnoses\", \"avg_severity_score\", \n", " \"facilities_used\", \"physicians_seen\", \"active_months\", \n", " \"days_since_last_visit\", \"patient_tenure_days\", \"avg_days_between_visits\",\n", " \"high_visit_frequency\", \"complex_case\", \"high_severity_patient\"\n", "]\n", "\n", "# Assemble features\n", "assembler = VectorAssembler(\n", " inputCols=feature_cols,\n", " outputCol=\"features\"\n", ")\n", "\n", "# Scale features\n", "scaler = StandardScaler(inputCol=\"features\", outputCol=\"scaled_features\")\n", "\n", "# Create Random Forest model\n", "rf = RandomForestClassifier(\n", " labelCol=\"readmission_risk_label\", \n", " featuresCol=\"scaled_features\",\n", " numTrees=100,\n", " maxDepth=10,\n", " seed=42\n", ")\n", "\n", "# Create pipeline\n", "pipeline = Pipeline(stages=[assembler, scaler, rf])\n", "\n", "# Split data\n", "train_data, test_data = ml_features_df.randomSplit([0.8, 0.2], seed=42)\n", "\n", "print(f\"Training set: {train_data.count()} patients\")\n", "print(f\"Test set: {test_data.count()} patients\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Training patient readmission prediction model...\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "Model AUC: 1.0000\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+----------+---------------+------------------+----------------------+----------+--------------------+\n", "|patient_id|total_diagnoses|avg_severity_score|readmission_risk_label|prediction| probability|\n", "+----------+---------------+------------------+----------------------+----------+--------------------+\n", "| PAT0003| 8| 0.656| 1| 1.0| [0.0,1.0]|\n", "| PAT0007| 4| 0.438| 1| 1.0| [0.0,1.0]|\n", "| PAT0009| 5| 0.650| 1| 1.0| [0.0,1.0]|\n", "| PAT0014| 3| 0.500| 1| 1.0|[0.00553571428571...|\n", "| PAT0020| 2| 0.250| 0| 0.0|[0.99980582524271...|\n", "| PAT0024| 5| 0.450| 1| 1.0| [0.0,1.0]|\n", "| PAT0030| 2| 0.750| 1| 1.0| [0.01,0.99]|\n", "| PAT0036| 8| 0.531| 1| 1.0| [0.0,1.0]|\n", "| PAT0046| 8| 0.688| 1| 1.0| [0.0,1.0]|\n", "| PAT0047| 4| 0.500| 1| 1.0| [0.01,0.99]|\n", "+----------+---------------+------------------+----------------------+----------+--------------------+\n", "only showing top 10 rows\n", "\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+----------------------+----------+-----+\n", "|readmission_risk_label|prediction|count|\n", "+----------------------+----------+-----+\n", "| 0| 0.0| 21|\n", "| 1| 1.0| 140|\n", "+----------------------+----------+-----+\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Train the model\n", "\n", "print(\"Training patient readmission prediction model...\")\n", "model = pipeline.fit(train_data)\n", "\n", "# Make predictions\n", "predictions = model.transform(test_data)\n", "\n", "# Evaluate model\n", "evaluator = BinaryClassificationEvaluator(labelCol=\"readmission_risk_label\", metricName=\"areaUnderROC\")\n", "auc = evaluator.evaluate(predictions)\n", "\n", "print(f\"Model AUC: {auc:.4f}\")\n", "\n", "# Show predictions\n", "predictions.select(\n", " \"patient_id\", \"total_diagnoses\", \"avg_severity_score\", \n", " \"readmission_risk_label\", \"prediction\", \"probability\"\n", ").show(10)\n", "\n", "# Confusion matrix\n", "confusion_matrix = predictions.groupBy(\"readmission_risk_label\", \"prediction\").count()\n", "confusion_matrix.show()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "=== Feature Importance for Readmission Prediction ===\n", "total_diagnoses: 0.0767\n", "unique_diagnoses: 0.0393\n", "avg_severity_score: 0.2191\n", "facilities_used: 0.3970\n", "physicians_seen: 0.0132\n", "active_months: 0.0402\n", "days_since_last_visit: 0.0113\n", "patient_tenure_days: 0.0155\n", "avg_days_between_visits: 0.0157\n", "high_visit_frequency: 0.0004\n", "complex_case: 0.0027\n", "high_severity_patient: 0.1689\n", "\n", "=== Business Impact Analysis ===\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "Total test patients: 161\n", "Patients predicted as high readmission risk: 140\n", "Percentage flagged for intervention: 87.0%\n", "\n", "Estimated cost per readmission: $15,000\n", "Estimated intervention success rate: 30%\n", "Potential readmissions prevented: 42\n", "Potential cost savings: $630,000\n", "Total intervention cost: $280,000\n", "Net savings: $350,000\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "\n", "Model Performance:\n", "Accuracy: 1.0000\n", "Precision: 1.0000\n", "Recall: 1.0000\n", "AUC: 1.0000\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Model interpretation and business impact analysis\n", "\n", "# Feature importance\n", "rf_model = model.stages[-1]\n", "feature_importance = rf_model.featureImportances\n", "\n", "print(\"=== Feature Importance for Readmission Prediction ===\")\n", "for name, importance in zip(feature_cols, feature_importance):\n", " print(f\"{name}: {importance:.4f}\")\n", "\n", "print(\"\\n=== Business Impact Analysis ===\")\n", "\n", "# Calculate potential impact\n", "high_risk_predictions = predictions.filter(\"prediction = 1\")\n", "patients_at_risk = high_risk_predictions.count()\n", "total_test_patients = test_data.count()\n", "\n", "print(f\"Total test patients: {total_test_patients}\")\n", "print(f\"Patients predicted as high readmission risk: {patients_at_risk}\")\n", "print(f\"Percentage flagged for intervention: {(patients_at_risk/total_test_patients)*100:.1f}%\")\n", "\n", "# Cost savings potential\n", "avg_readmission_cost = 15000\n", "intervention_success_rate = 0.3\n", "avg_intervention_cost = 2000\n", "\n", "prevented_readmissions = patients_at_risk * intervention_success_rate\n", "cost_savings = prevented_readmissions * avg_readmission_cost\n", "total_intervention_cost = patients_at_risk * avg_intervention_cost\n", "net_savings = cost_savings - total_intervention_cost\n", "\n", "print(f\"\\nEstimated cost per readmission: ${avg_readmission_cost:,}\")\n", "print(f\"Estimated intervention success rate: {intervention_success_rate*100:.0f}%\")\n", "print(f\"Potential readmissions prevented: {prevented_readmissions:.0f}\")\n", "print(f\"Potential cost savings: ${cost_savings:,.0f}\")\n", "print(f\"Total intervention cost: ${total_intervention_cost:,.0f}\")\n", "print(f\"Net savings: ${net_savings:,.0f}\")\n", "\n", "# Model performance metrics\n", "accuracy = predictions.filter(\"readmission_risk_label = prediction\").count() / predictions.count()\n", "precision = predictions.filter(\"prediction = 1 AND readmission_risk_label = 1\").count() / predictions.filter(\"prediction = 1\").count() if predictions.filter(\"prediction = 1\").count() > 0 else 0\n", "recall = predictions.filter(\"prediction = 1 AND readmission_risk_label = 1\").count() / predictions.filter(\"readmission_risk_label = 1\").count() if predictions.filter(\"readmission_risk_label = 1\").count() > 0 else 0\n", "\n", "print(f\"\\nModel Performance:\")\n", "print(f\"Accuracy: {accuracy:.4f}\")\n", "print(f\"Precision: {precision:.4f}\")\n", "print(f\"Recall: {recall:.4f}\")\n", "print(f\"AUC: {auc:.4f}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Key Takeaways: Medallion Architecture with Delta Liquid Clustering\n", "\n", "### What We Demonstrated\n", "\n", "1. **Bronze Layer**: Raw data ingestion with data quality issues preserved for auditability\n", "2. **Silver Layer**: Data cleaning, standardization, and enrichment with validation\n", "3. **Gold Layer**: Business-ready aggregates, analytics, and ML-ready feature sets\n", "4. **Liquid Clustering**: Automatic optimization throughout all layers for query performance\n", "5. **ML Integration**: End-to-end pipeline from raw data to predictive models\n", "\n", "### Medallion Architecture Benefits\n", "\n", "- **Data Governance**: Clear separation of raw, cleaned, and business data\n", "- **Performance**: Each layer optimized for specific access patterns\n", "- **Maintainability**: Independent processing of each layer\n", "- **Scalability**: Supports growing data volumes and complexity\n", "- **Compliance**: Raw data retention with processed views for analytics\n", "\n", "### AIDP Advantages\n", "\n", "- **Unified Platform**: Seamless data processing from ingestion to ML\n", "- **Automatic Optimization**: Liquid clustering handles performance tuning\n", "- **Governance**: Catalog and schema isolation\n", "- **Integration**: Native ML capabilities with optimized data access\n", "\n", "### Healthcare Business Impact\n", "\n", "1. **Cost Reduction**: Prevent expensive readmissions through predictive analytics\n", "2. **Quality Improvement**: Better patient outcomes with data-driven interventions\n", "3. **Operational Efficiency**: Optimized resource allocation across facilities\n", "4. **Regulatory Compliance**: Comprehensive data lineage and auditability\n", "5. **Patient Satisfaction**: Proactive care reduces negative experiences\n", "\n", "### Best Practices for Medallion Architecture\n", "\n", "1. **Layer Separation**: Keep bronze raw, silver clean, gold business-ready\n", "2. **Clustering Strategy**: Choose clustering columns based on query patterns\n", "3. **Data Quality**: Implement validation and monitoring at each layer\n", "4. **Incremental Processing**: Support for incremental updates and reprocessing\n", "5. **Governance**: Document data lineage, quality rules, and business logic\n", "\n", "### Next Steps\n", "\n", "- Explore real-time processing with structured streaming\n", "- Implement data quality monitoring and alerting\n", "- Add more sophisticated ML models and feature engineering\n", "- Integrate with healthcare systems and EHR data\n", "- Deploy models for production readmission risk scoring\n", "\n", "This notebook demonstrates how Oracle AI Data Platform enables sophisticated healthcare analytics through the medallion architecture pattern, combining data engineering best practices with advanced ML capabilities." ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.5" } }, "nbformat": 4, "nbformat_minor": 4 }