"""CHEETAH — Shared config, MCP helpers, state I/O. v7.0.0: senpi_runtime_helpers migration. mcporter_call now routes through SenpiClient.mcp_call() (direct HTTPS) instead of mcporter subprocess. _wrapper_client is exposed for the producer's signal push (cfg._wrapper_client.push_signal(...)). All other helpers (atomic_write, load_config, get_wallet_and_strategy, get_positions, output, log, now_*) preserved verbatim. """ # Copyright 2026 Senpi (https://senpi.ai) # Licensed under MIT # Source: https://github.com/Senpi-ai/senpi-skills import functools import json import os import sys import tempfile import time from datetime import datetime, timezone from pathlib import Path WORKSPACE = os.environ.get("OPENCLAW_WORKSPACE", "/data/workspace") SKILL_DIR = Path(WORKSPACE) / "skills" / "cheetah-strategy" CONFIG_PATH = SKILL_DIR / "config" / "cheetah-config.json" STATE_DIR = SKILL_DIR / "state" STATE_DIR.mkdir(parents=True, exist_ok=True) # ─── senpi_runtime_helpers (lazy + auth-validated) ─── # The wrapper is mounted via sys.path at module load, but SenpiClient # construction is DEFERRED until first attribute access. Two reasons: # 1. Importing cheetah_config from a test, REPL, or sibling helper # shouldn't instantiate a network client or write to stderr. # 2. SENPI_AUTH_TOKEN is validated explicitly on first use — a # missing token raises loudly here instead of silently producing # a 401 on the first MCP call. # Pattern ported verbatim from pangolin/scripts/pangolin_config.py. # senpi_runtime_helpers ships inside the senpi-trading-runtime skill. # Global skills install under ~/.openclaw/skills/ on standard hosts # (e.g. /data/.openclaw/skills/ on Railway). Some setups install user # skills under ${OPENCLAW_WORKSPACE}/skills/. Probe both in order. _sdk_candidates = [ str(Path.home() / ".openclaw" / "skills" / "senpi-trading-runtime"), str(Path(os.environ.get("OPENCLAW_WORKSPACE", "/data/workspace")) / "skills" / "senpi-trading-runtime"), ] _sdk_path = next( (p for p in _sdk_candidates if (Path(p) / "senpi_runtime_helpers").is_dir()), _sdk_candidates[0], ) if _sdk_path not in sys.path: sys.path.insert(0, _sdk_path) from senpi_runtime_helpers import SenpiClient, log_event # type: ignore # noqa: E402 @functools.lru_cache(maxsize=1) def _get_wrapper_client() -> SenpiClient: """Lazy SenpiClient accessor with explicit auth validation.""" if not os.environ.get("SENPI_AUTH_TOKEN", "").strip(): raise RuntimeError( "SENPI_AUTH_TOKEN is not set. Cheetah's MCP calls and signal " "POST both require it. Set it on the runtime host (e.g. as a " "Railway service variable) before starting the producer." ) client = SenpiClient() log_event("cheetah_wrapper_enabled", sdk_path=_sdk_path) return client class _WrapperClientProxy: """Module-level attribute that defers SenpiClient construction until first attribute access. Preserves the legacy `cfg._wrapper_client.` call shape used by the producer without forcing eager instantiation at import time.""" def __getattr__(self, name: str): return getattr(_get_wrapper_client(), name) _wrapper_client = _WrapperClientProxy() # ─── Atomic Write ──────────────────────────────────────────── def atomic_write(path, data): """Write JSON atomically via tmp file + os.replace.""" path = str(path) dir_name = os.path.dirname(path) or "." os.makedirs(dir_name, exist_ok=True) fd, tmp_path = tempfile.mkstemp(dir=dir_name, suffix=".tmp") try: with os.fdopen(fd, "w") as f: json.dump(data, f, indent=2, default=str) os.replace(tmp_path, path) except BaseException: try: os.unlink(tmp_path) except OSError: pass raise # ─── Config ────────────────────────────────────────────────── def load_config(): if CONFIG_PATH.exists(): with open(CONFIG_PATH) as f: return json.load(f) return {} def get_wallet_and_strategy(): wallet = os.environ.get("CHEETAH_WALLET", "") strategy_id = os.environ.get("CHEETAH_STRATEGY_ID", "") if not wallet or not strategy_id: config = load_config() wallet = wallet or config.get("wallet", "") strategy_id = strategy_id or config.get("strategyId", "") return wallet, strategy_id # ─── State I/O ─────────────────────────────────────────────── def load_state(filename="state.json"): path = STATE_DIR / filename if path.exists(): try: with open(path) as f: return json.load(f) except (json.JSONDecodeError, IOError): pass return {} def save_state(data, filename="state.json"): atomic_write(str(STATE_DIR / filename), data) # ─── MCP Helpers ───────────────────────────────────────────── def mcporter_call(tool, retries=2, timeout=30, **params): """Call a Senpi MCP tool via the senpi_runtime_helpers wrapper. v7.0.0: routes through SenpiClient.mcp_call() — direct HTTPS, no mcporter subprocess. Returns the unwrapped JSON document on success, or None if the wrapper raised a transport / protocol error. The producer's pre-existing `if not r:` early returns continue to handle the no-data case gracefully. `retries` is preserved as a parameter for backward compatibility with existing call sites but is not implemented here — a transient failure is recovered by the daemon's next tick (5 minutes). """ try: return _wrapper_client.mcp_call(tool, timeout=timeout, **params) except Exception as e: # noqa: BLE001 — transport / protocol surface sys.stderr.write( f"[senpi_helpers] cheetah_mcp_call_failed tool={tool} " f"err={type(e).__name__}: {e}\n" ) return None def get_positions(wallet=None): """Returns (account_value, positions_list).""" if not wallet: wallet, _ = get_wallet_and_strategy() if not wallet: return 0, [] ch = mcporter_call("strategy_get_clearinghouse_state", strategy_wallet=wallet) if not ch or not isinstance(ch, dict): return 0, [] data = ch.get("data", ch) positions = [] account_value = 0 for section in ("main", "xyz"): s = data.get(section, {}) if not isinstance(s, dict): continue ms = s.get("marginSummary", {}) account_value += float(ms.get("accountValue", 0)) for ap in s.get("assetPositions", []): pos = ap.get("position", ap) szi = float(pos.get("szi", 0)) if szi == 0: continue positions.append({ "coin": pos.get("coin", ""), "direction": "LONG" if szi > 0 else "SHORT", "szi": szi, "size": abs(szi), "margin": float(pos.get("marginUsed", 0)), "entryPrice": float(pos.get("entryPx", 0)), "markPrice": float(pos.get("markPx", 0)), "leverage": float( pos.get("leverage", {}).get("value", 5) if isinstance(pos.get("leverage"), dict) else pos.get("leverage", 5) ), "upnl": float(pos.get("unrealizedPnl", 0)), }) return account_value, positions def output(data): print(json.dumps(data, default=str)) sys.stdout.flush() def log(msg): print(f"[CHEETAH-APEX] {msg}", file=sys.stderr) def now_ts(): return time.time() def now_iso(): return datetime.now(timezone.utc).isoformat() def now_date(): return datetime.now(timezone.utc).strftime("%Y-%m-%d")