import pandas as pd import numpy as np from hmmlearn import hmm from sklearn.preprocessing import StandardScaler import matplotlib.pyplot as plt from datetime import datetime # ============================================================================ # FEATURE ENGINEERING (Meso-Scale Microstructure) # ============================================================================ def calculate_garman_klass(df): """Computes Garman-Klass Volatility Proxy.""" log_hl = np.log(df['high'] / df['low']) log_co = np.log(df['close'] / df['open']) gk_var = 0.5 * (log_hl ** 2) - (2 * np.log(2) - 1) * (log_co ** 2) return np.sqrt(gk_var) def calculate_hurst(series, window=100): """Estimates the Hurst Exponent (Rolling Fractal Dimension).""" if len(series) < window: return 0.5 lags = range(2, 20) tau = [np.sqrt(np.std(np.subtract(series[lag:], series[:-lag]))) for lag in lags] poly = np.polyfit(np.log(lags), np.log(tau), 1) return poly[0] * 2.0 def get_regime_features(df): """Processes features for HMM state identification.""" df = df.copy() df['gk_vol'] = calculate_garman_klass(df) df['log_ret'] = np.log(df['close'] / df['close'].shift(1)) # Rolling Hurst (computationally expensive, use optimized window) # For speed in backtest, we can use a simpler proxy or smaller window df['hurst'] = df['close'].rolling(window=100).apply(lambda x: calculate_hurst(x), raw=True) return df.dropna() # ============================================================================ # HMM REGIME SELECTOR # ============================================================================ def train_hmm(df_train, n_states=3): """Trains a Gaussian HMM on microstructure features and sorts states by volatility.""" features = ['gk_vol', 'hurst', 'log_ret'] X = df_train[features].values scaler = StandardScaler() X_scaled = scaler.fit_transform(X) model = hmm.GaussianHMM(n_components=n_states, covariance_type="full", n_iter=200, random_state=42) model.fit(X_scaled) # AGENT B: Map states by volatility (State 0 = Lowest Vol, State 2 = Highest Vol) # This ensures consistency across training runs state_vols = model.means_[:, 0] # Mean of 'gk_vol' for each state sorted_state_indices = np.argsort(state_vols) # Calculate Circuit Breaker Threshold (99th percentile of training volatility) vol_threshold = np.percentile(df_train['gk_vol'], 99) return model, scaler, sorted_state_indices, vol_threshold def predict_regimes(model, scaler, sorted_indices, df): """Applies trained HMM and maps regimes to standardized IDs.""" X = df[['gk_vol', 'hurst', 'log_ret']].values X_scaled = scaler.transform(X) raw_regimes = model.predict(X_scaled) # Map raw HMM labels to our sorted IDs (0=Safe, 1=Neutral, 2=Crisis) mapping = {raw: sorted for sorted, raw in enumerate(sorted_indices)} df['regime'] = [mapping[r] for r in raw_regimes] return df # ============================================================================ # BACKTEST LOGIC (Reactive Survival) # ============================================================================ def backtest_vebb(df, is_adaptive=True, initial_capital=10000, vol_threshold=999): """ Executes VEBB Backtest with Transaction Costs (0.05%). """ capital = initial_capital position = 0 trade_log = [] commission = 0.0005 # 0.05% per trade (Binance Futures Standard) bb_length = 20 df['sma'] = df['close'].rolling(window=bb_length).mean() df['std'] = df['close'].rolling(window=bb_length).std() df['upper'] = df['sma'] + (2.0 * df['std']) df['lower'] = df['sma'] - (2.0 * df['std']) df['vol_sma'] = df['volume'].rolling(window=bb_length).mean() df['vol_zscore'] = (df['volume'] - df['vol_sma']) / df['volume'].rolling(window=bb_length).std() for i in range(bb_length, len(df)): row = df.iloc[i] active_regime = row['regime'] if is_adaptive else 0 # Static assumes "Safe" # AGENT B CIRCUIT BREAKER: Extreme volatility -> Sit in Cash if is_adaptive and row['gk_vol'] > vol_threshold: if position != 0: cost = abs(position * row['close'] * commission) capital += (position * row['close']) - cost trade_log.append({'ts': row['ts'], 'type': 'CIRCUIT_BREAKER', 'price': row['close'], 'capital': capital}) position = 0 continue # Adaptive logic: Only trade in State 0 (Lowest Vol) if is_adaptive and active_regime != 0: if position != 0: # Force close if we leave safe regime cost = abs(position * row['close'] * commission) capital += (position * row['close']) - cost trade_log.append({'ts': row['ts'], 'type': 'REGIME_EXIT', 'price': row['close'], 'capital': capital}) position = 0 continue if position == 0: # Long Entry if row['low'] <= row['lower'] and row['vol_zscore'] > 2.0: qty = (capital * 0.1) / row['close'] cost = qty * row['close'] * commission capital -= (qty * row['close']) + cost position = qty trade_log.append({'ts': row['ts'], 'type': 'BUY', 'price': row['close'], 'capital': capital}) # Short Entry elif row['high'] >= row['upper'] and row['vol_zscore'] > 2.0: qty = (capital * 0.1) / row['close'] cost = qty * row['close'] * commission capital += (qty * row['close']) - cost position = -qty trade_log.append({'ts': row['ts'], 'type': 'SELL', 'price': row['close'], 'capital': capital}) else: # Exit at Mean if (position > 0 and row['close'] >= row['sma']) or (position < 0 and row['close'] <= row['sma']): cost = abs(position * row['close'] * commission) capital += (position * row['close']) - cost trade_log.append({'ts': row['ts'], 'type': 'EXIT_MEAN', 'price': row['close'], 'capital': capital}) position = 0 return capital, trade_log return capital, trade_log # ============================================================================ # MAIN ORCHESTRATION # ============================================================================ if __name__ == "__main__": print("🚀 Initiating Total Structural Backtest...") # 1. Load Ground Truth df_15m = pd.read_csv('Data/15m/BTC_15m_Clean.csv', parse_dates=['ts']) df_5m = pd.read_csv('Data/5m/BTC_5m_Clean.csv', parse_dates=['ts']) # 2. Train HMM on 2023-2024 longitudinal data pivot_date = '2023-01-01' test_start = '2025-01-01' df_train = get_regime_features(df_15m[(df_15m['ts'] >= pivot_date) & (df_15m['ts'] < test_start)]) print(f"Training HMM on Meso-Scale 15m data ({len(df_train)} bars)...") hmm_model, scaler, sorted_indices, vol_threshold = train_hmm(df_train) # 3. Stress Test: 2025 Out-of-Sample df_test_15m = get_regime_features(df_15m[df_15m['ts'] >= test_start]) df_test_15m = predict_regimes(hmm_model, scaler, sorted_indices, df_test_15m) # Run Adaptive 15m print("Running Adaptive 15m (Meso-Scale) Backtest...") final_15m, logs_15m = backtest_vebb(df_test_15m, is_adaptive=True, vol_threshold=vol_threshold) # Run Static 5m Control Group print("Running Static 5m (High-Frequency) Stress Test...") df_test_5m = get_regime_features(df_5m) # Give it valid column names for the backtester df_test_5m['regime'] = 0 # Assume safe for static baseline final_5m, logs_5m = backtest_vebb(df_test_5m, is_adaptive=False) # 4. Results print("\n--- PERFORMANCE SUMMARY ---") print(f"Static 5m baseline final capital: ${final_5m:,.2f} (Trades: {len(logs_5m)})") print(f"Adaptive 15m final capital: ${final_15m:,.2f} (Trades: {len(logs_15m)})") # Calculate Drawdown for thesis def get_mdd(logs): if not logs: return 0 cap_curve = [l['capital'] for l in logs] running_max = np.maximum.accumulate(cap_curve) dd = (cap_curve - running_max) / running_max return np.min(dd) if len(dd) > 0 else 0 print(f"Static 5m Max Drawdown: {get_mdd(logs_5m):.2%}") print(f"Adaptive 15m Max Drawdown: {get_mdd(logs_15m):.2%}") # Export for Thesis Tables results_summary = { 'Scenario': ['Static 5m', 'Adaptive 15m'], 'Final Capital': [final_5m, final_15m], 'Trade Count': [len(logs_5m), len(logs_15m)], 'Max Drawdown': [get_mdd(logs_5m), get_mdd(logs_15m)] } pd.DataFrame(results_summary).to_csv('thesis_backtest_results.csv', index=False) print("\nData exported to thesis_backtest_results.csv")