import sys
import os
import argparse
import struct
import hashlib
import pefile
from unicorn import Uc, UcError, UC_ARCH_X86, UC_MODE_32, UC_HOOK_CODE, UC_HOOK_MEM_INVALID, UC_HOOK_MEM_WRITE, UC_MEM_FETCH_UNMAPPED, UC_MEM_READ_UNMAPPED, UC_MEM_WRITE_UNMAPPED  # pyright: ignore[reportPrivateImportUsage]
from unicorn.x86_const import *
from capstone import Cs, CS_ARCH_X86, CS_MODE_32
from capstone.x86 import X86_OP_MEM

# Bump this when sharing logs so we can confirm what code actually ran.
SCRIPT_VERSION = "2026-02-06u"

# ----------------------------
# Config you care about
# ----------------------------
BREAK_RVA = 0x1C11  # your .text:10001C11 (IDA base 0x10000000)
DUMP_MAX = 0x400000  # max bytes to attempt dumping
DUMP_MIN = 0x1000  # minimum to keep if heuristics fail

# Output file names. emulate_and_dump() joins each of these onto its output_dir.
OUTFILE = "shellcode.bin"
OUTFILE_FULL = "shellcode_full.bin"  # larger dump from stage1 base (see STAGE1_FULL_DUMP_LEN)
STAGE2_OUTFILE = "stage2_payload.bin"
STAGE2_LIVE_OUTFILE = "stage2_live_dump.bin"
STAGE2_MEMIMG_OUTFILE = "stage2_mem_image.bin"
STAGE2_MEMIMG_BASE = 0x00400000
STAGE2_SPAN_OUTFILE = "stage2_written_span.bin"
STAGE2_TRACE = "stage2_trace.log"

MAX_INSN = 5_000_000  # instruction cap to avoid infinite loops
STAGE2_MAX_INSN = 2_000_000  # per-candidate cap (bruteforce stage2 mode)
STAGE2_DUMP_SIZE = 0x40000  # 256KB default dump for shellcode
FORCE_BREAK = False  # set True to jump to breakpoint from anti-analysis region
RUN_INIT_ONLY = False  # set True to only run init and capture seed

# Stage1 (the decrypted buffer) often includes exception-driven / anti-emulation junk instructions
# (e.g., IN/OUT, INT, RETF, segment register ops, stack pivot). We can't emulate Windows SEH here,
# so we optionally skip a small set of these while executing inside stage1.
# STAGE1_SKIP_MAX is the upper bound on how many such instructions get skipped.
STAGE1_SKIP_MAX = 10_000

# Cap stage1 execution in --mode full so we don't run forever.
STAGE1_MAX_INSN = 20_000_000 # Many samples VirtualProtect() a full 0x200000 region for the stage1 buffer and keep data past # the initially-reported "out_len". Dumping the full region helps offline extraction of stage2. STAGE1_FULL_DUMP_LEN = 0x200000 # ---------------------------- # log.dll globals (RVA offsets) # ---------------------------- G_SEED_RVA = 0x16354 G_LEN_RVA = 0x16358 G_K32_RVA = 0x1635C G_BUF_RVA = 0x16360 # ---------------------------- # Helpers # ---------------------------- def align_down(x, a): return x & ~(a - 1) def align_up(x, a): return (x + (a - 1)) & ~(a - 1) def resolve_input_path(path: str, input_dir: str) -> str: """ Convenience: if the user passes just a filename and it doesn't exist in CWD, automatically resolve it from input_dir. """ if not path: return path if os.path.exists(path): return path # Only auto-resolve "bare" paths (no directory component) if os.path.basename(path) == path: cand = os.path.join(input_dir, path) if os.path.exists(cand): return cand return path def resolve_output_path(path: str, output_dir: str) -> str: """ If path is a bare filename, write it under output_dir. If it already contains a directory component or is absolute, keep it as-is. 
""" if not path: return path if path.lower() == "none": return path if os.path.isabs(path) or os.path.dirname(path): return path return os.path.join(output_dir, path) def read_cstr(mu: Uc, addr: int, max_len: int = 0x200) -> bytes: if not addr: return b"" out = bytearray() for i in range(max_len): try: b = mu.mem_read(addr + i, 1) except Exception: break if b == b"\x00": break out += b return bytes(out) def read_wstr(mu: Uc, addr: int, max_chars: int = 0x200) -> str: if not addr: return "" bs = bytearray() for i in range(max_chars): try: c = mu.mem_read(addr + i * 2, 2) except Exception: break if c == b"\x00\x00": break bs += c try: return bs.decode("utf-16le", errors="ignore") except Exception: return "" def is_probably_end(blob: bytes) -> int: """ Heuristic trimming: - If we see a long run of 0x00 or 0xCC after some data, trim there. - Otherwise keep full blob. """ if len(blob) <= DUMP_MIN: return len(blob) # Look for 32 consecutive nulls or int3s after at least DUMP_MIN patterns = [b"\x00" * 32, b"\xCC" * 32] for pat in patterns: idx = blob.find(pat, DUMP_MIN) if idx != -1: return idx return len(blob) def parse_imm_from_op_str(op_str: str): s = (op_str or "").strip() try: if s.startswith("0x") or s.startswith("-0x"): return int(s, 16) return int(s, 10) except Exception: return None def op_str_has_seg_reg(op_str: str) -> bool: s = (op_str or "").lower().replace(" ", "") # Match things like: "ds,edi" or "es:[edi],dx" or "movds,ax" (capstone formats vary) for seg in ("ds", "es", "ss", "cs", "fs", "gs"): if s.startswith(seg + ","): return True if (seg + ":[") in s: return True return False def sha256_file(path: str) -> str: h = hashlib.sha256() with open(path, "rb") as f: for chunk in iter(lambda: f.read(1024 * 1024), b""): h.update(chunk) return h.hexdigest() def write_stage1_base_file(path: str, base_va: int, length: int, sha256_hex: str): try: with open(path + ".base.txt", "w") as f: f.write(f"base_va=0x{base_va:08X}\n") f.write(f"length=0x{length:X}\n") 
f.write(f"sha256={sha256_hex}\n") except Exception: pass def write_base_file(path: str, base_va: int, length: int, extra: dict | None = None): try: with open(path + ".base.txt", "w") as f: f.write(f"base_va=0x{base_va:08X}\n") f.write(f"length=0x{length:X}\n") if extra: for k, v in extra.items(): f.write(f"{k}={v}\n") except Exception: pass # ---------------------------- # Output tee (stdout log) # ---------------------------- class TeeStdout: def __init__(self, *streams): self.streams = streams def write(self, data): for s in self.streams: try: s.write(data) except Exception: pass def flush(self): for s in self.streams: try: s.flush() except Exception: pass # ---------------------------- # Minimal WinAPI stubs # ---------------------------- class WinStubs: def __init__(self, mu: Uc): self.mu = mu self.alloc_base = 0x30000000 self.alloc_page = 0x1000 self.allocations = {} # addr -> size self.process_heap = 0x70000000 self.handle_base = 0x70000000 self.files = {} # handle -> {"data": bytes, "pos": int} self.default_data = b"" self.default_pos = 0 self.payload_data = b"" self.tls_next = 1 self.tls = {} # idx -> value def virtual_alloc(self, lpAddress, dwSize, flAllocationType, flProtect): size = align_up(dwSize, self.alloc_page) if size == 0: size = self.alloc_page # If the caller requests a specific address, try to honor it. Stage payloads often # expect to build a PE at 0x00400000. if lpAddress: req = align_down(lpAddress, self.alloc_page) # If it's already mapped, treat as success. try: _ = self.mu.mem_read(req, 1) self.allocations.setdefault(req, size) print(f"[*] VirtualAlloc reuse-mapped 0x{req:08X}-0x{req+size:08X} size=0x{size:X}") return req except Exception: pass try: for page in range(req, req + size, self.alloc_page): self.mu.mem_map(page, self.alloc_page) self.allocations[req] = size print(f"[*] VirtualAlloc mapped 0x{req:08X}-0x{req+size:08X} size=0x{size:X} (requested)") return req except Exception: # Fall back to heap-style allocations below. 
pass addr = align_up(self.alloc_base, self.alloc_page) self.alloc_base = addr + size # map RWX for simplicity in emulation # Force-map page by page to ensure it's readable/executable later. for page in range(addr, addr + size, self.alloc_page): try: self.mu.mem_map(page, self.alloc_page) except Exception: pass print(f"[*] VirtualAlloc mapped 0x{addr:08X}-0x{addr+size:08X} size=0x{size:X}") self.allocations[addr] = size return addr def heap_alloc(self, hHeap, dwFlags, dwBytes): addr = self.virtual_alloc(0, dwBytes, 0, 0) # HEAP_ZERO_MEMORY = 0x00000008 if dwFlags & 0x8: try: self.mu.mem_write(addr, b"\x00" * min(align_up(dwBytes, 1), 0x1000000)) except Exception: pass return addr def heap_size(self, hHeap, dwFlags, lpMem): for base, sz in self.allocations.items(): if base == lpMem: return sz return 0 def heap_realloc(self, hHeap, dwFlags, lpMem, dwBytes): if not lpMem: return self.heap_alloc(hHeap, dwFlags, dwBytes) old_sz = self.heap_size(hHeap, dwFlags, lpMem) new_addr = self.heap_alloc(hHeap, dwFlags, dwBytes) if old_sz: try: data = bytes(self.mu.mem_read(lpMem, min(old_sz, dwBytes))) self.mu.mem_write(new_addr, data) except Exception: pass # We don't unmap old pages (would require tracking exact page set); emulate success. 
return new_addr def virtual_protect(self, lpAddress, dwSize, flNewProtect, lpflOldProtect): # no-op for emulator return 1 def load_library_a(self, lpLibFileName): # return a fake module handle return 0x50000000 def get_proc_address(self, hModule, lpProcName): # return a fake function pointer (not used if your code calls IAT directly) return 0x50001000 def memcpy(self, dst, src, n): data = self.mu.mem_read(src, n) self.mu.mem_write(dst, data) return dst def new_handle(self): h = self.handle_base self.handle_base += 4 return h def open_file_bytes(self, data: bytes): h = self.new_handle() self.files[h] = {"data": data, "pos": 0} return h def read_file(self, h, n): f = self.files.get(h) if not f: return b"" pos = f["pos"] chunk = f["data"][pos:pos + n] f["pos"] = pos + len(chunk) return chunk def read_default(self, n): pos = self.default_pos chunk = self.default_data[pos:pos + n] self.default_pos = pos + len(chunk) return chunk def tls_alloc(self) -> int: idx = self.tls_next self.tls_next += 1 self.tls[idx] = 0 return idx def tls_free(self, idx: int) -> int: self.tls.pop(idx, None) return 1 def tls_get(self, idx: int) -> int: return int(self.tls.get(idx, 0)) & 0xFFFFFFFF def tls_set(self, idx: int, val: int) -> int: self.tls[idx] = val & 0xFFFFFFFF return 1 # ---------------------------- # PE mapping into Unicorn # ---------------------------- def map_pe(mu: Uc, pe: pefile.PE): image_base = pe.OPTIONAL_HEADER.ImageBase # type: ignore size_image = pe.OPTIONAL_HEADER.SizeOfImage # type: ignore page = 0x1000 mu.mem_map(align_down(image_base, page), align_up(size_image, page)) # Map headers headers = pe.get_memory_mapped_image()[: pe.OPTIONAL_HEADER.SizeOfHeaders] # type: ignore mu.mem_write(image_base, headers) # Map sections for s in pe.sections: va = image_base + s.VirtualAddress raw = s.get_data() mu.mem_write(va, raw) return image_base, size_image def find_export_rva(pe: pefile.PE, name: str) -> int: 
pe.parse_data_directories(directories=[pefile.DIRECTORY_ENTRY["IMAGE_DIRECTORY_ENTRY_EXPORT"]]) for exp in pe.DIRECTORY_ENTRY_EXPORT.symbols: # type: ignore if exp.name and exp.name.decode(errors="ignore") == name: return exp.address # RVA raise RuntimeError(f"Export not found: {name}") # ---------------------------- # Emulation harness # ---------------------------- def emulate_and_dump(dll_path: str, payload_path: str, mode: str, stop_at: str, output_dir: str): os.makedirs(output_dir, exist_ok=True) out_shellcode_path = os.path.join(output_dir, OUTFILE) out_shellcode_full_path = os.path.join(output_dir, OUTFILE_FULL) out_stage2_path = os.path.join(output_dir, STAGE2_OUTFILE) out_stage2_live_path = os.path.join(output_dir, STAGE2_LIVE_OUTFILE) out_stage2_memimg_path = os.path.join(output_dir, STAGE2_MEMIMG_OUTFILE) out_stage2_span_path = os.path.join(output_dir, STAGE2_SPAN_OUTFILE) out_stage2_trace_path = os.path.join(output_dir, STAGE2_TRACE) print(f"[*] emulate_logwrite_dump_shellcode.py {SCRIPT_VERSION} mode={mode} stop_at={stop_at}") pe = pefile.PE(dll_path, fast_load=False) if pe.FILE_HEADER.Machine != pefile.MACHINE_TYPE["IMAGE_FILE_MACHINE_I386"]: # type: ignore raise RuntimeError("This script is for 32-bit x86 DLLs only.") mu = Uc(UC_ARCH_X86, UC_MODE_32) stubs = WinStubs(mu) # Map NULL page and install a minimal fake DOS+PE header to avoid unmapped reads like [0x3C]. mu.mem_map(0x00000000, 0x1000) mu.mem_write(0x00000000, b"MZ") mu.mem_write(0x0000003C, struct.pack(" 0x200000). try: mu.mem_map(0x00200000, 0x00200000) except Exception: pass # Build IAT stubs to avoid jumping into import name table RVAs. 
pe.parse_data_directories(directories=[pefile.DIRECTORY_ENTRY["IMAGE_DIRECTORY_ENTRY_IMPORT"]]) STUB_BASE = 0x20000000 STUB_SIZE = 0x10000 mu.mem_map(STUB_BASE, STUB_SIZE) stub_map = {} stub_next = STUB_BASE name_to_stub = {} def add_stub(imp_name): nonlocal stub_next addr = stub_next stub_next += 0x10 stub_map[addr] = imp_name # Prefer first mapping for name->stub. name_to_stub.setdefault(imp_name, addr) return addr for entry in pe.DIRECTORY_ENTRY_IMPORT: # type: ignore for imp in entry.imports: if not imp.name: continue name = imp.name.decode(errors="ignore") stub_addr = add_stub(name) mu.mem_write(imp.address, struct.pack(" 0x47C204CA. hash_to_api = { 0xE2F5E21B: "GetModuleFileNameA", 0x54BFC47B: "CreateFileW", 0x053FAAA4: "ReadFile", 0xD6410922: "CloseHandle", 0x47C204CA: "VirtualProtect", } dyn_stub_addrs = {} def resolve_hash(hash_val): name = hash_to_api.get(hash_val) if not name: return 0 if name not in dyn_stub_addrs: dyn_stub_addrs[name] = add_stub(name) return dyn_stub_addrs[name] logwrite_rva = find_export_rva(pe, "LogWrite") logwrite_va = image_base + logwrite_rva break_va = image_base + BREAK_RVA init_va = image_base + 0x1000 decrypt_va = image_base + 0x1640 seed_value = 0x216707EA # hash/finalizer of "kernel32.dll" g_seed = image_base + G_SEED_RVA g_len = image_base + G_LEN_RVA g_k32 = image_base + G_K32_RVA g_buf = image_base + G_BUF_RVA # Stack STACK_BASE = 0x0FF00000 STACK_SIZE = 0x00100000 mu.mem_map(STACK_BASE, STACK_SIZE) esp = STACK_BASE + STACK_SIZE - 0x1000 mu.reg_write(UC_X86_REG_ESP, esp) # Fake args (stdcall-ish): LogWrite(arg1,arg2,arg3,arg4) # If it crashes due to arg expectations, you can tweak these. def push32(val): nonlocal esp esp -= 4 mu.mem_write(esp, struct.pack(" bool: """ If the current instruction has a memory operand with disp==target_disp and base reg value is 0, rewrite that base reg to new_base. Returns True if patched. 
""" try: for op in insn.operands: if op.type != X86_OP_MEM: continue if op.mem.disp != target_disp: continue base_reg = op.mem.base if not base_reg: continue try: base_val = mu.reg_read(base_reg) except Exception: continue if base_val != 0: continue mu.reg_write(base_reg, new_base) return True except Exception: return False return False def map_range(addr, size): if size <= 0: return start = align_down(addr, 0x1000) end = align_up(addr + size, 0x1000) for page in range(start, end, 0x1000): try: mu.mem_map(page, 0x1000) except Exception: pass def map_fake_kernel32(): # Minimal fake kernel32 with export table for stage2 API resolution. K32_BASE = 0x50000000 K32_SIZE = 0x10000 map_range(K32_BASE, K32_SIZE) # DOS header + PE header stub mu.mem_write(K32_BASE + 0x0, b"MZ") mu.mem_write(K32_BASE + 0x3C, struct.pack("Ldr mu.mem_write(PEB + 0x0C, struct.pack("InLoadOrderModuleList (self-linked) mu.mem_write(LDR + 0x0C, struct.pack("= max_len: break density = total / max_len if max_len else 0.0 # Score: prefer lower entropy + higher density + more non-zero bytes score = (-ent * 10.0) + (density * 100.0) + (nz / 4096.0) return (score, ent, insn_count) def hash_name(s: bytes) -> int: edx = 0x811C9DC5 for b in s: eax = b ^ edx edx = (eax * 0x1000193) & 0xFFFFFFFF eax = edx eax = ((eax >> 0xF) ^ edx) & 0xFFFFFFFF ecx = (eax * 0x85EBCA6B) & 0xFFFFFFFF eax = ((ecx >> 0xD) ^ ecx) & 0xFFFFFFFF return eax def resolve_hash_to_stub(h: int) -> int: # Map hash->API and return stub address if known. 
for addr, name in stub_map.items(): try: if hash_name(name.encode()) == h: return addr except Exception: pass return 0 def in_stage2_region(addr): base = stage2_region["base"] size = stage2_region["size"] return base and base <= addr < base + size def in_allocations(addr): for base, sz in stubs.allocations.items(): if base <= addr < base + sz: return True return False def in_stub_regions(addr): # IAT stubs + fake kernel32/ntdll ranges if 0x20000000 <= addr < 0x20010000: return True if 0x00200000 <= addr < 0x00400000: return True # Common PE mapping base if 0x00400000 <= addr < 0x02000000: return True if 0x50000000 <= addr < 0x50010000: return True if 0x60000000 <= addr < 0x60010000: return True return False def in_image(addr): return image_base <= addr < image_base + size_image def in_expected_exec(addr): return ( in_stage2_region(addr) or in_allocations(addr) or in_stub_regions(addr) or in_image(addr) ) def in_stage1(addr: int) -> bool: if stage1_entry["va"] is None or stage1_entry["len"] is None: return False return stage1_entry["va"] <= addr < stage1_entry["va"] + stage1_entry["len"] def _score_stage1_entry(raw: bytes, off: int) -> float | None: """ Score a candidate stage1 entry offset by decoding a small window. Prefer dense, non-privileged instruction streams. 
""" if off < 0 or off >= len(raw): return None window = raw[off : off + 0x80] if len(window) < 0x10: return None insn_count = 0 total = 0 bad = 0 # Quick prologue bonuses bonus = 0 if window[:3] == b"\x55\x8B\xEC": # push ebp; mov ebp, esp bonus += 50 if window[:2] == b"\x8B\xFF": # mov edi, edi (msvc hotpatch) bonus += 10 # call $+5; pop reg (position-independent) if len(window) >= 6 and window[0] == 0xE8 and window[1:5] == b"\x00\x00\x00\x00" and 0x58 <= window[5] <= 0x5F: bonus += 20 for insn in md.disasm(window, 0x10000000 + off): insn_count += 1 total += insn.size m = (insn.mnemonic or "").lower() if m.startswith("f") or m in ("in", "insb", "insw", "insd", "out", "outsb", "outsw", "outsd", "int", "into", "iret", "iretd", "hlt", "cli", "sti", "retf", "bound"): bad += 1 if insn_count >= 12 or total >= 0x40: break if insn_count < 4: return None density = total / 0x40 return density * 100 + insn_count * 3 + bonus - bad * 30 def pick_stage1_entry_offset(raw: bytes) -> int: """ Stage1 buffers in this family often start with junk bytes (privileged/FPU/SEH driven). Find a better entrypoint within the first 0x40000 bytes. """ max_scan = min(len(raw), 0x40000) if max_scan < 0x40: return 0 seed = set([0]) for i in range(max_scan - 8): if raw[i:i+3] == b"\x55\x8B\xEC": seed.add(i) if raw[i:i+2] == b"\x8B\xFF": seed.add(i) if raw[i] == 0xFC and i + 6 < max_scan and raw[i+1] == 0xE8: seed.add(i) if raw[i] == 0xE8 and raw[i+1:i+5] == b"\x00\x00\x00\x00" and 0x58 <= raw[i+5] <= 0x5F: seed.add(i) for i in range(0, max_scan, 0x10): seed.add(i) best = (float("-inf"), 0) for off in seed: sc = _score_stage1_entry(raw[:max_scan], off) if sc is None: continue # Prefer non-zero offsets if scores tie. 
if sc > best[0] or (sc == best[0] and best[1] == 0 and off != 0): best = (sc, off) return best[1] def stage1_skip_insn(mu, address: int, insn) -> bool: if not stage1_ran["done"] or not in_stage1(address) or insn is None: return False if stage1_skip["n"] >= STAGE1_SKIP_MAX: return False mnem = (insn.mnemonic or "").lower() op_str = insn.op_str or "" # Suspicious stack pivot used as anti-emulation: "mov esp, imm32" to a high address. if mnem == "mov": ops = op_str.replace(" ", "") if ops.startswith("esp,"): imm = parse_imm_from_op_str(ops.split(",", 1)[1]) if imm is not None and imm >= 0x80000000: # Don't skip the pivot outright: later instructions may expect the new stack # layout. Instead, proactively map a small stack window so subsequent pops # don't fault. try: old_esp = mu.reg_read(UC_X86_REG_ESP) win_start = align_down(imm - 0x2000, 0x1000) map_range(win_start, 0x8000) # Heuristic: copy a small window of the current stack into the new one # so pop/popal sequences don't immediately go off the rails. try: shadow = bytes(mu.mem_read(old_esp, 0x400)) mu.mem_write(imm, shadow) seeded = " + seeded" except Exception: seeded = "" print( f"[!] Stage1 stack pivot detected at 0x{address:08X}: {insn.mnemonic} {insn.op_str} " f"(mapped 0x{win_start:08X}-0x{win_start+0x8000:08X}{seeded})" ) except Exception: pass return False # Segment register moves frequently raise GPF and rely on SEH for control-flow. if op_str_has_seg_reg(op_str): stage1_skip["n"] += 1 mu.reg_write(UC_X86_REG_EIP, address + insn.size) if stage1_skip["n"] <= 30 or (stage1_skip["n"] % 500 == 0): print(f"[!] Stage1 skip#{stage1_skip['n']} @0x{address:08X}: {insn.mnemonic} {insn.op_str}") return True # Privileged / exception-producing instructions in user-mode. # NOTE: Do NOT blanket-skip x87 FPU instructions ("f*"). Many real shellcodes use # FNSTENV/FSTENV tricks to obtain EIP, and skipping them breaks control-flow. 
if mnem in { "retf", "retfw", "iret", "iretd", "iretq", "int", "into", "ud2", "in", "insb", "insw", "insd", "out", "outsb", "outsw", "outsd", "cli", "sti", "hlt", # Rare in real decoders; often appears as junk. "bound", }: stage1_skip["n"] += 1 mu.reg_write(UC_X86_REG_EIP, address + insn.size) if stage1_skip["n"] <= 30 or (stage1_skip["n"] % 500 == 0): print(f"[!] Stage1 skip#{stage1_skip['n']} @0x{address:08X}: {insn.mnemonic} {insn.op_str}") return True return False def on_code(mu, address, size, user_data): insn_count["n"] += 1 if not stage2_active["on"] and insn_count["n"] >= MAX_INSN: print(f"[!] Reached instruction cap ({MAX_INSN}); stopping at 0x{address:08X}") mu.emu_stop() return # Intercept the sample's API resolver at 0x100014E0 and return stubs directly. # This avoids needing a fully-correct kernel32 image + export walking + string helpers. if address == image_base + 0x14E0: try: esp = mu.reg_read(UC_X86_REG_ESP) ret = struct.unpack(" 0x{stub_addr:08X} RET=0x{ret:08X}") mu.reg_write(UC_X86_REG_EAX, stub_addr) mu.reg_write(UC_X86_REG_EIP, ret) mu.reg_write(UC_X86_REG_ESP, esp + 12) return except Exception: # Fall through and let the real resolver run if our interception fails. pass # Precise fix for the early failure you saw: code trying to read the DOS header field # e_lfanew at offset 0x3C from a NULL base register. if address == image_base + 0x14F3: insn = decode_insn(address) if insn: if patch_low_base_for_disp(insn, 0x3C, image_base): print("[!] Patched NULL base reg -> ImageBase for disp 0x3C at 0x100014F3") elif patch_low_base_for_disp(insn, 0x3C, LOW_MIRROR_BASE): # type: ignore print("[!] Patched NULL base reg -> low-mirror for disp 0x3C at 0x100014F3") if stage2_active["on"]: if stage2_active.get("kind") == "stage1" and insn_count["n"] >= STAGE1_MAX_INSN: print(f"[!] 
Stage1 hit instruction cap ({STAGE1_MAX_INSN}); stopping at 0x{address:08X}") mu.emu_stop() return # Stage1 executes inside the decrypted buffer and often contains junk instructions # that rely on Windows SEH for control flow. Skip a small set pre-execution. insn = decode_insn(address) if stage1_skip_insn(mu, address, insn): return if stage2_active.get("kind") == "candidate": # Per-candidate instruction cap if insn_count["n"] >= STAGE2_MAX_INSN: print(f"[!] Stage2 hit per-candidate cap ({STAGE2_MAX_INSN}); stopping candidate") mu.emu_stop() return # If no writes for a long stretch, this is likely a bad entrypoint. if (insn_count["n"] - stage2_writes["last_insn"]) > 200_000: print("[!] Stage2 no-write streak exceeded 200k; stopping candidate") mu.emu_stop() return # Bail out quickly if execution runs into unmapped/zero-filled junk. # Only enforce "expected regions" for brute-force stage2 entrypoint candidates. if stage2_active.get("kind") == "candidate" and not in_expected_exec(address): print(f"[!] Stage2 jumped out of expected regions at 0x{address:08X}; stopping candidate") mu.emu_stop() return if insn and insn.mnemonic in ("iretd", "bound"): if stage1_skip_insn(mu, address, insn): return print(f"[!] 
Stage2 hit unlikely instruction '{insn.mnemonic}' at 0x{address:08X}; stopping candidate") mu.emu_stop() return if insn and stage2_trace["fh"]: try: regs = { "eax": mu.reg_read(UC_X86_REG_EAX), "ebx": mu.reg_read(UC_X86_REG_EBX), "ecx": mu.reg_read(UC_X86_REG_ECX), "edx": mu.reg_read(UC_X86_REG_EDX), "esi": mu.reg_read(UC_X86_REG_ESI), "edi": mu.reg_read(UC_X86_REG_EDI), "ebp": mu.reg_read(UC_X86_REG_EBP), "esp": mu.reg_read(UC_X86_REG_ESP), } stage2_trace["fh"].write( f"0x{address:08X} {insn.mnemonic} {insn.op_str} " f"EAX=0x{regs['eax']:08X} EBX=0x{regs['ebx']:08X} " f"ECX=0x{regs['ecx']:08X} EDX=0x{regs['edx']:08X} " f"ESI=0x{regs['esi']:08X} EDI=0x{regs['edi']:08X} " f"EBP=0x{regs['ebp']:08X} ESP=0x{regs['esp']:08X}\n" ) except Exception: pass try: b = bytes(mu.mem_read(address, 8)) if stage2_active.get("kind") == "candidate" and b == b"\x00" * 8: print(f"[!] Stage2 executing zero-filled bytes at 0x{address:08X}; stopping candidate") mu.emu_stop() return except Exception: pass # If we captured the stage1 entry pointer at the LogWrite breakpoint, enable stage2-style # monitoring when execution first transfers into that buffer. if stage1_entry["va"] is not None and not stage2_active["on"] and address == stage1_entry["va"]: stage2_active["on"] = True stage2_active["kind"] = "stage1" stage1_ran["done"] = True stage2_region["base"] = stage1_entry["va"] stage2_region["size"] = stage1_entry["len"] or 0 stage2_writes["last_insn"] = 0 stage2_invalid["n"] = 0 stage2_writes["n"] = 0 stage2_page_writes["counts"].clear() stage2_page_writes["best_page"] = None stage2_best_shellcode["addr"] = None stage2_best_shellcode["score"] = None insn_count["n"] = 0 # Stage1 commonly unpacks a PE to 0x00400000. Also, some decoder stubs use early # memory ops on EDI; if EDI is 0 (common in our stubbed environment), steer it # toward the expected PE base to avoid NULL-page dependent traps. 
try: map_range(0x00400000, 0x400000) # 4MB window is enough for this family except Exception: pass try: edi = mu.reg_read(UC_X86_REG_EDI) if edi == 0: mu.reg_write(UC_X86_REG_EDI, 0x00400000) print("[!] Stage1 fixup: set EDI=0x00400000") except Exception: pass # Stage1 buffers often contain junk bytes at offset 0. Redirect EIP to a better-looking # internal entrypoint, then proceed with normal execution/monitoring. try: base = int(stage1_entry["va"] or 0) blen = int(stage1_entry["len"] or 0) if base and blen >= 0x40: scan_len = min(blen, 0x40000) raw = bytes(mu.mem_read(base, scan_len)) pick_off = pick_stage1_entry_offset(raw) if pick_off: mu.reg_write(UC_X86_REG_EIP, base + pick_off) print(f"[+] Stage1 entry redirect: 0x{base:08X} -> 0x{base + pick_off:08X} (off=0x{pick_off:X})") except Exception as e: print(f"[!] Stage1 entry redirect failed: {e}") try: regs = { "eax": mu.reg_read(UC_X86_REG_EAX), "ebx": mu.reg_read(UC_X86_REG_EBX), "ecx": mu.reg_read(UC_X86_REG_ECX), "edx": mu.reg_read(UC_X86_REG_EDX), "esi": mu.reg_read(UC_X86_REG_ESI), "edi": mu.reg_read(UC_X86_REG_EDI), "ebp": mu.reg_read(UC_X86_REG_EBP), "esp": mu.reg_read(UC_X86_REG_ESP), "efl": mu.reg_read(UC_X86_REG_EFLAGS), } arg1 = struct.unpack(" best: best = (nz, ent) best_addr = b2 best_buf = buf if best_addr is not None and best is not None: keep = is_probably_end(best_buf) keep = max(keep, DUMP_MIN) best_buf = best_buf[:keep] with open(out_shellcode_path, "wb") as f: f.write(best_buf) print(f"[+] Best heap region 0x{best_addr:08X} nz={best[0]} ent={best[1]:.3f} -> wrote {len(best_buf)} bytes") dumped["done"] = True mu.emu_stop() return # Short-circuit the export resolver (0x100014E0) with a hash-based stub. if address == image_base + 0x14E0: try: esp = mu.reg_read(UC_X86_REG_ESP) ret = struct.unpack(" 0x{stub_addr:08X}") return except Exception: pass # Stage2 hash resolver: if EAX holds a hash and we can resolve it, redirect. 
if stage2_active["on"]: try: eax = mu.reg_read(UC_X86_REG_EAX) stub = resolve_hash_to_stub(eax) if stub: mu.reg_write(UC_X86_REG_EAX, stub) except Exception: pass # Cap large REP MOVSD copies to avoid walking off the image. if address == image_base + 0x4E50: try: esi = mu.reg_read(UC_X86_REG_ESI) ecx = mu.reg_read(UC_X86_REG_ECX) img_end = image_base + size_image if image_base <= esi < img_end: remaining = img_end - esi max_ecx = remaining // 4 if ecx > max_ecx: mu.reg_write(UC_X86_REG_ECX, max_ecx) print(f"[!] Capped ECX for rep movsd to {max_ecx}") except Exception: pass # Cap long hash loops over export names. if address == image_base + 0x1585: try: ebx = mu.reg_read(UC_X86_REG_EBX) if ebx > 0x1000: mu.reg_write(UC_X86_REG_EBX, 0x1000) print("[!] Capped EBX in hash loop to 0x1000") except Exception: pass # Avoid ECX-1 underflow in string loops. if address == image_base + 0x145B: try: ecx = mu.reg_read(UC_X86_REG_ECX) if ecx == 0: mu.reg_write(UC_X86_REG_ECX, 1) print("[!] Fixed ECX underflow at 0x1000145B") except Exception: pass # Skip suspicious arithmetic on invalid memory in the 0x1000E1xx range. if image_base + 0xE180 <= address <= image_base + 0xE300: try: esp = mu.reg_read(UC_X86_REG_ESP) ret = struct.unpack("= 0xC0000000 or base_val < 0x1000 or (base_val == 0 and abs(op.mem.disp) > 0x10000): mu.reg_write(UC_X86_REG_EIP, address + insn.size) print(f"[!] Skipped suspicious mem op at 0x{address:08X}") return except Exception: pass if address == image_base + 0xE1C0 and insn: mu.reg_write(UC_X86_REG_EIP, address + insn.size) print("[!] Skipped op at 0x1000E1C0") return # Skip security cookie / exception helpers that can raise in emulation. 
if address == image_base + 0x220E: try: esp = mu.reg_read(UC_X86_REG_ESP) ret = struct.unpack(" {len(data)} bytes") eax = stubs.open_file_bytes(data) elif name == "GetModuleFileNameA": argc = 3 lp_filename = struct.unpack(" EAX=0x{eax:08X} RET=0x{ret:08X}") mu.reg_write(UC_X86_REG_EAX, eax) esp += 4 + (argc * 4) mu.reg_write(UC_X86_REG_ESP, esp) mu.reg_write(UC_X86_REG_EIP, ret) return # Patch register base for the known faulting instruction. if address == image_base + 0x14F3: insn = decode_insn(address) if insn: for op in insn.operands: if op.type == X86_OP_MEM and op.mem.disp == 0x3C: base_reg = op.mem.base if base_reg: try: base_val = mu.reg_read(base_reg) if base_val == 0: mu.reg_write(base_reg, image_base) except Exception: pass # Stop when we hit the breakpoint instruction if address == break_va: if dumped["done"]: return eax = mu.reg_read(UC_X86_REG_EAX) print(f"[+] BREAK hit at VA 0x{address:08X}") print(f"[+] ImageBase: 0x{image_base:08X}") print(f"[+] EAX: 0x{eax:08X}") try: out_len = struct.unpack(" RVA 0x{rva:08X}") else: print("[!] EAX does not point inside the PE image (likely heap/VirtualAlloc).") # Try reading up to the stage1 length (bounded by DUMP_MAX) from EAX and trim with heuristics try: want = out_len if out_len else DUMP_MAX want = min(want, DUMP_MAX) blob = bytes(mu.mem_read(eax, want)) except Exception as e: print(f"[!] Failed to read memory at EAX: {e}") blob = b"" if blob: keep = is_probably_end(blob) keep = max(keep, DUMP_MIN) blob = blob[:keep] with open(out_shellcode_path, "wb") as f: f.write(blob) sh = sha256_file(out_shellcode_path) print(f"[+] Wrote {len(blob)} bytes to {out_shellcode_path} sha256={sh}") write_stage1_base_file(out_shellcode_path, eax, len(blob), sh) # Also dump the full stage1 region (commonly 0x200000) for offline stage2 extraction. # This includes bytes beyond the initial stage1 length. 
try: want_full = min(DUMP_MAX, STAGE1_FULL_DUMP_LEN) full_blob = bytes(mu.mem_read(eax, want_full)) with open(out_shellcode_full_path, "wb") as f: f.write(full_blob) sh_full = sha256_file(out_shellcode_full_path) print(f"[+] Wrote {len(full_blob)} bytes to {out_shellcode_full_path} sha256={sh_full}") write_stage1_base_file(out_shellcode_full_path, eax, len(full_blob), sh_full) except Exception as e: print(f"[!] Failed to dump full stage1 region: {e}") else: print("[!] No bytes dumped.") dumped["done"] = True # In --mode logwrite, the user explicitly wants to stop here. # In --mode full, continue so LogWrite can transfer control to the stage1 payload. if stop_at == "break" or mode == "logwrite": mu.emu_stop() return # Stop if the function returns to our fake return address if address == RET_ADDR: print("[*] Returned to fake RET; stopping.") returned["done"] = True mu.emu_stop() def on_mem_invalid(mu, access, address, size, value, user_data): # Attempt very light “API call” handling if the code jumps/calls into nowhere. # Many DLLs call kernel32!VirtualAlloc/etc via IAT -> address in kernel32 range. # In emulation, that won't exist. If you see it failing here, add a rule. eip = mu.reg_read(UC_X86_REG_EIP) try: regs = { "eax": mu.reg_read(UC_X86_REG_EAX), "ebx": mu.reg_read(UC_X86_REG_EBX), "ecx": mu.reg_read(UC_X86_REG_ECX), "edx": mu.reg_read(UC_X86_REG_EDX), "esi": mu.reg_read(UC_X86_REG_ESI), "edi": mu.reg_read(UC_X86_REG_EDI), "ebp": mu.reg_read(UC_X86_REG_EBP), "esp": mu.reg_read(UC_X86_REG_ESP), "efl": mu.reg_read(UC_X86_REG_EFLAGS), } print( f"[!] Invalid memory access at EIP=0x{eip:08X} addr=0x{address:08X} access={access} " f"EAX=0x{regs['eax']:08X} EBX=0x{regs['ebx']:08X} ECX=0x{regs['ecx']:08X} EDX=0x{regs['edx']:08X} " f"ESI=0x{regs['esi']:08X} EDI=0x{regs['edi']:08X} EBP=0x{regs['ebp']:08X} ESP=0x{regs['esp']:08X} EFLAGS=0x{regs['efl']:08X}" ) except Exception: print(f"[!] 
Invalid memory access at EIP=0x{eip:08X} addr=0x{address:08X} access={access}") insn = decode_insn(eip) if insn: print(f"[!] Disasm: {insn.mnemonic} {insn.op_str}") for op in insn.operands: if op.type == X86_OP_MEM: base_reg = op.mem.base if base_reg: try: base_val = mu.reg_read(base_reg) reg_name = insn.reg_name(base_reg) print(f"[!] Base reg {reg_name} = 0x{base_val:08X}") except Exception: pass # Stage1 often uses exception-driven control flow by touching bogus high addresses. # We do NOT want to map huge "upper zero pages" (they devolve into 0x00-filled loops), # and we also can't reliably change EIP from inside the invalid-memory callback on # this host (it can trip Unicorn into UC_ERR_MAP). # # So: record the fault and stop. The outer emulation loop will advance EIP and resume. if stage2_active.get("kind") == "stage1": if access in (UC_MEM_READ_UNMAPPED, UC_MEM_WRITE_UNMAPPED) and address >= 0xC0000000 and not (0xA0000000 <= address < 0xB0000000): stage1_fault["pending"] = True stage1_fault["eip"] = eip stage1_fault["next_eip"] = eip + insn.size stage1_fault["access"] = access stage1_fault["addr"] = address stage1_fault["disasm"] = f"{insn.mnemonic} {insn.op_str}".strip() return False if access == UC_MEM_READ_UNMAPPED and address == image_base + size_image: try: mu.mem_map(address, 0x1000) mu.mem_write(address, b"\x00" * 0x1000) print(f"[!] Mapped zero page at image end 0x{address:08X}") return True except Exception: pass if access == UC_MEM_READ_UNMAPPED and address >= 0xFFFF0000: page = align_down(address, 0x1000) try: mu.mem_map(page, 0x1000) mu.mem_write(page, b"\x00" * 0x1000) print(f"[!] Mapped high zero page at 0x{page:08X}") return True except Exception: pass if access == UC_MEM_READ_UNMAPPED and address >= 0xC0000000 and stage2_active.get("kind") != "stage1": page = align_down(address, 0x1000) try: mu.mem_map(page, 0x1000) mu.mem_write(page, b"\x00" * 0x1000) print(f"[!] 
Mapped upper zero page at 0x{page:08X}") return True except Exception: pass if access == UC_MEM_READ_UNMAPPED and address >= image_base + size_image and address < image_base + size_image + 0x10000: page = align_down(address, 0x1000) try: mu.mem_map(page, 0x1000) mu.mem_write(page, b"\x00" * 0x1000) print(f"[!] Mapped zero page at 0x{page:08X}") return True except Exception: pass if access == UC_MEM_READ_UNMAPPED and address < size_image: page = align_down(address, 0x1000) try: mu.mem_map(page, 0x1000) src = image_base + page length = min(0x1000, size_image - page) data = mu.mem_read(src, length) mu.mem_write(page, data) print(f"[!] Mapped RVA mirror page at 0x{page:08X}") return True except Exception: pass for op in insn.operands: if op.type == X86_OP_MEM and op.mem.base: try: base_val = mu.reg_read(op.mem.base) if base_val < size_image: mu.reg_write(op.mem.base, base_val + image_base) print(f"[!] Rebased mem base to 0x{base_val + image_base:08X}") return True except Exception: pass # If we couldn't decode the instruction, we can't safely skip. Fail closed. if stage2_active.get("kind") == "stage1": if access in (UC_MEM_READ_UNMAPPED, UC_MEM_WRITE_UNMAPPED) and address >= 0xC0000000 and not (0xA0000000 <= address < 0xB0000000): return False if access == UC_MEM_FETCH_UNMAPPED and address == RET_ADDR: print("[*] Returned to fake RET (unmapped fetch); stopping.") return False if stage2_active["on"]: insn = decode_insn(eip) if stage2_active.get("kind") == "candidate" and insn and insn.mnemonic in ("ret", "retf", "iret", "iretd"): print(f"[*] Stage2 hit return '{insn.mnemonic}' at 0x{eip:08X}; stopping candidate") return False if stage2_active["on"]: if access == UC_MEM_FETCH_UNMAPPED: if stage2_active.get("kind") == "candidate": stage2_invalid["n"] += 1 if stage2_invalid["n"] > 200: print("[!] Too many stage2 invalid memory events; stopping candidate") return False else: # Stage1 tends to use exception-driven control flow. 
If we start *executing* # out of mapped/expected regions, we usually drift into zero pages and spin # on 'add byte ptr [eax], al'. Stop early and let the post-run dump capture # whatever was unpacked to 0x00400000 so far. if not in_expected_exec(address): print(f"[!] Unmapped fetch in stage1 at 0x{address:08X}; stopping stage1 execution") return False # During stage2, auto-map any missing page to allow unpacking. page = align_down(address, 0x1000) try: mu.mem_map(page, 0x1000) return True except Exception: pass # If stack access faulted, map around ESP. try: esp = mu.reg_read(UC_X86_REG_ESP) if abs(address - esp) < 0x20000: page = align_down(address, 0x1000) mu.mem_map(page, 0x1000) return True except Exception: pass # Some stages use a high stack / bogus pointers in the 0xA0-0xAF range. if 0xA0000000 <= address < 0xB0000000: try: page = align_down(address, 0x1000) mu.mem_map(page, 0x1000) return True except Exception: pass elif access == UC_MEM_FETCH_UNMAPPED and address < size_image: rebased = image_base + address rebased_insn = decode_insn(rebased) if rebased_insn: print(f"[!] Disasm (rebased): {rebased_insn.mnemonic} {rebased_insn.op_str}") try: mu.reg_write(UC_X86_REG_EIP, rebased) print(f"[!] Rebased EIP to 0x{rebased:08X}") return True except Exception: pass elif access == UC_MEM_FETCH_UNMAPPED: # Last-resort: treat as an external call returning 0. try: esp = mu.reg_read(UC_X86_REG_ESP) ret = struct.unpack(" returning to 0x{ret:08X} with EAX=0") return True except Exception: pass return False # stop emu def on_mem_write(mu, access, address, size, value, user_data): # Track last heap write as candidate shellcode buffer. 
for base, sz in stubs.allocations.items(): if base <= address < base + sz: last_write["addr"] = address last_write["size"] = size break if stage2_active["on"]: stage2_writes["n"] += 1 stage2_writes["last_insn"] = insn_count["n"] # Track page write counts page = align_down(address, 0x1000) counts = stage2_page_writes["counts"] counts[page] = counts.get(page, 0) + 1 if stage2_page_writes["best_page"] is None or counts[page] > counts.get(stage2_page_writes["best_page"], 0): stage2_page_writes["best_page"] = page # Track possible PE writes in the 0x00400000 region. if 0x00400000 <= address < 0x01400000: # If we haven't confirmed a PE yet, check for MZ/PE at base. if stage2_pe_write["addr"] is None: try: base = 0x00400000 hdr = bytes(mu.mem_read(base, 0x200)) if hdr[:2] == b"MZ": e = struct.unpack_from(" 0 and address < 0x80000000: b0 = bytes(mu.mem_read(address - 1, 2)) if b0 == b"MZ": stage2["found"] = True stage2["addr"] = address - 1 except Exception: pass # If we wrote a larger chunk, scan for MZ/PE and record candidates. if size >= 0x200: try: buf = bytes(mu.mem_read(address, min(size, 0x2000))) idx = buf.find(b"MZ") if idx != -1: off = address + idx try: e_lfanew = struct.unpack_from(" 5000: print("[!] Too many stage1 exception-skips; stopping stage1") raise try: nxt = int(stage1_fault.get("next_eip") or (cur_eip + 1)) print( f"[!] 
Stage1 exc-skip#{stage1_exc_skips['n']} " f"@0x{cur_eip:08X} addr=0x{int(stage1_fault.get('addr') or 0):08X} " f":: {stage1_fault.get('disasm')}" ) mu.reg_write(UC_X86_REG_EIP, nxt) mu.emu_start(mu.reg_read(UC_X86_REG_EIP), 0, timeout=0, count=0) return except Exception: raise if stage1_ran["done"] and in_stage1(cur_eip) and ( "UC_ERR_INSN_INVALID" in str(e) or "UC_ERR_EXCEPTION" in str(e) ): budget = 2048 while budget > 0: budget -= 1 cur = mu.reg_read(UC_X86_REG_EIP) insn = decode_insn(cur) step = insn.size if insn else 1 mu.reg_write(UC_X86_REG_EIP, cur + step) if budget in (2047, 2000, 1500, 1000, 500) or (budget % 256 == 0): print(f"[!] Stage1 recovery: advanced EIP 0x{cur:08X} -> 0x{cur+step:08X} (step={step})") try: mu.emu_start(mu.reg_read(UC_X86_REG_EIP), 0, timeout=0, count=0) break except UcError as e2: # Keep skipping while errors remain inside stage1. try: cur2 = mu.reg_read(UC_X86_REG_EIP) except Exception: cur2 = 0 if not in_stage1(cur2) or ("UC_ERR_INSN_INVALID" not in str(e2) and "UC_ERR_EXCEPTION" not in str(e2)): raise else: print("[!] Stage1 recovery budget exhausted; stopping.") raise # If we recovered, don't fall through to generic handler. return cur_eip = mu.reg_read(UC_X86_REG_EIP) if cur_eip == image_base + 0x220E and "UC_ERR_EXCEPTION" in str(e): esp = mu.reg_read(UC_X86_REG_ESP) ret = struct.unpack(" best: best = score best_addr = b2 best_buf = buf if best_addr is not None: with open(out_stage2_path, "wb") as f: f.write(best_buf) try: _, ent = score_buffer(best_buf) except Exception: ent = 0.0 print(f"[+] Wrote candidate heap buffer to {out_stage2_path} at 0x{best_addr:08X} (entropy={ent:.3f})") if returned["done"] and not dumped["done"]: print("[!] LogWrite returned before breakpoint.") if not dumped["done"]: print("[!] Breakpoint was not reached. 
See notes below for improving stubs.") def find_pe_in_region(start, size, label): try: buf = bytes(mu.mem_read(start, size)) except Exception: return None idx = buf.find(b"MZ") if idx == -1: return None # look for PE sig near e_lfanew try: e_lfanew = struct.unpack_from("= max_len: break if insn_count == 0: return None density = total / max_len # Small bonuses for common prologue-ish instructions bonus = 0 if window[:2] in (b"\x55\x8B", b"\x60\x8B"): bonus += 2 if window[:1] == b"\xFC": # cld bonus += 1 score = density * 100 + insn_count + bonus - bad return score def find_shellcode_entry(buf): # Heuristic: scan for plausible shellcode entry points and rank them. max_scan = min(len(buf) - 6, 0x40000) if max_scan <= 0: return [(0, 0.0)] seed_candidates = set() for i in range(max_scan): # call $+5; pop reg if buf[i] == 0xE8 and buf[i+1:i+5] == b"\x00\x00\x00\x00" and 0x58 <= buf[i+5] <= 0x5F: seed_candidates.add(i) # pushad; mov ebp, esp if buf[i:i+3] == b"\x60\x8B\xEC": seed_candidates.add(i) # cld; call if buf[i] == 0xFC and i + 5 < max_scan and buf[i+1] == 0xE8: seed_candidates.add(i) # sub esp, imm8/imm32; call if buf[i] == 0x83 and buf[i+1] == 0xEC and i + 4 < max_scan: seed_candidates.add(i) # Add a light-weight scan every 16 bytes to find dense instruction regions. for i in range(0, max_scan, 16): sc = score_entry(buf, i) if sc is not None and sc > 40: seed_candidates.add(i) # Always consider offset 0 seed_candidates.add(0) ranked = [] for off in seed_candidates: sc = score_entry(buf, off) if sc is None: continue ranked.append((sc, off)) ranked.sort(reverse=True) return [(off, sc) for sc, off in ranked[:40]] # Stage2 emulation: execute decrypted shellcode and look for PE drop. 
if enc_payload and mode in ("stage2", "full") and stop_at != "stage2" and not stage1_ran["done"]: try: stage2_base = 0x40000000 try: out_ptr = struct.unpack(" prev: stage2_best_shellcode["score"] = score # type: ignore stage2_best_shellcode["addr"] = base print(f"[+] Stage2 best-page candidate at 0x{base:08X} score={score:.1f} ent={ent:.3f} insn={insn_count2}") except Exception: pass if stage2["found"]: break except Exception as e: stage2_active["on"] = False stage2_active["kind"] = "none" if stage2_trace["fh"]: stage2_trace["fh"].close() stage2_trace["fh"] = None print(f"[!] Stage2 emulation error at offset 0x{off:X}: {e}") except Exception as e: print(f"[!] Stage2 emulation error: {e}") # Scan likely regions for a PE header candidates = [] candidates.append(find_pe_in_region(0x00200000, min(DUMP_MAX, len(enc_payload)), "scratch")) candidates.append(find_pe_in_region(stage2_base, min(DUMP_MAX, len(enc_payload)), "stage2_base")) for base, sz in stubs.allocations.items(): candidates.append(find_pe_in_region(base, min(sz, DUMP_MAX), "heap")) candidates = [c for c in candidates if c] if stage2_candidates: try: # pick the first candidate that lies in a mapped region dump_addr = None for cand in stage2_candidates: if in_stage2_region(cand) or in_allocations(cand): dump_addr = cand break if dump_addr is None: raise RuntimeError("No mapped stage2 candidates") dump_len = 0x100000 blob = bytes(mu.mem_read(dump_addr, dump_len)) with open(out_stage2_path, "wb") as f: f.write(blob) print(f"[+] Wrote stage2 payload to {out_stage2_path} at 0x{dump_addr:08X} (candidate)") except Exception as e: print(f"[!] 
Failed to dump stage2 payload: {e}") elif candidates: pe_base, pe_sig, label = candidates[0] try: # dump 1MB from PE base dump_addr = pe_base dump_len = 0x100000 blob = bytes(mu.mem_read(dump_addr, dump_len)) with open(out_stage2_path, "wb") as f: f.write(blob) print(f"[+] Wrote stage2 payload to {out_stage2_path} at 0x{dump_addr:08X} ({label})") except Exception as e: print(f"[!] Failed to dump stage2 payload: {e}") elif stage2_pe_write["addr"]: try: dump_addr = stage2_pe_write["addr"] dump_len = stage2_pe_write["size"] or 0x100000 blob = bytes(mu.mem_read(dump_addr, dump_len)) with open(out_stage2_path, "wb") as f: f.write(blob) print(f"[+] Wrote stage2 payload to {out_stage2_path} at 0x{dump_addr:08X} (pe-region, size=0x{dump_len:X})") except Exception as e: print(f"[!] Failed to dump stage2 payload from PE region: {e}") elif stage2_best_shellcode["addr"] is not None: try: dump_addr = stage2_best_shellcode["addr"] dump_len = STAGE2_DUMP_SIZE blob = bytes(mu.mem_read(dump_addr, dump_len)) with open(out_stage2_path, "wb") as f: f.write(blob) print(f"[+] Wrote stage2 shellcode to {out_stage2_path} at 0x{dump_addr:08X} (size=0x{dump_len:X})") except Exception as e: print(f"[!] Failed to dump stage2 shellcode from best page: {e}") elif stage2["found"] and stage2["addr"]: try: dump_addr = stage2["addr"] dump_len = 0x100000 blob = bytes(mu.mem_read(dump_addr, dump_len)) with open(out_stage2_path, "wb") as f: f.write(blob) print(f"[+] Wrote stage2 payload to {out_stage2_path} at 0x{dump_addr:08X}") except Exception as e: print(f"[!] 
Failed to dump stage2 payload: {e}") if __name__ == "__main__": parser = argparse.ArgumentParser(description="Emulate log.dll LogWrite/decrypt payload.") parser.add_argument("dll", help="Path to log.dll") parser.add_argument("--payload", default="encrypted_shellcode.bin", help="Encrypted payload path") parser.add_argument("--input-dir", default="input", help="Directory to resolve bare input filenames from") parser.add_argument("--output-dir", default="output", help="Directory to write outputs to (created if missing)") parser.add_argument("--mode", choices=["decrypt", "logwrite", "stage2", "full"], default="full", help="Which stages to run") parser.add_argument("--stop-at", choices=["none", "init", "decrypt", "break", "stage2"], default="none", help="Early stop point") parser.add_argument("--stdout-log", default="emu_stdout.log", help="Write stdout to this file (set to 'none' to disable)") args = parser.parse_args() dll_path = resolve_input_path(args.dll, args.input_dir) payload_path = resolve_input_path(args.payload, args.input_dir) os.makedirs(args.output_dir, exist_ok=True) stdout_log_path = resolve_output_path(args.stdout_log, args.output_dir) if stdout_log_path and stdout_log_path.lower() != "none": try: log_fh = open(stdout_log_path, "w") sys.stdout = TeeStdout(sys.stdout, log_fh) except Exception as e: print(f"[!] Failed to open stdout log {stdout_log_path}: {e}") emulate_and_dump(dll_path, payload_path, args.mode, args.stop_at, args.output_dir)