from typing import Dict

import numpy as np
import pandas as pd

from utils import info_ratio


def _compute_signal_score(
    timestamps: np.ndarray,
    prob_positive: np.ndarray,
    prob_negative: np.ndarray,
    prob_mild: np.ndarray,
    smoothing_seconds: int,
) -> np.ndarray:
    """Return the exponentially smoothed directional signal.

    The raw signal is ``(prob_positive - prob_negative) * (1 - prob_mild)``
    with the weight clipped to ``[0, 1]``; it is then smoothed with a pandas
    exponentially weighted mean.

    Args:
        timestamps: Sample times in epoch milliseconds.
        prob_positive: Per-sample probability of the positive class.
        prob_negative: Per-sample probability of the negative class.
        prob_mild: Per-sample probability of the mild class.
        smoothing_seconds: Half-life passed to the EWM (floored at 1).

    Returns:
        Float array of the smoothed signal, NaN until ``min_periods``
        observations are available.
    """
    timestamps = np.asarray(timestamps, dtype=float)
    prob_positive = np.asarray(prob_positive, dtype=float)
    prob_negative = np.asarray(prob_negative, dtype=float)
    prob_mild = np.asarray(prob_mild, dtype=float)

    # Where either probability is non-finite the edge stays NaN (the value
    # pre-filled in ``out``) instead of producing inf/garbage.
    directional_edge = np.subtract(
        prob_positive,
        prob_negative,
        out=np.full_like(prob_positive, np.nan, dtype=float),
        where=np.isfinite(prob_positive) & np.isfinite(prob_negative),
    )
    # A missing prob_mild is treated as 1.0, i.e. zero confidence.
    confidence_weight = np.clip(1.0 - np.nan_to_num(prob_mild, nan=1.0), 0.0, 1.0)
    raw_signal = directional_edge * confidence_weight

    signal_series = pd.Series(
        raw_signal,
        index=pd.to_datetime(timestamps.astype(np.int64), unit="ms"),
        dtype=float,
    )
    min_periods = max(5, int(max(smoothing_seconds, 1) // 10))
    # NOTE(review): no ``times=`` is passed, so pandas interprets this
    # half-life in observations, not seconds. The two coincide only for
    # 1 Hz data -- confirm the sampling rate against the caller.
    signal_score = signal_series.ewm(
        halflife=max(int(smoothing_seconds), 1),
        adjust=False,
        min_periods=min_periods,
    ).mean()
    return signal_score.to_numpy(dtype=float)


def _compute_signal_thresholds(
    timestamps: np.ndarray,
    signal_score: np.ndarray,
    window_minutes: int,
    buy_signal_quantile: float,
    close_signal_quantile: float,
) -> tuple[np.ndarray, np.ndarray]:
    """Return rolling-quantile buy and close thresholds for the signal.

    Both thresholds are quantiles of ``signal_score`` over a trailing time
    window of ``window_minutes``, shifted forward by one row so that the
    threshold at row ``i`` is computed from rows up to ``i - 1`` only.

    Args:
        timestamps: Sample times in epoch milliseconds. pandas time-based
            rolling windows require a monotonic index.
        signal_score: Smoothed signal values, one per timestamp.
        window_minutes: Length of the trailing window in minutes.
        buy_signal_quantile: Quantile used for the buy threshold.
        close_signal_quantile: Quantile used for the close threshold.

    Returns:
        ``(buy_threshold, close_threshold)`` as float arrays.
    """
    timestamps = np.asarray(timestamps, dtype=float)
    signal_score = np.asarray(signal_score, dtype=float)

    signal_series = pd.Series(
        signal_score,
        index=pd.to_datetime(timestamps.astype(np.int64), unit="ms"),
        dtype=float,
    )
    rolling_window = f"{int(window_minutes)}min"
    # NOTE(review): ``window_minutes * 60 * 0.25`` is a quarter of the window
    # only if there is one sample per second -- confirm the sampling rate.
    min_periods = max(60, int(window_minutes * 60 * 0.25))
    rolling_signal = signal_series.rolling(rolling_window, min_periods=min_periods)

    # shift(1) keeps the current observation out of its own threshold.
    buy_threshold = rolling_signal.quantile(buy_signal_quantile).shift(1)
    close_threshold = rolling_signal.quantile(close_signal_quantile).shift(1)
    return (
        buy_threshold.to_numpy(dtype=float),
        close_threshold.to_numpy(dtype=float),
    )


def _compute_step_returns(equity_curve: np.ndarray) -> np.ndarray:
    """Return per-step simple returns of an equity curve.

    The first element is always 0. A step whose previous or next equity is
    non-finite, or whose previous equity is not positive, gets a return of 0.
    """
    equity_curve = np.asarray(equity_curve, dtype=float)
    step_returns = np.zeros(len(equity_curve), dtype=float)
    if len(equity_curve) <= 1:
        return step_returns

    prev_equity = equity_curve[:-1]
    next_equity = equity_curve[1:]
    valid = np.isfinite(prev_equity) & np.isfinite(next_equity) & (prev_equity > 0.0)
    # ``step_returns[1:]`` is a view, so the masked assignment writes through.
    step_returns[1:][valid] = next_equity[valid] / prev_equity[valid] - 1.0
    return step_returns


def _compute_max_drawdown(equity_curve: np.ndarray) -> float:
    """Return the most negative peak-to-trough drawdown of an equity curve.

    Non-finite equity points are skipped: ``np.maximum.accumulate``
    propagates NaN, so a single missing value would otherwise turn the whole
    metric into NaN. Returns NaN when no finite point exists.
    """
    equity_curve = np.asarray(equity_curve, dtype=float)
    finite_equity = equity_curve[np.isfinite(equity_curve)]
    if finite_equity.size == 0:
        return np.nan

    running_peak = np.maximum.accumulate(finite_equity)
    # The floor avoids division by zero for a non-positive running peak.
    drawdowns = finite_equity / np.maximum(running_peak, 1e-12) - 1.0
    return float(np.min(drawdowns))


def _compute_profit_factor(trade_returns: np.ndarray) -> float:
    """Return gross profit divided by gross loss over the trade returns.

    Returns NaN for no trades, ``inf`` when there are profits but no losses,
    and NaN when there are neither profits nor losses.
    """
    trade_returns = np.asarray(trade_returns, dtype=float)
    if trade_returns.size == 0:
        return np.nan

    gross_profit = trade_returns[trade_returns > 0.0].sum()
    gross_loss = -trade_returns[trade_returns < 0.0].sum()
    if gross_loss <= 0.0:
        return np.inf if gross_profit > 0.0 else np.nan
    return float(gross_profit / gross_loss)


def _build_round_trip(
    open_trade: dict,
    exit_index: int,
    exit_timestamp: float,
    exit_price: float,
    exit_mid_price: float,
    exit_equity: float,
) -> dict:
    """Build one completed-trade record from an open trade and its exit fill."""
    entry_equity = open_trade["entry_equity"]
    trade_return = exit_equity / max(entry_equity, 1e-12) - 1.0
    return {
        "entry_index": open_trade["entry_idx"],
        "exit_index": exit_index,
        "entry_timestampms": open_trade["entry_time"],
        "exit_timestampms": exit_timestamp,
        "entry_price": open_trade["entry_price"],
        "exit_price": exit_price,
        "entry_mid_price": open_trade["entry_mid_price"],
        "exit_mid_price": exit_mid_price,
        "holding_time_ms": exit_timestamp - open_trade["entry_time"],
        "entry_equity": entry_equity,
        "exit_equity": exit_equity,
        "trade_return": trade_return,
        "trade_pnl": exit_equity - entry_equity,
        "won": float(trade_return > 0.0),
    }


def run_long_only_hft_backtest(
    timestamps: np.ndarray,
    mid_prices: np.ndarray,
    best_bids: np.ndarray,
    best_asks: np.ndarray,
    prob_positive: np.ndarray,
    prob_negative: np.ndarray,
    prob_mild: np.ndarray,
    buy_signal_quantile: float,
    close_signal_quantile: float,
    signal_window_minutes: int,
    signal_smoothing_seconds: int,
    transaction_fee_rate: float,
    min_holding_seconds: int = 0,
    initial_cash: float = 1.0,
    close_position: bool = True,
) -> Dict[str, object]:
    """Run an all-in / all-out long-only backtest driven by class probabilities.

    A smoothed signal is compared against rolling-quantile thresholds. A
    decision taken at sample ``i`` is executed at sample ``i + 1``: entries
    buy with all cash at ``best_asks[i + 1]``, exits sell the whole position
    at ``best_bids[i + 1]``. A net book (fees applied) and a gross book (no
    fees) are tracked side by side; positions are marked at ``mid_prices``.

    Args:
        timestamps: Sample times in epoch milliseconds.
        mid_prices: Mid prices used to mark the position to market.
        best_bids: Bid prices used for sells.
        best_asks: Ask prices used for buys.
        prob_positive: Per-sample probability of the positive class.
        prob_negative: Per-sample probability of the negative class.
        prob_mild: Per-sample probability of the mild class.
        buy_signal_quantile: Rolling quantile the signal must exceed to buy.
        close_signal_quantile: Rolling quantile the signal must fall below
            to close.
        signal_window_minutes: Rolling window length for the thresholds.
        signal_smoothing_seconds: Half-life used to smooth the raw signal.
        transaction_fee_rate: Proportional fee applied to traded notional in
            the net book.
        min_holding_seconds: Minimum time a position must be held before a
            signal-driven exit is allowed.
        initial_cash: Starting cash for both books.
        close_position: If true, liquidate any open position at the last
            sample's bid.

    Returns:
        Dict with per-sample arrays (positions, returns, turnover, equity,
        cash and quantity curves, trade/entry/exit flags, transaction costs,
        signal and thresholds), the ``trade_events_df`` and ``trades_df``
        DataFrames, and a ``metrics`` dict (empty for empty input).

    Raises:
        ValueError: If the per-sample inputs do not all have the same length.
    """
    timestamps = np.asarray(timestamps, dtype=float)
    mid_prices = np.asarray(mid_prices, dtype=float)
    best_bids = np.asarray(best_bids, dtype=float)
    best_asks = np.asarray(best_asks, dtype=float)
    prob_positive = np.asarray(prob_positive, dtype=float)
    prob_negative = np.asarray(prob_negative, dtype=float)
    prob_mild = np.asarray(prob_mild, dtype=float)

    n_samples = len(timestamps)
    per_sample_inputs = [
        mid_prices,
        best_bids,
        best_asks,
        prob_positive,
        prob_negative,
        prob_mild,
    ]
    if any(len(arr) != n_samples for arr in per_sample_inputs):
        raise ValueError("All backtest inputs must have the same length.")

    cash_net = np.full(n_samples, np.nan, dtype=float)
    cash_gross = np.full(n_samples, np.nan, dtype=float)
    qty_net = np.zeros(n_samples, dtype=float)
    qty_gross = np.zeros(n_samples, dtype=float)
    equity_net = np.full(n_samples, np.nan, dtype=float)
    equity_gross = np.full(n_samples, np.nan, dtype=float)
    positions = np.zeros(n_samples, dtype=float)
    turnover = np.zeros(n_samples, dtype=float)
    trade_flags = np.zeros(n_samples, dtype=float)
    entry_flags = np.zeros(n_samples, dtype=float)
    exit_flags = np.zeros(n_samples, dtype=float)
    transaction_costs = np.zeros(n_samples, dtype=float)

    trade_events = []
    round_trips = []
    open_trade = None

    trade_event_columns = [
        "timestampms",
        "index",
        "event",
        "price",
        "mid_price",
        "quantity",
        "equity_before",
        "equity_after",
    ]
    trade_columns = [
        "entry_index",
        "exit_index",
        "entry_timestampms",
        "exit_timestampms",
        "entry_price",
        "exit_price",
        "entry_mid_price",
        "exit_mid_price",
        "holding_time_ms",
        "entry_equity",
        "exit_equity",
        "trade_return",
        "trade_pnl",
        "won",
    ]

    # Short-circuit before the pandas signal pipeline: there is nothing to
    # smooth or roll over when no samples were supplied.
    if n_samples == 0:
        return {
            "positions": positions,
            "gross_returns": np.zeros(0, dtype=float),
            "net_returns": np.zeros(0, dtype=float),
            "turnover": turnover,
            "equity_curve_net": np.zeros(0, dtype=float),
            "equity_curve_gross": np.zeros(0, dtype=float),
            "cash_curve_net": cash_net,
            "cash_curve_gross": cash_gross,
            "quantity_curve_net": qty_net,
            "quantity_curve_gross": qty_gross,
            "trade_flags": trade_flags,
            "entry_flags": entry_flags,
            "exit_flags": exit_flags,
            "transaction_costs": transaction_costs,
            "trade_events_df": pd.DataFrame(columns=trade_event_columns),
            "trades_df": pd.DataFrame(columns=trade_columns),
            "signal_score": np.zeros(0, dtype=float),
            "buy_signal_threshold": np.zeros(0, dtype=float),
            "close_signal_threshold": np.zeros(0, dtype=float),
            "metrics": {},
        }

    signal_score = _compute_signal_score(
        timestamps=timestamps,
        prob_positive=prob_positive,
        prob_negative=prob_negative,
        prob_mild=prob_mild,
        smoothing_seconds=signal_smoothing_seconds,
    )
    buy_signal_threshold, close_signal_threshold = _compute_signal_thresholds(
        timestamps=timestamps,
        signal_score=signal_score,
        window_minutes=signal_window_minutes,
        buy_signal_quantile=buy_signal_quantile,
        close_signal_quantile=close_signal_quantile,
    )

    cash_net[0] = initial_cash
    cash_gross[0] = initial_cash
    equity_net[0] = initial_cash
    equity_gross[0] = initial_cash
    min_holding_ms = max(int(min_holding_seconds), 0) * 1000

    for idx in range(n_samples - 1):
        next_idx = idx + 1

        # Carry the books forward; a trade below overwrites these values.
        cash_net[next_idx] = cash_net[idx]
        cash_gross[next_idx] = cash_gross[idx]
        qty_net[next_idx] = qty_net[idx]
        qty_gross[next_idx] = qty_gross[idx]

        current_equity_net = equity_net[idx]
        is_long = qty_net[idx] > 0.0
        current_signal = signal_score[idx]
        current_buy_threshold = buy_signal_threshold[idx]
        current_close_threshold = close_signal_threshold[idx]

        buy_signal = (
            np.isfinite(current_signal)
            and np.isfinite(current_buy_threshold)
            and current_signal > current_buy_threshold
        )
        close_signal = (
            np.isfinite(current_signal)
            and np.isfinite(current_close_threshold)
            and current_signal < current_close_threshold
        )

        if open_trade is None:
            holding_time_ms = 0.0
        else:
            holding_time_ms = timestamps[idx] - open_trade["entry_time"]
        can_close = holding_time_ms >= min_holding_ms

        # Contradictory signals (possible when the close threshold sits above
        # the buy threshold) cancel each other out.
        if buy_signal and close_signal:
            buy_signal = False
            close_signal = False

        # Buy event created on this step, if any; its equity_after is only
        # known once the post-trade equity has been marked below.
        entry_event = None

        if (not is_long) and buy_signal:
            ask_price = best_asks[next_idx]
            if np.isfinite(ask_price) and ask_price > 0.0:
                gross_qty = cash_gross[idx] / ask_price if cash_gross[idx] > 0.0 else 0.0
                # The fee is paid out of cash, so it shrinks the quantity bought.
                net_qty = (
                    cash_net[idx] / (ask_price * (1.0 + transaction_fee_rate))
                    if cash_net[idx] > 0.0
                    else 0.0
                )
                traded_notional = ask_price * net_qty

                cash_gross[next_idx] = 0.0
                cash_net[next_idx] = 0.0
                qty_gross[next_idx] = gross_qty
                qty_net[next_idx] = net_qty
                turnover[next_idx] = traded_notional / max(current_equity_net, 1e-12)
                transaction_costs[next_idx] = traded_notional * transaction_fee_rate
                trade_flags[next_idx] = 1.0
                entry_flags[next_idx] = 1.0

                open_trade = {
                    "entry_idx": next_idx,
                    "entry_time": timestamps[next_idx],
                    "entry_price": ask_price,
                    "entry_mid_price": mid_prices[next_idx],
                    "entry_equity": current_equity_net,
                    "quantity_net": net_qty,
                    "quantity_gross": gross_qty,
                }
                entry_event = {
                    "timestampms": timestamps[next_idx],
                    "index": next_idx,
                    "event": "buy",
                    "price": ask_price,
                    "mid_price": mid_prices[next_idx],
                    "quantity": net_qty,
                    "equity_before": current_equity_net,
                    "equity_after": np.nan,
                }
                trade_events.append(entry_event)
        elif is_long and close_signal and can_close:
            bid_price = best_bids[next_idx]
            if np.isfinite(bid_price) and bid_price > 0.0:
                gross_exit_value = qty_gross[idx] * bid_price
                net_exit_value = qty_net[idx] * bid_price * (1.0 - transaction_fee_rate)
                traded_notional = qty_net[idx] * bid_price

                cash_gross[next_idx] = gross_exit_value
                cash_net[next_idx] = net_exit_value
                qty_gross[next_idx] = 0.0
                qty_net[next_idx] = 0.0
                turnover[next_idx] = traded_notional / max(current_equity_net, 1e-12)
                transaction_costs[next_idx] = traded_notional * transaction_fee_rate
                trade_flags[next_idx] = 1.0
                exit_flags[next_idx] = 1.0

                trade_events.append({
                    "timestampms": timestamps[next_idx],
                    "index": next_idx,
                    "event": "sell",
                    "price": bid_price,
                    "mid_price": mid_prices[next_idx],
                    "quantity": qty_net[idx],
                    "equity_before": current_equity_net,
                    "equity_after": net_exit_value,
                })
                if open_trade is not None:
                    round_trips.append(
                        _build_round_trip(
                            open_trade,
                            exit_index=next_idx,
                            exit_timestamp=timestamps[next_idx],
                            exit_price=bid_price,
                            exit_mid_price=mid_prices[next_idx],
                            exit_equity=net_exit_value,
                        )
                    )
                open_trade = None

        positions[next_idx] = float(qty_net[next_idx] > 0.0)
        equity_gross[next_idx] = cash_gross[next_idx] + qty_gross[next_idx] * mid_prices[next_idx]
        equity_net[next_idx] = cash_net[next_idx] + qty_net[next_idx] * mid_prices[next_idx]

        # Only the event created on this step is completed. Patching
        # ``trade_events[-1]`` whenever it is NaN would let a later bar's
        # equity leak into an older buy whose own mark was NaN.
        if entry_event is not None:
            entry_event["equity_after"] = equity_net[next_idx]

    if close_position and qty_net[-1] > 0.0:
        bid_price = best_bids[-1]
        if np.isfinite(bid_price) and bid_price > 0.0:
            current_equity_net = equity_net[-1]
            gross_qty_before = qty_gross[-1]
            net_qty_before = qty_net[-1]
            gross_exit_value = gross_qty_before * bid_price
            net_exit_value = net_qty_before * bid_price * (1.0 - transaction_fee_rate)
            traded_notional = net_qty_before * bid_price

            cash_gross[-1] = gross_exit_value
            cash_net[-1] = net_exit_value
            qty_gross[-1] = 0.0
            qty_net[-1] = 0.0
            positions[-1] = 0.0
            # ``+=`` because an entry may already have been booked on this
            # same last sample.
            turnover[-1] += traded_notional / max(current_equity_net, 1e-12)
            transaction_costs[-1] += traded_notional * transaction_fee_rate
            trade_flags[-1] = 1.0
            exit_flags[-1] = 1.0
            equity_gross[-1] = gross_exit_value
            equity_net[-1] = net_exit_value

            trade_events.append({
                "timestampms": timestamps[-1],
                "index": n_samples - 1,
                "event": "forced_sell",
                "price": bid_price,
                "mid_price": mid_prices[-1],
                "quantity": net_qty_before,
                "equity_before": current_equity_net,
                "equity_after": net_exit_value,
            })
            if open_trade is not None:
                round_trips.append(
                    _build_round_trip(
                        open_trade,
                        exit_index=n_samples - 1,
                        exit_timestamp=timestamps[-1],
                        exit_price=bid_price,
                        exit_mid_price=mid_prices[-1],
                        exit_equity=net_exit_value,
                    )
                )
            open_trade = None

    net_returns = _compute_step_returns(equity_net)
    gross_returns = _compute_step_returns(equity_gross)
    trades_df = pd.DataFrame(round_trips, columns=trade_columns)
    trade_events_df = pd.DataFrame(trade_events, columns=trade_event_columns)

    trade_event_times = timestamps[trade_flags > 0.0]
    avg_trade_interval_ms = np.nan
    if len(trade_event_times) >= 2:
        avg_trade_interval_ms = float(np.mean(np.diff(trade_event_times)))

    avg_holding_time_ms = np.nan
    win_rate = np.nan
    avg_trade_return = np.nan
    median_trade_return = np.nan
    best_trade_return = np.nan
    worst_trade_return = np.nan
    profit_factor = np.nan
    if not trades_df.empty:
        trade_returns = trades_df["trade_return"].to_numpy(dtype=float)
        avg_holding_time_ms = float(trades_df["holding_time_ms"].mean())
        win_rate = float((trades_df["trade_return"] > 0.0).mean())
        avg_trade_return = float(np.mean(trade_returns))
        median_trade_return = float(np.median(trade_returns))
        best_trade_return = float(np.max(trade_returns))
        worst_trade_return = float(np.min(trade_returns))
        profit_factor = _compute_profit_factor(trade_returns)

    metrics = {
        "trade_count": float(np.sum(trade_flags > 0.0)),
        "entry_count": float(np.sum(entry_flags > 0.0)),
        "exit_count": float(np.sum(exit_flags > 0.0)),
        "round_trip_count": float(len(trades_df)),
        "avg_trade_interval_ms": avg_trade_interval_ms,
        "avg_trade_interval_s": (
            np.nan if np.isnan(avg_trade_interval_ms) else avg_trade_interval_ms / 1000.0
        ),
        "avg_holding_time_ms": avg_holding_time_ms,
        "avg_holding_time_s": (
            np.nan if np.isnan(avg_holding_time_ms) else avg_holding_time_ms / 1000.0
        ),
        "win_rate": win_rate,
        "avg_trade_return": avg_trade_return,
        "median_trade_return": median_trade_return,
        "best_trade_return": best_trade_return,
        "worst_trade_return": worst_trade_return,
        "profit_factor": profit_factor,
        "avg_strategy_ret": float(np.mean(net_returns)) if len(net_returns) else np.nan,
        "strategy_ir": info_ratio(net_returns),
        "avg_gross_strategy_ret": (
            float(np.mean(gross_returns)) if len(gross_returns) else np.nan
        ),
        "gross_strategy_ir": info_ratio(gross_returns),
        "trade_rate": float(np.mean(trade_flags > 0.0)) if len(trade_flags) else np.nan,
        "avg_turnover": float(np.mean(turnover)) if len(turnover) else np.nan,
        "exposure_rate": float(np.mean(positions > 0.0)) if len(positions) else np.nan,
        "cum_strategy_ret": float(equity_net[-1] / initial_cash - 1.0),
        "cum_gross_strategy_ret": float(equity_gross[-1] / initial_cash - 1.0),
        "final_equity_net": float(equity_net[-1]),
        "final_equity_gross": float(equity_gross[-1]),
        "max_drawdown": _compute_max_drawdown(equity_net),
        "gross_max_drawdown": _compute_max_drawdown(equity_gross),
        "transaction_cost_paid": float(np.sum(transaction_costs)),
    }

    return {
        "positions": positions,
        "gross_returns": gross_returns,
        "net_returns": net_returns,
        "turnover": turnover,
        "equity_curve_net": equity_net,
        "equity_curve_gross": equity_gross,
        "cash_curve_net": cash_net,
        "cash_curve_gross": cash_gross,
        "quantity_curve_net": qty_net,
        "quantity_curve_gross": qty_gross,
        "trade_flags": trade_flags,
        "entry_flags": entry_flags,
        "exit_flags": exit_flags,
        "transaction_costs": transaction_costs,
        "trade_events_df": trade_events_df,
        "trades_df": trades_df,
        "signal_score": signal_score,
        "buy_signal_threshold": buy_signal_threshold,
        "close_signal_threshold": close_signal_threshold,
        "metrics": metrics,
    }