#!/bin/sh # vim: syntax=python ''':' # First try to run this script with python3, else run with python then python2 if command -v python3 >/dev/null 2>/dev/null; then exec python3 "$0" "$@" elif command -v python >/dev/null 2>/dev/null; then exec python "$0" "$@" else exec python2 "$0" "$@" fi ''' # -*- coding: utf-8 -*- # # Name: rudder-compliance # Synopsis: Get compliance output Rules and Directives # Requirements: requests Python module # Imports - builtins import argparse import json import csv import sys from pathlib import Path # Imports - external import requests from requests.packages.urllib3.exceptions import InsecureRequestWarning, InsecurePlatformWarning, SNIMissingWarning # Configuration - BEGIN api_version = "latest" default_api_url = "https://rudder.example.com/rudder/api" default_api_token = "z7v9j0hsNnrjRvBnLvL01ioUwVJpTgHs" validate_https = False # Configuration - END # Global variables headers = {} compliance_url = "" directives_url = "" rules_url = "" csv_file_string = "/tmp/compliance.csv" non_compliances_keys = ["noReport", "error", "auditNonCompliant", "auditError", "unexpectedMissingComponent"] # file is a Path def write_to_csv(file, entries): with file.open('w') as csvfile: csv_output = csv.writer(csvfile, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL) for entry in entries: csv_output.writerow(entry) print("Wrote csv file {}".format(file)) def id_to_name(rules, rule): name = 'Unknown' for i in rules: if i['id'] == rule: name = i['displayName'] return name #returns true is there are any non compliance in the current compliance # file the array of non_compliances with the actual non comlpiance # format is {'id': '32377fd7-02fd-43d0-aab7-28460a91347b', 'name': 'Global configuration for all nodes', 'compliance': 50.0, 'mode': 'full-compliance', 'complianceDetails': {'successAlreadyOK': 16.67, 'successRepaired': 16.67, 'noReport': 50.0, 'successNotApplicable': 16.67}, 'directives': [{'id': '86f6973d-4c1a-4d06-8470-3548b6b4c6cd', 'name': 'Packages', 'compliance': 50.0, 'complianceDetails': {'successAlreadyOK': 25.0, 'noReport': 50.0, 'successNotApplicable': 25.0}, 'components': [{'name': 'Post-modification script', 'compliance': 50.0, 'complianceDetails': {'noReport': 50.0, 'successNotApplicable': 50.0}, 'nodes': [{'id': 'root', 'name': 'server.rudder.local', 'values': [{'value': 'tree', 'reports': [{'status': 'successNotApplicable', 'message': 'No post-modification script was defined'}]}]}, {'id': 'cd9841c5-2267-4a28-8d21-6ba66fbbc7ed', 'name': 'agent1.rudder.local', 'values': [{'value': 'tree', 'reports': [{'status': 'noReport'}]}]}]}, {'name': 'Package', 'compliance': 50.0, 'complianceDetails': {'successAlreadyOK': 50.0, 'noReport': 50.0}, 'nodes': [{'id': 'root', 'name': 'server.rudder.local', 'values': [{'value': 'tree', 'reports': [{'status': 'successAlreadyOK', 'message': 'Presence of package tree in latest available version was correct'}]}]}, {'id': 'cd9841c5-2267-4a28-8d21-6ba66fbbc7ed', 'name': 'agent1.rudder.local', 'values': [{'value': 'tree', 'reports': [{'status': 'noReport'}]}]}]}]}, {'id': '1576dfe5-19ed-4b66-a6e6-36bb3c1ffafe', 'name': 'MOTD and pre-login banner', 'compliance': 50.0, 'complianceDetails': {'noReport': 50.0, 'successRepaired': 50.0}, 'components': [{'name': 'MOTD Configuration', 'compliance': 50.0, 'complianceDetails': {'noReport': 50.0, 'successRepaired': 50.0}, 'nodes': [{'id': 'root', 'name': 'server.rudder.local', 'values': [{'value': 'None', 'reports': [{'status': 'successRepaired', 'message': 'The MOTD file was repaired'}]}]}, {'id': 'cd9841c5-2267-4a28-8d21-6ba66fbbc7ed', 'name': 'agent1.rudder.local', 'values': [{'value': 'None', 'reports': [{'status': 'noReport'}]}]}]}]}]} def get_non_compliance_rule(rule_compliance, non_compliances): # check if key complianceDetails exists if 'complianceDetails' in rule_compliance: compliance = rule_compliance['complianceDetails'] if any (k in compliance for k in non_compliances_keys): rule_name = rule_compliance["name"] for directive_compliance in rule_compliance["directives"]: extract_non_compliance_by_directive(rule_name, directive_compliance, non_compliances) return True else: return False else: print("No compliance details for rule_compliance {}".format(rule_compliance)) # Block compliance is nested, but we can ignore the block layer as we only need to compute the method compliance per node. # Input: {"name": "block 1", "compliance": 50.0, "complianceDetails": {"error": 50.0, "successRepaired": 50.0}, "components": [{"name": "test subblock", "compliance": 100.0, "complianceDetails": {"successRepaired": 100.0}, "components": [{"name": "badaboum", "compliance": 100.0, "complianceDetails": {"successRepaired": 100.0}, "nodes": [{"id": "6f38e8e6-7752-4cb4-8bdb-bfba0bb2ed1e", "name": "agent1.rudder.local", "compliance": 100.0, "complianceDetails": {"successRepaired": 100.0}, "values": [{"value": "/bin/true", "reports": [{"status": "successRepaired", "message": "Execute command /bin/true was repaired"}]}]}]}]}, {"name": "Report if condition", "compliance": 0.0, "complianceDetails": {"error": 100.0}, "nodes": [{"id": "6f38e8e6-7752-4cb4-8bdb-bfba0bb2ed1e", "name": "agent1.rudder.local", "compliance": 0.0, "complianceDetails": {"error": 100.0}, "values": [{"value": "patatra", "reports": [{"status": "error", "message": "patatra could not be repaired"}]}]}]}]} # Output: [['agent1.rudder.local', 'patatra', 'error', 'patatra could not be repaired']] def walk_through_blocks(compliance_component): local_non_compliances = [] if 'components' in compliance_component: for sub_block in compliance_component['components']: local_non_compliances.extend(walk_through_blocks(sub_block)) elif 'nodes' in compliance_component: for nodes in compliance_component["nodes"]: # and now, get the values that have a status of non compliance for values in nodes["values"]: [ local_non_compliances.append( [ nodes["name"], values['value'], x['status'], x.get('message', "") ] ) for x in values['reports'] if x.get('status', []) in non_compliances_keys ] return local_non_compliances # non compliance is the array containing the result def extract_non_compliance_by_directive(rule_name, directive_compliance, non_compliances): # Look if any directive is non compliant if any (k in directive_compliance['complianceDetails'] for k in non_compliances_keys): for components_compliance in directive_compliance["components"]: # Look for the faulty component if any (k in components_compliance['complianceDetails'] for k in non_compliances_keys): # Use recursion to get the per node compliance as it can be nested in blocks for i in walk_through_blocks(components_compliance): entry = [rule_name, directive_compliance["name"], components_compliance["name"]] entry.extend(i) non_compliances.append(entry) def directives_compliance(compliance_in, wanted_directives, csv_output): # Call the API to get the complete list of directive definitions request = requests.get(directives_url, headers=headers, verify=validate_https) directives = json.loads(request.content)["data"]["directives"] # Call the API to get the complete list of rule definitions request = requests.get(rules_url, headers=headers, verify=validate_https) rules = json.loads(request.content)["data"]["rules"] directives_compliance_input = [] directives_compliance_output = {} non_compliances = [] csv_content = [ [ 'rule_id', 'directive_id', 'compliance' ] ] non_compliance_csv_content = [[ 'rule_name', 'directive_name', 'component_name', 'node', 'value', 'status', 'message']] if len(wanted_directives) != 0: # Store directives matching the wanted ones in directives_compliance_input for directive in directives: if directive['id'] in wanted_directives: directives_compliance_input.append(directive) else: # Filter active rules: remove disabled ones for directive in directives: if not directive['enabled']: continue directives_compliance_input.append(directive) # Store result in output # order the compliance by name sorted_compliance = sorted(compliance_in, key=lambda comp: comp['name']) for compliance in sorted_compliance: rule = compliance['id'] rule_name = compliance['name'] for directive in compliance['directives']: for wantedir in directives_compliance_input: if directive['id'] == wantedir['id']: if not rule in directives_compliance_output: directives_compliance_output[rule] = {} directives_compliance_output[rule][directive['id']] = directive['compliance'] directive_name = directive['name'] extract_non_compliance_by_directive(rule_name, directive, non_compliances) # Human-readable output for i in sorted(directives_compliance_output): rule_name = id_to_name(rules, i) print('* Rule {}'.format(rule_name)) for j in directives_compliance_output[i]: directive_name = id_to_name(directives, j) directive_compliance = directives_compliance_output[i][j] print('** {}: {} %'.format(directive_name, directive_compliance)) csv_content.append([ i, j, directive_compliance ]) print() # CSV output if csv_output != None: # convert the csv output to path csv_path = Path(csv_output) write_to_csv(csv_path, csv_content) non_compliance_path = csv_path.with_name('non_compliance_'+csv_path.name) non_compliance_csv_content += non_compliances write_to_csv(non_compliance_path, non_compliance_csv_content) def rules_compliance(compliance_in, wanted_rules, csv_output): # Call the API to get the complete list of rule definitions request = requests.get(rules_url, headers=headers, verify=validate_https) rules = json.loads(request.content)["data"]["rules"] rules_compliance_input = [] rules_compliance_output = [] non_compliances = [] existing_non_compliance = False csv_content = [ [ 'rule_id', 'compliance' ] ] non_compliance_csv_content = [[ 'rule_name', 'directive_name', 'component_name', 'node', 'value', 'status', 'message']] if len(wanted_rules) != 0: # Store rules matching the wanted ones in rules_compliance_input for rule in rules: if rule['id'] in wanted_rules: rules_compliance_input.append(rule) else: # Filter active rules: remove disabled ones for rule in rules: if not rule['enabled']: continue rules_compliance_input.append(rule) # order the input by name sorted_input = sorted(rules_compliance_input, key=lambda rule: rule['displayName']) # Store result in output for rule in sorted_input: id = rule['id'] name = rule['displayName'] compliance = 'NaN' for rule_compliance in compliance_in: if rule_compliance['id'] == rule['id']: compliance = rule_compliance['compliance'] # look for non compliance if get_non_compliance_rule(rule_compliance, non_compliances): existing_non_compliance = True rules_compliance_output.append([id, compliance]) # Human-readable output for i in rules_compliance_output: print("* Rule {}: {} %".format(id_to_name(rules, i[0]), i[1])) # CSV output if csv_output != None: # convert the csv output to path csv_path = Path(csv_output) csv_content += rules_compliance_output write_to_csv(csv_path, csv_content) if existing_non_compliance: non_compliance_path = csv_path.with_name('non_compliance_'+csv_path.name) non_compliance_csv_content += non_compliances write_to_csv(non_compliance_path, non_compliance_csv_content) if __name__ == "__main__": # Parse arguments parser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter, description="""\ Get the compliance of Directives or Rules, and optionnally output it in the CSV format. Example: %(prog)s --csv /tmp/rules.csv --rules Will output all the rules compliance to stdout and write them in the csv file /tmp/rules.csv The non-compliances (if any) will stored in file /tmp/non_compliance_rules.csv (csv filename prefixed with 'non_compliance_') """) parser.add_argument('-c', '--csv', nargs='?', help='Output to a CSV file (defaults to ' + csv_file_string + ') and to non_compliance_FILENAME the list of non compliance if any' ) parser.add_argument('-d', '--directives', nargs='*', help='Get directives compliance') parser.add_argument('-r', '--rules', nargs='*', help='Get rules compliance. If one or more rule id is given (separated by space), will restrict to these rules; e.g --rules 32377fd7-02fd-43d0-aab7-28460a91347b ab2f041-0d9f-491d-8323-68a5dbd207ee') parser.add_argument('-s', '--server', nargs='?', help='Rudder server URL, default to localhost') parser.add_argument('-t', '--token', nargs='?', help='Rudder API token') args = vars(parser.parse_args()) if len(sys.argv) == 1: parser.print_help() sys.exit(1) # Disable warnings about using self-signed certificates # if certificate validation has been disabled. if not validate_https: from requests.packages.urllib3.exceptions import InsecureRequestWarning requests.packages.urllib3.disable_warnings(InsecureRequestWarning) # override default server if defined if args['server'] != None: api_url = "https://%s/rudder/api" % (args['server']) else: api_url = default_api_url if args['token'] != None: api_token = args['token'] else: api_token = default_api_token # define the connexion parameters headers = {"X-API-Token": api_token} compliance_url = "%s/%s/compliance/rules" % (api_url, api_version) directives_url = "%s/%s/directives" % (api_url, api_version) rules_url = "%s/%s/rules" % (api_url, api_version) # Override the default CSV file if needed if args['csv'] != None: if len(args['csv']) >= 1: csv_file_string = args['csv'] else: csv_file_string = None # Call the API to get the complete list of rule compliances request = requests.get(compliance_url + "?level=5", headers=headers, verify=validate_https) compliance_in = json.loads(request.content)["data"]["rules"] compliance_out = [] # If Directive compliance was asked for if args['directives'] != None: directives_compliance(compliance_in, args['directives'], csv_file_string) # Or if Rule compliance was asked for elif args['rules'] != None: rules_compliance(compliance_in, args['rules'], csv_file_string) # If neither has matched, offer some help and exit else: parser.print_help() print('\nERROR: At least one compliance type must be asked for.') sys.exit(1) sys.exit(0)