{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Hospitality: Medallion Architecture Demo\n", "\n", "## Overview\n", "\n", "This notebook demonstrates the **Medallion Architecture** in Oracle AI Data Platform (AIDP) Workbench using a hospitality and tourism analytics use case. The medallion architecture organizes data into bronze (raw), silver (cleaned), and gold (aggregated/analytics) layers, providing a clear data processing pipeline.\n", "\n", "### What is Medallion Architecture?\n", "\n", "The medallion architecture is a data design pattern that organizes data into three layers:\n", "\n", "- **Bronze Layer**: Raw data as ingested, minimal processing\n", "- **Silver Layer**: Cleaned, transformed, and validated data\n", "- **Gold Layer**: Business-level aggregates and analytics-ready data\n", "\n", "### Use Case: Hotel Guest Experience and Revenue Management\n", "\n", "We'll analyze hotel booking and guest experience data across all three layers, incorporating machine learning for churn prediction in the gold layer.\n", "\n", "### AIDP Environment Setup\n", "\n", "This notebook leverages the existing Spark session in your AIDP environment." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Hospitality catalog and bronze/silver/gold schemas created successfully!\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [
"# Create hospitality catalog and schemas for medallion architecture\n",
"\n",
"# In AIDP, catalogs provide data isolation and governance\n",
"\n",
"spark.sql(\"CREATE CATALOG IF NOT EXISTS hospitality\")\n",
"\n",
"spark.sql(\"CREATE SCHEMA IF NOT EXISTS hospitality.bronze\")\n",
"\n",
"spark.sql(\"CREATE SCHEMA IF NOT EXISTS hospitality.silver\")\n",
"\n",
"spark.sql(\"CREATE SCHEMA IF NOT EXISTS hospitality.gold\")\n",
"\n",
"print(\"Hospitality catalog and bronze/silver/gold schemas created successfully!\")"
] }, { "cell_type": "markdown", "metadata": {}, "source": [
"## Bronze Layer: Raw Data Ingestion\n",
"\n",
"### Purpose\n",
"The bronze layer stores raw data as ingested from source systems, with minimal processing. This provides an immutable audit trail of all incoming data.\n",
"\n",
"### Table Design\n",
"Our bronze `raw_guest_bookings` table will store raw booking data with:\n",
"\n",
"- Raw field names and types as they come from source systems\n",
"- No data quality checks or transformations\n",
"- Timestamps for data lineage\n",
"- Source system metadata"
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Bronze layer table created for raw guest bookings data!\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [
"# Create bronze layer table for raw guest booking data\n",
"\n",
"spark.sql(\"\"\"\n",
"CREATE TABLE IF NOT EXISTS hospitality.bronze.raw_guest_bookings (\n",
"    guest_id_raw STRING,\n",
"    booking_timestamp_raw STRING,\n",
"    checkin_timestamp_raw STRING,\n",
"    room_type_raw STRING,\n",
"    booking_channel_raw STRING,\n",
"    revenue_amount_raw STRING,\n",
"    satisfaction_score_raw STRING,\n",
"    source_system STRING,\n",
"    ingestion_timestamp TIMESTAMP,\n",
"    raw_data_quality_score DOUBLE\n",
")\n",
"USING DELTA\n",
"CLUSTER BY (guest_id_raw, ingestion_timestamp)\n",
"\"\"\")\n",
"\n",
"print(\"Bronze layer table created for raw guest bookings data!\")"
] }, { "cell_type": "markdown", "metadata": {}, "source": [
"### Generate Raw Hospitality Sample Data\n",
"\n",
"#### Data Generation Strategy\n",
"We'll create realistic raw hotel booking data including:\n",
"\n",
"- **5,000 guests** with multiple bookings over time\n",
"- **Raw data quality issues**: Missing values, inconsistent formats, outliers\n",
"- **Multiple source systems**: PMS, OTA, Mobile App, Call Center\n",
"- **Realistic data quality problems**: Typos, null values, format inconsistencies"
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Generated 25088 raw guest booking records with simulated data quality issues\n", "Sample raw record: {'guest_id_raw': 'GST000001', 'booking_timestamp_raw': '2024-10-17T00:00:00', 'checkin_timestamp_raw': '2024-10-31T00:00:00', 'room_type_raw': 'Deluxe King', 'booking_channel_raw': 'direct', 'revenue_amount_raw': '1488.17', 'satisfaction_score_raw': '6/10', 'source_system': 'MOBILE_APP', 'ingestion_timestamp': datetime.datetime(2025, 12, 20, 2, 36, 11, 264132), 'raw_data_quality_score': 0.8}\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [
"# Generate raw hospitality booking data with quality issues\n",
"\n",
"import random\n",
"from datetime import datetime, timedelta\n",
"import uuid\n",
"\n",
"# Define raw data constants with potential quality issues\n",
"ROOM_TYPES_RAW = ['standard', 'deluxe', 'SUITE', 'executive', 'Standard Room', 'Deluxe King', None]\n",
"BOOKING_CHANNELS_RAW = ['direct', 'ota', 'corporate', 'walk-in', 'Direct Booking', 'Online Travel', None]\n",
"SOURCE_SYSTEMS = ['PMS', 'OTA_API', 'MOBILE_APP', 'CALL_CENTER']\n",
"\n",
"# Generate raw booking records with data quality issues\n",
"raw_booking_data = []\n",
"base_date = datetime(2024, 1, 1)\n",
"\n",
"# Create 5,000 guests with 2-8 bookings each\n",
"for guest_num in range(1, 5001):\n",
"    guest_id = f\"GST{guest_num:06d}\"\n",
"\n",
"    # Each guest gets 2-8 bookings over 12 months\n",
"    num_bookings = random.randint(2, 8)\n",
"\n",
"    for i in range(num_bookings):\n",
"        # Spread bookings over 12 months\n",
"        days_offset = random.randint(0, 365)\n",
"        booking_date = base_date + timedelta(days=days_offset)\n",
"\n",
"        # Check-in date (usually within 1-30 days of booking)\n",
"        checkin_offset = random.randint(1, 30)\n",
"        check_in_date = booking_date + timedelta(days=checkin_offset)\n",
"\n",
"        # Select room type (with potential quality issues)\n",
"        room_type = random.choice(ROOM_TYPES_RAW)\n",
"\n",
"        # Select booking channel (with potential quality issues)\n",
"        booking_channel = random.choice(BOOKING_CHANNELS_RAW)\n",
"\n",
"        # Source system\n",
"        source_system = random.choice(SOURCE_SYSTEMS)\n",
"\n",
"        # Generate revenue with format inconsistencies\n",
"        base_revenue = random.uniform(100, 2000)\n",
"        if random.random() < 0.1:  # 10% chance of format issues\n",
"            revenue_str = f\"${base_revenue:.2f}\"  # Currency symbol\n",
"        elif random.random() < 0.1:\n",
"            revenue_str = f\"{base_revenue:.2f} USD\"  # With currency code\n",
"        elif random.random() < 0.05:\n",
"            revenue_str = \"\"  # Missing value\n",
"        else:\n",
"            revenue_str = str(round(base_revenue, 2))  # Normal numeric\n",
"\n",
"        # Generate satisfaction score with inconsistencies\n",
"        if random.random() < 0.1:\n",
"            satisfaction_str = str(random.randint(1, 10)) + \"/10\"  # With denominator\n",
"        elif random.random() < 0.05:\n",
"            satisfaction_str = \"\"  # Missing value\n",
"        elif random.random() < 0.05:\n",
"            satisfaction_str = \"N/A\"  # Text value\n",
"        else:\n",
"            satisfaction_str = str(random.randint(1, 10))  # Normal integer\n",
"\n",
"        # Data quality score (simulated)\n",
"        quality_issues = sum([\n",
"            room_type is None,\n",
"            booking_channel is None,\n",
"            revenue_str == \"\",\n",
"            satisfaction_str in [\"\", \"N/A\"],\n",
"            \"/\" in satisfaction_str\n",
"        ])\n",
"        quality_score = max(0.1, 1.0 - (quality_issues * 0.2))\n",
"\n",
"        raw_booking_data.append({\n",
"            \"guest_id_raw\": guest_id,\n",
"            \"booking_timestamp_raw\": booking_date.isoformat(),\n",
"            \"checkin_timestamp_raw\": check_in_date.isoformat(),\n",
"            \"room_type_raw\": room_type,\n",
"            \"booking_channel_raw\": booking_channel,\n",
"            \"revenue_amount_raw\": revenue_str,\n",
"            \"satisfaction_score_raw\": satisfaction_str,\n",
"            \"source_system\": source_system,\n",
"            \"ingestion_timestamp\": datetime.now(),\n",
"            \"raw_data_quality_score\": quality_score\n",
"        })\n",
"\n",
"print(f\"Generated {len(raw_booking_data)} raw guest booking records with simulated data quality issues\")\n",
"print(\"Sample raw record:\", raw_booking_data[0])"
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Bronze Layer DataFrame Schema:\n", "root\n", " |-- booking_channel_raw: string (nullable = true)\n", " |-- booking_timestamp_raw: string (nullable = true)\n", " |-- checkin_timestamp_raw: string (nullable = true)\n", " |-- guest_id_raw: string (nullable = true)\n", " |-- ingestion_timestamp: timestamp (nullable = true)\n", " |-- raw_data_quality_score: double (nullable = true)\n", " |-- revenue_amount_raw: string (nullable = true)\n", " |-- room_type_raw: string (nullable = true)\n", " |-- satisfaction_score_raw: string (nullable = true)\n", " |-- source_system: string (nullable = true)\n", "\n", "\n", "Sample Raw Data:\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+-------------------+---------------------+---------------------+------------+--------------------+----------------------+------------------+-------------+----------------------+-------------+\n", "|booking_channel_raw|booking_timestamp_raw|checkin_timestamp_raw|guest_id_raw| 
ingestion_timestamp|raw_data_quality_score|revenue_amount_raw|room_type_raw|satisfaction_score_raw|source_system|\n", "+-------------------+---------------------+---------------------+------------+--------------------+----------------------+------------------+-------------+----------------------+-------------+\n", "| direct| 2024-10-17T00:00:00| 2024-10-31T00:00:00| GST000001|2025-12-20 02:36:...| 0.8| 1488.17| Deluxe King| 6/10| MOBILE_APP|\n", "| ota| 2024-04-15T00:00:00| 2024-04-24T00:00:00| GST000001|2025-12-20 02:36:...| 0.8| 856.31| NULL| 9| MOBILE_APP|\n", "| direct| 2024-09-24T00:00:00| 2024-10-22T00:00:00| GST000001|2025-12-20 02:36:...| 1.0| 1723.38| SUITE| 4| MOBILE_APP|\n", "| ota| 2024-11-05T00:00:00| 2024-11-28T00:00:00| GST000001|2025-12-20 02:36:...| 1.0| $109.50| deluxe| 1| OTA_API|\n", "| direct| 2024-04-08T00:00:00| 2024-05-02T00:00:00| GST000002|2025-12-20 02:36:...| 0.8| | executive| 2| MOBILE_APP|\n", "+-------------------+---------------------+---------------------+------------+--------------------+----------------------+------------------+-------------+----------------------+-------------+\n", "only showing top 5 rows\n", "\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "\n", "Successfully inserted 25088 raw records into bronze layer\n", "Bronze layer preserves raw data with all quality issues intact\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Insert raw data into bronze layer\n", "\n", "df_raw_bookings = spark.createDataFrame(raw_booking_data)\n", "\n", "# Display schema and sample data\n", "print(\"Bronze Layer DataFrame Schema:\")\n", "df_raw_bookings.printSchema()\n", "\n", "print(\"\\nSample Raw Data:\")\n", "df_raw_bookings.show(5)\n", "\n", "# Insert data into bronze table\n", "df_raw_bookings.write.mode(\"overwrite\").saveAsTable(\"hospitality.bronze.raw_guest_bookings\")\n", "\n", "print(f\"\\nSuccessfully inserted {df_raw_bookings.count()} raw records into bronze 
layer\")\n", "print(\"Bronze layer preserves raw data with all quality issues intact\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Silver Layer: Data Cleaning and Transformation\n", "\n", "### Purpose\n", "The silver layer contains cleaned, validated, and transformed data. Raw data from bronze is processed to:\n", "\n", "- Standardize formats and data types\n", "- Handle missing values and outliers\n", "- Apply data quality rules\n", "- Create derived fields\n", "- Ensure referential integrity\n", "\n", "### Transformation Logic\n", "- Standardize room types and booking channels\n", "- Parse and validate revenue amounts\n", "- Clean satisfaction scores\n", "- Add data quality metrics\n", "- Create business-relevant derived fields" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Silver layer table created for cleaned guest bookings data!\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Create silver layer table for cleaned guest booking data\n", "\n", "spark.sql(\"\"\"\n", "CREATE TABLE IF NOT EXISTS hospitality.silver.cleaned_guest_bookings (\n", " guest_id STRING,\n", " booking_date DATE,\n", " check_in_date DATE,\n", " room_type STRING,\n", " booking_channel STRING,\n", " total_revenue DECIMAL(8,2),\n", " guest_satisfaction INT,\n", " source_system STRING,\n", " ingestion_timestamp TIMESTAMP,\n", " data_quality_score DOUBLE,\n", " booking_lead_time_days INT,\n", " weekend_booking BOOLEAN,\n", " peak_season BOOLEAN,\n", " processed_timestamp TIMESTAMP\n", ")\n", "USING DELTA\n", "CLUSTER BY (guest_id, booking_date)\n", "\"\"\")\n", "\n", "print(\"Silver layer table created for cleaned guest bookings data!\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Silver layer transformation completed\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "Bronze records: 
25088\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "Silver records after cleaning: 25088\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+---------+------------+-------------+---------+---------------+-------------+------------------+-------------+--------------------+------------------+----------------------+---------------+-----------+--------------------+\n", "| guest_id|booking_date|check_in_date|room_type|booking_channel|total_revenue|guest_satisfaction|source_system| ingestion_timestamp|data_quality_score|booking_lead_time_days|weekend_booking|peak_season| processed_timestamp|\n", "+---------+------------+-------------+---------+---------------+-------------+------------------+-------------+--------------------+------------------+----------------------+---------------+-----------+--------------------+\n", "|GST003660| 2024-09-27| 2024-10-24| Unknown| Unknown| 1002.37| 4| MOBILE_APP|2025-12-20 02:36:...| 0.6| 27| false| false|2025-12-20 02:36:...|\n", "|GST003660| 2024-09-07| 2024-09-08| Standard| Corporate| 1489.75| 3| CALL_CENTER|2025-12-20 02:36:...| 1.0| 1| true| false|2025-12-20 02:36:...|\n", "|GST003661| 2024-02-10| 2024-02-24|Executive| Direct| 317.82| NULL| MOBILE_APP|2025-12-20 02:36:...| 0.7| 14| true| false|2025-12-20 02:36:...|\n", "|GST003661| 2024-05-20| 2024-05-24| Standard| Corporate| 417.06| 1| PMS|2025-12-20 02:36:...| 1.0| 4| false| false|2025-12-20 02:36:...|\n", "|GST003661| 2024-08-21| 2024-09-18| Standard| Unknown| 314.63| 7| MOBILE_APP|2025-12-20 02:36:...| 0.6| 28| false| false|2025-12-20 02:36:...|\n", "+---------+------------+-------------+---------+---------------+-------------+------------------+-------------+--------------------+------------------+----------------------+---------------+-----------+--------------------+\n", "only showing top 5 rows\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Transform bronze data to 
silver layer\n",
"\n",
"from pyspark.sql.functions import *\n",
"from pyspark.sql.types import *\n",
"\n",
"# Read bronze data\n",
"bronze_df = spark.table(\"hospitality.bronze.raw_guest_bookings\")\n",
"\n",
"# Define UDFs for data cleaning\n",
"def standardize_room_type(room_type):\n",
"    if room_type is None:\n",
"        return \"Unknown\"\n",
"    room_lower = str(room_type).lower().strip()\n",
"    if \"standard\" in room_lower:\n",
"        return \"Standard\"\n",
"    elif \"deluxe\" in room_lower:\n",
"        return \"Deluxe\"\n",
"    elif \"suite\" in room_lower:\n",
"        return \"Suite\"\n",
"    elif \"executive\" in room_lower:\n",
"        return \"Executive\"\n",
"    else:\n",
"        return \"Other\"\n",
"\n",
"def standardize_booking_channel(channel):\n",
"    if channel is None:\n",
"        return \"Unknown\"\n",
"    channel_lower = str(channel).lower().strip()\n",
"    if \"direct\" in channel_lower:\n",
"        return \"Direct\"\n",
"    elif \"ota\" in channel_lower or \"online\" in channel_lower or \"travel\" in channel_lower:\n",
"        return \"Online Travel Agency\"\n",
"    elif \"corporate\" in channel_lower:\n",
"        return \"Corporate\"\n",
"    elif \"walk\" in channel_lower:\n",
"        return \"Walk-in\"\n",
"    else:\n",
"        return \"Other\"\n",
"\n",
"def parse_revenue(revenue_str):\n",
"    if not revenue_str or revenue_str.strip() == \"\":\n",
"        return None\n",
"    try:\n",
"        # Remove currency symbols and extra text\n",
"        clean_str = str(revenue_str).replace(\"$\", \"\").replace(\"USD\", \"\").replace(\" \", \"\").strip()\n",
"        return float(clean_str)\n",
"    except (ValueError, TypeError):\n",
"        # Unparseable amounts become NULL instead of failing the job\n",
"        return None\n",
"\n",
"def parse_satisfaction(satisfaction_str):\n",
"    if not satisfaction_str or satisfaction_str.strip() in [\"\", \"N/A\"]:\n",
"        return None\n",
"    try:\n",
"        # Handle formats like \"8/10\"\n",
"        if \"/\" in str(satisfaction_str):\n",
"            parts = str(satisfaction_str).split(\"/\")\n",
"            return int(parts[0])\n",
"        return int(float(satisfaction_str))\n",
"    except (ValueError, TypeError):\n",
"        # Unparseable scores become NULL instead of failing the job\n",
"        return None\n",
"\n",
"# Register UDFs\n",
"standardize_room_udf = 
udf(standardize_room_type, StringType())\n", "standardize_channel_udf = udf(standardize_booking_channel, StringType())\n", "parse_revenue_udf = udf(parse_revenue, DoubleType())\n", "parse_satisfaction_udf = udf(parse_satisfaction, IntegerType())\n", "\n", "# Transform bronze to silver\n", "silver_df = bronze_df \\\n", " .withColumn(\"guest_id\", col(\"guest_id_raw\")) \\\n", " .withColumn(\"booking_date\", to_date(col(\"booking_timestamp_raw\"))) \\\n", " .withColumn(\"check_in_date\", to_date(col(\"checkin_timestamp_raw\"))) \\\n", " .withColumn(\"room_type\", standardize_room_udf(col(\"room_type_raw\"))) \\\n", " .withColumn(\"booking_channel\", standardize_channel_udf(col(\"booking_channel_raw\"))) \\\n", " .withColumn(\"revenue_parsed\", parse_revenue_udf(col(\"revenue_amount_raw\"))) \\\n", " .withColumn(\"satisfaction_parsed\", parse_satisfaction_udf(col(\"satisfaction_score_raw\"))) \\\n", " .withColumn(\"total_revenue\", round(col(\"revenue_parsed\"), 2).cast(DecimalType(8,2))) \\\n", " .withColumn(\"guest_satisfaction\", col(\"satisfaction_parsed\")) \\\n", " .withColumn(\"booking_lead_time_days\", datediff(col(\"check_in_date\"), col(\"booking_date\"))) \\\n", " .withColumn(\"weekend_booking\", dayofweek(col(\"booking_date\")).isin([1,7])) \\\n", " .withColumn(\"peak_season\", month(col(\"check_in_date\")).isin([6,7,8,11,12])) \\\n", " .withColumn(\"processed_timestamp\", current_timestamp()) \\\n", " .withColumn(\"data_quality_score\", \n", " when(col(\"total_revenue\").isNull() | col(\"guest_satisfaction\").isNull(), 0.7)\n", " .otherwise(col(\"raw_data_quality_score\"))) \\\n", " .select(\n", " \"guest_id\",\n", " \"booking_date\",\n", " \"check_in_date\",\n", " \"room_type\",\n", " \"booking_channel\",\n", " \"total_revenue\",\n", " \"guest_satisfaction\",\n", " \"source_system\",\n", " \"ingestion_timestamp\",\n", " \"data_quality_score\",\n", " \"booking_lead_time_days\",\n", " \"weekend_booking\",\n", " \"peak_season\",\n", " 
\"processed_timestamp\"\n", " )\n", "\n", "# Filter out records with critical data issues (optional - could be quarantined)\n", "silver_df_filtered = silver_df.filter(\n", " col(\"guest_id\").isNotNull() & \n", " col(\"booking_date\").isNotNull()\n", ")\n", "\n", "print(\"Silver layer transformation completed\")\n", "print(f\"Bronze records: {bronze_df.count()}\")\n", "print(f\"Silver records after cleaning: {silver_df_filtered.count()}\")\n", "\n", "# Show sample transformed data\n", "silver_df_filtered.show(5)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Successfully saved 25088 cleaned records to silver layer\n", "Silver layer provides standardized, validated data for downstream analytics\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Save silver layer data\n", "\n", "silver_df_filtered.write.mode(\"overwrite\").saveAsTable(\"hospitality.silver.cleaned_guest_bookings\")\n", "\n", "print(f\"Successfully saved {silver_df_filtered.count()} cleaned records to silver layer\")\n", "print(\"Silver layer provides standardized, validated data for downstream analytics\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Gold Layer: Analytics and Machine Learning\n", "\n", "### Purpose\n", "The gold layer contains business-ready aggregates and analytics data optimized for:\n", "\n", "- Reporting and dashboards\n", "- Business intelligence\n", "- Machine learning model training\n", "- API endpoints\n", "\n", "### Analytics Tables\n", "We'll create several gold layer tables:\n", "\n", "- `guest_analytics`: Guest-level aggregates and KPIs\n", "- `revenue_analytics`: Revenue performance by various dimensions\n", "- `churn_predictions`: ML model predictions and insights\n", "\n", "### Machine Learning Integration\n", "We'll train a guest churn prediction model using the cleaned silver data and store predictions in the gold layer." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Gold layer analytics tables created successfully!\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Create gold layer tables for analytics\n", "\n", "# Guest analytics table\n", "spark.sql(\"\"\"\n", "CREATE TABLE IF NOT EXISTS hospitality.gold.guest_analytics (\n", " guest_id STRING,\n", " total_bookings INT,\n", " total_spent DECIMAL(10,2),\n", " avg_booking_value DECIMAL(8,2),\n", " avg_satisfaction DECIMAL(3,2),\n", " satisfaction_variability DECIMAL(3,2),\n", " room_types_used INT,\n", " channels_used INT,\n", " active_months INT,\n", " days_since_last_booking INT,\n", " customer_tenure_days INT,\n", " avg_advance_booking_days DECIMAL(5,2),\n", " preferred_room_type STRING,\n", " preferred_channel STRING,\n", " lifetime_value_segment STRING,\n", " updated_timestamp TIMESTAMP\n", ")\n", "USING DELTA\n", "CLUSTER BY (lifetime_value_segment, total_spent)\n", "\"\"\")\n", "\n", "# Revenue analytics table\n", "spark.sql(\"\"\"\n", "CREATE TABLE IF NOT EXISTS hospitality.gold.revenue_analytics (\n", " date_dimension DATE,\n", " dimension_type STRING,\n", " dimension_value STRING,\n", " total_bookings INT,\n", " total_revenue DECIMAL(12,2),\n", " avg_revenue DECIMAL(8,2),\n", " avg_satisfaction DECIMAL(3,2),\n", " unique_guests INT,\n", " booking_channel_mix MAP,\n", " room_type_mix MAP,\n", " updated_timestamp TIMESTAMP\n", ")\n", "USING DELTA\n", "CLUSTER BY (dimension_type, date_dimension)\n", "\"\"\")\n", "\n", "# Churn predictions table\n", "spark.sql(\"\"\"\n", "CREATE TABLE IF NOT EXISTS hospitality.gold.churn_predictions (\n", " guest_id STRING,\n", " churn_probability DECIMAL(3,3),\n", " churn_risk_level STRING,\n", " predicted_churn BOOLEAN,\n", " feature_importance MAP,\n", " intervention_recommendation STRING,\n", " potential_lifetime_value DECIMAL(10,2),\n", " prediction_timestamp TIMESTAMP,\n", " model_version STRING\n", 
")\n", "USING DELTA\n", "CLUSTER BY (churn_risk_level, churn_probability)\n", "\"\"\")\n", "\n", "print(\"Gold layer analytics tables created successfully!\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Command ID failed with java.lang.RuntimeException: java.lang.Exception: [\u001b[0;31m---------------------------------------------------------------------------\u001b[0m, \u001b[0;31mAnalysisException\u001b[0m Traceback (most recent call last), File \u001b[0;32m/tmp/ipykernel_100365/746186560.py:39\u001b[0m\n", "\u001b[1;32m 35\u001b[0m room_window \u001b[38;5;241m=\u001b[39m Window\u001b[38;5;241m.\u001b[39mpartitionBy(\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mguest_id\u001b[39m\u001b[38;5;124m\"\u001b[39m)\u001b[38;5;241m.\u001b[39morderBy(desc(\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mtotal_bookings\u001b[39m\u001b[38;5;124m\"\u001b[39m))\n", "\u001b[1;32m 36\u001b[0m channel_window \u001b[38;5;241m=\u001b[39m Window\u001b[38;5;241m.\u001b[39mpartitionBy(\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mguest_id\u001b[39m\u001b[38;5;124m\"\u001b[39m)\u001b[38;5;241m.\u001b[39morderBy(desc(\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mtotal_bookings\u001b[39m\u001b[38;5;124m\"\u001b[39m))\n", "\u001b[1;32m 38\u001b[0m room_prefs \u001b[38;5;241m=\u001b[39m \u001b[43msilver_df\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mgroupBy\u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[38;5;124;43mguest_id\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[38;5;124;43mroom_type\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[43m)\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mcount\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\u001b[43m \u001b[49m\u001b[43m\\\u001b[49m\n", "\u001b[0;32m---> 39\u001b[0m \u001b[43m 
\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mwithColumn\u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[38;5;124;43mrank\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mrank\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mover\u001b[49m\u001b[43m(\u001b[49m\u001b[43mroom_window\u001b[49m\u001b[43m)\u001b[49m\u001b[43m)\u001b[49m \\\n", "\u001b[1;32m 40\u001b[0m \u001b[38;5;241m.\u001b[39mfilter(\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mrank = 1\u001b[39m\u001b[38;5;124m\"\u001b[39m) \\\n", "\u001b[1;32m 41\u001b[0m \u001b[38;5;241m.\u001b[39mselect(\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mguest_id\u001b[39m\u001b[38;5;124m\"\u001b[39m, col(\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mroom_type\u001b[39m\u001b[38;5;124m\"\u001b[39m)\u001b[38;5;241m.\u001b[39malias(\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mpreferred_room_type\u001b[39m\u001b[38;5;124m\"\u001b[39m))\n", "\u001b[1;32m 43\u001b[0m channel_prefs \u001b[38;5;241m=\u001b[39m silver_df\u001b[38;5;241m.\u001b[39mgroupBy(\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mguest_id\u001b[39m\u001b[38;5;124m\"\u001b[39m, \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mbooking_channel\u001b[39m\u001b[38;5;124m\"\u001b[39m)\u001b[38;5;241m.\u001b[39mcount() \\\n", "\u001b[1;32m 44\u001b[0m \u001b[38;5;241m.\u001b[39mwithColumn(\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mrank\u001b[39m\u001b[38;5;124m\"\u001b[39m, rank()\u001b[38;5;241m.\u001b[39mover(channel_window)) \\\n", "\u001b[1;32m 45\u001b[0m \u001b[38;5;241m.\u001b[39mfilter(\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mrank = 1\u001b[39m\u001b[38;5;124m\"\u001b[39m) \\\n", "\u001b[1;32m 46\u001b[0m \u001b[38;5;241m.\u001b[39mselect(\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mguest_id\u001b[39m\u001b[38;5;124m\"\u001b[39m, 
col(\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mbooking_channel\u001b[39m\u001b[38;5;124m\"\u001b[39m)\u001b[38;5;241m.\u001b[39malias(\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mpreferred_channel\u001b[39m\u001b[38;5;124m\"\u001b[39m))\n", "\u001b[1;32m 48\u001b[0m \u001b[38;5;66;03m# Join preferences\u001b[39;00m\n", ", File \u001b[0;32m/opt/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py:5170\u001b[0m, in \u001b[0;36mDataFrame.withColumn\u001b[0;34m(self, colName, col)\u001b[0m\n", "\u001b[1;32m 5165\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;129;01mnot\u001b[39;00m \u001b[38;5;28misinstance\u001b[39m(col, Column):\n", "\u001b[1;32m 5166\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m PySparkTypeError(\n", "\u001b[1;32m 5167\u001b[0m error_class\u001b[38;5;241m=\u001b[39m\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mNOT_COLUMN\u001b[39m\u001b[38;5;124m\"\u001b[39m,\n", "\u001b[1;32m 5168\u001b[0m message_parameters\u001b[38;5;241m=\u001b[39m{\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124marg_name\u001b[39m\u001b[38;5;124m\"\u001b[39m: \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mcol\u001b[39m\u001b[38;5;124m\"\u001b[39m, \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124marg_type\u001b[39m\u001b[38;5;124m\"\u001b[39m: \u001b[38;5;28mtype\u001b[39m(col)\u001b[38;5;241m.\u001b[39m\u001b[38;5;18m__name__\u001b[39m},\n", "\u001b[1;32m 5169\u001b[0m )\n", "\u001b[0;32m-> 5170\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m DataFrame(\u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m_jdf\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mwithColumn\u001b[49m\u001b[43m(\u001b[49m\u001b[43mcolName\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mcol\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m_jc\u001b[49m\u001b[43m)\u001b[49m, \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39msparkSession)\n", ", File \u001b[0;32m/opt/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1322\u001b[0m, in 
\u001b[0;36mJavaMember.__call__\u001b[0;34m(self, *args)\u001b[0m\n", "\u001b[1;32m 1316\u001b[0m command \u001b[38;5;241m=\u001b[39m proto\u001b[38;5;241m.\u001b[39mCALL_COMMAND_NAME \u001b[38;5;241m+\u001b[39m\\\n", "\u001b[1;32m 1317\u001b[0m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39mcommand_header \u001b[38;5;241m+\u001b[39m\\\n", "\u001b[1;32m 1318\u001b[0m args_command \u001b[38;5;241m+\u001b[39m\\\n", "\u001b[1;32m 1319\u001b[0m proto\u001b[38;5;241m.\u001b[39mEND_COMMAND_PART\n", "\u001b[1;32m 1321\u001b[0m answer \u001b[38;5;241m=\u001b[39m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39mgateway_client\u001b[38;5;241m.\u001b[39msend_command(command)\n", "\u001b[0;32m-> 1322\u001b[0m return_value \u001b[38;5;241m=\u001b[39m \u001b[43mget_return_value\u001b[49m\u001b[43m(\u001b[49m\n", "\u001b[1;32m 1323\u001b[0m \u001b[43m \u001b[49m\u001b[43manswer\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mgateway_client\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mtarget_id\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mname\u001b[49m\u001b[43m)\u001b[49m\n", "\u001b[1;32m 1325\u001b[0m \u001b[38;5;28;01mfor\u001b[39;00m temp_arg \u001b[38;5;129;01min\u001b[39;00m temp_args:\n", "\u001b[1;32m 1326\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;28mhasattr\u001b[39m(temp_arg, \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124m_detach\u001b[39m\u001b[38;5;124m\"\u001b[39m):\n", ", File \u001b[0;32m/opt/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py:185\u001b[0m, in \u001b[0;36mcapture_sql_exception..deco\u001b[0;34m(*a, **kw)\u001b[0m\n", "\u001b[1;32m 181\u001b[0m converted \u001b[38;5;241m=\u001b[39m convert_exception(e\u001b[38;5;241m.\u001b[39mjava_exception)\n", "\u001b[1;32m 
182\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;129;01mnot\u001b[39;00m \u001b[38;5;28misinstance\u001b[39m(converted, UnknownException):\n", "\u001b[1;32m 183\u001b[0m \u001b[38;5;66;03m# Hide where the exception came from that shows a non-Pythonic\u001b[39;00m\n", "\u001b[1;32m 184\u001b[0m \u001b[38;5;66;03m# JVM exception message.\u001b[39;00m\n", "\u001b[0;32m--> 185\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m converted \u001b[38;5;28;01mfrom\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[38;5;28;01mNone\u001b[39;00m\n", "\u001b[1;32m 186\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n", "\u001b[1;32m 187\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m\n", ", \u001b[0;31mAnalysisException\u001b[0m: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `total_bookings` cannot be resolved. Did you mean one of the following? [`count`, `hospitality`.`silver`.`cleaned_guest_bookings`.`guest_id`, `hospitality`.`silver`.`cleaned_guest_bookings`.`room_type`].;\n", "'Project [guest_id#106236, room_type#106239, count#106390L, rank() windowspecdefinition(guest_id#106236, 'total_bookings DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#106396]\n", "+- Aggregate [guest_id#106236, room_type#106239], [guest_id#106236, room_type#106239, count(1) AS count#106390L]\n", " +- SubqueryAlias hospitality.silver.cleaned_guest_bookings\n", " +- Relation hospitality.silver.cleaned_guest_bookings[guest_id#106236,booking_date#106237,check_in_date#106238,room_type#106239,booking_channel#106240,total_revenue#106241,guest_satisfaction#106242,source_system#106243,ingestion_timestamp#106244,data_quality_score#106245,booking_lead_time_days#106246,weekend_booking#106247,peak_season#106248,processed_timestamp#106249] parquet\n", "]" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Generate guest analytics from silver layer\n", "\n", "from pyspark.sql import Window\n", "\n", "silver_df = 
spark.table(\"hospitality.silver.cleaned_guest_bookings\")\n", "\n", "# Guest analytics\n", "guest_analytics = silver_df.groupBy(\"guest_id\").agg(\n", " count(\"*\").alias(\"total_bookings\"),\n", " round(sum(\"total_revenue\"), 2).alias(\"total_spent\"),\n", " round(avg(\"total_revenue\"), 2).alias(\"avg_booking_value\"),\n", " round(avg(\"guest_satisfaction\"), 2).alias(\"avg_satisfaction\"),\n", " round(stddev(\"guest_satisfaction\"), 2).alias(\"satisfaction_variability\"),\n", " countDistinct(\"room_type\").alias(\"room_types_used\"),\n", " countDistinct(\"booking_channel\").alias(\"channels_used\"),\n", " countDistinct(date_format(\"check_in_date\", \"yyyy-MM\")).alias(\"active_months\"),\n", " datediff(current_date(), max(\"booking_date\")).alias(\"days_since_last_booking\"),\n", " datediff(current_date(), min(\"booking_date\")).alias(\"customer_tenure_days\"),\n", " round(avg(\"booking_lead_time_days\"), 2).alias(\"avg_advance_booking_days\")\n", ")\n", "\n", "# stddev is null for guests with a single booking; fill with 0 so the\n", "# downstream ML feature assembly does not fail on null values\n", "guest_analytics = guest_analytics.fillna(0.0, subset=[\"satisfaction_variability\"])\n", "\n", "# Add derived fields (withColumn sets the column name, so no alias is needed)\n", "guest_analytics = guest_analytics.withColumn(\n", " \"lifetime_value_segment\",\n", " expr(\"\"\"\n", " CASE \n", " WHEN total_spent > 5000 THEN 'High-Value'\n", " WHEN total_spent > 2000 THEN 'Medium-Value'\n", " ELSE 'Standard-Value'\n", " END\n", " \"\"\")\n", ").withColumn(\"updated_timestamp\", current_timestamp())\n", "\n", "# Add preferred room type and channel using window functions.\n", "# groupBy(...).count() names its output column `count`; the second sort key\n", "# breaks ties so each guest gets exactly one rank-1 row.\n", "room_window = Window.partitionBy(\"guest_id\").orderBy(desc(\"count\"), col(\"room_type\"))\n", "channel_window = Window.partitionBy(\"guest_id\").orderBy(desc(\"count\"), col(\"booking_channel\"))\n", "\n", "room_prefs = silver_df.groupBy(\"guest_id\", \"room_type\").count() \\\n", " .withColumn(\"rank\", rank().over(room_window)) \\\n", " .filter(\"rank = 1\") \\\n", " .select(\"guest_id\", col(\"room_type\").alias(\"preferred_room_type\"))\n", "\n", "channel_prefs = silver_df.groupBy(\"guest_id\", \"booking_channel\").count() \\\n", " .withColumn(\"rank\",
rank().over(channel_window)) \\\n", " .filter(\"rank = 1\") \\\n", " .select(\"guest_id\", col(\"booking_channel\").alias(\"preferred_channel\"))\n", "\n", "# Join preferences\n", "guest_analytics = guest_analytics \\\n", " .join(room_prefs, \"guest_id\", \"left\") \\\n", " .join(channel_prefs, \"guest_id\", \"left\")\n", "\n", "print(f\"Generated guest analytics for {guest_analytics.count()} guests\")\n", "guest_analytics.show(5)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Generate revenue analytics\n", "\n", "# Monthly revenue analytics (one row per booking month)\n", "room_analytics = silver_df.groupBy(\n", " date_format(\"booking_date\", \"yyyy-MM\").alias(\"date_dimension\")\n", ").agg(\n", " count(\"*\").alias(\"total_bookings\"),\n", " round(sum(\"total_revenue\"), 2).alias(\"total_revenue\"),\n", " round(avg(\"total_revenue\"), 2).alias(\"avg_revenue\"),\n", " round(avg(\"guest_satisfaction\"), 2).alias(\"avg_satisfaction\"),\n", " countDistinct(\"guest_id\").alias(\"unique_guests\")\n", ").withColumn(\"dimension_type\", lit(\"monthly\")).withColumn(\"dimension_value\", col(\"date_dimension\")) \\\n", " .withColumn(\"updated_timestamp\", current_timestamp())\n", "\n", "# Add mix data (simplified): null placeholders.\n", "# The map<string,double> key/value types are an assumption; a bare \"map\" is not a valid Spark type string.\n", "room_analytics = room_analytics.withColumn(\"booking_channel_mix\", lit(None).cast(\"map<string,double>\")) \\\n", " .withColumn(\"room_type_mix\", lit(None).cast(\"map<string,double>\"))\n", "\n", "print(f\"Generated monthly revenue analytics for {room_analytics.count()} months\")\n", "room_analytics.show(5)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Save gold layer analytics data\n", "\n", "guest_analytics.write.mode(\"overwrite\").saveAsTable(\"hospitality.gold.guest_analytics\")\n", "room_analytics.write.mode(\"overwrite\").saveAsTable(\"hospitality.gold.revenue_analytics\")\n", "\n", "print(\"Gold layer analytics tables populated successfully!\")" ] }, { "cell_type": "code", "execution_count": null,
"metadata": {}, "outputs": [], "source": [ "# Train churn prediction model and generate predictions\n", "\n", "from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler\n", "from pyspark.ml.classification import RandomForestClassifier\n", "from pyspark.ml.evaluation import BinaryClassificationEvaluator\n", "from pyspark.ml import Pipeline\n", "import pyspark.sql.functions as F\n", "\n", "# Read guest analytics for ML\n", "guest_ml_data = spark.table(\"hospitality.gold.guest_analytics\")\n", "\n", "# Create churn risk label (simulated)\n", "guest_ml_data = guest_ml_data.withColumn(\n", " \"churn_risk\",\n", " when(\n", " (col(\"days_since_last_booking\") > 90) | \n", " (col(\"avg_satisfaction\") < 7) | \n", " (col(\"total_bookings\") < 3),\n", " 1\n", " ).otherwise(0)\n", ")\n", "\n", "# Feature engineering\n", "feature_cols = [\n", " \"total_bookings\", \"total_spent\", \"avg_booking_value\", \"avg_satisfaction\", \n", " \"satisfaction_variability\", \"room_types_used\", \"channels_used\", \n", " \"active_months\", \"days_since_last_booking\", \"customer_tenure_days\", \n", " \"avg_advance_booking_days\"\n", "]\n", "\n", "assembler = VectorAssembler(\n", " inputCols=feature_cols,\n", " outputCol=\"features\"\n", ")\n", "\n", "scaler = StandardScaler(inputCol=\"features\", outputCol=\"scaled_features\")\n", "\n", "# Train model\n", "rf = RandomForestClassifier(\n", " labelCol=\"churn_risk\", \n", " featuresCol=\"scaled_features\",\n", " numTrees=100,\n", " maxDepth=10,\n", " seed=42\n", ")\n", "\n", "pipeline = Pipeline(stages=[assembler, scaler, rf])\n", "\n", "# Split data\n", "train_data, test_data = guest_ml_data.randomSplit([0.8, 0.2], seed=42)\n", "\n", "print(f\"Training set: {train_data.count()} guests\")\n", "print(f\"Test set: {test_data.count()} guests\")\n", "\n", "# Train model\n", "print(\"Training churn prediction model...\")\n", "model = pipeline.fit(train_data)\n", "\n", "# Make predictions\n", "predictions = 
model.transform(test_data)\n", "\n", "# Evaluate\n", "evaluator = BinaryClassificationEvaluator(labelCol=\"churn_risk\", metricName=\"areaUnderROC\")\n", "auc = evaluator.evaluate(predictions)\n", "\n", "print(f\"Model AUC: {auc:.4f}\")\n", "\n", "# Generate churn predictions for all guests\n", "all_predictions = model.transform(guest_ml_data)\n", "\n", "# `probability` is an ML vector column, which does not support getItem();\n", "# convert it to an array to read the positive-class (churn) probability\n", "from pyspark.ml.functions import vector_to_array\n", "churn_prob = vector_to_array(col(\"probability\")).getItem(1)\n", "\n", "# Create churn predictions table data\n", "churn_predictions = all_predictions.select(\n", " \"guest_id\",\n", " round(churn_prob, 3).alias(\"churn_probability\"),\n", " when(col(\"prediction\") == 1, \"High\").when(churn_prob > 0.3, \"Medium\").otherwise(\"Low\").alias(\"churn_risk_level\"),\n", " (col(\"prediction\") == 1).alias(\"predicted_churn\"),\n", " lit(None).cast(\"map<string,double>\").alias(\"feature_importance\"), # Simplified null placeholder; map<string,double> is an assumed type\n", " when(col(\"prediction\") == 1, \"Urgent retention campaign needed\")\n", " .when(churn_prob > 0.3, \"Monitor closely and send loyalty offers\")\n", " .otherwise(\"Maintain regular communication\").alias(\"intervention_recommendation\"),\n", " col(\"total_spent\").alias(\"potential_lifetime_value\"),\n", " current_timestamp().alias(\"prediction_timestamp\"),\n", " lit(\"v1.0\").alias(\"model_version\")\n", ")\n", "\n", "print(f\"Generated churn predictions for {churn_predictions.count()} guests\")\n", "churn_predictions.show(5)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Save churn predictions to gold layer\n", "\n", "churn_predictions.write.mode(\"overwrite\").saveAsTable(\"hospitality.gold.churn_predictions\")\n", "\n", "print(\"Churn predictions saved to gold layer!\")\n", "print(\"Gold layer now contains complete analytics and ML predictions\")\n", "\n", "# Business impact summary\n", "high_risk_guests = churn_predictions.filter(\"churn_risk_level = 'High'\").count()\n", "total_guests = churn_predictions.count()\n", "avg_lifetime_value =
churn_predictions.agg(avg(\"potential_lifetime_value\")).collect()[0][0] or 0.0 # avg is None when there are no guests\n", "\n", "print(\"\\nBusiness Impact Summary:\")\n", "print(f\"Total guests analyzed: {total_guests}\")\n", "print(f\"High-risk churn guests identified: {high_risk_guests}\")\n", "print(f\"Average lifetime value per guest: ${avg_lifetime_value:,.2f}\")\n", "# Rough estimate: uses the average over all guests, not only the high-risk ones\n", "print(f\"Potential revenue at risk: ${(high_risk_guests * avg_lifetime_value):,.0f}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Key Takeaways: Medallion Architecture in AIDP\n", "\n", "### What We Demonstrated\n", "\n", "1. **Bronze Layer**: Raw data ingestion with data quality issues preserved\n", "2. **Silver Layer**: Data cleaning, standardization, and validation\n", "3. **Gold Layer**: Business analytics and machine learning predictions\n", "4. **End-to-End Pipeline**: Complete data processing from raw to insights\n", "\n", "### Medallion Architecture Benefits\n", "\n", "- **Data Quality**: Progressive improvement through layers\n", "- **Governance**: Clear data lineage and audit trails\n", "- **Performance**: Optimized clustering for different access patterns\n", "- **Flexibility**: Each layer serves different use cases\n", "- **Maintainability**: Clear separation of concerns\n", "\n", "### Business Value for Hospitality\n", "\n", "1. **Data Quality Management**: Track and improve data quality from source\n", "2. **Customer Insights**: Rich analytics on guest behavior and preferences\n", "3. **Revenue Optimization**: ML-driven churn prevention and lifetime value\n", "4. **Operational Intelligence**: Multi-dimensional analytics for decision making\n", "5.
**Scalability**: Architecture scales with business growth\n", "\n", "### AIDP Advantages\n", "\n", "- **Unified Platform**: Single environment for all data processing layers\n", "- **Delta Lake**: ACID transactions, time travel, and optimized performance\n", "- **ML Integration**: Seamless ML training and deployment\n", "- **Governance**: Catalog and schema isolation\n", "- **Performance**: Automatic optimization and clustering\n", "\n", "### Best Practices\n", "\n", "1. **Layer Design**: Each layer has a specific purpose and audience\n", "2. **Data Contracts**: Define clear schemas and expectations per layer\n", "3. **Quality Gates**: Implement data quality checks between layers\n", "4. **Access Control**: Different permissions for different layers\n", "5. **Monitoring**: Track data quality and pipeline health\n", "\n", "### Next Steps\n", "\n", "- Add real-time data ingestion to bronze layer\n", "- Implement automated data quality monitoring\n", "- Deploy ML models for real-time predictions\n", "- Create APIs for gold layer analytics\n", "- Add more sophisticated ML models and features\n", "- Integrate with actual hospitality systems\n", "\n", "This notebook demonstrates how Oracle AI Data Platform enables sophisticated data architectures that drive real business value in the hospitality industry." ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.5" } }, "nbformat": 4, "nbformat_minor": 4 }