""" features/indicators.py ----------------------- Pure-function TA indicator calculations on top of pandas. Each function returns a Series or DataFrame without modifying the input. """ import numpy as np import pandas as pd from config.settings import ( RSI_PERIOD, MACD_FAST, MACD_SLOW, MACD_SIGNAL_P, BB_PERIOD, BB_STD, ATR_PERIOD, EMA_PERIODS, VOLUME_MA_PERIOD, ) class TechnicalIndicators: @staticmethod def ema(series: pd.Series, period: int) -> pd.Series: return series.ewm(span=period, adjust=False).mean() @staticmethod def sma(series: pd.Series, period: int) -> pd.Series: return series.rolling(window=period).mean() @staticmethod def macd(close: pd.Series, fast=MACD_FAST, slow=MACD_SLOW, signal=MACD_SIGNAL_P) -> pd.DataFrame: ema_f = close.ewm(span=fast, adjust=False).mean() ema_s = close.ewm(span=slow, adjust=False).mean() line = ema_f - ema_s sig = line.ewm(span=signal, adjust=False).mean() return pd.DataFrame({"macd": line, "macd_signal": sig, "macd_hist": line - sig}) @staticmethod def rsi(close: pd.Series, period=RSI_PERIOD) -> pd.Series: delta = close.diff() gain = delta.clip(lower=0).ewm(com=period - 1, adjust=False).mean() loss = (-delta).clip(lower=0).ewm(com=period - 1, adjust=False).mean() rs = gain / loss.replace(0, np.nan) return 100 - (100 / (1 + rs)) @staticmethod def stochastic(high: pd.Series, low: pd.Series, close: pd.Series, k_period=14, d_period=3) -> pd.DataFrame: ll = low.rolling(k_period).min() hh = high.rolling(k_period).max() k = 100 * (close - ll) / (hh - ll + 1e-9) return pd.DataFrame({"stoch_k": k, "stoch_d": k.rolling(d_period).mean()}) @staticmethod def bollinger_bands(close: pd.Series, period=BB_PERIOD, n_std=BB_STD) -> pd.DataFrame: mid = close.rolling(period).mean() std = close.rolling(period).std() upper = mid + n_std * std lower = mid - n_std * std return pd.DataFrame({ "bb_upper": upper, "bb_mid": mid, "bb_lower": lower, "bb_pct_b": (close - lower) / (upper - lower + 1e-9), "bb_bandwidth": (upper - lower) / (mid + 1e-9), }) @staticmethod def atr(high: pd.Series, low: pd.Series, close: pd.Series, period=ATR_PERIOD) -> pd.Series: pc = close.shift(1) tr = pd.concat([high - low, (high - pc).abs(), (low - pc).abs()], axis=1).max(axis=1) return tr.ewm(com=period - 1, adjust=False).mean() @staticmethod def keltner_channel(high, low, close, ema_period=20, atr_mult=1.5) -> pd.DataFrame: mid = TechnicalIndicators.ema(close, ema_period) a = TechnicalIndicators.atr(high, low, close, ema_period) return pd.DataFrame({"kc_upper": mid + atr_mult*a, "kc_mid": mid, "kc_lower": mid - atr_mult*a}) @staticmethod def obv(close: pd.Series, volume: pd.Series) -> pd.Series: return (np.sign(close.diff()).fillna(0) * volume).cumsum().rename("obv") @staticmethod def vwap(high, low, close, volume) -> pd.Series: tp = (high + low + close) / 3 return (tp * volume).cumsum() / volume.cumsum() @staticmethod def volume_ma(volume: pd.Series, period=VOLUME_MA_PERIOD) -> pd.Series: return volume.rolling(period).mean() @staticmethod def volume_ratio(volume: pd.Series, period=VOLUME_MA_PERIOD) -> pd.Series: return volume / (volume.rolling(period).mean() + 1e-9) @staticmethod def price_change(close: pd.Series, periods=1) -> pd.Series: return close.pct_change(periods) @staticmethod def higher_high_lower_low(high, low, window=5) -> pd.DataFrame: hh = (high > high.rolling(window).max().shift(1)).astype(int) ll = (low < low.rolling(window).min().shift(1)).astype(int) return pd.DataFrame({"hh": hh, "ll": ll}) # --- quant / regime indicators --- @staticmethod def adx(high: pd.Series, low: pd.Series, close: pd.Series, period=14) -> pd.DataFrame: """ Average Directional Index (Wilder 1978). ADX > 25 = trending; ADX < 20 = ranging/choppy. """ pc = close.shift(1) tr = pd.concat([high - low, (high - pc).abs(), (low - pc).abs()], axis=1).max(axis=1) up = high - high.shift(1) down = low.shift(1) - low plus_dm = up.where((up > down) & (up > 0), 0.0) minus_dm = down.where((down > up) & (down > 0), 0.0) smooth = tr.ewm(com=period - 1, adjust=False).mean() plus_di = 100 * plus_dm.ewm(com=period - 1, adjust=False).mean() / (smooth + 1e-9) minus_di = 100 * minus_dm.ewm(com=period - 1, adjust=False).mean() / (smooth + 1e-9) dx = 100 * (plus_di - minus_di).abs() / (plus_di + minus_di + 1e-9) adx = dx.ewm(com=period - 1, adjust=False).mean() return pd.DataFrame({"adx": adx, "plus_di": plus_di, "minus_di": minus_di}) @staticmethod def efficiency_ratio(close: pd.Series, window=20) -> pd.Series: """ Kaufman's Efficiency Ratio (Kaufman 1995). ER = |net N-period move| / sum(|each bar's move|) ER → 1.0 = perfectly trending; ER → 0.0 = choppy/going nowhere. """ direction = close.diff(window).abs() noise = close.diff().abs().rolling(window).sum() return direction / (noise + 1e-9) @staticmethod def zscore(series: pd.Series, window=50) -> pd.Series: """Rolling z-score. Use for entry timing: enter on pullbacks not extensions.""" m = series.rolling(window).mean() s = series.rolling(window).std() return (series - m) / (s + 1e-9) @staticmethod def squeeze_momentum(high, low, close) -> pd.Series: """TTM Squeeze — momentum expanding out of BB-inside-KC squeeze.""" bb = TechnicalIndicators.bollinger_bands(close) kc = TechnicalIndicators.keltner_channel(high, low, close) sqz = (bb["bb_lower"] > kc["kc_lower"]) & (bb["bb_upper"] < kc["kc_upper"]) mid = (high + low) / 2 delta = close - mid.rolling(20).mean() return delta.where(sqz, other=np.nan)