import influxdb_client import argparse import logging import sys argParser = argparse.ArgumentParser() argParser.add_argument("-t", "--token", type=str, help="Custom or allAccess token to access influx DB instance") argParser.add_argument("-e", "--endpointUrl", type=str, help="Endpoint Url of influxdb instance (ex. \"https://myInfluxdbInstance:8086/\")") argParser.add_argument("-v", "--verbose", type=bool, const=True, nargs='?', help="Enable verbose logging - INFO") argParser.add_argument("-vv", "--vverbose", type=bool, const=True, nargs='?', help="Enable verbose logging - DEBUG") args = argParser.parse_args() # Using user retrieved values or default (hardcoded) ones all_access_token = "" influx_endpoint_url = "" # Defining some colors red = "\033[31m" yellow = "\033[93m" purple = "\33[1;95m" green = "\033[0;92m" cyan = "\033[96m" bold ="\033[1m" endc = "\033[39m" if args.vverbose == True: logging.basicConfig(level=logging.DEBUG) elif args.verbose == True: logging.basicConfig(level=logging.INFO) logger = logging.getLogger() if args.token: token = args.token else: logger.debug(f"{yellow}User did not set a token, using default one{endc}") token = all_access_token if args.endpointUrl: endpointUrl = args.endpointUrl else: logger.debug(f"{yellow}User did not set an endpoint Url for influxdb, using default one{endc}") endpointUrl = influx_endpoint_url logger.info(f"{cyan}Connecting to influx DB instance{endc}") # Connecting to influxdb instance try: conn = influxdb_client.InfluxDBClient( url=endpointUrl, token=token, debug=False, verify_ssl=True ) # Verify InfluxDB connection health = conn.ping() if not health: logger.error(f"{red}Unable to connect to db instace " + endpointUrl + f"{endc}") print(f"{red}Quitting execution...{endc}") sys.exit(1) except Exception as e: logger.error(f"{red}Failed to connect to db instance: " + endpointUrl + " Error: " + str(e) + f"{endc}") print(f"{red}Quitting execution...{endc}") sys.exit(1) # Retrieving all current auths logger.debug(f"{yellow}Retrieving all auth tokens{endc}") print(f"{cyan}Enumerating current authorizations...{endc}") try: auths = conn.authorizations_api().find_authorizations() except Exception as e: logger.error(f"{red}Unable to retrieve authorizations. ERR: " + str(e) +f"{endc}") print(f"{red}Unable to retrieve authorizations. Quitting...{endc}") sys.exit(1) if not auths: print(f"{cyan}No Authorization tokens found on the instance{endc}") sys.exit(1) print(f"{cyan}{str(len(auths))} tokens found on the instance{endc}\n") # Extracting operator token -> Parsing permissions to look for ("org = None" and "authType = write/auths"), not 100% efficiency -> TO OPTIMIZE logger.debug(f"{yellow}Parsing auth permissions to retrieve operator tokens{endc}") print(f"{cyan}Enumerating all operator tokens:{endc}") op_tokens = [] # In order to understand if a token is of type "operator" we need to enumerate all permissions and look for "write/auths" on org 'None' -> Unrescticted access try: for auth in auths: if auth.permissions: for perm in auth.permissions: if perm.action == "write" and perm.resource.org == None and perm.resource.type == "authorizations": op_tokens.append(auth.token) except Exception as e: logger.error(f"{red}Unable to parse permissions on found authorizations. ERR: " + str(e) + f"{endc}") print(f"{red}Unable to parse permissions on found authorizations. Quitting execution...{endc}") sys.exit(1) logger.info(f"{cyan}Printing all operator auth tokens{endc}") print(f"{cyan}{str(len(op_tokens))} operator tokens found.\n\nListing all operator tokens:\n{endc}") for op_t in op_tokens: print(f"{green}{op_t}{endc}")