{ "cells": [ { "cell_type": "markdown", "metadata": { "toc": true }, "source": [ "Table of Contents\n", "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Alert Triage Notebook\n", "\n", "**Notebook Version:** 1.1
\n", "**Python Version:** Python 3.6 (including Python 3.6 - AzureML)
\n", "**Data Sources Required:** SecurityAlerts
\n", "\n", " \n", "This Notebook assists analysts in triaging alerts within Microsoft Sentinel by enriching them with Threat Intelligence and OSINT data. Its purpose is to allow analysts to quickly triage a large number of alerts and identify those on which to focus investigation.\n", "\n", "**How to use:**
\n", "Run the cells in this Notebook in order. At various points in the Notebook flow you will be prompted to enter or select options relevant to the scope of your triage.
\n", "This Notebook presumes you have Microsoft Sentinel Workspace settings and Threat Intelligence providers configured in a config file. If you do not have this in place, please refer to https://msticpy.readthedocs.io/en/latest/getting_started/msticpyconfig.html# and https://github.com/Azure/Azure-Sentinel-Notebooks/blob/master/ConfiguringNotebookEnvironment.ipynb to set this up." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "### Notebook initialization\n", "The next cell:\n", "- Checks for the correct Python version\n", "- Checks versions and optionally installs required packages\n", "- Imports the required packages into the notebook\n", "- Sets a number of configuration options.\n", "\n", "This should complete without errors. If you encounter errors or warnings, look at the following two notebooks:\n", "- [TroubleShootingNotebooks](https://github.com/Azure/Azure-Sentinel-Notebooks/blob/master/TroubleShootingNotebooks.ipynb)\n", "- [ConfiguringNotebookEnvironment](https://github.com/Azure/Azure-Sentinel-Notebooks/blob/master/ConfiguringNotebookEnvironment.ipynb)\n", "\n", "If you are running in the Microsoft Sentinel Notebooks environment (Azure Notebooks or Azure ML) you can run live versions of these notebooks:\n", "- [Run TroubleShootingNotebooks](./TroubleShootingNotebooks.ipynb)\n", "- [Run ConfiguringNotebookEnvironment](./ConfiguringNotebookEnvironment.ipynb)\n", "\n", "You may also need to do some additional configuration to successfully use functions such as Threat Intelligence service lookup and Geo IP lookup. 
\n", "There are more details about this in the `ConfiguringNotebookEnvironment` notebook and in these documents:\n", "- [msticpy configuration](https://msticpy.readthedocs.io/en/latest/getting_started/msticpyconfig.html)\n", "- [Threat intelligence provider configuration](https://msticpy.readthedocs.io/en/latest/data_acquisition/TIProviders.html#configuration-file)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "ExecuteTime": { "end_time": "2020-03-20T17:27:16.928865Z", "start_time": "2020-03-20T17:27:00.160439Z" } }, "outputs": [], "source": [ "from pathlib import Path\r\n", "from IPython.display import display, HTML\r\n", "\r\n", "REQ_PYTHON_VER=(3, 6)\r\n", "REQ_MSTICPY_VER=(1, 0, 0)\r\n", "\r\n", "\r\n", "display(HTML(\"Starting Notebook setup...\"))\r\n", "\r\n", "\r\n", "# If not using Azure Notebooks, install msticpy with\r\n", "# %pip install msticpy\r\n", "from msticpy.nbtools import nbinit\r\n", "extra_imports = [\r\n", "    \"whois\",\r\n", "    \"datetime,,dt\",\r\n", "    \"msticpy.nbtools.foliummap, get_center_ip_entities\",\r\n", "    \"msticpy.nbtools.observationlist, Observations\",\r\n", "    \"msticpy.nbtools.observationlist, Observation\",\r\n", "    \"msticpy.sectools.ip_utils, convert_to_ip_entities\"\r\n", "]\r\n", "nbinit.init_notebook(\r\n", "    namespace=globals(),\r\n", "    additional_packages=[\"IPWhois\", \"tldextract\", \"python-whois\"],\r\n", "    extra_imports=extra_imports,\r\n", ")\r\n", "pd.options.mode.chained_assignment = None" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Initialize TI and Observation list" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "ExecuteTime": { "end_time": "2020-03-20T17:52:34.135544Z", "start_time": "2020-03-20T17:52:24.354Z" }, "scrolled": true }, "outputs": [], "source": [ "# Initialize observations and TI modules\n", "summary = Observations()\n", "ti = TILookup()\n", "print('Observation summary and TILookup loaded.')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Connect to your Microsoft Sentinel Workspace\n", "This cell collects Workspace details contained in your msticpyconfig.yaml file and uses them to authenticate." 
] }, { "cell_type": "code", "execution_count": null, "metadata": { "ExecuteTime": { "end_time": "2020-03-20T17:27:56.110152Z", "start_time": "2020-03-20T17:27:54.540327Z" } }, "outputs": [], "source": [ "# See if we have a Microsoft Sentinel Workspace defined in our config file.\n", "# If not, let the user specify Workspace and Tenant IDs\n", "\n", "ws_config = WorkspaceConfig()\n", "if not ws_config.config_loaded:\n", "    ws_config.prompt_for_ws()\n", "\n", "qry_prov = QueryProvider(data_environment=\"AzureSentinel\")\n", "print(\"done\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Authenticate to Microsoft Sentinel workspace\n", "qry_prov.connect(ws_config)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Investigation Information\n", "\n", "Enter some information relevant to your triage work. This will be stored as part of this Notebook for future reference and recall. \n", "Please also select which Threat Intelligence providers to use for enrichment. Note that you need authentication details configured for each provider for this to work. You can select one or more providers, or select \"All\" to use all available providers." 
] }, { "cell_type": "code", "execution_count": null, "metadata": { "ExecuteTime": { "end_time": "2020-03-20T17:27:56.164662Z", "start_time": "2020-03-20T17:27:56.111153Z" } }, "outputs": [], "source": [ "# Collect details for triage record\n", "md(\"Enter Name:\")\n", "name = widgets.Text()\n", "display(name)\n", "md(\"Enter Ticket ID: \")\n", "ticket = widgets.Text()\n", "display(ticket)\n", "md(\"Enter Description:\")\n", "description = widgets.Textarea()\n", "display(description)\n", "\n", "# Get list of configured TI providers and filter out non TI enrichments\n", "ti_provs = [x for x in ti.configured_providers if x not in (\"OPR\", \"Tor\")]\n", "if not ti_provs:\n", "    raise Exception(\"\"\"You do not have any Threat Intelligence providers configured. \n", "    Please refer to https://msticpy.readthedocs.io/en/latest/data_acquisition/TIProviders.html on how to configure them.\"\"\")\n", "\n", "md(\"Select TI providers to use for enrichment.\")\n", "sel_ti_provs = widgets.SelectMultiple(\n", "    options=ti_provs + [\"All\"],\n", "    value=['All'],\n", "    description='TI providers:',\n", "    disabled=False\n", ")\n", "display(sel_ti_provs)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Set the time window you wish to triage events from:\n", "\n", "Adjust the time slider to select the timeframe for which you wish to triage alerts." 
] }, { "cell_type": "code", "execution_count": null, "metadata": { "ExecuteTime": { "end_time": "2020-03-20T17:27:56.235659Z", "start_time": "2020-03-20T17:27:56.166659Z" } }, "outputs": [], "source": [ "# Set list of TI providers to use and record this in our summary record alongside triage details entered above.\n", "if \"All\" in sel_ti_provs.value:\n", "    ti_prov_use = ti_provs\n", "else:\n", "    ti_prov_use = list(sel_ti_provs.value)\n", "invest_summary = Observation(caption=\"Investigation Details\", data=\n", "    {\"Analyst\" : name.value, \"Ticket\": ticket.value, \"Investigation Description\" :description.value,\n", "     \"Investigation Date\": dt.datetime.now(), \"TI Providers\": ti_prov_use})\n", "summary.add_observation(invest_summary)\n", "\n", "# Widget to select the time window\n", "query_times = nbwidgets.QueryTime(units='day',\n", "    max_before=30, max_after=1, before=3)\n", "query_times.display()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Select the alert types you are interested in:\n", "\n", "You can choose to select a subset of alerts based on provider in order to narrow your triage scope. You can also select \"All\" to return security alerts from all providers. Once a provider is selected you can additionally filter by Alert Name in order to focus on a specific alert type." 
] }, { "cell_type": "code", "execution_count": null, "metadata": { "ExecuteTime": { "end_time": "2020-03-20T17:28:02.017533Z", "start_time": "2020-03-20T17:27:56.236659Z" } }, "outputs": [], "source": [ "# Collect alerts based on the scope set above\n", "alerts = qry_prov.SecurityAlert.list_alerts(query_times)\n", "alerts_summ = Observation(caption=\"Alerts\", data={\"Data\" : alerts, \"Times\": query_times })\n", "summary.add_observation(alerts_summ)\n", "# Display summary of alerts retrieved\n", "md(\"Alert summary\", \"large\")\n", "display(alerts.groupby(\"ProviderName\")[[\"AlertName\"]]\n", "    .count()\n", "    .reset_index()\n", "    .rename(columns={\"AlertName\": \"Alerts\"})\n", ")\n", "# Refresh the alert name dropdown whenever the selected provider changes\n", "def update_alert_names(_):\n", "    selected_alert_type = sel_alerts.value\n", "    if sel_prov.value != \"All\":\n", "        alert_names = alerts[alerts['ProviderName']==sel_prov.value]['AlertName'].unique()\n", "    else:\n", "        alert_names = alerts['AlertName'].unique()\n", "    alert_names = np.append(alert_names, [\"All\"])\n", "    sel_alerts.options = alert_names\n", "    if selected_alert_type in alert_names:\n", "        sel_alerts.value = selected_alert_type\n", "    else:\n", "        sel_alerts.value = \"All\"\n", "\n", "if alerts.empty:\n", "    md(f\"No alerts in this Workspace between {query_times.start} and {query_times.end}\", \"bold\")\n", "else:\n", "    w_layout = list_layout = widgets.Layout(width=\"400px\")\n", "    # Select Provider to filter by\n", "    providers = alerts['ProviderName'].unique()\n", "    providers = np.append(providers, [\"All\"])\n", "    sel_prov = widgets.Dropdown(\n", "        options=providers,\n", "        description='Providers:',\n", "        disabled=False,\n", "        layout=w_layout,\n", "    )\n", "    sel_prov.observe(update_alert_names, names=\"value\")\n", "    alert_names = alerts[alerts['ProviderName']==sel_prov.value]['AlertName'].unique()\n", "    alert_names = np.append(alert_names, [\"All\"])\n", "    sel_alerts = widgets.Dropdown(\n", "        options=alert_names,\n", "        description='Alert Names:',\n", "        disabled=False,\n", "        value = \"All\",\n", "        layout=w_layout,\n", "    )\n", "    md(\"Select provider and/or Alert type to triage\", \"large\")\n", "    display(widgets.VBox([sel_prov, sel_alerts]))\n", "    " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Lookup Threat Intel reports for selected alerts\n", "Once alerts are collected we can enrich these alerts by looking up the entities associated with these alerts in Threat Intelligence. The TI Risk column in the table below represents an aggregation of results from the selected TI providers." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "ExecuteTime": { "end_time": "2020-03-20T17:28:10.278108Z", "start_time": "2020-03-20T17:28:02.019530Z" } }, "outputs": [], "source": [ "import json\n", "from tqdm.notebook import tqdm\n", "\n", "# Filter alerts based on AlertName and Provider\n", "if sel_prov.value == \"All\":\n", "    sent_alerts = alerts\n", "else:\n", "    sent_alerts = alerts[alerts['ProviderName'] == sel_prov.value]\n", "if sel_alerts.value == \"All\":\n", "    selected_alert_type = sent_alerts\n", "else:\n", "    selected_alert_type = sent_alerts[sent_alerts['AlertName']== sel_alerts.value]\n", "\n", "# Parse the Entities JSON string; return None if it is missing or malformed\n", "def entity_load(entity):\n", "    try:\n", "        return json.loads(entity)\n", "    except (json.JSONDecodeError, TypeError):\n", "        return None\n", "\n", "selected_alert_type['Entities'] = selected_alert_type['Entities'].apply(entity_load)\n", "\n", "# Lookup each entity in TI and aggregate results into an overall severity based on the highest indicator severity.\n", "def lookup(row):\n", "    sev = []\n", "    if row['Entities'] is not None:\n", "        for entity in row['Entities']:\n", "            try:\n", "                if entity[\"Type\"] == 'ip' or entity[\"Type\"] == 'ipaddress':\n", "                    resp = ti.lookup_ioc(observable=entity[\"Address\"], providers=ti_prov_use)\n", "                elif entity[\"Type\"] == 'url':\n", "                    resp = ti.lookup_ioc(observable=entity[\"Url\"], providers=ti_prov_use)\n", "                else:\n", "                    resp = None\n", "                if resp:\n", "                    for response in resp[1]:\n", "                        sev.append(response[1].severity)\n", "            except KeyError:\n", "                pass\n", "\n", "    if 'high' in sev:\n", "        severity = \"High\"\n", "    elif 'warning' in sev:\n", "        severity = \"Warning\"\n", "    elif 'information' in sev:\n", "        severity = \"Information\"\n", "    else:\n", "        severity = \"None\"\n", "    return severity\n", "\n", "\n", "# Highlight cells based on Threat Intelligence results.\n", "def color_cells(val):\n", "    if isinstance(val, str):\n", "        if val.lower() == \"high\":\n", "            color = 'Red'\n", "        elif val.lower() == 'warning':\n", "            color = 'Orange'\n", "        elif val.lower() == 'information':\n", "            color = 'Green'\n", "        else:\n", "            color = 'none'\n", "    else:\n", "        color = 'none'\n", "    return 'background-color: %s' % color\n", "\n", "tqdm.pandas(desc=\"Lookup progress\")\n", "\n", "selected_alert_type['TI Risk'] = selected_alert_type.progress_apply(lookup, axis=1)\n", "display(selected_alert_type[['StartTimeUtc','AlertName','Severity','TI Risk', 'Description']]\n", "    .sort_values(by=['StartTimeUtc']).style.applymap(color_cells).hide_index())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Pick an Alert to Examine" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can drill down into a specific alert by selecting it from the list below. This will return additional details on the alert as well as details of any threat intelligence matches." 
] }, { "cell_type": "code", "execution_count": null, "metadata": { "ExecuteTime": { "end_time": "2020-03-20T17:28:10.859462Z", "start_time": "2020-03-20T17:28:10.280107Z" } }, "outputs": [], "source": [ "from msticpy.sectools.ip_utils import convert_to_ip_entities\n", "from msticpy.nbtools.foliummap import FoliumMap, get_center_ip_entities\n", "\n", "# Display full alert details when selected\n", "def show_full_alert(selected_alert):\n", "    global security_alert, alert_ip_entities\n", "    output = []\n", "    security_alert = SecurityAlert(\n", "        rel_alert_select.selected_alert)\n", "    output.append(nbdisplay.format_alert(security_alert, show_entities=True))\n", "    ioc_list = []\n", "    if security_alert['Entities'] is not None:\n", "        for entity in security_alert['Entities']:\n", "            if entity['Type'] == 'ipaddress' or entity['Type'] == 'ip':\n", "                ioc_list.append(entity['Address'])\n", "            elif entity[\"Type\"] == 'url':\n", "                ioc_list.append(entity['Url'])\n", "        if len(ioc_list) > 0:\n", "            ti_data = ti.lookup_iocs(data=ioc_list, providers=ti_prov_use)\n", "            output.append(ti_data[['Ioc','IocType','Provider','Result','Severity','Details']].reset_index().style.applymap(color_cells).hide_index())\n", "            ti_ips = ti_data[ti_data['IocType'] == 'ipv4']\n", "            # If we have IP entities try and plot these on a map\n", "            if not ti_ips.empty:\n", "                ip_ents = [convert_to_ip_entities(i)[0] for i in ti_ips['Ioc'].unique()]\n", "                center = get_center_ip_entities(ip_ents)\n", "                ip_map = FoliumMap(location=center, zoom_start=4)\n", "                ip_map.add_ip_cluster(ip_ents, color='red')\n", "                output.append(ip_map)\n", "            else:\n", "                output.append(\"\")\n", "        else:\n", "            output.append(\"No IoCs\")\n", "    else:\n", "        output.append(\"No Entities with IoCs\")\n", "    return output\n", "\n", "# Show selected alert when selected\n", "if isinstance(alerts, pd.DataFrame) and not alerts.empty:\n", "    ti_data = None\n", "    md('Click on alert to view details.', \"large\")\n", "    rel_alert_select = nbwidgets.SelectAlert(alerts=selected_alert_type,\n", "        action=show_full_alert)\n", "    rel_alert_select.display()\n", "    # Add alert details to summary.\n", "    if ti_data is not None:\n", "        alert_details = Observation(caption=\"Alert Details\", data={\"Alert\":security_alert, \"TI\":ti_data})\n", "    else:\n", "        alert_details = Observation(caption=\"Alert Details\", data=security_alert)\n", "    summary.add_observation(alert_details)\n", "else:\n", "    md('No alerts found.')\n", "    \n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Alerts Timeline\n", "The cell below displays a timeline of the alerts you are triaging, with the selected alert highlighted in order to provide context on the alert." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "ExecuteTime": { "end_time": "2020-03-20T17:28:11.161979Z", "start_time": "2020-03-20T17:28:10.862462Z" } }, "outputs": [], "source": [ "# Display timeline of all alerts grouped by their TI risk score\n", "selected_alert = Observation(caption=\"Alert Details\", data=rel_alert_select.selected_alert)\n", "summary.add_observation(selected_alert)\n", "\n", "if len(selected_alert_type) == 1:\n", "    md(\"Only one alert in selected alert provider/type - can't display timeline.\")\n", "else:\n", "    nbdisplay.display_timeline(\n", "        data=selected_alert_type, time_column=\"StartTimeUtc\",\n", "        group_by=\"TI Risk\", source_columns=[\"AlertName\"],\n", "        alert=rel_alert_select.selected_alert, title=\"Alerts over time grouped by TI risk score\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Next Steps\n", "\n", "Now that we have selected an alert of interest and triaged its key details, we need to identify the next investigative steps. The cell below identifies and extracts key entities from the selected alert. 
It enriches them using OSINT and, based on their type, recommends a Notebook to run for further investigation from those available at https://github.com/Azure/Azure-Sentinel-Notebooks/ or via the Microsoft Sentinel portal." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "ExecuteTime": { "end_time": "2020-03-20T17:28:11.183976Z", "start_time": "2020-03-20T17:28:11.165977Z" } }, "outputs": [], "source": [ "from ipwhois import IPWhois\n", "import whois\n", "from ipaddress import ip_address\n", "import tldextract\n", "\n", "# Based on the extracted entity enrich it with OSINT\n", "def enhance(row):\n", "    if row['Type'] == \"ipaddress\":\n", "        return whois_desc(row['Entity'])\n", "    elif row['Type'] == \"host\":\n", "        return host_sum(row['Entity'])\n", "    elif row['Type'] == \"url\":\n", "        return whois_url(row['Entity'])\n", "\n", "# If entity is a hostname, get key details of the host.\n", "def host_sum(host):\n", "    hb_q = f\"Heartbeat | where TimeGenerated > datetime({query_times.start}) and TimeGenerated < datetime({query_times.end}) | where Computer == '{host}' | take 1\"\n", "    hb = qry_prov.exec_query(hb_q)\n", "    if not hb.empty:\n", "        hb_str = f\"{host} - {hb['ComputerIP'][0]} - {hb['OSType'][0]} - {hb['ComputerEnvironment'][0]}\"\n", "    else:\n", "        hb_str = \"No host heartbeat\"\n", "    return hb_str\n", "\n", "# If entity is IP address work out what type of address it is and if a public IP address get ASN name.\n", "def whois_desc(ip_lookup, progress=False):\n", "    try:\n", "        ip = ip_address(ip_lookup)\n", "    except ValueError:\n", "        return \"Not an IP Address\"\n", "    if ip.is_private:\n", "        return \"Private address space\"\n", "    if not ip.is_global:\n", "        return \"Other address space\"\n", "    ip_whois = IPWhois(ip)\n", "    whois_result = ip_whois.lookup_whois()\n", "    return whois_result[\"asn_description\"]\n", "\n", "# If entity is a URL get the name of the organisation that registered the domain.\n", 
"def whois_url(url):\n", "    # Use named attributes rather than tuple unpacking of the extract result\n", "    ext = tldextract.extract(url)\n", "    wis = whois.whois(f\"{ext.domain}.{ext.suffix}\")\n", "    return wis['org']\n", "\n", "# Based on the entity type suggest a Notebook for future investigation.\n", "def notebook_suggestor(row):\n", "    if row['Type'] in notebooks.keys():\n", "        return notebooks[row['Type']]\n", "    else:\n", "        return \"Write your own Notebook\"\n", "\n", "notebooks = {\"ipaddress\" : \"Entity Explorer - IP Address\",\n", "             \"host\" : \"Entity Explorer - Linux Host/Windows Host\",\n", "             \"account\" : \"Entity Explorer - Account\",\n", "             \"url\" : \"Entity Explorer - Domain and URL\"}\n", "\n", "md('Entities for further investigation:', 'bold')\n", "ents = security_alert.get_all_entities()\n", "if not ents.empty:\n", "    ents['Notebook'] = ents.apply(notebook_suggestor, axis=1)\n", "    ents['Enrichment'] = ents.apply(enhance, axis=1)\n", "    display(ents.style.hide_index())\n", "\n", "    # Save entity details into our summary.\n", "    entities = Observation(caption=\"Entities for further investigation\", data=ents)\n", "    summary.add_observation(entities)\n", "else:\n", "    md('No entities found in this alert')" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "ExecuteTime": { "end_time": "2020-03-20T17:28:11.192978Z", "start_time": "2020-03-20T17:28:11.187980Z" } }, "outputs": [], "source": [ "#Uncomment the line below to see a summary of this Notebook's output\n", "#summary.display_observations()" ] } ], "metadata": { "hide_input": false, "kernelspec": { "display_name": "Python 3.8 - AzureML", "language": "python", "name": "python38-azureml" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.7" }, "latex_envs": { "LaTeX_envs_menu_present": true, "autoclose": false, "autocomplete": true, "bibliofile": "biblio.bib", "cite_by": "apalike", 
"current_citInitial": 1, "eqLabelWithNumbers": true, "eqNumInitial": 1, "hotkeys": { "equation": "Ctrl-E", "itemize": "Ctrl-I" }, "labels_anchors": false, "latex_user_defs": false, "report_style_numbering": false, "user_envs_cfg": false }, "toc": { "base_numbering": 1, "nav_menu": {}, "number_sections": false, "sideBar": true, "skip_h1_title": false, "title_cell": "Table of Contents", "title_sidebar": "Contents", "toc_cell": true, "toc_position": {}, "toc_section_display": true, "toc_window_display": false }, "varInspector": { "cols": { "lenName": 16, "lenType": 16, "lenVar": 40 }, "kernels_config": { "python": { "delete_cmd_postfix": "", "delete_cmd_prefix": "del ", "library": "var_list.py", "varRefreshCmd": "print(var_dic_list())" }, "r": { "delete_cmd_postfix": ") ", "delete_cmd_prefix": "rm(", "library": "var_list.r", "varRefreshCmd": "cat(var_dic_list()) " } }, "types_to_exclude": [ "module", "function", "builtin_function_or_method", "instance", "_Feature" ], "window_display": false }, "widgets": { "application/vnd.jupyter.widget-state+json": { "state": {}, "version_major": 2, "version_minor": 0 } } }, "nbformat": 4, "nbformat_minor": 4 }