#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Symfony Generic Risk Scanner (safe PoC) Autor: m10sec (m10sec@proton.me) Se enfocan en detección no invasiva y evidencia segura. """ import argparse import json import socket import ssl from urllib.parse import urlparse, urljoin import requests try: from colorama import init as colorama_init, Fore, Style colorama_init() except Exception: # Fallback si no está colorama class _F: RESET=""; RED=""; GREEN=""; YELLOW=""; CYAN=""; MAGENTA="" class _S: BRIGHT=""; RESET_ALL="" Fore=_F(); Style=_S() def banner(): ascii_art = f""" {Fore.CYAN}{Style.BRIGHT} ▗▄▄▖▄ ▄ ▄▄▄▄ ▗▞▀▀▘▄▄▄ ▄▄▄▄ ▄ ▄ ▗▄▄▖▗▞▀▘▗▞▀▜▌▄▄▄▄ ▐▌ █ █ █ █ █ ▐▌ █ █ █ █ █ █ ▐▌ ▝▚▄▖▝▚▄▟▌█ █ ▝▀▚▖ ▀▀▀█ █ █ ▐▛▀▘▀▄▄▄▀ █ █ ▀▀▀█ ▝▀▚▖ █ █ ▗▄▄▞▘▄ █ ▐▌ ▄ █ ▗▄▄▞▘ ▀▀▀ ▀▀▀ {Style.RESET_ALL} """ print(ascii_art) print(Style.BRIGHT + Fore.CYAN + "="*54) print(" Symfony Generic Risk Scanner (safe PoC)") print(" m10sec@proton.me") print("="*54 + Style.RESET_ALL) print("[1] Inyección de cabeceras (CRLF, genérico)") print("[2] Host Header Injection (genérico)") print("[3] Symfony Profiler expuesto") print("[4] Exposición de /_fragment (firma/errores)") print("[9] Ejecutar todas las pruebas") print("="*54 + "\n") def norm_url(u: str) -> str: if not u.startswith(("http://", "https://")): return "http://" + u return u.rstrip("/") def result(ok: bool, msg: str): color = Fore.GREEN if ok else Fore.RED print(color + ("[+]" if ok else "[-]") + " " + msg + Style.RESET_ALL) def info(msg: str): print(Fore.CYAN + "[*] " + msg + Style.RESET_ALL) def warn(msg: str): print(Fore.YELLOW + "[!] " + msg + Style.RESET_ALL) def build_session(insecure: bool=False, proxy: str=None, timeout: int=8) -> requests.Session: s = requests.Session() s.verify = not insecure s.headers.update({"User-Agent": "SymfonySafeScanner/1.0"}) if proxy: s.proxies = {"http": proxy, "https": proxy} s.timeout = timeout return s def test_crlf_raw(url: str, timeout: int=6): """ Intenta inyectar un header adicional usando CRLF en un header controlado. Evidencia: si la respuesta devuelve 'X-CVE-Test: injected' en headers. """ info("Probando inyección de cabeceras (CRLF, genérico)...") parsed = urlparse(url) host = parsed.hostname port = parsed.port or (443 if parsed.scheme == "https" else 80) path = parsed.path or "/" # Header con CRLF para intentar romper la línea y añadir un header nuevo evil_header = "example.com%0d%0aX-CVE-Test: injected" req_lines = [ f"GET {path} HTTP/1.1", f"Host: {host}", f"X-Forwarded-Host: {evil_header}", "Connection: close", "", "" ] payload = "\r\n".join(req_lines).encode() try: s = socket.create_connection((host, port), timeout=timeout) if parsed.scheme == "https": ctx = ssl.create_default_context() s = ctx.wrap_socket(s, server_hostname=host) s.sendall(payload) data = b"" while True: chunk = s.recv(4096) if not chunk: break data += chunk s.close() head = data.split(b"\r\n\r\n", 1)[0].decode("iso-8859-1", errors="ignore") if "X-CVE-Test: injected" in head: result(True, "Posible CRLF/resp smuggling: header inyectado reflejado.") return {"vulnerable": True, "evidence": "X-CVE-Test header found in response"} else: result(False, "No se observó inyección de header en respuesta.") return {"vulnerable": False} except Exception as e: warn(f"Error en prueba CRLF: {e}") return {"error": str(e)} def test_host_header_injection(url: str, session: requests.Session): """ Cambia Host y observa si el servidor lo refleja (en Location, enlaces o body). """ info("Probando Host Header Injection (genérico)...") evil = "evil.com" try: r = session.get(url, headers={"Host": evil, "X-Forwarded-Host": evil}, allow_redirects=False) indicators = [] loc = r.headers.get("Location", "") if evil in loc: indicators.append("Location refleja Host malicioso") if evil in r.text[:5000]: indicators.append("Body refleja Host malicioso (primeros 5KB)") if r.status_code in (301,302,303,307,308) and indicators: result(True, f"Posible HHI: {', '.join(indicators)} (HTTP {r.status_code}).") return {"vulnerable": True, "indicators": indicators, "status": r.status_code} elif indicators: result(True, f"Posible HHI (sin redirección): {', '.join(indicators)}.") return {"vulnerable": True, "indicators": indicators, "status": r.status_code} else: result(False, "No se observaron reflejos/redirect influenciados por Host.") return {"vulnerable": False, "status": r.status_code} except Exception as e: warn(f"Error en HHI: {e}") return {"error": str(e)} def test_profiler(url: str, session: requests.Session): """ Detecta exposición de Symfony Profiler/WDT. """ info("Probando exposición de Symfony Profiler...") endpoints = ["/_profiler", "/_wdt"] found = [] try: for ep in endpoints: r = session.get(urljoin(url+"/", ep)) if ("Symfony Profiler" in r.text) or ("Web Debug Toolbar" in r.text) or ("_profiler_search" in r.text): found.append(ep) if found: result(True, f"Profiler expuesto en: {', '.join(found)}") return {"exposed": True, "endpoints": found} result(False, "No se detectó Profiler/WDT.") return {"exposed": False} except Exception as e: warn(f"Error en Profiler: {e}") return {"error": str(e)} def test_fragment_exposure(url: str, session: requests.Session): """ Prueba SEGURA en /_fragment: - No intenta leer archivos ni hacer SSRF. - Solo comprueba si el endpoint responde y revela mensajes de firma/errores que indiquen superficie de ataque (p.ej., firma faltante). """ info("Probando exposición de /_fragment (seguro, sin exfiltración)...") target = urljoin(url+"/", "/_fragment") try: # Petición mínima con datos inocuos r = session.post(target, data={"_path": "controller:index"}, allow_redirects=False) txt = (r.text or "")[:1000] clues = [] for needle in ["Invalid signature", "An error occurred", "Fragment", "signature", "URI must be absolute", "Invalid _path"]: if needle.lower() in txt.lower(): clues.append(needle) if r.status_code in (400, 403, 404, 500) and clues: result(True, f"/_fragment accesible con mensajes diagnósticos: {', '.join(set(clues))}") return {"exposed": True, "status": r.status_code, "clues": list(set(clues))} elif r.status_code == 200: # Responder 200 aquí puede indicar configuración muy laxa result(True, "Respuesta 200 en /_fragment (revisar firma/config).") return {"exposed": True, "status": r.status_code} else: result(False, f"/_fragment no parece accesible (HTTP {r.status_code}).") return {"exposed": False, "status": r.status_code} except Exception as e: warn(f"Error en /_fragment: {e}") return {"error": str(e)} def ejecutar_todo(url: str, session: requests.Session): print(Fore.MAGENTA + "\n[+] Ejecutando todas las pruebas disponibles...\n" + Style.RESET_ALL) summary = {} summary["crlf"] = test_crlf_raw(url) summary["host_header"] = test_host_header_injection(url, session) summary["profiler"] = test_profiler(url, session) summary["fragment"] = test_fragment_exposure(url, session) print(Fore.MAGENTA + "\n[✓] Escaneo completo.\n" + Style.RESET_ALL) return summary def main(): banner() parser = argparse.ArgumentParser(description="Symfony Generic Risk Scanner (safe PoC)") parser.add_argument("url", help="URL completa del sitio (http(s)://...)") parser.add_argument("-o","--option", choices=["1","2","3","4","9"], default="9", help="Prueba a ejecutar (1..4) o 9 para todas (default).") parser.add_argument("--timeout", type=int, default=8, help="Timeout en segundos (default 8)") parser.add_argument("--insecure", action="store_true", help="No verificar TLS") parser.add_argument("--proxy", help="Proxy (ej. http://127.0.0.1:8080)") parser.add_argument("--json", action="store_true", help="Salida en JSON (además de texto)") args = parser.parse_args() base = norm_url(args.url) session = build_session(insecure=args.insecure, proxy=args.proxy, timeout=args.timeout) if args.option == "1": out = {"crlf": test_crlf_raw(base, timeout=args.timeout)} elif args.option == "2": out = {"host_header": test_host_header_injection(base, session)} elif args.option == "3": out = {"profiler": test_profiler(base, session)} elif args.option == "4": out = {"fragment": test_fragment_exposure(base, session)} else: out = ejecutar_todo(base, session) if args.json: print(json.dumps(out, indent=2, ensure_ascii=False)) if __name__ == "__main__": main()