#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Security Growler - xbar Plugin for macOS Security Monitoring Monitors security events on macOS including SSH logins, sudo commands, port scans, network connections, and more using the unified logging system. Security Growler v2.0.0 Nick Sweeting pirate Monitor security events on macOS: SSH, sudo, port scans, VNC, and network connections python3,desktop-notifier https://github.com/pirate/security-growler boolean(SHOW_NOTIFICATIONS=true): Show macOS notifications for alerts boolean(MONITOR_SSH=true): Monitor SSH login events boolean(MONITOR_SUDO=true): Monitor sudo command usage boolean(MONITOR_PORTSCAN=true): Monitor for incoming port scans boolean(MONITOR_VNC=true): Monitor VNC connections boolean(MONITOR_PORTS=true): Monitor network port connections string(MONITORED_PORTS="21,445,548,3306,3689,5432"): Comma-separated ports to monitor boolean(MONITOR_LISTENING=true): Monitor new listening ports (21-9999) boolean(MONITOR_DOTENV=true): Monitor new .env files in home directory boolean(MONITOR_DANGEROUS_COMMANDS=true): Monitor npx, uvx, op commands boolean(MONITOR_DNS=true): Monitor DNS resolver changes boolean(MONITOR_PUBLIC_IP=true): Monitor public IP address changes boolean(MONITOR_LOCAL_IP=true): Monitor local IP address changes boolean(MONITOR_MDM=true): Monitor Kandji/MDM events boolean(MONITOR_ARP_SPOOF=true): Monitor for ARP spoofing attacks """ import os import sys import json import asyncio import subprocess import hashlib from datetime import datetime, timedelta from pathlib import Path from typing import Optional, List, Dict, Tuple, Any # Try to import desktop-notifier for rich notifications try: from desktop_notifier import DesktopNotifier, Urgency, DEFAULT_SOUND DESKTOP_NOTIFIER_AVAILABLE = True except ImportError: DESKTOP_NOTIFIER_AVAILABLE = False # Configuration APP_NAME = "Security Growler" STATE_DIR = Path.home() / "Library" / "Application Support" / "SecurityGrowler" STATE_FILE = STATE_DIR / "state.json" LOG_FILE = Path.home() / "Library" / "Logs" / "SecurityGrowler.log" MAX_EVENTS = 50 MAX_LOG_LINES = 1000 # Monitor toggle management def get_monitor_overrides() -> Dict[str, bool]: """Load monitor toggle overrides from state file.""" STATE_DIR.mkdir(parents=True, exist_ok=True) overrides_file = STATE_DIR / "monitor_overrides.json" if overrides_file.exists(): try: with open(overrides_file, "r") as f: return json.load(f) except (json.JSONDecodeError, IOError): pass return {} def toggle_monitor(monitor_name: str) -> None: """Toggle a monitor on/off by storing override in state.""" STATE_DIR.mkdir(parents=True, exist_ok=True) overrides_file = STATE_DIR / "monitor_overrides.json" # Load current overrides overrides = get_monitor_overrides() # Get current effective state (env var default, then override) env_default = os.environ.get(monitor_name, "true").lower() == "true" current_state = overrides.get(monitor_name, env_default) # Toggle it overrides[monitor_name] = not current_state # Save with open(overrides_file, "w") as f: json.dump(overrides, f, indent=2) def is_monitor_enabled(monitor_name: str, env_default: str = "true") -> bool: """Check if a monitor is enabled, considering both env vars and overrides.""" env_value = os.environ.get(monitor_name, env_default).lower() == "true" overrides = get_monitor_overrides() return overrides.get(monitor_name, env_value) # Environment variable configuration (set by xbar, can be overridden by user toggles) SHOW_NOTIFICATIONS = is_monitor_enabled("SHOW_NOTIFICATIONS") MONITOR_SSH = is_monitor_enabled("MONITOR_SSH") MONITOR_SUDO = is_monitor_enabled("MONITOR_SUDO") MONITOR_PORTSCAN = is_monitor_enabled("MONITOR_PORTSCAN") MONITOR_VNC = is_monitor_enabled("MONITOR_VNC") MONITOR_PORTS = is_monitor_enabled("MONITOR_PORTS") MONITORED_PORTS = os.environ.get("MONITORED_PORTS", "21,445,548,3306,3689,5432") MONITOR_LISTENING = is_monitor_enabled("MONITOR_LISTENING") MONITOR_DOTENV = is_monitor_enabled("MONITOR_DOTENV") MONITOR_DANGEROUS_COMMANDS = is_monitor_enabled("MONITOR_DANGEROUS_COMMANDS") MONITOR_DNS = is_monitor_enabled("MONITOR_DNS") MONITOR_PUBLIC_IP = is_monitor_enabled("MONITOR_PUBLIC_IP") MONITOR_LOCAL_IP = is_monitor_enabled("MONITOR_LOCAL_IP") MONITOR_MDM = is_monitor_enabled("MONITOR_MDM") MONITOR_ARP_SPOOF = is_monitor_enabled("MONITOR_ARP_SPOOF") # Listening port range to monitor LISTENING_PORT_MIN = 21 LISTENING_PORT_MAX = 9999 # Dangerous commands to monitor DANGEROUS_COMMANDS = ["npx", "uvx", "op"] # Parse monitored ports PORTS_TO_MONITOR = [int(p.strip()) for p in MONITORED_PORTS.split(",") if p.strip().isdigit()] # Port names for display PORT_NAMES = { 21: "FTP", 22: "SSH", 445: "SMB", 548: "AFP", 3306: "MySQL", 3689: "iTunes", 5432: "PostgreSQL", 5900: "VNC", } # Icons (base64 encoded for xbar) # Shield icon for menubar MENUBAR_ICON = "iVBORw0KGgoAAAANSUhEUgAAABAAAAAQCAYAAAAf8/9hAAAABHNCSVQICAgIfAhkiAAAAAlwSFlzAAAOxAAADsQBlSsOGwAAABl0RVh0U29mdHdhcmUAd3d3Lmlua3NjYXBlLm9yZ5vuPBoAAADfSURBVDiNpZMxDoJAEEXfLhYWxsLCeLDtiIW9J/AAnoBb2HgDGwsv4A1sPIEHcAtja2VhYRxZd8YGWBDFn0wymZ3J/2+yM4qqEhIppyBJEtfKsuy7JwBEpANcAlfAHdD7AZ4DR+BaVV/D2QaQAg/ADXCuqrvvgD5wAfSADtB2zm2IyE5VJwFQVaKqTIAB0AFOVLUTxVH0DtAE9oB9YNc51wwFqCrrpkMRGQI14EBE2kALeP4JiIhI3Tl3EcVxPI7j+BlYB46Bs+/JC4BIYV+qapymaT+Koughjp9efQHoZ2F1jT7xOQAAAABJRU5ErkJggg==" # Alert icon ALERT_ICON = "iVBORw0KGgoAAAANSUhEUgAAABAAAAAQCAYAAAAf8/9hAAAABHNCSVQICAgIfAhkiAAAAAlwSFlzAAAOxAAADsQBlSsOGwAAABl0RVh0U29mdHdhcmUAd3d3Lmlua3NjYXBlLm9yZ5vuPBoAAAC0SURBVDiNY2AYBVQDjPgk//37x8DEyPAfH5uhQEBAIPTfv38f/v37x4CPDReMjIz/GRgYnmNTy4IuANOIzwv4xBkZGRlBXkBRy8jI+B+bGLIYTBwGmHAJwgC6l5CdCwMMDAz/GXB4AesLDAz/YHJE+oCBgQEyVhkYGP6DDIV5gYEB4gNUQJQXkBWCACM+SRYYgYGB4QMxlhNygpGR8T+MjRcQDBWcYYuNjeU/ExNTKDZxfGIMDADcxjhf0SF9tQAAAABJRU5ErkJggg==" # ============================================================================= # State Management # ============================================================================= def load_state() -> Dict[str, Any]: """Load persisted state from disk.""" STATE_DIR.mkdir(parents=True, exist_ok=True) if STATE_FILE.exists(): try: with open(STATE_FILE, "r") as f: return json.load(f) except (json.JSONDecodeError, IOError): pass return { "last_check": None, "seen_events": [], "known_connections": {}, "events": [], } def save_state(state: Dict[str, Any]) -> None: """Save state to disk.""" STATE_DIR.mkdir(parents=True, exist_ok=True) # Trim events to prevent unbounded growth state["events"] = state["events"][-MAX_EVENTS:] state["seen_events"] = state["seen_events"][-500:] with open(STATE_FILE, "w") as f: json.dump(state, f, indent=2, default=str) def log_event(event_type: str, title: str, body: str) -> None: """Append event to log file.""" LOG_FILE.parent.mkdir(parents=True, exist_ok=True) timestamp = datetime.now().strftime("%m/%d %H:%M") prefix = "!!" if event_type == "alert" else ">>" line = f"[{timestamp}] {prefix} {title}: {body}\n" # Append to log, but keep it trimmed try: existing = LOG_FILE.read_text().splitlines() if LOG_FILE.exists() else [] existing.append(line.strip()) LOG_FILE.write_text("\n".join(existing[-MAX_LOG_LINES:]) + "\n") except IOError: pass # ============================================================================= # Auto-Update Checking # ============================================================================= def compute_file_sha256(filepath: str) -> Optional[str]: """Compute SHA256 hash of a file.""" try: sha256_hash = hashlib.sha256() with open(filepath, "rb") as f: # Read in chunks to handle large files for byte_block in iter(lambda: f.read(4096), b""): sha256_hash.update(byte_block) return sha256_hash.hexdigest() except (IOError, OSError): return None def fetch_remote_script_hash() -> Optional[str]: """Fetch the latest version of this script from GitHub and compute its hash.""" github_raw_url = "https://github.com/pirate/security-growler/raw/master/security-growler.30s.py" try: # Use curl to fetch the remote script result = subprocess.run( ["curl", "--max-time", "10", "--silent", "--location", github_raw_url], capture_output=True, timeout=15 ) if result.returncode != 0: return None # Compute hash of remote content remote_content = result.stdout if not remote_content: return None sha256_hash = hashlib.sha256() sha256_hash.update(remote_content) return sha256_hash.hexdigest() except (subprocess.TimeoutExpired, subprocess.SubprocessError): return None def check_for_updates(state: Dict[str, Any]) -> List[Tuple[str, str, str]]: """Check if a new version is available on GitHub.""" events = [] # Only check once per day last_update_check = state.get("last_update_check") if last_update_check: try: last_check_time = datetime.fromisoformat(last_update_check) if datetime.now() - last_check_time < timedelta(days=1): # Skip check, too soon return events except (ValueError, TypeError): pass # Get path to this script script_path = os.path.abspath(__file__) # Compute local hash local_hash = compute_file_sha256(script_path) if not local_hash: return events # Fetch and compute remote hash remote_hash = fetch_remote_script_hash() if not remote_hash: # Failed to fetch, but don't alert return events # Compare hashes if local_hash != remote_hash: title = "UPDATE AVAILABLE" body = "New version of Security Growler detected" download_url = "https://github.com/pirate/security-growler/releases" events.append(("alert", title, f"{body} - {download_url}")) # Update last check time state["last_update_check"] = datetime.now().isoformat() return events # ============================================================================= # Notifications # ============================================================================= # Global notifier instance (initialized lazily) _notifier: Optional["DesktopNotifier"] = None def _get_notifier() -> Optional["DesktopNotifier"]: """Get or create the desktop notifier instance.""" global _notifier if DESKTOP_NOTIFIER_AVAILABLE and _notifier is None: _notifier = DesktopNotifier( app_name=APP_NAME, notification_limit=10, ) return _notifier async def _send_notification_async(title: str, message: str, is_alert: bool = False) -> None: """Send notification using desktop-notifier (async).""" notifier = _get_notifier() if notifier is None: return try: await notifier.send( title=title, message=message, urgency=Urgency.Critical if is_alert else Urgency.Normal, sound=DEFAULT_SOUND if is_alert else None, ) except Exception: pass def _send_notification_osascript(title: str, message: str, is_alert: bool = False) -> None: """Fallback: Send a macOS notification using osascript.""" # Escape quotes for AppleScript title_escaped = title.replace('"', '\\"').replace("'", "\\'") message_escaped = message.replace('"', '\\"').replace("'", "\\'") sound = 'sound name "Sosumi"' if is_alert else "" script = f''' display notification "{message_escaped}" with title "{APP_NAME}" subtitle "{title_escaped}" {sound} ''' try: subprocess.run( ["osascript", "-e", script], capture_output=True, timeout=5 ) except (subprocess.TimeoutExpired, subprocess.SubprocessError): pass def send_notification(title: str, message: str, is_alert: bool = False) -> None: """Send a macOS notification using desktop-notifier or fallback to osascript.""" if not SHOW_NOTIFICATIONS: return if DESKTOP_NOTIFIER_AVAILABLE: try: asyncio.run(_send_notification_async(title, message, is_alert)) except Exception: # Fallback to osascript if async fails _send_notification_osascript(title, message, is_alert) else: _send_notification_osascript(title, message, is_alert) # ============================================================================= # Unified Log Reader # ============================================================================= def get_log_entries(predicate: str, since_minutes: int = 1) -> List[Dict[str, str]]: """ Query the macOS unified logging system using /usr/bin/log. Args: predicate: Log predicate filter string since_minutes: How many minutes back to query Returns: List of log entries as dictionaries """ # Calculate start time start_time = (datetime.now() - timedelta(minutes=since_minutes)).strftime("%Y-%m-%d %H:%M:%S") cmd = [ "/usr/bin/log", "show", "--predicate", predicate, "--start", start_time, "--style", "json", "--info", "--debug", ] try: result = subprocess.run( cmd, capture_output=True, text=True, timeout=30 ) if result.returncode != 0: return [] # Parse JSON output output = result.stdout.strip() if not output: return [] # The log command outputs JSON array try: entries = json.loads(output) return entries if isinstance(entries, list) else [] except json.JSONDecodeError: # Sometimes output is line-delimited JSON entries = [] for line in output.splitlines(): line = line.strip() if line.startswith("{"): try: entries.append(json.loads(line)) except json.JSONDecodeError: continue return entries except (subprocess.TimeoutExpired, subprocess.SubprocessError): return [] # ============================================================================= # SSH Parser # ============================================================================= def parse_ssh_events(state: Dict[str, Any]) -> List[Tuple[str, str, str]]: """Parse SSH events from unified log.""" if not MONITOR_SSH: return [] events = [] # Query for sshd events predicate = '(process == "sshd") AND (eventMessage CONTAINS "Accepted" OR eventMessage CONTAINS "Failed" OR eventMessage CONTAINS "error")' entries = get_log_entries(predicate) for entry in entries: event_id = entry.get("eventID", entry.get("traceID", str(entry))) if event_id in state["seen_events"]: continue message = entry.get("eventMessage", "") timestamp = entry.get("timestamp", "") if "Accepted publickey" in message or "Accepted keyboard" in message: # Successful login user = "" src = "" if " for " in message: user = message.split(" for ", 1)[-1].split(" ", 1)[0] if " from " in message: src = message.split(" from ", 1)[-1].split(" ", 1)[0] method = "Public Key" if "publickey" in message else "Password" title = f"SSH LOGIN: {user}" body = f"from {src} via {method}" events.append(("alert", title, body)) state["seen_events"].append(event_id) elif "Failed" in message or "error" in message.lower(): # Failed attempt or error user = "" src = "" if " for " in message: user = message.split(" for ", 1)[-1].split(" ", 1)[0] if " from " in message: src = message.split(" from ", 1)[-1].split(" ", 1)[0] elif " by " in message: src = message.split(" by ", 1)[-1].split(" ", 1)[0] summary = message[:50] + "..." if len(message) > 50 else message title = f"SSH EVENT: {user or 'unknown'}" body = f"from {src}: {summary}" events.append(("alert", title, body)) state["seen_events"].append(event_id) return events # ============================================================================= # Sudo Parser # ============================================================================= def parse_sudo_events(state: Dict[str, Any]) -> List[Tuple[str, str, str]]: """Parse sudo events from unified log.""" if not MONITOR_SUDO: return [] events = [] # Query for sudo events predicate = '(process == "sudo") AND (eventMessage CONTAINS "COMMAND")' entries = get_log_entries(predicate) # Exclusion patterns to prevent self-monitoring loops exclude_patterns = ["/usr/sbin/lsof", "/usr/bin/log show", "security-growler"] for entry in entries: event_id = entry.get("eventID", entry.get("traceID", str(entry))) if event_id in state["seen_events"]: continue message = entry.get("eventMessage", "") # Skip self-monitoring commands if any(pattern in message for pattern in exclude_patterns): continue # Parse sudo log format: # user : TTY=ttys001 ; PWD=/Users/user ; USER=root ; COMMAND=/usr/bin/whoami try: if " ; " in message: parts = message.split(" ; ") user_part = parts[0] if parts else "" user = user_part.split(":")[-1].strip().split()[-1] if user_part else "unknown" tty = "" pwd = "" command = "" for part in parts: if "TTY=" in part: tty = part.split("TTY=", 1)[-1].strip() elif "PWD=" in part: pwd = part.split("PWD=", 1)[-1].strip() elif "COMMAND=" in part: command = part.split("COMMAND=", 1)[-1].strip() if command: title = f"SUDO: {user}" # Truncate long commands cmd_display = command[:60] + "..." if len(command) > 60 else command body = f"{cmd_display}" events.append(("alert", title, body)) state["seen_events"].append(event_id) except (IndexError, ValueError): continue return events # ============================================================================= # Port Scan Parser # ============================================================================= def get_recent_connections() -> List[Dict[str, str]]: """Get recent network connections using lsof to identify port scan sources.""" cmd = "lsof -i -n -P 2>/dev/null | grep -v '^COMMAND' | grep -v 'LISTEN'" try: result = subprocess.run( cmd, shell=True, capture_output=True, text=True, timeout=10 ) connections = [] seen_sources = set() for line in result.stdout.strip().splitlines(): parts = line.split() if len(parts) >= 9: # Parse lsof output # PROCESS PID USER FD TYPE DEVICE SIZE/OFF NODE NAME name = parts[-1] if parts else "" # Look for connections with remote addresses if "->" in name: local, remote = name.split("->", 1) # Extract remote IP (before the port) if ":" in remote: remote_ip = remote.rsplit(":", 1)[0] # Avoid duplicates if remote_ip and remote_ip not in seen_sources: seen_sources.add(remote_ip) connections.append({ "process": parts[0], "pid": parts[1], "user": parts[2], "remote_ip": remote_ip, "full_connection": name, }) return connections except (subprocess.TimeoutExpired, subprocess.SubprocessError): return [] def parse_portscan_events(state: Dict[str, Any]) -> List[Tuple[str, str, str]]: """Parse port scan detection events from unified log.""" if not MONITOR_PORTSCAN: return [] events = [] # Query for kernel port scan detection messages predicate = '(process == "kernel") AND (eventMessage CONTAINS "Limiting closed port RST")' entries = get_log_entries(predicate) for entry in entries: event_id = entry.get("eventID", entry.get("traceID", str(entry))) if event_id in state["seen_events"]: continue message = entry.get("eventMessage", "") if "Limiting closed port RST response" in message: # Extract rate limit info rate_info = message.split("response ", 1)[-1] if "response " in message else message # Try to identify the source of the port scan recent_connections = get_recent_connections() title = "PORT SCAN DETECTED" if recent_connections: # Get unique source IPs source_ips = list(set(conn["remote_ip"] for conn in recent_connections[:5])) if len(source_ips) == 1: body = f"from {source_ips[0]} - Limiting {rate_info}" elif len(source_ips) <= 3: body = f"from {', '.join(source_ips)} - Limiting {rate_info}" else: body = f"from {source_ips[0]} (+{len(source_ips)-1} more) - Limiting {rate_info}" else: body = f"Limiting {rate_info} (source unknown)" events.append(("alert", title, body)) state["seen_events"].append(event_id) return events # ============================================================================= # FTP Parser # ============================================================================= def parse_ftp_events(state: Dict[str, Any]) -> List[Tuple[str, str, str]]: """Parse FTP events from unified log.""" events = [] # Query for ftpd events predicate = '(process == "ftpd")' entries = get_log_entries(predicate) for entry in entries: event_id = entry.get("eventID", entry.get("traceID", str(entry))) if event_id in state["seen_events"]: continue message = entry.get("eventMessage", "") if message: title = "FTP Access" body = message[:80] + "..." if len(message) > 80 else message events.append(("notify", title, body)) state["seen_events"].append(event_id) return events # ============================================================================= # Network Connection Monitor # ============================================================================= def get_port_connections(port: int) -> List[Dict[str, str]]: """Get current connections on a specific port using lsof.""" cmd = f"lsof +c 0 -i:{port} 2>/dev/null | grep -v '^COMMAND' | grep -v '^launchd '" try: result = subprocess.run( cmd, shell=True, capture_output=True, text=True, timeout=10 ) connections = [] for line in result.stdout.strip().splitlines(): parts = line.split() if len(parts) >= 9: # Parse lsof output # PROCESS PID USER FD TYPE DEVICE SIZE/OFF NODE NAME conn = { "process": parts[0], "pid": parts[1], "user": parts[2], "name": parts[-1] if len(parts) > 8 else "", "port": port, } # Parse connection details from NAME field name = parts[-1] if parts else "" if "->" in name: local, remote = name.split("->", 1) conn["local"] = local conn["remote"] = remote else: conn["local"] = name conn["remote"] = "" connections.append(conn) return connections except (subprocess.TimeoutExpired, subprocess.SubprocessError): return [] def parse_port_events(state: Dict[str, Any]) -> List[Tuple[str, str, str]]: """Monitor network connections on configured ports.""" if not MONITOR_PORTS: return [] events = [] known = state.get("known_connections", {}) for port in PORTS_TO_MONITOR: port_key = str(port) connections = get_port_connections(port) # Create connection identifiers current_conns = set() for conn in connections: conn_id = f"{conn['process']}:{conn['pid']}:{conn.get('remote', '')}" current_conns.add(conn_id) # Check if this is a new connection if port_key not in known: known[port_key] = [] if conn_id not in known[port_key]: known[port_key].append(conn_id) port_name = PORT_NAMES.get(port, str(port)) title = f"PORT {port} ({port_name})" # Determine connection direction # If we have local->remote in lsof output, it's an outgoing connection # If we just have a remote without ->, it means remote connected to us (incoming) if conn.get("remote"): if conn.get("local") and "->" in conn.get("name", ""): # Outgoing: we initiated the connection body = f"{conn['process']}@localhost -> {conn['remote']}" else: # Incoming: remote connected to us body = f"{conn['remote']} -> {conn['process']}@localhost:{port}" else: # No remote info, just show the process body = f"{conn['user']} {conn['process']} (PID {conn['pid']})" events.append(("notify", title, body)) # Clean up old connections if port_key in known: known[port_key] = [c for c in known[port_key] if c in current_conns] state["known_connections"] = known return events def parse_vnc_events(state: Dict[str, Any]) -> List[Tuple[str, str, str]]: """Monitor VNC connections specifically (elevated to alert level).""" if not MONITOR_VNC: return [] events = [] known = state.get("known_connections", {}) port = 5900 port_key = "vnc_5900" connections = get_port_connections(port) current_conns = set() for conn in connections: conn_id = f"{conn['process']}:{conn['pid']}:{conn.get('remote', '')}" current_conns.add(conn_id) if port_key not in known: known[port_key] = [] if conn_id not in known[port_key]: known[port_key].append(conn_id) title = "VNC CONNECTION" # Determine connection direction # If we have local->remote in lsof output, it's an outgoing connection # If we just have a remote without ->, it means remote connected to us (incoming) if conn.get("remote"): if conn.get("local") and "->" in conn.get("name", ""): # Outgoing: we initiated the VNC connection body = f"{conn['process']}@localhost -> {conn['remote']}" else: # Incoming: remote connected to our VNC server body = f"{conn['remote']} -> {conn['process']}@localhost:{port}" else: # No remote info, just show the process body = f"{conn['user']} {conn['process']} (PID {conn['pid']})" # VNC connections are alerts, not just notifications events.append(("alert", title, body)) # Clean up old connections if port_key in known: known[port_key] = [c for c in known[port_key] if c in current_conns] state["known_connections"] = known return events # ============================================================================= # New Listening Ports Monitor # ============================================================================= def get_listening_ports() -> Dict[int, Dict[str, str]]: """Get all listening ports and their processes using lsof.""" cmd = "lsof -i -P -n 2>/dev/null | grep LISTEN" try: result = subprocess.run( cmd, shell=True, capture_output=True, text=True, timeout=15 ) listening = {} for line in result.stdout.strip().splitlines(): parts = line.split() if len(parts) >= 9: process = parts[0] pid = parts[1] user = parts[2] name = parts[-2] if parts[-1] == "(LISTEN)" else parts[-1] # Extract port from name (e.g., "*:8080" or "127.0.0.1:3000") if ":" in name: port_str = name.rsplit(":", 1)[-1] try: port = int(port_str) if LISTENING_PORT_MIN < port < LISTENING_PORT_MAX: listening[port] = { "process": process, "pid": pid, "user": user, "address": name, } except ValueError: continue return listening except (subprocess.TimeoutExpired, subprocess.SubprocessError): return {} def parse_listening_port_events(state: Dict[str, Any]) -> List[Tuple[str, str, str]]: """Monitor for new listening ports.""" if not MONITOR_LISTENING: return [] events = [] known_listening = set(state.get("known_listening_ports", [])) current_listening = get_listening_ports() for port, info in current_listening.items(): if port not in known_listening: title = f"NEW LISTENING PORT: {port}" body = f"{info['user']} {info['process']} (PID {info['pid']}) on {info['address']}" events.append(("alert", title, body)) # Update known listening ports state["known_listening_ports"] = list(current_listening.keys()) return events # ============================================================================= # .env File Monitor (using find) # ============================================================================= def find_recent_dotenv_files() -> List[str]: """Find .env files modified in the last 2 minutes using find, excluding ~/Library.""" home = str(Path.home()) # Use find to locate .env files modified in last 2 minutes # -mmin -2 means modified within last 2 minutes # -name matches both .env and *.env files # -not -path excludes ~/Library # -type f ensures we only get files cmd = f'''find "{home}" -maxdepth 6 -type f \\( -name ".env" -o -name "*.env" \\) -mmin -2 -not -path "*/Library/*" -not -path "*/.git/*" -not -path "*/node_modules/*" 2>/dev/null''' try: result = subprocess.run( cmd, shell=True, capture_output=True, text=True, timeout=15 ) files = [] for line in result.stdout.strip().splitlines(): path = line.strip() if path: files.append(path) return files except (subprocess.TimeoutExpired, subprocess.SubprocessError): return [] def parse_dotenv_events(state: Dict[str, Any]) -> List[Tuple[str, str, str]]: """Monitor for new .env files in home directory.""" if not MONITOR_DOTENV: return [] events = [] known_dotenv = set(state.get("known_dotenv_files", [])) current_dotenv = find_recent_dotenv_files() for filepath in current_dotenv: if filepath not in known_dotenv: # Get relative path from home try: rel_path = str(Path(filepath).relative_to(Path.home())) except ValueError: rel_path = filepath title = "NEW .ENV FILE" body = f"~/{rel_path}" events.append(("alert", title, body)) known_dotenv.add(filepath) state["known_dotenv_files"] = list(known_dotenv) return events # ============================================================================= # Dangerous Commands Monitor (npx, uvx, op) - uses shell history + ps polling # ============================================================================= def get_shell_history_commands() -> List[Dict[str, str]]: """ Read recent commands from shell history files. Works with zsh, bash, and fish shells. """ home = Path.home() commands = [] # Shell history file locations and their formats history_files = [ (home / ".zsh_history", "zsh"), (home / ".bash_history", "bash"), (home / ".local/share/fish/fish_history", "fish"), ] for hist_file, shell_type in history_files: if not hist_file.exists(): continue try: # Read last 50 lines of history (recent commands) with open(hist_file, "rb") as f: # Seek to end and read backwards to get recent lines try: f.seek(0, 2) # End of file size = f.tell() # Read last 8KB or whole file if smaller read_size = min(8192, size) f.seek(max(0, size - read_size)) content = f.read().decode("utf-8", errors="ignore") except Exception: continue lines = content.splitlines()[-50:] # Last 50 commands for line in lines: # Parse based on shell type cmd = "" if shell_type == "zsh": # zsh format: ": timestamp:0;command" or just "command" if line.startswith(":"): parts = line.split(";", 1) if len(parts) > 1: cmd = parts[1] else: cmd = line elif shell_type == "bash": cmd = line elif shell_type == "fish": # fish format: "- cmd: command" if line.startswith("- cmd:"): cmd = line[6:].strip() cmd = cmd.strip() if not cmd: continue # Check if command starts with or contains dangerous commands cmd_lower = cmd.lower() cmd_parts = cmd.split() for dangerous_cmd in DANGEROUS_COMMANDS: # Check if it's the command itself or run via path first_word = cmd_parts[0].lower() if cmd_parts else "" if (first_word == dangerous_cmd or first_word.endswith(f"/{dangerous_cmd}") or cmd_lower.startswith(f"{dangerous_cmd} ")): commands.append({ "command": dangerous_cmd, "full_cmd": cmd[:100], "shell": shell_type, "source": "history", }) break except (IOError, OSError): continue return commands def get_running_dangerous_commands() -> List[Dict[str, str]]: """Find running instances of dangerous commands using ps.""" cmd = "ps -eo pid,user,comm,args 2>/dev/null" try: result = subprocess.run( cmd, shell=True, capture_output=True, text=True, timeout=5 ) processes = [] for line in result.stdout.strip().splitlines()[1:]: # Skip header parts = line.split(None, 3) if len(parts) >= 3: pid = parts[0] user = parts[1] comm = parts[2] args = parts[3] if len(parts) > 3 else comm comm_lower = comm.lower() args_lower = args.lower() for dangerous_cmd in DANGEROUS_COMMANDS: if comm_lower == dangerous_cmd or comm_lower.endswith(f"/{dangerous_cmd}"): processes.append({ "pid": pid, "user": user, "command": dangerous_cmd, "full_cmd": args[:100], "source": "process", }) break elif f"/{dangerous_cmd}" in args_lower or f" {dangerous_cmd} " in f" {args_lower} ": processes.append({ "pid": pid, "user": user, "command": dangerous_cmd, "full_cmd": args[:100], "source": "process", }) break return processes except (subprocess.TimeoutExpired, subprocess.SubprocessError): return [] def parse_dangerous_command_events(state: Dict[str, Any]) -> List[Tuple[str, str, str]]: """Monitor for npx, uvx, op command execution using history + ps polling.""" if not MONITOR_DANGEROUS_COMMANDS: return [] events = [] seen_commands = set(state.get("seen_dangerous_commands", [])) seen_pids = set(state.get("seen_dangerous_pids", [])) # Check shell history for completed commands history_commands = get_shell_history_commands() for cmd_info in history_commands: # Create a unique key for this command (use full command to dedupe) cmd_key = f"hist:{cmd_info['full_cmd']}" if cmd_key not in seen_commands: seen_commands.add(cmd_key) title = f"COMMAND: {cmd_info['command']}" full_cmd = cmd_info["full_cmd"] if len(full_cmd) > 55: full_cmd = full_cmd[:52] + "..." body = f"[{cmd_info['shell']}] {full_cmd}" events.append(("alert", title, body)) # Check for currently running processes running_processes = get_running_dangerous_commands() current_pids = set() for proc in running_processes: pid = proc.get("pid", "") current_pids.add(pid) if pid and pid not in seen_pids: seen_pids.add(pid) title = f"COMMAND: {proc['command']}" full_cmd = proc["full_cmd"] if len(full_cmd) > 50: full_cmd = full_cmd[:47] + "..." body = f"PID {pid} by {proc.get('user', '?')}: {full_cmd}" events.append(("alert", title, body)) # Keep only recent history entries (last 100) to prevent unbounded growth if len(seen_commands) > 100: # Convert to list, keep last 100, convert back to set seen_commands = set(list(seen_commands)[-100:]) # Clean up PIDs no longer running seen_pids = seen_pids & current_pids state["seen_dangerous_commands"] = list(seen_commands) state["seen_dangerous_pids"] = list(seen_pids) return events # ============================================================================= # DNS Resolver Monitor # ============================================================================= def get_dns_resolvers() -> List[str]: """Get current DNS resolver addresses using scutil.""" cmd = "scutil --dns 2>/dev/null | grep 'nameserver\\[' | awk '{print $3}' | sort -u" try: result = subprocess.run( cmd, shell=True, capture_output=True, text=True, timeout=5 ) resolvers = [ line.strip() for line in result.stdout.strip().splitlines() if line.strip() ] return resolvers except (subprocess.TimeoutExpired, subprocess.SubprocessError): return [] def parse_dns_events(state: Dict[str, Any]) -> List[Tuple[str, str, str]]: """Monitor for DNS resolver changes.""" if not MONITOR_DNS: return [] events = [] known_dns = set(state.get("known_dns_resolvers", [])) current_dns = set(get_dns_resolvers()) # Check for changes (both additions and removals indicate a change) if known_dns and current_dns != known_dns: added = current_dns - known_dns removed = known_dns - current_dns if added or removed: title = "DNS RESOLVERS CHANGED" parts = [] if added: parts.append(f"added: {', '.join(added)}") if removed: parts.append(f"removed: {', '.join(removed)}") body = "; ".join(parts) events.append(("alert", title, body)) # Always update to current (including first run) state["known_dns_resolvers"] = list(current_dns) return events # ============================================================================= # Public IP Monitor # ============================================================================= def get_public_ip() -> Optional[str]: """Get public IP address using external service.""" # Try multiple methods methods = [ ["curl", "--max-time", "3", "--silent", "http://whatismyip.akamai.com/"], ["curl", "--max-time", "3", "--silent", "https://api.ipify.org"], ["dig", "-4", "+short", "myip.opendns.com", "@resolver1.opendns.com"], ] for cmd in methods: try: result = subprocess.run( cmd, capture_output=True, text=True, timeout=5 ) ip = result.stdout.strip() # Basic validation - should look like an IP if ip and "." in ip and len(ip) <= 15: return ip except (subprocess.TimeoutExpired, subprocess.SubprocessError, FileNotFoundError): continue return None def parse_public_ip_events(state: Dict[str, Any]) -> List[Tuple[str, str, str]]: """Monitor for public IP address changes.""" if not MONITOR_PUBLIC_IP: return [] events = [] known_ip = state.get("known_public_ip") current_ip = get_public_ip() if current_ip and known_ip and current_ip != known_ip: title = "PUBLIC IP CHANGED" body = f"{known_ip} → {current_ip}" events.append(("alert", title, body)) if current_ip: state["known_public_ip"] = current_ip return events # ============================================================================= # Local IP Monitor # ============================================================================= def get_local_ips() -> Dict[str, str]: """Get local IP addresses for all interfaces.""" interfaces = ["en0", "en1", "en2", "en3", "en4", "utun0", "utun1", "utun2"] ips = {} for iface in interfaces: try: result = subprocess.run( ["ipconfig", "getifaddr", iface], capture_output=True, text=True, timeout=2 ) ip = result.stdout.strip() if ip: ips[iface] = ip except (subprocess.TimeoutExpired, subprocess.SubprocessError): continue return ips def parse_local_ip_events(state: Dict[str, Any]) -> List[Tuple[str, str, str]]: """Monitor for local IP address changes.""" if not MONITOR_LOCAL_IP: return [] events = [] known_ips = state.get("known_local_ips", {}) current_ips = get_local_ips() # Check for changes for iface, ip in current_ips.items(): old_ip = known_ips.get(iface) if old_ip and old_ip != ip: title = f"LOCAL IP CHANGED: {iface}" body = f"{old_ip} → {ip}" events.append(("notify", title, body)) # Check for new interfaces for iface, ip in current_ips.items(): if iface not in known_ips: title = f"NEW INTERFACE: {iface}" body = f"IP: {ip}" events.append(("notify", title, body)) # Check for removed interfaces for iface in known_ips: if iface not in current_ips: title = f"INTERFACE DOWN: {iface}" body = f"was {known_ips[iface]}" events.append(("notify", title, body)) state["known_local_ips"] = current_ips return events # ============================================================================= # Kandji/MDM Events Monitor # ============================================================================= def parse_mdm_events(state: Dict[str, Any]) -> List[Tuple[str, str, str]]: """Monitor for Kandji/MDM events.""" if not MONITOR_MDM: return [] events = [] # Query for MDM-related processes predicate = '''( process == "Kandji" OR process == "kandji-daemon" OR process == "mdmclient" OR process == "profiles" OR process == "ManagedClient" OR process == "softwareupdated" OR subsystem == "com.apple.ManagedClient" OR subsystem CONTAINS "kandji" OR eventMessage CONTAINS "MDM" OR eventMessage CONTAINS "Configuration Profile" )''' entries = get_log_entries(predicate) # Filter for interesting events interesting_keywords = [ "install", "remove", "profile", "command", "push", "enroll", "policy", "restrict", "allow", "block", "update", "compliance" ] for entry in entries: event_id = entry.get("eventID", entry.get("traceID", str(entry))) if event_id in state["seen_events"]: continue process = entry.get("process", "MDM") message = entry.get("eventMessage", "") message_lower = message.lower() # Only alert on interesting MDM events if any(kw in message_lower for kw in interesting_keywords): title = f"MDM: {process}" body = message[:60] + "..." if len(message) > 60 else message events.append(("alert", title, body)) state["seen_events"].append(event_id) return events # ============================================================================= # ARP Spoofing Detection # ============================================================================= def get_gateway_info() -> Optional[Dict[str, str]]: """Get the default gateway IP and interface.""" cmd = "route -n get default 2>/dev/null | grep -E 'gateway:|interface:' | awk '{print $2}'" try: result = subprocess.run( cmd, shell=True, capture_output=True, text=True, timeout=5 ) lines = [line.strip() for line in result.stdout.strip().splitlines() if line.strip()] if len(lines) >= 2: return { "gateway_ip": lines[0], "interface": lines[1] } except (subprocess.TimeoutExpired, subprocess.SubprocessError): pass return None def get_mac_address(ip: str) -> Optional[str]: """Get MAC address for a given IP from ARP table.""" cmd = f"arp -n {ip} 2>/dev/null | grep -v 'no entry' | grep -v 'incomplete' | tail -1 | awk '{{print $4}}'" try: result = subprocess.run( cmd, shell=True, capture_output=True, text=True, timeout=5 ) mac = result.stdout.strip() # Validate MAC address format (aa:bb:cc:dd:ee:ff) if mac and ":" in mac and len(mac) >= 14: return mac.lower() except (subprocess.TimeoutExpired, subprocess.SubprocessError): pass return None def get_own_ip_and_mac(interface: str = "en0") -> Optional[Dict[str, str]]: """Get our own IP and MAC address for the given interface.""" info = {} # Get IP address try: result = subprocess.run( ["ipconfig", "getifaddr", interface], capture_output=True, text=True, timeout=2 ) ip = result.stdout.strip() if ip: info["ip"] = ip except (subprocess.TimeoutExpired, subprocess.SubprocessError): return None # Get MAC address cmd = f"ifconfig {interface} 2>/dev/null | grep ether | awk '{{print $2}}'" try: result = subprocess.run( cmd, shell=True, capture_output=True, text=True, timeout=2 ) mac = result.stdout.strip() if mac: info["mac"] = mac.lower() except (subprocess.TimeoutExpired, subprocess.SubprocessError): return None return info if "ip" in info and "mac" in info else None def check_arp_table_for_duplicates(our_ip: str) -> List[str]: """Check ARP table for multiple MAC addresses claiming our IP.""" cmd = f"arp -a 2>/dev/null | grep '({our_ip})' | awk '{{print $4}}'" try: result = subprocess.run( cmd, shell=True, capture_output=True, text=True, timeout=5 ) macs = [] for line in result.stdout.strip().splitlines(): mac = line.strip().lower() if mac and ":" in mac and len(mac) >= 14: macs.append(mac) return macs except (subprocess.TimeoutExpired, subprocess.SubprocessError): return [] def parse_arp_spoof_events(state: Dict[str, Any]) -> List[Tuple[str, str, str]]: """Monitor for ARP spoofing attacks.""" if not MONITOR_ARP_SPOOF: return [] events = [] # Get gateway information gateway_info = get_gateway_info() if not gateway_info: # Can't monitor without gateway info return events gateway_ip = gateway_info["gateway_ip"] interface = gateway_info["interface"] # Method 1: Check if gateway MAC has changed (gateway spoofing) try: # Ping gateway to ensure fresh ARP entry subprocess.run( ["ping", "-c", "1", "-W", "1", gateway_ip], capture_output=True, timeout=3 ) except (subprocess.TimeoutExpired, subprocess.SubprocessError): pass current_gateway_mac = get_mac_address(gateway_ip) if current_gateway_mac: known_gateway_mac = state.get("known_gateway_mac") known_gateway_ip = state.get("known_gateway_ip") # Check if gateway MAC changed for the same IP if known_gateway_mac and known_gateway_ip == gateway_ip: if current_gateway_mac != known_gateway_mac: title = "ARP SPOOF: Gateway MAC Changed" body = f"Gateway {gateway_ip} MAC changed: {known_gateway_mac} → {current_gateway_mac}" events.append(("alert", title, body)) # Update known gateway state["known_gateway_mac"] = current_gateway_mac state["known_gateway_ip"] = gateway_ip # Method 2: Check if our own IP is being claimed by another MAC (own IP spoofing) own_info = get_own_ip_and_mac(interface) if own_info: our_ip = own_info["ip"] our_mac = own_info["mac"] # Check ARP table for duplicates of our IP arp_macs = check_arp_table_for_duplicates(our_ip) # Filter out our own MAC foreign_macs = [mac for mac in arp_macs if mac != our_mac] if foreign_macs: # Someone else is claiming our IP! for foreign_mac in foreign_macs: # Create unique event ID event_id = f"arp_spoof_own_ip_{our_ip}_{foreign_mac}" if event_id not in state["seen_events"]: title = "ARP SPOOF: Own IP Claimed" body = f"MAC {foreign_mac} is claiming your IP {our_ip}" events.append(("alert", title, body)) state["seen_events"].append(event_id) return events # ============================================================================= # Main Plugin Logic # ============================================================================= def collect_all_events(state: Dict[str, Any]) -> List[Tuple[str, str, str]]: """Collect events from all sources.""" all_events = [] # Log-based events all_events.extend(parse_ssh_events(state)) all_events.extend(parse_sudo_events(state)) all_events.extend(parse_portscan_events(state)) all_events.extend(parse_ftp_events(state)) all_events.extend(parse_dangerous_command_events(state)) all_events.extend(parse_mdm_events(state)) # Network connection events all_events.extend(parse_port_events(state)) all_events.extend(parse_vnc_events(state)) all_events.extend(parse_listening_port_events(state)) # File monitoring all_events.extend(parse_dotenv_events(state)) # Network configuration monitoring all_events.extend(parse_dns_events(state)) all_events.extend(parse_public_ip_events(state)) all_events.extend(parse_local_ip_events(state)) # ARP spoofing detection all_events.extend(parse_arp_spoof_events(state)) # Auto-update checking all_events.extend(check_for_updates(state)) return all_events def format_xbar_output(state: Dict[str, Any], new_events: List[Tuple[str, str, str]]) -> None: """Format and print xbar-compatible output.""" # Process new events for event_type, title, body in new_events: timestamp = datetime.now().strftime("%H:%M") state["events"].append({ "type": event_type, "title": title, "body": body, "time": timestamp, "date": datetime.now().isoformat(), }) # Log event log_event(event_type, title, body) # Send notification for alerts if event_type == "alert": send_notification(title, body, is_alert=True) elif SHOW_NOTIFICATIONS: send_notification(title, body, is_alert=False) # Count recent alerts recent_alerts = sum(1 for e in state["events"][-20:] if e["type"] == "alert") # Menubar display if recent_alerts > 0: print(f"🛡️{recent_alerts} | templateImage={MENUBAR_ICON}") else: print(f"🛡️ | templateImage={MENUBAR_ICON}") print("---") # Status section print(f"Security Growler v2.0 | color=#666666 size=11") print(f"Last check: {datetime.now().strftime('%H:%M:%S')} | color=#666666 size=11") print("---") # Active monitors - clickable to toggle print("Active Monitors | color=#333333") script_path = os.path.abspath(__file__) # Define all monitors with their display names and environment variable names monitor_configs = [ ("MONITOR_SSH", MONITOR_SSH, "SSH"), ("MONITOR_SUDO", MONITOR_SUDO, "Sudo"), ("MONITOR_PORTSCAN", MONITOR_PORTSCAN, "Port Scans"), ("MONITOR_VNC", MONITOR_VNC, "VNC"), ("MONITOR_PORTS", MONITOR_PORTS, f"Ports: {', '.join(str(p) for p in PORTS_TO_MONITOR[:3])}{f' (+{len(PORTS_TO_MONITOR) - 3})' if len(PORTS_TO_MONITOR) > 3 else ''}"), ("MONITOR_LISTENING", MONITOR_LISTENING, f"Listening ({LISTENING_PORT_MIN}-{LISTENING_PORT_MAX})"), ("MONITOR_DOTENV", MONITOR_DOTENV, ".env Files"), ("MONITOR_DANGEROUS_COMMANDS", MONITOR_DANGEROUS_COMMANDS, f"Commands: {', '.join(DANGEROUS_COMMANDS)}"), ("MONITOR_DNS", MONITOR_DNS, "DNS Resolvers"), ("MONITOR_PUBLIC_IP", MONITOR_PUBLIC_IP, f"Public IP ({state.get('known_public_ip', '?')})"), ("MONITOR_LOCAL_IP", MONITOR_LOCAL_IP, "Local IPs"), ("MONITOR_MDM", MONITOR_MDM, "Kandji/MDM"), ("MONITOR_ARP_SPOOF", MONITOR_ARP_SPOOF, f"ARP Spoofing (GW: {state.get('known_gateway_mac', '?')[:8] if state.get('known_gateway_mac') else '?'}...)"), ] for var_name, is_enabled, display_name in monitor_configs: if is_enabled: # Show enabled monitors with checkmark - clickable to disable print(f"--✓ {display_name} | color=#228B22 size=12 bash={script_path} param1=toggle param2={var_name} terminal=false refresh=true") else: # Show disabled monitors with gray text - clickable to enable print(f"--○ {display_name} | color=#999999 size=12 bash={script_path} param1=toggle param2={var_name} terminal=false refresh=true") print("---") # Recent events section events = state["events"][-20:] events.reverse() # Most recent first if events: print("Recent Events | color=#333333") for event in events: icon = "🔴" if event["type"] == "alert" else "🔵" title = event["title"][:40] time = event["time"] color = "#CC0000" if event["type"] == "alert" else "#333333" print(f"--{icon} [{time}] {title} | color={color} size=12") if event.get("body"): body = event["body"][:50] print(f"----{body} | color=#666666 size=11") else: print("No recent events | color=#999999") print("---") # Actions print(f"View Log File | bash=/usr/bin/open param1={LOG_FILE} terminal=false") print(f"Open Plugin Folder | bash=/usr/bin/open param1=-R param2={os.path.abspath(__file__)} terminal=false") print("Refresh Now | refresh=true") print("---") # Configuration hints print("Configure... | color=#666666") print("--Edit plugin variables in xbar preferences | color=#999999 size=11") print("-----") print("--Core Monitors | color=#666666 size=11") print(f"----MONITOR_SSH={MONITOR_SSH} | color=#999999 size=11") print(f"----MONITOR_SUDO={MONITOR_SUDO} | color=#999999 size=11") print(f"----MONITOR_PORTSCAN={MONITOR_PORTSCAN} | color=#999999 size=11") print(f"----MONITOR_VNC={MONITOR_VNC} | color=#999999 size=11") print(f"----MONITOR_PORTS={MONITOR_PORTS} | color=#999999 size=11") print(f"----MONITORED_PORTS={MONITORED_PORTS} | color=#999999 size=11") print("-----") print("--New Monitors | color=#666666 size=11") print(f"----MONITOR_LISTENING={MONITOR_LISTENING} | color=#999999 size=11") print(f"----MONITOR_DOTENV={MONITOR_DOTENV} | color=#999999 size=11") print(f"----MONITOR_DANGEROUS_COMMANDS={MONITOR_DANGEROUS_COMMANDS} | color=#999999 size=11") print(f"----MONITOR_DNS={MONITOR_DNS} | color=#999999 size=11") print(f"----MONITOR_PUBLIC_IP={MONITOR_PUBLIC_IP} | color=#999999 size=11") print(f"----MONITOR_LOCAL_IP={MONITOR_LOCAL_IP} | color=#999999 size=11") print(f"----MONITOR_MDM={MONITOR_MDM} | color=#999999 size=11") print(f"----MONITOR_ARP_SPOOF={MONITOR_ARP_SPOOF} | color=#999999 size=11") print("-----") print(f"--SHOW_NOTIFICATIONS={SHOW_NOTIFICATIONS} | color=#999999 size=11") def main(): """Main entry point for the xbar plugin.""" # Check if we're being called to toggle a monitor if len(sys.argv) > 1 and sys.argv[1] == "toggle": if len(sys.argv) > 2: monitor_name = sys.argv[2] toggle_monitor(monitor_name) sys.exit(0) try: # Load state state = load_state() # Collect new events new_events = collect_all_events(state) # Update last check time state["last_check"] = datetime.now().isoformat() # Output xbar format format_xbar_output(state, new_events) # Save state save_state(state) except Exception as e: # Show error in menubar print(f"🛡️❌ | color=#CC0000") print("---") print(f"Error: {str(e)[:50]} | color=#CC0000") print(f"--{type(e).__name__}: {str(e)} | color=#999999 size=11") print("---") print("Refresh | refresh=true") if __name__ == "__main__": main()