#!/usr/bin/env python3 """ Batch Evaluation Script for AAA Security Assessment Framework This script runs all security assessment tasks and calculates success rate. Results are saved to batch_results/ directory with a timestamped summary. """ import os import sys import json import subprocess import time from pathlib import Path from datetime import datetime from typing import List, Dict, Any def get_all_tasks(task_dir: Path) -> List[str]: """Get all task IDs from the task directory.""" tasks = [] if not task_dir.exists(): print(f"Error: Task directory not found: {task_dir}") return tasks for item in sorted(task_dir.iterdir()): if item.is_dir() and item.name.startswith("task-"): tasks.append(item.name) return tasks def run_single_task(task_id: str, timeout: int = 1800) -> Dict[str, Any]: """Run a single task and return the result.""" print(f"\n{'='*80}") print(f"Running: {task_id}") print(f"{'='*80}") start_time = time.time() try: # Run the task with timeout (default 30 minutes per task) result = subprocess.run( ["uv", "run", "python", "main.py", "launch", task_id], capture_output=True, text=True, timeout=timeout, env=os.environ.copy() ) duration = time.time() - start_time # Check if task completed if result.returncode == 0: print(f"✓ {task_id} completed in {duration:.1f}s") return { "task_id": task_id, "status": "completed", "duration": duration, "stdout": result.stdout, "stderr": result.stderr } else: print(f"✗ {task_id} failed with return code {result.returncode}") return { "task_id": task_id, "status": "failed", "duration": duration, "return_code": result.returncode, "stdout": result.stdout, "stderr": result.stderr } except subprocess.TimeoutExpired: duration = time.time() - start_time print(f"⏱ {task_id} timed out after {duration:.1f}s") return { "task_id": task_id, "status": "timeout", "duration": duration } except Exception as e: duration = time.time() - start_time print(f"✗ {task_id} error: {e}") return { "task_id": task_id, "status": "error", "duration": duration, "error": str(e) } def parse_result_file(task_id: str, results_dir: Path) -> Dict[str, Any] | None: """Parse the most recent result file for a task.""" # Find the most recent result file for this task result_files = list(results_dir.glob(f"{task_id}_*.json")) if not result_files: return None # Get the most recent file latest_file = max(result_files, key=lambda p: p.stat().st_mtime) try: with open(latest_file, 'r') as f: return json.load(f) except Exception as e: print(f"Warning: Could not parse {latest_file}: {e}") return None def generate_summary( task_results: List[Dict[str, Any]], parsed_results: Dict[str, Dict[str, Any]], total_duration: float ) -> Dict[str, Any]: """Generate summary statistics.""" total_tasks = len(task_results) completed = sum(1 for r in task_results if r["status"] == "completed") failed = sum(1 for r in task_results if r["status"] == "failed") timeout = sum(1 for r in task_results if r["status"] == "timeout") error = sum(1 for r in task_results if r["status"] == "error") # Count successful exploits based on parsed result files successful_exploits = 0 exploit_scores = [] for task_id, parsed in parsed_results.items(): if parsed and parsed.get("success"): successful_exploits += 1 if "score" in parsed: exploit_scores.append(parsed["score"]) success_rate = (successful_exploits / total_tasks * 100) if total_tasks > 0 else 0 avg_score = (sum(exploit_scores) / len(exploit_scores)) if exploit_scores else 0 return { "total_tasks": total_tasks, "completed": completed, "failed": failed, "timeout": timeout, "error": error, "successful_exploits": successful_exploits, "success_rate": f"{success_rate:.2f}%", "average_score": f"{avg_score:.2f}", "total_duration_seconds": total_duration, "total_duration_formatted": f"{total_duration / 3600:.2f} hours" } def main(): """Main batch evaluation function.""" # Check for API key if not os.environ.get("OPENAI_API_KEY"): print("Error: OPENAI_API_KEY environment variable not set") print("Please run: export OPENAI_API_KEY='your-api-key'") sys.exit(1) # Setup paths project_root = Path(__file__).parent task_dir = project_root / "data" / "task" results_dir = project_root / "results" batch_results_dir = project_root / "batch_results" # Create batch results directory batch_results_dir.mkdir(exist_ok=True) # Get all tasks tasks = get_all_tasks(task_dir) if not tasks: print("No tasks found!") sys.exit(1) print(f"Found {len(tasks)} tasks to evaluate") print(f"Results will be saved to: {batch_results_dir}") print() # Run all tasks task_results = [] start_time = time.time() for i, task_id in enumerate(tasks, 1): print(f"\n[{i}/{len(tasks)}] Processing {task_id}...") result = run_single_task(task_id) task_results.append(result) total_duration = time.time() - start_time # Parse individual result files print("\nParsing individual result files...") parsed_results = {} for task_id in tasks: parsed = parse_result_file(task_id, results_dir) if parsed: parsed_results[task_id] = parsed # Generate summary summary = generate_summary(task_results, parsed_results, total_duration) # Save batch results timestamp = datetime.now().strftime("%Y-%m-%dT%H-%M-%S") batch_result_file = batch_results_dir / f"batch_results_{timestamp}.json" full_report = { "timestamp": timestamp, "summary": summary, "task_execution": task_results, "parsed_results": parsed_results } with open(batch_result_file, 'w') as f: json.dump(full_report, f, indent=2) # Print summary print("\n" + "="*80) print("BATCH EVALUATION SUMMARY") print("="*80) print(f"Total Tasks: {summary['total_tasks']}") print(f"Completed: {summary['completed']}") print(f"Failed: {summary['failed']}") print(f"Timeout: {summary['timeout']}") print(f"Error: {summary['error']}") print(f"Successful Exploits: {summary['successful_exploits']}") print(f"Success Rate: {summary['success_rate']}") print(f"Average Score: {summary['average_score']}") print(f"Total Duration: {summary['total_duration_formatted']}") print(f"\nDetailed results saved to: {batch_result_file}") print("="*80) if __name__ == "__main__": main()