"""Turn an order-book event CSV into per-window features and forward-return targets."""

from typing import Dict, List, Optional

import numpy as np
import pandas as pd
from tqdm import tqdm

from order_book import OrderBook
from utils import CLASS_MILD, CLASS_NEGATIVE, CLASS_POSITIVE, safe_div, slope_from_levels

EPS = 1e-12

# Actions that are applied to the book (and counted as quote updates).
_BOOK_UPDATE_ACTIONS = (
    "INITIAL",
    "INITIAL_SNAPSHOT",
    "PLACE",
    "CANCEL",
    "FILL_UPDATE",
    "OTHER_CHANGE",
)


def _entropy_and_hhi(levels) -> tuple[float, float]:
    """Return ``(entropy, hhi)`` of the size distribution across ``levels``.

    ``levels`` is an iterable of ``(key, size)`` pairs; only the size is used
    and negative sizes are clamped to zero. ``(0.0, 0.0)`` is returned when the
    total size is not positive (including an empty input).
    """
    sizes = np.asarray([max(float(q), 0.0) for _, q in levels], dtype=float)
    total = sizes.sum()
    if total <= 0.0:
        return 0.0, 0.0
    probs = sizes / total
    # EPS keeps log() finite for zero-size levels.
    entropy = -np.sum(probs * np.log(probs + EPS))
    hhi = np.sum(probs**2)
    return float(entropy), float(hhi)


def load_and_prepare_csv(csv_path: str) -> pd.DataFrame:
    """Read the event CSV, normalise its columns and return it in replay order.

    The CSV must provide ``action``, ``side``, ``price``, ``amount``,
    ``remaining``, ``ts`` and ``seq`` columns. ``ts`` is copied into a numeric
    ``timestampms`` column. Rows missing any of timestamp, seq, action, side,
    price or amount are dropped. The result is sorted by timestamp, with
    ``INITIAL_SNAPSHOT`` rows first within a timestamp, then by ``seq``.
    """
    df = pd.read_csv(csv_path)
    mem_mb = df.memory_usage(deep=True).sum() / (1024**2)
    print(f"[load_and_prepare_csv] raw rows={len(df):,}, cols={df.shape[1]}, memory={mem_mb:,.2f} MB")

    for col in ("action", "side"):
        # astype(str) would turn a missing value into the literal "NAN", which
        # the dropna() below could never remove; restore the missing marker.
        present = df[col].notna()
        df[col] = df[col].astype(str).str.upper().str.strip().where(present)
    for col in ("price", "amount", "remaining"):
        df[col] = pd.to_numeric(df[col], errors="coerce")
    df["timestampms"] = pd.to_numeric(df["ts"], errors="coerce")
    df["seq"] = pd.to_numeric(df["seq"], errors="coerce")

    # Snapshot rows without a timestamp inherit the earliest valid one so they
    # survive the dropna() and sort to the front.
    first_valid_ts = df["timestampms"].dropna().min()
    is_initial_snapshot = df["action"].eq("INITIAL_SNAPSHOT")
    df.loc[is_initial_snapshot & df["timestampms"].isna(), "timestampms"] = first_valid_ts

    # Explicit copy: the columns assigned below must not target a filtered
    # view-like result (avoids SettingWithCopyWarning on older pandas).
    df = df.dropna(subset=["timestampms", "seq", "action", "side", "price", "amount"]).copy()
    df["timestampms"] = df["timestampms"].astype(np.int64)
    df["seq"] = df["seq"].astype(np.int64)

    df["action_priority"] = np.where(df["action"].eq("INITIAL_SNAPSHOT"), -1, 0)
    df = df.sort_values(["timestampms", "action_priority", "seq"]).reset_index(drop=True)
    return df.drop(columns=["action_priority"])


def process_event(book: OrderBook, action: str, side: str, price: float, remaining):
    """Apply one event to ``book`` when it is a book-changing action.

    The level is updated (followed by ``prune_crossed_book``) only when
    ``action`` is one of the book-update actions, ``side`` is ``"BID"`` or
    ``"ASK"`` and ``remaining`` is neither ``None`` nor NaN. Any other event
    leaves the book untouched.
    """
    if action not in _BOOK_UPDATE_ACTIONS:
        return
    if side in ("BID", "ASK") and remaining is not None and not pd.isna(remaining):
        book.update_level(side, price, remaining)
        book.prune_crossed_book(side)


def compute_book_features(book: OrderBook, top_k: int = 5):
    """Return a dict of snapshot features for ``book``, or ``None``.

    ``None`` is returned when either side has no best price or the book is
    crossed/locked (``best_ask <= best_bid``). Feature keys keep the ``_5`` /
    ``_10`` suffixes regardless of ``top_k``.
    """
    best_bid = book.best_bid()
    best_ask = book.best_ask()
    if best_bid is None or best_ask is None or best_ask <= best_bid:
        return None

    mid = (best_bid + best_ask) / 2.0
    spread = best_ask - best_bid

    # NOTE(review): topk() is assumed to return (price, size) pairs ordered
    # best-first; confirm against OrderBook.
    bid_levels = book.topk("BID", top_k)
    ask_levels = book.topk("ASK", top_k)

    bid_depth_1 = bid_levels[0][1] if bid_levels else 0.0
    ask_depth_1 = ask_levels[0][1] if ask_levels else 0.0
    bid_depth_5 = sum(q for _, q in bid_levels)
    ask_depth_5 = sum(q for _, q in ask_levels)

    imbalance_1 = safe_div(bid_depth_1 - ask_depth_1, bid_depth_1 + ask_depth_1)
    imbalance_5 = safe_div(bid_depth_5 - ask_depth_5, bid_depth_5 + ask_depth_5)
    microprice = safe_div(
        best_ask * bid_depth_1 + best_bid * ask_depth_1,
        bid_depth_1 + ask_depth_1,
    )

    bid_slope_5 = slope_from_levels(bid_levels)
    ask_slope_5 = slope_from_levels(ask_levels)

    bid_entropy_5, bid_hhi_5 = _entropy_and_hhi(bid_levels)
    ask_entropy_5, ask_hhi_5 = _entropy_and_hhi(ask_levels)
    # The labels only tag the side; _entropy_and_hhi reads the sizes alone.
    combined_levels = [(f"B{idx}", q) for idx, (_, q) in enumerate(bid_levels)] + [
        (f"A{idx}", q) for idx, (_, q) in enumerate(ask_levels)
    ]
    depth_entropy_10, depth_hhi_10 = _entropy_and_hhi(combined_levels)

    return {
        "best_bid": best_bid,
        "best_ask": best_ask,
        "mid": mid,
        "spread": spread,
        "microprice": microprice,
        "bid_depth_1": bid_depth_1,
        "ask_depth_1": ask_depth_1,
        "bid_depth_5": bid_depth_5,
        "ask_depth_5": ask_depth_5,
        "imbalance_1": imbalance_1,
        "imbalance_5": imbalance_5,
        "bid_slope_5": bid_slope_5,
        "ask_slope_5": ask_slope_5,
        "bid_entropy_5": bid_entropy_5,
        "ask_entropy_5": ask_entropy_5,
        "bid_hhi_5": bid_hhi_5,
        "ask_hhi_5": ask_hhi_5,
        "depth_entropy_10": depth_entropy_10,
        "depth_hhi_10": depth_hhi_10,
        "active_quote_volume_10": bid_depth_5 + ask_depth_5,
        "depth_pressure_5": float(np.log((bid_depth_5 + EPS) / (ask_depth_5 + EPS))),
    }


def _new_second_state() -> Dict[str, object]:
    """Return a fresh accumulator for one aggregation window."""
    return {
        "event_count": 0,
        "trade_count": 0,
        "place_count": 0,
        "cancel_count": 0,
        "fill_count": 0,
        "quote_update_count": 0,
        "trade_buy_vol_1s": 0.0,
        "trade_sell_vol_1s": 0.0,
        "place_bid_vol_1s": 0.0,
        "place_ask_vol_1s": 0.0,
        "cancel_bid_vol_1s": 0.0,
        "cancel_ask_vol_1s": 0.0,
        "fill_bid_vol_1s": 0.0,
        "fill_ask_vol_1s": 0.0,
        "mid_values": [],
        "spread_values": [],
        "microprice_values": [],
        "last_book_feats": None,
        "last_seq": np.nan,
    }


def _summarize_second_path(values: List[float], fallback: float) -> dict:
    """Summarise the within-window path of one quantity.

    Returns open/high/low/close, ``rv`` (sum of squared log differences, with
    values clipped at ``EPS`` before the log) and the observation ``count``.
    With no observations every price field equals ``fallback`` and ``rv`` and
    ``count`` are zero.
    """
    if not values:
        return {
            "open": fallback,
            "high": fallback,
            "low": fallback,
            "close": fallback,
            "rv": 0.0,
            "count": 0.0,
        }

    arr = np.asarray(values, dtype=float)
    if len(arr) <= 1:
        rv = 0.0
    else:
        safe_arr = np.clip(arr, EPS, None)
        rv = float(np.sum(np.diff(np.log(safe_arr)) ** 2))

    return {
        "open": float(arr[0]),
        "high": float(np.max(arr)),
        "low": float(np.min(arr)),
        "close": float(arr[-1]),
        "rv": rv,
        "count": float(len(arr)),
    }


def _finalize_second_row(
    second_ts: int,
    state: Dict[str, object],
    last_snapshot: Optional[dict],
):
    """Build the output row for one window.

    The window's last book snapshot is used, falling back to ``last_snapshot``
    when the window produced none. Returns ``(row, snapshot_used)``, or
    ``(None, last_snapshot)`` when no snapshot is available at all.
    """
    snapshot = state["last_book_feats"] if state["last_book_feats"] is not None else last_snapshot
    if snapshot is None:
        return None, last_snapshot

    mid_summary = _summarize_second_path(state["mid_values"], snapshot["mid"])
    spread_summary = _summarize_second_path(state["spread_values"], snapshot["spread"])
    microprice_summary = _summarize_second_path(state["microprice_values"], snapshot["microprice"])

    if state["spread_values"]:
        mean_spread = float(np.mean(state["spread_values"]))
    else:
        mean_spread = snapshot["spread"]
    if state["microprice_values"]:
        mean_microprice = float(np.mean(state["microprice_values"]))
    else:
        mean_microprice = snapshot["microprice"]

    buy_vol = state["trade_buy_vol_1s"]
    sell_vol = state["trade_sell_vol_1s"]

    row = {
        "timestampms": int(second_ts),
        # Stays NaN for a window without events; the caller forward-fills it.
        "seq": state["last_seq"],
        **snapshot,
        "second_open_mid": mid_summary["open"],
        "second_high_mid": mid_summary["high"],
        "second_low_mid": mid_summary["low"],
        "second_close_mid": mid_summary["close"],
        "second_open_spread": spread_summary["open"],
        "second_close_spread": spread_summary["close"],
        "second_mean_spread": mean_spread,
        "second_mean_microprice": mean_microprice,
        "second_mid_range": mid_summary["high"] - mid_summary["low"],
        "second_log_rv_event": mid_summary["rv"],
        "second_spread_rv_event": spread_summary["rv"],
        "second_microprice_rv_event": microprice_summary["rv"],
        "quote_observation_count_1s": mid_summary["count"],
        "event_count_1s": float(state["event_count"]),
        "trade_count_1s": float(state["trade_count"]),
        "place_count_1s": float(state["place_count"]),
        "cancel_count_1s": float(state["cancel_count"]),
        "fill_count_1s": float(state["fill_count"]),
        "quote_update_count_1s": float(state["quote_update_count"]),
        "trade_buy_vol_1s": float(buy_vol),
        "trade_sell_vol_1s": float(sell_vol),
        "trade_volume_1s": float(buy_vol + sell_vol),
        "trade_imbalance_1s": safe_div(buy_vol - sell_vol, buy_vol + sell_vol),
        "place_bid_vol_1s": float(state["place_bid_vol_1s"]),
        "place_ask_vol_1s": float(state["place_ask_vol_1s"]),
        "cancel_bid_vol_1s": float(state["cancel_bid_vol_1s"]),
        "cancel_ask_vol_1s": float(state["cancel_ask_vol_1s"]),
        "fill_bid_vol_1s": float(state["fill_bid_vol_1s"]),
        "fill_ask_vol_1s": float(state["fill_ask_vol_1s"]),
    }
    # Bid-side additions count positive, ask-side additions negative.
    row["net_passive_flow_1s"] = (
        row["place_bid_vol_1s"]
        - row["cancel_bid_vol_1s"]
        - row["fill_bid_vol_1s"]
        - row["place_ask_vol_1s"]
        + row["cancel_ask_vol_1s"]
        + row["fill_ask_vol_1s"]
    )
    row["signed_trade_flow_1s"] = row["trade_buy_vol_1s"] - row["trade_sell_vol_1s"]
    row["net_order_flow_1s"] = row["net_passive_flow_1s"] + row["signed_trade_flow_1s"]
    row["microprice_minus_mid"] = row["microprice"] - row["mid"]
    return row, snapshot


def _add_rolling_feature_block(df: pd.DataFrame, window: int):
    """Add the ``*_{window}s`` rolling feature columns to ``df`` in place.

    ``window`` is a number of rows. Every rolling statistic requires at least
    ``max(5, window // 5)`` observations. Returns ``None``.
    """
    min_periods = max(5, window // 5)

    def roll(series):
        return series.rolling(window, min_periods=min_periods)

    ret = df["log_mid_ret_1s"]
    abs_ret = df["abs_log_mid_ret_1s"]
    flow = df["net_order_flow_log_1s"]
    abs_ret_sum = roll(abs_ret).sum()

    df[f"log_mid_ret_{window}s"] = df["log_mid"].diff(window)
    df[f"mid_ret_{window}s"] = df["mid"].pct_change(window)
    df[f"rv_{window}s"] = np.sqrt(roll(ret.pow(2)).sum())
    df[f"upside_rv_{window}s"] = np.sqrt(roll(ret.clip(lower=0.0).pow(2)).sum())
    df[f"downside_rv_{window}s"] = np.sqrt(roll(ret.clip(upper=0.0).pow(2)).sum())
    df[f"vol_of_vol_{window}s"] = roll(abs_ret).std()
    df[f"spread_bps_mean_{window}s"] = roll(df["spread_bps"]).mean()
    df[f"spread_bps_std_{window}s"] = roll(df["spread_bps"]).std()
    df[f"active_quote_volume_mean_{window}s"] = roll(df["active_quote_volume_10"]).mean()
    df[f"depth_entropy_mean_{window}s"] = roll(df["depth_entropy_10"]).mean()
    df[f"depth_pressure_mean_{window}s"] = roll(df["depth_pressure_5"]).mean()
    df[f"trade_volume_sum_{window}s"] = roll(df["trade_volume_1s"]).sum()
    df[f"trade_count_sum_{window}s"] = roll(df["trade_count_1s"]).sum()
    df[f"quote_update_sum_{window}s"] = roll(df["quote_update_count_1s"]).sum()
    df[f"net_order_flow_sum_{window}s"] = roll(flow).sum()
    df[f"net_order_flow_std_{window}s"] = roll(flow).std()
    df[f"trade_imbalance_mean_{window}s"] = roll(df["trade_imbalance_1s"]).mean()
    df[f"trade_imbalance_abs_mean_{window}s"] = roll(df["trade_imbalance_1s"].abs()).mean()
    df[f"imbalance_1_mean_{window}s"] = roll(df["imbalance_1"]).mean()
    df[f"imbalance_5_mean_{window}s"] = roll(df["imbalance_5"]).mean()
    df[f"microprice_premium_mean_{window}s"] = roll(df["microprice_premium_bps"]).mean()
    # Net move over the window relative to the total absolute path length.
    df[f"price_efficiency_{window}s"] = safe_div(df["log_mid"].diff(window).abs(), abs_ret_sum)
    df[f"up_move_share_{window}s"] = roll(ret > 0.0).mean()
    df[f"down_move_share_{window}s"] = roll(ret < 0.0).mean()
    df[f"event_rv_mean_{window}s"] = roll(df["second_log_rv_event"]).mean()

    # Previous-row predictors paired with the current-row return.
    lagged_flow = flow.shift(1)
    lagged_imbalance = df["imbalance_5"].shift(1)
    lagged_ret = ret.shift(1)
    df[f"ofi_ret_corr_{window}s"] = roll(lagged_flow).corr(ret)
    df[f"imbalance_ret_corr_{window}s"] = roll(lagged_imbalance).corr(ret)
    df[f"ret_autocorr_{window}s"] = roll(lagged_ret).corr(ret)
    sign_agreement = (np.sign(lagged_flow) == np.sign(ret)) & (np.sign(lagged_flow) != 0)
    df[f"ofi_sign_agreement_{window}s"] = roll(sign_agreement.astype(float)).mean()


def _build_minute_level_features(second_df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``second_df`` with derived, rolling, EWM and clock features.

    Rolling blocks are added for windows of 15, 60 and 300 rows. Clock
    features are derived from ``timestampms`` interpreted as epoch
    milliseconds. The input frame is not modified.
    """
    df = second_df.copy()

    df["log_mid"] = np.log(np.clip(df["mid"], EPS, None))
    df["log_mid_ret_1s"] = df["log_mid"].diff()
    df["abs_log_mid_ret_1s"] = df["log_mid_ret_1s"].abs()
    df["spread_bps"] = 1e4 * safe_div(df["spread"], df["mid"])
    df["microprice_premium_bps"] = 1e4 * safe_div(df["microprice_minus_mid"], df["mid"])
    df["second_mid_range_bps"] = 1e4 * safe_div(df["second_mid_range"], df["mid"])
    df["depth_ratio_5"] = safe_div(df["bid_depth_5"], df["ask_depth_5"])
    # Signed log compression of the raw net order flow.
    df["net_order_flow_log_1s"] = np.sign(df["net_order_flow_1s"]) * np.log1p(
        np.abs(df["net_order_flow_1s"])
    )
    df["passive_flow_ratio_1s"] = safe_div(df["net_passive_flow_1s"], df["active_quote_volume_10"])
    df["trade_buy_share_1s"] = safe_div(df["trade_buy_vol_1s"], df["trade_volume_1s"])
    df["fill_to_trade_ratio_1s"] = safe_div(
        df["fill_bid_vol_1s"] + df["fill_ask_vol_1s"],
        df["trade_volume_1s"],
    )
    df["cancel_to_place_ratio_1s"] = safe_div(
        df["cancel_bid_vol_1s"] + df["cancel_ask_vol_1s"],
        df["place_bid_vol_1s"] + df["place_ask_vol_1s"],
    )

    rolling_windows = [15, 60, 300]
    for window in rolling_windows:
        _add_rolling_feature_block(df, window)

    squared_ret = df["log_mid_ret_1s"].pow(2)
    df["ewm_rv_60s"] = np.sqrt(squared_ret.ewm(halflife=60, adjust=False, min_periods=30).mean())
    df["ewm_rv_300s"] = np.sqrt(squared_ret.ewm(halflife=300, adjust=False, min_periods=60).mean())
    df["ewm_net_order_flow_60s"] = (
        df["net_order_flow_log_1s"].ewm(halflife=60, adjust=False, min_periods=30).mean()
    )
    df["ewm_net_order_flow_300s"] = (
        df["net_order_flow_log_1s"].ewm(halflife=300, adjust=False, min_periods=60).mean()
    )
    df["ewm_spread_bps_60s"] = df["spread_bps"].ewm(halflife=60, adjust=False, min_periods=30).mean()
    df["ewm_trade_imbalance_60s"] = (
        df["trade_imbalance_1s"].ewm(halflife=60, adjust=False, min_periods=30).mean()
    )
    df["ewm_trade_volume_300s"] = (
        df["trade_volume_1s"].ewm(halflife=300, adjust=False, min_periods=60).mean()
    )
    df["vol_term_structure"] = safe_div(df["ewm_rv_60s"], df["ewm_rv_300s"])

    # Ratio of squared 60-row rv to a (pi/2)-scaled sum of adjacent absolute-return products.
    bipower_60 = (np.pi / 2.0) * (
        df["abs_log_mid_ret_1s"] * df["abs_log_mid_ret_1s"].shift(1)
    ).rolling(60, min_periods=15).sum()
    df["jump_ratio_60s"] = safe_div(df["rv_60s"].pow(2), bipower_60)
    df["spread_zscore_60s"] = safe_div(
        df["spread_bps"] - df["spread_bps_mean_60s"],
        df["spread_bps_std_60s"],
    )

    seconds_of_day = ((df["timestampms"] // 1000) % 86400).astype(float)
    df["tod_sin_daily"] = np.sin(2.0 * np.pi * seconds_of_day / 86400.0)
    df["tod_cos_daily"] = np.cos(2.0 * np.pi * seconds_of_day / 86400.0)
    df["tod_sin_hourly"] = np.sin(2.0 * np.pi * seconds_of_day / 3600.0)
    df["tod_cos_hourly"] = np.cos(2.0 * np.pi * seconds_of_day / 3600.0)

    seconds_in_5m = seconds_of_day % 300.0
    seconds_in_15m = seconds_of_day % 900.0
    df["phase_sin_5m"] = np.sin(2.0 * np.pi * seconds_in_5m / 300.0)
    df["phase_cos_5m"] = np.cos(2.0 * np.pi * seconds_in_5m / 300.0)
    df["phase_sin_15m"] = np.sin(2.0 * np.pi * seconds_in_15m / 900.0)
    df["phase_cos_15m"] = np.cos(2.0 * np.pi * seconds_in_15m / 900.0)
    df["is_5m_boundary"] = (seconds_in_5m == 0.0).astype(float)
    df["is_15m_boundary"] = (seconds_in_15m == 0.0).astype(float)
    return df


def build_features_from_csv(
    csv_path: str,
    sample_every_n_events: int = 1,
    top_k: int = 5,
    window_ms: int = 1000,
) -> pd.DataFrame:
    """Replay the event CSV and return one feature row per ``window_ms`` window.

    Events are applied to an ``OrderBook`` in order. Each window, including
    windows without events, yields a row once a valid book snapshot has been
    seen; empty windows reuse the previous snapshot. Column names keep the
    ``_1s`` suffix regardless of ``window_ms``.

    ``sample_every_n_events`` is accepted for backward compatibility and is
    ignored. An empty frame is returned when no row could be produced.
    """
    del sample_every_n_events  # kept in the signature only for existing callers

    df = load_and_prepare_csv(csv_path)
    book = OrderBook()
    rows: List[dict] = []
    last_snapshot = None
    state = _new_second_state()
    current_second: Optional[int] = None

    for r in tqdm(df.itertuples(index=False), total=len(df), desc="Aggregating to 1s ticks"):
        ts = int(r.timestampms)
        second_ts = (ts // window_ms) * window_ms
        if current_second is None:
            current_second = second_ts

        # Close every window that ended before this event, gaps included.
        while current_second < second_ts:
            row, last_snapshot = _finalize_second_row(current_second, state, last_snapshot)
            if row is not None:
                rows.append(row)
            state = _new_second_state()
            current_second += window_ms

        action = r.action
        side = r.side
        price = float(r.price)
        amount = float(r.amount)
        rem = r.remaining
        remaining = None if pd.isna(rem) else float(rem)

        process_event(book, action, side, price, remaining)
        state["event_count"] += 1
        state["last_seq"] = int(r.seq)

        if action == "TRADE":
            state["trade_count"] += 1
            if side == "BUY":
                state["trade_buy_vol_1s"] += amount
            elif side == "SELL":
                state["trade_sell_vol_1s"] += amount
        elif action == "PLACE":
            state["place_count"] += 1
            if side == "BID":
                state["place_bid_vol_1s"] += amount
            elif side == "ASK":
                state["place_ask_vol_1s"] += amount
        elif action == "CANCEL":
            state["cancel_count"] += 1
            if side == "BID":
                state["cancel_bid_vol_1s"] += amount
            elif side == "ASK":
                state["cancel_ask_vol_1s"] += amount
        elif action == "FILL_UPDATE":
            state["fill_count"] += 1
            if side == "BID":
                state["fill_bid_vol_1s"] += amount
            elif side == "ASK":
                state["fill_ask_vol_1s"] += amount

        book_feats = compute_book_features(book, top_k=top_k)
        if book_feats is not None:
            if action in _BOOK_UPDATE_ACTIONS:
                state["quote_update_count"] += 1
            # Every event with a valid book contributes one path observation.
            state["mid_values"].append(book_feats["mid"])
            state["spread_values"].append(book_feats["spread"])
            state["microprice_values"].append(book_feats["microprice"])
            state["last_book_feats"] = book_feats
            last_snapshot = book_feats

    # Flush the window that was still open when the input ended.
    if current_second is not None:
        row, last_snapshot = _finalize_second_row(current_second, state, last_snapshot)
        if row is not None:
            rows.append(row)

    feat_df = pd.DataFrame(rows)
    if feat_df.empty:
        return feat_df

    feat_df = feat_df.sort_values("timestampms").reset_index(drop=True)
    # Windows without events carry the last seen seq (0 before the first one).
    feat_df["seq"] = feat_df["seq"].ffill().fillna(0).astype(np.int64)
    feat_df = _build_minute_level_features(feat_df)
    return feat_df


def add_targets(
    feat_df: pd.DataFrame,
    horizon: int = 20,
    mild_return_threshold: float = 1e-4,
    transaction_fee_rate: float = 0.0,
    cost_aware_label_threshold_multiplier: float = 1.0,
) -> pd.DataFrame:
    """Return a copy of ``feat_df`` with forward-return targets and class labels.

    The target is the relative change of ``mid`` ``horizon`` rows ahead. The
    class threshold is the larger of ``mild_return_threshold`` and
    ``transaction_fee_rate * cost_aware_label_threshold_multiplier``. Returns
    below ``-threshold`` are negative, above ``+threshold`` positive, and the
    rest mild. Rows with no future mid keep NaN targets.
    """
    df = feat_df.copy()
    effective_mild_threshold = max(
        float(mild_return_threshold),
        float(transaction_fee_rate) * float(cost_aware_label_threshold_multiplier),
    )

    df["mid_next"] = df["mid"].shift(-1)
    df["step_ret_1"] = (df["mid_next"] - df["mid"]) / df["mid"]
    df["mid_future"] = df["mid"].shift(-horizon)
    df["target_ret"] = (df["mid_future"] - df["mid"]) / df["mid"]
    df["target_delta"] = df["mid_future"] - df["mid"]
    df["target_dir"] = np.sign(df["target_delta"]).astype(float)

    # Comparisons against NaN are False, so rows without a target stay NaN.
    df["target_class"] = np.nan
    df.loc[df["target_ret"] < -effective_mild_threshold, "target_class"] = CLASS_NEGATIVE
    df.loc[df["target_ret"].abs() <= effective_mild_threshold, "target_class"] = CLASS_MILD
    df.loc[df["target_ret"] > effective_mild_threshold, "target_class"] = CLASS_POSITIVE
    df["target_label"] = df["target_class"].map(
        {
            CLASS_NEGATIVE: "negative",
            CLASS_MILD: "mild",
            CLASS_POSITIVE: "positive",
        }
    )
    df["mild_return_threshold_used"] = effective_mild_threshold
    return df


def get_feature_columns(data: pd.DataFrame):
    """Return the columns of ``data`` that are neither identifiers nor targets, in frame order."""
    exclude_cols = {
        "timestampms",
        "seq",
        "log_mid",
        "mid_next",
        "step_ret_1",
        "mid_future",
        "target_ret",
        "target_delta",
        "target_dir",
        "target_class",
        "target_label",
        "mild_return_threshold_used",
    }
    return [c for c in data.columns if c not in exclude_cols]