{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Import and convert Neo23x0 Sigma scripts\n", "ianhelle@microsoft.com\n", "\n", "This notebook is a is a quick and dirty Sigma to Log Analytics converter.\n", "It uses the modules from sigmac package to do the conversion.\n", "\n", "Only a subset of the Sigma rules are convertible currently. Failure to convert\n", "could be for one or more of these reasons:\n", "- known limitations of the converter\n", "- mismatch between the syntax expressible in Sigma and KQL\n", "- data sources referenced in Sigma rules do not yet exist in Azure Sentinel\n", "\n", "The sigmac tool is downloadable as a package from PyPi but since we are downloading\n", "the rules from the repo, we also copy and import the package from the repo source.\n", "\n", "After conversion you can use an interactive browser to step through the rules and\n", "view (and copy/save) the KQL equivalents. You can also take the conversion results and \n", "use them in another way (e.g.bulk save to files).\n", "\n", "The notebook is all somewhat experimental and offered as-is without any guarantees" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Download and unzip the Sigma repo" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import requests\n", "# Download the repo ZIP\n", "sigma_git_url = 'https://github.com/Neo23x0/sigma/archive/master.zip'\n", "r = requests.get(sigma_git_url)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from ipywidgets import widgets, Layout\n", "import os\n", "from pathlib import Path\n", "def_path = Path.joinpath(Path(os.getcwd()), \"sigma\")\n", "path_wgt = widgets.Text(value=str(def_path), \n", " description='Path to extract to zipped repo files: ', \n", " layout=Layout(width='50%'),\n", " style={'description_width': 'initial'})\n", "path_wgt" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import zipfile\n", "import io\n", "repo_zip = io.BytesIO(r.content)\n", "\n", "zip_archive = zipfile.ZipFile(repo_zip, mode='r')\n", "zip_archive.extractall(path=path_wgt.value)\n", "RULES_REL_PATH = 'sigma-master/rules'\n", "rules_root = Path(path_wgt.value) / RULES_REL_PATH" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Check that we have the files\n", "You should see a folder with folders such as application, apt, windows..." 
] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "%ls {rules_root}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Convert Sigma files to Log Analytics KQL queries" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": false }, "outputs": [], "source": [ "# Read the Sigma YAML file paths into a dict and make\n", "# a copy for the target KQL queries\n", "from pathlib import Path\n", "from collections import defaultdict\n", "import copy\n", "\n", "def get_rule_files(rules_root):\n", "    file_dict = defaultdict(dict)\n", "    for file in Path(rules_root).resolve().rglob(\"*.yml\"):\n", "        rel_path = Path(file).relative_to(rules_root)\n", "        path_key = '.'.join(rel_path.parent.parts)\n", "        file_dict[path_key][rel_path.name] = file\n", "    return file_dict\n", "\n", "sigma_dict = get_rule_files(rules_root)\n", "kql_dict = copy.deepcopy(sigma_dict)\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Add the downloaded sigmac tool to sys.path and import the sigmac functions\n", "import os\n", "import sys\n", "module_path = os.path.abspath(os.path.join('sigma/sigma-master/tools'))\n", "if module_path not in sys.path:\n", "    sys.path.append(module_path)\n", "from sigma.parser.collection import SigmaCollectionParser\n", "from sigma.parser.exceptions import SigmaCollectionParseError, SigmaParseError\n", "from sigma.configuration import SigmaConfiguration, SigmaConfigurationChain\n", "from sigma.config.exceptions import SigmaConfigParseError, SigmaRuleFilterParseException\n", "from sigma.filter import SigmaRuleFilter\n", "import sigma.backends.discovery as backends\n", "from sigma.backends.base import BackendOptions\n", "from sigma.backends.exceptions import BackendError, NotSupportedError, PartialMatchError, FullMatchError" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": false }, "outputs": [], "source": [ "# Sigma to Log Analytics Conversion\n", "import yaml\n", "_LA_MAPPINGS = '''\n", "fieldmappings:\n", "    Image: NewProcessName\n", "    ParentImage: ParentProcessName\n", "    ParentCommandLine: NO_MAPPING\n", "'''\n", "\n", "NOT_CONVERTIBLE = 'Not convertible'\n", "\n", "def sigma_to_la(file_path):\n", "    with open(file_path, 'r') as input_file:\n", "        try:\n", "            sigmaconfigs = SigmaConfigurationChain()\n", "            sigmaconfig = SigmaConfiguration(_LA_MAPPINGS)\n", "            sigmaconfigs.append(sigmaconfig)\n", "            backend_options = BackendOptions(None, None)\n", "            backend = backends.getBackend('ala')(sigmaconfigs, backend_options)\n", "            parser = SigmaCollectionParser(input_file, sigmaconfigs, None)\n", "            results = parser.generate(backend)\n", "            kql_result = ''\n", "            for result in results:\n", "                kql_result += result\n", "        except (NotImplementedError, NotSupportedError):\n", "            kql_result = NOT_CONVERTIBLE\n", "        input_file.seek(0, 0)\n", "        sigma_txt = input_file.read()\n", "    if not kql_result == NOT_CONVERTIBLE:\n", "        try:\n", "            kql_header = \"\\n\".join(get_sigma_properties(sigma_txt))\n", "            kql_result = kql_header + \"\\n\" + kql_result\n", "        except Exception as e:\n", "            print(\"exception reading sigma YAML: \", e)\n", "            print(sigma_txt, kql_result, sep='\\n')\n", "    return sigma_txt, kql_result\n", "\n", "sigma_keys = ['title', 'description', 'tags', 'status',\n", "              'author', 'logsource', 'falsepositives', 'level']\n", "\n", "def get_sigma_properties(sigma_rule):\n", "    sigma_docs = yaml.load_all(sigma_rule, Loader=yaml.SafeLoader)\n", "    sigma_rule_dict 
= next(sigma_docs)\n", " for prop in sigma_keys:\n", " yield get_property(prop, sigma_rule_dict)\n", "\n", "def get_property(name, sigma_rule_dict):\n", " sig_prop = sigma_rule_dict.get(name, 'na')\n", " if isinstance(sig_prop, dict):\n", " sig_prop = ' '.join([f\"{k}: {v}\" for k, v in sig_prop.items()])\n", " return f\"// {name}: {sig_prop}\"\n", " \n", " \n", "_KQL_FILTERS = {\n", " 'date': ' | where TimeGenerated >= datetime({start}) and TimeGenerated <= datetime({end}) ',\n", " 'host': ' | where Computer has {host_name} '\n", "}\n", "\n", "def insert_at(source, insert, find_sub):\n", " pos = source.find(find_sub)\n", " if pos != -1:\n", " return source[:pos] + insert + source[pos:]\n", " else:\n", " return source + insert\n", " \n", "def add_filter_clauses(source, **kwargs):\n", " if \"{\" in source or \"}\" in source:\n", " source = (\"// Warning: embedded braces in source. Please edit if necessary.\\n\"\n", " + source)\n", " source = source.replace('{', '{{').replace('}', '}}')\n", " if kwargs.get('host', False):\n", " source = insert_at(source, _KQL_FILTERS['host'], '|')\n", " if kwargs.get('date', False):\n", " source = insert_at(source, _KQL_FILTERS['date'], '|')\n", " return source\n", "\n", "\n", "# Run the conversion\n", "conv_counter = {}\n", "for categ, sources in sigma_dict.items():\n", " src_converted = 0\n", " for file_name, file_path in sources.items():\n", " sigma, kql = sigma_to_la(file_path)\n", " kql_dict[categ][file_name] = (sigma, kql)\n", " if not kql == NOT_CONVERTIBLE:\n", " src_converted += 1\n", " conv_counter[categ] = (len(sources), src_converted)\n", " \n", "print(\"Conversion statistics\")\n", "print(\"-\" * len(\"Conversion statistics\"))\n", "print('\\n'.join([f'{categ}: rules: {counter[0]}, converted: {counter[1]}'\n", " for categ, counter in conv_counter.items()]))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Display the results in an interactive browser" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": false }, "outputs": [], "source": [ "from ipywidgets import widgets, Layout\n", "\n", "# Browser Functions\n", "def on_cat_value_change(change):\n", " queries_w.options = kql_dict[change['new']].keys()\n", " queries_w.value = queries_w.options[0]\n", "\n", "def on_query_value_change(change):\n", " if view_qry_check.value:\n", " qry_text = kql_dict[sub_cats_w.value][queries_w.value][1]\n", " if \"Not convertible\" not in qry_text:\n", " qry_text = add_filter_clauses(qry_text,\n", " date=add_date_filter_check.value,\n", " host=add_host_filter_check.value)\n", " query_text_w.value = qry_text.replace('|', '\\n|')\n", " orig_text_w.value = kql_dict[sub_cats_w.value][queries_w.value][0]\n", "\n", "def on_view_query_value_change(change):\n", " vis = 'visible' if view_qry_check.value else 'hidden'\n", " on_query_value_change(None)\n", " query_text_w.layout.visibility = vis\n", " orig_text_w.layout.visibility = vis\n", "\n", "# Function defs for ExecuteQuery cell below\n", "def click_exec_hqry(b):\n", " global qry_results\n", " query_name = queries_w.value\n", " query_cat = sub_cats_w.value\n", " query_text = query_text_w.value\n", " query_text = query_text.format(**qry_wgt.query_params)\n", "\n", " disp_results(query_text)\n", " \n", "def disp_results(query_text):\n", " out_wgt.clear_output()\n", " with out_wgt:\n", " print(\"Running query...\", end=' ')\n", " qry_results = execute_kql_query(query_text)\n", " print(f'done. 
{len(qry_results)} rows returned.')\n", "        display(qry_results)\n", "\n", "exec_hqry_button = widgets.Button(description=\"Execute query..\")\n", "out_wgt = widgets.Output()  # layout=Layout(width='100%', height='200px', visibility='visible')\n", "exec_hqry_button.on_click(click_exec_hqry)\n", "\n", "# Browser widget setup\n", "categories = list(sorted(kql_dict.keys()))\n", "sub_cats_w = widgets.Select(options=categories,\n", "                            description='Category : ',\n", "                            layout=Layout(width='30%', height='120px'),\n", "                            style = {'description_width': 'initial'})\n", "\n", "queries_w = widgets.Select(options = kql_dict[categories[0]].keys(),\n", "                           description='Query : ',\n", "                           layout=Layout(width='30%', height='120px'),\n", "                           style = {'description_width': 'initial'})\n", "\n", "query_text_w = widgets.Textarea(\n", "    value='',\n", "    description='KQL Query:',\n", "    layout=Layout(width='100%', height='300px', visibility='hidden'),\n", "    disabled=False)\n", "orig_text_w = widgets.Textarea(\n", "    value='',\n", "    description='Sigma Query:',\n", "    layout=Layout(width='100%', height='250px', visibility='hidden'),\n", "    disabled=False)\n", "\n", "query_text_w.layout.visibility = 'hidden'\n", "orig_text_w.layout.visibility = 'hidden'\n", "sub_cats_w.observe(on_cat_value_change, names='value')\n", "queries_w.observe(on_query_value_change, names='value')\n", "\n", "view_qry_check = widgets.Checkbox(description=\"View query\", value=True)\n", "add_date_filter_check = widgets.Checkbox(description=\"Add date filter\", value=False)\n", "add_host_filter_check = widgets.Checkbox(description=\"Add host filter\", value=False)\n", "\n", "view_qry_check.observe(on_view_query_value_change, names='value')\n", "add_date_filter_check.observe(on_view_query_value_change, names='value')\n", "add_host_filter_check.observe(on_view_query_value_change, names='value')\n", "# view_qry_button.on_click(click_exec_hqry)\n", "# display(exec_hqry_button);\n", "\n", "vbox_opts = widgets.VBox([view_qry_check, add_date_filter_check, add_host_filter_check])\n", "hbox = widgets.HBox([sub_cats_w, queries_w, vbox_opts])\n", "vbox = widgets.VBox([hbox, orig_text_w, query_text_w])\n", "on_view_query_value_change(None)\n", "display(vbox)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Click the `Execute query` button to run the currently displayed query\n", "**Notes:**\n", "- To run the queries, first authenticate to Log Analytics (scroll down and execute the remaining cells in the notebook)\n", "- If you added a date filter to the query, set the date range below" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "from msticpy.nbtools.nbwidgets import QueryTime\n", "qry_wgt = QueryTime(units='days', before=5, after=0, max_before=30, max_after=10)\n", "vbox = widgets.VBox([exec_hqry_button, out_wgt])\n", "display(vbox)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Set Query Time bounds" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "qry_wgt.display()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Authenticate to Azure Sentinel" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def clean_kql_comments(query_string):\n", "    \"\"\"Strip // comment lines from a KQL query string.\"\"\"\n", "    import re\n", "    return re.sub(r'(//[^\\n]+)', '', query_string, flags=re.MULTILINE).replace('\\n', '').strip()\n", "\n", "def execute_kql_query(query_string):\n", "    if not query_string or 
len(query_string.strip()) == 0:\n", " print('No query supplied')\n", " return None\n", " src_query = clean_kql_comments(query_string)\n", " result = get_ipython().run_cell_magic('kql', line='', cell=src_query)\n", " \n", " if result is not None and result.completion_query_info['StatusCode'] == 0:\n", " results_frame = result.to_dataframe()\n", " return results_frame\n", " return []" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "from msticpy.nbtools.wsconfig import WorkspaceConfig\n", "from msticpy.nbtools import kql, GetEnvironmentKey\n", "\n", "ws_config_file = 'config.json'\n", "try:\n", " ws_config = WorkspaceConfig(ws_config_file)\n", " print('Found config file')\n", " for cf_item in ['tenant_id', 'subscription_id', 'resource_group', 'workspace_id', 'workspace_name']:\n", " print(cf_item, ws_config[cf_item])\n", "except:\n", " ws_config = None\n", "\n", "ws_id = GetEnvironmentKey(env_var='WORKSPACE_ID',\n", " prompt='Log Analytics Workspace Id:')\n", "if ws_config:\n", " ws_id.value = ws_config['workspace_id']\n", "ws_id.display()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "try:\n", " WORKSPACE_ID = select_ws.value\n", "except NameError:\n", " try:\n", " WORKSPACE_ID = ws_id.value\n", " except NameError:\n", " WORKSPACE_ID = None\n", " \n", "if not WORKSPACE_ID:\n", " raise ValueError('No workspace selected.')\n", "\n", "kql.load_kql_magic()\n", "\n", "%kql loganalytics://code().workspace(WORKSPACE_ID)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Save All Converted Files" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "path_save_wgt = widgets.Text(value=str(def_path) + \"_kql_out\",\n", " description='Path to save KQL files: ',\n", " layout=Layout(width='50%'),\n", " style={'description_width': 'initial'})\n", "path_save_wgt" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "root = Path(path_save_wgt.value)\n", "root.mkdir(exist_ok=True)\n", "for categ, kql_files in kql_dict.items():\n", " sub_dir = root.joinpath(categ)\n", " \n", " for file_name, contents in kql_files.items():\n", " kql_txt = contents[1]\n", " if not kql_txt == NOT_CONVERTIBLE:\n", " sub_dir.mkdir(exist_ok=True)\n", " file_path = sub_dir.joinpath(file_name.replace('.yml', '.kql'))\n", " with open(file_path, 'w') as output_file:\n", " output_file.write(kql_txt)\n", " print(f\"Saved {file_path}\")\n" ] } ], "metadata": { "hide_input": false, "kernelspec": { "display_name": "Python 3.6", "language": "python", "name": "python36" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.5" }, "toc": { "base_numbering": 1, "nav_menu": {}, "number_sections": false, "sideBar": true, "skip_h1_title": false, "title_cell": "Table of Contents", "title_sidebar": "Contents", "toc_cell": false, "toc_position": {}, "toc_section_display": true, "toc_window_display": false }, "varInspector": { "cols": { "lenName": 16, "lenType": 16, "lenVar": 40 }, "kernels_config": { "python": { "delete_cmd_postfix": "", "delete_cmd_prefix": "del ", "library": "var_list.py", "varRefreshCmd": "print(var_dic_list())" }, "r": { "delete_cmd_postfix": ") ", "delete_cmd_prefix": "rm(", "library": "var_list.r", "varRefreshCmd": "cat(var_dic_list()) " } }, 
"types_to_exclude": [ "module", "function", "builtin_function_or_method", "instance", "_Feature" ], "window_display": false } }, "nbformat": 4, "nbformat_minor": 2 }