#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
CVE-2021-30809 PoC Server

HTTP server for testing a crash-inducing harness for the WebKit UAF on PS4.

The server exposes a handful of fixed routes (the harness page, its script,
two small introspection endpoints) and prints every POST body it receives.
"""

import datetime
import http.server
import ipaddress
import json
import os
import socket
import socketserver
import sys
import threading
import time
import urllib.parse

# Server configuration
PORT = 8080
HOST = '0.0.0.0'

# Upper bound on how many bytes of a POST body are read and printed, so a
# bogus Content-Length cannot make the handler buffer an arbitrary amount.
MAX_POST_BYTES = 1024 * 1024

# Utility: RFC1918 private ranges
RFC1918_NETS = [
    ipaddress.ip_network('10.0.0.0/8'),
    ipaddress.ip_network('172.16.0.0/12'),
    ipaddress.ip_network('192.168.0.0/16'),
]

# Built once instead of on every call.
_CGNAT_NET = ipaddress.ip_network('100.64.0.0/10')
_PRIVATE_172_NET = ipaddress.ip_network('172.16.0.0/12')

_TEXT_PLAIN = 'text/plain; charset=utf-8'


def _timestamp() -> str:
    """Return the current local time formatted for log lines."""
    return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")


def is_rfc1918_ipv4(ip: str) -> bool:
    """Return True if *ip* is an IPv4 address inside one of RFC1918_NETS.

    Anything that does not parse as an IP address yields False.
    """
    try:
        addr = ipaddress.ip_address(ip)
    except ValueError:
        return False
    return addr.version == 4 and any(addr in net for net in RFC1918_NETS)


def is_valid_lan_candidate(ip: str) -> bool:
    """Return True if *ip* is an IPv4 address usable as the advertised address.

    Loopback, link-local, unspecified and CGNAT (100.64.0.0/10) addresses are
    rejected, as is anything that does not parse as an IP address.
    """
    try:
        addr = ipaddress.ip_address(ip)
    except ValueError:
        return False
    if addr.version != 4:
        return False
    if addr.is_loopback or addr.is_link_local or addr.is_unspecified:
        return False
    return addr not in _CGNAT_NET


def lan_priority(ip: str) -> int:
    """Return the sort rank of *ip*; lower is preferred.

    Order of preference: 192.168.x.x (0), 10.x.x.x (1),
    172.16.0.0-172.31.255.255 (2), everything else (3).
    """
    if ip.startswith('192.168.'):
        return 0
    if ip.startswith('10.'):
        return 1
    try:
        if ipaddress.ip_address(ip) in _PRIVATE_172_NET:
            return 2
    except ValueError:
        pass  # not an address at all: falls through to the lowest rank
    return 3


def enumerate_local_ipv4() -> list:
    """Return the IPv4 addresses the local hostname resolves to.

    Resolution is best-effort: a lookup that fails contributes nothing. The
    result is sorted so repeated calls return the same order.
    """
    found = set()
    try:
        hostname = socket.gethostname()
    except OSError:
        return []

    try:
        for family, _, _, _, sockaddr in socket.getaddrinfo(hostname, None):
            if family == socket.AF_INET:
                found.add(sockaddr[0])
    except (OSError, UnicodeError):
        pass  # resolver failure: rely on the second lookup below

    try:
        # Any additional addresses registered for the same name
        found.update(socket.gethostbyname_ex(hostname)[2])
    except (OSError, UnicodeError):
        pass

    return sorted(found)


def _detect_outbound_ipv4() -> str:
    """Return the local address of a UDP socket connected to 1.1.1.1:80.

    Falls back to '127.0.0.1' when the socket cannot be created or connected.
    """
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
            probe.connect(("1.1.1.1", 80))
            return probe.getsockname()[0]
    except OSError:
        return '127.0.0.1'


# Dynamic local IP selection (prefer ONLY valid RFC1918 addresses)
detected_ip = _detect_outbound_ipv4()
candidates_raw = [detected_ip] + enumerate_local_ipv4()

# Keep only usable candidates
valid_candidates = [ip for ip in candidates_raw if is_valid_lan_candidate(ip)]

# RFC1918 only; the numeric tie-breaker keeps the order (and therefore
# SERVER_IP) stable between runs when several addresses share a rank.
lan_candidates = sorted(
    {ip for ip in valid_candidates if is_rfc1918_ipv4(ip)},
    key=lambda ip: (lan_priority(ip), ipaddress.ip_address(ip)),
)

# Final choice
if lan_candidates:
    SERVER_IP = lan_candidates[0]
else:
    # Fallback: use detected_ip when it is not loopback/link-local/CGNAT,
    # otherwise localhost.
    SERVER_IP = detected_ip if is_valid_lan_candidate(detected_ip) else 'localhost'

SERVER_URL = f"http://{SERVER_IP}:{PORT}"


class CustomHTTPRequestHandler(http.server.SimpleHTTPRequestHandler):
    """HTTP handler with fixed routes, CORS headers and detailed logging."""

    # URL path -> (file opened relative to the working directory, Content-Type)
    _STATIC_ROUTES = {
        '/': ('exploit.html', 'text/html; charset=utf-8'),
        '/index.html': ('exploit.html', 'text/html; charset=utf-8'),
        '/poc.js': ('poc.js', 'application/javascript'),
    }

    def log_message(self, format, *args):
        """Print the request line with a timestamp, client IP and User-Agent.

        Works even when ``self.headers`` has not been set yet.
        """
        timestamp = _timestamp()
        try:
            client_ip = self.client_address[0]
        except (AttributeError, IndexError, TypeError):
            client_ip = "?"

        headers = getattr(self, 'headers', None)
        if headers is not None:
            user_agent = headers.get('User-Agent', 'Unknown')
        else:
            user_agent = 'Unknown'

        print(f"[{timestamp}] {client_ip} - {format % args}")
        print(f"[{timestamp}] User-Agent: {user_agent}")
        if user_agent and ('PlayStation' in user_agent or 'PS4' in user_agent):
            print(f"[{timestamp}] *** PS4 BROWSER DETECTED! ***")
            print(f"[{timestamp}] Full User-Agent: {user_agent}")

    def _send_cors(self):
        """Queue the permissive CORS headers sent with every response."""
        self.send_header('Access-Control-Allow-Origin', '*')
        self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS')
        self.send_header('Access-Control-Allow-Headers', '*')

    def _send_body(self, status, content_type, body):
        """Send one complete response: status, CORS, type, length, body."""
        self.send_response(status)
        self._send_cors()
        self.send_header('Content-Type', content_type)
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def _send_no_content(self):
        """Send a 204 response (CORS headers only, no body)."""
        self.send_response(204)
        self._send_cors()
        self.end_headers()

    def do_OPTIONS(self):
        """Answer CORS preflight requests."""
        self._send_no_content()

    def do_GET(self):
        """Route GET requests.

        The query string is ignored when matching routes. Files are read
        before any header is written, so a missing file produces a single
        404 response instead of a 404 appended to an already-sent 200.
        """
        print("\n=== NEW REQUEST ===")
        print(f"Path: {self.path}")
        print(f"Client: {self.client_address[0]}:{self.client_address[1]}")

        route = urllib.parse.urlsplit(self.path).path

        if route in self._STATIC_ROUTES:
            filename, content_type = self._STATIC_ROUTES[route]
            try:
                with open(filename, 'rb') as f:
                    body = f.read()
            except FileNotFoundError:
                self._send_body(404, _TEXT_PLAIN, b'File not found')
                return
            except OSError:
                # Unreadable file (permissions, directory, ...): still answer
                # with one well-formed response.
                self._send_body(500, _TEXT_PLAIN, b'Internal Server Error')
                return
            self._send_body(200, content_type, body)
            return

        if route == '/server-info.js':
            # JSON is valid JavaScript object-literal syntax and takes care
            # of quoting and escaping.
            info = {
                'ip': SERVER_IP,
                'port': PORT,
                'url': SERVER_URL,
                'ips': lan_candidates,
            }
            payload = "window.SERVER_INFO = " + json.dumps(info) + ";"
            self._send_body(200, 'application/javascript', payload.encode('utf-8'))
            return

        if route == '/ip':
            self._send_body(200, _TEXT_PLAIN, SERVER_IP.encode('utf-8'))
            return

        if route == '/favicon.ico':
            # Avoid noisy 404s
            self._send_no_content()
            return

        # Not found
        self._send_body(404, _TEXT_PLAIN, b'Not Found')

    def do_POST(self):
        """Print the POST body (crash logs) and answer ``OK``.

        A missing, malformed or negative Content-Length is treated as 0; at
        most MAX_POST_BYTES bytes are read.
        """
        try:
            declared = int(self.headers.get('Content-Length', 0) or 0)
        except (TypeError, ValueError):
            declared = 0
        # A negative length would make rfile.read() block until EOF.
        length = min(max(declared, 0), MAX_POST_BYTES)
        post_data = self.rfile.read(length) if length else b''

        timestamp = _timestamp()
        print(f"\n[{timestamp}] *** POST DATA RECEIVED ***")
        if declared > length:
            print(f"[WARN] POST body truncated to {length} of {declared} bytes")
        print(f"Data: {post_data.decode('utf-8', errors='ignore')}")

        self._send_body(200, _TEXT_PLAIN, b'OK')


def print_banner():
    """Print the start-up banner with the listening address and LAN URLs."""
    print("=" * 60)
    print("  CVE-2021-30809 WebKit UAF PoC Server")
    print("  Target: PlayStation 4 Browser (WebKit 605.1.15)")
    print("=" * 60)
    print(f"Server starting on {HOST}:{PORT}")
    print(f"Access URL: {SERVER_URL}")
    if lan_candidates:
        print("LAN candidates:")
        for ip in lan_candidates:
            print(f"  - http://{ip}:{PORT}")
    else:
        print("No RFC1918 LAN IP detected. Using fallback.")
    print("Waiting for PS4 connections...")
    print("=" * 60)


def monitor_connections(interval: float = 30):
    """Print a heartbeat line every *interval* seconds, forever.

    Intended to run in a daemon thread; it never returns.
    """
    while True:
        time.sleep(interval)
        timestamp = _timestamp()
        print(f"\n[{timestamp}] Server still running, waiting for connections... ({SERVER_URL})")


def main():
    """Print the banner, start the heartbeat thread and serve until stopped.

    Exits with status 0 on Ctrl-C and 1 on any other error.
    """
    print_banner()

    try:
        # Create the server
        with http.server.ThreadingHTTPServer((HOST, PORT), CustomHTTPRequestHandler) as httpd:
            print(f"Server successfully started on {SERVER_URL}")

            # Start the monitoring thread
            monitor_thread = threading.Thread(target=monitor_connections, daemon=True)
            monitor_thread.start()

            # Run the server
            httpd.serve_forever()

    except KeyboardInterrupt:
        print("\n\nServer shutting down...")
        sys.exit(0)
    except Exception as e:
        print(f"Error starting server: {e}")
        sys.exit(1)


if __name__ == '__main__':
    main()