{ "cells": [ { "cell_type": "markdown", "id": "26fc4cc3", "metadata": {}, "source": [ "# Chrysalis Unpacking Walkthrough (Notebook)\n", "\n", "This notebook demonstrates the **offline + emulation-assisted** steps we used to unpack the Lotus Blossom/Chrysalis sample chain described by Rapid7.\n", "\n", "It focuses on three practical artifacts:\n", "- `input/encrypted_shellcode.bin` (Rapid7 `BluetoothService`): extract + decrypt RC4 config (offline)\n", "- `input/log.dll`: emulate `LogWrite` to decrypt the stage1 buffer (Unicorn-based, still 100% Python)\n", "- Decrypt and materialize the **main module** into a PE-like memory image / patched PE (offline transform)\n", "\n", "## Requirements\n", "- Install dependencies into the same Python environment your Jupyter kernel uses: `unicorn`, `pefile`, `capstone`, `bokeh` (see `requirements.txt`).\n", "- Inputs must exist under `input/`:\n", " - `input/log.dll`\n", " - `input/encrypted_shellcode.bin`\n", " - `input/BluetoothService.exe`\n", "\n", "Outputs are written under `output/` (created if missing).\n", "\n", "## Optional plots\n", "This notebook tries to use:\n", "- `bokeh` if available\n", "- else it will fall back to text summaries.\n", "\n", "If you want interactive plots, install `bokeh` into the same Python environment your Jupyter kernel is using.\n" ] }, { "cell_type": "code", "execution_count": null, "id": "26bc66d2", "metadata": { "execution": { "iopub.execute_input": "2026-02-22T01:34:44.100523Z", "iopub.status.busy": "2026-02-22T01:34:44.100376Z", "iopub.status.idle": "2026-02-22T01:34:44.109489Z", "shell.execute_reply": "2026-02-22T01:34:44.109199Z" } }, "outputs": [], "source": [ "from __future__ import annotations\n", "\n", "import hashlib\n", "import struct\n", "import sys\n", "from pathlib import Path\n", "\n", "# Ensure repo root is on sys.path even if Jupyter started in notebooks/.\n", "_cwd = Path.cwd().resolve()\n", "for _d in [_cwd, *_cwd.parents]:\n", " if (_d / 'scripts' / 'chrysalis_notebook_lib.py').exists():\n", " sys.path.insert(0, str(_d / 'scripts'))\n", " break\n", "\n", "from chrysalis_notebook_lib import (\n", " ArgStruct,\n", " ConfigExtractor,\n", " MainModuleMaterializer,\n", " Stage1Decryptor,\n", " find_repo_root,\n", " sha256_bytes,\n", " shannon_entropy,\n", ")\n", "\n", "ROOT = find_repo_root(Path.cwd())\n", "INPUT_DIR = ROOT / 'input'\n", "OUTPUT_DIR = ROOT / 'output'\n", "OUTPUT_DIR.mkdir(parents=True, exist_ok=True)\n", "\n", "LOG_DLL = INPUT_DIR / 'log.dll'\n", "ENC_SHELLCODE = INPUT_DIR / 'encrypted_shellcode.bin'\n", "CONTAINER_EXE = INPUT_DIR / 'BluetoothService.exe'\n", "\n", "for _p in [LOG_DLL, ENC_SHELLCODE, CONTAINER_EXE]:\n", " assert _p.exists(), f'Missing input file: {_p}'\n", "\n", "def sha256_path(p: Path) -> str:\n", " return hashlib.sha256(p.read_bytes()).hexdigest()\n", "\n", "print('ROOT:', ROOT)\n", "print('log.dll sha256:', sha256_path(LOG_DLL))\n", "print('encrypted_shellcode.bin sha256:', sha256_path(ENC_SHELLCODE))\n", "print('BluetoothService.exe sha256:', sha256_path(CONTAINER_EXE))\n" ] }, { "cell_type": "code", "execution_count": null, "id": "07aa7abe", "metadata": { "execution": { "iopub.execute_input": "2026-02-22T01:34:44.110975Z", "iopub.status.busy": "2026-02-22T01:34:44.110850Z", "iopub.status.idle": "2026-02-22T01:34:44.116523Z", "shell.execute_reply": "2026-02-22T01:34:44.116076Z" } }, "outputs": [], "source": [ "def sha256_bytes(b: bytes) -> str:\n", " return hashlib.sha256(b).hexdigest()\n", "\n", "def sha256_path(p: Path) -> str:\n", " return sha256_bytes(p.read_bytes())\n", "\n", "def shannon_entropy(b: bytes) -> float:\n", " if not b:\n", " return 0.0\n", " counts = [0] * 256\n", " for x in b:\n", " counts[x] += 1\n", " import math\n", " n = len(b)\n", " ent = 0.0\n", " for c in counts:\n", " if c == 0:\n", " continue\n", " p = c / n\n", " ent -= p * math.log2(p)\n", " return ent\n", "\n", "def hexdump(b: bytes, start: int = 0, length: int = 0x100) -> str:\n", " chunk = b[start:start+length]\n", " out = []\n", " for i in range(0, len(chunk), 16):\n", " row = chunk[i:i+16]\n", " hs = \" \".join(f\"{x:02x}\" for x in row)\n", " asc = \"\".join(chr(x) if 32 <= x < 127 else \".\" for x in row)\n", " out.append(f\"{start+i:08x} {hs:<47} {asc}\")\n", " return \"\\n\".join(out)\n", "\n", "print(\"log.dll sha256:\", sha256_path(LOG_DLL))\n", "print(\"encrypted_shellcode.bin sha256:\", sha256_path(ENC_SHELLCODE))\n", "print(\"BluetoothService.exe sha256:\", sha256_path(CONTAINER_EXE))" ] }, { "cell_type": "markdown", "id": "0062538a", "metadata": {}, "source": [ "## 1) Config Extraction + RC4 Decryption (Offline)\n", "\n", "Rapid7 notes the encrypted configuration is stored in `BluetoothService` (our `input/encrypted_shellcode.bin`) at:\n", "- offset `0x30808`\n", "- size `0x980`\n", "- RC4 key `qwhvb^435h&*7`\n", "\n", "We'll implement RC4 in Python and show recovered plaintext strings." ] }, { "cell_type": "code", "execution_count": null, "id": "751fbe2b", "metadata": { "execution": { "iopub.execute_input": "2026-02-22T01:34:44.117786Z", "iopub.status.busy": "2026-02-22T01:34:44.117715Z", "iopub.status.idle": "2026-02-22T01:34:44.121391Z", "shell.execute_reply": "2026-02-22T01:34:44.121069Z" } }, "outputs": [], "source": [ "def rc4_crypt(data: bytes, key: bytes) -> bytes:\n", " # Classic RC4 KSA+PRGA\n", " S = list(range(256))\n", " j = 0\n", " for i in range(256):\n", " j = (j + S[i] + key[i % len(key)]) & 0xFF\n", " S[i], S[j] = S[j], S[i]\n", " i = 0\n", " j = 0\n", " out = bytearray(len(data))\n", " for n, x in enumerate(data):\n", " i = (i + 1) & 0xFF\n", " j = (j + S[i]) & 0xFF\n", " S[i], S[j] = S[j], S[i]\n", " K = S[(S[i] + S[j]) & 0xFF]\n", " out[n] = x ^ K\n", " return bytes(out)\n", "\n", "cfg_off = 0x30808\n", "cfg_len = 0x980\n", "cfg_key = b\"qwhvb^435h&*7\"\n", "\n", "enc = ENC_SHELLCODE.read_bytes()\n", "assert cfg_off + cfg_len <= len(enc)\n", "cfg_enc = enc[cfg_off:cfg_off+cfg_len]\n", "cfg_plain = rc4_crypt(cfg_enc, cfg_key)\n", "\n", "cfg_out = OUTPUT_DIR / \"notebook_config_decrypted.bin\"\n", "cfg_out.write_bytes(cfg_plain)\n", "\n", "print(\"Encrypted cfg sha256:\", sha256_bytes(cfg_enc), \"entropy\", f\"{shannon_entropy(cfg_enc):.3f}\")\n", "print(\"Decrypted cfg sha256:\", sha256_bytes(cfg_plain), \"entropy\", f\"{shannon_entropy(cfg_plain):.3f}\")\n", "#print(\"Wrote:\", cfg_out)\n", "print()\n", "print(\"First 0x120 bytes of decrypted config:\")\n", "print(hexdump(cfg_plain, 0, 0x120))" ] }, { "cell_type": "code", "execution_count": null, "id": "f3f335cf", "metadata": { "execution": { "iopub.execute_input": "2026-02-22T01:34:44.122351Z", "iopub.status.busy": "2026-02-22T01:34:44.122290Z", "iopub.status.idle": "2026-02-22T01:34:44.124758Z", "shell.execute_reply": "2026-02-22T01:34:44.124376Z" } }, "outputs": [], "source": [ "def extract_printable_ascii(b: bytes, min_len: int = 6) -> list[str]:\n", " out = []\n", " cur = bytearray()\n", " for x in b:\n", " if 32 <= x < 127:\n", " cur.append(x)\n", " else:\n", " if len(cur) >= min_len:\n", " out.append(cur.decode(\"ascii\", errors=\"ignore\"))\n", " cur.clear()\n", " if len(cur) >= min_len:\n", " out.append(cur.decode(\"ascii\", errors=\"ignore\"))\n", " return out\n", "\n", "print(\"ASCII strings in decrypted config (min_len=6):\")\n", "for s in extract_printable_ascii(cfg_plain, 6):\n", " print(\"-\", s)" ] }, { "cell_type": "markdown", "id": "046e2cb2", "metadata": {}, "source": [ "## 2) Stage1 Decryption via log.dll (Emulation-Assisted)\n", "\n", "The stage1 buffer is decrypted by `log.dll` at runtime.\n", "\n", "Instead of re-implementing the full LCG-based decrypt routine from scratch, we run the **actual malware decryption function** inside a controlled x86 emulator (Unicorn) and dump the decrypted bytes.\n", "\n", "This is still *pure Python* end-to-end, and it's reproducible on macOS/ARM." ] }, { "cell_type": "code", "execution_count": null, "id": "b298aeac", "metadata": { "execution": { "iopub.execute_input": "2026-02-22T01:34:44.125667Z", "iopub.status.busy": "2026-02-22T01:34:44.125611Z", "iopub.status.idle": "2026-02-22T01:34:46.651900Z", "shell.execute_reply": "2026-02-22T01:34:46.651507Z" } }, "outputs": [], "source": [ "# Run the Unicorn-based log.dll emulator directly (in-kernel).\n", "stage1_runner = Stage1Decryptor()\n", "stage1_runner.run(\n", " log_dll_path=LOG_DLL,\n", " encrypted_payload_path=ENC_SHELLCODE,\n", " output_dir=OUTPUT_DIR,\n", " mode='logwrite',\n", " stop_at='none',\n", ")\n", "\n", "stage1 = OUTPUT_DIR / 'shellcode.bin'\n", "stage1_full = OUTPUT_DIR / 'shellcode_full.bin'\n", "assert stage1.exists(), f'Missing {stage1}'\n", "assert stage1_full.exists(), f'Missing {stage1_full}'\n", "\n", "b1 = stage1.read_bytes()\n", "bfull = stage1_full.read_bytes()\n", "print('stage1 len:', len(b1), 'sha256:', sha256_bytes(b1), 'entropy:', f'{shannon_entropy(b1):.3f}')\n", "print('stage1_full len:', len(bfull), 'sha256:', sha256_bytes(bfull), 'entropy:', f'{shannon_entropy(bfull):.3f}')\n" ] }, { "cell_type": "markdown", "id": "a0d3f233", "metadata": {}, "source": [ "## 3) Main Module Materialization (Offline Transform)\n", "\n", "Rapid7 describes the main module decryption as a simple byte transform using key `gQ2JR&9;`.\n", "\n", "The repo script `offline_extract_stage2.py` does two useful things:\n", "- injects the stage1-provided regions into a known PE container (`BluetoothService.exe`) at 5 RVAs\n", "- applies the decrypt transform to those regions\n", "\n", "We generate two artifacts:\n", "- `output/main_module_patched.exe` (easy to open as PE, signature invalid after modification)\n", "- `output/main_module_mem.bin` (decrypted memory image; often the best artifact for RE)\n" ] }, { "cell_type": "markdown", "id": "3b69b1db", "metadata": {}, "source": [ "### 3a) The Byte Transform (Rapid7 key `gQ2JR&9;`)\n", "\n", "Rapid7's pseudocode per byte is:\n", "\n", "```\n", "x = x + k\n", "x = x ^ k\n", "x = x - k\n", "```\n", "\n", "A useful property (for this sample) is that this transform is its own inverse.\n", "So applying it twice yields the original byte stream again.\n", "\n", "(In our scripts you may see “rounds=5”; for this transform, odd rounds behave like 1 round, even rounds undo it.)\n" ] }, { "cell_type": "code", "execution_count": null, "id": "385b9649", "metadata": { "execution": { "iopub.execute_input": "2026-02-22T01:34:46.653190Z", "iopub.status.busy": "2026-02-22T01:34:46.653105Z", "iopub.status.idle": "2026-02-22T01:34:46.656318Z", "shell.execute_reply": "2026-02-22T01:34:46.656035Z" } }, "outputs": [], "source": [ "KEY = b\"gQ2JR&9;\"\n", "\n", "def bxform_byte(x: int, k: int) -> int:\n", " x = (x + k) & 0xFF\n", " x = x ^ k\n", " x = (x - k) & 0xFF\n", " return x\n", "\n", "def bxform(data: bytes, key: bytes, rounds: int = 1) -> bytes:\n", " out = bytearray(data)\n", " for _ in range(rounds):\n", " for i in range(len(out)):\n", " k = key[i & 7]\n", " out[i] = bxform_byte(out[i], k)\n", " return bytes(out)\n", "\n", "# Quick sanity: involution property on a short prefix\n", "test = b\"hello world\" * 32\n", "t1 = bxform(test, KEY, rounds=1)\n", "t2 = bxform(t1, KEY, rounds=1)\n", "assert t2 == test\n", "\n", "print(\"byte-transform sanity: ok (transform is an involution)\")\n", "print(\"rounds=1 == rounds=5 ?\", bxform(test, KEY, rounds=1) == bxform(test, KEY, rounds=5))\n", "print(\"rounds=1 == rounds=3 ?\", bxform(test, KEY, rounds=1) == bxform(test, KEY, rounds=3))\n", "print(\"rounds=2 restores original ?\", bxform(test, KEY, rounds=2) == test)" ] }, { "cell_type": "code", "execution_count": null, "id": "9e68268a", "metadata": { "execution": { "iopub.execute_input": "2026-02-22T01:34:46.657384Z", "iopub.status.busy": "2026-02-22T01:34:46.657323Z", "iopub.status.idle": "2026-02-22T01:34:46.680285Z", "shell.execute_reply": "2026-02-22T01:34:46.679839Z" } }, "outputs": [], "source": [ "# This arg-struct is sample-specific and comes from stage1 runtime data.\n", "# For the Rapid7 sample it is stable and matches the described layout (5 regions).\n", "ARG_STRUCT = [\n", " 0x116A7, 0x5, 0x1000, 0x24000, 0x2D000,\n", " 0x30000, 0x31000, 0x0, 0x0, 0x23000,\n", " 0x8E00, 0xC00, 0x200, 0x1C00, 0x0,\n", " 0x0, 0x400000, 0x0, 0x31000, 0x2C5D0,\n", " 0x30001000, 0x100014C0, 0x100014D0, 0x30000000, 0x0,\n", "]\n", "arg_struct = ArgStruct.from_iterable(ARG_STRUCT)\n", "\n", "patched_exe = OUTPUT_DIR / 'main_module_patched.exe'\n", "mem_img = OUTPUT_DIR / 'main_module_mem.bin'\n", "\n", "mat = MainModuleMaterializer(key=b'gQ2JR&9;', rounds=1)\n", "info = mat.materialize(\n", " container_pe_path=CONTAINER_EXE,\n", " stage1_full_path=stage1_full,\n", " arg_struct=arg_struct,\n", " out_patched_pe_path=patched_exe,\n", " out_mem_image_path=mem_img,\n", ")\n", "\n", "print('patched_exe:', patched_exe, 'sha256:', info['patched_pe_sha256'], 'size:', patched_exe.stat().st_size)\n", "print('mem_img: ', mem_img, 'sha256:', info['mem_image_sha256'], 'size:', mem_img.stat().st_size)\n", "print('regions:', [(hex(r), hex(s)) for (r,s) in info['regions']])\n" ] }, { "cell_type": "code", "execution_count": null, "id": "d18a8300", "metadata": { "execution": { "iopub.execute_input": "2026-02-22T01:34:46.681669Z", "iopub.status.busy": "2026-02-22T01:34:46.681585Z", "iopub.status.idle": "2026-02-22T01:34:46.684491Z", "shell.execute_reply": "2026-02-22T01:34:46.684124Z" } }, "outputs": [], "source": [ "def parse_pe_minimal(path: Path):\n", " b = path.read_bytes()\n", " if b[:2] != b\"MZ\":\n", " raise ValueError(\"Not MZ\")\n", " pe = struct.unpack_from(\"