# pylint: disable=line-too-long, wrong-import-order
import headscale, helper, pytz, os, yaml, logging, json
from flask import Flask, Markup, render_template
from datetime import datetime
from dateutil import parser
from concurrent.futures import ALL_COMPLETED, wait
from flask_executor import Executor
LOG_LEVEL = os.environ["LOG_LEVEL"].replace('"', '').upper()
# Initiate the Flask application and logging:
app = Flask(__name__, static_url_path="/static")
match LOG_LEVEL:
case "DEBUG" : app.logger.setLevel(logging.DEBUG)
case "INFO" : app.logger.setLevel(logging.INFO)
case "WARNING" : app.logger.setLevel(logging.WARNING)
case "ERROR" : app.logger.setLevel(logging.ERROR)
case "CRITICAL": app.logger.setLevel(logging.CRITICAL)
executor = Executor(app)
url_node_name = headscale.set_url_node_name()
def render_overview():
app.logger.info("Rendering the Overview page")
url = headscale.get_url()
api_key = headscale.get_api_key()
timezone = pytz.timezone(os.environ["TZ"] if os.environ["TZ"] else "UTC")
local_time = timezone.localize(datetime.now())
# Overview page will just read static information from the config file and display it
# Open the config.yaml and parse it.
config_file = ""
try:
config_file = open("/etc/headscale/config.yml", "r")
app.logger.info("Opening /etc/headscale/config.yml")
except:
config_file = open("/etc/headscale/config.yaml", "r")
app.logger.info("Opening /etc/headscale/config.yaml")
config_yaml = yaml.safe_load(config_file)
# Get and display the following information:
# Overview of the server's machines, users, preauth keys, API key expiration, server version
# Get all machines:
machines = headscale.get_machines(url, api_key)
machines_count = len(machines[str(url_node_name)+"s"])
# Need to check if routes are attached to an active machine:
# ISSUE: https://github.com/iFargle/headscale-webui/issues/36
# ISSUE: https://github.com/juanfont/headscale/issues/1228
# Get all routes:
routes = headscale.get_routes(url,api_key)
total_routes = 0
for route in routes["routes"]:
if int(route[str(url_node_name)]['id']) != 0:
total_routes += 1
enabled_routes = 0
for route in routes["routes"]:
if route["enabled"] and route['advertised'] and int(route[str(url_node_name)]['id']) != 0:
enabled_routes += 1
# Get a count of all enabled exit routes
exits_count = 0
exits_enabled_count = 0
for route in routes["routes"]:
if route['advertised'] and int(route[str(url_node_name)]['id']) != 0:
if route["prefix"] == "0.0.0.0/0" or route["prefix"] == "::/0":
exits_count +=1
if route["enabled"]:
exits_enabled_count += 1
# Get User and PreAuth Key counts
user_count = 0
usable_keys_count = 0
users = headscale.get_users(url, api_key)
for user in users["users"]:
user_count +=1
preauth_keys = headscale.get_preauth_keys(url, api_key, user["name"])
for key in preauth_keys["preAuthKeys"]:
expiration_parse = parser.parse(key["expiration"])
key_expired = True if expiration_parse < local_time else False
if key["reusable"] and not key_expired: usable_keys_count += 1
if not key["reusable"] and not key["used"] and not key_expired: usable_keys_count += 1
# General Content variables:
ip_prefixes, server_url, disable_check_updates, ephemeral_node_inactivity_timeout, node_update_check_interval = "N/A", "N/A", "N/A", "N/A", "N/A"
if "ip_prefixes" in config_yaml: ip_prefixes = str(config_yaml["ip_prefixes"])
if "server_url" in config_yaml: server_url = str(config_yaml["server_url"])
if "disable_check_updates" in config_yaml: disable_check_updates = str(config_yaml["disable_check_updates"])
if "ephemeral_node_inactivity_timeout" in config_yaml: ephemeral_node_inactivity_timeout = str(config_yaml["ephemeral_node_inactivity_timeout"])
if "node_update_check_interval" in config_yaml: node_update_check_interval = str(config_yaml["node_update_check_interval"])
# OIDC Content variables:
issuer, client_id, scope, use_expiry_from_token, expiry = "N/A", "N/A", "N/A", "N/A", "N/A"
if "oidc" in config_yaml:
if "issuer" in config_yaml["oidc"] : issuer = str(config_yaml["oidc"]["issuer"])
if "client_id" in config_yaml["oidc"] : client_id = str(config_yaml["oidc"]["client_id"])
if "scope" in config_yaml["oidc"] : scope = str(config_yaml["oidc"]["scope"])
if "use_expiry_from_token" in config_yaml["oidc"] : use_expiry_from_token = str(config_yaml["oidc"]["use_expiry_from_token"])
if "expiry" in config_yaml["oidc"] : expiry = str(config_yaml["oidc"]["expiry"])
# Embedded DERP server information.
enabled, region_id, region_code, region_name, stun_listen_addr = "N/A", "N/A", "N/A", "N/A", "N/A"
if "derp" in config_yaml:
if "server" in config_yaml["derp"] and config_yaml["derp"]["server"]["enabled"]:
if "enabled" in config_yaml["derp"]["server"]: enabled = str(config_yaml["derp"]["server"]["enabled"])
if "region_id" in config_yaml["derp"]["server"]: region_id = str(config_yaml["derp"]["server"]["region_id"])
if "region_code" in config_yaml["derp"]["server"]: region_code = str(config_yaml["derp"]["server"]["region_code"])
if "region_name" in config_yaml["derp"]["server"]: region_name = str(config_yaml["derp"]["server"]["region_name"])
if "stun_listen_addr" in config_yaml["derp"]["server"]: stun_listen_addr = str(config_yaml["derp"]["server"]["stun_listen_addr"])
nameservers, magic_dns, domains, base_domain = "N/A", "N/A", "N/A", "N/A"
if "dns_config" in config_yaml:
if "nameservers" in config_yaml["dns_config"]: nameservers = str(config_yaml["dns_config"]["nameservers"])
if "magic_dns" in config_yaml["dns_config"]: magic_dns = str(config_yaml["dns_config"]["magic_dns"])
if "domains" in config_yaml["dns_config"]: domains = str(config_yaml["dns_config"]["domains"])
if "base_domain" in config_yaml["dns_config"]: base_domain = str(config_yaml["dns_config"]["base_domain"])
elif "dns" in config_yaml:
if "nameservers" in config_yaml["dns"]: nameservers = "global: " + str(config_yaml["dns"]["nameservers"]["global"]) + " split: " + str(config_yaml["dns"]["nameservers"]["split"])
if "magic_dns" in config_yaml["dns"]: magic_dns = str(config_yaml["dns"]["magic_dns"])
if "domains" in config_yaml["dns"]: domains = str(config_yaml["dns"]["domains"])
if "base_domain" in config_yaml["dns"]: base_domain = str(config_yaml["dns"]["base_domain"])
# Start putting the content together
overview_content = """
"""
general_content = """
"""
oidc_content = """
"""
derp_content = """
"""
dns_content = """
"""
# Remove content that isn't needed:
# Remove OIDC if it isn't available:
if "oidc" not in config_yaml: oidc_content = ""
# Remove DERP if it isn't available or isn't enabled
if "derp" not in config_yaml: derp_content = ""
if "derp" in config_yaml:
if "server" in config_yaml["derp"]:
if str(config_yaml["derp"]["server"]["enabled"]) == "False":
derp_content = ""
# TODO:
# Whether there are custom DERP servers
# If there are custom DERP servers, get the file location from the config file. Assume mapping is the same.
# Whether the built-in DERP server is enabled
# The IP prefixes
# The DNS config
if config_yaml["derp"]["paths"]: pass
# # open the path:
# derp_file =
# config_file = open("/etc/headscale/config.yaml", "r")
# config_yaml = yaml.safe_load(config_file)
# The ACME config, if not empty
# Whether updates are running
# Whether metrics are enabled (and their listen addr)
# The log level
# What kind of Database is being used to drive headscale
content = "
" + overview_content + general_content + derp_content + oidc_content + dns_content + ""
return Markup(content)
def thread_machine_content(machine, machine_content, idx, all_routes, failover_pair_prefixes):
# machine = passed in machine information
# content = place to write the content
# app.logger.debug("Machine Information")
# app.logger.debug(str(machine))
app.logger.debug("Machine Information =================")
app.logger.debug("Name: %s, ID: %s, User: %s, givenName: %s, ", str(machine["name"]), str(machine["id"]), str(machine["user"]["name"]), str(machine["givenName"]))
url = headscale.get_url()
api_key = headscale.get_api_key()
# Set the current timezone and local time
timezone = pytz.timezone(os.environ["TZ"] if os.environ["TZ"] else "UTC")
local_time = timezone.localize(datetime.now())
# Get the machines routes
pulled_routes = headscale.get_machine_routes(url, api_key, machine["id"])
routes = ""
# Test if the machine is an exit node:
exit_route_found = False
exit_route_enabled = False
# If the device has enabled Failover routes (High Availability routes)
ha_enabled = False
# If the length of "routes" is NULL/0, there are no routes, enabled or disabled:
if len(pulled_routes["routes"]) > 0:
advertised_routes = False
# First, check if there are any routes that are both enabled and advertised
# If that is true, we will output the collection-item for routes. Otherwise, it will not be displayed.
for route in pulled_routes["routes"]:
if route["advertised"]:
advertised_routes = True
if advertised_routes:
routes = """
directions
Routes
"""
# app.logger.debug("Pulled Routes Dump: "+str(pulled_routes))
# app.logger.debug("All Routes Dump: "+str(all_routes))
# Find all exits and put their ID's into the exit_routes array
exit_routes = []
exit_enabled_color = "red"
exit_tooltip = "enable"
exit_route_enabled = False
for route in pulled_routes["routes"]:
if route["prefix"] == "0.0.0.0/0" or route["prefix"] == "::/0":
exit_routes.append(route["id"])
exit_route_found = True
# Test if it is enabled:
if route["enabled"]:
exit_enabled_color = "green"
exit_tooltip = 'disable'
exit_route_enabled = True
app.logger.debug("Found exit route ID's: "+str(exit_routes))
app.logger.debug("Exit Route Information: ID: %s | Enabled: %s | exit_route_enabled: %s / Found: %s", str(route["id"]), str(route["enabled"]), str(exit_route_enabled), str(exit_route_found))
# Print the button for the Exit routes:
if exit_route_found:
routes = routes+"""
Exit Route
"""
# Check if the route has another enabled identical route.
# Check all routes from the current machine...
for route in pulled_routes["routes"]:
# ... against all routes from all machines ....
for route_info in all_routes["routes"]:
app.logger.debug("Comparing routes %s and %s", str(route["prefix"]), str(route_info["prefix"]))
# ... If the route prefixes match and are not exit nodes ...
if str(route_info["prefix"]) == str(route["prefix"]) and (route["prefix"] != "0.0.0.0/0" and route["prefix"] != "::/0"):
# Check if the route ID's match. If they don't ...
app.logger.debug("Found a match: %s and %s", str(route["prefix"]), str(route_info["prefix"]))
if route_info["id"] != route["id"]:
app.logger.debug("Route ID's don't match. They're on different nodes.")
# ... Check if the routes prefix is already in the array...
if route["prefix"] not in failover_pair_prefixes:
# IF it isn't, add it.
app.logger.info("New HA pair found: %s", str(route["prefix"]))
failover_pair_prefixes.append(str(route["prefix"]))
if route["enabled"] and route_info["enabled"]:
# If it is already in the array. . .
# Show as HA only if both routes are enabled:
app.logger.debug("Both routes are enabled. Setting as HA [%s] (%s) ", str(machine["name"]), str(route["prefix"]))
ha_enabled = True
# If the route is an exit node and already counted as a failover route, it IS a failover route, so display it.
if route["prefix"] != "0.0.0.0/0" and route["prefix"] != "::/0" and route["prefix"] in failover_pair_prefixes:
route_enabled = "red"
route_tooltip = 'enable'
color_index = failover_pair_prefixes.index(str(route["prefix"]))
route_enabled_color = helper.get_color(color_index, "failover")
if route["enabled"]:
color_index = failover_pair_prefixes.index(str(route["prefix"]))
route_enabled = helper.get_color(color_index, "failover")
route_tooltip = 'disable'
routes = routes+"""
"""+route['prefix']+"""
"""
# Get the remaining routes:
for route in pulled_routes["routes"]:
# Get the remaining routes - No exits or failover pairs
if route["prefix"] != "0.0.0.0/0" and route["prefix"] != "::/0" and route["prefix"] not in failover_pair_prefixes:
app.logger.debug("Route: ["+str(route[str(url_node_name)]['name'])+"] id: "+str(route['id'])+" / prefix: "+str(route['prefix'])+" enabled?: "+str(route['enabled']))
route_enabled = "red"
route_tooltip = 'enable'
if route["enabled"]:
route_enabled = "green"
route_tooltip = 'disable'
routes = routes+"""
"""+route['prefix']+"""
"""
routes = routes+""
# Get machine tags
tag_array = ""
for tag in machine["forcedTags"]:
tag_array = tag_array+"{tag: '"+tag[4:]+"'}, "
tags = """
label
Tags
"""
# Get the machine IP's
machine_ips = ""
for ip_address in machine["ipAddresses"]:
machine_ips = machine_ips+"- "+ip_address+"
"
machine_ips = machine_ips+"
"
# Format the dates for easy readability
last_seen_parse = parser.parse(machine["lastSeen"])
if url_node_name == "machine":
last_seen_local = last_seen_parse.astimezone(timezone)
else:
last_seen_local = local_time if machine["online"] is True else last_seen_parse.astimezone(timezone)
last_seen_delta = local_time - last_seen_local
last_seen_print = helper.pretty_print_duration(last_seen_delta)
last_seen_time = str(last_seen_local.strftime('%A %m/%d/%Y, %H:%M:%S'))+" "+str(timezone)+" ("+str(last_seen_print)+")"
if url_node_name == "machine":
last_update_parse = local_time if machine["lastSuccessfulUpdate"] is None else parser.parse(machine["lastSuccessfulUpdate"])
else:
last_update_parse = local_time if machine["online"] is True else last_seen_parse
last_update_local = last_update_parse.astimezone(timezone)
last_update_delta = local_time - last_update_local
last_update_print = helper.pretty_print_duration(last_update_delta)
last_update_time = str(last_update_local.strftime('%A %m/%d/%Y, %H:%M:%S'))+" "+str(timezone)+" ("+str(last_update_print)+")"
created_parse = parser.parse(machine["createdAt"])
created_local = created_parse.astimezone(timezone)
created_delta = local_time - created_local
created_print = helper.pretty_print_duration(created_delta)
created_time = str(created_local.strftime('%A %m/%d/%Y, %H:%M:%S'))+" "+str(timezone)+" ("+str(created_print)+")"
# If there is no expiration date, we don't need to do any calculations:
if machine["expiry"] != "0001-01-01T00:00:00Z":
expiry_parse = parser.parse(machine["expiry"])
expiry_local = expiry_parse.astimezone(timezone)
expiry_delta = expiry_local - local_time
expiry_print = helper.pretty_print_duration(expiry_delta, "expiry")
if str(expiry_local.strftime('%Y')) in ("0001", "9999", "0000"):
expiry_time = "No expiration date."
elif int(expiry_local.strftime('%Y')) > int(expiry_local.strftime('%Y'))+2:
expiry_time = str(expiry_local.strftime('%m/%Y'))+" "+str(timezone)+" ("+str(expiry_print)+")"
else:
expiry_time = str(expiry_local.strftime('%A %m/%d/%Y, %H:%M:%S'))+" "+str(timezone)+" ("+str(expiry_print)+")"
expiring_soon = True if int(expiry_delta.days) < 14 and int(expiry_delta.days) > 0 else False
app.logger.debug("Machine: "+machine["name"]+" expires: "+str(expiry_local.strftime('%Y'))+" / "+str(expiry_delta.days))
else:
expiry_time = "No expiration date."
expiring_soon = False
app.logger.debug("Machine: "+machine["name"]+" has no expiration date")
# Get the first 10 characters of the PreAuth Key:
if machine["preAuthKey"]:
preauth_key = str(machine["preAuthKey"]["key"])[0:10]
else: preauth_key = "None"
# Set the status and user badge color:
text_color = helper.text_color_duration(last_seen_delta)
user_color = helper.get_color(int(machine["user"]["id"]))
# Generate the various badges:
status_badge = "fiber_manual_record"
user_badge = ""+machine["user"]["name"]+""
exit_node_badge = "" if not exit_route_enabled else "Exit"
ha_route_badge = "" if not ha_enabled else "HA"
expiration_badge = "" if not expiring_soon else "Expiring!"
machine_content[idx] = (str(render_template(
'machines_card.html',
given_name = machine["givenName"],
machine_id = machine["id"],
hostname = machine["name"],
ns_name = machine["user"]["name"],
ns_id = machine["user"]["id"],
ns_created = machine["user"]["createdAt"],
last_seen = str(last_seen_print),
last_update = str(last_update_print),
machine_ips = Markup(machine_ips),
advertised_routes = Markup(routes),
exit_node_badge = Markup(exit_node_badge),
ha_route_badge = Markup(ha_route_badge),
status_badge = Markup(status_badge),
user_badge = Markup(user_badge),
last_update_time = str(last_update_time),
last_seen_time = str(last_seen_time),
created_time = str(created_time),
expiry_time = str(expiry_time),
preauth_key = str(preauth_key),
expiration_badge = Markup(expiration_badge),
machine_tags = Markup(tags),
taglist = machine["forcedTags"]
)))
app.logger.info("Finished thread for machine "+machine["givenName"]+" index "+str(idx))
def render_machines_cards():
app.logger.info("Rendering machine cards")
url = headscale.get_url()
api_key = headscale.get_api_key()
machines_list = headscale.get_machines(url, api_key)
#########################################
# Thread this entire thing.
num_threads = len(machines_list[str(url_node_name)+"s"])
iterable = []
machine_content = {}
failover_pair_prefixes = []
for i in range (0, num_threads):
app.logger.debug("Appending iterable: "+str(i))
iterable.append(i)
# Flask-Executor Method:
# Get all routes
all_routes = headscale.get_routes(url, api_key)
# app.logger.debug("All found routes")
# app.logger.debug(str(all_routes))
if LOG_LEVEL == "DEBUG":
# DEBUG: Do in a forloop:
for idx in iterable: thread_machine_content(machines_list[str(url_node_name)+"s"][idx], machine_content, idx, all_routes, failover_pair_prefixes)
else:
app.logger.info("Starting futures")
futures = [executor.submit(thread_machine_content, machines_list[str(url_node_name)+"s"][idx], machine_content, idx, all_routes, failover_pair_prefixes) for idx in iterable]
# Wait for the executor to finish all jobs:
wait(futures, return_when=ALL_COMPLETED)
app.logger.info("Finished futures")
# Sort the content by machine_id:
sorted_machines = {key: val for key, val in sorted(machine_content.items(), key = lambda ele: ele[0])}
content = ""
# Print the content
for index in range(0, num_threads):
content = content+str(sorted_machines[index])
content = content+"
"
return Markup(content)
def render_users_cards():
app.logger.info("Rendering Users cards")
url = headscale.get_url()
api_key = headscale.get_api_key()
user_list = headscale.get_users(url, api_key)
content = ""
for user in user_list["users"]:
# Get all preAuth Keys in the user, only display if one exists:
preauth_keys_collection = build_preauth_key_table(user["name"])
# Set the user badge color:
user_color = helper.get_color(int(user["id"]), "text")
# Generate the various badges:
status_badge = "fiber_manual_record"
content = content + render_template(
'users_card.html',
status_badge = Markup(status_badge),
user_name = user["name"],
user_id = user["id"],
preauth_keys_collection = Markup(preauth_keys_collection)
)
content = content+"
"
return Markup(content)
def build_preauth_key_table(user_name):
app.logger.info("Building the PreAuth key table for User: %s", str(user_name))
url = headscale.get_url()
api_key = headscale.get_api_key()
preauth_keys = headscale.get_preauth_keys(url, api_key, user_name)
preauth_keys_collection = """
Toggle Expired
Add PreAuth Key
vpn_key
PreAuth Keys
"""
if len(preauth_keys["preAuthKeys"]) == 0: preauth_keys_collection += "No keys defined for this user
"
if len(preauth_keys["preAuthKeys"]) > 0:
preauth_keys_collection += """
ID |
Key Prefix |
Reusable |
Used |
Ephemeral |
Usable |
Actions |
"""
for key in preauth_keys["preAuthKeys"]:
# Get the key expiration date and compare it to now to check if it's expired:
# Set the current timezone and local time
timezone = pytz.timezone(os.environ["TZ"] if os.environ["TZ"] else "UTC")
local_time = timezone.localize(datetime.now())
expiration_parse = parser.parse(key["expiration"])
key_expired = True if expiration_parse < local_time else False
expiration_time = str(expiration_parse.strftime('%A %m/%d/%Y, %H:%M:%S'))+" "+str(timezone)
key_usable = False
if key["reusable"] and not key_expired: key_usable = True
if not key["reusable"] and not key["used"] and not key_expired: key_usable = True
# Class for the javascript function to look for to toggle the hide function
hide_expired = "expired-row" if not key_usable else ""
btn_reusable = "fiber_manual_record" if key["reusable"] else ""
btn_ephemeral = "fiber_manual_record" if key["ephemeral"] else ""
btn_used = "fiber_manual_record" if key["used"] else ""
btn_usable = "fiber_manual_record" if key_usable else ""
# Other buttons:
btn_delete = "Expire" if key_usable else ""
tooltip_data = "Expiration: "+expiration_time
# TR ID will look like "1-albert-tr"
preauth_keys_collection = preauth_keys_collection+"""
"""+str(key["id"])+""" |
"""+str(key["key"])[0:10]+""" |
"""+btn_reusable+""" |
"""+btn_used+""" |
"""+btn_ephemeral+""" |
"""+btn_usable+""" |
"""+btn_delete+""" |
"""
preauth_keys_collection = preauth_keys_collection+"""
"""
return preauth_keys_collection
def oidc_nav_dropdown(user_name, email_address, name):
app.logger.info("OIDC is enabled. Building the OIDC nav dropdown")
html_payload = """
-
email
Email
"""+email_address+"""
-
person_outline
Username
"""+user_name+"""
- exit_to_app Logout
"""+name+""" account_circle
"""
return Markup(html_payload)
def oidc_nav_mobile(user_name, email_address, name):
html_payload = """
exit_to_appLogout
"""
return Markup(html_payload)
def render_search():
html_payload = """
search
"""
return Markup(html_payload)
def render_routes():
app.logger.info("Rendering Routes page")
url = headscale.get_url()
api_key = headscale.get_api_key()
all_routes = headscale.get_routes(url, api_key)
# If there are no routes, just exit:
if len(all_routes) == 0: return Markup("
There are no routes to display!")
# Get a list of all Route ID's to iterate through:
all_routes_id_list = []
for route in all_routes["routes"]:
all_routes_id_list.append(route["id"])
if route[str(url_node_name)]["name"]:
app.logger.info("Found route %s / machine: %s", str(route["id"]), route[str(url_node_name)]["name"])
else:
app.logger.info("Route id %s has no machine associated.", str(route["id"]))
route_content = ""
failover_content = ""
exit_content = ""
route_title='Routes'
failover_title='Failover Routes'
exit_title='Exit Routes'
markup_pre = """
"""
##############################################################################################
# Step 1: Get all non-exit and non-failover routes:
route_content = markup_pre+route_title
route_content += """
ID |
Machine |
Route |
Enabled |
"""
for route in all_routes["routes"]:
# Get relevant info:
route_id = route["id"]
machine = route[str(url_node_name)]["givenName"]
prefix = route["prefix"]
is_enabled = route["enabled"]
is_primary = route["isPrimary"]
is_failover = False
is_exit = False
enabled = "fiber_manual_record"
disabled = "fiber_manual_record"
# Set the displays:
enabled_display = disabled
if is_enabled: enabled_display = enabled
# Check if a prefix is an Exit route:
if prefix == "0.0.0.0/0" or prefix == "::/0": is_exit = True
# Check if a prefix is part of a failover pair:
for route_check in all_routes["routes"]:
if not is_exit:
if route["prefix"] == route_check["prefix"]:
if route["id"] != route_check["id"]:
is_failover = True
if not is_exit and not is_failover and machine != "":
# Build a simple table for all non-exit routes:
route_content += """
"""+str(route_id )+""" |
"""+str(machine )+""" |
"""+str(prefix )+""" |
"""+str(enabled_display )+""" |
"""
route_content += "
"+markup_post
##############################################################################################
# Step 2: Get all failover routes only. Add a separate table per failover prefix
failover_route_prefix = []
failover_available = False
for route in all_routes["routes"]:
# Get a list of all prefixes for all routes...
for route_check in all_routes["routes"]:
# ... that aren't exit routes...
if route["prefix"] !="0.0.0.0/0" and route["prefix"] != "::/0":
# if the curren route matches any prefix of any other route...
if route["prefix"] == route_check["prefix"]:
# and the route ID's are different ...
if route["id"] != route_check["id"]:
# ... and the prefix is not already in the list...
if route["prefix"] not in failover_route_prefix:
# append the prefix to the failover_route_prefix list
failover_route_prefix.append(route["prefix"])
failover_available = True
if failover_available:
# Set up the display code:
enabled = "fiber_manual_record"
disabled = "fiber_manual_record"
failover_content = markup_pre+failover_title
# Build the display for failover routes:
for route_prefix in failover_route_prefix:
# Get all route ID's associated with the route_prefix:
route_id_list = []
for route in all_routes["routes"]:
if route["prefix"] == route_prefix:
route_id_list.append(route["id"])
# Set up the display code:
failover_enabled = "fiber_manual_record"
failover_disabled = "fiber_manual_record"
failover_display = failover_disabled
for route_id in route_id_list:
# Get the routes index:
current_route_index = all_routes_id_list.index(route_id)
if all_routes["routes"][current_route_index]["enabled"]: failover_display = failover_enabled
# Get all route_id's associated with the route prefix:
failover_content += """
"""+failover_display+"""
"""+str(route_prefix)+"""
Machine |
Enabled |
Primary |
"""
# Build the display:
for route_id in route_id_list:
idx = all_routes_id_list.index(route_id)
machine = all_routes["routes"][idx]["machine"]["givenName"]
machine_id = all_routes["routes"][idx]["machine"]["id"]
is_primary = all_routes["routes"][idx]["isPrimary"]
is_enabled = all_routes["routes"][idx]["enabled"]
payload = []
for item in route_id_list: payload.append(int(item))
app.logger.debug("[%s] Machine: [%s] %s : %s / %s", str(route_id), str(machine_id), str(machine), str(is_enabled), str(is_primary))
app.logger.debug(str(all_routes["routes"][idx]))
# Set up the display code:
enabled_display_enabled = "fiber_manual_record"
enabled_display_disabled = "fiber_manual_record"
primary_display_enabled = "fiber_manual_record"
primary_display_disabled = "fiber_manual_record"
# Set displays:
enabled_display = enabled_display_enabled if is_enabled else enabled_display_disabled
primary_display = primary_display_enabled if is_primary else primary_display_disabled
# Build a simple table for all non-exit routes:
failover_content += """
"""+str(machine)+""" |
"""+str(enabled_display)+""" |
"""+str(primary_display)+""" |
"""
failover_content += "
"
failover_content += markup_post
##############################################################################################
# Step 3: Get exit nodes only:
exit_node_list = []
# Get a list of nodes with exit routes:
for route in all_routes["routes"]:
# For every exit route found, store the machine name in an array:
if route["prefix"] == "0.0.0.0/0" or route["prefix"] == "::/0":
if route[str(url_node_name)]["givenName"] not in exit_node_list:
exit_node_list.append(route[str(url_node_name)]["givenName"])
# Exit node display building:
# Display by machine, not by route
exit_content = markup_pre+exit_title
exit_content += """
Machine |
Enabled |
"""
# Get exit route ID's for each node in the list:
for node in exit_node_list:
node_exit_route_ids = []
exit_enabled = False
exit_available = False
machine_id = 0
for route in all_routes["routes"]:
if route["prefix"] == "0.0.0.0/0" or route["prefix"] == "::/0":
if route[str(url_node_name)]["givenName"] == node:
node_exit_route_ids.append(route["id"])
machine_id = route[str(url_node_name)]["id"]
exit_available = True
if route["enabled"]:
exit_enabled = True
if exit_available:
# Set up the display code:
enabled = "fiber_manual_record"
disabled = "fiber_manual_record"
# Set the displays:
enabled_display = enabled if exit_enabled else disabled
exit_content += """
"""+str(node)+""" |
"""+str(enabled_display)+""" |
"""
exit_content += "
"+markup_post
content = route_content + failover_content + exit_content
return Markup(content)