{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Telecommunications Medallion Architecture Demo\n", "\n", "## Overview\n", "\n", "This notebook demonstrates a **Medallion Architecture** implementation in Oracle AI Data Platform (AIDP) Workbench using a telecommunications analytics use case. Data moves through three layers (bronze, silver, and gold), each more refined than the one before it.\n", "\n", "The notebook ends with a customer churn prediction model trained on gold-layer features, showing how curated gold tables feed machine learning.\n", "\n", "### What is Medallion Architecture?\n", "\n", "Medallion architecture provides a structured, layered approach to data processing:\n", "\n", "- **Bronze**: Raw data ingested and stored as-is from source systems\n", "- **Silver**: Cleansed, deduplicated, and standardized data\n", "- **Gold**: Aggregated, curated data ready for business intelligence and ML\n", "\n", "### Use Case: Telecommunications Analytics\n", "\n", "We analyze synthetic telecommunications network-performance and customer-usage data (generated in the bronze section) across all three layers, culminating in a churn prediction model.\n", "\n", "### AIDP Environment Setup\n", "\n", "This notebook uses the `spark` session that AIDP Workbench already provides, so no session setup is required." 
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Bronze Layer: Raw Data Ingestion\n", "\n", "### Purpose\n", "- Store raw telecommunications data as-is\n", "- Provide data lake functionality\n", "- Enable reprocessing if needed\n", "\n", "### Schema Design\n", "- Raw network usage events\n", "- No transformations applied\n", "- Delta table with liquid clustering for performance" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Telecommunications catalog and bronze schema created successfully!\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Create telecommunications catalog and bronze schema\n", "\n", "spark.sql(\"CREATE CATALOG IF NOT EXISTS telecom\")\n", "spark.sql(\"CREATE SCHEMA IF NOT EXISTS telecom.bronze\")\n", "\n", "print(\"Telecommunications catalog and bronze schema created successfully!\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Bronze layer Delta table created successfully!\n", "Liquid clustering will optimize data layout for subscriber and time-based queries.\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Create bronze layer Delta table with liquid clustering\n", "\n", "spark.sql(\"\"\"\n", "CREATE TABLE IF NOT EXISTS telecom.bronze.network_usage_raw (\n", " subscriber_id STRING,\n", " usage_date TIMESTAMP,\n", " service_type STRING,\n", " data_volume DECIMAL(10,3),\n", " call_duration DECIMAL(8,2),\n", " cell_tower_id STRING,\n", " signal_quality INT,\n", " ingestion_timestamp TIMESTAMP\n", ")\n", "USING DELTA\n", "CLUSTER BY (subscriber_id, usage_date)\n", "\"\"\")\n", "\n", "print(\"Bronze layer Delta table created successfully!\")\n", "print(\"Liquid clustering will optimize data layout for subscriber and time-based queries.\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Generated 
599191 raw network usage records\n", "Sample record: {'subscriber_id': 'SUB00000001', 'usage_date': datetime.datetime(2024, 6, 21, 19, 52), 'service_type': 'Voice', 'data_volume': 0.0, 'call_duration': 12.51, 'cell_tower_id': 'TOWER_NYC_001', 'signal_quality': 82}\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Generate synthetic raw telecommunications data (inserted into the bronze table in the next cell)\n", "\n", "import random\n", "from datetime import datetime, timedelta\n", "\n", "# Fix the random seed so that reruns produce the same dataset\n", "random.seed(42)\n", "\n", "# Define telecommunications data constants\n", "SERVICE_TYPES = ['Voice', 'Data', 'SMS', 'Streaming']\n", "CELL_TOWERS = ['TOWER_NYC_001', 'TOWER_LAX_002', 'TOWER_CHI_003', 'TOWER_HOU_004', 'TOWER_MIA_005', 'TOWER_SFO_006', 'TOWER_SEA_007']\n", "\n", "# Base usage parameters by service type\n", "# ('frequency' is not used below: service types are drawn uniformly at random)\n", "USAGE_PARAMS = {\n", "    'Voice': {'avg_duration': 5.0, 'frequency': 8, 'data_volume': 0.0},\n", "    'Data': {'avg_duration': 0.0, 'frequency': 15, 'data_volume': 0.5},\n", "    'SMS': {'avg_duration': 0.0, 'frequency': 12, 'data_volume': 0.0},\n", "    'Streaming': {'avg_duration': 0.0, 'frequency': 6, 'data_volume': 2.0}\n", "}\n", "\n", "# Relative weights for each hour of the day (0-23): quiet overnight, busy daytime and evening\n", "HOUR_WEIGHTS = [1, 1, 1, 1, 1, 2, 4, 6, 8, 7, 6, 8, 9, 8, 7, 6, 8, 9, 10, 8, 6, 4, 3, 2]\n", "\n", "# Generate raw network usage records (about 2% have a missing signal_quality)\n", "usage_data = []\n", "base_date = datetime(2024, 1, 1)\n", "\n", "# Create 10,000 subscribers with 20-100 usage events each\n", "for subscriber_num in range(1, 10001):\n", "    subscriber_id = f\"SUB{subscriber_num:08d}\"\n", "\n", "    # Each subscriber gets 20-100 usage events over 12 months\n", "    num_events = random.randint(20, 100)\n", "\n", "    for _ in range(num_events):\n", "        # Spread usage events over 2024 (day offsets 0-365; 2024 is a leap year)\n", "        days_offset = random.randint(0, 365)\n", "        usage_date = base_date + timedelta(days=days_offset)\n", "\n", "        # Pick an hour using HOUR_WEIGHTS, then a random minute\n", "        hours_offset = random.choices(range(24), weights=HOUR_WEIGHTS)[0]\n", "        usage_date = usage_date.replace(hour=hours_offset, minute=random.randint(0, 59), second=0, microsecond=0)\n", "\n", "        # Select service type\n", "        service_type = random.choice(SERVICE_TYPES)\n", "        params = USAGE_PARAMS[service_type]\n", "\n", "        # Calculate usage metrics with random variability per service type\n", "        if service_type == 'Voice':\n", "            duration_variation = random.uniform(0.3, 3.0)\n", "            call_duration = round(params['avg_duration'] * duration_variation, 2)\n", "            data_volume = 0.0\n", "        elif service_type == 'Data':\n", "            data_variation = random.uniform(0.1, 5.0)\n", "            data_volume = round(params['data_volume'] * data_variation, 3)\n", "            call_duration = 0.0\n", "        elif service_type == 'SMS':\n", "            data_volume = 0.0\n", "            call_duration = 0.0\n", "        else:  # Streaming\n", "            data_variation = random.uniform(0.5, 8.0)\n", "            data_volume = round(params['data_volume'] * data_variation, 3)\n", "            call_duration = 0.0\n", "\n", "        # Select cell tower\n", "        cell_tower_id = random.choice(CELL_TOWERS)\n", "\n", "        # Signal quality is random per event (roughly 45-100) and independent of tower and time;\n", "        # about 2% of values are left null to exercise the silver-layer cleaning step\n", "        if random.random() < 0.02:\n", "            signal_quality = None\n", "        else:\n", "            base_signal = random.randint(60, 95)\n", "            signal_variation = random.randint(-15, 5)\n", "            signal_quality = max(0, min(100, base_signal + signal_variation))\n", "\n", "        usage_data.append({\n", "            \"subscriber_id\": subscriber_id,\n", "            \"usage_date\": usage_date,\n", "            \"service_type\": service_type,\n", "            \"data_volume\": data_volume,\n", "            \"call_duration\": call_duration,\n", "            \"cell_tower_id\": cell_tower_id,\n", "            \"signal_quality\": signal_quality\n", "        })\n", "\n", "print(f\"Generated {len(usage_data)} raw network usage records\")\n", "print(\"Sample record:\", usage_data[0])" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Bronze Layer DataFrame Schema:\n", "root\n", " |-- call_duration: double (nullable = true)\n", " |-- cell_tower_id: string (nullable = 
true)\n", " |-- data_volume: double (nullable = true)\n", " |-- service_type: string (nullable = true)\n", " |-- signal_quality: long (nullable = true)\n", " |-- subscriber_id: string (nullable = true)\n", " |-- usage_date: timestamp (nullable = true)\n", "\n", "\n", "Sample Bronze Data:\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+-------------+-------------+-----------+------------+--------------+-------------+-------------------+\n", "|call_duration|cell_tower_id|data_volume|service_type|signal_quality|subscriber_id| usage_date|\n", "+-------------+-------------+-----------+------------+--------------+-------------+-------------------+\n", "| 12.51|TOWER_NYC_001| 0.0| Voice| 82| SUB00000001|2024-06-21 19:52:00|\n", "| 0.0|TOWER_SFO_006| 0.0| SMS| 95| SUB00000001|2024-07-10 13:56:00|\n", "| 0.0|TOWER_HOU_004| 0.0| SMS| 70| SUB00000001|2024-10-07 08:58:00|\n", "| 0.0|TOWER_HOU_004| 11.144| Streaming| 64| SUB00000001|2024-05-18 20:56:00|\n", "| 0.0|TOWER_SEA_007| 12.774| Streaming| 83| SUB00000001|2024-12-12 13:59:00|\n", "+-------------+-------------+-----------+------------+--------------+-------------+-------------------+\n", "only showing top 5 rows\n", "\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "\n", "Bronze layer: Successfully ingested 599191 raw records\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Insert raw data into bronze layer\n", "\n", "df_bronze = spark.createDataFrame(usage_data)\n", "\n", "print(\"Bronze Layer DataFrame Schema:\")\n", "df_bronze.printSchema()\n", "\n", "print(\"\\nSample Bronze Data:\")\n", "df_bronze.show(5)\n", "\n", "# Insert into bronze table\n", "df_bronze.write.mode(\"overwrite\").saveAsTable(\"telecom.bronze.network_usage_raw\")\n", "\n", "bronze_count = spark.sql(\"SELECT COUNT(*) FROM telecom.bronze.network_usage_raw\").collect()[0][0]\n", "print(f\"\\nBronze layer: Successfully ingested {bronze_count} raw 
records\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Silver Layer: Data Cleaning and Standardization\n", "\n", "### Purpose\n", "- Clean and validate data quality\n", "- Standardize formats and handle missing values\n", "- Remove duplicates and apply business rules\n", "- Prepare data for downstream analytics\n", "\n", "### Transformations Applied\n", "- Impute missing `signal_quality` values with the average for the same cell tower\n", "- Drop records with a null `subscriber_id` or a `signal_quality` that is still missing\n", "- Remove duplicate events (same subscriber, timestamp, and service type)\n", "- Add derived columns for analysis: `signal_category`, `usage_hour`, `usage_day_of_week`, and `is_business_hours`" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Silver schema created successfully!\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Create silver schema\n", "\n", "spark.sql(\"CREATE SCHEMA IF NOT EXISTS telecom.silver\")\n", "print(\"Silver schema created successfully!\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Silver layer Delta table created successfully!\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Create silver layer table for cleaned and standardized data\n", "\n", "spark.sql(\"\"\"\n", "CREATE TABLE IF NOT EXISTS telecom.silver.network_usage_clean (\n", " subscriber_id STRING,\n", " usage_date TIMESTAMP,\n", " service_type STRING,\n", " data_volume DECIMAL(10,3),\n", " call_duration DECIMAL(8,2),\n", " cell_tower_id STRING,\n", " signal_quality INT,\n", " signal_category STRING,\n", " usage_hour INT,\n", " usage_day_of_week INT,\n", " is_business_hours BOOLEAN,\n", " processed_timestamp TIMESTAMP\n", ")\n", "USING DELTA\n", "CLUSTER BY (subscriber_id, usage_date)\n", "\"\"\")\n", "\n", "print(\"Silver layer Delta table created successfully!\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Silver Layer Transformations Applied:\n", "- Handled missing signal_quality values with tower 
averages\n", "- Added signal_category classification\n", "- Added temporal features (usage_hour, usage_day_of_week, is_business_hours)\n", "- Removed duplicates and invalid records\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Transform bronze data to silver layer\n", "\n", "# The wildcard import provides col, when, avg, round, hour, dayofweek, etc.\n", "# Note: it shadows Python built-ins such as round, sum, min and max in later cells.\n", "from pyspark.sql.functions import *\n", "from pyspark.sql.window import Window\n", "\n", "# Read bronze data\n", "bronze_df = spark.table(\"telecom.bronze.network_usage_raw\")\n", "\n", "# Apply silver layer transformations\n", "silver_df = (\n", "    bronze_df\n", "    # Drop records without a subscriber key\n", "    .filter(\"subscriber_id IS NOT NULL\")\n", "    # Impute missing signal_quality with the rounded average for the same cell tower\n", "    .withColumn(\"signal_quality\",\n", "        when(col(\"signal_quality\").isNull(),\n", "             round(avg(\"signal_quality\").over(Window.partitionBy(\"cell_tower_id\")), 0).cast(\"int\")\n", "        ).otherwise(col(\"signal_quality\")))\n", "    # Bucket signal quality into categories\n", "    .withColumn(\"signal_category\",\n", "        when(col(\"signal_quality\") >= 80, \"Excellent\")\n", "        .when(col(\"signal_quality\") >= 60, \"Good\")\n", "        .when(col(\"signal_quality\") >= 40, \"Fair\")\n", "        .otherwise(\"Poor\"))\n", "    # Temporal features; dayofweek returns 1 = Sunday through 7 = Saturday\n", "    .withColumn(\"usage_hour\", hour(\"usage_date\"))\n", "    .withColumn(\"usage_day_of_week\", dayofweek(\"usage_date\"))\n", "    # Business hours are 09:00-17:59 (usage_hour 9 through 17)\n", "    .withColumn(\"is_business_hours\",\n", "        when((col(\"usage_hour\") >= 9) & (col(\"usage_hour\") <= 17), True)\n", "        .otherwise(False))\n", "    # Drop rows whose signal_quality could not be imputed, then deduplicate\n", "    .filter(\"signal_quality IS NOT NULL\")\n", "    .dropDuplicates([\"subscriber_id\", \"usage_date\", \"service_type\"])\n", ")\n", "\n", "print(\"Silver Layer Transformations Applied:\")\n", "print(\"- Handled missing signal_quality values with tower averages\")\n", "print(\"- Added signal_category classification\")\n", "print(\"- Added temporal features (usage_hour, usage_day_of_week, is_business_hours)\")\n", "print(\"- Removed duplicates and invalid records\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Silver Layer DataFrame Schema:\n", "root\n", " |-- call_duration: double 
(nullable = true)\n", " |-- cell_tower_id: string (nullable = true)\n", " |-- data_volume: double (nullable = true)\n", " |-- service_type: string (nullable = true)\n", " |-- signal_quality: long (nullable = true)\n", " |-- subscriber_id: string (nullable = true)\n", " |-- usage_date: timestamp (nullable = true)\n", " |-- signal_category: string (nullable = false)\n", " |-- usage_hour: integer (nullable = true)\n", " |-- usage_day_of_week: integer (nullable = true)\n", " |-- is_business_hours: boolean (nullable = false)\n", "\n", "\n", "Sample Silver Data:\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+-------------+-------------+-----------+------------+--------------+-------------+-------------------+---------------+----------+-----------------+-----------------+\n", "|call_duration|cell_tower_id|data_volume|service_type|signal_quality|subscriber_id| usage_date|signal_category|usage_hour|usage_day_of_week|is_business_hours|\n", "+-------------+-------------+-----------+------------+--------------+-------------+-------------------+---------------+----------+-----------------+-----------------+\n", "| 0.0|TOWER_SEA_007| 7.295| Streaming| 72| SUB00000001|2024-01-15 20:16:00| Good| 20| 2| false|\n", "| 0.0|TOWER_MIA_005| 0.0| SMS| 67| SUB00000001|2024-03-08 12:40:00| Good| 12| 6| true|\n", "| 0.0|TOWER_CHI_003| 5.815| Streaming| 72| SUB00000001|2024-03-23 10:13:00| Good| 10| 7| true|\n", "| 0.0|TOWER_CHI_003| 2.236| Data| 75| SUB00000001|2024-05-11 14:12:00| Good| 14| 7| true|\n", "| 0.0|TOWER_HOU_004| 11.144| Streaming| 64| SUB00000001|2024-05-18 20:56:00| Good| 20| 7| false|\n", "+-------------+-------------+-----------+------------+--------------+-------------+-------------------+---------------+----------+-----------------+-----------------+\n", "only showing top 5 rows\n", "\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "\n", "Silver layer: Successfully processed 599179 cleaned 
records\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "Data quality check - Total records: 599179, Null signals: 0\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Insert transformed data into silver layer\n", "\n", "print(\"Silver Layer DataFrame Schema:\")\n", "silver_df.printSchema()\n", "\n", "print(\"\\nSample Silver Data:\")\n", "silver_df.show(5)\n", "\n", "# Insert into silver table\n", "silver_df.write.mode(\"overwrite\").saveAsTable(\"telecom.silver.network_usage_clean\")\n", "\n", "silver_count = spark.sql(\"SELECT COUNT(*) FROM telecom.silver.network_usage_clean\").collect()[0][0]\n", "print(f\"\\nSilver layer: Successfully processed {silver_count} cleaned records\")\n", "\n", "# Quality check\n", "null_check = spark.sql(\"\"\"\n", "SELECT \n", " COUNT(*) as total_records,\n", " COUNT(CASE WHEN signal_quality IS NULL THEN 1 END) as null_signals\n", "FROM telecom.silver.network_usage_clean\n", "\"\"\").collect()[0]\n", "\n", "print(f\"Data quality check - Total records: {null_check['total_records']}, Null signals: {null_check['null_signals']}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Gold Layer: Business Analytics and Aggregations\n", "\n", "### Purpose\n", "- Provide business-ready aggregations\n", "- Enable fast queries for dashboards and reports\n", "- Support advanced analytics and ML\n", "\n", "### Analytics Included\n", "- Subscriber-level metrics\n", "- Service type performance\n", "- Network infrastructure analytics\n", "- Temporal usage patterns" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Gold schema created successfully!\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Create gold schema\n", "\n", "spark.sql(\"CREATE SCHEMA IF NOT EXISTS telecom.gold\")\n", "print(\"Gold schema created successfully!\")" ] }, { "cell_type": "code", "execution_count": null, 
"metadata": {}, "outputs": [ { "data": { "text/plain": [ "Gold layer subscriber analytics table created successfully!\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Create gold layer subscriber analytics table\n", "\n", "spark.sql(\"\"\"\n", "CREATE TABLE IF NOT EXISTS telecom.gold.subscriber_analytics (\n", " subscriber_id STRING,\n", " total_sessions BIGINT,\n", " total_data_gb DECIMAL(10,3),\n", " total_call_minutes DECIMAL(8,2),\n", " avg_signal_quality DECIMAL(5,2),\n", " services_used INT,\n", " towers_used INT,\n", " active_days INT,\n", " avg_usage_hour DECIMAL(5,2),\n", " business_hours_pct DECIMAL(5,2),\n", " primary_service_type STRING,\n", " signal_category STRING,\n", " last_activity_date DATE,\n", " subscriber_segment STRING,\n", " created_at TIMESTAMP\n", ")\n", "USING DELTA\n", "CLUSTER BY (subscriber_segment, avg_signal_quality)\n", "\"\"\")\n", "\n", "print(\"Gold layer subscriber analytics table created successfully!\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Gold layer: Successfully created analytics for 10000 subscribers\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Create gold layer aggregations from silver data\n", "\n", "subscriber_gold = spark.sql(\"\"\"\n", "WITH subscriber_metrics AS (\n", " SELECT \n", " subscriber_id,\n", " COUNT(*) as total_sessions,\n", " ROUND(SUM(data_volume), 3) as total_data_gb,\n", " ROUND(SUM(call_duration), 2) as total_call_minutes,\n", " ROUND(AVG(signal_quality), 2) as avg_signal_quality,\n", " COUNT(DISTINCT service_type) as services_used,\n", " COUNT(DISTINCT cell_tower_id) as towers_used,\n", " COUNT(DISTINCT DATE(usage_date)) as active_days,\n", " ROUND(AVG(usage_hour), 2) as avg_usage_hour,\n", " ROUND(AVG(CASE WHEN is_business_hours THEN 100.0 ELSE 0.0 END), 2) as business_hours_pct,\n", " MAX(usage_date) as last_activity_date\n", " FROM telecom.silver.network_usage_clean\n", " 
GROUP BY subscriber_id\n", "),\n", "service_preferences AS (\n", " -- Most frequent service per subscriber; ties are broken alphabetically so the result is deterministic\n", " SELECT subscriber_id, service_type as primary_service_type\n", " FROM (\n", " SELECT subscriber_id, service_type,\n", " ROW_NUMBER() OVER (PARTITION BY subscriber_id ORDER BY COUNT(*) DESC, service_type) as rn\n", " FROM telecom.silver.network_usage_clean\n", " GROUP BY subscriber_id, service_type\n", " )\n", " WHERE rn = 1\n", "),\n", "SELECT \n", " s.subscriber_id,\n", " s.total_sessions,\n", " s.total_data_gb,\n", " s.total_call_minutes,\n", " s.avg_signal_quality,\n", " CASE WHEN s.avg_signal_quality >= 80 THEN 'Excellent'\n", " WHEN s.avg_signal_quality >= 60 THEN 'Good'\n", " WHEN s.avg_signal_quality >= 40 THEN 'Fair'\n", " ELSE 'Poor' END as signal_category,\n", " s.services_used,\n", " s.towers_used,\n", " s.active_days,\n", " s.avg_usage_hour,\n", " s.business_hours_pct,\n", " sp.primary_service_type,\n", " DATE(s.last_activity_date) as last_activity_date,\n", " CASE WHEN s.total_data_gb > 50 AND s.services_used >= 3 THEN 'High-Value'\n", " WHEN s.total_data_gb > 20 OR s.services_used >= 2 THEN 'Medium-Value'\n", " ELSE 'Low-Value' END as subscriber_segment,\n", " current_timestamp() as created_at\n", "FROM subscriber_metrics s\n", "LEFT JOIN service_preferences sp ON s.subscriber_id = sp.subscriber_id\n", "\"\"\")\n", "\n", "# Write to the gold layer, replacing any previous contents\n", "subscriber_gold.write.mode(\"overwrite\").saveAsTable(\"telecom.gold.subscriber_analytics\")\n", "\n", "gold_count = spark.sql(\"SELECT COUNT(*) FROM telecom.gold.subscriber_analytics\").collect()[0][0]\n", "print(f\"Gold layer: Successfully created analytics for {gold_count} subscribers\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Gold layer infrastructure and service analytics tables created!\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Create additional gold layer tables for network and service analytics\n", "\n", "# 
Network infrastructure analytics\n", "spark.sql(\"\"\"\n", "CREATE TABLE IF NOT EXISTS telecom.gold.network_infrastructure (\n", " cell_tower_id STRING,\n", " total_connections BIGINT,\n", " unique_subscribers BIGINT,\n", " avg_signal_quality DECIMAL(5,2),\n", " total_data_gb DECIMAL(10,3),\n", " total_call_minutes DECIMAL(8,2),\n", " signal_category STRING,\n", " utilization_rank INT,\n", " created_at TIMESTAMP \n", ")\n", "USING DELTA\n", "\"\"\")\n", "\n", "# Service performance analytics\n", "spark.sql(\"\"\"\n", "CREATE TABLE IF NOT EXISTS telecom.gold.service_performance (\n", " service_type STRING,\n", " total_usage BIGINT,\n", " total_data_gb DECIMAL(10,3),\n", " total_call_minutes DECIMAL(12,2), -- total voice minutes (over 1M here) exceed DECIMAL(8,2)\n", " avg_signal_quality DECIMAL(5,2),\n", " unique_subscribers BIGINT,\n", " avg_sessions_per_subscriber DECIMAL(5,2),\n", " revenue_potential DECIMAL(10,2),\n", " created_at TIMESTAMP \n", ")\n", "USING DELTA\n", "\"\"\")\n", "\n", "print(\"Gold layer infrastructure and service analytics tables created!\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Network infrastructure analytics populated!\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Populate network infrastructure analytics\n", "\n", "network_gold = spark.sql(\"\"\"\n", "SELECT \n", " cell_tower_id,\n", " COUNT(*) as total_connections,\n", " COUNT(DISTINCT subscriber_id) as unique_subscribers,\n", " ROUND(AVG(signal_quality), 2) as avg_signal_quality,\n", " ROUND(SUM(data_volume), 3) as total_data_gb,\n", " ROUND(SUM(call_duration), 2) as total_call_minutes,\n", " CASE WHEN AVG(signal_quality) >= 80 THEN 'Excellent'\n", " WHEN AVG(signal_quality) >= 60 THEN 'Good'\n", " WHEN AVG(signal_quality) >= 40 THEN 'Fair'\n", " ELSE 'Poor' END as signal_category,\n", " ROW_NUMBER() OVER (ORDER BY COUNT(*) DESC) as utilization_rank,\n", " current_timestamp() as created_at\n", "FROM telecom.silver.network_usage_clean\n", "GROUP BY cell_tower_id\n", "ORDER BY\n", "
total_connections DESC\n", "\"\"\")\n", "\n", "network_gold.write.mode(\"overwrite\").saveAsTable(\"telecom.gold.network_infrastructure\")\n", "print(\"Network infrastructure analytics populated!\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Service performance analytics populated!\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Populate service performance analytics\n", "\n", "service_gold = spark.sql(\"\"\"\n", "SELECT \n", " service_type,\n", " COUNT(*) as total_usage,\n", " ROUND(SUM(data_volume), 3) as total_data_gb,\n", " ROUND(SUM(call_duration), 2) as total_call_minutes,\n", " ROUND(AVG(signal_quality), 2) as avg_signal_quality,\n", " COUNT(DISTINCT subscriber_id) as unique_subscribers,\n", " ROUND(COUNT(*) * 1.0 / COUNT(DISTINCT subscriber_id), 2) as avg_sessions_per_subscriber,\n", " -- Illustrative revenue estimate with assumed unit prices: 10 per GB of data,\n", " -- 0.10 per call minute, plus a flat per-session fee (SMS 0.02, Voice 0.50, Streaming 2.00, Data 0.80).\n", " -- Because rows are grouped by service_type, only the matching per-session fee applies in each row.\n", " ROUND(\n", " SUM(data_volume) * 10 + \n", " SUM(call_duration) * 0.1 + \n", " COUNT(CASE WHEN service_type = 'SMS' THEN 1 END) * 0.02 +\n", " COUNT(CASE WHEN service_type = 'Voice' THEN 1 END) * 0.5 +\n", " COUNT(CASE WHEN service_type = 'Streaming' THEN 1 END) * 2.0 +\n", " COUNT(CASE WHEN service_type = 'Data' THEN 1 END) * 0.8\n", " , 2) as revenue_potential,\n", " current_timestamp() as created_at\n", "FROM telecom.silver.network_usage_clean\n", "GROUP BY service_type\n", "ORDER BY total_usage DESC\n", "\"\"\")\n", "\n", "service_gold.write.mode(\"overwrite\").saveAsTable(\"telecom.gold.service_performance\")\n", "print(\"Service performance analytics populated!\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "=== Gold Layer Analytics ===\n", "\n", "Top Subscribers by Data Usage:\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+-------------+------------------+-------------+-------------+---------------+\n", 
"|subscriber_id|subscriber_segment|total_data_gb|services_used|signal_category|\n", "+-------------+------------------+-------------+-------------+---------------+\n", "| SUB00008666| High-Value| 407.254| 4| Good|\n", "| SUB00004805| High-Value| 372.894| 4| Good|\n", "| SUB00005566| High-Value| 369.98| 4| Good|\n", "| SUB00000090| High-Value| 354.836| 4| Good|\n", "| SUB00000759| High-Value| 348.745| 4| Good|\n", "+-------------+------------------+-------------+-------------+---------------+\n", "\n", "\n", "Network Tower Performance:\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+-------------+---------------+-----------------+------------------+----------------+\n", "|cell_tower_id|signal_category|total_connections|unique_subscribers|utilization_rank|\n", "+-------------+---------------+-----------------+------------------+----------------+\n", "|TOWER_HOU_004| Good| 86137| 9970| 1|\n", "|TOWER_CHI_003| Good| 85871| 9968| 2|\n", "|TOWER_MIA_005| Good| 85647| 9978| 3|\n", "|TOWER_LAX_002| Good| 85639| 9965| 4|\n", "|TOWER_SEA_007| Good| 85528| 9966| 5|\n", "+-------------+---------------+-----------------+------------------+----------------+\n", "\n", "\n", "Service Revenue Potential:\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+------------+-----------+-----------------+---------------------------+\n", "|service_type|total_usage|revenue_potential|avg_sessions_per_subscriber|\n", "+------------+-----------+-----------------+---------------------------+\n", "| Streaming| 148979| 1.297370033E7| 14.90|\n", "| Data| 150714| 2040220.65| 15.07|\n", "| Voice| 149510| 197796.31| 14.95|\n", "| SMS| 149976| 2999.52| 15.00|\n", "+------------+-----------+-----------------+---------------------------+\n", "\n", "\n", "Subscriber Segmentation:\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+------------------+----------------+-----------+----------+\n", 
"|subscriber_segment|subscriber_count|avg_data_gb|avg_signal|\n", "+------------------+----------------+-----------+----------+\n", "| High-Value| 9475| 151.94| 72.48|\n", "| Medium-Value| 525| 37.9| 72.5|\n", "+------------------+----------------+-----------+----------+\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Demonstrate gold layer analytics queries\n", "\n", "print(\"=== Gold Layer Analytics ===\")\n", "\n", "# Top subscribers by data usage\n", "print(\"\\nTop Subscribers by Data Usage:\")\n", "spark.sql(\"\"\"\n", "SELECT subscriber_id, subscriber_segment, total_data_gb, services_used, signal_category\n", "FROM telecom.gold.subscriber_analytics\n", "ORDER BY total_data_gb DESC\n", "LIMIT 5\n", "\"\"\").show()\n", "\n", "# Network tower performance\n", "print(\"\\nNetwork Tower Performance:\")\n", "spark.sql(\"\"\"\n", "SELECT cell_tower_id, signal_category, total_connections, unique_subscribers, utilization_rank\n", "FROM telecom.gold.network_infrastructure\n", "ORDER BY utilization_rank\n", "LIMIT 5\n", "\"\"\").show()\n", "\n", "# Service revenue potential\n", "print(\"\\nService Revenue Potential:\")\n", "spark.sql(\"\"\"\n", "SELECT service_type, total_usage, revenue_potential, avg_sessions_per_subscriber\n", "FROM telecom.gold.service_performance\n", "ORDER BY revenue_potential DESC\n", "\"\"\").show()\n", "\n", "# Subscriber segmentation\n", "print(\"\\nSubscriber Segmentation:\")\n", "spark.sql(\"\"\"\n", "SELECT subscriber_segment, COUNT(*) as subscriber_count, \n", " ROUND(AVG(total_data_gb), 2) as avg_data_gb,\n", " ROUND(AVG(avg_signal_quality), 2) as avg_signal\n", "FROM telecom.gold.subscriber_analytics\n", "GROUP BY subscriber_segment\n", "ORDER BY subscriber_count DESC\n", "\"\"\").show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Machine Learning: Customer Churn Prediction\n", "\n", "### Business Value\n", "- Predict subscribers likely to churn\n", "- Enable proactive retention 
strategies\n", "- Optimize marketing spend\n", "\n", "### ML Approach\n", "- Use gold layer subscriber analytics as features\n", "- Random Forest classifier for churn prediction\n", "- Include network quality and usage patterns\n", "\n", "**Note on labels:** the synthetic dataset has no observed churn outcomes, so `churn_risk` is a rule-based label computed from the same gold-layer columns the model uses as features (`total_sessions`, `avg_signal_quality`, `services_used`, `subscriber_segment`). The model can therefore learn the rule almost exactly, and the perfect scores reported below (AUC, accuracy, precision, and recall of 1.0) reflect this label leakage rather than real predictive power. With real data, labels would come from observed cancellations instead." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Prepared ML dataset with 10000 subscribers\n", "Churn risk distribution:\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+----------+-----+\n", "|churn_risk|count|\n", "+----------+-----+\n", "| 1| 1213|\n", "| 0| 8787|\n", "+----------+-----+\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Prepare data for churn prediction model using gold layer analytics\n", "\n", "from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler\n", "from pyspark.ml.classification import RandomForestClassifier\n", "from pyspark.ml.evaluation import BinaryClassificationEvaluator\n", "from pyspark.ml import Pipeline\n", "import pyspark.sql.functions as F\n", "\n", "# Read gold layer subscriber data\n", "subscriber_data = spark.table(\"telecom.gold.subscriber_analytics\")\n", "\n", "# Create churn-risk labels from gold layer metrics (a rule-based proxy, not observed churn)\n", "ml_data = subscriber_data.withColumn(\n", "    \"churn_risk\",\n", "    F.when(\n", "        (F.col(\"total_sessions\") < 30) |\n", "        (F.col(\"avg_signal_quality\") < 65) |\n", "        (F.col(\"services_used\") < 3) |\n", "        (F.col(\"subscriber_segment\") == \"Low-Value\"),\n", "        1\n", "    ).otherwise(0)\n", ")\n", "\n", "print(f\"Prepared ML dataset with {ml_data.count()} subscribers\")\n", "print(\"Churn risk distribution:\")\n", "ml_data.groupBy(\"churn_risk\").count().show()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Training set: 8079 subscribers\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "Test set: 1921 subscribers\n" ] }, "metadata": {}, 
"output_type": "display_data" } ], "source": [ "# Feature engineering for churn prediction\n", "\n", "# Index categorical features; handleInvalid=\"keep\" maps categories unseen during training to an extra index instead of failing\n", "segment_indexer = StringIndexer(inputCol=\"subscriber_segment\", outputCol=\"segment_index\", handleInvalid=\"keep\")\n", "signal_indexer = StringIndexer(inputCol=\"signal_category\", outputCol=\"signal_index\", handleInvalid=\"keep\")\n", "service_indexer = StringIndexer(inputCol=\"primary_service_type\", outputCol=\"service_index\", handleInvalid=\"keep\")\n", "\n", "# Assemble features\n", "feature_cols = [\n", "    \"total_sessions\", \"total_data_gb\", \"total_call_minutes\",\n", "    \"avg_signal_quality\", \"services_used\", \"towers_used\",\n", "    \"active_days\", \"avg_usage_hour\", \"business_hours_pct\",\n", "    \"segment_index\", \"signal_index\", \"service_index\"\n", "]\n", "\n", "assembler = VectorAssembler(inputCols=feature_cols, outputCol=\"features\")\n", "# Random forests are insensitive to feature scaling; the scaler is kept only as an example pipeline stage\n", "scaler = StandardScaler(inputCol=\"features\", outputCol=\"scaled_features\")\n", "\n", "# Configure the Random Forest classifier (it is trained in the next cell)\n", "rf = RandomForestClassifier(\n", "    labelCol=\"churn_risk\",\n", "    featuresCol=\"scaled_features\",\n", "    numTrees=100,\n", "    maxDepth=10,\n", "    seed=42\n", ")\n", "\n", "# Create pipeline\n", "pipeline = Pipeline(stages=[segment_indexer, signal_indexer, service_indexer, assembler, scaler, rf])\n", "\n", "# Split data\n", "train_data, test_data = ml_data.randomSplit([0.8, 0.2], seed=42)\n", "\n", "print(f\"Training set: {train_data.count()} subscribers\")\n", "print(f\"Test set: {test_data.count()} subscribers\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Training churn prediction model...\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "\n", "Model Performance - AUC: 1.0000\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+-------------+------------------+--------------+------------------+----------+----------+--------------------+\n", 
"|subscriber_id|subscriber_segment|total_sessions|avg_signal_quality|churn_risk|prediction| probability|\n", "+-------------+------------------+--------------+------------------+----------+----------+--------------------+\n", "| SUB00000003| High-Value| 93| 71.69| 0| 0.0|[0.99999567979812...|\n", "| SUB00000007| High-Value| 69| 73.16| 0| 0.0|[0.99999567979812...|\n", "| SUB00000009| High-Value| 97| 71.69| 0| 0.0|[0.99999567979812...|\n", "| SUB00000014| High-Value| 77| 72.01| 0| 0.0|[0.99999567979812...|\n", "| SUB00000020| High-Value| 23| 68.7| 1| 1.0| [0.01,0.99]|\n", "| SUB00000024| High-Value| 62| 71.76| 0| 0.0|[0.99999567979812...|\n", "| SUB00000030| High-Value| 54| 71.81| 0| 0.0|[0.99999567979812...|\n", "| SUB00000036| High-Value| 82| 70.43| 0| 0.0|[0.99998155550433...|\n", "| SUB00000046| Medium-Value| 22| 69.32| 1| 1.0| [0.0,1.0]|\n", "| SUB00000047| Medium-Value| 22| 70.95| 1| 1.0| [0.0,1.0]|\n", "+-------------+------------------+--------------+------------------+----------+----------+--------------------+\n", "only showing top 10 rows\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Train the churn prediction model\n", "\n", "print(\"Training churn prediction model...\")\n", "model = pipeline.fit(train_data)\n", "\n", "# Make predictions\n", "predictions = model.transform(test_data)\n", "\n", "# Evaluate model\n", "evaluator = BinaryClassificationEvaluator(labelCol=\"churn_risk\", metricName=\"areaUnderROC\")\n", "auc = evaluator.evaluate(predictions)\n", "\n", "print(f\"\\nModel Performance - AUC: {auc:.4f}\")\n", "\n", "# Show predictions\n", "predictions.select(\n", " \"subscriber_id\", \"subscriber_segment\", \"total_sessions\", \n", " \"avg_signal_quality\", \"churn_risk\", \"prediction\", \"probability\"\n", ").show(10)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "=== Feature Importance for Churn Prediction ===\n", "total_sessions: 0.4583\n", 
"total_data_gb: 0.0865\n", "total_call_minutes: 0.0499\n", "avg_signal_quality: 0.0013\n", "services_used: 0.0000\n", "towers_used: 0.0064\n", "active_days: 0.3676\n", "avg_usage_hour: 0.0015\n", "business_hours_pct: 0.0022\n", "segment_index: 0.0251\n", "signal_index: 0.0000\n", "service_index: 0.0011\n", "\n", "=== Business Impact Analysis ===\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "Total test subscribers: 1921\n", "Subscribers predicted as high churn risk: 223\n", "Percentage flagged for intervention: 11.6%\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "\n", "Estimated average ARPU: $1537.63\n", "Potential monthly revenue at risk: $342,891.99\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "\n", "Detailed Model Metrics:\n", "Accuracy: 1.0000\n", "Precision: 1.0000\n", "Recall: 1.0000\n", "AUC: 1.0000\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Model interpretation and business impact analysis\n", "\n", "# Feature importance\n", "rf_model = model.stages[-1]\n", "feature_names = feature_cols\n", "\n", "print(\"=== Feature Importance for Churn Prediction ===\")\n", "for name, importance in zip(feature_names, rf_model.featureImportances):\n", " print(f\"{name}: {importance:.4f}\")\n", "\n", "print(\"\\n=== Business Impact Analysis ===\")\n", "\n", "# Calculate potential impact\n", "churn_predictions = predictions.filter(\"prediction = 1\")\n", "high_risk_subscribers = churn_predictions.count()\n", "total_test_subscribers = test_data.count()\n", "\n", "print(f\"Total test subscribers: {total_test_subscribers}\")\n", "print(f\"Subscribers predicted as high churn risk: {high_risk_subscribers}\")\n", "print(f\"Percentage flagged for intervention: {(high_risk_subscribers/total_test_subscribers)*100:.1f}%\")\n", "\n", "# Revenue impact calculation\n", "avg_data_gb = test_data.agg(F.avg(\"total_data_gb\")).collect()[0][0] 
or 0\n", "avg_call_minutes = test_data.agg(F.avg(\"total_call_minutes\")).collect()[0][0] or 0\n", "estimated_arpu = (avg_data_gb * 10) + (avg_call_minutes * 0.1) + 50\n", "potential_monthly_loss = high_risk_subscribers * estimated_arpu\n", "\n", "print(f\"\\nEstimated average ARPU: ${estimated_arpu:.2f}\")\n", "print(f\"Potential monthly revenue at risk: ${potential_monthly_loss:,.2f}\")\n", "\n", "# Model metrics\n", "accuracy = predictions.filter(\"churn_risk = prediction\").count() / predictions.count()\n", "precision = predictions.filter(\"prediction = 1 AND churn_risk = 1\").count() / predictions.filter(\"prediction = 1\").count() if predictions.filter(\"prediction = 1\").count() > 0 else 0\n", "recall = predictions.filter(\"prediction = 1 AND churn_risk = 1\").count() / predictions.filter(\"churn_risk = 1\").count() if predictions.filter(\"churn_risk = 1\").count() > 0 else 0\n", "\n", "print(f\"\\nDetailed Model Metrics:\")\n", "print(f\"Accuracy: {accuracy:.4f}\")\n", "print(f\"Precision: {precision:.4f}\")\n", "print(f\"Recall: {recall:.4f}\")\n", "print(f\"AUC: {auc:.4f}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Key Takeaways: Medallion Architecture + ML in AIDP\n", "\n", "### What We Demonstrated\n", "\n", "1. **Bronze Layer**: Raw data ingestion with liquid clustering for performance\n", "2. **Silver Layer**: Data cleaning, standardization, and enrichment\n", "3. **Gold Layer**: Business-ready aggregations and analytics\n", "4. 
**Machine Learning**: Churn prediction using curated gold layer data\n", "\n", "### Medallion Architecture Benefits\n", "\n", "- **Data Quality**: Progressive improvement from raw to business-ready\n", "- **Performance**: Optimized clustering at each layer\n", "- **Governance**: Clear data lineage and catalog organization\n", "- **Flexibility**: Reprocessing capability from bronze layer\n", "\n", "### AIDP Advantages\n", "\n", "- **Unified Platform**: Seamless data processing to ML\n", "- **Liquid Clustering**: Automatic optimization without manual tuning\n", "- **Enterprise Ready**: Governance, security, and scalability\n", "\n", "### Business Impact for Telecommunications\n", "\n", "1. **Data-Driven Insights**: Comprehensive analytics across all layers\n", "2. **Predictive Analytics**: ML-powered churn prevention\n", "3. **Operational Efficiency**: Automated data processing pipelines\n", "4. **Customer Experience**: Proactive service improvements\n", "\n", "### Best Practices\n", "\n", "1. **Layer Progression**: Always maintain clear bronze → silver → gold flow\n", "2. **Clustering Strategy**: Choose columns based on query patterns\n", "3. **Data Quality**: Implement validation at each layer\n", "4. **ML Integration**: Use gold layer for training production models\n", "\n", "### Next Steps\n", "\n", "- Deploy medallion pipelines in production\n", "- Add real-time streaming to bronze layer\n", "- Implement automated data quality monitoring\n", "- Scale ML models for real-time predictions\n", "- Integrate with customer service systems" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.5" } }, "nbformat": 4, "nbformat_minor": 4 }