#!/usr/bin/env python3 import json import time import threading import hashlib import os import sys from datetime import datetime class ExploitLogger: def __init__(self, debug=False): self.debug = debug self.session_id = hashlib.md5(str(time.time()).encode()).hexdigest()[:8] self.log_file = f"exploit_log_{self.session_id}.json" self.logs = [] self.lock = threading.Lock() def _log(self, level, message, **kwargs): timestamp = datetime.now().isoformat() log_entry = { "timestamp": timestamp, "session_id": self.session_id, "level": level, "message": message, "data": kwargs } with self.lock: self.logs.append(log_entry) if self.debug or level in ["ERROR", "CRITICAL", "SUCCESS"]: print(f"[{timestamp}] [{level}] {message}") if kwargs: for key, value in kwargs.items(): print(f" {key}: {value}") def info(self, message, **kwargs): self._log("INFO", message, **kwargs) def warning(self, message, **kwargs): self._log("WARNING", message, **kwargs) def error(self, message, **kwargs): self._log("ERROR", message, **kwargs) def critical(self, message, **kwargs): self._log("CRITICAL", message, **kwargs) def success(self, message, **kwargs): self._log("SUCCESS", message, **kwargs) def debug(self, message, **kwargs): if self.debug: self._log("DEBUG", message, **kwargs) def save_logs(self): try: with open(self.log_file, 'w') as f: json.dump(self.logs, f, indent=2) return self.log_file except Exception as e: print(f"Failed to save logs: {e}") return None def get_logs(self, level=None): if level: return [log for log in self.logs if log["level"] == level] return self.logs.copy() class PerformanceMonitor: def __init__(self): self.timings = {} self.active_timers = {} self.lock = threading.Lock() def start_monitoring(self, operation): with self.lock: self.active_timers[operation] = time.time() def end_monitoring(self, operation): with self.lock: if operation in self.active_timers: start_time = self.active_timers.pop(operation) duration = time.time() - start_time if operation not in self.timings: self.timings[operation] = [] self.timings[operation].append(duration) return duration return None def get_average_time(self, operation): if operation in self.timings and self.timings[operation]: return sum(self.timings[operation]) / len(self.timings[operation]) return None def get_total_time(self, operation): if operation in self.timings: return sum(self.timings[operation]) return None def get_operation_count(self, operation): if operation in self.timings: return len(self.timings[operation]) return 0 def get_all_stats(self): stats = {} for operation, times in self.timings.items(): if times: stats[operation] = { "count": len(times), "total_time": sum(times), "average_time": sum(times) / len(times), "min_time": min(times), "max_time": max(times) } return stats class MetricsCollector: def __init__(self): self.metrics = { "connection_attempts": 0, "connection_successes": 0, "kernel_leaks": 0, "privilege_escalations": 0, "payload_injections": 0, "persistence_attempts": 0, "persistence_successes": 0, "msr_read_attempts": 0, "msr_read_successes": 0, "msr_write_attempts": 0, "msr_write_successes": 0, "errors": 0, "crashes": 0 } self.lock = threading.Lock() def increment(self, metric): with self.lock: if metric in self.metrics: self.metrics[metric] += 1 def set_metric(self, metric, value): with self.lock: self.metrics[metric] = value def get_metric(self, metric): with self.lock: return self.metrics.get(metric, 0) def get_all_metrics(self): with self.lock: return self.metrics.copy() def calculate_success_rates(self): with self.lock: rates = {} if self.metrics["connection_attempts"] > 0: rates["connection_success_rate"] = ( self.metrics["connection_successes"] / self.metrics["connection_attempts"] * 100 ) if self.metrics["persistence_attempts"] > 0: rates["persistence_success_rate"] = ( self.metrics["persistence_successes"] / self.metrics["persistence_attempts"] * 100 ) if self.metrics["msr_read_attempts"] > 0: rates["msr_read_success_rate"] = ( self.metrics["msr_read_successes"] / self.metrics["msr_read_attempts"] * 100 ) if self.metrics["msr_write_attempts"] > 0: rates["msr_write_success_rate"] = ( self.metrics["msr_write_successes"] / self.metrics["msr_write_attempts"] * 100 ) total_operations = sum([ self.metrics["connection_attempts"], self.metrics["kernel_leaks"], self.metrics["privilege_escalations"], self.metrics["payload_injections"], self.metrics["persistence_attempts"] ]) if total_operations > 0: rates["overall_success_rate"] = ( (total_operations - self.metrics["errors"]) / total_operations * 100 ) return rates class SystemHealthMonitor: def __init__(self): self.health_checks = [] def check_memory_usage(self): try: import psutil memory = psutil.virtual_memory() return { "status": "healthy" if memory.percent < 90 else "warning", "usage_percent": memory.percent, "available_gb": memory.available / (1024**3) } except: return {"status": "unknown", "error": "psutil not available"} def check_cpu_usage(self): try: import psutil cpu_percent = psutil.cpu_percent(interval=1) return { "status": "healthy" if cpu_percent < 80 else "warning", "usage_percent": cpu_percent } except: return {"status": "unknown", "error": "psutil not available"} def check_disk_space(self): try: import psutil disk = psutil.disk_usage('/') free_percent = (disk.free / disk.total) * 100 return { "status": "healthy" if free_percent > 10 else "warning", "free_percent": free_percent, "free_gb": disk.free / (1024**3) } except: return {"status": "unknown", "error": "psutil not available"} def check_process_count(self): try: import psutil process_count = len(psutil.pids()) return { "status": "healthy" if process_count < 500 else "warning", "process_count": process_count } except: return {"status": "unknown", "error": "psutil not available"} def run_all_checks(self): checks = { "memory": self.check_memory_usage(), "cpu": self.check_cpu_usage(), "disk": self.check_disk_space(), "processes": self.check_process_count() } overall_status = "healthy" for check_name, check_result in checks.items(): if check_result.get("status") == "warning": overall_status = "warning" elif check_result.get("status") == "unknown": if overall_status == "healthy": overall_status = "unknown" checks["overall_status"] = overall_status return checks class ExploitMonitor: def __init__(self, debug=False): self.logger = ExploitLogger(debug) self.perf_monitor = PerformanceMonitor() self.metrics = MetricsCollector() self.health_monitor = SystemHealthMonitor() self.start_time = None self.end_time = None def start_monitoring(self): self.start_time = time.time() self.logger.info("Exploit monitoring started", session_id=self.logger.session_id) def stop_monitoring(self): self.end_time = time.time() self.logger.info("Exploit monitoring stopped") def get_session_duration(self): if self.start_time and self.end_time: return self.end_time - self.start_time elif self.start_time: return time.time() - self.start_time return None def run_health_checks(self): self.logger.info("Running system health checks") health_status = self.health_monitor.run_all_checks() if health_status["overall_status"] == "healthy": self.logger.success("System health checks passed") return True elif health_status["overall_status"] == "warning": self.logger.warning("System health checks show warnings", health=health_status) return True else: self.logger.error("System health checks failed", health=health_status) return False def generate_reliability_report(self): success_rates = self.metrics.calculate_success_rates() performance_stats = self.perf_monitor.get_all_stats() reliability_score = 0 total_weight = 0 weights = { "connection_success_rate": 0.2, "msr_read_success_rate": 0.2, "msr_write_success_rate": 0.2, "persistence_success_rate": 0.15, "overall_success_rate": 0.25 } for metric, weight in weights.items(): if metric in success_rates: reliability_score += success_rates[metric] * weight total_weight += weight if total_weight > 0: reliability_score = reliability_score / total_weight else: reliability_score = 0 return { "reliability_score": reliability_score, "success_rates": success_rates, "performance_stats": performance_stats, "session_duration": self.get_session_duration() } def generate_final_report(self): reliability_report = self.generate_reliability_report() report = { "session_info": { "session_id": self.logger.session_id, "start_time": datetime.fromtimestamp(self.start_time).isoformat() if self.start_time else None, "end_time": datetime.fromtimestamp(self.end_time).isoformat() if self.end_time else None, "duration_seconds": self.get_session_duration() }, "metrics": { "raw_metrics": self.metrics.get_all_metrics(), "success_rates": reliability_report["success_rates"], "reliability_score": reliability_report["reliability_score"] }, "performance": reliability_report["performance_stats"], "logs": { "total_logs": len(self.logger.logs), "errors": len(self.logger.get_logs("ERROR")), "warnings": len(self.logger.get_logs("WARNING")), "successes": len(self.logger.get_logs("SUCCESS")) }, "system_health": self.health_monitor.run_all_checks() } report_file = f"exploit_report_{self.logger.session_id}.json" try: with open(report_file, 'w') as f: json.dump(report, f, indent=2) except Exception as e: self.logger.error("Failed to save final report", error=str(e)) self.logger.save_logs() return report def log_exploit_attempt(self, operation, success, details=None): if success: self.logger.success(f"{operation} completed successfully", details=details) else: self.logger.error(f"{operation} failed", details=details) self.metrics.increment("errors") def log_security_event(self, event_type, description, severity="medium"): self.logger.info(f"Security event: {event_type}", description=description, severity=severity, event_type=event_type) def log_evasion_result(self, technique, detected, details=None): if detected: self.logger.warning(f"Evasion technique detected: {technique}", details=details) else: self.logger.success(f"Evasion technique successful: {technique}", details=details) def get_real_time_stats(self): return { "session_duration": self.get_session_duration(), "current_metrics": self.metrics.get_all_metrics(), "active_operations": list(self.perf_monitor.active_timers.keys()), "recent_logs": self.logger.logs[-10:] if len(self.logger.logs) > 10 else self.logger.logs } def export_logs_csv(self): import csv csv_file = f"exploit_logs_{self.logger.session_id}.csv" try: with open(csv_file, 'w', newline='') as f: if self.logger.logs: fieldnames = ["timestamp", "session_id", "level", "message"] if self.logger.logs[0].get("data"): data_keys = set() for log in self.logger.logs: if log.get("data"): data_keys.update(log["data"].keys()) fieldnames.extend(sorted(data_keys)) writer = csv.DictWriter(f, fieldnames=fieldnames) writer.writeheader() for log in self.logger.logs: row = { "timestamp": log["timestamp"], "session_id": log["session_id"], "level": log["level"], "message": log["message"] } if log.get("data"): row.update(log["data"]) writer.writerow(row) return csv_file except Exception as e: self.logger.error("Failed to export CSV", error=str(e)) return None