#!/usr/bin/python3 import glob import ipaddress import os import pwd import re import requests import shlex import socket import subprocess import sys from collections import Counter from datetime import datetime, timedelta, UTC from urllib.parse import urlparse, urlunparse from requests.packages.urllib3.poolmanager import PoolManager from requests.adapters import HTTPAdapter _registration_re = re.compile(r'^(\d+):([^@]+)@([a-zA-Z0-9\.]+)$') _reghost_re = re.compile(r'^register\.allstarlink\.org$') def print_element_msg(msg, msg_type): print(f"{msg_type:>10}: {msg}") def print_error(msg): print_element_msg(msg,"Error") def print_warning(msg): print_element_msg(msg,"Warning") def print_info(msg): print_element_msg(msg,"Info") def print_ok(msg): print_element_msg(msg,"OK") def require_root_or_asterisk(): uid = os.geteuid() user = pwd.getpwuid(uid).pw_name if user not in ("root", "asterisk"): print_error(f"Access denied: script must run as root or asterisk (current user: {user})\n") sys.exit(1) def compare_node_lists(configed_nodes, registrations_nodes): set_config = set(configed_nodes) set_regs = set(registrations_nodes) identical = (set_config == set_regs) missing_from_config = set_regs - set_config missing_from_regs = set_config - set_regs return identical, list(missing_from_config), list(missing_from_regs) def reverse_dns(ip): """ Perform a reverse DNS lookup for IPv4 or IPv6. Returns the hostname string, or None if lookup fails. """ try: # Validate IP (works for both IPv4 and IPv6) ipaddress.ip_address(ip) except ValueError: return None try: hostname, aliaslist, ipaddrlist = socket.gethostbyaddr(ip) return hostname except socket.herror: return None def is_register_allstarlink_up(timeout=5): """ Returns True if https://register.allstarlink.org responds with HTTP 200. Returns False for any other status code or network error. """ url = "https://register.allstarlink.org" try: response = requests.get(url, timeout=timeout) return response.status_code == 200 except requests.RequestException: return False def parse_registration_string(s): """ Parse 'node:secret@host' into (node, secret, host). Return None if the string does not strictly match. """ m = _registration_re.match(s) if not m: return None node, secret, host = m.groups() return node, secret, host def extract_registration_values(filename, regtype): """ extract the register => lines from a config file """ pattern = re.compile(r'^register\s*=>?\s*(\S+)\s.*$') results = [] with open(filename, 'r') as f: for line in f: m = pattern.match(line) if m: value = re.sub(r'\s+', '', m.group(1)) # remove all whitespace regtuple = parse_registration_string(value) if regtuple is None: print_error(f"Invalid registration >> {value} << in {filename}") continue else: regtuple = regtuple + (regtype,) results.append(regtuple) return results def get_registrations(): """ obtain all the configurations from a registration file """ conf_files = [ ("/etc/asterisk/iax.conf","IAX"), ("/etc/asterisk/rpt_http_registrations.conf","HTTP") ] config_registers = [] config_has_one_reg = False for filename, regtype in conf_files: r = extract_registration_values(filename, regtype) if len(r) < 1: print_info(f"{filename} contains no registration lines.") continue else: i = len(r) print_info(f"{filename} contains {i} registration line(s)") config_registers.extend(r) config_has_one_reg = True if config_has_one_reg is False: return None return config_registers def get_rpt_nodes(conf_path="/etc/asterisk/rpt.conf"): """ Returns a list of node numbers that appear as: [12345](node-main) ignoring whitespace. Also follows #tryinclude directives by scanning the entire file for tryincludes first, then scanning for nodes. """ node_pattern = re.compile(r"^\[\s*(\d+)\s*\]", re.IGNORECASE) # Matches: # #tryinclude # #include # Allows whitespace around tokens. include_pattern = re.compile( r"^\s*#\s*(tryinclude|include)\s+(\S+)", re.IGNORECASE ) nodes = [] seen_nodes = set() seen_files = set() def resolve_includes(base_file, include_target): """Resolve relative paths from base_file dir and expand globs.""" target = include_target.strip() if not os.path.isabs(target): target = os.path.join(os.path.dirname(base_file), target) # Expand globs like /etc/asterisk/rpt*.conf matches = sorted(glob.glob(target)) return matches def scan_file(path): path = os.path.realpath(path) if path in seen_files: return seen_files.add(path) try: with open(path, "r") as f: # Read all lines so we can do a true "scan entire file" pass lines = f.readlines() except FileNotFoundError: print_info(f"Config file not found: {path}") return except PermissionError: print_info(f"Permission denied reading: {path}") return except OSError as e: print_info(f"Error reading {path}: {e}") return # ------------------------------------------------------------ # PASS 1: scan the ENTIRE file for includes (tryinclude/include) # ------------------------------------------------------------ include_paths = [] for raw in lines: line = raw.strip() m = include_pattern.match(line) if not m: continue kind = m.group(1).lower() # tryinclude / include inc = m.group(2).strip() # filename inside <...> resolved = resolve_includes(path, inc) if not resolved: # tryinclude is allowed to be missing; include usually isn't if kind == "include": print_info(f"Include target not found: {inc} (from {path})") continue include_paths.extend(resolved) # Recurse into includes AFTER we've found them all for inc_path in include_paths: scan_file(inc_path) # ------------------------------------------------------------ # PASS 2: scan the ENTIRE file for node section headers # ------------------------------------------------------------ for raw in lines: line = raw.strip() m = node_pattern.match(line) if not m: continue node = m.group(1) if int(node) > 1999: if node not in seen_nodes: nodes.append(node) seen_nodes.add(node) else: print_info(f"Skipping evaluation of private node {node}") scan_file(conf_path) return nodes def check_node_formedness(node,reghost): """ validate node and reghost """ n_errors = 0 n_warnings = 0 try: node_int = int(node) if node_int < 2000 or node_int > 999989: print_error(f"Node {node} is outside the numeric range 2000-999989") n_errors += 1 except Exception as e: print_error(f"Registration has \"node\" {node} which is not a number") n_errors += 1 m = _reghost_re.match(reghost) if m is None: print_error(f"Registration has a reghost of '{reghost}' which is not 'register.allstarlink.org'") n_errors += 1 return n_errors, n_warnings def find_iax_registration(node): """ Runs 'asterisk -rx "iax2 show registry"' and returns a tuple: (Host, Perceived, State) for the given username, or None if not found. """ cmd = "asterisk -rx 'iax2 show registry'" try: output = subprocess.check_output( shlex.split(cmd), text=True, stderr=subprocess.STDOUT ) except subprocess.CalledProcessError: return None, None, None lines = output.strip().splitlines() if len(lines) < 3: return None, None, None # Skip header line data_lines = lines[1:] for line in data_lines: # Skip summary line like "1 IAX2 registrations." if "registrations" in line: continue # Columns are aligned, so splitting on whitespace works reliably parts = line.split() if len(parts) < 6: continue host = parts[0] dnsmgr = parts[1] user = parts[2] perceived = parts[3] refresh = parts[4] state = " ".join(parts[5:]) # State may contain spaces if user == node: return host, perceived, state return None, None, None def check_iax_registration(node): """ validate IAX registration status """ host, myip, state = find_iax_registration(node) if host is None: print_error(f"There is no apparent IAX registration attempt for node {node}") return False if not state == "Registered": print_error(f"IAX registration state is {state}") return False print_ok(f"IAX registration state is {state}") print_ok(f"Perceived IAX IP:PORT for this node is: {myip}") host = host.split(":")[0] reghost_name = reverse_dns(host) print_ok(f"Registered to {reghost_name} - {host}") return True def find_http_registration(node): """ Runs 'asterisk -rx "rpt show registrations"' and returns a list of tuples: (Host, Perceived, State) for all rows matching the given username. Returns an empty list if none match. """ cmd = "asterisk -rx 'rpt show registrations'" try: output = subprocess.check_output( shlex.split(cmd), text=True, stderr=subprocess.STDOUT ) except subprocess.CalledProcessError: return None, None, None lines = output.strip().splitlines() if len(lines) < 3: return None, None, None results = [] # Skip header line data_lines = lines[1:] for line in data_lines: # Skip summary line like "2 HTTP registrations." if "registrations" in line: continue parts = line.split() if len(parts) < 5: continue host = parts[0] user = parts[1] perceived = parts[2] refresh = parts[3] state = " ".join(parts[4:]) # State may contain multiple words if user == node: return host, perceived, state return None, None, None def check_http_registration(node): """ validate HTTP registration status """ host, myip, state = find_http_registration(node) if host is None: print_error(f"There is no apparent HTTP registration attempt for node {node}") return False if not state == "Registered": print_error(f"HTTP registration state is {state}") return False print_ok(f"HTTP registration state is {state}") print_ok(f"Perceived IAX IP:PORT for this node is: {myip}") reghost_name = reverse_dns(host.split(":")[0]) host = host.split(":")[0] print_ok(f"Registered to {reghost_name} - {host}") return True def get_iax_bindport(filename="/etc/asterisk/iax.conf"): """ Reads the given iax.conf file, finds a line of the form: bindport = and returns the number as an int. Returns None if not found or malformed. """ pattern = re.compile(r'^bindport\s*=\s*(\d+)') try: with open(filename, "r") as f: for line in f: m = pattern.match(line) if m: return int(m.group(1)) except OSError: return None print_error("binddport does not appear to be set in iax.conf") return None def get_node_stats(node): """ Fetch JSON from an HTTP API endpoint. Returns the parsed JSON (dict/list) or None on failure. """ try: url = f"https://stats.allstarlink.org/api/stats/{node}" response = requests.get(url, timeout=5) response.raise_for_status() # raises for 4xx/5xx return response.json() except (requests.RequestException, ValueError): return None def get_node_regtime(node): """ Fetch JSON from an HTTP API endpoint. Returns the parsed JSON (dict/list) or None on failure. """ try: url = f"https://www.allstarlink.org/nodelist/nodelist-server.php?s={node}" response = requests.get(url, timeout=5) response.raise_for_status() # raises for 4xx/5xx return response.json() except (requests.RequestException, ValueError): return None def check_node_status(node): """ compare stats operational data """ n_errors = 0 n_warnings = 0 bind_udpport = get_iax_bindport() if bind_udpport is None: n_errors += 1 stats = get_node_stats(node) if stats is None: print_error(f"No stats available... cannot continue this check set") n_warnings += 1 return n_errors, n_warnings stats_node = stats.get('node') if isinstance(stats, dict) else None if not isinstance(stats_node, dict): print_error("Node stats are unavailable in the response.") n_errors += 1 return n_errors, n_warnings stats_server = stats_node.get('server') if not isinstance(stats_server, dict): print_error("Node server stats are unavailable in the response.") n_errors += 1 return n_errors, n_warnings stats_udpport = stats_server.get('udpport') if stats_udpport is None: print_error("UDP port is missing from node stats.") n_errors += 1 elif stats_udpport == bind_udpport: print_ok(f"Server-set UDP port matches bindport in iax.conf") else: print_error(f"Registered UDP port does not match bindport in iax.conf: {stats_udpport} != {bind_udpport}") print_info("Mismatched IAX information might be okay if your router has been configured to") print_info("perform the external->internal port forwarding. The \"Nodeping\" status, below,") print_info("can be used to confirm inbound connectivity.") n_errors += 1 node_reginfo = get_node_regtime(node) if node_reginfo is None or len(node_reginfo) == 0: print_error(f"No registration information available.") n_errors += 1 return n_errors, n_warnings reg_since = node_reginfo[0].get('regseconds') if reg_since is None: print_error(f"Registration information is missing 'regseconds' field.") n_errors += 1 return n_errors, n_warnings reg_since_dt = datetime.fromtimestamp(reg_since, UTC) reg_since_str = reg_since_dt.strftime("%Y-%m-%d %H:%M:%S UTC") if reg_since_dt < datetime.now(UTC) - timedelta(minutes=10): print_error(f"Node not registered in last 10 minutes: {reg_since_str}") n_errors += 1 else: print_ok(f"Node registration within 10 minutes: {reg_since_str}") iptime = stats_node.get('iptime') if iptime: print_info(f"Last time IP changed was {iptime} UTC") else: print_warning("Last time IP change is unavailable in node stats") n_warnings += 1 return n_errors, n_warnings def get_node_ping(node): """ Fetch JSON from an HTTP API endpoint. Returns the parse JSON (dict/list) or None on failure. """ try: url = f"https://nodeping.allstarlink.org?node={node}" response = requests.get(url, timeout=5) response.raise_for_status() # raises for 4xx/5xx return response.json() except (requests.RequestException, ValueError): return None def check_node_reachability(node): """ checks inbound node reachability """ node_check = get_node_ping(node) if node_check is None: print_warning("Nodeping is not responding - rate-limited or it's down") return 0,1 if node_check['ipv4']['status'] == "unregistered" or node_check['ipv4']['rc'] == -9: print_warning("Node is not perceived as registered; not tested") return 0,1 status = node_check['ipv4']['status'] if status == "unreachable": print_error("Incoming connections to IAX from other nodes failed; client IP or firewall issue") return 1,0 if status == "ok": pingms = node_check['ipv4']['pingms'] print_ok(f"Incoming connections to IAX from other nodes is successful") print_info(f"IAX ping test has a roundtrip time of {pingms}ms") return 0,0 print_warning("Nodeping is returning something I don't understand") return 0,1 def get_remote_ip_http(url): """ Fetch JSON from an HTTP API endpoint. Returns the parsed text or None """ try: response = requests.get(url, timeout=5) response.raise_for_status() # raises for 4xx/5xx return response.text.strip() except (requests.RequestException, ValueError) as e: return None def udp_ping(url: str, message: str = "ping", timeout: float = 3.0) -> str: """ Send a UDP packet to the host/port in the given udp:// URL and return the text response. Parameters ---------- udp_pingurl : str A URL like "udp://hostname:port" message : str The text to send (default: "ping") timeout : float Seconds to wait for a response Returns ------- str The decoded text returned by the server. Raises ------ ValueError If the URL is invalid or missing host/port. TimeoutError If no response is received within the timeout. OSError For underlying socket errors. """ parsed = urlparse(url) if parsed.scheme != "udp": raise ValueError(f"URL must start with udp://, got: {url}") host = parsed.hostname port = parsed.port if not host or not port: raise ValueError(f"URL must include host and port, got: {url}") # Resolve IPv4 only try: infos = socket.getaddrinfo( host, port, family=socket.AF_INET, type=socket.SOCK_DGRAM ) except socket.gaierror: return None # Use the first IPv4 result family, socktype, proto, canonname, sockaddr = infos[0] sock = socket.socket(family, socktype, proto) sock.settimeout(timeout) try: sock.sendto(message.encode("utf-8"), sockaddr) data, _ = sock.recvfrom(2048) ip = data.decode("utf-8", errors="replace") return ip.split(":")[0] except socket.timeout: return None finally: sock.close() def check_remote_ip_perception(): """ checks remote targets for perceived IPs """ remote_http_checks = [ "https://conntest-east1.allstarlink.org/ip" , "https://conntest-west1.allstarlink.org/ip" , "https://conntest-west2.allstarlink.org/ip" ] remote_udp_checks = [ "udp://conntest-east1.allstarlink.org:4570" , "udp://conntest-west1.allstarlink.org:4569" , "udp://conntest-west2.allstarlink.org:4569" ] probed_ips_http = [] probed_ips_iax = [] consensus_ip = None for url in remote_http_checks: ip = get_remote_ip_http(url) if not ip is None: print_info(f"IP from {url} reports: {ip}") probed_ips_http.append(ip) else: print_warning(f"IP test to {url} failed") for host in remote_udp_checks: ip = udp_ping(host) if not ip is None: print_info(f"IP from {host} reports: {ip}") probed_ips_iax.append(ip) else: print_warning(f"IP test to {host} failed") probed_ips_http = set(probed_ips_http) probed_ips_iax = set(probed_ips_iax) if len(probed_ips_http) == 1: print_ok("HTTP IP probes have consensus on the same perceived IP") else: print_error("HTTP IP probes DO NOT have consensus on the same perceived IP! CGNAT?") if len(probed_ips_iax) == 1: print_ok("IAX IP probes have consensus on the same perceived IP") else: print_error("IAX IP probes DO NOT have consensus on the same perceived IP! CGNAT?") def main(): print("\n-----===== AllStarLink Node Authentication Check =====-----\n") print("Checking configuration:") configed_nodes = get_rpt_nodes() if configed_nodes is None: print_error("NO NODE ARE CONFIGURED IN rpt.conf") print_error("RUN asl-menu OR INSPECT THE CONFIG FILES") print("") sys.exit() configed_nodes_string = ", ".join(configed_nodes) print_info(f"rpt.conf has configuration for: {configed_nodes_string}") registrations = (get_registrations()) if registrations is None: print_error("NO VALID REGISTRATIONS EXIST IN THE ASTERISK CONFIGURATION!") print_error("RUN asl-menu OR INSPECT THE CONFIG FILES") print("") sys.exit() registrations_nodes = [n[0] for n in registrations] registration_nodes_string = ", ".join(registrations_nodes) print_info(f"Registrations present for configured node(s): {registration_nodes_string}") counts = Counter(registrations_nodes) duplicates = [s for s, count in counts.items() if count >= 2] if duplicates != []: duplicates_string = ", ".join(duplicates) print_error(f"Duplicated HTTP and IAX registrations for: {duplicates_string}") print_error(f"Cannot continue; reconcile configuration before proceeding") sys.exit() identical, missing_from_config, missing_from_regs = compare_node_lists(configed_nodes, registrations_nodes) if not identical: if len(missing_from_regs) > 0: mr = ", ".join(missing_from_regs) print_error(f"No registration present for configured node(s): {mr}") if len(missing_from_config) > 0: mc = ", ".join(missing_from_config) print_error(f"No configuration present for registrations for node(s): {mc}") print_error(f"Cannot continue; reconcile configuration before proceeding") sys.exit() check_remote_ip_perception() for node, password, reghost, regtype in registrations: total_errors = 0 total_warnings = 0 print(f"\nTesting node {node}:") e, r = check_node_formedness(node, reghost) if e > 0: total_errors += e if r > 0: total_warnings += r if e == 0 and r == 0: print_ok("Node registration config is well-formed") print_ok(f"Node registration type is {regtype}") regup = is_register_allstarlink_up() if regup is True: print_ok("register.allstarlink.org is reachable (via HTTP)") else: print_error("register.allstarlink.org is unreachable (via HTTP)") if regtype == "IAX": r = check_iax_registration(node) if regtype == "HTTP": r= check_http_registration(node) if r is False: total_errors += 1 print_info("Stopping node checks due to registration failure") print_info("as further information will be unreliable") if r is True: e, r = check_node_status(node) if e > 0: total_errors += e if r > 0: total_warnings += r e, r = check_node_reachability(node) if e > 0: total_errors += e if r > 0: total_warnings += r if total_errors > 0: print("") print_error(f"Node {node} has {total_errors} error(s)!") if total_warnings > 0: print("") print_warning(f"Node {node} has {total_warnings} warning(s)!") print_warning(f"Warnings may or may not be problems depending on configuration") if total_errors == 0 and total_warnings == 0: print_ok(f"No problems detected with node {node}") print("") def entrypoint(): require_root_or_asterisk() main() if __name__ == "__main__": entrypoint()