{
"cells": [
{
"cell_type": "markdown",
"source": [
"# Guided Investigation - Anomaly Lookup\n",
"\n",
"__Notebook Version:__ 2.0\n",
"__Python Version:__ Python 3.8 - AzureML\n",
"__Platforms Supported:__ Azure Machine Learning Notebooks\n",
" \n",
"__Data Source Required:__ Log Analytics tables \n",
" \n",
"### Description\n",
"Gain insights into the possible root cause of an alert by searching for related anomalies on the corresponding entities around the alert’s time. This notebook will provide valuable leads for an alert’s investigation, listing all suspicious increases in event counts or their properties around the time of the alert, and linking to the corresponding raw records in Log Analytics for the investigator to focus on and interpret.\n",
"\n",
"You may need to select Python 3.8 - AzureML on Azure Machine Learning Notebooks.\n",
"\n",
"## Table of Contents\n",
"\n",
"1. Initialize Azure Resource Management Clients\n",
"2. Looking up for anomaly entities"
],
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "code",
"source": [
"%pip install azure-monitor-query"
],
"outputs": [],
"execution_count": null,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "code",
"source": [
"# Loading Python libraries\n",
"from azure.mgmt.loganalytics import LogAnalyticsManagementClient\n",
"from azure.monitor.query import LogsQueryClient, MetricsQueryClient, LogsQueryStatus\n",
"from azure.identity import AzureCliCredential, DefaultAzureCredential\n",
"\n",
"from datetime import timezone\n",
"import sys\n",
"import timeit\n",
"import datetime as dt\n",
"import pandas as pd\n",
"import copy\n",
"import base64\n",
"import json\n",
"from IPython.display import display, HTML, Markdown\n",
"from cryptography.fernet import Fernet"
],
"outputs": [],
"execution_count": null,
"metadata": {
"collapsed": true,
"gather": {
"logged": 1632434728633
},
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "markdown",
"source": [
"The following cell contains the classes and functions used by this notebook. The code is hidden to unclutter the notebook; please RUN the cell. You may view the code by clicking 'input hidden'."
],
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "code",
"source": [
"# Classes will be used in this notebook\n",
"class AnomalyQueries():\n",
" \"\"\" KQLs for anomaly lookup \"\"\"\n",
" QUERIES = {}\n",
" QUERIES['LISTTABLES'] = b'dW5pb24gd2l0aHNvdXJjZSA9IFNlbnRpbmVsVGFibGVOYW1lICogfCBkaXN0aW5jdCBTZW50aW5lbFRhYmxlTmFtZSB8IHNvcnQgYnkgU2VudGluZWxUYWJsZU5hbWUgYXNjIA=='\n",
" QUERIES['ISCATCOLUMN'] = b'e3RhYmxlfSB8IGdldHNjaGVtYSB8IHdoZXJlIENvbHVtblR5cGUgaW4gKCdpbnQnLCAnbG9uZycsICdzdHJpbmcnKSB8IHByb2plY3QgQ29sdW1uTmFtZQ=='\n",
" QUERIES['ISCATHEURISTIC'] = b'e3RhYmxlfSB8IHdoZXJlIGluZ2VzdGlvbl90aW1lKCkgPiBhZ28oMWQpIHwgdGFrZSB0b2ludCgxZTgpIHwgc3VtbWFyaXplIGRjID0gZGNvdW50KHtjb2x1bW59KSwgY291bnQoKSB8IHdoZXJlIGRjPCAxMDAwIGFuZCBkYyA+IDEgfCBwcm9qZWN0IHJhdGlvID0gdG9kb3VibGUoZGMpIC8gY291bnRfIHwgd2hlcmUgcmF0aW88IDFlLTIg'\n",
" QUERIES['TIMESERIESANOMALYDETECTION'] = b'bGV0IGZ1bGxEYWlseUNvdW50ID0gbWF0ZXJpYWxpemUoIHt0YWJsZX0gfCBleHRlbmQgVGltZUNyZWF0ZWQgPSBUaW1lR2VuZXJhdGVkIHwgd2hlcmUgVGltZUNyZWF0ZWQgPiBkYXRldGltZSgne21pblRpbWVzdGFtcH0nKSBhbmQgVGltZUNyZWF0ZWQ8ZGF0ZXRpbWUoJ3ttYXhUaW1lc3RhbXB9JykgfCB3aGVyZSB7ZW50Q29sdW1ufSBoYXMgJ3txRW50aXR5fScgfCBtYWtlLXNlcmllcyBjb3VudCgpIGRlZmF1bHQgPSAwIG9uIFRpbWVDcmVhdGVkIGZyb20gZGF0ZXRpbWUoJ3ttaW5UaW1lc3RhbXB9JykgdG8gZGF0ZXRpbWUoJ3ttYXhUaW1lc3RhbXB9Jykgc3RlcCAxZCBieSB7Y29sdW1ufSk7IGZ1bGxEYWlseUNvdW50IHwgZXh0ZW5kKGFub21hbGllcywgYW5vbWFseVNjb3JlLCBleHBlY3RlZENvdW50KSA9IHNlcmllc19kZWNvbXBvc2VfYW5vbWFsaWVzKGNvdW50XywxLjUsLTEsJ2F2ZycsMSkgfCB3aGVyZSBhbm9tYWx5U2NvcmVbLTFdID4gMS41IHwgd2hlcmUgdG9pbnQoY291bnRfWy0xXSkgPiB0b2RvdWJsZShleHBlY3RlZENvdW50Wy0xXSkgfCBtdi1hcHBseSBhbm9tYWxpZXMgdG8gdHlwZW9mKGxvbmcpIG9uIChzdW1tYXJpemUgdG90QW5vbWFsaWVzPXN1bShhbm9tYWxpZXMpKSB8IHdoZXJlIHRvdEFub21hbGllcyA8IDUgfCBwcm9qZWN0IHFFbnRpdHkgPSAne3FFbnRpdHl9JywgcVRpbWVzdGFtcCA9IGRhdGV0aW1lKCd7cVRpbWVzdGFtcH0nKSwgbWluVGltZXN0YW1wID0gZGF0ZXRpbWUoJ3ttaW5UaW1lc3RhbXB9JyksIG1heFRpbWVzdGFtcCA9IGRhdGV0aW1lKCd7bWF4VGltZXN0YW1wfScpLCBkZWx0YSA9IHRvdGltZXNwYW4oe2RlbHRhfSksIFRhYmxlID0gJ3t0YWJsZX0nLCBlbnRDb2wgPSAne2VudENvbHVtbn0nLCBjb2xOYW1lID0gJ3tjb2x1bW59JywgY29sVmFsID0gdG9zdHJpbmcoe2NvbHVtbn0pLCBjb2xUeXBlID0gZ2V0dHlwZSh7Y29sdW1ufSksIGV4cGVjdGVkQ291bnQgPSBleHBlY3RlZENvdW50Wy0xXSwgYWN0dWFsQ291bnQgPSBjb3VudF9bLTFdLCBhbm9tYWx5U2NvcmUgPSBhbm9tYWx5U2NvcmVbLTFd'\n",
" QUERIES['TIMEWINDOWQUERY'] = b'bGV0IGluZERhdGUgPSB0b2RhdGV0aW1lKCd7cURhdGV9Jyk7IHt0YWJsZX0gfCBleHRlbmQgaW5nZXN0aW9uX3RpbWUoKSB8IHdoZXJlICRJbmdlc3Rpb25UaW1lID4gaW5kRGF0ZSArIHtmfXtkZWx0YX0gYW5kICRJbmdlc3Rpb25UaW1lPGluZERhdGUgKyB7dH17ZGVsdGF9IHwgd2hlcmUge2VudENvbHVtbn0gaGFzICd7cUVudGl0eX0nIHwgcHJvamVjdCBpbmcgPSRJbmdlc3Rpb25UaW1lIHwgdGFrZSAxIA=='\n",
" QUERIES['ISENTITYINTABLE'] = b'bGV0IGluZERhdGUgPSB0b2RhdGV0aW1lKCd7cURhdGV9Jyk7IHt0YWJsZX0gfCB3aGVyZSBpbmdlc3Rpb25fdGltZSgpIGJldHdlZW4oKGluZERhdGUgLTFoKSAuLiAoaW5kRGF0ZSArIDFoKSkgfCBzZWFyY2ggJ3txRW50aXR5fScgfCB0YWtlIDE='\n",
"\n",
" @staticmethod\n",
" def get_query(name):\n",
" \"\"\" get KQL \"\"\"\n",
" en_query = AnomalyQueries.QUERIES[name]\n",
" query = base64.b64decode(en_query).decode('utf-8')\n",
" return query\n",
"\n",
"class AnomalyFinder():\n",
" \"\"\"\n",
" This class provides process flow functions for anomaly lookup.\n",
" Method - run is the main entry point.\n",
" \"\"\"\n",
" def __init__(self, workspace_id, la_data_client):\n",
" self.workspace_id = workspace_id\n",
" self.la_data_client = la_data_client\n",
" self.anomaly = ''\n",
"\n",
" def query_table_list(self):\n",
" \"\"\" Get a list of data tables from Log Analytics for the user \"\"\"\n",
" query = AnomalyQueries.get_query('LISTTABLES')\n",
" return self.query_loganalytics(query)\n",
"\n",
" def query_loganalytics(self, query):\n",
" \"\"\" This method will call Log Analytics through LA client \"\"\"\n",
" start_time = dt.datetime.now(timezone.utc) - dt.timedelta(30)\n",
" end_time=dt.datetime.now(timezone.utc)\n",
"\n",
" result = self.la_data_client.query_workspace(\n",
" workspace_id=self.workspace_id,\n",
" query=query,\n",
" timespan=(start_time, end_time))\n",
"\n",
" df = pd.DataFrame(data=result.tables[0].rows, columns=result.tables[0].columns)\n",
" return df\n",
"\n",
" @staticmethod\n",
" def construct_related_queries(df_anomalies):\n",
" \"\"\" Construct queries that the user can re-run to reproduce results and save for future reference \"\"\"\n",
"\n",
" if df_anomalies.shape[0] == 0:\n",
" return None\n",
"\n",
" queries = ''\n",
" for tbl in df_anomalies.Table.unique():\n",
"\n",
" cur_table_anomalies = df_anomalies.loc[df_anomalies.Table == tbl, :]\n",
" query = \"\"\"{tbl} \\\n",
" | where TimeGenerated > datetime({maxTimestamp})-14d and TimeGenerated < datetime({maxTimestamp}) \\\n",
" | where {entCol} has \"{qEntity}\" \\\n",
" | where \"\"\".format(**{\n",
" 'tbl': tbl,\n",
" 'qTimestamp': cur_table_anomalies.qTimestamp.iloc[0],\n",
" 'maxTimestamp': cur_table_anomalies.maxTimestamp.iloc[0],\n",
" 'entCol': cur_table_anomalies.entCol.iloc[0],\n",
" 'qEntity': cur_table_anomalies.qEntity.iloc[0]\n",
" })\n",
"\n",
" for j, row in cur_table_anomalies.iterrows(): # pylint: disable=unused-variable\n",
" query += \" {col} == to{colType}(\\\"{colVal}\\\") or\".format(\n",
" col=row.colName,\n",
" colType=(row.colType) if 'colType' in row.keys() else 'string',\n",
" colVal=row.colVal.replace('\"', '')\n",
" )\n",
"\n",
" query = query[:-2] # drop the last or\n",
" query += \" | take 1000; \" # limit the output size\n",
" query = query.replace(\"\\\\\", \"\\\\\\\\\")\n",
"\n",
" queries += query\n",
" return queries\n",
"\n",
" def get_timewindow(self, q_entity, q_timestamp, ent_col, tbl):\n",
" \"\"\" find the relevant time window for analysis \"\"\"\n",
"\n",
" win_start = 0\n",
" min_timestamp = None\n",
" delta = None\n",
" max_timestamp = None\n",
" long_min_timestamp = None\n",
" time_window_query_template = AnomalyQueries.get_query('TIMEWINDOWQUERY')\n",
"\n",
" for from_hour in range(-30, 0, 1):\n",
" kql_time_range_d = time_window_query_template.format(\n",
" table=tbl,\n",
" qDate=q_timestamp,\n",
" entColumn=ent_col,\n",
" qEntity=q_entity,\n",
" f=from_hour,\n",
" t=from_hour+1,\n",
" delta='d')\n",
"\n",
" df_time_range = self.query_loganalytics(kql_time_range_d)\n",
"\n",
" if df_time_range.shape[0] > 0:\n",
" win_start = from_hour\n",
" break\n",
"\n",
" dt_q_timestamp = pd.to_datetime(q_timestamp)\n",
" ind2now = dt.datetime.utcnow() - dt_q_timestamp\n",
" if win_start < -3:\n",
" if ind2now > dt.timedelta(days=1):\n",
" delta = '1d'\n",
" max_timestamp = dt_q_timestamp + dt.timedelta(days=1)\n",
" else:\n",
" delta = '1d'\n",
" max_timestamp = dt.datetime.now()\n",
" long_min_timestamp = max_timestamp + dt.timedelta(days=win_start)\n",
" min_timestamp = max_timestamp + dt.timedelta(days=max([-6, win_start]))\n",
"\n",
" elif win_start < 0: # switch to hours\n",
" win_start_hour = -5\n",
" for from_hour in range(-3*24, -5, 1):\n",
" kql_time_range_h = time_window_query_template.format(\n",
" table=tbl,\n",
" qDate=q_timestamp,\n",
" entColumn=ent_col,\n",
" qEntity=q_entity,\n",
" f=from_hour,\n",
" t=from_hour+1,\n",
" delta='h')\n",
"\n",
" df_time_range = self.query_loganalytics(kql_time_range_h)\n",
"\n",
" if df_time_range.shape[0] > 0:\n",
" win_start_hour = from_hour\n",
" break\n",
" if win_start_hour < -5:\n",
" if ind2now > dt.timedelta(hours=1):\n",
" delta = '1h'\n",
" max_timestamp = dt_q_timestamp + dt.timedelta(hours=1)\n",
" else:\n",
" delta = '1h'\n",
" max_timestamp = dt.datetime.now()\n",
" min_timestamp = max_timestamp + dt.timedelta(hours=win_start_hour)\n",
" long_min_timestamp = min_timestamp\n",
"\n",
" return min_timestamp, delta, max_timestamp, long_min_timestamp\n",
"\n",
" def run(self, q_timestamp, q_entity, tables):\n",
" \"\"\" Main function for Anomaly Lookup \"\"\"\n",
"\n",
" progress_bar = WidgetViewHelper.define_int_progress_bar()\n",
" display(progress_bar) # pylint: disable=undefined-variable\n",
"\n",
" # list tables if not given\n",
" if not tables:\n",
" kql_list_tables = AnomalyQueries.get_query('LISTTABLES')\n",
" tables = self.query_loganalytics(kql_list_tables)\n",
" tables = tables.SentinelTableName.tolist()\n",
"\n",
" progress_bar.value += 1\n",
"\n",
" # find the column in which the query entity appears in each table\n",
" # - assumption that it appears in just one column\n",
" tables2search = []\n",
" is_entity_in_table_template = AnomalyQueries.get_query('ISENTITYINTABLE')\n",
"\n",
" for tbl in tables:\n",
" kql_entity_in_table = is_entity_in_table_template.format(\n",
" table=tbl,\n",
" qDate=q_timestamp,\n",
" qEntity=q_entity)\n",
" ent_in_table = self.query_loganalytics(kql_entity_in_table)\n",
"\n",
" if ent_in_table.shape[0] > 0:\n",
" ent_col = [col for col in ent_in_table.select_dtypes('object').columns[1:] if\n",
" ent_in_table.loc[0, col] is not None\n",
" and ent_in_table.loc[:, col].str.contains(q_entity, case=False).all()]\n",
" if ent_col:\n",
" ent_col = ent_col[0]\n",
" tables2search.append({'table': tbl, 'entCol': ent_col})\n",
"\n",
" progress_bar.value += 2\n",
"\n",
" # for each table, find the time window to query on\n",
" for tbl in tables2search:\n",
" tbl['minTimestamp'], tbl['delta'], tbl['maxTimestamp'], tbl['longMinTimestamp'] = \\\n",
" self.get_timewindow(q_entity, q_timestamp, tbl['entCol'], tbl['table'])\n",
"\n",
" progress_bar.value += 1\n",
"\n",
" # identify all the categorical columns per table on which we will find anomalies\n",
" categorical_cols = []\n",
" is_cat_column_template = AnomalyQueries.get_query('ISCATCOLUMN')\n",
" is_cat_heuristic_template = AnomalyQueries.get_query('ISCATHEURISTIC')\n",
" for tbl in tables2search:\n",
" kql_is_cat_column = is_cat_column_template.format(table=tbl['table'])\n",
" df_cols = self.query_loganalytics(kql_is_cat_column)\n",
"\n",
" for col in df_cols.ColumnName:\n",
" kql_is_cat_heuristic = is_cat_heuristic_template.format(\n",
" table=tbl['table'],\n",
" column=col)\n",
" df_is_cat = self.query_loganalytics(kql_is_cat_heuristic)\n",
"\n",
" if df_is_cat.shape[0] > 0:\n",
" cat_col_info = copy.deepcopy(tbl)\n",
" cat_col_info['col'] = col\n",
" categorical_cols.append(cat_col_info)\n",
"\n",
" progress_bar.value += 2\n",
"\n",
" anomalies_list = []\n",
" time_series_anomaly_detection_template = \\\n",
" AnomalyQueries.get_query('TIMESERIESANOMALYDETECTION')\n",
" for col_info in categorical_cols:\n",
" max_timestamp = col_info['maxTimestamp'].strftime('%Y-%m-%dT%H:%M:%S.%f')\n",
" long_min_timestamp = col_info['longMinTimestamp'].strftime('%Y-%m-%dT%H:%M:%S.%f')\n",
"\n",
" kql_time_series_anomaly_detection = time_series_anomaly_detection_template.format(\n",
" table=col_info['table'],\n",
" column=col_info['col'],\n",
" entColumn=col_info['entCol'],\n",
" qEntity=q_entity,\n",
" minTimestamp=long_min_timestamp,\n",
" maxTimestamp=max_timestamp,\n",
" qTimestamp=q_timestamp,\n",
" delta=col_info['delta'])\n",
"\n",
" cur_anomalies = self.query_loganalytics(kql_time_series_anomaly_detection)\n",
"\n",
" anomalies_list.append(cur_anomalies)\n",
"\n",
" progress_bar.value += 2\n",
"\n",
" if anomalies_list:\n",
" anomalies = pd.concat(anomalies_list, axis=0)\n",
" else:\n",
" anomalies = pd.DataFrame()\n",
"\n",
" progress_bar.value += 2\n",
" queries = AnomalyFinder.construct_related_queries(anomalies)\n",
" progress_bar.close()\n",
" self.anomaly = str(anomalies.to_json(orient='records'))\n",
"\n",
" return anomalies, queries\n",
"\n",
"class WidgetViewHelper():\n",
" \"\"\" This class provides helper methods for UI controls and components. \"\"\"\n",
" def __init__(self):\n",
" self.variable = None\n",
"\n",
" @staticmethod\n",
" def select_table(anomaly_lookup):\n",
" \"\"\" Select data tables \"\"\"\n",
" table_list = anomaly_lookup.query_table_list()\n",
" tables = list(table_list[\"SentinelTableName\"])\n",
" return widgets.Select(options=tables,\n",
" row=len(tables),\n",
" #value=[],\n",
" description='Tables:')\n",
"\n",
" @staticmethod\n",
" def define_int_progress_bar():\n",
" \"\"\" Define a progress bar \"\"\"\n",
" return widgets.IntProgress(value=0,\n",
" min=0,\n",
" max=10,\n",
" step=1,\n",
" description='Loading:',\n",
" bar_style='success',\n",
" orientation='horizontal',\n",
" position='top')\n",
"\n",
" @staticmethod\n",
" # pylint: disable=line-too-long\n",
" def copy_to_clipboard(url, text_body, label_text):\n",
" \"\"\" Copy text to Clipboard \"\"\"\n",
" html_str = (\n",
" \"\"\"\n",
"