{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Media: Medallion Architecture Demo with Delta Liquid Clustering\n", "\n", "## Overview\n", "\n", "This notebook demonstrates a **Medallion Architecture** implementation in Oracle AI Data Platform (AIDP) Workbench using a media and entertainment analytics use case. The medallion architecture organizes data into three layers:\n", "\n", "- **Bronze Layer**: Raw, unprocessed data as ingested\n", "- **Silver Layer**: Cleaned, validated, and transformed data\n", "- **Gold Layer**: Aggregated, business-ready data with analytics and ML insights\n", "\n", "We'll apply **Delta Liquid Clustering** (`CLUSTER BY`) to every table for data layout optimization and train a PySpark ML classifier that predicts high-engagement events to inform content recommendations.\n", "\n", "### Key Technologies\n", "- **Delta Lake**: ACID transactions, time travel, schema enforcement\n", "- **Liquid Clustering**: Automatic data layout optimization\n", "- **Medallion Architecture**: Progressive data refinement\n", "- **PySpark ML**: Random forest classifier for engagement prediction\n", "\n", "### Use Case: Content Performance and User Engagement Analytics\n", "\n", "We'll analyze content consumption patterns by content type, device, and user to support content recommendations and engagement reporting.\n", "\n", "### AIDP Environment Setup\n", "\n", "This notebook uses the Spark session (`spark`) that the AIDP environment already provides." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Media catalog with bronze, silver, and gold schemas created successfully!\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Create media catalog with bronze, silver, and gold schemas\n", "# In AIDP, catalogs provide data isolation and governance\n", "\n", "spark.sql(\"CREATE CATALOG IF NOT EXISTS media\")\n", "spark.sql(\"CREATE SCHEMA IF NOT EXISTS media.bronze\")\n", "spark.sql(\"CREATE SCHEMA IF NOT EXISTS media.silver\")\n", "spark.sql(\"CREATE SCHEMA IF NOT EXISTS media.gold\")\n", "\n", "print(\"Media catalog with bronze, silver, and gold schemas created successfully!\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Bronze Layer: Raw Data Ingestion\n", "\n", "### Purpose\n", "The Bronze layer stores raw data exactly as received, without any transformations. This preserves data integrity and enables reprocessing if needed.\n", "\n", "### Table Design\n", "Our `content_engagement_raw` table will store:\n", "\n", "- **user_id**: Raw user identifier\n", "- **engagement_date**: Raw timestamp\n", "- **content_type**: Content type as received\n", "- **watch_time**: Raw watch time data\n", "- **content_id**: Raw content identifier\n", "- **engagement_score**: Raw engagement metric\n", "- **device_type**: Device information\n", "- **ingestion_timestamp**: When data was ingested\n", "\n", "### Clustering Strategy\n", "We'll cluster by `user_id` and `ingestion_timestamp` for efficient data management and historical tracking." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Create Bronze layer Delta table with liquid clustering\n", "\n", "spark.sql(\"\"\"\n", "CREATE TABLE IF NOT EXISTS media.bronze.content_engagement_raw (\n", " user_id STRING,\n", " engagement_date STRING,\n", " content_type STRING,\n", " watch_time STRING,\n", " content_id STRING,\n", " engagement_score STRING,\n", " device_type STRING,\n", " ingestion_timestamp TIMESTAMP\n", ")\n", "USING DELTA\n", "CLUSTER BY (user_id, ingestion_timestamp)\n", "\"\"\")\n", "\n", "print(\"Bronze layer table created successfully!\")\n", "print(\"Clustering will optimize data layout for user-based queries and temporal analysis.\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Generate synthetic raw media engagement data\n", "# Only standard-library modules are needed for generation\n", "\n", "import random\n", "from datetime import datetime, timedelta\n", "\n", "# Define media data constants\n", "CONTENT_TYPES = ['Video', 'Article', 'Podcast', 'Live Stream']\n", "DEVICE_TYPES = ['Mobile', 'Desktop', 'Tablet', 'Smart TV', 'Gaming Console']\n", "\n", "# Generate sample raw data (simulating various data quality issues)\n", "raw_engagement_data = []\n", "base_date = datetime(2024, 1, 1)\n", "ingestion_time = datetime.now()\n", "\n", "# Create 15,000 users with varying data quality\n", "for user_num in range(1, 15001):\n", " user_id = f\"USER{user_num:06d}\"\n", " \n", " # Each user gets 8-35 engagement events\n", " num_engagements = random.randint(8, 35)\n", " \n", " for i in range(num_engagements):\n", " # Spread engagements across calendar year 2024 (a leap year: offsets 0-365)\n", " days_offset = random.randint(0, 365)\n", " engagement_date = base_date + timedelta(days=days_offset)\n", " \n", " # Weight the hour of day toward daytime and evening activity\n", " hour_weights = [2, 1, 1, 1, 1, 1, 3, 6, 8, 7, 6, 7, 8, 9, 10, 9, 8, 10, 12, 9, 7, 5, 4, 3]\n", " hours_offset = random.choices(range(24), 
weights=hour_weights)[0]\n", " engagement_datetime = engagement_date.replace(hour=hours_offset, minute=random.randint(0, 59), second=0, microsecond=0)\n", " \n", " # Select content type\n", " content_type = random.choice(CONTENT_TYPES)\n", " \n", " # Select device type\n", " device_type = random.choice(DEVICE_TYPES)\n", " \n", " # Generate watch time with some data quality issues\n", " base_watch_time = {'Video': 15, 'Article': 8, 'Podcast': 25, 'Live Stream': 45}[content_type]\n", " watch_time = round(base_watch_time * random.uniform(0.1, 3.0), 2)\n", " \n", " # Content ID\n", " content_id = f\"{content_type[:3].upper()}{random.randint(10000, 99999)}\"\n", " \n", " # Engagement score with some outliers - increased range to ensure more high engagement examples\n", " base_score = {'Video': 75, 'Article': 65, 'Podcast': 70, 'Live Stream': 80}[content_type]\n", " engagement_score = random.randint(max(0, base_score - 15), min(100, base_score + 35))\n", " \n", " # Simulate data quality issues in raw data\n", " if random.random() < 0.05: # 5% chance of data quality issues\n", " if random.random() < 0.3:\n", " watch_time = \"NULL\" # Missing data\n", " elif random.random() < 0.5:\n", " engagement_score = str(random.randint(200, 500)) # Out of range\n", " else:\n", " content_type = content_type.lower() # Inconsistent casing\n", " \n", " raw_engagement_data.append({\n", " \"user_id\": user_id,\n", " \"engagement_date\": engagement_datetime.isoformat(),\n", " \"content_type\": str(content_type),\n", " \"watch_time\": str(watch_time),\n", " \"content_id\": content_id,\n", " \"engagement_score\": str(engagement_score),\n", " \"device_type\": device_type,\n", " \"ingestion_timestamp\": ingestion_time\n", " })\n", "\n", "print(f\"Generated {len(raw_engagement_data)} raw content engagement records\")\n", "print(\"Sample raw record:\", raw_engagement_data[0])" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Insert raw data into 
Bronze layer\n", "\n", "df_raw = spark.createDataFrame(raw_engagement_data)\n", "\n", "print(\"Raw DataFrame Schema:\")\n", "df_raw.printSchema()\n", "\n", "print(\"\\nSample Raw Data:\")\n", "df_raw.show(5)\n", "\n", "# Insert into Bronze table\n", "df_raw.write.mode(\"overwrite\").saveAsTable(\"media.bronze.content_engagement_raw\")\n", "\n", "print(f\"\\nSuccessfully ingested {df_raw.count()} raw records into Bronze layer\")\n", "print(\"Data stored exactly as received, preserving original quality and format\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Silver Layer: Data Cleaning and Transformation\n", "\n", "### Purpose\n", "The Silver layer contains cleaned, validated, and standardized data. This layer:\n", "- Removes or corrects invalid data\n", "- Standardizes formats and types\n", "- Enriches data with derived fields\n", "- Prepares data for analytical use\n", "\n", "### Transformations Applied\n", "- **Type casting**: Convert strings to appropriate data types\n", "- **Data validation**: Remove/correct invalid values\n", "- **Standardization**: Consistent formatting\n", "- **Enrichment**: Add derived fields like engagement categories" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Create Silver layer table\n", "\n", "spark.sql(\"\"\"\n", "CREATE TABLE IF NOT EXISTS media.silver.content_engagement_clean (\n", " user_id STRING,\n", " engagement_date TIMESTAMP,\n", " content_type STRING,\n", " watch_time DECIMAL(8,2),\n", " content_id STRING,\n", " engagement_score INT,\n", " device_type STRING,\n", " engagement_category STRING,\n", " ingestion_timestamp TIMESTAMP,\n", " processing_timestamp TIMESTAMP\n", ")\n", "USING DELTA\n", "CLUSTER BY (user_id, engagement_date)\n", "\"\"\")\n", "\n", "print(\"Silver layer table created successfully!\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Transform Bronze data to Silver layer\n", "from 
pyspark.sql.functions import col, when, udf, current_timestamp\n", "from pyspark.sql.types import IntegerType, DecimalType, TimestampType\n", "\n", "# Read Bronze data\n", "bronze_df = spark.table(\"media.bronze.content_engagement_raw\")\n", "\n", "# Data cleaning and transformation\n", "silver_df = bronze_df \\\n", " .withColumn(\"engagement_date_clean\", \n", " when(col(\"engagement_date\").isNotNull(), col(\"engagement_date\").cast(TimestampType()))\n", " .otherwise(current_timestamp())) \\\n", " .withColumn(\"watch_time_clean\",\n", " when((col(\"watch_time\") != \"NULL\") & (col(\"watch_time\").cast(\"float\").isNotNull()), \n", " col(\"watch_time\").cast(DecimalType(8,2)))\n", " .otherwise(0.0)) \\\n", " .withColumn(\"engagement_score_clean\",\n", " when(col(\"engagement_score\").cast(\"int\").isNotNull(), col(\"engagement_score\").cast(\"int\"))\n", " .otherwise(50) # Default score\n", " .cast(IntegerType())) \\\n", " .withColumn(\"engagement_score_clean\",\n", " when(col(\"engagement_score_clean\").between(0, 100), col(\"engagement_score_clean\"))\n", " .otherwise(50)) \\\n", " .withColumn(\"content_type_clean\",\n", " when(col(\"content_type\").isNotNull(), col(\"content_type\"))\n", " .otherwise(\"Unknown\")) \\\n", " .withColumn(\"engagement_category\",\n", " when(col(\"engagement_score_clean\") >= 80, \"High\")\n", " .when(col(\"engagement_score_clean\") >= 60, \"Medium\")\n", " .otherwise(\"Low\")) \\\n", " .withColumn(\"processing_timestamp\", current_timestamp()) \\\n", " .select(\n", " col(\"user_id\"),\n", " col(\"engagement_date_clean\").alias(\"engagement_date\"),\n", " col(\"content_type_clean\").alias(\"content_type\"),\n", " col(\"watch_time_clean\").alias(\"watch_time\"),\n", " col(\"content_id\"),\n", " col(\"engagement_score_clean\").alias(\"engagement_score\"),\n", " col(\"device_type\"),\n", " col(\"engagement_category\"),\n", " col(\"ingestion_timestamp\"),\n", " col(\"processing_timestamp\")\n", " )\n", "\n", "print(\"Silver layer 
transformation completed\")\n", "print(f\"Processed {silver_df.count()} records\")\n", "\n", "print(\"\\nSilver Data Schema:\")\n", "silver_df.printSchema()\n", "\n", "print(\"\\nSample Clean Data:\")\n", "silver_df.show(10)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Insert cleaned data into Silver layer\n", "\n", "silver_df.write.mode(\"overwrite\").saveAsTable(\"media.silver.content_engagement_clean\")\n", "\n", "print(\"Successfully transformed and loaded data into Silver layer\")\n", "print(\"Data is now cleaned, validated, and enriched for analysis\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Gold Layer: Business Analytics and ML Insights\n", "\n", "### Purpose\n", "The Gold layer contains aggregated, business-ready data optimized for:\n", "- **Analytics dashboards** and reporting\n", "- **Machine learning** model training and scoring\n", "- **Business intelligence** and decision making\n", "\n", "### Tables in Gold Layer\n", "- **content_analytics**: Aggregated metrics and KPIs\n", "- **user_profiles**: User behavior summaries\n", "- **engagement_predictions**: ML model predictions" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Create Gold layer analytics table\n", "\n", "spark.sql(\"\"\"\n", "CREATE TABLE IF NOT EXISTS media.gold.content_analytics (\n", " content_type STRING,\n", " device_type STRING,\n", " date DATE,\n", " total_engagements BIGINT,\n", " total_watch_time DECIMAL(12,2),\n", " avg_watch_time DECIMAL(8,2),\n", " avg_engagement_score DECIMAL(5,2),\n", " unique_users BIGINT,\n", " high_engagement_rate DECIMAL(5,4),\n", " created_at TIMESTAMP\n", ")\n", "USING DELTA\n", "CLUSTER BY (content_type, date)\n", "\"\"\")\n", "\n", "print(\"Gold layer analytics table created successfully!\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Create user profiles table in 
Gold layer\n", "\n", "spark.sql(\"\"\"\n", "CREATE TABLE IF NOT EXISTS media.gold.user_profiles (\n", " user_id STRING,\n", " total_sessions BIGINT,\n", " total_watch_time DECIMAL(10,2),\n", " avg_session_time DECIMAL(8,2),\n", " avg_engagement_score DECIMAL(5,2),\n", " preferred_content_type STRING,\n", " preferred_device STRING,\n", " engagement_trend STRING,\n", " last_engagement_date TIMESTAMP,\n", " user_segment STRING,\n", " created_at TIMESTAMP\n", ")\n", "USING DELTA\n", "CLUSTER BY (user_segment, last_engagement_date)\n", "\"\"\")\n", "\n", "print(\"Gold layer user profiles table created successfully!\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Aggregate data for Gold layer analytics\n", "from pyspark.sql.functions import date_format, count, sum, avg, countDistinct, round, current_timestamp\n", "\n", "# Read Silver data\n", "silver_data = spark.table(\"media.silver.content_engagement_clean\")\n", "\n", "# Create content analytics aggregations\n", "content_analytics = silver_data \\\n", " .withColumn(\"date\", date_format(\"engagement_date\", \"yyyy-MM-dd\").cast(\"date\")) \\\n", " .groupBy(\"content_type\", \"device_type\", \"date\") \\\n", " .agg(\n", " count(\"*\").alias(\"total_engagements\"),\n", " sum(\"watch_time\").alias(\"total_watch_time\"),\n", " avg(\"watch_time\").alias(\"avg_watch_time\"),\n", " avg(\"engagement_score\").alias(\"avg_engagement_score\"),\n", " countDistinct(\"user_id\").alias(\"unique_users\"),\n", " (count(when(col(\"engagement_category\") == \"High\", 1)) / count(\"*\")).alias(\"high_engagement_rate\")\n", " ) \\\n", " .withColumn(\"created_at\", current_timestamp())\n", "\n", "# Round decimal columns\n", "content_analytics = content_analytics \\\n", " .withColumn(\"total_watch_time\", round(\"total_watch_time\", 2)) \\\n", " .withColumn(\"avg_watch_time\", round(\"avg_watch_time\", 2)) \\\n", " .withColumn(\"avg_engagement_score\", round(\"avg_engagement_score\", 
2)) \\\n", " .withColumn(\"high_engagement_rate\", round(\"high_engagement_rate\", 4))\n", "\n", "print(\"Content analytics aggregations created\")\n", "content_analytics.show(10)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "User profiles created\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+----------+--------------+----------------+----------------+--------------------+--------------------+----------------------+----------------+----------------+------------+--------------------+\n", "| user_id|total_sessions|total_watch_time|avg_session_time|avg_engagement_score|last_engagement_date|preferred_content_type|preferred_device|engagement_trend|user_segment| created_at|\n", "+----------+--------------+----------------+----------------+--------------------+--------------------+----------------------+----------------+----------------+------------+--------------------+\n", "|USER003726| 31| 1330.09| 42.91| 72.16| 2024-12-25 11:20:00| Live Stream| Gaming Console| High Performer| Power User|2026-01-02 20:20:...|\n", "|USER003738| 21| 944.71| 44.99| 61.43| 2024-12-31 16:15:00| Live Stream| Smart TV| Good Engagement|Regular User|2026-01-02 20:20:...|\n", "|USER003806| 31| 911.62| 29.41| 65.68| 2024-12-26 13:05:00| Article| Desktop| Good Engagement| Power User|2026-01-02 20:20:...|\n", "|USER003834| 30| 1027.14| 34.24| 64.33| 2024-12-31 14:33:00| Article| Tablet| Good Engagement| Power User|2026-01-02 20:20:...|\n", "|USER003856| 31| 1053.74| 33.99| 65.32| 2024-12-26 14:05:00| Podcast| Gaming Console| Good Engagement| Power User|2026-01-02 20:20:...|\n", "|USER003871| 12| 493.83| 41.15| 64.5| 2024-11-21 18:11:00| Article| Smart TV| Good Engagement| Casual User|2026-01-02 20:20:...|\n", "|USER004227| 33| 1265.76| 38.36| 72.91| 2024-12-14 18:37:00| Video| Mobile| High Performer| Power User|2026-01-02 20:20:...|\n", "|USER004765| 28| 1339.59| 47.84| 71.71| 2024-12-20 
17:01:00| Live Stream| Tablet| High Performer| Power User|2026-01-02 20:20:...|\n", "|USER004838| 18| 609.9| 33.88| 68.72| 2024-12-31 21:06:00| Video| Gaming Console| Good Engagement|Regular User|2026-01-02 20:20:...|\n", "|USER004974| 28| 1208.69| 43.17| 68.64| 2024-12-17 19:42:00| Live Stream| Tablet| Good Engagement| Power User|2026-01-02 20:20:...|\n", "+----------+--------------+----------------+----------------+--------------------+--------------------+----------------------+----------------+----------------+------------+--------------------+\n", "only showing top 10 rows\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Create user profiles for Gold layer\n", "# row_number is required by the ranking windows below\n", "from pyspark.sql.functions import max, when, col, row_number\n", "from pyspark.sql.window import Window\n", "\n", "# User behavior aggregations\n", "user_profiles_base = silver_data \\\n", " .groupBy(\"user_id\") \\\n", " .agg(\n", " count(\"*\").alias(\"total_sessions\"),\n", " sum(\"watch_time\").alias(\"total_watch_time\"),\n", " avg(\"watch_time\").alias(\"avg_session_time\"),\n", " avg(\"engagement_score\").alias(\"avg_engagement_score\"),\n", " max(\"engagement_date\").alias(\"last_engagement_date\")\n", " )\n", "\n", "# Get preferred content type and device per user\n", "user_preferences = silver_data \\\n", " .groupBy(\"user_id\", \"content_type\") \\\n", " .agg(count(\"*\").alias(\"content_count\")) \\\n", " .withColumn(\"rank\", row_number().over(Window.partitionBy(\"user_id\").orderBy(col(\"content_count\").desc()))) \\\n", " .filter(\"rank = 1\") \\\n", " .select(\"user_id\", \"content_type\")\n", "\n", "user_device_prefs = silver_data \\\n", " .groupBy(\"user_id\", \"device_type\") \\\n", " .agg(count(\"*\").alias(\"device_count\")) \\\n", " .withColumn(\"rank\", row_number().over(Window.partitionBy(\"user_id\").orderBy(col(\"device_count\").desc()))) \\\n", " .filter(\"rank = 1\") \\\n", " .select(\"user_id\", \"device_type\")\n", "\n", "# Combine user 
profiles\n", "user_profiles = user_profiles_base \\\n", " .join(user_preferences, \"user_id\", \"left\") \\\n", " .join(user_device_prefs, \"user_id\", \"left\") \\\n", " .withColumn(\"preferred_content_type\", col(\"content_type\")) \\\n", " .withColumn(\"preferred_device\", col(\"device_type\")) \\\n", " .withColumn(\"engagement_trend\", \n", " when(col(\"avg_engagement_score\") >= 70, \"High Performer\")\n", " .when(col(\"avg_engagement_score\") >= 60, \"Good Engagement\")\n", " .otherwise(\"Needs Attention\")) \\\n", " .withColumn(\"user_segment\",\n", " when(col(\"total_sessions\") >= 25, \"Power User\")\n", " .when(col(\"total_sessions\") >= 15, \"Regular User\")\n", " .otherwise(\"Casual User\")) \\\n", " .withColumn(\"created_at\", current_timestamp()) \\\n", " .drop(\"content_type\", \"device_type\")\n", "\n", "# Round decimal columns\n", "user_profiles = user_profiles \\\n", " .withColumn(\"total_watch_time\", round(\"total_watch_time\", 2)) \\\n", " .withColumn(\"avg_session_time\", round(\"avg_session_time\", 2)) \\\n", " .withColumn(\"avg_engagement_score\", round(\"avg_engagement_score\", 2))\n", "\n", "print(\"User profiles created\")\n", "user_profiles.show(10)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Successfully loaded aggregated analytics into Gold layer\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "Content analytics: 11190 records\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "User profiles: 15000 records\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Load aggregated data into Gold layer tables\n", "\n", "content_analytics.write.mode(\"overwrite\").saveAsTable(\"media.gold.content_analytics\")\n", "user_profiles.write.mode(\"overwrite\").saveAsTable(\"media.gold.user_profiles\")\n", "\n", "print(\"Successfully loaded aggregated analytics into Gold layer\")\n", 
"print(f\"Content analytics: {content_analytics.count()} records\")\n", "print(f\"User profiles: {user_profiles.count()} records\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Machine Learning: Content Engagement Prediction\n", "\n", "### ML in the Gold Layer\n", "We'll train a machine learning model to predict content engagement and create personalized recommendations.\n", "\n", "### Business Value\n", "- **Personalized Recommendations**: Increase user engagement and watch time\n", "- **Content Optimization**: Identify high-performing content patterns\n", "- **Revenue Growth**: Better engagement drives advertising and subscription revenue" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Prepared 321321 records for ML training\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+---------------+------+\n", "|high_engagement| count|\n", "+---------------+------+\n", "| 1|139466|\n", "| 0|181855|\n", "+---------------+------+\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Prepare data for ML model training\n", "from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler\n", "from pyspark.ml.classification import RandomForestClassifier\n", "from pyspark.ml.evaluation import BinaryClassificationEvaluator\n", "from pyspark.ml.functions import vector_to_array\n", "from pyspark.ml import Pipeline\n", "import pyspark.sql.functions as F\n", "\n", "# Read Silver layer data for ML\n", "ml_data = spark.table(\"media.silver.content_engagement_clean\")\n", "\n", "# Create engagement prediction features\n", "engagement_features = ml_data \\\n", " .withColumn(\"high_engagement\", when(col(\"engagement_score\") > 70, 1).otherwise(0)) \\\n", " .withColumn(\"engagement_hour\", F.hour(\"engagement_date\")) \\\n", " .withColumn(\"engagement_day_of_week\", F.dayofweek(\"engagement_date\")) \\\n", " 
.withColumn(\"user_avg_engagement\", \n", " F.avg(\"engagement_score\").over(Window.partitionBy(\"user_id\").orderBy(\"engagement_date\").rowsBetween(-10, -1))) \\\n", " .withColumn(\"user_prior_engagements\", \n", " F.count(\"*\").over(Window.partitionBy(\"user_id\").orderBy(\"engagement_date\").rowsBetween(-10, -1))) \\\n", " .fillna(0, subset=[\"user_avg_engagement\"]) \\\n", " .fillna(1, subset=[\"user_prior_engagements\"])\n", "\n", "print(f\"Prepared {engagement_features.count()} records for ML training\")\n", "engagement_features.groupBy(\"high_engagement\").count().show()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Training set: 257009 interactions\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "Test set: 64312 interactions\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Feature engineering and model training\n", "\n", "# Index categorical features\n", "content_type_indexer = StringIndexer(inputCol=\"content_type\", outputCol=\"content_type_index\")\n", "device_type_indexer = StringIndexer(inputCol=\"device_type\", outputCol=\"device_type_index\")\n", "\n", "# Assemble features\n", "feature_cols = [\"watch_time\", \"engagement_hour\", \"engagement_day_of_week\", \n", " \"user_avg_engagement\", \"user_prior_engagements\", \n", " \"content_type_index\", \"device_type_index\"]\n", "\n", "assembler = VectorAssembler(inputCols=feature_cols, outputCol=\"features\")\n", "scaler = StandardScaler(inputCol=\"features\", outputCol=\"scaled_features\")\n", "\n", "# Random Forest model\n", "rf = RandomForestClassifier(\n", " labelCol=\"high_engagement\", \n", " featuresCol=\"scaled_features\",\n", " numTrees=50,\n", " maxDepth=8\n", ")\n", "\n", "# Create pipeline\n", "pipeline = Pipeline(stages=[content_type_indexer, device_type_indexer, assembler, scaler, rf])\n", "\n", "# Split data\n", "train_data, test_data = 
engagement_features.randomSplit([0.8, 0.2], seed=42)\n", "\n", "print(f\"Training set: {train_data.count()} interactions\")\n", "print(f\"Test set: {test_data.count()} interactions\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Training content engagement prediction model...\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "Model AUC: 0.6207\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+----------+------------+----------+---------------+----------+--------------------+\n", "| user_id|content_type|watch_time|high_engagement|prediction| probability|\n", "+----------+------------+----------+---------------+----------+--------------------+\n", "|USER000004| Live Stream| 18.84| 0| 1.0|[0.45896525024308...|\n", "|USER000004| Video| 26.57| 1| 1.0|[0.49641008043898...|\n", "|USER000004| Article| 8.29| 1| 0.0|[0.70666455872038...|\n", "|USER000007| Live Stream| 103.29| 1| 1.0|[0.42414598085538...|\n", "|USER000007| Live Stream| 28.55| 1| 1.0|[0.43578712043250...|\n", "|USER000011| Live Stream| 119.85| 0| 1.0|[0.42343499054838...|\n", "|USER000011| Live Stream| 33.56| 1| 1.0|[0.43529629589007...|\n", "|USER000022| Article| 22.15| 1| 0.0|[0.69395640594806...|\n", "|USER000022| Video| 24.93| 1| 0.0|[0.52654328396472...|\n", "|USER000022| Podcast| 14.57| 0| 0.0|[0.63646842845440...|\n", "|USER000022| Article| 11.56| 0| 0.0|[0.70575863809617...|\n", "|USER000022| Article| 0.0| 1| 0.0|[0.70191653332529...|\n", "|USER000022| Podcast| 61.76| 1| 0.0|[0.61036191071031...|\n", "|USER000022| Podcast| 33.52| 1| 0.0|[0.62087453837095...|\n", "|USER000022| Podcast| 59.36| 1| 0.0|[0.60774847627774...|\n", "+----------+------------+----------+---------------+----------+--------------------+\n", "only showing top 15 rows\n", "\n", "Gold layer predictions table created!\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Train 
the engagement prediction model\n", "\n", "print(\"Training content engagement prediction model...\")\n", "model = pipeline.fit(train_data)\n", "\n", "# Make predictions\n", "predictions = model.transform(test_data)\n", "\n", "# Evaluate model\n", "evaluator = BinaryClassificationEvaluator(labelCol=\"high_engagement\", metricName=\"areaUnderROC\")\n", "auc = evaluator.evaluate(predictions)\n", "\n", "print(f\"Model AUC: {auc:.4f}\")\n", "\n", "# Show prediction results\n", "predictions.select(\"user_id\", \"content_type\", \"watch_time\", \"high_engagement\", \"prediction\", \"probability\").show(15)\n", "\n", "# Create predictions table in Gold layer\n", "spark.sql(\"\"\"\n", "CREATE TABLE IF NOT EXISTS media.gold.engagement_predictions (\n", " user_id STRING,\n", " content_type STRING,\n", " watch_time DECIMAL(8,2),\n", " engagement_score INT,\n", " predicted_high_engagement INT,\n", " prediction_probability DECIMAL(5,4),\n", " prediction_timestamp TIMESTAMP\n", ")\n", "USING DELTA\n", "CLUSTER BY (user_id, predicted_high_engagement)\n", "\"\"\")\n", "\n", "print(\"Gold layer predictions table created!\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Successfully saved 64312 predictions to Gold layer\n", "ML predictions are now available for business analysis and recommendations\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Save predictions to Gold layer\n", "\n", "predictions_for_gold = predictions \\\n", " .select(\n", " \"user_id\",\n", " \"content_type\", \n", " \"watch_time\",\n", " \"engagement_score\",\n", " \"prediction\",\n", " vector_to_array(\"probability\")[1].alias(\"prediction_probability\"),\n", " F.current_timestamp().alias(\"prediction_timestamp\")\n", " ) \\\n", " .withColumnRenamed(\"prediction\", \"predicted_high_engagement\") \\\n", " .withColumn(\"prediction_probability\", F.round(\"prediction_probability\", 4))\n", "\n", 
"predictions_for_gold.write.mode(\"overwrite\").saveAsTable(\"media.gold.engagement_predictions\")\n", "\n", "print(f\"Successfully saved {predictions_for_gold.count()} predictions to Gold layer\")\n", "print(\"ML predictions are now available for business analysis and recommendations\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "=== Feature Importance for Engagement Prediction ===\n", "watch_time: 0.1994\n", "engagement_hour: 0.0163\n", "engagement_day_of_week: 0.0085\n", "user_avg_engagement: 0.0155\n", "user_prior_engagements: 0.0090\n", "content_type_index: 0.7440\n", "device_type_index: 0.0073\n", "\n", "=== Business Impact Analysis ===\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "Total predictions: 64312\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "Predicted high engagement content: 16963\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "Recommendation coverage: 26.4%\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "\n", "Average watch time for recommended content: 65.75 minutes\n", "Average watch time overall: 35.22 minutes\n", "Potential engagement lift: 86.7%\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "\n", "Model Performance:\n", "Accuracy: 0.6039\n", "Precision: 0.5702\n", "AUC: 0.6207\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Model interpretation and business insights\n", "\n", "# Feature importance\n", "rf_model = model.stages[-1]\n", "feature_importance = rf_model.featureImportances\n", "feature_names = feature_cols\n", "\n", "print(\"=== Feature Importance for Engagement Prediction ===\")\n", "for name, importance in zip(feature_names, feature_importance):\n", " print(f\"{name}: {importance:.4f}\")\n", "\n", "print(\"\\n=== Business Impact Analysis 
===\")\n", "\n", "# Calculate potential impact (each count is computed once and reused)\n", "high_engagement_predictions = predictions.filter(\"prediction = 1\")\n", "total_predictions = predictions.count()\n", "predicted_high_count = high_engagement_predictions.count()\n", "\n", "print(f\"Total predictions: {total_predictions}\")\n", "print(f\"Predicted high engagement content: {predicted_high_count}\")\n", "print(f\"Recommendation coverage: {(predicted_high_count/total_predictions)*100:.1f}%\")\n", "\n", "# Watch-time lift: predicted-high-engagement rows vs. all test rows (a proxy, not a revenue estimate)\n", "avg_watch_time_predicted = high_engagement_predictions.agg(F.avg(\"watch_time\")).collect()[0][0] or 0\n", "avg_watch_time_all = predictions.agg(F.avg(\"watch_time\")).collect()[0][0] or 0\n", "# Guard against division by zero when the overall average is 0\n", "engagement_lift = ((avg_watch_time_predicted - avg_watch_time_all) / avg_watch_time_all) * 100 if avg_watch_time_all else 0\n", "\n", "print(f\"\\nAverage watch time for recommended content: {avg_watch_time_predicted:.2f} minutes\")\n", "print(f\"Average watch time overall: {avg_watch_time_all:.2f} minutes\")\n", "print(f\"Potential engagement lift: {engagement_lift:.1f}%\")\n", "\n", "# Model accuracy metrics\n", "accuracy = predictions.filter(\"high_engagement = prediction\").count() / total_predictions\n", "true_positives = predictions.filter(\"prediction = 1 AND high_engagement = 1\").count()\n", "precision = true_positives / predicted_high_count if predicted_high_count > 0 else 0\n", "\n", "print(f\"\\nModel Performance:\")\n", "print(f\"Accuracy: {accuracy:.4f}\")\n", "print(f\"Precision: {precision:.4f}\")\n", "print(f\"AUC: {auc:.4f}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Querying the Medallion Architecture\n", "\n", "### Demonstrating Data Flow and Optimization\n", "\n", "Let's query each layer in turn to show what it supports: raw record inspection in Bronze, typed and validated rows in Silver, and pre-aggregated metrics in Gold." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "=== Bronze Layer: Raw Data ===\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+----------+-------------------+------------+----------+----------------+\n", "| user_id| engagement_date|content_type|watch_time|engagement_score|\n", "+----------+-------------------+------------+----------+----------------+\n", "|USER000001|2024-08-16T08:36:00| Podcast| 15.01| 75|\n", "|USER000001|2024-02-06T20:47:00| Video| 13.89| 59|\n", "|USER000001|2024-12-31T13:58:00| Article| 19.38| 66|\n", "|USER000001|2024-07-19T14:43:00| Video| 30.19| 94|\n", "|USER000001|2024-04-13T07:42:00| Podcast| 57.78| 84|\n", "+----------+-------------------+------------+----------+----------------+\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Query Bronze layer - raw data inspection\n", "print(\"=== Bronze Layer: Raw Data ===\")\n", "spark.sql(\"\"\"\n", "SELECT user_id, engagement_date, content_type, watch_time, engagement_score\n", "FROM media.bronze.content_engagement_raw\n", "WHERE user_id = 'USER000001'\n", "ORDER BY ingestion_timestamp DESC\n", "LIMIT 5\n", "\"\"\").show()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "=== Silver Layer: Cleaned Data ===\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+----------+-------------------+------------+----------+----------------+-------------------+\n", "| user_id| engagement_date|content_type|watch_time|engagement_score|engagement_category|\n", "+----------+-------------------+------------+----------+----------------+-------------------+\n", "|USER000001|2024-12-31 13:58:00| Article| 19.38| 66| Medium|\n", "|USER000001|2024-11-15 17:00:00| Live Stream| 32.38| 50| Low|\n", "|USER000001|2024-10-20 17:29:00| Podcast| 56.98| 46| Low|\n", "|USER000001|2024-10-10 18:42:00| 
Article| 16.54| 48| Low|\n", "|USER000001|2024-09-28 06:49:00| Video| 34.18| 77| Medium|\n", "+----------+-------------------+------------+----------+----------------+-------------------+\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Query Silver layer - cleaned data analysis\n", "print(\"=== Silver Layer: Cleaned Data ===\")\n", "spark.sql(\"\"\"\n", "SELECT user_id, engagement_date, content_type, watch_time, engagement_score, engagement_category\n", "FROM media.silver.content_engagement_clean\n", "WHERE user_id = 'USER000001'\n", "ORDER BY engagement_date DESC\n", "LIMIT 5\n", "\"\"\").show()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "=== Gold Layer: Business Analytics ===\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+------------+----------+-----------------+--------------+--------------------+--------------------+\n", "|content_type| date|total_engagements|avg_watch_time|avg_engagement_score|high_engagement_rate|\n", "+------------+----------+-----------------+--------------+--------------------+--------------------+\n", "| Video|2024-12-31| 42| 25.03| 70.07| 0.3333|\n", "| Video|2024-12-31| 45| 22.04| 68.6| 0.2889|\n", "| Video|2024-12-31| 41| 21.96| 65.98| 0.2195|\n", "| Video|2024-12-31| 48| 24.05| 73.25| 0.375|\n", "| Video|2024-12-31| 50| 20.02| 68.8| 0.28|\n", "+------------+----------+-----------------+--------------+--------------------+--------------------+\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Query Gold layer - business analytics\n", "print(\"=== Gold Layer: Business Analytics ===\")\n", "spark.sql(\"\"\"\n", "SELECT content_type, date, total_engagements, avg_watch_time, avg_engagement_score, high_engagement_rate\n", "FROM media.gold.content_analytics\n", "WHERE content_type = 'Video'\n", "ORDER BY date DESC\n", "LIMIT 5\n", "\"\"\").show()" ] }, { "cell_type": 
"code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "=== Gold Layer: User Profiles ===\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+----------+--------------+--------------------+----------------------+------------+\n", "| user_id|total_sessions|avg_engagement_score|preferred_content_type|user_segment|\n", "+----------+--------------+--------------------+----------------------+------------+\n", "|USER005822| 35| 71.77| Live Stream| Power User|\n", "|USER006268| 35| 63.14| Video| Power User|\n", "|USER004568| 35| 68.43| Live Stream| Power User|\n", "|USER006265| 35| 69.09| Video| Power User|\n", "|USER005047| 35| 64.77| Video| Power User|\n", "+----------+--------------+--------------------+----------------------+------------+\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Query Gold layer - user profiles\n", "print(\"=== Gold Layer: User Profiles ===\")\n", "spark.sql(\"\"\"\n", "SELECT user_id, total_sessions, avg_engagement_score, preferred_content_type, user_segment\n", "FROM media.gold.user_profiles\n", "ORDER BY total_sessions DESC\n", "LIMIT 5\n", "\"\"\").show()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "=== Gold Layer: ML Predictions ===\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+----------+------------+----------------+-------------------------+----------------------+\n", "| user_id|content_type|engagement_score|predicted_high_engagement|prediction_probability|\n", "+----------+------------+----------------+-------------------------+----------------------+\n", "|USER006220| live stream| 55| 1.0| 0.7102|\n", "|USER002796| live stream| 54| 1.0| 0.682|\n", "|USER009903| live stream| 95| 1.0| 0.6719|\n", "|USER003578| live stream| 52| 1.0| 0.6697|\n", "|USER010685| live stream| 59| 1.0| 0.6669|\n", 
"+----------+------------+----------------+-------------------------+----------------------+\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Query Gold layer - ML predictions\n", "print(\"=== Gold Layer: ML Predictions ===\")\n", "spark.sql(\"\"\"\n", "SELECT user_id, content_type, engagement_score, predicted_high_engagement, prediction_probability\n", "FROM media.gold.engagement_predictions\n", "WHERE predicted_high_engagement = 1\n", "ORDER BY prediction_probability DESC\n", "LIMIT 5\n", "\"\"\").show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Key Takeaways: Medallion Architecture with Delta Liquid Clustering\n", "\n", "### Architecture Benefits\n", "\n", "1. **Progressive Data Refinement**: Each layer serves specific analytical needs\n", " - Bronze: Data preservation and auditability\n", " - Silver: Clean, validated data for operational analytics\n", " - Gold: Business-ready aggregations and ML insights\n", "\n", "2. **Performance Optimization**: Liquid clustering automatically optimizes query performance\n", " - No manual partitioning or Z-Ordering required\n", " - Adaptive clustering adjusts to query patterns\n", " - Significant performance improvements for analytical workloads\n", "\n", "3. **Data Governance**: Clear separation of concerns and data quality management\n", " - Schema enforcement prevents data corruption\n", " - Time travel enables historical analysis\n", " - Catalog isolation provides security and governance\n", "\n", "### Business Impact for Media Companies\n", "\n", "1. **Personalized Content Discovery**: ML-driven recommendations increase engagement\n", "2. **Data-Driven Content Strategy**: Analytics guide content creation and acquisition\n", "3. **User Retention**: Better understanding of user behavior improves retention\n", "4. **Revenue Optimization**: Higher engagement drives subscription and advertising revenue\n", "5. 
**Operational Efficiency**: Automated data processing reduces manual effort\n", "\n", "### Technical Advantages\n", "\n", "- **Unified Analytics**: Data processing and ML run in the same Spark environment against the same Delta tables\n", "- **Scalability**: Spark distributes the work, so the same pipeline can be applied to much larger media datasets\n", "- **Cost Efficiency**: Queries that filter on the clustering columns can skip unrelated files, which can reduce compute costs\n", "- **Developer Productivity**: Focus on business logic, not infrastructure\n", "\n", "### Best Practices\n", "\n", "1. **Layer Design**: Clearly define the purpose of each medallion layer\n", "2. **Clustering Strategy**: Choose clustering columns based on the filters and joins your queries use most\n", "3. **Data Quality**: Implement comprehensive validation in the Silver layer\n", "4. **Incremental Processing**: Use Delta Lake features such as `MERGE` for incremental updates instead of full overwrites\n", "5. **Monitoring**: Track data quality and pipeline performance\n", "\n", "### Next Steps\n", "\n", "- Implement real-time data ingestion pipelines\n", "- Add more sophisticated ML models (recommendation systems, churn prediction); the engagement classifier above is a baseline with an AUC of about 0.62\n", "- Integrate with content management systems\n", "- Deploy models for production recommendations\n", "- Scale to larger datasets and more complex analytics\n", "\n", "This notebook shows how a medallion architecture on Oracle AI Data Platform supports media analytics, from raw ingestion through business-ready aggregates and ML predictions, while keeping governance and performance concerns separated by layer." ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.5" } }, "nbformat": 4, "nbformat_minor": 4 }