<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#Example-Alert-Triage-Notebook" data-toc-modified-id="Example-Alert-Triage-Notebook-1">Example Alert Triage Notebook</a></span><ul class="toc-item"><li><ul class="toc-item"><li><span><a href="#Notebook-Setup" data-toc-modified-id="Notebook-Setup-1.0.1">Notebook Setup</a></span></li><li><span><a href="#Connect-to-your-Azure-Sentinel-Workspace" data-toc-modified-id="Connect-to-your-Azure-Sentinel-Workspace-1.0.2">Connect to your Azure Sentinel Workspace</a></span></li><li><span><a href="#Investigation-Information" data-toc-modified-id="Investigation-Information-1.0.3">Investigation Information</a></span></li><li><span><a href="#Set-the-time-window-you-wish-to-triage-events-from:" data-toc-modified-id="Set-the-time-window-you-wish-to-triage-events-from:-1.0.4">Set the time window you wish to triage events from:</a></span></li><li><span><a href="#Select-the-alert-types-you-are-interested-in:" data-toc-modified-id="Select-the-alert-types-you-are-interested-in:-1.0.5">Select the alert types you are interested in:</a></span></li><li><span><a href="#Lookup-Threat-Intel-reports-for-selected-alerts" data-toc-modified-id="Lookup-Threat-Intel-reports-for-selected-alerts-1.0.6">Lookup Threat Intel reports for selected alerts</a></span></li><li><span><a href="#Pick-an-Alert-to-Examine" data-toc-modified-id="Pick-an-Alert-to-Examine-1.0.7">Pick an Alert to Examine</a></span></li><li><span><a href="#Alerts-Timeline" data-toc-modified-id="Alerts-Timeline-1.0.8">Alerts Timeline</a></span></li><li><span><a href="#Next-Steps" data-toc-modified-id="Next-Steps-1.0.9">Next Steps</a></span></li></ul></li></ul></li></ul></div>

# Example Alert Triage Notebook

**Notebook Version:** 1.0<br>
**Python Version:** Python 3.6 (including Python 3.6 - AzureML)<br>
**Data Sources Required:** SecurityAlerts<br>

 
This Notebook assists analysts in triaging alerts within Azure Sentinel by enriching them with Threat Intelligence and OSINT data. Its purpose is to allow analysts to quickly triage a large number of alerts and identify those that warrant further investigation.

**How to use:**<br>
Run the cells in this Notebook in order. At various points in the Notebook flow you will be prompted to enter or select options relevant to the scope of your triage.<br>
This Notebook presumes you have Azure Sentinel Workspace settings and Threat Intelligence providers configured in a config file. If you do not have this in place, please refer to https://msticpy.readthedocs.io/en/latest/getting_started/msticpyconfig.html# and https://github.com/Azure/Azure-Sentinel-Notebooks/blob/master/ConfiguringNotebookEnvironment.ipynb to set this up.

### Notebook Setup

If this is your first time running this Notebook please run the cell below before proceeding to ensure you have the required packages installed correctly. 

In [None]:
# Check that the Notebook kernel is Python 3.6 or later
import sys
MIN_REQ_PYTHON = (3,6)
if sys.version_info < MIN_REQ_PYTHON:
    print('Check the Kernel->Change Kernel menu and ensure that Python 3.6')
    print('or later is selected as the active kernel.')
    sys.exit("Python %s.%s or later is required.\n" % MIN_REQ_PYTHON)

# Install required packages for this Notebook
!pip install msticpy --upgrade --user
!pip install python-whois --user --upgrade
!pip install tqdm --upgrade --user
!pip install IPWhois --upgrade --user
!pip install tldextract --upgrade --user

Import the required packages and initialize a set of required entities and properties:

In [None]:
#Import required packages
print('Importing python packages....')
import whois
import numpy as np
import datetime as dt
import ipywidgets as widgets
import pandas as pd
print('Importing msticpy packages...')
from msticpy.sectools import *
from msticpy.nbtools import *
pd.set_option('display.max_rows', 100)
pd.set_option('display.max_columns', 50)
pd.set_option('display.max_colwidth', 100)
%env KQLMAGIC_LOAD_MODE=silent
WIDGET_DEFAULTS = {'layout': widgets.Layout(width="900px"),
                   'style': {'description_width': 'initial'}}
from msticpy.nbtools.foliummap import FoliumMap, get_center_ip_entities
from msticpy.nbtools.observationlist import Observations, Observation
summary = Observations()
from msticpy.data.data_providers import QueryProvider
ti = TILookup()
from msticpy.nbtools.utility import md, md_warn
pd.options.mode.chained_assignment = None
from msticpy.nbtools.wsconfig import WorkspaceConfig
from msticpy.sectools.ip_utils import convert_to_ip_entities

print('Imports complete')

### Connect to your Azure Sentinel Workspace
This cell collects Workspace details contained in your msticpyconfig.yaml file and uses them to authenticate.

In [None]:
#Collect Azure Sentinel Workspace Details from our config file and use them to connect
try:
    # Update to WorkspaceConfig(workspace="WORKSPACE_NAME") to get alerts from a Workspace other than your default one.
    # Run WorkspaceConfig().list_workspaces() to see a list of configured workspaces
    ws_config = WorkspaceConfig()
    ws_id = ws_config['workspace_id']
    ten_id = ws_config['tenant_id']
    md("Workspace details collected from config file")
    qry_prov = QueryProvider('LogAnalytics')
    qry_prov.connect(connection_str=ws_config.code_connect_str)
except RuntimeError:
    md("""You do not have any Workspaces configured in your config files.
       Please run the notebook at https://github.com/Azure/Azure-Sentinel-Notebooks/blob/master/ConfiguringNotebookEnvironment.ipynb
       to set up these files before proceeding.""", 'bold')

### Investigation Information

Enter some information relevant to your triage work. This will be stored as part of this Notebook for future reference and recall.
Please also select which Threat Intelligence providers to use for enrichment. Note that you need authentication details configured for each provider you select. You can select one or more providers, or select "All" to use all available providers.

In [None]:
#Collect details for triage record
md("Enter Name:")
name = widgets.Text()
display(name)
md("Enter Ticket ID: ")
ticket = widgets.Text()
display(ticket)
md("Enter Description:")
description = widgets.Textarea()
display(description)

# Get list of configured TI providers and filter out non TI enrichments
ti_provs = [x for x in ti.configured_providers if x not in ("OPR", "Tor")]
if not ti_provs:
    raise Exception("""You do not have any Threat Intelligence providers configured. 
                    Please refer to https://msticpy.readthedocs.io/en/latest/data_acquisition/TIProviders.html on how to configure them.""")

md("Select TI providers to use for enrichment.")
sel_ti_provs = widgets.SelectMultiple(
    options=ti_provs + ["All"],
    value=['All'],
    description='TI providers:',
    disabled=False
)
display(sel_ti_provs)

### Set the time window you wish to triage events from:

Adjust the time slider to select the timeframe for which you wish to triage alerts.
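
As a rough illustration of what the selected window amounts to, the sketch below computes start and end timestamps from an origin time and day offsets. The function name `triage_window` and its parameters are hypothetical and do not reflect how the `QueryTime` widget is implemented. The default of 3 days before simply mirrors the `before=3` setting used in the next cell.

```python
from datetime import datetime, timedelta


def triage_window(origin, days_before=3, days_after=0):
    """Return (start, end) datetimes around origin. Hypothetical helper."""
    if days_before < 0 or days_after < 0:
        raise ValueError("offsets must be non-negative")
    start = origin - timedelta(days=days_before)
    end = origin + timedelta(days=days_after)
    return start, end


# Fixed origin so the example is reproducible
origin = datetime(2020, 5, 10, 12, 0, 0)
start, end = triage_window(origin, days_before=3, days_after=1)
print(start.isoformat(), end.isoformat())
```

A wider window returns more alerts to triage and increases the number of Threat Intelligence lookups performed later in the Notebook.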

In [None]:
# Set list of TI providers to use and record this in our summary record alongside triage details entered above.
if "All" in sel_ti_provs.value:
    ti_prov_use = ti_provs
else:
    ti_prov_use = list(sel_ti_provs.value)
invest_summary = Observation(caption="Investigation Details", data=
                             {"Analyst" : name.value, "Ticket": ticket.value, "Investigation Description" :description.value,
                              "Investigation Date": dt.datetime.now(), "TI Providers": ti_prov_use})
summary.add_observation(invest_summary)

# Widget to select the time window
query_times = nbwidgets.QueryTime(units='day',
                                  max_before=30, max_after=1, before=3)
query_times.display()

### Select the alert types you are interested in:

You can choose to select a subset of alerts based on provider in order to narrow your triage scope. You can also select "All" to return security alerts from all providers. Once a provider is selected you can additionally filter by Alert Name in order to focus on a specific alert type.
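
The two-stage filter can be sketched with plain Python records instead of the DataFrame used in this Notebook. `filter_alerts` is a hypothetical helper and the sample records are made up for illustration; only the `ProviderName` and `AlertName` field names come from the Notebook.

```python
def filter_alerts(alerts, provider="All", alert_name="All"):
    """Keep alerts matching provider and alert name; "All" disables a filter."""
    return [
        alert for alert in alerts
        if provider in ("All", alert["ProviderName"])
        and alert_name in ("All", alert["AlertName"])
    ]


# Made-up sample records (assumption, not real alert data)
sample_alerts = [
    {"ProviderName": "ProviderA", "AlertName": "Suspicious login"},
    {"ProviderName": "ProviderA", "AlertName": "Malware detected"},
    {"ProviderName": "ProviderB", "AlertName": "Suspicious login"},
]

for alert in filter_alerts(sample_alerts, provider="ProviderA"):
    print(alert["AlertName"])
```

Filtering by provider first keeps the list of alert names short, which is why the Alert Names dropdown is refreshed whenever the provider changes.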

In [None]:
# Collect alerts based on the scope set above
alerts = qry_prov.SecurityAlert.list_alerts(query_times)
alerts_summ = Observation(caption="Alerts", data={"Data" : alerts, "Times": query_times })
summary.add_observation(alerts_summ)
# display summary of alerts retrieved
md("Alert summary", "large")
display(alerts.groupby("ProviderName")[["AlertName"]]
        .count()
        .reset_index()
        .rename(columns={"AlertName": "Alerts"})
)
def update_alert_names(_):
    selected_alert_type = sel_alerts.value
    if sel_prov.value != "All": 
        alert_names = alerts[alerts['ProviderName']==sel_prov.value]['AlertName'].unique()
    else:
        alert_names = alerts['AlertName'].unique()
    alert_names = np.append(alert_names, ["All"])
    sel_alerts.options = alert_names
    if selected_alert_type in alert_names:
        sel_alerts.value = selected_alert_type
    else:
        sel_alerts.value = "All"
    
if alerts.empty:
    md(f"No alerts in this Workspace between {query_times.start} and {query_times.end}", "bold")
else:
    w_layout = list_layout = widgets.Layout(width="400px")
    #Select Provider to filter by
    providers = alerts['ProviderName'].unique()
    providers = np.append(providers, ["All"])
    sel_prov = widgets.Dropdown(
        options=providers,
        description='Providers:',
        disabled=False,
        layout=w_layout,
    )
    sel_prov.observe(update_alert_names, names="value")
    alert_names = alerts[alerts['ProviderName']==sel_prov.value]['AlertName'].unique()
    alert_names = np.append(alert_names, ["All"])
    sel_alerts = widgets.Dropdown(
        options=alert_names,
        description='Alert Names:',
        disabled=False,
        value = "All",
        layout=w_layout,
    )
    md("Select provider and/or Alert type to triage", "large")
    display(widgets.VBox([sel_prov, sel_alerts]))
    

### Lookup Threat Intel reports for selected alerts
Once alerts are collected, we can enrich them by looking up their associated entities in Threat Intelligence. The TI Risk column in the table below aggregates the results from the selected TI providers, using the highest severity returned for any entity in the alert.
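
A minimal sketch of this kind of aggregation, assuming each provider result carries one of the severity labels `information`, `warning`, or `high` and that the highest label wins. `aggregate_severity` and `SEVERITY_ORDER` are hypothetical names used only for illustration.

```python
# Severity labels ordered from lowest to highest (assumed ordering)
SEVERITY_ORDER = ["information", "warning", "high"]


def aggregate_severity(severities):
    """Return the highest recognised severity, or "None" if there is none."""
    known = [s.lower() for s in severities if s.lower() in SEVERITY_ORDER]
    if not known:
        return "None"
    # Rank each label by its position in SEVERITY_ORDER and keep the top one
    return max(known, key=SEVERITY_ORDER.index).capitalize()


print(aggregate_severity(["information", "high", "warning"]))
print(aggregate_severity([]))
```

Taking the maximum rather than an average means a single high-severity match is enough to flag the alert, which suits a triage pass where missing a true positive is costlier than reviewing a false one.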

In [None]:
import json
from tqdm.notebook import tqdm

# Filter alerts based on AlertName and Provider
if sel_prov.value == "All":
    sent_alerts = alerts
else:
    sent_alerts = alerts[alerts['ProviderName'] == sel_prov.value]
if sel_alerts.value == "All":
    selected_alert_type = sent_alerts
else:
    selected_alert_type = sent_alerts[sent_alerts['AlertName']== sel_alerts.value]

def entity_load(entity):
    # Entities arrive as JSON strings; return None for anything that cannot be parsed.
    try:
        return json.loads(entity)
    except (json.JSONDecodeError, TypeError):
        return None

selected_alert_type['Entities'] = selected_alert_type['Entities'].apply(entity_load)

# Look up each entity in TI and aggregate results into an overall severity based on the highest indicator severity.
def lookup(row):
    sev = []
    if row['Entities'] is not None:
        for entity in row['Entities']:
            try:
                if entity["Type"] == 'ip' or entity["Type"] == 'ipaddress':
                    resp = ti.lookup_ioc(observable=entity["Address"], providers=ti_prov_use)
                elif entity["Type"] == 'url':
                    resp = ti.lookup_ioc(observable=entity["Url"], providers=ti_prov_use)
                else:
                    resp = None          
                if resp:
                    for response in resp[1]:
                        sev.append(response[1].severity)
            except KeyError:
                pass
            
    if 'high' in sev:
        severity = "High"
    elif 'warning' in sev:
        severity = "Warning"
    elif 'information' in sev:
        severity = "Information"
    else:
        severity = "None"
    return severity


# Highlight cells based on Threat Intelligence results.        
def color_cells(val):
    if isinstance(val, str):
        if val.lower() == "high":
            color = 'Red'
        elif val.lower() == 'warning':
            color = 'Orange'
        elif val.lower() == 'information':
            color = 'Green'
        else:
            color = 'none'
    else:
        color = 'none'
    return 'background-color: %s' % color 

tqdm.pandas(desc="Lookup progress")

selected_alert_type['TI Risk'] = selected_alert_type.progress_apply(lookup, axis=1)
display(selected_alert_type[['StartTimeUtc','AlertName','Severity','TI Risk', 'Description']]
        .sort_values(by=['StartTimeUtc']).style.applymap(color_cells).hide_index())

### Pick an Alert to Examine

We can drill down into a specific alert by selecting it from the list below. This will return additional details on the alert as well as details of any threat intelligence matches.
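
The sketch below shows one way the indicators passed to the Threat Intelligence lookup could be gathered from an alert's entities. `extract_iocs` is a hypothetical helper and the sample entities are made up. The `Type`, `Address`, and `Url` keys follow the next cell, while lower-casing the type and skipping entities with missing keys are assumptions of this sketch.

```python
def extract_iocs(entities):
    """Collect IP addresses and URLs from a list of alert entity dicts."""
    iocs = []
    for entity in entities or []:
        ent_type = str(entity.get("Type", "")).lower()
        if ent_type in ("ip", "ipaddress") and "Address" in entity:
            iocs.append(entity["Address"])
        elif ent_type == "url" and "Url" in entity:
            iocs.append(entity["Url"])
    return iocs


# Made-up entities for illustration (documentation address ranges)
sample_entities = [
    {"Type": "ip", "Address": "203.0.113.10"},
    {"Type": "url", "Url": "http://example.com/login"},
    {"Type": "host", "HostName": "workstation-01"},
    {"Type": "ip"},  # malformed entity without an address is skipped
]
print(extract_iocs(sample_entities))
```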

In [None]:
from msticpy.sectools.ip_utils import convert_to_ip_entities
from msticpy.nbtools.foliummap import FoliumMap, get_center_ip_entities

#Display full alert details when selected
def show_full_alert(selected_alert):
    global security_alert, alert_ip_entities
    security_alert = SecurityAlert(
        rel_alert_select.selected_alert)
    nbdisplay.display_alert(security_alert, show_entities=True) 
    ioc_list = []
    if security_alert['Entities'] is not None:
        for entity in security_alert['Entities']:
            if entity['Type'] == 'ipaddress' or entity['Type'] == 'ip':
                ioc_list.append(entity['Address'])
            elif entity["Type"] == 'url':
                ioc_list.append(entity['Url'])
        if len(ioc_list) > 0:
            ti_data = ti.lookup_iocs(data=ioc_list, providers=ti_prov_use)
            display(ti_data[['Ioc','IocType','Provider','Result','Severity','Details']].reset_index().style.applymap(color_cells).hide_index())
            ti_ips = ti_data[ti_data['IocType'] == 'ipv4']
            # If we have IP entities try and plot these on a map
            if not ti_ips.empty:
                ip_ents = [convert_to_ip_entities(i)[0] for i in ti_ips['Ioc'].unique()]
                center = get_center_ip_entities(ip_ents)
                ip_map = FoliumMap(location=center, zoom_start=4)
                ip_map.add_ip_cluster(ip_ents, color='red')
                display(ip_map)
        else:
            md("No IoCs")
    else:
        md("No IoCs")
            
# Display the alert selector and show full details when an alert is clicked
if isinstance(alerts, pd.DataFrame) and not alerts.empty:
    ti_data = None
    md('Click on alert to view details.', "large")
    rel_alert_select = nbwidgets.AlertSelector(alerts=selected_alert_type,
                                               action=show_full_alert)
    rel_alert_select.display()
    # Add alert details to summary.
    if ti_data is not None:
        alert_details = Observation(caption="Alert Details", data={"Alert":security_alert, "TI":ti_data})
    else:
        alert_details = Observation(caption="Alert Details", data=security_alert)
    summary.add_observation(alert_details)
else:
    md('No alerts found.')
    


### Alerts Timeline
The cell below displays a timeline of the alerts you are triaging, with the selected alert highlighted in order to provide context on the alert.

In [None]:
# Display a timeline of all alerts grouped by their TI risk score
selected_alert = Observation(caption="Alert Details", data=rel_alert_select.selected_alert)
summary.add_observation(selected_alert)

if len(selected_alert_type) == 1:
    md("Only one alert in selected alert provider/type - can't display timeline.")
else:
    nbdisplay.display_timeline(
        data=selected_alert_type, time_column="StartTimeUtc",
        group_by="TI Risk", source_columns=["AlertName"], 
        alert=rel_alert_select.selected_alert, title="Alerts over time grouped by TI risk score")

### Next Steps

Now that we have selected an alert of interest and triaged its key details, we need to identify the next investigative steps. The cell below extracts key entities from the selected alert and enriches them with OSINT. Based on each entity's type, it then recommends a Notebook for further investigation from those available at https://github.com/Azure/Azure-Sentinel-Notebooks/ or via the Azure Sentinel portal.
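
Before any WHOIS lookup is attempted for an IP entity, it helps to know what kind of address it is. The sketch below uses only the standard library `ipaddress` module to make that decision without any network calls. `classify_ip` is a hypothetical name, and the "Public address" label is an assumption standing in for the ASN lookup performed in the next cell.

```python
from ipaddress import ip_address


def classify_ip(value):
    """Describe the address space an IP string belongs to."""
    try:
        ip = ip_address(value)
    except ValueError:
        return "Not an IP Address"
    if ip.is_private:
        return "Private address space"
    if not ip.is_global:
        return "Other address space"
    # Only public addresses are worth sending to a WHOIS service
    return "Public address"


for candidate in ["10.0.0.5", "100.64.0.1", "8.8.8.8", "not-an-ip"]:
    print(candidate, "->", classify_ip(candidate))
```

Checking the address type first avoids sending internal addresses to an external service and skips lookups that cannot return useful ownership data.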

In [None]:
from ipwhois import IPWhois
import whois
from ipaddress import ip_address
import tldextract

# Based on the extracted entity enrich it with OSINT
def enhance(row):
    if row['Type'] == "ipaddress":
        return whois_desc(row['Entity'])
    elif row['Type'] == "host":
        return host_sum(row['Entity'])
    elif row['Type'] == "url":
        return whois_url(row['Entity'])
    
# If entity is a hostname, get key details of the host.
def host_sum(host):
    hb_q = f"Heartbeat | where TimeGenerated > datetime({query_times.start}) and TimeGenerated < datetime({query_times.end}) | where Computer == '{host}' | take 1"
    hb = qry_prov.exec_query(hb_q)
    if not hb.empty:
        hb_str = f"{host} - {hb['ComputerIP'][0]} - {hb['OSType'][0]} - {hb['ComputerEnvironment'][0]}"
    else:
        hb_str = "No host heartbeat"
    return hb_str
        
# If entity is IP address work out what type of address it is and if a public IP address get ASN name.
def whois_desc(ip_lookup, progress=False):
    try:
        ip = ip_address(ip_lookup)
    except ValueError:
        return "Not an IP Address"
    if ip.is_private:
        return "Private address space"
    if not ip.is_global:
        return "Other address space"
    ip_whois = IPWhois(ip)
    whois_result = ip_whois.lookup_whois()
    return whois_result["asn_description"]
        
# If entity is a URL get the name of the organisation that registered the domain.
def whois_url(url):
    _, domain,tld = tldextract.extract(url)
    wis = whois.whois(f"{domain}.{tld}")
    return wis['org']
    
# Based on the entity type suggest a Notebook for future investigation.
def notebook_suggestor(row):
    if row['Type'] in notebooks.keys():
        return notebooks[row['Type']]
    else:
        return "Write your own Notebook" 

notebooks = {"ipaddress" : "Entity Explorer - IP Address",
            "host" : "Entity Explorer - Linux Host/Windows Host",
            "account" : "Entity Explorer - Account",
            "url" : "Entity Explorer - Domain and URL"}
    
md('Entities for further investigation:', 'bold')
ents = security_alert.get_all_entities()
if not ents.empty:
    ents['Notebook'] = ents.apply(notebook_suggestor, axis=1)
    ents['Enrichment'] = ents.apply(enhance, axis=1)
    display(ents.style.hide_index())

    # Save entity details into our summary.
    entities = Observation(caption="Entities for further investigation", data=ents)
    summary.add_observation(entities)
else:
    md('No entities found in this alert')

In [None]:
#Uncomment the line below to see a summary of this Notebook's output
#summary.display_observations()