#!/usr/bin/env python3 """Check Fortinet target for specific FortiOS auth bypass response.""" import argparse import asyncio import csv import http.client import itertools import json import socket import re import ssl import sys import threading import time from typing import Dict, Iterable, List, Optional, Tuple from urllib.parse import urljoin, urlparse TARGET_PATH = "/api/v2.0/cmdb/system/admin/../../../../../cgi-bin/fwbcgi" EXPECTED_ERRCODE = "0" EXPECTED_MESSAGE = "(null)" FORTI_REGEX = re.compile(r"[A-Za-z]*forti[A-Za-z]*", re.IGNORECASE) REDIRECT_STATUSES = {301, 302, 303, 307, 308} DEFAULT_USER_AGENT = "fwbcgi-scanner/1.0" PATCHED_CONTENT_LENGTH = "199" PATCHED_CONTENT_TYPE = "text/html; charset=iso-8859-1" PATCHED_BODY_EXACT = ( b"\n" b"
\n" b"You don't have permission to access this resource.
\n" b"\n" ) class Spinner: def __init__(self, message: str = "Scanning") -> None: self.message = message self._stop = threading.Event() self._thread: Optional[threading.Thread] = None def start(self) -> None: if self._thread is not None: return self._stop.clear() self._thread = threading.Thread(target=self._run, daemon=True) self._thread.start() def stop(self) -> None: if self._thread is None: return self._stop.set() self._thread.join() self._thread = None self._stop.clear() sys.stderr.write("\r" + " " * (len(self.message) + 4) + "\r") sys.stderr.flush() def _run(self) -> None: for char in itertools.cycle("|/-\\"): if self._stop.is_set(): break sys.stderr.write(f"\r{self.message} {char}") sys.stderr.flush() time.sleep(0.1) def parse_target(target: str) -> Tuple[str, str, int]: parsed = urlparse(target) if parsed.scheme not in {"http", "https"}: raise ValueError("Targets must start with http:// or https://") if not parsed.hostname: raise ValueError("Target must include a hostname") port = parsed.port if port is None: port = _default_port_for_scheme(parsed.scheme) if parsed.path not in ("", "/") or parsed.query or parsed.fragment: raise ValueError("Targets may not include paths, queries, or fragments") if not (0 < port < 65536): raise ValueError(f"Port out of range in target '{target}'") return parsed.scheme, parsed.hostname, port def classify_response(status: int, body: bytes, headers: Dict[str, str]) -> str: if status == 403: if _is_definite_patched(body, headers): return "[+] PATCHED" return "[+] POSSIBLY PATCHED" if status == 200: decoded = body.decode("utf-8", errors="ignore") try: payload = json.loads(decoded) except json.JSONDecodeError: return "[-] INDETERMINATE" if isinstance(payload, dict): errcode = str(payload.get("errcode")) if "errcode" in payload else None message = str(payload.get("message")) if "message" in payload else None if errcode == EXPECTED_ERRCODE and message == EXPECTED_MESSAGE: return "[+] VULNERABLE" return "[+] MAY BE VULNERABLE" return "[-] INDETERMINATE" return "[-] INDETERMINATE" def find_forti_tokens(body: bytes) -> List[str]: text = body.decode("utf-8", errors="ignore") hits = [] for match in FORTI_REGEX.finditer(text): token = match.group(0) # Ensure we only capture tokens with real content beyond just "forti" hits.append(token) return hits def _default_port_for_scheme(scheme: str) -> int: return 443 if scheme == "https" else 80 def _normalize_path(path: str) -> str: if not path: return "/" return path if path.startswith("/") else f"/{path}" def _host_header(host: str, port: int, scheme: str) -> str: default_port = _default_port_for_scheme(scheme) return host if port == default_port else f"{host}:{port}" def _build_ssl_context(validate_tls: bool) -> ssl.SSLContext: protocol = getattr(ssl, "PROTOCOL_TLS_CLIENT", ssl.PROTOCOL_TLS) context = ssl.SSLContext(protocol) tls_version = getattr(ssl, "TLSVersion", None) if tls_version is not None: context.minimum_version = ssl.TLSVersion.TLSv1_2 else: for opt_name in ("OP_NO_TLSv1", "OP_NO_TLSv1_1"): opt = getattr(ssl, opt_name, 0) context.options |= opt if validate_tls: context.check_hostname = True context.verify_mode = ssl.CERT_REQUIRED context.load_default_certs() else: context.check_hostname = False context.verify_mode = ssl.CERT_NONE return context def _is_definite_patched(body: bytes, headers: Dict[str, str]) -> bool: content_length = headers.get("content-length") content_type = headers.get("content-type") if content_length != PATCHED_CONTENT_LENGTH: return False if content_type is None or content_type.lower() != PATCHED_CONTENT_TYPE: return False return body == PATCHED_BODY_EXACT def _build_current_url(scheme: str, host: str, port: int, path: str) -> str: default_port = _default_port_for_scheme(scheme) port_part = "" if port == default_port else f":{port}" normalized_path = _normalize_path(path) return f"{scheme}://{host}{port_part}{normalized_path}" def _resolve_redirect( scheme: str, host: str, port: int, path: str, location: str, ) -> Tuple[Tuple[str, str, int, str], str]: base_url = _build_current_url(scheme, host, port, path) resolved = urljoin(base_url, location) parsed = urlparse(resolved) target_scheme = parsed.scheme or scheme if target_scheme not in ("http", "https"): return (scheme, host, port, path), f"redirect to unsupported scheme '{parsed.scheme}'" target_host = parsed.hostname if not target_host: return (scheme, host, port, path), "redirect missing hostname" target_port = parsed.port or _default_port_for_scheme(target_scheme) target_path = parsed.path or "/" if parsed.query: target_path = f"{target_path}?{parsed.query}" return (target_scheme, target_host, target_port, target_path), "" def check_target( scheme: str, host: str, port: int, timeout: float, capture_forti: bool, follow_redirects: bool, max_redirects: int, validate_tls: bool, user_agent: str, ) -> Tuple[str, List[str], str]: current_scheme = scheme current_host = host current_port = port current_path = TARGET_PATH redirects_followed = 0 reason_text = "" insecure_context = None secure_context = None while True: if current_scheme == "http": conn = http.client.HTTPConnection(current_host, current_port, timeout=timeout) else: if validate_tls: if secure_context is None: secure_context = _build_ssl_context(True) context = secure_context else: if insecure_context is None: insecure_context = _build_ssl_context(False) context = insecure_context conn = http.client.HTTPSConnection(current_host, current_port, timeout=timeout, context=context) request_path = _normalize_path(current_path) try: conn.request( "GET", request_path, headers={ "Host": _host_header(current_host, current_port, current_scheme), "Connection": "keep-alive", "User-Agent": user_agent, }, ) resp = conn.getresponse() status = resp.status location_header = resp.getheader("Location") body = resp.read() header_map = {k.lower(): v for k, v in resp.getheaders()} except (socket.timeout, ConnectionError, OSError, http.client.HTTPException) as exc: return "[-] INDETERMINATE", [], f"request failed: {exc}" finally: conn.close() if follow_redirects and status in REDIRECT_STATUSES: if redirects_followed >= max_redirects: reason_text = f"redirect limit {max_redirects} reached" elif not location_header: reason_text = "redirect missing Location header" else: (next_scheme, next_host, next_port, next_path), error = _resolve_redirect( current_scheme, current_host, current_port, request_path, location_header, ) if error: reason_text = error else: current_scheme = next_scheme current_host = next_host current_port = next_port current_path = next_path redirects_followed += 1 continue classification = classify_response(status, body, header_map) forti_hits = find_forti_tokens(body) if capture_forti else [] details = f"HTTP {status}" if reason_text: details = f"{details} ({reason_text})" elif follow_redirects and redirects_followed: plural = "s" if redirects_followed != 1 else "" details = f"{details} (followed {redirects_followed} redirect{plural})" return classification, forti_hits, details def load_targets_from_files(paths: Iterable[str]) -> List[str]: targets: List[str] = [] for path in paths: try: with open(path, "r", encoding="utf-8") as handle: for raw_line in handle: line = raw_line.strip() if not line or line.startswith("#"): continue targets.append(line) except OSError as err: raise ValueError(f"Failed to read targets file '{path}': {err}") from err return targets def write_csv(results: List[Tuple[str, str, List[str], str]], path: str) -> None: try: with open(path, "w", newline="", encoding="utf-8") as handle: writer = csv.writer(handle) for target, classification, _, _ in results: writer.writerow([target, classification]) except OSError as err: raise RuntimeError(f"Failed to write CSV '{path}': {err}") from err async def _scan_single_target( raw_target: str, args: argparse.Namespace, concurrency_guard: asyncio.Semaphore, ) -> Tuple[str, str, List[str], str]: try: scheme, host, port = parse_target(raw_target) except ValueError as err: return raw_target, "[-] INDETERMINATE", [], str(err) async with concurrency_guard: classification, forti_hits, details = await asyncio.to_thread( check_target, scheme, host, port, args.timeout, args.find_forti, args.follow_redirects, args.max_redirects, args.validate_tls, args.user_agent, ) return raw_target, classification, forti_hits, details async def run_scans(targets: List[str], args: argparse.Namespace) -> List[Tuple[str, str, List[str], str]]: guard = asyncio.Semaphore(args.workers) tasks = [asyncio.create_task(_scan_single_target(target, args, guard)) for target in targets] return await asyncio.gather(*tasks) def main() -> None: parser = argparse.ArgumentParser(description="Probe Fortinet targets for fwbcgi exposure") parser.add_argument( "targets", nargs="*", help="List of targets as