import pandas as pd
import numpy as np
import matplotlib.pylab as plt

# # Loading Data

# Parquet file for each candle timeframe.
raw_data = {
    '1d': '1d.parquet',
    '1h': '1h.parquet',
    '15m': '15m.parquet',
    '5m': '5m.parquet',
}

# Coin whose index is used as the reference timeline for the splits and the
# backtest.
REFERENCE_COIN = 'BTCUSDT'
# 24 * 365 hourly bars per year; used to annualise the hourly Sharpe ratio.
HOURS_PER_YEAR = 8760
# Every timeframe that gets split into train / validation / test.
TIMEFRAMES = ['1d', '1h', '15m', '5m']

# Load each timeframe into its own DataFrame.
loading_data = {key: pd.read_parquet(value) for key, value in raw_data.items()}

# Names of all coins, taken from the 'symbol' level of the daily index.
coins = loading_data['1d'].index.get_level_values('symbol').unique()

# Split every timeframe coin-wise: split_data[timeframe][coin] -> DataFrame.
split_data = {
    key: {coin: value.xs(coin, level='symbol') for coin in coins}
    for key, value in loading_data.items()
}

# Choose a common starting date: the latest first daily timestamp of any coin,
# so that every coin has data from that point on.
start_date = [split_data['1d'][coin].index.min() for coin in coins]
common_date = max(start_date)

# Drop everything before the common start and put each frame in ascending
# index order.
split_data = {
    key: {
        coin: frame.loc[frame.index >= common_date].sort_index()
        for coin, frame in frames.items()
    }
    for key, frames in split_data.items()
}


def microstructure(df):
    """Return a copy of *df* with candle-shape ratios added.

    Reads the 'open', 'high', 'low' and 'close' columns and adds:

    * 'upper_wick_ratio'     - (high - max(open, close)) / (high - low)
    * 'lower_wick_ratio'     - (min(open, close) - low) / (high - low)
    * 'body_ratio'           - |close - open| / (high - low)
    * 'close_location_value' - (close - low) / (high - low)

    For candles with high == low the three ratios are 0 and the close
    location is 0.5.
    """
    df = df.copy()
    high = df['high']
    low = df['low']
    op = df['open']
    close = df['close']
    total_range = high - low
    flat = total_range == 0

    df['upper_wick_ratio'] = np.where(flat, 0, (high - np.maximum(op, close)) / total_range)
    df['lower_wick_ratio'] = np.where(flat, 0, (np.minimum(op, close) - low) / total_range)
    df['body_ratio'] = np.where(flat, 0, abs(close - op) / total_range)
    df['close_location_value'] = np.where(flat, 0.5, (close - low) / total_range)
    return df


# Add the candle microstructure columns to the intraday timeframes.
for timeframe in ['1h', '15m', '5m']:
    for coin in coins:
        split_data[timeframe][coin] = microstructure(split_data[timeframe][coin])


def compute_ATR(df):
    """Return a copy of *df* with 'ATR' and 'ATR_baseline' columns added.

    The true range of a bar is the largest of high - low,
    |high - previous close| and |low - previous close|.

    'ATR' is the 14-bar rolling mean of the true range shifted by one bar, so
    the current bar's own range is not part of its ATR. 'ATR_baseline' is the
    20-bar rolling mean of 'ATR' shifted by one further bar.
    """
    df = df.copy()
    high = df['high']
    low = df['low']
    previous_close = df['close'].shift(1)

    true_range = np.maximum(
        high - low,
        np.maximum(abs(high - previous_close), abs(low - previous_close)),
    )
    # Shift so the current bar's range does not come into play.
    df['ATR'] = true_range.shift(1).rolling(14).mean()
    df['ATR_baseline'] = df['ATR'].shift(1).rolling(20).mean()
    return df


# Add ATR to the hourly data.
for coin in coins:
    split_data['1h'][coin] = compute_ATR(split_data['1h'][coin])


def compute_volume_ratio(df):
    """Return a copy of *df* with a 'volume_ratio' column added.

    The ratio is the current bar's volume divided by the mean volume of the
    20 preceding bars (the current bar is excluded from the mean via a
    one-bar shift). Where that mean is exactly 0 the ratio is set to 0.
    """
    df = df.copy()
    volume = df['volume']
    # Shift so the current bar's volume is not part of its own baseline.
    history_volume = volume.shift(1)
    volume_mean = history_volume.rolling(20).mean()
    df['volume_ratio'] = np.where(volume_mean == 0, 0, volume / volume_mean)
    return df


# Add the volume ratio to the hourly data.
for coin in coins:
    split_data['1h'][coin] = compute_volume_ratio(split_data['1h'][coin])


def compute_rolling_return(df, window):
    """Return a copy of *df* with a 'return_<window>' column added.

    The column holds the percentage change of 'close' over *window* bars,
    shifted by one bar so that each row only sees returns ending on the
    previous bar.
    """
    df = df.copy()
    rolling_return = df['close'].pct_change(periods=window)
    df[f'return_{window}'] = rolling_return.shift(1)
    return df


# Add rolling momentum over 24, 6 and 12 bars to the hourly data.
for window in [24, 6, 12]:
    for coin in coins:
        split_data['1h'][coin] = compute_rolling_return(split_data['1h'][coin], window)

# Add rolling momentum over 4 bars to the 15-minute data.
for coin in coins:
    split_data['15m'][coin] = compute_rolling_return(split_data['15m'][coin], 4)

# Combine the 24-bar hourly momentum of all coins into one frame
# (rows: timestamps, columns: coins).
combine_momentum = {coin: split_data['1h'][coin]['return_24'] for coin in coins}
return_combined = pd.DataFrame(combine_momentum)

# Cross-sectional statistics per timestamp.
return_combined_mean = return_combined.mean(axis=1)
dispersion = return_combined.std(axis=1)
# Ascending rank across coins: the highest return_24 gets the highest rank.
cross_rank = return_combined.rank(axis=1)
relative_momentum = return_combined.sub(return_combined_mean, axis=0)

# Add the cross-sectional columns to the hourly data. assign() returns a new
# frame instead of writing into the existing one.
for coin in coins:
    split_data['1h'][coin] = split_data['1h'][coin].assign(
        relative_momentum=relative_momentum[coin],
        dispersion=dispersion,
        cross_rank=cross_rank[coin],
    )

# Daily closes of all coins, used for the regime calculation.
combined_momentum_1d = {coin: split_data['1d'][coin]['close'] for coin in coins}
return_combined_1d = pd.DataFrame(combined_momentum_1d)

# Regime score: fraction of coins whose daily close is above the mean of the
# 20 preceding daily closes.
rolling_mean_20 = return_combined_1d.rolling(window=20).mean().shift(1)
above_MA = (return_combined_1d > rolling_mean_20).astype(int)
regime_score = above_MA.mean(axis=1)
# The first 20 rows have no moving average; the comparison turns them into
# False (0), so mark them as missing instead.
regime_score.iloc[:20] = np.nan

# Add the regime score to the daily data.
for coin in coins:
    split_data['1d'][coin] = split_data['1d'][coin].assign(regime_score=regime_score)

# Chronological split: 70% training, 15% validation, 15% testing, with the
# boundaries taken from the reference coin's daily index.
daily_index = split_data['1d'][REFERENCE_COIN].index
total = len(daily_index)
train_end = daily_index[int(total * 0.70)]
val_end = daily_index[int(total * 0.85)]
test_end = daily_index[-1]

# Train: index <= train_end.
train_data = {
    timeframe: {
        coin: frame.loc[frame.index <= train_end]
        for coin, frame in split_data[timeframe].items()
    }
    for timeframe in TIMEFRAMES
}

# Validation: train_end < index < val_end.
val_data = {
    timeframe: {
        coin: frame.loc[(frame.index > train_end) & (frame.index < val_end)]
        for coin, frame in split_data[timeframe].items()
    }
    for timeframe in TIMEFRAMES
}

# Test: index >= val_end.
test_data = {
    timeframe: {
        coin: frame.loc[frame.index >= val_end]
        for coin, frame in split_data[timeframe].items()
    }
    for timeframe in TIMEFRAMES
}


# Signal Logic
def generate_signal(df, long_rank=6, short_rank=2):
    """Return a copy of *df* with a 'signal' column (1, -1 or 0) added.

    A bar is long (1) when 'cross_rank' >= *long_rank*, short (-1) when
    'cross_rank' <= *short_rank*, and in both cases only if 'ATR' is above
    'ATR_baseline' and 'volume_ratio' is above 1. All other bars are flat (0).

    The rank thresholds are absolute ranks, so the defaults only make sense
    for a universe of matching size.
    NOTE(review): the defaults 6 / 2 look tuned for roughly 7-8 coins -
    confirm against the actual coin count.
    """
    df = df.copy()
    cross_rank = df['cross_rank']
    # Both sides require expanding volatility and above-average volume.
    confirmed = (df['ATR'] > df['ATR_baseline']) & (df['volume_ratio'] > 1)

    long_signal = (cross_rank >= long_rank) & confirmed
    short_signal = (cross_rank <= short_rank) & confirmed

    df['signal'] = np.where(long_signal, 1, np.where(short_signal, -1, 0))
    return df


# Add the signal column to the hourly frames of every split.
for coin in coins:
    val_data['1h'][coin] = generate_signal(val_data['1h'][coin])
    test_data['1h'][coin] = generate_signal(test_data['1h'][coin])
    train_data['1h'][coin] = generate_signal(train_data['1h'][coin])


def backtest(df, coins, regime_score):
    """Run the hourly long/short backtest.

    Args:
        df: nested mapping ``df[timeframe][coin] -> DataFrame``. Only the
            '1h' frames are read; they must contain the 'signal',
            'return_6', 'return_12' and 'open' columns.
        coins: symbols to trade; used as keys into ``df['1h']`` and as the
            columns of the result matrices.
        regime_score: Series that is forward-filled onto the reference
            coin's hourly index; missing values are replaced by 0.5.

    Returns:
        A tuple ``(returns, equity, signals)``: the per-bar portfolio return,
        its cumulative product ``(1 + returns).cumprod()`` and the signal
        matrix actually traded (after the one-bar delay and the
        deterioration filter).
    """
    hourly_index = df['1h'][REFERENCE_COIN].index
    # NOTE(review): the regime value of a day is built from that day's close.
    # If daily candles are stamped with their open time, forward-filling it
    # onto the same day's hourly bars would leak the daily close into
    # intraday decisions - confirm the timestamp convention.
    regime_hourly = regime_score.reindex(hourly_index, method='ffill')
    regime_hourly = regime_hourly.fillna(0.5)

    # Delay signals by one bar: a signal formed on bar t is traded on bar t+1.
    combined_signal = pd.DataFrame(
        {coin: df['1h'][coin]['signal'].shift(1) for coin in coins}
    )

    combined_return6 = pd.DataFrame({coin: df['1h'][coin]['return_6'] for coin in coins})
    combined_return12 = pd.DataFrame({coin: df['1h'][coin]['return_12'] for coin in coins})

    # Go flat wherever both the 6-bar and the 12-bar return are negative.
    deterioration = (combined_return6 < 0) & (combined_return12 < 0)
    combined_signal[deterioration] = 0

    # Return earned by holding from this bar's open to the next bar's open,
    # clipped to +/-10% per bar.
    combined_return = pd.DataFrame(
        {coin: df['1h'][coin]['open'].pct_change().shift(-1) for coin in coins}
    )
    combined_return = combined_return.clip(-0.1, 0.1)

    # Longs are weighted by the regime score and shorts by (regime - 1), so a
    # high regime score favours longs and a low one favours shorts.
    regime_matrix = np.repeat(regime_hourly.values[:, None], len(coins), axis=1)
    weights = np.where(
        combined_signal == 1,
        regime_matrix,
        np.where(combined_signal == -1, regime_matrix - 1, 0),
    )
    weights = pd.DataFrame(weights, index=combined_signal.index, columns=coins)

    # Normalise to unit gross exposure; rows with no position keep weight 0
    # (the divisor is replaced by 1 to avoid dividing by zero).
    weight_sum = weights.abs().sum(axis=1).replace(0, 1)
    weights = weights.div(weight_sum, axis=0)

    portfolio_returns = (combined_return * weights).sum(axis=1)
    total_return = (1 + portfolio_returns).cumprod()

    return portfolio_returns, total_return, combined_signal


train_returns, train_equity, train_signals = backtest(train_data, coins, regime_score)
val_returns, val_equity, val_signals = backtest(val_data, coins, regime_score)
test_returns, test_equity, test_signals = backtest(test_data, coins, regime_score)


def compute_metrics(returns, equity, combined_signal, label):
    """Print summary performance metrics for one backtest run.

    Args:
        returns: per-bar portfolio returns.
        equity: cumulative equity curve that starts from 1.
        combined_signal: signal matrix (rows: bars, columns: coins).
        label: heading printed above the metrics.

    Prints the start/end timestamps, total return, final value of an initial
    100000, annualised Sharpe ratio, maximum drawdown, longest drawdown run
    (in bars, reported as hours), win rate over bars with a non-zero return,
    and the number of entries (a signal becoming non-zero after a 0).
    Sharpe, win rate and drawdown duration fall back to 0 when they are
    undefined.
    """
    initial_amount = 100000

    if returns.empty:
        # Nothing to measure; avoid indexing into an empty series.
        print(f"\n{label}")
        print("No data in this period.")
        return

    volatility = returns.std()
    if volatility == 0 or np.isnan(volatility):
        sharpe = 0
    else:
        sharpe = returns.mean() / volatility * np.sqrt(HOURS_PER_YEAR)

    drawdown_series = (equity / equity.cummax() - 1)
    max_drawdown = drawdown_series.min()

    # Win rate is measured only over bars where the portfolio actually moved.
    active = returns[returns != 0]
    winrate = (active > 0).mean() if len(active) else 0.0

    starting_date = returns.index[0]
    ending_date = returns.index[-1]
    total_return = (equity.iloc[-1] - 1) * 100

    # Label consecutive runs of identical drawdown state, then take the
    # longest run that is in drawdown.
    is_drawdown = drawdown_series < 0
    groups = (is_drawdown != is_drawdown.shift()).cumsum()
    run_lengths = groups[is_drawdown].value_counts()
    duration = int(run_lengths.max()) if not run_lengths.empty else 0

    # An entry is a non-zero signal whose previous bar was exactly 0.
    entries = (combined_signal != 0) & (combined_signal.shift(1) == 0)
    total_trades = int(entries.sum().sum())

    print(f"\n{label}")
    print(f"Start: {starting_date}")
    print(f"End: {ending_date}")
    print(f"Total Return: {total_return:.2f}%")
    print(f'Final portfolio value: {(initial_amount * equity.iloc[-1]):.2f}')
    print(f"Sharpe: {sharpe:.2f}")
    print(f"Max Drawdown: {max_drawdown:.2%}")
    print(f"Max DD Duration:{duration} hours")
    print(f"Win Rate: {winrate:.2%}")
    print(f"Total Trades: {total_trades}")


# Metrics for train, validation and test.
compute_metrics(train_returns, train_equity, train_signals, 'Train')
compute_metrics(val_returns, val_equity, val_signals, 'Val')
compute_metrics(test_returns, test_equity, test_signals, 'Test')

# Same again for the whole data set.
for coin in coins:
    split_data['1h'][coin] = generate_signal(split_data['1h'][coin])

full_returns, full_equity, full_signals = backtest(split_data, coins, regime_score)
compute_metrics(full_returns, full_equity, full_signals, 'Full Dataset')

# One row per run: equity curve on the left, drawdown on the right.
fig, axes = plt.subplots(4, 2, figsize=(14, 16))
runs = [
    (full_equity, full_returns, 'Full Dataset'),
    (train_equity, train_returns, 'Train'),
    (val_equity, val_returns, 'Val'),
    (test_equity, test_returns, 'Test'),
]
for i, (equity, returns, label) in enumerate(runs):
    dd_series = (equity / equity.cummax() - 1)
    equity.plot(ax=axes[i][0], title=f'{label} Equity Curve')
    dd_series.plot(ax=axes[i][1], title=f'{label} Drawdown', color='red')
    axes[i][1].fill_between(dd_series.index, dd_series, 0, alpha=0.3, color='red')

plt.tight_layout()
plt.savefig('equity_curves.png')
plt.show()