{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Real Estate: Medallion Architecture Demo\n", "\n", "## Overview\n", "\n", "This notebook demonstrates a **Medallion Architecture** implementation in Oracle AI Data Platform (AIDP) Workbench using a real estate analytics use case. The medallion architecture organizes data into three layers:\n", "\n", "- **Bronze Layer**: Raw data ingestion and storage\n", "- **Silver Layer**: Cleaned, validated, and structured data\n", "- **Gold Layer**: Aggregated, analytics-ready data with ML models\n", "\n", "### What is Medallion Architecture?\n", "\n", "The medallion architecture provides a structured approach to data processing:\n", "\n", "- **Bronze**: Raw, unprocessed data as ingested\n", "- **Silver**: Cleansed, validated, and enriched data\n", "- **Gold**: Business-ready data for analytics and ML\n", "\n", "### Use Case: Property Transaction Analytics and Price Prediction\n", "\n", "We'll analyze real estate transactions and property market data across all three layers, culminating in ML-powered price prediction for property valuation.\n", "\n", "### AIDP Environment Setup\n", "\n", "This notebook leverages the existing Spark session in your AIDP environment." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Setup: Create Real Estate Catalog and Medallion Schemas\n", "\n", "### Catalog and Schema Design\n", "\n", "We'll create:\n", "\n", "- `real_estate.bronze`: Raw transaction data\n", "- `real_estate.silver`: Cleaned and validated transactions\n", "- `real_estate.gold`: Analytics and ML-ready data\n", "\n", "This structure provides data isolation and governance across layers." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Real estate catalog and medallion schemas created successfully!\n", "- real_estate.bronze: Raw transaction data\n", "- real_estate.silver: Cleaned and validated data\n", "- real_estate.gold: Analytics and ML-ready data\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Create real estate catalog and medallion schemas\n", "\n", "spark.sql(\"CREATE CATALOG IF NOT EXISTS real_estate\")\n", "\n", "spark.sql(\"CREATE SCHEMA IF NOT EXISTS real_estate.bronze\")\n", "spark.sql(\"CREATE SCHEMA IF NOT EXISTS real_estate.silver\")\n", "spark.sql(\"CREATE SCHEMA IF NOT EXISTS real_estate.gold\")\n", "\n", "print(\"Real estate catalog and medallion schemas created successfully!\")\n", "print(\"- real_estate.bronze: Raw transaction data\")\n", "print(\"- real_estate.silver: Cleaned and validated data\")\n", "print(\"- real_estate.gold: Analytics and ML-ready data\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Bronze Layer: Raw Data Ingestion\n", "\n", "### Bronze Layer Design\n", "\n", "The bronze layer stores raw property transaction data as ingested, with minimal processing. 
We'll use Delta tables with liquid clustering for optimal performance.\n", "\n", "### Table: `property_transactions_bronze`\n", "\n", "- Raw transaction records with all original fields\n", "- Liquid clustering on `property_id` and `transaction_date`\n", "- Preserves data integrity and auditability" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Bronze layer table created successfully!\n", "Liquid clustering will automatically optimize data layout for property_id and transaction_date queries.\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Create Bronze Layer Delta table with liquid clustering\n", "\n", "spark.sql(\"\"\"\n", "CREATE TABLE IF NOT EXISTS real_estate.bronze.property_transactions_bronze (\n", " property_id STRING,\n", " transaction_date DATE,\n", " property_type STRING,\n", " sale_price DECIMAL(12,2),\n", " location STRING,\n", " days_on_market INT,\n", " price_per_sqft DECIMAL(8,2),\n", " ingestion_timestamp TIMESTAMP\n", ")\n", "USING DELTA\n", "CLUSTER BY (property_id, transaction_date)\n", "\"\"\")\n", "\n", "print(\"Bronze layer table created successfully!\")\n", "print(\"Liquid clustering will automatically optimize data layout for property_id and transaction_date queries.\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Generated 11339 raw property transaction records for Bronze layer\n", "Sample record: {'property_id': 'PROP000001', 'transaction_date': datetime.date(2024, 5, 17), 'property_type': 'Single Family', 'sale_price': 1313507.88, 'location': 'Waterfront', 'days_on_market': 61, 'price_per_sqft': 409.32}\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Generate sample real estate transaction data for Bronze layer\n", "\n", "import random\n", "from datetime import datetime, timedelta\n", "\n", "# Define real estate data constants\n", "\n", "PROPERTY_TYPES = 
['Single Family', 'Condo', 'Townhouse', 'Apartment', 'Commercial']\n", "LOCATIONS = ['Downtown', 'Suburban', 'Waterfront', 'Mountain View', 'Urban Core', 'Residential District']\n", "\n", "# Base pricing parameters by property type and location\n", "PRICE_PARAMS = {\n", " 'Single Family': {\n", " 'Downtown': {'base_price': 850000, 'sqft_range': (1800, 3500)},\n", " 'Suburban': {'base_price': 650000, 'sqft_range': (2000, 4000)},\n", " 'Waterfront': {'base_price': 1200000, 'sqft_range': (2200, 4500)},\n", " 'Mountain View': {'base_price': 750000, 'sqft_range': (1900, 3800)},\n", " 'Urban Core': {'base_price': 950000, 'sqft_range': (1600, 3200)},\n", " 'Residential District': {'base_price': 700000, 'sqft_range': (2100, 4200)}\n", " },\n", " 'Condo': {\n", " 'Downtown': {'base_price': 550000, 'sqft_range': (800, 1800)},\n", " 'Suburban': {'base_price': 350000, 'sqft_range': (900, 2000)},\n", " 'Waterfront': {'base_price': 750000, 'sqft_range': (1000, 2200)},\n", " 'Mountain View': {'base_price': 450000, 'sqft_range': (850, 1900)},\n", " 'Urban Core': {'base_price': 650000, 'sqft_range': (750, 1700)},\n", " 'Residential District': {'base_price': 400000, 'sqft_range': (950, 2100)}\n", " },\n", " 'Townhouse': {\n", " 'Downtown': {'base_price': 700000, 'sqft_range': (1400, 2800)},\n", " 'Suburban': {'base_price': 550000, 'sqft_range': (1600, 3200)},\n", " 'Waterfront': {'base_price': 900000, 'sqft_range': (1500, 3000)},\n", " 'Mountain View': {'base_price': 600000, 'sqft_range': (1450, 2900)},\n", " 'Urban Core': {'base_price': 800000, 'sqft_range': (1300, 2600)},\n", " 'Residential District': {'base_price': 580000, 'sqft_range': (1650, 3300)}\n", " },\n", " 'Apartment': {\n", " 'Downtown': {'base_price': 450000, 'sqft_range': (600, 1400)},\n", " 'Suburban': {'base_price': 280000, 'sqft_range': (650, 1500)},\n", " 'Waterfront': {'base_price': 600000, 'sqft_range': (700, 1600)},\n", " 'Mountain View': {'base_price': 350000, 'sqft_range': (625, 1450)},\n", " 'Urban Core': 
{'base_price': 520000, 'sqft_range': (550, 1300)},\n", " 'Residential District': {'base_price': 320000, 'sqft_range': (675, 1550)}\n", " },\n", " 'Commercial': {\n", " 'Downtown': {'base_price': 2500000, 'sqft_range': (3000, 10000)},\n", " 'Suburban': {'base_price': 1500000, 'sqft_range': (2500, 8000)},\n", " 'Waterfront': {'base_price': 3500000, 'sqft_range': (4000, 12000)},\n", " 'Mountain View': {'base_price': 1800000, 'sqft_range': (2800, 9000)},\n", " 'Urban Core': {'base_price': 3000000, 'sqft_range': (3500, 11000)},\n", " 'Residential District': {'base_price': 1600000, 'sqft_range': (2600, 8500)}\n", " }\n", "}\n", "\n", "# Generate property transaction records\n", "transaction_data = []\n", "base_date = datetime(2024, 1, 1)\n", "\n", "# Create 8,000 properties with multiple transactions over time\n", "for property_num in range(1, 8001):\n", " property_id = f\"PROP{property_num:06d}\"\n", " \n", " # Each property gets 1-4 transactions over 12 months (most have 1, some flip/resale)\n", " num_transactions = random.choices([1, 2, 3, 4], weights=[0.7, 0.2, 0.08, 0.02])[0]\n", " \n", " # Select property type and location (consistent for the same property)\n", " property_type = random.choice(PROPERTY_TYPES)\n", " location = random.choice(LOCATIONS)\n", " \n", " params = PRICE_PARAMS[property_type][location]\n", " \n", " # Base square footage for this property\n", " sqft = random.randint(params['sqft_range'][0], params['sqft_range'][1])\n", " \n", " for i in range(num_transactions):\n", " # Spread transactions over 12 months\n", " days_offset = random.randint(0, 365)\n", " transaction_date = base_date + timedelta(days=days_offset)\n", " \n", " # Calculate sale price with market variations\n", " # Seasonal pricing (higher in spring/summer)\n", " month = transaction_date.month\n", " if month in [3, 4, 5, 6]: # Spring/Summer peak\n", " seasonal_factor = 1.15\n", " elif month in [11, 12, 1, 2]: # Winter off-season\n", " seasonal_factor = 0.9\n", " else:\n", " 
seasonal_factor = 1.0\n", " \n", " # Market appreciation over time (slight increase)\n", " months_elapsed = (transaction_date.year - base_date.year) * 12 + (transaction_date.month - base_date.month)\n", " appreciation_factor = 1.0 + (months_elapsed * 0.002) # 0.2% monthly appreciation\n", " \n", " # Calculate price per square foot\n", " base_price_per_sqft = params['base_price'] / ((params['sqft_range'][0] + params['sqft_range'][1]) / 2)\n", " price_per_sqft = round(base_price_per_sqft * seasonal_factor * appreciation_factor * random.uniform(0.9, 1.1), 2)\n", " \n", " # Calculate total sale price\n", " sale_price = round(price_per_sqft * sqft, 2)\n", " \n", " # Days on market (varies by property type and market conditions)\n", " if property_type == 'Commercial':\n", " days_on_market = random.randint(30, 180)\n", " else:\n", " days_on_market = random.randint(7, 90)\n", " \n", " transaction_data.append({\n", " \"property_id\": property_id,\n", " \"transaction_date\": transaction_date.date(),\n", " \"property_type\": property_type,\n", " \"sale_price\": sale_price,\n", " \"location\": location,\n", " \"days_on_market\": days_on_market,\n", " \"price_per_sqft\": price_per_sqft\n", " })\n", "\n", "print(f\"Generated {len(transaction_data)} raw property transaction records for Bronze layer\")\n", "print(\"Sample record:\", transaction_data[0])" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Bronze Layer DataFrame Schema:\n", "root\n", " |-- days_on_market: long (nullable = true)\n", " |-- location: string (nullable = true)\n", " |-- price_per_sqft: double (nullable = true)\n", " |-- property_id: string (nullable = true)\n", " |-- property_type: string (nullable = true)\n", " |-- sale_price: double (nullable = true)\n", " |-- transaction_date: date (nullable = true)\n", "\n", "\n", "Sample Bronze Data:\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ 
"+--------------+----------+--------------+-----------+-------------+----------+----------------+\n", "|days_on_market| location|price_per_sqft|property_id|property_type|sale_price|transaction_date|\n", "+--------------+----------+--------------+-----------+-------------+----------+----------------+\n", "| 61|Waterfront| 409.32| PROP000001|Single Family|1313507.88| 2024-05-17|\n", "| 54|Urban Core| 359.99| PROP000002| Townhouse| 858576.15| 2024-11-08|\n", "| 21|Waterfront| 441.11| PROP000003| Apartment| 663870.55| 2024-01-27|\n", "| 66|Urban Core| 400.79| PROP000004| Townhouse| 648077.43| 2024-01-20|\n", "| 68|Urban Core| 344.44| PROP000004| Townhouse| 556959.48| 2024-11-30|\n", "+--------------+----------+--------------+-----------+-------------+----------+----------------+\n", "only showing top 5 rows\n", "\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "\n", "Successfully inserted 11339 raw records into Bronze layer\n", "Data is now available for Silver layer processing.\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Insert raw data into Bronze layer\n", "\n", "# Create DataFrame from generated data\n", "df_bronze = spark.createDataFrame(transaction_data)\n", "\n", "# Display schema and sample data\n", "print(\"Bronze Layer DataFrame Schema:\")\n", "df_bronze.printSchema()\n", "\n", "print(\"\\nSample Bronze Data:\")\n", "df_bronze.show(5)\n", "\n", "# Insert data into Bronze table\n", "df_bronze.write.mode(\"overwrite\").saveAsTable(\"real_estate.bronze.property_transactions_bronze\")\n", "\n", "print(f\"\\nSuccessfully inserted {df_bronze.count()} raw records into Bronze layer\")\n", "print(\"Data is now available for Silver layer processing.\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Silver Layer: Data Cleaning and Validation\n", "\n", "### Silver Layer Design\n", "\n", "The silver layer provides cleaned, validated, and enriched property data. 
We'll:\n", "\n", "- Remove invalid records\n", "- Standardize data formats\n", "- Add data quality metrics\n", "- Enrich with temporal features\n", "\n", "### Table: `property_transactions_silver`\n", "\n", "- Cleaned transaction data with validation flags\n", "- Enhanced with market timing features\n", "- Ready for analytical processing" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Silver layer table created successfully!\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Create Silver Layer Delta table\n", "\n", "spark.sql(\"\"\"\n", "CREATE TABLE IF NOT EXISTS real_estate.silver.property_transactions_silver (\n", " property_id STRING,\n", " transaction_date DATE,\n", " property_type STRING,\n", " sale_price DECIMAL(12,2),\n", " location STRING,\n", " days_on_market INT,\n", " price_per_sqft DECIMAL(8,2),\n", " month INT,\n", " quarter INT,\n", " day_of_week INT,\n", " is_spring_summer BOOLEAN,\n", " is_winter BOOLEAN,\n", " market_speed STRING,\n", " is_valid BOOLEAN,\n", " data_quality_score DOUBLE,\n", " processed_timestamp TIMESTAMP\n", ")\n", "USING DELTA\n", "CLUSTER BY (property_id, transaction_date)\n", "\"\"\")\n", "\n", "print(\"Silver layer table created successfully!\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Read 11339 records from Bronze layer\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "After validation: 11339 valid records\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "Filtered out 0 invalid records\n", "\n", "Sample Silver Layer Data:\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+-----------+----------------+-------------+----------+--------------+------------+--------+------------------+\n", 
"|property_id|transaction_date|property_type|sale_price|days_on_market|market_speed|is_valid|data_quality_score|\n", "+-----------+----------------+-------------+----------+--------------+------------+--------+------------------+\n", "| PROP002173| 2024-04-18| Townhouse| 763922.25| 18| fast| true| 0.8|\n", "| PROP002173| 2024-09-17| Townhouse| 598718.9| 62| slow| true| 0.8|\n", "| PROP002173| 2024-01-28| Townhouse| 542460.75| 68| slow| true| 0.8|\n", "| PROP002174| 2024-03-17| Townhouse| 929576.63| 11| fast| true| 0.8|\n", "| PROP002174| 2024-09-20| Townhouse| 760890.12| 57| normal| true| 0.8|\n", "+-----------+----------------+-------------+----------+--------------+------------+--------+------------------+\n", "only showing top 5 rows\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Process Bronze data to Silver layer\n", "\n", "from pyspark.sql.functions import col, when, month, quarter, dayofweek, lit\n", "\n", "# Read from Bronze layer\n", "bronze_df = spark.table(\"real_estate.bronze.property_transactions_bronze\")\n", "\n", "print(f\"Read {bronze_df.count()} records from Bronze layer\")\n", "\n", "# Data validation and cleaning\n", "silver_df = bronze_df \\\n", " .withColumn(\"month\", month(col(\"transaction_date\"))) \\\n", " .withColumn(\"quarter\", quarter(col(\"transaction_date\"))) \\\n", " .withColumn(\"day_of_week\", dayofweek(col(\"transaction_date\"))) \\\n", " .withColumn(\"is_spring_summer\", when(col(\"month\").isin([3, 4, 5, 6]), True).otherwise(False)) \\\n", " .withColumn(\"is_winter\", when(col(\"month\").isin([11, 12, 1, 2]), True).otherwise(False)) \\\n", " .withColumn(\"market_speed\", \n", " when(col(\"days_on_market\") <= 30, \"fast\")\n", " .when(col(\"days_on_market\") <= 60, \"normal\")\n", " .when(col(\"days_on_market\") <= 90, \"slow\")\n", " .otherwise(\"very_slow\")) \\\n", " .withColumn(\"is_valid\", \n", " when((col(\"sale_price\").isNotNull()) & \n", " (col(\"property_id\").isNotNull()) & \n", " 
(col(\"transaction_date\").isNotNull()) &\n", " (col(\"sale_price\") > 0), True).otherwise(False)) \\\n", " .withColumn(\"data_quality_score\", \n", " when(col(\"is_valid\"), lit(0.8)).otherwise(lit(0.0)))\n", "\n", "# Filter out invalid records (is_valid is already a boolean column)\n", "valid_silver_df = silver_df.filter(col(\"is_valid\"))\n", "\n", "print(f\"After validation: {valid_silver_df.count()} valid records\")\n", "print(f\"Filtered out {bronze_df.count() - valid_silver_df.count()} invalid records\")\n", "\n", "# Show sample cleaned data\n", "print(\"\\nSample Silver Layer Data:\")\n", "valid_silver_df.select(\"property_id\", \"transaction_date\", \"property_type\", \"sale_price\", \"days_on_market\", \"market_speed\", \"is_valid\", \"data_quality_score\").show(5)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Successfully inserted 11339 cleaned records into Silver layer\n", "Data is now validated, enriched, and ready for Gold layer analytics.\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Insert cleaned data into Silver layer\n", "\n", "valid_silver_df.write.mode(\"overwrite\").saveAsTable(\"real_estate.silver.property_transactions_silver\")\n", "\n", "print(f\"Successfully inserted {valid_silver_df.count()} cleaned records into Silver layer\")\n", "print(\"Data is now validated, enriched, and ready for Gold layer analytics.\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Gold Layer: Analytics and ML-Ready Data\n", "\n", "### Gold Layer Design\n", "\n", "The gold layer provides business-ready analytics and ML features. 
We'll create:\n", "\n", "- Aggregated analytics tables\n", "- ML-ready feature engineering\n", "- Price prediction model training\n", "\n", "### Tables in Gold Layer\n", "\n", "- `property_analytics_gold`: Aggregated property metrics\n", "- `price_prediction_model_gold`: ML-ready features for price prediction" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Gold layer analytics table created successfully!\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Create Gold Layer Analytics Table\n", "\n", "spark.sql(\"\"\"\n", "CREATE TABLE IF NOT EXISTS real_estate.gold.property_analytics_gold (\n", " property_id STRING,\n", " property_type STRING,\n", " location STRING,\n", " total_transactions BIGINT,\n", " min_sale_price DECIMAL(12,2),\n", " max_sale_price DECIMAL(12,2),\n", " avg_sale_price DECIMAL(12,2),\n", " avg_price_per_sqft DECIMAL(8,2),\n", " avg_days_on_market DOUBLE,\n", " market_speed_distribution MAP<STRING, BIGINT>,\n", " transaction_months ARRAY<INT>,\n", " created_timestamp TIMESTAMP\n", ")\n", "USING DELTA\n", "CLUSTER BY (property_id)\n", "\"\"\")\n", "\n", "print(\"Gold layer analytics table created successfully!\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Gold layer ML features table created successfully!\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Create Gold Layer ML Features Table\n", "\n", "spark.sql(\"\"\"\n", "CREATE TABLE IF NOT EXISTS real_estate.gold.price_prediction_model_gold (\n", " property_id STRING,\n", " transaction_date DATE,\n", " property_type STRING,\n", " sale_price DECIMAL(12,2),\n", " location STRING,\n", " days_on_market INT,\n", " price_per_sqft DECIMAL(8,2),\n", " month INT,\n", " quarter INT,\n", " day_of_week INT,\n", " is_spring_summer BOOLEAN,\n", " is_winter BOOLEAN,\n", " market_speed STRING,\n", " created_timestamp TIMESTAMP\n", ")\n", "USING DELTA\n", 
"CLUSTER BY (property_id, transaction_date)\n", "\"\"\")\n", "\n", "print(\"Gold layer ML features table created successfully!\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Generated 8000 property analytics records\n", "\n", "Sample Gold Analytics:\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+-----------+-------------+--------------------+------------------+--------------+------------------+\n", "|property_id|property_type| location|total_transactions|avg_sale_price|avg_price_per_sqft|\n", "+-----------+-------------+--------------------+------------------+--------------+------------------+\n", "| PROP002398| Commercial| Downtown| 1| 3736670.79| 450.69|\n", "| PROP002487| Condo| Waterfront| 1| 754224.73| 565.81|\n", "| PROP002837| Apartment| Suburban| 1| 305504.96| 242.08|\n", "| PROP003367| Condo|Residential District| 1| 457692.34| 238.63|\n", "| PROP003859|Single Family| Mountain View| 1| 585933.04| 224.84|\n", "+-----------+-------------+--------------------+------------------+--------------+------------------+\n", "only showing top 5 rows\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Generate Gold Layer Analytics from Silver Data\n", "\n", "from pyspark.sql.functions import collect_list, map_from_entries, struct, min, max, avg, count, array_distinct\n", "\n", "# Read Silver data\n", "silver_df = spark.table(\"real_estate.silver.property_transactions_silver\")\n", "\n", "# Create property-level aggregations\n", "property_analytics = silver_df \\\n", " .groupBy(\"property_id\", \"property_type\", \"location\") \\\n", " .agg(\n", " count(\"*\").alias(\"total_transactions\"),\n", " min(\"sale_price\").alias(\"min_sale_price\"),\n", " max(\"sale_price\").alias(\"max_sale_price\"),\n", " avg(\"sale_price\").alias(\"avg_sale_price\"),\n", " avg(\"price_per_sqft\").alias(\"avg_price_per_sqft\"),\n", " 
avg(\"days_on_market\").alias(\"avg_days_on_market\")\n", " )\n", "\n", "# Add market speed distribution\n", "speed_dist = silver_df \\\n", " .groupBy(\"property_id\", \"market_speed\") \\\n", " .agg(count(\"*\").alias(\"count\")) \\\n", " .groupBy(\"property_id\") \\\n", " .agg(collect_list(struct(\"market_speed\", \"count\")).alias(\"speed_distribution_list\"))\n", "\n", "# Add transaction months\n", "month_dist = silver_df \\\n", " .groupBy(\"property_id\") \\\n", " .agg(collect_list(\"month\").alias(\"transaction_months_raw\")) \\\n", " .withColumn(\"transaction_months\", array_distinct(col(\"transaction_months_raw\"))) \\\n", " .drop(\"transaction_months_raw\")\n", "\n", "# Join aggregations\n", "gold_analytics = property_analytics \\\n", " .join(speed_dist, \"property_id\", \"left\") \\\n", " .join(month_dist, \"property_id\", \"left\") \\\n", " .withColumn(\"market_speed_distribution\", map_from_entries(col(\"speed_distribution_list\"))) \\\n", " .drop(\"speed_distribution_list\")\n", "\n", "print(f\"Generated {gold_analytics.count()} property analytics records\")\n", "print(\"\\nSample Gold Analytics:\")\n", "gold_analytics.select(\"property_id\", \"property_type\", \"location\", \"total_transactions\", \"avg_sale_price\", \"avg_price_per_sqft\").show(5)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Successfully inserted 8000 analytics records into Gold layer\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Insert analytics into Gold layer\n", "\n", "gold_analytics.write.mode(\"overwrite\").saveAsTable(\"real_estate.gold.property_analytics_gold\")\n", "\n", "print(f\"Successfully inserted {gold_analytics.count()} analytics records into Gold layer\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Prepared 11339 records for ML feature engineering\n" ] }, "metadata": {}, "output_type": "display_data" }, { 
"data": { "text/plain": [ "+-----------+----------------+-------------+----------+--------------------+--------------+--------------+-----+-------+-----------+----------------+---------+------------+\n", "|property_id|transaction_date|property_type|sale_price| location|days_on_market|price_per_sqft|month|quarter|day_of_week|is_spring_summer|is_winter|market_speed|\n", "+-----------+----------------+-------------+----------+--------------------+--------------+--------------+-----+-------+-----------+----------------+---------+------------+\n", "| PROP002173| 2024-04-18| Townhouse| 763922.25|Residential District| 18| 286.65| 4| 2| 5| true| false| fast|\n", "| PROP002173| 2024-09-17| Townhouse| 598718.9|Residential District| 62| 224.66| 9| 3| 3| false| false| slow|\n", "| PROP002173| 2024-01-28| Townhouse| 542460.75|Residential District| 68| 203.55| 1| 1| 1| false| true| slow|\n", "| PROP002174| 2024-03-17| Townhouse| 929576.63| Urban Core| 11| 503.29| 3| 1| 1| true| false| fast|\n", "| PROP002174| 2024-09-20| Townhouse| 760890.12| Urban Core| 57| 411.96| 9| 3| 6| false| false| normal|\n", "+-----------+----------------+-------------+----------+--------------------+--------------+--------------+-----+-------+-----------+----------------+---------+------------+\n", "only showing top 5 rows\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Prepare ML Features for Gold Layer\n", "\n", "# Read Silver data for ML\n", "ml_data = silver_df.select(\n", " \"property_id\", \"transaction_date\", \"property_type\", \"sale_price\", \n", " \"location\", \"days_on_market\", \"price_per_sqft\", \"month\", \n", " \"quarter\", \"day_of_week\", \"is_spring_summer\", \"is_winter\", \"market_speed\"\n", ")\n", "\n", "print(f\"Prepared {ml_data.count()} records for ML feature engineering\")\n", "ml_data.show(5)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Successfully inserted 11339 ML-ready 
records into Gold layer\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Insert into Gold layer\n", "\n", "ml_data.write.mode(\"overwrite\").saveAsTable(\"real_estate.gold.price_prediction_model_gold\")\n", "\n", "print(f\"Successfully inserted {ml_data.count()} ML-ready records into Gold layer\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Gold Layer: ML Model Training and Evaluation\n", "\n", "### Price Prediction Model\n", "\n", "Now we'll train a Random Forest regression model using the ML-ready data from the Gold layer to predict property sale prices." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Loaded 11339 records from Gold layer for ML training\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+-----------+----------------+-------------+----------+----------+--------------+--------------+-----+-------+-----------+----------------+---------+------------+\n", "|property_id|transaction_date|property_type|sale_price| location|days_on_market|price_per_sqft|month|quarter|day_of_week|is_spring_summer|is_winter|market_speed|\n", "+-----------+----------------+-------------+----------+----------+--------------+--------------+-----+-------+-----------+----------------+---------+------------+\n", "| PROP000001| 2024-05-17|Single Family|1313507.88|Waterfront| 61| 409.32| 5| 2| 6| true| false| slow|\n", "| PROP000002| 2024-11-08| Townhouse| 858576.15|Urban Core| 54| 359.99| 11| 4| 6| false| true| normal|\n", "| PROP000003| 2024-01-27| Apartment| 663870.55|Waterfront| 21| 441.11| 1| 1| 7| false| true| fast|\n", "| PROP000004| 2024-01-20| Townhouse| 648077.43|Urban Core| 66| 400.79| 1| 1| 7| false| true| slow|\n", "| PROP000004| 2024-11-30| Townhouse| 556959.48|Urban Core| 68| 344.44| 11| 4| 7| false| true| slow|\n", 
"+-----------+----------------+-------------+----------+----------+--------------+--------------+-----+-------+-----------+----------------+---------+------------+\n", "only showing top 5 rows\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Load Gold layer ML data for training\n", "\n", "from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler\n", "from pyspark.ml.regression import RandomForestRegressor\n", "from pyspark.ml.evaluation import RegressionEvaluator\n", "from pyspark.ml import Pipeline\n", "import pyspark.sql.functions as F\n", "\n", "# Load data from Gold layer\n", "gold_ml_data = spark.table(\"real_estate.gold.price_prediction_model_gold\")\n", "\n", "print(f\"Loaded {gold_ml_data.count()} records from Gold layer for ML training\")\n", "gold_ml_data.show(5)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Training set: 9154 records\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "Test set: 2185 records\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Feature engineering pipeline\n", "\n", "# Create indexers for categorical variables\n", "property_type_indexer = StringIndexer(inputCol=\"property_type\", outputCol=\"property_type_index\")\n", "location_indexer = StringIndexer(inputCol=\"location\", outputCol=\"location_index\")\n", "market_speed_indexer = StringIndexer(inputCol=\"market_speed\", outputCol=\"market_speed_index\")\n", "\n", "# Assemble features\n", "assembler = VectorAssembler(\n", " inputCols=[\"days_on_market\", \"price_per_sqft\", \"month\", \"quarter\", \"day_of_week\", \n", " \"property_type_index\", \"location_index\", \"market_speed_index\",\n", " \"is_spring_summer\", \"is_winter\"],\n", " outputCol=\"features\"\n", ")\n", "\n", "# Scale features\n", "scaler = StandardScaler(inputCol=\"features\", outputCol=\"scaled_features\")\n", "\n", "# Create and 
train the model\n", "rf = RandomForestRegressor(\n", " labelCol=\"sale_price\", \n", " featuresCol=\"scaled_features\",\n", " numTrees=100,\n", " maxDepth=10\n", ")\n", "\n", "# Create pipeline\n", "pipeline = Pipeline(stages=[property_type_indexer, location_indexer, market_speed_indexer, assembler, scaler, rf])\n", "\n", "# Split data\n", "train_data, test_data = gold_ml_data.randomSplit([0.8, 0.2], seed=42)\n", "\n", "print(f\"Training set: {train_data.count()} records\")\n", "print(f\"Test set: {test_data.count()} records\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Training property price prediction model...\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "Model RMSE: $358,913.39\n", "Model R²: 0.8300\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+-----------+-------------+--------------------+----------+------------------+\n", "|property_id|property_type| location|sale_price| prediction|\n", "+-----------+-------------+--------------------+----------+------------------+\n", "| PROP002173| Townhouse|Residential District| 598718.9| 589832.7164126843|\n", "| PROP002175| Apartment| Waterfront| 456298.92| 754067.7592078398|\n", "| PROP002176| Townhouse| Urban Core| 565739.2| 771930.9673399623|\n", "| PROP002180|Single Family| Urban Core|1122442.56|1025470.5435646757|\n", "| PROP002183| Townhouse| Suburban| 685427.28| 559209.2103593717|\n", "| PROP002186| Apartment| Downtown| 569329.92| 467594.5879715044|\n", "| PROP002189| Apartment| Urban Core| 497292.66|472475.34698397503|\n", "| PROP002195| Condo| Waterfront| 740775.6| 662730.9836915237|\n", "| PROP002200| Condo| Suburban| 401808.64| 407720.5569539201|\n", "| PROP002201| Commercial| Waterfront|2204526.96| 3851687.080083347|\n", "+-----------+-------------+--------------------+----------+------------------+\n", "only showing top 10 rows\n", "\n" ] }, "metadata": {}, 
"output_type": "display_data" } ], "source": [ "# Train the price prediction model\n", "\n", "print(\"Training property price prediction model...\")\n", "model = pipeline.fit(train_data)\n", "\n", "# Make predictions\n", "predictions = model.transform(test_data)\n", "\n", "# Evaluate the model\n", "evaluator = RegressionEvaluator(labelCol=\"sale_price\", predictionCol=\"prediction\", metricName=\"rmse\")\n", "rmse = evaluator.evaluate(predictions)\n", "\n", "evaluator_r2 = RegressionEvaluator(labelCol=\"sale_price\", predictionCol=\"prediction\", metricName=\"r2\")\n", "r2 = evaluator_r2.evaluate(predictions)\n", "\n", "print(f\"Model RMSE: ${rmse:,.2f}\")\n", "print(f\"Model R²: {r2:.4f}\")\n", "\n", "# Show prediction results\n", "predictions.select(\"property_id\", \"property_type\", \"location\", \"sale_price\", \"prediction\").show(10)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "\n", "=== Feature Importance for Price Prediction ===\n", "days_on_market: 0.1411\n", "price_per_sqft: 0.1700\n", "month: 0.0158\n", "quarter: 0.0051\n", "day_of_week: 0.0172\n", "property_type: 0.4074\n", "location: 0.0492\n", "market_speed: 0.1839\n", "is_spring_summer: 0.0058\n", "is_winter: 0.0045\n", "\n", "=== Business Impact Analysis ===\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "Average prediction error: $213,288\n", "Average prediction error percentage: 22.17%\n", "Median prediction error percentage: 17.96%\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "\n", "Estimated value of 1% price optimization: $21,376,477\n", "\n", "=== Seasonal Prediction Performance ===\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+----------------+------------------+-----------------+\n", "|is_spring_summer| avg_error_pct|transaction_count|\n", "+----------------+------------------+-----------------+\n", "| 
false|22.316770187327403| 1456|\n", "| true|21.868353999311417| 729|\n", "+----------------+------------------+-----------------+\n", "\n", "\n", "=== Property Type Prediction Performance ===\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+-------------+------------------+-----------------+\n", "|property_type| avg_error_pct|transaction_count|\n", "+-------------+------------------+-----------------+\n", "|Single Family|16.518508570487484| 445|\n", "| Townhouse| 19.36782164357186| 455|\n", "| Condo| 22.24487225971695| 442|\n", "| Apartment|22.574195821436938| 422|\n", "| Commercial|30.673653494330907| 421|\n", "+-------------+------------------+-----------------+\n", "\n", "\n", "=== Location Prediction Performance ===\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+--------------------+------------------+-----------------+\n", "| location| avg_error_pct|transaction_count|\n", "+--------------------+------------------+-----------------+\n", "|Residential District|20.978593517531923| 352|\n", "| Waterfront|21.378073549031242| 373|\n", "| Downtown|21.873278961979548| 366|\n", "| Urban Core|22.288514105850624| 371|\n", "| Suburban|22.806133551135137| 388|\n", "| Mountain View|23.741261059974047| 335|\n", "+--------------------+------------------+-----------------+\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Model evaluation and business insights\n", "\n", "# Feature importance\n", "rf_model = model.stages[-1]\n", "feature_importance = rf_model.featureImportances\n", "feature_names = [\"days_on_market\", \"price_per_sqft\", \"month\", \"quarter\", \"day_of_week\", \n", " \"property_type\", \"location\", \"market_speed\", \"is_spring_summer\", \"is_winter\"]\n", "\n", "print(\"\\n=== Feature Importance for Price Prediction ===\")\n", "for name, importance in zip(feature_names, feature_importance):\n", " print(f\"{name}: {importance:.4f}\")\n", "\n", "# 
Business impact analysis\n", "print(\"\\n=== Business Impact Analysis ===\")\n", "\n", "# Compute absolute and percentage prediction error for each test record\n", "predictions_with_accuracy = predictions.withColumn(\n", " \"prediction_error\", \n", " F.abs(F.col(\"sale_price\") - F.col(\"prediction\"))\n", ").withColumn(\n", " \"prediction_error_pct\", \n", " F.abs(F.col(\"sale_price\") - F.col(\"prediction\")) / F.col(\"sale_price\") * 100\n", ")\n", "\n", "avg_prediction_error = predictions_with_accuracy.agg(F.avg(\"prediction_error\")).collect()[0][0]\n", "avg_prediction_error_pct = predictions_with_accuracy.agg(F.avg(\"prediction_error_pct\")).collect()[0][0]\n", "# approxQuantile returns an approximate median (relative error 0.01)\n", "median_error_pct = predictions_with_accuracy.approxQuantile(\"prediction_error_pct\", [0.5], 0.01)[0]\n", "\n", "print(f\"Average prediction error: ${avg_prediction_error:,.0f}\")\n", "print(f\"Average prediction error percentage: {avg_prediction_error_pct:.2f}%\")\n", "print(f\"Median prediction error percentage: {median_error_pct:.2f}%\")\n", "\n", "# Calculate potential value for pricing optimization\n", "total_test_properties = test_data.count()\n", "avg_property_value = test_data.agg(F.avg(\"sale_price\")).collect()[0][0]\n", "\n", "# Illustrative estimate: 1% of total test-set sale value\n", "# (assumes a uniform 1% sale-price uplift; not a model output)\n", "price_optimization_value = total_test_properties * avg_property_value * 0.01\n", "\n", "print(f\"\\nEstimated value of 1% price optimization: ${price_optimization_value:,.0f}\")\n", "\n", "# Prediction error by season\n", "seasonal_performance = predictions_with_accuracy.groupBy(\"is_spring_summer\").agg(\n", " F.avg(\"prediction_error_pct\").alias(\"avg_error_pct\"),\n", " F.count(\"*\").alias(\"transaction_count\")\n", ").orderBy(\"is_spring_summer\")\n", "\n", "print(\"\\n=== Seasonal Prediction Performance ===\")\n", "seasonal_performance.show()\n", "\n", "# Prediction error by property type\n", "property_type_performance = predictions_with_accuracy.groupBy(\"property_type\").agg(\n", " 
F.avg(\"prediction_error_pct\").alias(\"avg_error_pct\"),\n", " F.count(\"*\").alias(\"transaction_count\")\n", ").orderBy(\"avg_error_pct\")\n", "\n", "print(\"\\n=== Property Type Prediction Performance ===\")\n", "property_type_performance.show()\n", "\n", "# Prediction error by location\n", "location_performance = predictions_with_accuracy.groupBy(\"location\").agg(\n", " F.avg(\"prediction_error_pct\").alias(\"avg_error_pct\"),\n", " F.count(\"*\").alias(\"transaction_count\")\n", ").orderBy(\"avg_error_pct\")\n", "\n", "print(\"\\n=== Location Prediction Performance ===\")\n", "location_performance.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Query Examples Across Medallion Layers\n", "\n", "### Bronze Layer Queries\n", "\n", "Query the raw records directly for audit and debugging." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "=== Bronze Layer: Raw Property Transaction Data ===\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+-----------+----------------+-------------+----------+----------+\n", "|property_id|transaction_date|property_type|sale_price| location|\n", "+-----------+----------------+-------------+----------+----------+\n", "| PROP000001| 2024-05-17|Single Family|1313507.88|Waterfront|\n", "+-----------+----------------+-------------+----------+----------+\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Bronze Layer: Raw transaction data queries\n", "\n", "print(\"=== Bronze Layer: Raw Property Transaction Data ===\")\n", "bronze_sample = spark.sql(\"\"\"\n", "SELECT property_id, transaction_date, property_type, sale_price, location\n", "FROM real_estate.bronze.property_transactions_bronze\n", "WHERE property_id = 'PROP000001'\n", "ORDER BY transaction_date DESC\n", "LIMIT 5\n", "\"\"\")\n", "bronze_sample.show()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { 
"text/plain": [ "=== Silver Layer: Validated Property Transaction Data ===\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+-----------+----------------+-------------+----------+--------------+------------+--------+------------------+\n", "|property_id|transaction_date|property_type|sale_price|days_on_market|market_speed|is_valid|data_quality_score|\n", "+-----------+----------------+-------------+----------+--------------+------------+--------+------------------+\n", "| PROP000001| 2024-05-17|Single Family|1313507.88| 61| slow| true| 0.8|\n", "+-----------+----------------+-------------+----------+--------------+------------+--------+------------------+\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Silver Layer: Cleaned data queries\n", "\n", "print(\"=== Silver Layer: Validated Property Transaction Data ===\")\n", "silver_sample = spark.sql(\"\"\"\n", "SELECT property_id, transaction_date, property_type, sale_price, days_on_market, market_speed, is_valid, data_quality_score\n", "FROM real_estate.silver.property_transactions_silver\n", "WHERE property_id = 'PROP000001' AND is_valid = true\n", "ORDER BY transaction_date DESC\n", "LIMIT 5\n", "\"\"\")\n", "silver_sample.show()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "=== Gold Layer: Property Analytics ===\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "+-----------+-------------+----------+------------------+--------------+------------------+------------------+\n", "|property_id|property_type| location|total_transactions|avg_sale_price|avg_price_per_sqft|avg_days_on_market|\n", "+-----------+-------------+----------+------------------+--------------+------------------+------------------+\n", "| PROP000992| Commercial|Waterfront| 1| 5247559.45| 507.55| 78.0|\n", "| PROP000370| Commercial|Waterfront| 1| 4979861.68| 528.76| 105.0|\n", "| 
PROP000352| Commercial|Waterfront| 1| 4889264.7| 464.98| 38.0|\n", "| PROP000148| Commercial|Waterfront| 1| 4863300.0| 543.75| 111.0|\n", "| PROP000696| Commercial|Waterfront| 1| 4814185.84| 414.73| 147.0|\n", "+-----------+-------------+----------+------------------+--------------+------------------+------------------+\n", "\n" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Gold Layer: Analytics queries\n", "\n", "print(\"=== Gold Layer: Property Analytics ===\")\n", "gold_sample = spark.sql(\"\"\"\n", "SELECT property_id, property_type, location, total_transactions, avg_sale_price, avg_price_per_sqft, avg_days_on_market\n", "FROM real_estate.gold.property_analytics_gold\n", "WHERE property_id LIKE 'PROP000%'\n", "ORDER BY avg_sale_price DESC\n", "LIMIT 5\n", "\"\"\")\n", "gold_sample.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Key Takeaways: Medallion Architecture in AIDP for Real Estate\n", "\n", "### What We Demonstrated\n", "\n", "1. **Bronze Layer**: Raw property transaction data ingestion with Delta liquid clustering\n", "2. **Silver Layer**: Data validation, cleaning, and market feature enrichment\n", "3. **Gold Layer**: Property analytics aggregation and ML model training for price prediction\n", "4. **End-to-End Pipeline**: Complete medallion architecture in a single notebook\n", "\n", "### AIDP Advantages\n", "\n", "- **Unified Platform**: Seamless data flow between layers\n", "- **Governance**: Catalog and schema isolation\n", "- **Performance**: Optimized with liquid clustering\n", "- **ML Integration**: Built-in ML capabilities for price prediction\n", "\n", "### Business Benefits for Real Estate\n", "\n", "1. **Data Quality**: Progressive improvement through layers\n", "2. **Price Prediction**: AI-driven property valuation\n", "3. **Market Intelligence**: Data-driven pricing strategies\n", "4. **Investment Decisions**: Analytics for property performance\n", "5. 
**Governance**: Audit trails and data lineage\n", "\n", "### Best Practices\n", "\n", "1. **Layer Isolation**: Keep raw data separate from processed data\n", "2. **Incremental Processing**: Build upon validated foundations\n", "3. **Business Alignment**: Gold layer matches business needs\n", "4. **Performance Optimization**: Use clustering strategically\n", "5. **ML Integration**: Include predictive analytics in gold layer\n", "\n", "This notebook demonstrates how Oracle AI Data Platform enables sophisticated real estate analytics with proper data architecture, governance, and ML-powered price prediction." ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.5" } }, "nbformat": 4, "nbformat_minor": 4 }