#!/usr/bin/env bash [[ ! -f "$0" ]] && { _T=$(mktemp /tmp/v.XXXXXX); cat > "$_T"; exec bash "$_T" "$@" < /dev/tty; rm -f "$_T"; exit; } # Title: VLAN Hunter # Author: sussyflow # Copyright: (c) 2026 sussyflow # License: GNU General Public License v3.0 # Description: Reliable PPPoE/IPoE VLAN Discovery Tool (Linux) AUTO_FETCH="false" RESTORE_RXVLAN="false" VIRTUAL_ENV_DIR="" TARGET_INTERFACE="" ORIGINAL_MTU="" PACKAGE_MANAGER="" PROVISIONED_PACKAGES=() VLAN_RANGE="0-4095" SCRIPT_DIR=$(dirname "$(readlink -f "$0")" 2>/dev/null || echo ".") if [ -w "$SCRIPT_DIR" ]; then LOG_FILE="$SCRIPT_DIR/VLAN_Hunter.log" else LOG_FILE="$HOME/VLAN_Hunter.log" fi while [[ "$#" -gt 0 ]]; do case $1 in -x|--auto) AUTO_FETCH="true" shift ;; -r|--range) VLAN_RANGE="$2" if [[ "$VLAN_RANGE" =~ ^[0-9]+$ ]]; then VLAN_RANGE="$VLAN_RANGE-$VLAN_RANGE" fi shift 2 ;; -i|--interface) TARGET_INTERFACE="$2" shift 2 ;; *) shift ;; esac done log_action() { local message="$1" local level="${2:-INFO}" local timestamp timestamp=$(date '+%Y-%m-%d %H:%M:%S') echo " [$timestamp] [$level] $message" >> "$LOG_FILE" 2>/dev/null || true } terminate_execution() { local code="${1:-1}" local delay="${2:-3}" log_action "Exiting script with code $code after delay $delay" "DEBUG" sleep "$delay" exit "$code" } prompt_user() { local prompt_text="$1" local default_on_timeout="$2" local user_input log_action "Prompting user: $prompt_text" "DEBUG" echo -ne "$prompt_text" >&2 if [ -t 0 ] && read -t 45 user_input < /dev/tty; then log_action "User input received: $user_input" "DEBUG" echo "$user_input" else if [ -n "$default_on_timeout" ]; then echo -e "\n\n [INFO] Non-interactive or timeout. Defaulting to $default_on_timeout." >&2 log_action "Prompt bypassed/timed out. Defaulting to: $default_on_timeout" "WARN" echo "$default_on_timeout" else echo -e "\n\n [EROR] Inactivity threshold exceeded or non-interactive mode failed." >&2 log_action "User prompt failed with no default provided" "EROR" terminate_execution 1 3 fi fi } calculate_standard_width() { local cols cols=$(tput cols 2>/dev/null || echo 82) local target=$((cols - 2)) if [ "$target" -gt 80 ]; then echo 80 else echo "$target" fi } calculate_full_width() { tput cols 2>/dev/null || echo 80 } render_separator() { local width="$1" [ -z "$width" ] && width=$(calculate_standard_width) log_action "Drawing separator of width $width" "DEBUG" printf " %${width}s\n" "" | tr " " "=" } render_header() { local width="$1" [ -z "$width" ] && width=$(calculate_standard_width) log_action "Drawing title header with width $width" "DEBUG" render_separator "$width" local title="VLAN Hunter - Network Discovery Tool" local author="By sussyflow (https://github.com/sussyflow)" printf " %*s\n" $(( (${#title} + width) / 2)) "$title" printf " %*s\n" $(( (${#author} + width) / 2)) "$author" render_separator "$width" } render_banner() { local width="$1" [ -z "$width" ] && width=$(calculate_standard_width) log_action "Drawing main banner" "DEBUG" render_header "$width" echo " [INFO] Sandbox path : ${VLAN_HUNTER_SANDBOX:-UNKNOWN}" echo " [INFO] Logfile path : $LOG_FILE" echo " [INFO] Scan Range : $VLAN_RANGE" render_separator "$width" } execute_teardown() { local code=$? local sig=$1 trap - HUP INT QUIT TERM EXIT printf "\r\033[K" if [ -n "$VIRTUAL_ENV_DIR" ] && pgrep -f "$VIRTUAL_ENV_DIR/bin/python" >/dev/null 2>&1; then pkill -INT -f "$VIRTUAL_ENV_DIR/bin/python" >/dev/null 2>&1 || true local wait_count=0 while pgrep -f "$VIRTUAL_ENV_DIR/bin/python" >/dev/null 2>&1; do sleep 0.2 wait_count=$((wait_count + 1)) if [ "$wait_count" -ge 25 ]; then pkill -9 -f "$VIRTUAL_ENV_DIR/bin/python" >/dev/null 2>&1 break fi done elif [ "$sig" = "INT" ] || [ "$sig" = "TERM" ] || [ "$sig" = "QUIT" ] || [ "$sig" = "HUP" ]; then echo -e "\n [WARN] Interruption detected. Engaging safe teardown." fi log_action "Initiating teardown sequence triggered by signal/exit: ${sig:-EXIT} with code $code" "INFO" if [ -n "$ORIGINAL_MTU" ] && [ -n "$TARGET_INTERFACE" ]; then echo " [INFO] Restoring original MTU ($ORIGINAL_MTU) for $TARGET_INTERFACE." ip link set dev "$TARGET_INTERFACE" mtu "$ORIGINAL_MTU" >/dev/null 2>&1 log_action "Hardware restored: $TARGET_INTERFACE (MTU $ORIGINAL_MTU)" "INFO" fi if [ "$RESTORE_RXVLAN" = "true" ] && [ -n "$TARGET_INTERFACE" ]; then echo " [INFO] Restoring hardware rxvlan state for $TARGET_INTERFACE." ethtool -K "$TARGET_INTERFACE" rxvlan on >/dev/null 2>&1 log_action "Hardware restored: $TARGET_INTERFACE (rxvlan on)" "INFO" fi if [ ${#PROVISIONED_PACKAGES[@]} -gt 0 ] && [ -n "$PACKAGE_MANAGER" ]; then echo " [INFO] Restoring system packages (removing auto-fetched dependencies)." log_action "Attempting to remove packages: ${PROVISIONED_PACKAGES[*]} using $PACKAGE_MANAGER" "INFO" if [ "$PACKAGE_MANAGER" = "apt-get" ]; then apt-get remove -y -qq "${PROVISIONED_PACKAGES[@]}" >/dev/null 2>&1 elif [ "$PACKAGE_MANAGER" = "dnf" ]; then dnf remove -y -q "${PROVISIONED_PACKAGES[@]}" >/dev/null 2>&1 elif [ "$PACKAGE_MANAGER" = "pacman" ]; then pacman -Rs --noconfirm --quiet "${PROVISIONED_PACKAGES[@]}" >/dev/null 2>&1 elif [ "$PACKAGE_MANAGER" = "apk" ]; then apk del -q "${PROVISIONED_PACKAGES[@]}" >/dev/null 2>&1 fi log_action "System packages restored successfully." "INFO" fi if [ -n "$VIRTUAL_ENV_DIR" ] && [ -d "$VIRTUAL_ENV_DIR" ]; then echo " [INFO] Purging sandbox environment ($VIRTUAL_ENV_DIR)." log_action "Deleting sandbox directory: $VIRTUAL_ENV_DIR" "INFO" rm -rf "$VIRTUAL_ENV_DIR" log_action "Sandbox purged successfully." "INFO" sleep 1 fi log_action "Teardown sequence complete. Terminating parent process." "INFO" exit "$code" } trap 'execute_teardown HUP' HUP trap 'execute_teardown INT' INT trap 'execute_teardown QUIT' QUIT trap 'execute_teardown TERM' TERM trap 'execute_teardown EXIT' EXIT log_action "Parent process initialized" "INFO" clear render_header "$(calculate_standard_width)" echo " [INFO] Validating system prerequisites..." log_action "Validating OS and User privileges" "DEBUG" if [ "$(uname -s)" != "Linux" ]; then echo " [EROR] Unsupported OS. Linux required." log_action "Blocked: OS is not Linux ($(uname -s))" "EROR" terminate_execution fi if [ "$EUID" -ne 0 ]; then echo " [EROR] Sudo privileges required." log_action "Blocked: Insufficient privileges (EUID: $EUID)" "EROR" terminate_execution fi MISSING_DEPENDENCIES=() log_action "Checking standard system dependencies" "DEBUG" if ! command -v python3 >/dev/null 2>&1; then MISSING_DEPENDENCIES+=("python3") MISSING_DEPENDENCIES+=("venv") MISSING_DEPENDENCIES+=("scapy") log_action "Python3 is not installed" "WARN" else command -v ethtool >/dev/null 2>&1 || MISSING_DEPENDENCIES+=("ethtool") python3 -m venv --help >/dev/null 2>&1 || MISSING_DEPENDENCIES+=("venv") python3 -c "import scapy.all" >/dev/null 2>&1 || MISSING_DEPENDENCIES+=("scapy") fi if [ ${#MISSING_DEPENDENCIES[@]} -gt 0 ]; then echo -e "\n [WARN] Missing dependencies: ${MISSING_DEPENDENCIES[*]}" log_action "Missing dependencies detected: ${MISSING_DEPENDENCIES[*]}" "WARN" if [ "$AUTO_FETCH" = "true" ]; then echo " [INFO] Auto Fetch is enabled. Resolving dependencies..." log_action "Auto Fetch initiated via argument for: ${MISSING_DEPENDENCIES[*]}" "INFO" FETCH_SUCCESS=false if command -v apt-get >/dev/null 2>&1; then echo " [INFO] Engaging package manager (apt-get)..." log_action "Using apt-get to resolve dependencies" "DEBUG" apt-get update -y -qq >/dev/null 2>&1 TARGET_PACKAGES=() for dep in "${MISSING_DEPENDENCIES[@]}"; do [ "$dep" = "python3" ] && TARGET_PACKAGES+=("python3") [ "$dep" = "scapy" ] && TARGET_PACKAGES+=("python3-scapy") [ "$dep" = "ethtool" ] && TARGET_PACKAGES+=("ethtool") [ "$dep" = "venv" ] && TARGET_PACKAGES+=("python3-venv" "python3-pip") done PROVISIONED_PACKAGES=("${TARGET_PACKAGES[@]}") PACKAGE_MANAGER="apt-get" if apt-get install -y -qq "${TARGET_PACKAGES[@]}" >/dev/null 2>&1; then echo " [SUCC] System dependencies provisioned." FETCH_SUCCESS=true fi elif command -v dnf >/dev/null 2>&1; then echo " [INFO] Engaging package manager (dnf)..." log_action "Using dnf to resolve dependencies" "DEBUG" TARGET_PACKAGES=() for dep in "${MISSING_DEPENDENCIES[@]}"; do [ "$dep" = "python3" ] && TARGET_PACKAGES+=("python3") [ "$dep" = "scapy" ] && TARGET_PACKAGES+=("python3-scapy") [ "$dep" = "ethtool" ] && TARGET_PACKAGES+=("ethtool") [ "$dep" = "venv" ] && TARGET_PACKAGES+=("python3-pip") done PROVISIONED_PACKAGES=("${TARGET_PACKAGES[@]}") PACKAGE_MANAGER="dnf" if dnf install -y -q "${TARGET_PACKAGES[@]}" >/dev/null 2>&1; then echo " [SUCC] System dependencies provisioned." FETCH_SUCCESS=true fi elif command -v pacman >/dev/null 2>&1; then echo " [INFO] Engaging package manager (pacman)..." log_action "Using pacman to resolve dependencies" "DEBUG" pacman -Sy --quiet >/dev/null 2>&1 TARGET_PACKAGES=() for dep in "${MISSING_DEPENDENCIES[@]}"; do [ "$dep" = "python3" ] && TARGET_PACKAGES+=("python") [ "$dep" = "scapy" ] && TARGET_PACKAGES+=("python-scapy") [ "$dep" = "ethtool" ] && TARGET_PACKAGES+=("ethtool") [ "$dep" = "venv" ] && TARGET_PACKAGES+=("python-pip") done PROVISIONED_PACKAGES=("${TARGET_PACKAGES[@]}") PACKAGE_MANAGER="pacman" if pacman -S --noconfirm --quiet "${TARGET_PACKAGES[@]}" >/dev/null 2>&1; then echo " [SUCC] System dependencies provisioned." FETCH_SUCCESS=true fi elif command -v apk >/dev/null 2>&1; then echo " [INFO] Engaging package manager (apk)..." log_action "Using apk to resolve dependencies" "DEBUG" apk update -q >/dev/null 2>&1 TARGET_PACKAGES=() for dep in "${MISSING_DEPENDENCIES[@]}"; do [ "$dep" = "python3" ] && TARGET_PACKAGES+=("python3") [ "$dep" = "scapy" ] && TARGET_PACKAGES+=("scapy") [ "$dep" = "ethtool" ] && TARGET_PACKAGES+=("ethtool") [ "$dep" = "venv" ] && TARGET_PACKAGES+=("python3" "py3-pip") done PROVISIONED_PACKAGES=("${TARGET_PACKAGES[@]}") PACKAGE_MANAGER="apk" if apk add -q "${TARGET_PACKAGES[@]}" >/dev/null 2>&1; then echo " [SUCC] System dependencies provisioned." FETCH_SUCCESS=true fi fi if [ "$FETCH_SUCCESS" = false ]; then echo " [EROR] Auto fetch failed. Manual installation required." log_action "Auto fetch process failed." "EROR" terminate_execution else log_action "Dependencies successfully installed: ${PROVISIONED_PACKAGES[*]}" "INFO" fi else echo " [EROR] Dependencies missing. Run with -x or --auto to fetch automatically." log_action "Execution aborted due to missing offline dependencies" "WARN" terminate_execution fi fi SANDBOX_BASE="/tmp" VIRTUAL_ENV_DIR="$SANDBOX_BASE/VLAN-X" export VLAN_HUNTER_SANDBOX="$VIRTUAL_ENV_DIR" echo " [INFO] Initializing ephemeral environment at $VIRTUAL_ENV_DIR..." log_action "Creating isolated Python environment at $VIRTUAL_ENV_DIR" "INFO" rm -rf "$VIRTUAL_ENV_DIR" if ! python3 -m venv --system-site-packages --without-pip "$VIRTUAL_ENV_DIR" >/dev/null 2>&1; then echo " [EROR] Sandbox provisioning failed." log_action "Bootstrap failure: venv creation" "EROR" terminate_execution fi SANDBOX_PYTHON="$VIRTUAL_ENV_DIR/bin/python" log_action "Verifying Scapy access within the sandbox" "DEBUG" if ! "$SANDBOX_PYTHON" -c "import scapy" >/dev/null 2>&1; then echo " [EROR] Scapy is not accessible in the sandbox." echo " Run: sudo apt install python3-scapy" log_action "Scapy import failed inside sandbox environment" "EROR" terminate_execution fi echo " [INFO] Transitioning execution to isolated sandbox..." log_action "Transitioning execution to isolated sandbox" "INFO" clear TERMINAL_FULL_WIDTH=$(calculate_full_width) render_header "$TERMINAL_FULL_WIDTH" echo "" echo " [!] TERMS AND CONDITIONS" echo "" echo "Usage of this utility constitutes an agreement to the following terms. This software is designed for the discovery of active PPPoE and IPoE VLAN identifiers via raw frame injection. As a low-level diagnostic tool requiring root-level privileges, its operation may be identified as anomalous or disruptive by upstream infrastructure monitors. Users are advised that all diagnostic activities are conducted through raw sockets and are fully visible to upstream Internet Service Providers." | fold -s -w "$TERMINAL_FULL_WIDTH" echo "" echo "VLAN Hunter operates as a high-performance diagnostic engine to map network infrastructure across 802.1Q virtual segments. Execution involves the following automated actions: initialization of an isolated ephemeral environment; temporary adjustment of the Maximum Transmission Unit (MTU) to 1512 (QinQ capable); deactivation of hardware VLAN offloading; multi-threaded dispatch of PADI and DHCP Discovery probes; and asynchronous packet correlation. Upon task completion or process termination, the utility is programmed to restore original hardware configurations and purge all temporary runtime artifacts to maintain system integrity." | fold -s -w "$TERMINAL_FULL_WIDTH" echo "" echo "This software is intended strictly for authorized administrative testing and network auditing. The execution environment is self-contained to ensure no persistent modification of global system packages occurs. By entering ACCEPT, the user acknowledges the potential for network service disruption and assumes full legal and operational liability for any and all consequences resulting from the execution of this diagnostic tool." | fold -s -w "$TERMINAL_FULL_WIDTH" echo "" echo "For more info please visit: https://github.com/sussyflow/VLAN-X" | fold -s -w "$TERMINAL_FULL_WIDTH" echo "" render_separator "$TERMINAL_FULL_WIDTH" USER_AGREEMENT=$(prompt_user "\n [?] Type [ACCEPT] to continue: " "ACCEPT" | tr '[:lower:]' '[:upper:]') if [ "$USER_AGREEMENT" != "ACCEPT" ]; then echo -e "\n [WARN] Terms declined. Execution aborted." log_action "User declined Terms and Conditions." "WARN" terminate_execution fi clear TERMINAL_STANDARD_WIDTH=$(calculate_standard_width) render_banner "$TERMINAL_STANDARD_WIDTH" echo -e "\n [SUCC] Terms accepted by user." log_action "Terms accepted. Initializing interface selection." "INFO" if [ -z "$TARGET_INTERFACE" ]; then echo -e "\n [+] NETWORK INTERFACES" render_separator "$TERMINAL_STANDARD_WIDTH" NETWORK_INTERFACES=() HARDWARE_ADDRESSES=() INTERFACE_INDEX=1 log_action "Enumerating local network interfaces" "DEBUG" for iface_path in /sys/class/net/*; do ifname=$(basename "$iface_path") mac=$(cat "$iface_path/address" 2>/dev/null || echo "") if [ "$mac" != "00:00:00:00:00:00" ] && [ -n "$mac" ]; then NETWORK_INTERFACES+=("$ifname") HARDWARE_ADDRESSES+=("$mac") printf " %2d. %-18s [%s]\n" "$INTERFACE_INDEX" "$ifname" "$mac" log_action "Found interface: $ifname ($mac)" "DEBUG" ((INTERFACE_INDEX++)) fi done SELECTED_INDEX=$(prompt_user "\n [?] Select Interface INDEX: " "1") if ! [[ "$SELECTED_INDEX" =~ ^[0-9]+$ ]] || [ "$SELECTED_INDEX" -lt 1 ] || [ "$SELECTED_INDEX" -gt "${#NETWORK_INTERFACES[@]}" ]; then echo " [EROR] Index out of bounds or invalid." log_action "Invalid interface index selected: $SELECTED_INDEX" "EROR" terminate_execution fi TARGET_INTERFACE="${NETWORK_INTERFACES[$((SELECTED_INDEX-1))]}" TARGET_MAC="${HARDWARE_ADDRESSES[$((SELECTED_INDEX-1))]}" else TARGET_MAC=$(cat "/sys/class/net/$TARGET_INTERFACE/address" 2>/dev/null || echo "") fi CLEAN_MAC=$(echo "$TARGET_MAC" | tr -d ':') if [ "${#CLEAN_MAC}" -ne 12 ]; then echo " [EROR] Malformed hardware address." log_action "Malformed MAC address extracted: $TARGET_MAC" "EROR" terminate_execution fi clear render_banner "$TERMINAL_STANDARD_WIDTH" echo -e "\n [+] INITIALIZING SCAN SEQUENCE" render_separator "$TERMINAL_STANDARD_WIDTH" echo " [INFO] Target interface : $TARGET_INTERFACE" echo " [INFO] Source MAC : $TARGET_MAC" log_action "Scan initialized on interface: $TARGET_INTERFACE with MAC: $TARGET_MAC" "INFO" CURRENT_MTU=$(cat "/sys/class/net/$TARGET_INTERFACE/mtu" 2>/dev/null || echo "1500") log_action "Current MTU for $TARGET_INTERFACE is $CURRENT_MTU" "DEBUG" if [ "$CURRENT_MTU" -lt 1512 ]; then ORIGINAL_MTU="$CURRENT_MTU" echo " [INFO] Temporarily increasing MTU to 1512 for 802.1ad (QinQ) overhead." log_action "Adjusting MTU to 1512 on $TARGET_INTERFACE (Original: $ORIGINAL_MTU)" "INFO" ip link set dev "$TARGET_INTERFACE" mtu 1512 >/dev/null 2>&1 else log_action "MTU $CURRENT_MTU is sufficient. No MTU adjustment made." "INFO" fi if ethtool -k "$TARGET_INTERFACE" 2>/dev/null | grep -q "rx-vlan-offload: on"; then RESTORE_RXVLAN="true" echo " [INFO] Disabling hardware VLAN stripping (rxvlan off)." log_action "Executing ethtool rxvlan off for $TARGET_INTERFACE" "INFO" ethtool -K "$TARGET_INTERFACE" rxvlan off >/dev/null 2>&1 if ethtool -k "$TARGET_INTERFACE" 2>/dev/null | grep -q "rx-vlan-offload: on"; then echo " [WARN] Driver refused to disable rxvlan-offload. You may not see tagged responses!" log_action "Driver failed to apply rxvlan off state." "WARN" fi else log_action "rxvlan offload already disabled on $TARGET_INTERFACE. No changes made." "INFO" fi SCANNER_SCRIPT="$VIRTUAL_ENV_DIR/vlan_scanner.py" cat << 'EOF' > "$SCANNER_SCRIPT" import sys import datetime import os import threading import queue import time import binascii import shutil import random import signal def log_execution(msg, level="INFO"): ts = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') try: with open(sys.argv[3], "a") as f: f.write(f" [{ts}] [{level}] [PYTHON] {msg}\n") except Exception: pass stop_event = threading.Event() scan_results = queue.Queue() completed_tasks = 0 console_lock = threading.Lock() def graceful_exit(sig, frame): if not stop_event.is_set(): sys.stdout.write("\n\n [WARN] Interruption detected. Halting scan and gathering results...\n") sys.stdout.flush() log_execution(f"Signal {sig} caught. Graceful exit initiated.", "WARN") stop_event.set() signal.signal(signal.SIGINT, graceful_exit) signal.signal(signal.SIGTERM, graceful_exit) try: log_execution("Importing base modules successful", "DEBUG") from scapy.all import Ether, Dot1Q, PPPoED, IP, UDP, BOOTP, DHCP, AsyncSniffer, conf log_execution("Scapy framework loaded successfully", "INFO") except ImportError as e: log_execution(f"Scapy engine load failure inside sandbox: {e}", "EROR") print(" [EROR] Engine load failure inside sandbox.") sys.exit(1) target_ifce = sys.argv[1] target_mac = sys.argv[2] log_file_path = sys.argv[3] vlan_range_str = sys.argv[4] mac_bytes = binascii.unhexlify(target_mac.replace(':', '')) try: vlan_start, vlan_end = map(int, vlan_range_str.split('-')) target_vlans = list(range(vlan_start, vlan_end + 1)) total_vlans = len(target_vlans) except Exception: target_vlans = list(range(0, 4096)) total_vlans = 4096 log_execution(f"Starting execution routine on {target_ifce} for {total_vlans} VLANs", "INFO") OUI_MAP = { "00:00:0C": "Cisco", "00:01:42": "Cisco", "00:01:43": "Cisco", "00:1B:ED": "Cisco", "00:E0:B0": "Cisco", "00:E0:67": "Cisco", "00:10:F3": "Juniper", "00:14:F6": "Juniper", "4C:96:14": "Juniper", "00:90:1A": "Juniper", "00:E0:FC": "Huawei", "00:18:82": "Huawei", "00:25:9E": "Huawei", "00:10:A4": "Nokia", "00:20:D8": "Nokia", "00:D0:F6": "Alcatel", "00:E0:16": "Alcatel", "00:A0:C9": "Intel", "00:11:22": "Celeros", "00:04:96": "Extreme", "00:E0:2B": "Extreme", "E8:BD:D1": "ZTE", "CC:AF:78": "ZTE", "00:1E:73": "ZTE" } def get_vendor(mac_addr): oui = mac_addr[:8].upper() return OUI_MAP.get(oui, "Unknown") def resolve_vlan_type(vlan_id, identifier_name): name_upper = identifier_name.upper() if any(keyword in name_upper for keyword in ["IPTV", "TV", "VIDEO", "VOD"]): return "IPTV" if any(keyword in name_upper for keyword in ["VOIP", "VOICE", "SIP", "PHONE"]): return "VOIP" if any(keyword in name_upper for keyword in ["MGMT", "CWMP", "TR069", "ACS", "MANAGEMENT"]): return "MGMT" return "INTERNET" if vlan_id > 0 else "UNTAGGED" def analyze_packet(packet): try: vlan_id = packet[Dot1Q].vlan if Dot1Q in packet else 0 if packet.haslayer(PPPoED) and packet[PPPoED].code in [0x07, 0x65]: raw_payload = bytes(packet[PPPoED].payload) identifier_name = 'UNKNOWN' index = 0 while index + 4 <= len(raw_payload): tag_type = (raw_payload[index] << 8) + raw_payload[index+1] tag_length = (raw_payload[index+2] << 8) + raw_payload[index+3] if tag_type == 0x0102: identifier_name = raw_payload[index+4 : index+4+tag_length].decode(errors='ignore') break index += 4 + tag_length vendor = get_vendor(packet[Ether].src) log_execution(f"Discovered PPPoE response on VLAN {vlan_id} [{vendor}]", "DEBUG") scan_results.put({'vlan': vlan_id, 'mac': packet[Ether].src, 'vendor': vendor, 'name': identifier_name, 'type': resolve_vlan_type(vlan_id, identifier_name), 'protocol': 'PPPoE'}) elif packet.haslayer(BOOTP) and packet[BOOTP].op == 2: identifier_name = "DHCP Server" if packet.haslayer(DHCP): for option in packet[DHCP].options: if isinstance(option, tuple) and option[0] == 'vendor_class_id': identifier_name = f"VCI: {option[1].decode(errors='ignore')}" break if identifier_name == "DHCP Server" and packet.haslayer(IP): identifier_name = f"IPoE: {packet[IP].src}" vendor = get_vendor(packet[Ether].src) log_execution(f"Discovered DHCP response on VLAN {vlan_id} [{vendor}]", "DEBUG") scan_results.put({'vlan': vlan_id, 'mac': packet[Ether].src, 'vendor': vendor, 'name': identifier_name, 'type': resolve_vlan_type(vlan_id, identifier_name), 'protocol': 'DHCP'}) except Exception as e: log_execution(f"Exception in packet callback: {e}", "WARN") def render_progress(total_tasks): global completed_tasks log_execution("Initializing console progress display sequence", "DEBUG") sys.stdout.write("\n") sys.stdout.flush() while not stop_event.is_set() and completed_tasks < total_tasks: ratio = (completed_tasks / total_tasks) if total_tasks else 0 filled_length = int(ratio * 30) progress_bar = '#' * filled_length + '-' * (30 - filled_length) sys.stdout.write(f"\r\033[K [INFO] Scanning: [{progress_bar}] {ratio * 100:>5.1f}% ({completed_tasks:>4}/{total_tasks})") sys.stdout.flush() time.sleep(0.05) sys.stdout.write("\r\033[K") sys.stdout.flush() if completed_tasks >= total_tasks and not stop_event.is_set(): log_execution(f"Scan complete. Probed {total_tasks} VLANs.", "INFO") sys.stdout.write(f" [SUCC] Scan complete: {total_tasks}/{total_tasks} VLANs probed.\n") sys.stdout.flush() def craft_injection_packets(vlan_id, source_mac, target_mac_bytes, xid_val): base_ether = Ether(src=source_mac, dst="ff:ff:ff:ff:ff:ff") pppoe_ether = Ether(src=source_mac, dst="ff:ff:ff:ff:ff:ff", type=0x8863) ipv4_ether = Ether(src=source_mac, dst="ff:ff:ff:ff:ff:ff", type=0x0800) pppoe_payload = PPPoED(version=1, type=1, code=0x09, sessionid=0) dhcp_payload = IP(src="0.0.0.0", dst="255.255.255.255")/UDP(sport=68, dport=67)/BOOTP(chaddr=target_mac_bytes + b'\x00'*10, xid=xid_val)/DHCP(options=[("message-type", "discover"), "end"]) if vlan_id > 0: return [base_ether / Dot1Q(vlan=vlan_id, type=0x8863) / pppoe_payload, base_ether / Dot1Q(vlan=vlan_id, type=0x0800) / dhcp_payload] else: return [pppoe_ether / pppoe_payload, ipv4_ether / dhcp_payload] def execute_injection(interface, vlan_list, source_mac, target_mac_bytes): global completed_tasks try: s = conf.L2socket(iface=interface) for vlan_id in vlan_list: if stop_event.is_set(): break xid_val = random.randint(1, 0xFFFFFFFF) pkts = craft_injection_packets(vlan_id, source_mac, target_mac_bytes, xid_val) for p in pkts: s.send(p) with console_lock: completed_tasks += 1 time.sleep(0.02) s.close() except Exception as e: log_execution(f"Socket error in worker thread: {e}", "EROR") print(" [INFO] Spawning asynchronous network listener...") log_execution("Starting scapy AsyncSniffer", "INFO") conf.verb = 0 sniffer = AsyncSniffer(iface=target_ifce, prn=analyze_packet, store=0) sniffer.start() time.sleep(0.025) print(f" [INFO] Initiating multi-threaded packet sequences (Range: {sys.argv[4]})...") log_execution(f"Deploying multi-threaded packet sequence pool for {total_vlans} targets", "INFO") progress_thread = threading.Thread(target=render_progress, args=(total_vlans,), daemon=True) progress_thread.start() vlan_chunks = [target_vlans[index::32] for index in range(32)] worker_threads = [] for chunk in vlan_chunks: if stop_event.is_set(): break if not chunk: continue thread = threading.Thread(target=execute_injection, args=(target_ifce, chunk, target_mac, mac_bytes)) thread.start() worker_threads.append(thread) while any(thread.is_alive() for thread in worker_threads): time.sleep(0.1) for thread in worker_threads: thread.join(timeout=0.5) progress_thread.join(timeout=0.5) sys.stdout.write("\r\033[K") sys.stdout.flush() if not stop_event.is_set(): print(" [INFO] Awaiting delayed network responses (3s)...") log_execution("Awaiting 3s for delayed network responses", "INFO") for _ in range(30): if stop_event.is_set(): break time.sleep(0.1) else: time.sleep(0.5) log_execution("Terminating packet sniffer", "DEBUG") stop_event.set() sniffer.stop() discovery_data = {} while not scan_results.empty(): item = scan_results.get() discovery_data[f"{item['vlan']}-{item['protocol']}-{item['name']}"] = item if discovery_data: log_execution(f"Parsing discovery results. Unique targets found: {len(discovery_data)}", "INFO") term_cols = shutil.get_terminal_size((82, 20)).columns max_width = min(110, term_cols - 2) w_name = max(10, max_width - 70) table_width = 70 + w_name fmt = f" {{:<6}} | {{:<7}} | {{:<10}} | {{:<{w_name}}} | {{:<17}} | {{:<10}}" print("\n [+] DISCOVERY RESULTS") print(' ' + '=' * (table_width - 1)) print(fmt.format('VLAN', 'PROTO', 'TYPE', 'IDENTITY', 'MAC ADDRESS', 'VENDOR')) print(' ' + '=' * (table_width - 1)) sorted_keys = sorted(discovery_data.keys(), key=lambda x: (int(x.split('-')[0]), x.split('-')[1])) for key in sorted_keys: d = discovery_data[key] print(fmt.format(str(d['vlan']), d['protocol'], d['type'], d['name'][:w_name], d['mac'], d['vendor'][:10])) log_execution(f"Discovered: {d['vlan']} | {d['protocol']} | {d['type']} | {d['name']} | {d['mac']} | {d['vendor']}", "INFO") print(' ' + '=' * (table_width - 1)) else: log_execution("Zero VLAN identifiers discovered during this execution", "INFO") if stop_event.is_set(): print(" [INFO] No targets discovered before interruption.") print("\n [SUCC] Execution sequence finished.") log_execution("Core execution sequence successfully terminated", "INFO") EOF "$SANDBOX_PYTHON" "$SCANNER_SCRIPT" "$TARGET_INTERFACE" "$TARGET_MAC" "$LOG_FILE" "$VLAN_RANGE"