"""Walk-forward LightGBM classification with a long-only backtest per fold.

Each fold trains a LightGBM classifier on the rows preceding the test window
(optionally purged), predicts class probabilities for the test window, and
feeds those probabilities to ``run_long_only_hft_backtest``. The out-of-sample
predictions of all folds are then backtested once more as a single sequence.
"""

import numpy as np
import pandas as pd
from sklearn.model_selection import TimeSeriesSplit

try:
    import lightgbm as lgb
except ImportError as err:
    raise ImportError("Please install lightgbm first: pip install lightgbm") from err

from hft_backtesting import run_long_only_hft_backtest
from utils import (
    ALL_CLASSES,
    CLASS_MILD,
    CLASS_NEGATIVE,
    CLASS_POSITIVE,
    align_probability_matrix,
    build_confusion_matrix_frame,
    class_ids_to_labels,
    constant_probability_matrix,
    evaluate_predictions,
    filter_report_metrics,
    predicted_classes_from_probabilities,
    print_metrics,
)

# (key in the backtest result, key in the returned dict, column in the OOS frame)
# for every per-row series that is scattered back onto the full data length.
_BACKTEST_SERIES = (
    ("positions", "all_positions", "long_position"),
    ("gross_returns", "all_gross_strategy_ret", "strategy_gross_return"),
    ("net_returns", "all_net_strategy_ret", "strategy_net_return"),
    ("turnover", "all_turnover", "turnover"),
    ("equity_curve_net", "all_equity_curve_net", "equity_curve_net"),
    ("equity_curve_gross", "all_equity_curve_gross", "equity_curve_gross"),
    ("trade_flags", "all_trade_flags", "trade_flag"),
    ("entry_flags", "all_entry_flags", "entry_flag"),
    ("exit_flags", "all_exit_flags", "exit_flag"),
    ("transaction_costs", "all_transaction_costs", "transaction_cost"),
)


def build_model(params: dict, observed_classes: np.ndarray):
    """Create an ``LGBMClassifier`` matching the classes seen in training.

    Args:
        params: Base LightGBM parameters; copied, never mutated.
        observed_classes: Distinct class ids present in the fitting data.

    Returns:
        ``None`` when fewer than two classes were observed (nothing to learn),
        otherwise an unfitted ``lgb.LGBMClassifier``. With exactly two classes
        the objective is forced to ``"binary"`` and any ``num_class`` entry is
        dropped; with more, it is ``"multiclass"`` with
        ``num_class = len(ALL_CLASSES)``.
    """
    observed_classes = np.asarray(observed_classes, dtype=int)
    if observed_classes.size < 2:
        return None

    model_params = dict(params)
    if observed_classes.size == 2:
        model_params["objective"] = "binary"
        model_params.pop("num_class", None)
    else:
        model_params["objective"] = "multiclass"
        model_params["num_class"] = len(ALL_CLASSES)
    return lgb.LGBMClassifier(**model_params)


def split_train_validation(
    X_train,
    y_train,
    validation_fraction: float,
    purge_size: int = 0,
):
    """Split training rows into a fit part and a trailing validation part.

    The validation part is the last ``ceil(n * validation_fraction)`` rows
    (clamped to ``[1, n - 1]``). ``purge_size`` rows immediately before it are
    left out of the fit part.

    Args:
        X_train: Feature frame; sliced positionally with ``.iloc``.
        y_train: Targets aligned with ``X_train``; sliced with ``[...]``.
        validation_fraction: Share of rows reserved for validation.
        purge_size: Number of rows dropped between fit and validation parts;
            negative values are treated as 0.

    Returns:
        ``(X_fit, y_fit, X_val, y_val)``. ``X_val`` and ``y_val`` are ``None``
        (and the full training data is returned as the fit part) when there
        are fewer than 5 rows, ``validation_fraction <= 0``, or no fit rows
        would remain after the purge.
    """
    n_train = len(X_train)
    if n_train < 5 or validation_fraction <= 0:
        return X_train, y_train, None, None

    val_size = int(np.ceil(n_train * validation_fraction))
    val_size = min(max(val_size, 1), n_train - 1)
    purge_size = max(int(purge_size), 0)

    fit_end = n_train - val_size - purge_size
    if fit_end <= 0:
        return X_train, y_train, None, None

    X_fit = X_train.iloc[:fit_end]
    y_fit = y_train[:fit_end]
    X_val = X_train.iloc[-val_size:]
    y_val = y_train[-val_size:]
    return X_fit, y_fit, X_val, y_val


def predict_probabilities(model, X_input):
    """Return ``model.predict_proba(X_input)`` passed through
    ``align_probability_matrix`` together with ``model.classes_``."""
    raw_probabilities = model.predict_proba(X_input)
    return align_probability_matrix(raw_probabilities, model.classes_)


def train_one_fold(
    X_train,
    y_train,
    X_test,
    lgb_params: dict,
    validation_fraction: float,
    early_stopping_rounds: int,
    purge_size: int = 0,
):
    """Fit a model on one fold's training data and predict its test rows.

    Args:
        X_train: Training features.
        y_train: Training class ids aligned with ``X_train``.
        X_test: Test features to predict.
        lgb_params: Base LightGBM parameters (see ``build_model``).
        validation_fraction: Share of training rows held out for early
            stopping (see ``split_train_validation``).
        early_stopping_rounds: Early-stopping patience; ``<= 0`` disables it.
        purge_size: Rows dropped between the fit and validation parts.

    Returns:
        ``(model, pred_prob, importance)``. When the fit part contains a
        single class, ``model`` is ``None``, ``pred_prob`` comes from
        ``constant_probability_matrix`` for that class and ``importance`` is
        all zeros (one entry per training column).
    """
    X_fit, y_fit, X_val, y_val = split_train_validation(
        X_train,
        y_train,
        validation_fraction,
        purge_size=purge_size,
    )
    observed_classes = np.unique(y_fit)
    model = build_model(lgb_params, observed_classes=observed_classes)

    if model is None:
        constant_class = int(observed_classes[0])
        pred_prob = constant_probability_matrix(len(X_test), constant_class)
        importance = np.zeros(X_train.shape[1], dtype=float)
        return model, pred_prob, importance

    callbacks = [lgb.log_evaluation(0)]
    fit_kwargs = {"X": X_fit, "y": y_fit, "callbacks": callbacks}

    val_classes = np.unique(y_val) if y_val is not None else np.array([], dtype=int)
    # Early stopping is only used when the validation part has at least two
    # classes and none that the fit part has never seen.
    use_early_stopping = (
        X_val is not None
        and early_stopping_rounds > 0
        and val_classes.size >= 2
        and np.all(np.isin(val_classes, observed_classes))
    )
    if use_early_stopping:
        fit_kwargs["eval_set"] = [(X_val, y_val)]
        # Mirrors the objective chosen in build_model (binary for two classes).
        fit_kwargs["eval_metric"] = (
            "multi_logloss" if observed_classes.size >= 3 else "binary_logloss"
        )
        callbacks.insert(0, lgb.early_stopping(early_stopping_rounds))

    model.fit(**fit_kwargs)
    pred_prob = predict_probabilities(model, X_test)
    importance = model.feature_importances_
    return model, pred_prob, importance


def _build_fold_metrics(
    y_true: np.ndarray,
    pred_prob: np.ndarray,
    label: str,
    backtest_result: dict,
    transaction_fee_rate: float,
    buy_signal_quantile: float,
    close_signal_quantile: float,
):
    """Combine classification and backtest metrics for one evaluation window.

    Calls ``evaluate_predictions`` with the backtest's per-row series, merges
    in ``backtest_result["metrics"]`` (which overrides keys on collision) and
    returns the result of ``filter_report_metrics``.
    """
    backtest_metrics = backtest_result["metrics"]
    metrics = evaluate_predictions(
        y_true=y_true,
        y_prob=pred_prob,
        label=label,
        strategy_positions=backtest_result["positions"],
        strategy_gross_returns=backtest_result["gross_returns"],
        strategy_net_returns=backtest_result["net_returns"],
        strategy_turnover=backtest_result["turnover"],
        transaction_fee_rate=transaction_fee_rate,
        buy_in_threshold=buy_signal_quantile,
        close_threshold=close_signal_quantile,
        trade_count=int(backtest_metrics.get("trade_count", 0.0)),
        avg_trade_interval_ms=backtest_metrics.get("avg_trade_interval_ms"),
    )
    metrics.update(backtest_metrics)
    return filter_report_metrics(metrics)


def _run_backtest(frame: pd.DataFrame, timestamp_col: str, pred_prob: np.ndarray, **backtest_kwargs):
    """Run the long-only backtest on ``frame`` using class probabilities.

    Quotes and timestamps are read from ``frame`` as float arrays; the three
    probability series are the ``CLASS_*`` columns of ``pred_prob``. The
    backtest always starts from ``initial_cash=1.0`` with
    ``close_position=True``; remaining settings come from ``backtest_kwargs``.
    """
    return run_long_only_hft_backtest(
        timestamps=frame[timestamp_col].to_numpy(dtype=float),
        mid_prices=frame["mid"].to_numpy(dtype=float),
        best_bids=frame["best_bid"].to_numpy(dtype=float),
        best_asks=frame["best_ask"].to_numpy(dtype=float),
        prob_positive=pred_prob[:, CLASS_POSITIVE],
        prob_negative=pred_prob[:, CLASS_NEGATIVE],
        prob_mild=pred_prob[:, CLASS_MILD],
        **backtest_kwargs,
        initial_cash=1.0,
        close_position=True,
    )


def run_5fold_backtest(
    data: pd.DataFrame,
    feature_cols,
    target_col: str,
    timestamp_col: str,
    n_splits: int,
    lgb_params: dict,
    validation_fraction: float = 0.2,
    early_stopping_rounds: int = 50,
    transaction_fee_rate: float = 0.0,
    signal_score_buy_quantile: float = 0.9,
    signal_score_close_quantile: float = 0.1,
    signal_score_window_minutes: int = 60,
    signal_smoothing_seconds: int = 60,
    min_holding_seconds: int = 0,
    prediction_horizon: int = 0,
):
    """Train/evaluate over ``TimeSeriesSplit`` folds and backtest the result.

    Rows with NaN in any feature, ``target_col``, ``timestamp_col``,
    ``"target_ret"``, ``"target_label"`` or the quote columns are dropped.
    Each fold is trained with ``train_one_fold`` and backtested on its own
    test window; afterwards all out-of-sample rows are backtested together.

    NOTE(review): folds follow row order; nothing here sorts by
    ``timestamp_col``, so ``data`` is presumably already chronological.

    Args:
        data: Input frame. Must also contain a ``"seq"`` column, which is
            copied into the out-of-sample frame.
        feature_cols: Feature column names (any iterable of names, or a
            single name).
        target_col: Column holding integer class ids.
        timestamp_col: Column holding timestamps (converted to float).
        n_splits: Number of ``TimeSeriesSplit`` folds.
        lgb_params: Base LightGBM parameters.
        validation_fraction: Share of each training set used for early
            stopping.
        early_stopping_rounds: Early-stopping patience; ``<= 0`` disables it.
        transaction_fee_rate: Forwarded to the backtest and the metrics.
        signal_score_buy_quantile: Forwarded as ``buy_signal_quantile``.
        signal_score_close_quantile: Forwarded as ``close_signal_quantile``.
        signal_score_window_minutes: Forwarded as ``signal_window_minutes``.
        signal_smoothing_seconds: Forwarded to the backtest.
        min_holding_seconds: Forwarded to the backtest.
        prediction_horizon: Number of rows purged between train and test
            (and between fit and validation); negative values count as 0.

    Returns:
        A dict with per-fold metrics (``metrics_df``), overall metrics,
        full-length arrays (NaN outside any test window) for true values,
        predictions and backtest series, feature importances (``fi_df``),
        confusion matrices, the out-of-sample prediction frame, the overall
        backtest's trade tables, and the cleaned input (``data_used``).

    Raises:
        KeyError: If a required column is missing from ``data``.
        ValueError: If purging leaves a fold without training rows.
    """
    # Accept tuples / Index objects / a single name: `+` on those would
    # otherwise fail or do element-wise arithmetic instead of concatenation.
    feature_cols = [feature_cols] if isinstance(feature_cols, str) else list(feature_cols)
    quote_cols = ["mid", "best_bid", "best_ask"]
    required_cols = feature_cols + [target_col, timestamp_col, "target_ret", "target_label"] + quote_cols
    # De-duplicated so a target_col that coincides with another entry does not
    # produce duplicate columns in the out-of-sample frame.
    oos_cols = list(dict.fromkeys(
        [timestamp_col, "seq", "mid", "best_bid", "best_ask", "target_ret", target_col, "target_label"]
    ))

    # Fail before any training rather than after all folds have been fitted.
    missing_cols = [col for col in dict.fromkeys(required_cols + oos_cols) if col not in data.columns]
    if missing_cols:
        raise KeyError(f"Input data is missing required columns: {missing_cols}")

    data = data.dropna(subset=required_cols).reset_index(drop=True)
    n_rows = len(data)
    X = data[feature_cols]
    y = data[target_col].astype(int).values

    backtest_kwargs = {
        "buy_signal_quantile": signal_score_buy_quantile,
        "close_signal_quantile": signal_score_close_quantile,
        "signal_window_minutes": signal_score_window_minutes,
        "signal_smoothing_seconds": signal_smoothing_seconds,
        "transaction_fee_rate": transaction_fee_rate,
        "min_holding_seconds": min_holding_seconds,
    }
    metric_kwargs = {
        "transaction_fee_rate": transaction_fee_rate,
        "buy_signal_quantile": signal_score_buy_quantile,
        "close_signal_quantile": signal_score_close_quantile,
    }

    tscv = TimeSeriesSplit(n_splits=n_splits)
    fold_metrics = []
    fold_confusion_frames = []
    feature_importances = []
    all_true_class = np.full(n_rows, np.nan)
    all_true_return = np.full(n_rows, np.nan)
    # Probabilities are kept in the model's own column layout so that the
    # CLASS_* constants index them correctly whatever their numeric values.
    all_prob = None
    purge_size = max(int(prediction_horizon), 0)

    for fold_id, (train_idx, test_idx) in enumerate(tscv.split(X), start=1):
        if purge_size > 0:
            # Drop the training rows closest to the test window.
            train_cutoff = test_idx[0] - purge_size
            train_idx = train_idx[train_idx < train_cutoff]
        if len(train_idx) == 0:
            raise ValueError(f"Fold {fold_id} has no training rows after applying purge_size={purge_size}.")

        y_test = y[test_idx]
        test_frame = data.iloc[test_idx]

        _, pred_prob, importances = train_one_fold(
            X_train=X.iloc[train_idx],
            y_train=y[train_idx],
            X_test=X.iloc[test_idx],
            lgb_params=lgb_params,
            validation_fraction=validation_fraction,
            early_stopping_rounds=early_stopping_rounds,
            purge_size=purge_size,
        )
        fold_backtest = _run_backtest(test_frame, timestamp_col, pred_prob, **backtest_kwargs)

        if all_prob is None:
            all_prob = np.full((n_rows, pred_prob.shape[1]), np.nan)
        all_prob[test_idx] = pred_prob
        all_true_class[test_idx] = y_test
        all_true_return[test_idx] = test_frame["target_ret"].to_numpy(dtype=float)

        fold_label = f"fold_{fold_id}"
        metrics = _build_fold_metrics(
            y_true=y_test,
            pred_prob=pred_prob,
            label=fold_label,
            backtest_result=fold_backtest,
            **metric_kwargs,
        )
        fold_metrics.append(metrics)
        fold_confusion_frames.append(
            build_confusion_matrix_frame(y_true=y_test, y_prob=pred_prob, label=fold_label)
        )
        print_metrics(f"Fold {fold_id}", metrics)

        feature_importances.append(
            pd.DataFrame({
                "feature": feature_cols,
                "importance": importances,
                "fold": fold_id,
            })
        )

    metrics_df = pd.DataFrame(fold_metrics)
    fi_df = pd.concat(feature_importances, axis=0).reset_index(drop=True)

    all_prob_negative = all_prob[:, CLASS_NEGATIVE].copy()
    all_prob_mild = all_prob[:, CLASS_MILD].copy()
    all_prob_positive = all_prob[:, CLASS_POSITIVE].copy()

    # Rows that belong to some test window (and have a usable probability).
    valid_mask = ~np.isnan(all_prob_positive)
    oos_prob = all_prob[valid_mask]
    oos_true_class = all_true_class[valid_mask].astype(int)
    # Index is reset so every later column assignment is positional, even if
    # a backtest output happens to be an index-carrying Series.
    oos_frame = data.loc[valid_mask, oos_cols].reset_index(drop=True)

    overall_backtest = _run_backtest(oos_frame, timestamp_col, oos_prob, **backtest_kwargs)
    overall_metrics = _build_fold_metrics(
        y_true=oos_true_class,
        pred_prob=oos_prob,
        label="overall_oos",
        backtest_result=overall_backtest,
        **metric_kwargs,
    )
    overall_confusion_df = build_confusion_matrix_frame(
        y_true=oos_true_class,
        y_prob=oos_prob,
        label="overall_oos",
    )
    print_metrics("Overall OOS", overall_metrics)

    # Scatter each per-row backtest series back onto the full data length.
    full_series = {}
    for result_key, array_key, _ in _BACKTEST_SERIES:
        full_values = np.full(n_rows, np.nan)
        full_values[valid_mask] = overall_backtest[result_key]
        full_series[array_key] = full_values

    all_pred_class = np.full(n_rows, np.nan)
    all_pred_class[valid_mask] = predicted_classes_from_probabilities(oos_prob)

    oos_frame[target_col] = oos_frame[target_col].astype(int)
    oos_frame["predicted_class"] = all_pred_class[valid_mask].astype(int)
    oos_frame["predicted_label"] = class_ids_to_labels(oos_frame["predicted_class"].values)
    oos_frame["prob_positive"] = all_prob_positive[valid_mask]
    oos_frame["prob_negative"] = all_prob_negative[valid_mask]
    oos_frame["prob_mild"] = all_prob_mild[valid_mask]
    for _, array_key, column in _BACKTEST_SERIES:
        oos_frame[column] = full_series[array_key][valid_mask]
    oos_frame["signal_strength"] = oos_frame["prob_positive"] - oos_frame["prob_negative"]
    oos_frame["signal_score"] = overall_backtest["signal_score"]
    oos_frame["buy_signal_threshold"] = overall_backtest["buy_signal_threshold"]
    oos_frame["close_signal_threshold"] = overall_backtest["close_signal_threshold"]
    # Change in mid to the next out-of-sample row (NaN on the last row).
    oos_frame["mid_forward_change"] = oos_frame["mid"].shift(-1) - oos_frame["mid"]
    oos_frame["profitable_long_interval"] = (
        (oos_frame["long_position"] > 0.0) & (oos_frame["mid_forward_change"] > 0.0)
    ).astype(float)

    return {
        "metrics_df": metrics_df,
        "overall_metrics": overall_metrics,
        "all_true_class": all_true_class,
        "all_true_return": all_true_return,
        "all_pred_class": all_pred_class,
        "all_prob_positive": all_prob_positive,
        "all_prob_negative": all_prob_negative,
        "all_prob_mild": all_prob_mild,
        **full_series,
        "fi_df": fi_df,
        "confusion_matrix_df": pd.concat(
            fold_confusion_frames + [overall_confusion_df], axis=0, ignore_index=True
        ),
        "oos_predictions_df": oos_frame,
        "oos_trade_events_df": overall_backtest["trade_events_df"].copy(),
        "oos_trades_df": overall_backtest["trades_df"].copy(),
        "data_used": data,
    }