# Agent Implementation Patterns This document provides complete implementation patterns for the three-agent pipeline. ## Overview Our system uses three specialized agents in sequence: 1. **Scout Agent** (Haiku) - Fast filter, 90% rejection rate 2. **Analyst Agent** (Sonnet) - Deep analysis, generates trade thesis 3. **Risk Manager Agent** (Sonnet) - Final approval, position sizing ## Scout Agent (Haiku) ### Purpose Rapid triage of signals to filter obvious noise before expensive analysis. ### Model `claude-3-5-haiku-20241022` ### Tools None - Pure reasoning, no external tool calls ### System Prompt ``` You are a financial market scout specializing in social sentiment analysis. Your job is RAPID TRIAGE. You have 3 seconds to decide if a signal deserves deeper analysis. EVALUATION CRITERIA: 1. Quality of Discourse - Is this genuine analysis or spam? - Are posts from established accounts or bots? - Is there actual DD (due diligence) or just hype? 2. Signal Strength - How many quality mentions (not just volume)? - Is sentiment coherent or contradictory? - Is there cross-platform correlation? 3. Timing & Catalyst - Is there a news catalyst or just random pump? - Does market data align with social sentiment? - Is this early momentum or late FOMO? 4. Red Flags - Pump & dump language ("moon", "to the moon", "buy now") - Coordinated posting (same time, similar language) - New accounts with high activity - Contradictory price action (sentiment up, price down) OUTPUT FORMAT (JSON only, no other text): { "score": <0-10>, "reasoning": "", "decision": "PASS" | "ESCALATE", "red_flags": ["flag1", "flag2"] or [] } Thresholds: - Score 0-5: PASS (noise) - Score 6-10: ESCALATE (worth deeper analysis) BE DECISIVE. Err on the side of filtering. False positives are expensive. ``` ### Input Format ```python input_context = { "ticker": "GME", "signal_type": "momentum_spike", "metrics": { "mentions_15m": 47, "quality_score_avg": 7.2, "sentiment": 0.65, "momentum": 2.5, "cross_platform": True }, "market_data": { "price": 45.67, "change_pct": 2.3, "volume_vs_avg": 1.8 }, "top_post_summary": "User DeepFuckingValue (500K karma, 4yr account): 'GME - Updated DD on short interest...' (15K upvotes, 50 awards)" } ``` ### Expected Output ```json { "score": 8, "reasoning": "High quality author with detailed DD, strong engagement, price action confirms social sentiment", "decision": "ESCALATE", "red_flags": [] } ``` ### Implementation ```python from claude_agent_sdk import ClaudeSDKClient, ClaudeAgentOptions import json class ScoutAgent: def __init__(self): self.options = ClaudeAgentOptions( model="claude-3-5-haiku-20241022", allowed_tools=[], system_prompt="""[System prompt from above]""", max_turns=1 # Single turn only ) self.client = ClaudeSDKClient(self.options) async def triage(self, signal: dict) -> dict: """Rapid triage of signal""" prompt = f""" Analyze this trading signal: Ticker: {signal['ticker']} Signal Type: {signal['signal_type']} Metrics: - 15m mentions: {signal['metrics']['mentions_15m']} - Quality score: {signal['metrics']['quality_score_avg']}/10 - Sentiment: {signal['metrics']['sentiment']} - Momentum: {signal['metrics']['momentum']}x Market Data: - Price: ${signal['market_data']['price']} ({signal['market_data']['change_pct']:+.1f}%) - Volume: {signal['market_data']['volume_vs_avg']}x normal Top Evidence: {signal['top_post_summary']} Provide JSON triage assessment. """ await self.client.connect() await self.client.query(prompt) response_text = "" async for message in self.client.receive_response(): if hasattr(message, 'content'): for block in message.content: if hasattr(block, 'text'): response_text = block.text await self.client.disconnect() # Parse JSON response result = json.loads(response_text) return result ``` ### Token Usage - Input: ~300-500 tokens (condensed context) - Output: ~50-100 tokens (JSON only) - Cost: ~$0.0001 per signal --- ## Analyst Agent (Sonnet) ### Purpose Deep analysis of high-quality signals to generate actionable trade recommendations. ### Model `claude-3-5-sonnet-20241022` ### Tools - `Read` - Access historical analysis data - `Bash` - Run data analysis scripts (optional) ### System Prompt ``` You are a quantitative trading analyst specializing in momentum and sentiment-driven trades. Your expertise: - Social sentiment analysis (Reddit, Twitter) - Technical analysis (price action, volume, options flow) - Risk/reward assessment - Trade structure optimization ANALYSIS FRAMEWORK: 1. Social Sentiment Quality Assessment - WHO is talking? (check author reputation, track record) - WHAT is the thesis? (genuine analysis vs hype) - HOW strong is conviction? (position sizes, time horizon) - WHEN did this start? (early wave vs late FOMO) 2. Market Context Analysis - Price Action: Breakout, consolidation, reversal pattern? - Volume: Is this real volume or algorithmic wash? - Options Flow: Smart money positioning or retail gambling? - Historical Patterns: Similar setups in the past? 3. Catalyst Identification - News catalyst (earnings, FDA approval, product launch)? - Technical catalyst (resistance break, squeeze potential)? - Sentiment catalyst (viral post, influencer mention)? 4. Risk Assessment - Is this early or late in the move? - What are downside catalysts? (earnings miss, dilution, regulation) - Liquidity risk (can we exit easily)? - Correlation risk (other positions in same sector)? 5. Trade Structure Recommendation - Best vehicle: shares, calls, puts, spreads? - Entry strategy: market, limit, scale-in? - Risk parameters: stop loss, position size - Profit targets: take-profit levels, time-based exit OUTPUT FORMAT (JSON only, no other text): { "recommend": true/false, "confidence": <0-100>, "thesis": "<2-3 sentence trade thesis>", "catalyst": "", "risks": ["risk1", "risk2", "risk3"], "trade_structure": { "action": "BUY_CALLS" | "BUY_SHARES" | "SELL_PUTS" | "BUY_CALL_SPREAD", "strike": , "expiry": "", "entry_price_target": , "stop_loss": , "take_profit": , "time_horizon": "intraday" | "swing" | "position" }, "reasoning": "", "similar_historical_signals": "" } If recommend = false, provide rejection reasoning. ``` ### Input Format ```python input_context = { "ticker": "GME", "scout_score": 8, # Aggregated metrics "metrics": { "mentions_15m": 47, "quality_score_avg": 7.2, "sentiment": 0.65, "momentum": 2.5 }, # Top evidence (not all posts, just best) "top_posts": [ { "author": "DeepFuckingValue", "author_karma": 500000, "text": "Full DD text...", "engagement": {"upvotes": 15000, "awards": 50}, "quality": 9.5 }, # ... 4 more top posts ], # Market data "market": { "price": 45.67, "change_pct": 2.3, "volume": "15M", "volume_vs_avg": 1.8, "options_flow": { "unusual_calls": True, "call_put_ratio": 3.2, "top_strikes": [50, 55, 60] }, "technical": { "rsi": 65, "pattern": "ascending triangle" } }, # Historical context "history": { "last_analyzed": "2024-01-15", "past_signals_count": 3, "past_performance": { "avg_return": 0.15, "win_rate": 0.67 }, "current_position": None } } ``` ### Implementation ```python from claude_agent_sdk import ClaudeSDKClient, ClaudeAgentOptions import json class AnalystAgent: def __init__(self): self.options = ClaudeAgentOptions( model="claude-3-5-sonnet-20241022", allowed_tools=["Read"], system_prompt="""[System prompt from above]""", max_turns=3 ) self.client = ClaudeSDKClient(self.options) async def analyze(self, signal: dict, context: dict) -> dict: """Deep analysis of signal""" # Format top posts for readability posts_text = "\n\n".join([ f"Post {i+1}: {post['author']} ({post['author_karma']} karma)\n" f"Quality: {post['quality']}/10\n" f"Engagement: {post['engagement']['upvotes']} upvotes, {post['engagement']['awards']} awards\n" f"Content: {post['text'][:300]}..." for i, post in enumerate(context['top_posts']) ]) prompt = f""" Deep Analysis Request: {signal['ticker']} SIGNAL STRENGTH: Scout approved this signal with score {context['scout_score']}/10 SOCIAL SENTIMENT: 15m mentions: {context['metrics']['mentions_15m']} Quality score: {context['metrics']['quality_score_avg']}/10 Sentiment: {context['metrics']['sentiment']} Momentum: {context['metrics']['momentum']}x TOP EVIDENCE: {posts_text} MARKET DATA: Price: ${context['market']['price']} ({context['market']['change_pct']:+.1f}%) Volume: {context['market']['volume']} ({context['market']['volume_vs_avg']}x avg) Options Activity: {json.dumps(context['market']['options_flow'], indent=2)} Technical: RSI {context['market']['technical']['rsi']}, Pattern: {context['market']['technical']['pattern']} HISTORICAL CONTEXT: {json.dumps(context['history'], indent=2)} Provide comprehensive trade analysis in JSON format. """ await self.client.connect() await self.client.query(prompt) response_text = "" async for message in self.client.receive_response(): if hasattr(message, 'content'): for block in message.content: if hasattr(block, 'text'): response_text += block.text await self.client.disconnect() result = json.loads(response_text) return result ``` ### Token Usage - Input: ~2000-3000 tokens (full context) - Output: ~500-1000 tokens (detailed analysis) - Cost: ~$0.01-0.02 per analysis --- ## Risk Manager Agent (Sonnet) ### Purpose Final approval gate with portfolio-aware position sizing and risk validation. ### Model `claude-3-5-sonnet-20241022` ### Tools - `Read` - Access portfolio state, position data ### System Prompt ``` You are a risk management specialist. You are the final gate before trade execution. RISK RULES (STRICTLY ENFORCED): 1. Maximum position size: 2% of account value per trade 2. Stop loss: Minimum 5% below entry (tighter stops allowed) 3. Risk/Reward ratio: Minimum 1:2 (reward must be 2x risk) 4. Maximum daily loss: 5% of account value 5. Maximum concurrent positions: 5 open trades 6. Sector concentration: Maximum 20% of portfolio in any sector 7. Correlation limits: Avoid highly correlated positions (>0.7) YOUR RESPONSIBILITIES: 1. Calculate appropriate position size based on 2% rule 2. Verify stop loss and R:R ratio meet minimums 3. Check portfolio for: - Existing positions in same ticker - Correlated positions (same sector/theme) - Total exposure by sector 4. Verify sufficient buying power 5. Assess systemic risks (market volatility, geopolitical) 6. Final approve or reject POSITION SIZING FORMULA: ```python account_value = 50000 # example risk_per_trade = account_value * 0.02 # $1000 entry = 100 stop = 95 risk_per_share = entry - stop # $5 position_size = risk_per_trade / risk_per_share # 200 shares = $20,000 ``` OUTPUT FORMAT (JSON only): { "approved": true/false, "position_size_usd": , "position_size_shares": , "risk_amount_usd": , "risk_pct": , "reward_risk_ratio": , "portfolio_impact": { "total_exposure": , "sector_exposure": , "correlation_concern": true/false }, "warnings": ["warning1", "warning2"], "risk_assessment": "", "rejection_reason": "" } NEVER approve trades that violate risk rules. Safety over profits. ``` ### Input Format ```python input_context = { "ticker": "GME", "analysis": { "recommend": True, "confidence": 75, "trade_structure": { "action": "BUY_SHARES", "entry_price_target": 46.00, "stop_loss": 43.50, "take_profit": 52.00 } }, "portfolio": { "total_value": 50000, "cash_available": 25000, "positions": [ {"ticker": "AAPL", "value": 10000, "sector": "Tech"}, {"ticker": "MSFT", "value": 8000, "sector": "Tech"} ], "open_position_count": 2, "daily_pnl": -500, # down $500 today "sector_exposure": { "Tech": 0.36, "Retail": 0.0, "Finance": 0.0 } } } ``` ### Implementation ```python from claude_agent_sdk import ClaudeSDKClient, ClaudeAgentOptions import json class RiskManagerAgent: def __init__(self): self.options = ClaudeAgentOptions( model="claude-3-5-sonnet-20241022", allowed_tools=["Read"], system_prompt="""[System prompt from above]""", max_turns=2 ) self.client = ClaudeSDKClient(self.options) async def evaluate(self, analysis: dict, portfolio: dict) -> dict: """Final risk check and position sizing""" prompt = f""" Risk Management Review PROPOSED TRADE: Ticker: {analysis['ticker']} Action: {analysis['analysis']['trade_structure']['action']} Entry: ${analysis['analysis']['trade_structure']['entry_price_target']} Stop Loss: ${analysis['analysis']['trade_structure']['stop_loss']} Take Profit: ${analysis['analysis']['trade_structure']['take_profit']} Confidence: {analysis['analysis']['confidence']}% PORTFOLIO STATE: Total Value: ${portfolio['total_value']:,} Cash Available: ${portfolio['cash_available']:,} Open Positions: {portfolio['open_position_count']}/5 Today's P&L: ${portfolio['daily_pnl']:+,} EXISTING POSITIONS: {json.dumps(portfolio['positions'], indent=2)} SECTOR EXPOSURE: {json.dumps(portfolio['sector_exposure'], indent=2)} Calculate position size and perform final risk assessment. Provide JSON response. """ await self.client.connect() await self.client.query(prompt) response_text = "" async for message in self.client.receive_response(): if hasattr(message, 'content'): for block in message.content: if hasattr(block, 'text'): response_text += block.text await self.client.disconnect() result = json.loads(response_text) return result ``` ### Token Usage - Input: ~800-1200 tokens - Output: ~200-400 tokens - Cost: ~$0.003-0.006 per check --- ## Complete Pipeline Flow ```python import asyncio from typing import Optional class AgentPipeline: def __init__(self): self.scout = ScoutAgent() self.analyst = AnalystAgent() self.risk_manager = RiskManagerAgent() async def process_signal(self, signal: dict, context: dict, portfolio: dict) -> Optional[dict]: """Run complete three-stage pipeline""" # Stage 1: Scout (fast filter) print(f"[Scout] Triaging {signal['ticker']}...") scout_result = await self.scout.triage(signal) if scout_result['decision'] == 'PASS': print(f"[Scout] REJECTED: {scout_result['reasoning']}") await log_rejection('scout', signal, scout_result) return None print(f"[Scout] ESCALATE (score: {scout_result['score']}/10)") # Stage 2: Analyst (deep dive) print(f"[Analyst] Deep analysis of {signal['ticker']}...") context['scout_score'] = scout_result['score'] analyst_result = await self.analyst.analyze(signal, context) if not analyst_result['recommend']: print(f"[Analyst] REJECTED: {analyst_result['reasoning']}") await log_rejection('analyst', signal, analyst_result) return None print(f"[Analyst] RECOMMEND (confidence: {analyst_result['confidence']}%)") print(f"[Analyst] Thesis: {analyst_result['thesis']}") # Stage 3: Risk Manager (final gate) print(f"[Risk] Final approval check...") risk_result = await self.risk_manager.evaluate( {'ticker': signal['ticker'], 'analysis': analyst_result}, portfolio ) if not risk_result['approved']: print(f"[Risk] REJECTED: {risk_result['rejection_reason']}") await log_rejection('risk', signal, risk_result) return None print(f"[Risk] APPROVED") print(f"[Risk] Position: {risk_result['position_size_shares']} shares (${risk_result['position_size_usd']:,})") print(f"[Risk] Risk: ${risk_result['risk_amount_usd']:.2f} ({risk_result['risk_pct']:.2f}%)") # All stages passed - create alert alert = { "signal": signal, "scout": scout_result, "analysis": analyst_result, "risk": risk_result, "approved_at": datetime.now().isoformat() } await emit_discord_alert(alert) return alert async def log_rejection(stage: str, signal: dict, result: dict): """Log rejection to Postgres for analysis""" # Implementation... pass async def emit_discord_alert(alert: dict): """Publish to Discord via Redis""" # Implementation... pass ``` ## Cost Analysis Per 100 signals: ``` Scout (100 signals): $0.01 (filters 90) Analyst (10 signals): $0.15 (rejects 5) Risk Manager (5): $0.02 (approves 2-3) ----------------------------------------- Total: $0.18 per 100 signals ``` Compare to single-agent approach: ``` Sonnet only (100): $1.50 per 100 signals ``` **Savings: 88% cost reduction with better filtering** ## Testing Patterns ```python import pytest @pytest.mark.asyncio async def test_scout_filters_spam(): """Scout should reject obvious pump & dump""" signal = { "ticker": "PUMP", "metrics": { "mentions_15m": 100, # High volume "quality_score_avg": 2.0, # Low quality "sentiment": 0.9 # Suspiciously positive }, "top_post_summary": "New account: 'BUY NOW!!! MOON INCOMING!!!'" } scout = ScoutAgent() result = await scout.triage(signal) assert result['decision'] == 'PASS' assert 'pump' in result['reasoning'].lower() or 'spam' in result['reasoning'].lower() @pytest.mark.asyncio async def test_analyst_generates_valid_structure(): """Analyst output should match expected schema""" # Test implementation... pass @pytest.mark.asyncio async def test_risk_manager_enforces_2pct_rule(): """Risk manager should cap position at 2% of account""" # Test implementation... pass ``` ## Monitoring & Improvement ### Metrics to Track 1. **Scout Performance** - Escalation rate (should be ~10%) - False negative rate (good signals filtered) - Average score of escalated signals 2. **Analyst Performance** - Recommendation rate (should be ~50% of escalated) - Win rate of recommended trades - Average confidence vs actual outcome 3. **Risk Manager Performance** - Rejection rate (should be minimal if analyst is good) - Compliance with risk rules (should be 100%) - Position sizing accuracy ### Continuous Improvement - Review rejected signals weekly - Track which signal types convert to profitable trades - Adjust Scout thresholds based on data - Refine Analyst prompts based on missed opportunities - Update risk rules based on portfolio performance ## Deployment Notes - Each agent runs in the same Docker container (main app) - Agents are initialized once on startup - Connections are persistent for session reuse - Implement circuit breakers for API failures - Cache common analyses to reduce redundant calls - Log all agent interactions for debugging and auditing