#!/usr/bin/env python3
"""AAA Security Assessment Framework - CLI Entry Point"""

import asyncio
import os

import typer
import dotenv

# Populate os.environ from a .env file (if any) BEFORE the defaults below are
# read, so values from that file can override the built-in host/port defaults.
dotenv.load_dotenv()

# Defaults shared by several commands. They are evaluated once, at import time.
# NOTE: int() raises ValueError here if a *_AGENT_PORT variable is set to a
# non-integer value, which aborts every command before Typer gets to run.
_DEFAULT_TASK_ID = "task-cve-2024-32964-ssrf"
_GREEN_HOST_DEFAULT = os.getenv("GREEN_AGENT_HOST", "localhost")
_GREEN_PORT_DEFAULT = int(os.getenv("GREEN_AGENT_PORT", "9001"))
_WHITE_HOST_DEFAULT = os.getenv("WHITE_AGENT_HOST", "localhost")
_WHITE_PORT_DEFAULT = int(os.getenv("WHITE_AGENT_PORT", "9002"))

# Width of the separator rules printed by the `tasks` and `info` commands.
_RULE_WIDTH = 60

app = typer.Typer(
    name="aaa-security",
    help="AAA Security Assessment Framework for CVE Exploitation Testing",
)


def _load_task_or_exit(task_id: str):
    """Return the loaded configuration for ``task_id`` or exit with status 1.

    Shared by the commands that need a task configuration so that a missing
    task is reported the same way everywhere: an error line is printed and
    ``typer.Exit(1)`` is raised when the loader raises ``FileNotFoundError``.
    """
    # Imported lazily, like every project import in this file, so that
    # commands which do not need the loader never import it.
    from src.agentxploit.task_loader import TaskLoader

    try:
        return TaskLoader().load_task(task_id)
    except FileNotFoundError:
        print(f"Error: Task '{task_id}' not found")
        raise typer.Exit(1) from None


# NOTE: Typer uses each command's docstring as its --help text, so the
# docstrings below are user-facing output; extra notes live in comments.


@app.command()
def green(
    host: str = typer.Option(
        default=_GREEN_HOST_DEFAULT,
        help="Host to bind Green Agent",
    ),
    port: int = typer.Option(
        default=_GREEN_PORT_DEFAULT,
        help="Port for Green Agent",
    ),
):
    """Start the Green Agent (Assessment Orchestrator)."""
    from src.green_agent.agent import start_green_agent

    print(f"Starting Green Agent on {host}:{port}")
    start_green_agent(host=host, port=port)


@app.command()
def white(
    host: str = typer.Option(
        default=_WHITE_HOST_DEFAULT,
        help="Host to bind White Agent",
    ),
    port: int = typer.Option(
        default=_WHITE_PORT_DEFAULT,
        help="Port for White Agent",
    ),
):
    """Start the White Agent (Task Executor)."""
    from src.white_agent.agent import start_white_agent

    print(f"Starting White Agent on {host}:{port}")
    start_white_agent(host=host, port=port)


@app.command()
def launch(
    task_id: str = typer.Argument(
        default=_DEFAULT_TASK_ID,
        help="Task ID to evaluate",
    ),
    green_host: str = typer.Option(
        default=_GREEN_HOST_DEFAULT,
        help="Host for Green Agent",
    ),
    green_port: int = typer.Option(
        default=_GREEN_PORT_DEFAULT,
        help="Port for Green Agent",
    ),
    white_host: str = typer.Option(
        default=_WHITE_HOST_DEFAULT,
        help="Host for White Agent",
    ),
    white_port: int = typer.Option(
        default=_WHITE_PORT_DEFAULT,
        help="Port for White Agent",
    ),
):
    """Launch complete evaluation with both agents."""
    from src.launcher import launch_evaluation

    # launch_evaluation is awaited to completion on a fresh event loop.
    asyncio.run(
        launch_evaluation(
            task_id=task_id,
            green_host=green_host,
            green_port=green_port,
            white_host=white_host,
            white_port=white_port,
        )
    )


@app.command()
def tasks():
    """List all available tasks."""
    from src.agentxploit.task_loader import TaskLoader

    loader = TaskLoader()
    task_list = loader.list_tasks()

    print("Available Tasks:")
    print("-" * _RULE_WIDTH)

    for tid in task_list:
        # Deliberately best-effort: one task that fails to load is reported
        # inline and must not prevent the remaining tasks from being listed.
        try:
            summary = loader.get_task_summary(tid)
            print(f" {tid}")
            print(f" CVE: {summary['cve']}")
            print(f" Type: {summary['type']}")
            print(f" Severity: {summary['severity']}")
            print(f" Runtime: {summary['runtime']}")
            print()
        except Exception as e:
            print(f" {tid} (error loading: {e})")
            print()


@app.command()
def info(
    task_id: str = typer.Argument(..., help="Task ID to show info for"),
):
    """Show detailed information about a task."""
    # Exits with status 1 (after printing an error) when the task is missing.
    config = _load_task_or_exit(task_id)

    print(f"Task: {config.get('task_name', 'Unknown')}")
    print("=" * _RULE_WIDTH)
    print()

    vuln = config.get("vulnerability", {})
    print("Vulnerability:")
    print(f" CVE: {vuln.get('cve', 'Unknown')}")
    print(f" Type: {vuln.get('type', 'Unknown')}")
    print(f" Severity: {vuln.get('severity', 'Unknown')}")
    print(f" Summary: {vuln.get('summary', 'Unknown')}")
    print()

    obj = config.get("objective", {})
    print("Objective:")
    print(f" Goal: {obj.get('goal', 'Unknown')}")
    print(f" Target: {obj.get('target_endpoint', 'Unknown')}")
    print()

    print(f"Runtime: {config.get('_runtime', 'Unknown')}")
    print(f"Timeout: {config.get('timeout', 300)}s")


@app.command()
def docker_up(
    task_id: str = typer.Argument(
        default=_DEFAULT_TASK_ID,
        help="Task ID for Docker environment",
    ),
):
    """Start Docker environment manually (for debugging)."""
    from src.agentxploit.docker_manager import DockerManager

    config = _load_task_or_exit(task_id)
    manager = DockerManager(task_id, config)

    if not manager.start_environment():
        print("Failed to start Docker environment")
        # Signal the failure through the exit status, like the other error
        # paths of this CLI, so scripts and CI can detect it.
        raise typer.Exit(1)

    print("Docker environment started successfully")
    print(f"Target container: {manager.target_container}")
    print(f"Attacker container: {manager.attacker_container}")


@app.command()
def docker_down(
    task_id: str = typer.Argument(
        default=_DEFAULT_TASK_ID,
        help="Task ID for Docker environment",
    ),
):
    """Stop Docker environment manually (for debugging)."""
    from src.agentxploit.docker_manager import DockerManager

    config = _load_task_or_exit(task_id)
    manager = DockerManager(task_id, config)

    if not manager.stop_environment():
        print("Failed to stop Docker environment")
        # Non-zero exit status so callers can tell the teardown failed.
        raise typer.Exit(1)

    print("Docker environment stopped successfully")


if __name__ == "__main__":
    app()