""" signals/generator.py --------------------- Fetch → features → rules filter → Claude confirmation → TradingSignal. Two-phase approach to save API costs: Phase 1 (free): run indicator rules on all symbols, score them Phase 2 (paid): call Claude only on the top scoring ones """ import logging import time from dataclasses import dataclass, field from typing import Dict, List, Optional import numpy as np from config.settings import ( SEQUENCE_LENGTH, MIN_VOLUME_USDT, TIMEFRAME, SYMBOLS, SCAN_TOP_N, SCAN_MIN_CHANGE, SCAN_MAX_CHANGE, SCAN_BLACKLIST, USE_MARKET_SCANNER, ) from data.fetcher import DataFetcher from features.feature_engineer import FeatureEngineer logger = logging.getLogger(__name__) @dataclass class TradingSignal: symbol: str direction: int # +1 LONG, -1 SHORT, 0 HOLD confidence: float # 0–1 entry_price: float = 0.0 atr: float = 0.0 reasoning: str = "" metadata: dict = field(default_factory=dict) @property def side(self) -> str: return {1: "LONG", -1: "SHORT", 0: "HOLD"}[self.direction] class SignalGenerator: def __init__(self, fetcher: DataFetcher, claude_trader=None): self.fetcher = fetcher self.fe = FeatureEngineer() self.claude_trader = claude_trader # cache: symbol → (timestamp, signal) — reuse Claude decisions for 1h self._cache: Dict[str, tuple] = {} self._cache_ttl = 3600 mode = "Claude + rules" if claude_trader else "rules only (free)" logger.info("SignalGenerator ready — %s", mode) def generate(self, symbol: str) -> TradingSignal: hold = TradingSignal(symbol=symbol, direction=0, confidence=0.0) try: ticker = self.fetcher.get_ticker_24h(symbol) vol = float(ticker.get("quoteVolume", 0)) if vol < MIN_VOLUME_USDT: logger.debug("%s: low volume %.0f, skip", symbol, vol) return hold except Exception as e: logger.warning("Volume check failed %s: %s", symbol, e) return hold try: df_raw = self.fetcher.get_klines(symbol, interval=TIMEFRAME) df_feat = self.fe.build(df_raw) except Exception as e: logger.error("Feature build failed %s: %s", symbol, e) return hold if len(df_feat) < SEQUENCE_LENGTH + 10: logger.warning("%s: not enough data (%d rows)", symbol, len(df_feat)) return hold row = df_feat.iloc[-1] atr = float(row.get("atr", 0)) price = float(row["close"]) if self.claude_trader is not None: return self._smart_claude(symbol, df_feat, df_raw, row, atr, price, vol) return self._rules(symbol, row, atr, price, vol) def _claude_call(self, symbol, df_feat, df_raw, row, atr, price, vol): try: dec = self.claude_trader.analyse_full(symbol, df_feat, df_raw) except Exception as e: logger.error("Claude failed %s: %s", symbol, e) return TradingSignal(symbol=symbol, direction=0, confidence=0.0) direction = {"LONG": 1, "SHORT": -1, "HOLD": 0}.get(dec.direction.upper().strip(), 0) return TradingSignal( symbol=symbol, direction=direction, confidence=dec.confidence, entry_price=price, atr=atr, reasoning=dec.reasoning, metadata={ "rsi": float(row.get("rsi", 50)), "macd": float(row.get("macd", 0)), "bb_pct_b": float(row.get("bb_pct_b", 0.5)), "vol_usdt": vol, "key_factors": dec.key_factors, "risk_notes": dec.risk_notes, }, ) def _smart_claude(self, symbol, df_feat, df_raw, row, atr, price, vol): # run free rules first — if they say hold, no need to ask Claude rule_sig = self._rules(symbol, row, atr, price, vol) if rule_sig.direction == 0: return rule_sig # check cache entry = self._cache.get(symbol) if entry: ts, sig = entry if time.time() - ts < self._cache_ttl: logger.debug("[%s] cache hit (age=%.0fs)", symbol, time.time() - ts) return TradingSignal( symbol=sig.symbol, direction=sig.direction, confidence=sig.confidence, entry_price=price, atr=atr, reasoning=sig.reasoning, metadata=sig.metadata, ) logger.info("[%s] rule score ok → calling Claude", symbol) sig = self._claude_call(symbol, df_feat, df_raw, row, atr, price, vol) if sig.direction != 0: self._cache[symbol] = (time.time(), sig) return sig logger.info("[%s] Claude overrides rules → HOLD", symbol) return sig def _rules(self, symbol, row, atr, price, vol) -> TradingSignal: """ Quant-enhanced rule filter. Combines classical TA with research-backed regime filters (ADX, Kaufman ER, z-score entry timing). Scoring max 12pts — need ≥ 8 to generate a signal. Hard gates (immediate HOLD): ADX < 20 → choppy, momentum doesn't work ER < 0.20 → price going nowhere (Kaufman 1995) """ hold = TradingSignal(symbol=symbol, direction=0, confidence=0.0, entry_price=price, atr=atr) rsi = float(row.get("rsi", 50)) macd_h = float(row.get("macd_hist", 0)) macd = float(row.get("macd", 0)) macd_s = float(row.get("macd_signal", 0)) ema9 = float(row.get("ema_9", price)) ema21 = float(row.get("ema_21", price)) ema50 = float(row.get("ema_50", price)) ema200 = float(row.get("ema_200", price)) bb_pct = float(row.get("bb_pct_b", 0.5)) vol_r = float(row.get("volume_ratio", 1.0)) # quant indicators adx = float(row.get("adx", 25.0)) plus_di = float(row.get("plus_di", 25.0)) minus_di = float(row.get("minus_di", 25.0)) er = float(row.get("efficiency_ratio", 0.3)) zscore = float(row.get("zscore_50", 0.0)) # hard gates — regime filters if adx < 20: logger.debug("[%s] ADX=%.1f < 20, choppy market", symbol, adx) return hold if er < 0.20: logger.debug("[%s] ER=%.2f < 0.20, inefficient price action", symbol, er) return hold macd_thresh = price * 0.0001 long_sc = short_sc = 0 # 1. EMA trend stack — 3pts if price > ema9 > ema21 > ema50: long_sc += 3 elif price < ema9 < ema21 < ema50: short_sc += 3 # 2. macro trend (EMA200) — 1pt if price > ema200: long_sc += 1 else: short_sc += 1 # 3. MACD momentum — 2pts if macd_h > macd_thresh and macd > macd_s: long_sc += 2 elif macd_h < -macd_thresh and macd < macd_s: short_sc += 2 # 4. RSI zone — 1pt, penalty if extended if 40 <= rsi <= 55: long_sc += 1 elif 45 <= rsi <= 60: short_sc += 1 if rsi > 68: long_sc -= 2 if rsi < 32: short_sc -= 2 # 5. Bollinger room to move — 1pt, penalty if at extremes if 0.25 <= bb_pct <= 0.70: long_sc += 1 short_sc += 1 if bb_pct > 0.90: long_sc -= 2 if bb_pct < 0.10: short_sc -= 2 # 6. volume confirmation — 1pt if vol_r > 1.4: long_sc += 1 short_sc += 1 # 7. ADX directional (Wilder 1978) — 1pt if plus_di > minus_di: long_sc += 1 else: short_sc += 1 # 8. Efficiency ratio conviction (Kaufman 1995) — 1pt if er > 0.40: long_sc += 1 short_sc += 1 # 9. z-score entry timing — 1pt on pullback, -2pts if chasing if -1.5 <= zscore <= -0.2: long_sc += 1 elif 0.2 <= zscore <= 1.5: short_sc += 1 if zscore > 2.0: long_sc -= 2 if zscore < -2.0: short_sc -= 2 long_sc = max(0, long_sc) short_sc = max(0, short_sc) def make_sig(direction, score): conf = min(0.60 + (score - 8) * 0.06, 0.90) side = "LONG" if direction == 1 else "SHORT" reason = ( f"RSI={rsi:.0f} MACD={macd_h:.5f} " f"EMAs={ema9:.1f}/{ema21:.1f}/{ema50:.1f}/{ema200:.1f} " f"BB={bb_pct:.2f} Vol={vol_r:.2f} " f"ADX={adx:.1f}(+{plus_di:.0f}/-{minus_di:.0f}) " f"ER={er:.2f} Z={zscore:.2f} score={score}/12" ) logger.info("[%s] Rules → %s (%.2f) | %s", symbol, side, conf, reason) return TradingSignal(symbol=symbol, direction=direction, confidence=conf, entry_price=price, atr=atr, reasoning=reason) if long_sc >= 8 and long_sc > short_sc: return make_sig(1, long_sc) if short_sc >= 8 and short_sc > long_sc: return make_sig(-1, short_sc) logger.debug("[%s] HOLD (L=%d S=%d/12 | ADX=%.1f ER=%.2f Z=%.2f)", symbol, long_sc, short_sc, adx, er, zscore) return hold def _fetch_and_score(self, symbol: str): """Fetch data + run free rule scoring. Returns (signal, data_dict).""" hold_result = TradingSignal(symbol=symbol, direction=0, confidence=0.0), {} try: ticker = self.fetcher.get_ticker_24h(symbol) vol = float(ticker.get("quoteVolume", 0)) if vol < MIN_VOLUME_USDT: return hold_result except Exception as e: logger.warning("Volume check failed %s: %s", symbol, e) return hold_result try: df_raw = self.fetcher.get_klines(symbol, interval=TIMEFRAME) df_feat = self.fe.build(df_raw) except Exception as e: logger.error("Feature build failed %s: %s", symbol, e) return hold_result if len(df_feat) < SEQUENCE_LENGTH + 10: return hold_result row = df_feat.iloc[-1] atr = float(row.get("atr", 0)) price = float(row["close"]) sig = self._rules(symbol, row, atr, price, vol) data = {"df_feat": df_feat, "df_raw": df_raw, "row": row, "atr": atr, "price": price, "vol": vol} return sig, data def select_scan_universe(self) -> List[str]: """Full market scan — returns top movers by momentum × volume composite.""" try: tickers = self.fetcher.get_active_usdt_perps() except Exception as e: logger.error("Market scan failed: %s", e) return [] candidates = [] for t in tickers: sym = t["symbol"] pct = abs(t["priceChangePercent"]) vol = t["quoteVolume"] if sym in SCAN_BLACKLIST or vol < MIN_VOLUME_USDT: continue if pct < SCAN_MIN_CHANGE or pct > SCAN_MAX_CHANGE: continue candidates.append({"symbol": sym, "pct": pct, "vol": vol, "dir_pct": t["priceChangePercent"]}) if not candidates: logger.warning("Market scan: 0 candidates") return [] max_vol = max(c["vol"] for c in candidates) or 1.0 for c in candidates: c["score"] = 0.70 * c["pct"] + 0.30 * (c["vol"] / max_vol) * 25 candidates.sort(key=lambda c: c["score"], reverse=True) top = candidates[:SCAN_TOP_N] logger.info("Market scan: %d → top %d | best=%s(%.1f%%) worst=%s(%.1f%%)", len(candidates), len(top), top[0]["symbol"], top[0]["dir_pct"], top[-1]["symbol"], top[-1]["dir_pct"]) return [c["symbol"] for c in top] def scan_and_rank(self) -> List[TradingSignal]: syms = SYMBOLS if not USE_MARKET_SCANNER else (self.select_scan_universe() or SYMBOLS) if not USE_MARKET_SCANNER: logger.info("Scanning %d approved pairs", len(syms)) return self.rank_symbols(syms) def rank_symbols(self, symbols: List[str]) -> List[TradingSignal]: """ Two-phase: free rules on all symbols, Claude only on top 2 cache-misses. Keeps Claude spend to ~2 calls/cycle max. """ MAX_CLAUDE = 2 if self.claude_trader is None: results = [] for sym in symbols: try: sig = self.generate(sym) if sig.direction != 0: results.append(sig) except Exception as e: logger.error("%s error: %s", sym, e) results.sort(key=lambda s: s.confidence, reverse=True) return results # phase 1: cache hits + rule scoring cached = [] candidates = [] # (sym, conf, rule_sig, data) for sym in symbols: try: entry = self._cache.get(sym) if entry: ts, sig = entry if time.time() - ts < self._cache_ttl: if sig.direction != 0: cached.append(sig) logger.debug("[%s] cache hit (%.0fs old)", sym, time.time() - ts) continue rule_sig, data = self._fetch_and_score(sym) if rule_sig.direction != 0: candidates.append((sym, rule_sig.confidence, rule_sig, data)) except Exception as e: logger.error("Phase-1 %s: %s", sym, e) candidates.sort(key=lambda x: x[1], reverse=True) logger.info("Phase-1: %d cached, %d rule candidates (Claude budget=%d)", len(cached), len(candidates), MAX_CLAUDE) # phase 2: Claude confirmation on best candidates confirmed = [] calls = 0 for sym, _, rule_sig, data in candidates: if calls < MAX_CLAUDE: try: logger.info("[%s] Claude call %d/%d", sym, calls + 1, MAX_CLAUDE) sig = self._claude_call( sym, data["df_feat"], data["df_raw"], data["row"], data["atr"], data["price"], data["vol"] ) calls += 1 if sig.direction != 0: self._cache[sym] = (time.time(), sig) confirmed.append(sig) else: logger.info("[%s] Claude → HOLD (overrides rules)", sym) except Exception as e: logger.error("Claude failed %s: %s — using rule signal", sym, e) confirmed.append(rule_sig) calls += 1 else: # budget used — rule signal is still actionable confirmed.append(rule_sig) results = cached + confirmed results.sort(key=lambda s: s.confidence, reverse=True) return results