# **Masquerading Process Name Anomaly Algorithm**

**Notebook Version:** 1.0

**Python Version:** Python 3.8

**Required Packages:** azure_sentinel_utilities, fastDamerauLevenshtein, azureml-synapse

**Platforms Supported:** Azure Synapse Workspace, Azure Sentinel, Azure Log Analytics Workspace, Storage Account, Azure Machine Learning Notebooks connected to Azure Synapse Workspace

**Data Source Required:** Yes

**Data Source:** SecurityEvent

**Spark Version:** 3.1 or above

## **Description**

This notebook demonstrates how to apply custom machine learning algorithms to data in Azure Sentinel.
It showcases a Masquerading Process Name anomaly algorithm, which looks for Windows process creation events for processes whose names are similar to known normal processes.
A very common attack vector is for a malicious process to masquerade as a known normal process by using a name that differs from the legitimate one by a single character. Because such names are easy to miss on visual inspection, they can succeed at running malicious code on your machine. Examples are scvhost.exe and svch0st.exe, both of which imitate the known normal process svchost.exe.
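
To make the "differs by a single character" idea concrete, here is a minimal pure-Python sketch of an edit distance in which an insertion, deletion, substitution, or swap of two adjacent characters each costs 1. It is the optimal string alignment variant and is written only for illustration; the function name `osa_distance` is hypothetical, and the notebook itself relies on the fastDamerauLevenshtein package rather than this code.

```python
def osa_distance(a: str, b: str) -> int:
    """Optimal string alignment distance between two strings.

    Insert, delete, substitute, and swap of two adjacent characters
    each count as one edit.
    """
    rows, cols = len(a) + 1, len(b) + 1
    d = [[0] * cols for _ in range(rows)]
    for i in range(rows):
        d[i][0] = i  # delete all of a[:i]
    for j in range(cols):
        d[0][j] = j  # insert all of b[:j]
    for i in range(1, rows):
        for j in range(1, cols):
            cost = 0 if a[i - 1] == b[j - 1] else 1
            d[i][j] = min(
                d[i - 1][j] + 1,         # deletion
                d[i][j - 1] + 1,         # insertion
                d[i - 1][j - 1] + cost,  # substitution or match
            )
            # adjacent swap, e.g. "vc" <-> "cv"
            if i > 1 and j > 1 and a[i - 1] == b[j - 2] and a[i - 2] == b[j - 1]:
                d[i][j] = min(d[i][j], d[i - 2][j - 2] + 1)
    return d[-1][-1]


for candidate in ("scvhost.exe", "svch0st.exe", "explorer.exe"):
    print(candidate, osa_distance("svchost.exe", candidate))
```

Both masquerading names from the paragraph above are exactly one edit away from svchost.exe (one swap and one substitution respectively), while an unrelated name such as explorer.exe is several edits away.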

The data used here is from the SecurityEvent table with EventID = 4688, which corresponds to process creation events on Windows machines.

You will have to export this data from your Log Analytics workspace into a storage account. Instructions for this LA export mechanism can be found here: [LA export mechanism](https://docs.microsoft.com/azure/azure-monitor/logs/logs-data-export?tabs=portal). 

Here is a [Blog explaining data export](https://techcommunity.microsoft.com/t5/microsoft-sentinel-blog/export-historical-log-data-from-microsoft-sentinel/ba-p/3413418)

Data is then loaded from this storage account container and the results are published to your Log Analytics resource.

**This notebook can be run either from the AML platform or directly on Synapse. The setup differs depending on which you choose. Please follow either section A or section B, whichever suits you, before running the main pyspark code.**

## **A. Running on AML**

You will need to configure your environment to use a Synapse cluster with your AML workspace. To do this, set up the Synapse compute and attach the necessary packages/wheel files. Then, for the rest of the code, switch to the Synapse language by marking each cell with a %%synapse header.

Steps:
1. Install AzureML Synapse package on the AML compute to use spark magics
2. Configure AzureML and Azure Synapse Analytics
3. Attach the required packages and wheel files to the compute.
4. Start Spark session

In [None]:
from azureml.core import Workspace, LinkedService

### **1. Install AzureML Synapse package on the AML compute to use spark magics**

You will have to set up the AML compute that is attached to your notebook with some packages so that the rest of this code can run properly.

In [None]:
# Run the following line and confirm that 'azureml-synapse' is not installed.
%pip list

In [None]:
# Now run the following line and then restart the kernel/compute so that the package is installed.
%pip install azureml-synapse

In [None]:
# Rerun the following line and confirm that 'azureml-synapse' is installed.
%pip list

### **2. Configure AzureML and Azure Synapse Analytics**

Please use the notebook [Configurate Azure ML and Azure Synapse Analytics](https://github.com/Azure/Azure-Sentinel-Notebooks/blob/master/Configurate%20Azure%20ML%20and%20Azure%20Synapse%20Analytics.ipynb) to configure the environment.

The notebook configures an existing Azure Synapse workspace to create and connect to a Spark pool. You can then create a linked service and connect the AML workspace to the Azure Synapse workspace. You can skip point 6, which exports data from Log Analytics to Data Lake Storage Gen2, because you have already set up the data export to the storage account above.

**Note:** Specify the input parameters in the step below in order to connect the AML workspace to the Synapse workspace using the linked service.

In [None]:
amlworkspace = "" # fill in your AML workspace name
subscription_id = "" # fill in your subscription id
resource_group = '' # fill in the resource group of your AML workspace
linkedservice = '' # fill in your linked service created to connect to synapse workspace

**Authentication to Azure Resources:** 

In this step we connect the AML workspace to the linked service that is connected to the Azure Synapse workspace.

In [None]:
# Get the aml workspace
aml_workspace = Workspace.get(name=amlworkspace, subscription_id=subscription_id, resource_group=resource_group)

# Retrieve a known linked service
linked_service = LinkedService.get(aml_workspace, linkedservice)

### **3. Attach the required packages and wheel files to the compute.**

You will have to set up the Spark pool that is attached to your notebook with some packages so that the rest of this code can run properly.

Please follow these steps:
1. On the AML Studio left menu, navigate to **Linked Services**
2. Click on the name of the Linked Service you want to use
3. Select the **Spark pools** tab
4. Click the Spark pool you want to use.
5. In Synapse Properties, click the Synapse workspace. It will open the workspace in a new tab.
6. Click on 'Manage' in the left window.
7. Click on 'Apache Spark pools' in the left window.
8. Select the '...' in the pool you want to use and click on 'Packages'.
9. Now upload the following two files in this blade.

    a. Create a requirements.txt with the following line in it and upload it to the Requirements section:

            fastDamerauLevenshtein

    b. Download the azure_sentinel_utilities whl package from [Repo](https://github.com/Azure/Azure-Sentinel/tree/master/BYOML/Libraries). First upload this package under 'Workspace packages' in the left tab of the original blade.

    c. Then select this package from there in this tab.


### **4. Start Spark session**

Enter your Synapse Spark compute below. To find the Spark compute, please follow these steps:
1. On the AML Studio left menu, navigate to **Linked Services**
2. Click on the name of the Linked Service you want to use
3. Select **Spark pools** tab
4. Get the Name of the Spark pool you want to use.

In [None]:
synapse_spark_compute = input('Synapse Spark compute:')

In [None]:
# Start spark session
%synapse start -s $subscription_id -w $amlworkspace -r $resource_group -c $synapse_spark_compute

## **B. Running directly on Synapse**

You will need to attach the required packages and wheel files to the cluster you intend to use with this notebook. Follow Step 3 above to complete this.

## **Common Code**

From here on, all the steps are the same for both the AML and Synapse platforms. The main difference is that, if you have set up through AML, you must prepend each pyspark block with the synapse header %%synapse. For Synapse runs, don't add that header.

1. One-time: Set credentials in KeyVault so the notebook can access them
    - [Create KeyVault](https://docs.microsoft.com/azure/key-vault/general/quick-create-portal)
    - Store the following secrets in the KeyVault
        - Storage Account connection string: the keyName should be 'saConnectionString'
        - Log Analytics workspaceSharedKey: the keyName should be 'wsSharedKey'
        - Log Analytics workspaceId: the keyName should be 'wsId'
        - Log Analytics workspaceResourceId: the keyName should be 'wsResourceId'
    - Add the KeyVault as a [linked service](https://docs.microsoft.com/azure/data-factory/store-credentials-in-key-vault) to your Azure Synapse workspace

2. Ensure the settings in the cell below are filled in.

In [None]:
%%synapse

from azure_sentinel_utilities.azure_storage import storage_blob_manager
from azure_sentinel_utilities.log_analytics import log_analytics_client
import re
import datetime as dt
import time
from pyspark.sql import functions as F, types as T
from pyspark.sql.window import Window
from fastDamerauLevenshtein import damerauLevenshtein
import random
import string

These are some customizable variables which are used further in the code.

In [None]:
%%synapse

frequentThreshold = 0.8 # If the percentile of a process's creation count is above this threshold, it is considered a frequent and normal process.
infrequentThreshold = 0.2 # If the percentile of a process's creation count is lower than this threshold, it is considered an infrequent and possibly malicious process.
levenDistThreshold = 0.85 # Threshold on the normalized Damerau-Levenshtein similarity score (0 to 1). The higher the score, the more similar the two names are. Only pairs scoring above this threshold are kept, which removes noise.
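
The effect of levenDistThreshold depends on name length. The sketch below assumes the score is computed as 1 - distance / length of the longer name, which is how the default similarity mode of fastDamerauLevenshtein appears to behave; treat that formula as an assumption. The helper `single_edit_similarity` is hypothetical and only illustrates the arithmetic for a name with one substituted or swapped character.

```python
levenDistThreshold = 0.85  # same value as in the settings cell


def single_edit_similarity(stem: str) -> float:
    """Score of a name against a copy that differs by one substitution or swap.

    Assumption: score = 1 - distance / length of the longer name.
    """
    return 1.0 - 1.0 / len(stem)


for stem in ("lsass", "svchost", "powershell"):
    score = single_edit_similarity(stem)
    print(f"{stem}: {score:.3f} passes={score > levenDistThreshold}")
```

Under this assumption, a one-character change only scores above 0.85 when the name (without extension) has at least 7 characters, so short names such as lsass need a lower threshold, at the cost of more noise.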

Making Connections to the Storage Account and KeyVaults for user credentials

In [None]:
%%synapse

#Log Analytics WorkSpace (Sentinel) to write the results
workspaceId = mssparkutils.credentials.getSecret(keyVault = 'YOUR_KEYVAULT_HERE', keyName = 'wsId', linkedServiceName = 'YOUR_LINKED_SERVICE_HERE') # wks_guid
workspaceSharedKey = mssparkutils.credentials.getSecret(keyVault = 'YOUR_KEYVAULT_HERE', keyName = 'wsSharedKey', linkedServiceName = 'YOUR_LINKED_SERVICE_HERE')
workspaceResourceId = mssparkutils.credentials.getSecret(keyVault = 'YOUR_KEYVAULT_HERE', keyName = 'wsResourceId', linkedServiceName = 'YOUR_LINKED_SERVICE_HERE') # eg: /subscriptions//resourcegroups//providers/microsoft.operationalinsights/work

#extract storage account and key from connection string
connectionString = mssparkutils.credentials.getSecret(keyVault = 'YOUR_KEYVAULT_HERE', keyName = 'saConnectionString', linkedServiceName = 'YOUR_LINKED_SERVICE_HERE')

# Raw string so that \w is passed to the regex engine unchanged.
# The connection string and storage key are secrets, so they are not printed.
keyPattern = r'DefaultEndpointsProtocol=(\w+);AccountName=(\w+);AccountKey=([^;]+);'
match = re.match(keyPattern, connectionString)
if match is None:
    raise ValueError("Storage connection string is not in the expected format")
storageAccount = match.group(2)
storageKey = match.group(3)
print("Storage Account is : ", storageAccount)

containerName = "am-securityevent" # This name is fixed for security events
basePath = "WorkspaceResourceId={workspaceResourceId}".format(workspaceResourceId=workspaceResourceId)
print("BasePath is : ", basePath)

startTime = dt.datetime.now() - dt.timedelta(days=1)
endTime = dt.datetime.now() - dt.timedelta(days=0)
startTimeStr = startTime.strftime("%m/%d/%Y, %I:%M:%S.%f %p")
print("Start Time of Algo run is : ", startTime)
endTimeStr = endTime.strftime("%m/%d/%Y, %I:%M:%S.%f %p")
print("End Time of Algo run is : ", endTime)
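
The regular expression in the cell above expects the connection string fields in a fixed order. As an alternative, here is a minimal sketch that splits the string into key/value pairs regardless of order. The helper name `parse_connection_string` and the example values are made up for illustration; they are not real credentials and not part of azure_sentinel_utilities.

```python
def parse_connection_string(conn_str: str) -> dict:
    """Split 'Key=Value;Key=Value' pairs into a dict.

    Only the first '=' in each segment separates key from value,
    so base64 padding ('==') in an account key is preserved.
    """
    parts = {}
    for segment in conn_str.split(";"):
        if not segment:
            continue  # tolerate a trailing ';'
        key, _, value = segment.partition("=")
        parts[key] = value
    return parts


# Made-up example values, not real credentials.
example = (
    "DefaultEndpointsProtocol=https;AccountName=examplestorage;"
    "AccountKey=abc123==;EndpointSuffix=core.windows.net"
)
fields = parse_connection_string(example)
print(fields["AccountName"])
```

With this approach the account name and key would be read as `fields["AccountName"]` and `fields["AccountKey"]`; as with the regex version, avoid printing the key itself.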

This cell defines the helper functions.
1. calcDist() -> calculates the normalized Damerau-Levenshtein similarity between two process names. The score is based on the edit distance, so it takes into account both the number of differing characters and the length of the names. If both names have the same extension, the extension is excluded from the comparison.
2. getRandomTimeStamp() -> generates a random timestamp between the start and end time of the run. This is added to the synthetically created process events.
3. getListGoodProcs() and getListKnownProcs() -> return a hardcoded list of known normal processes which malicious processes may masquerade as, first as plain names and then as a DataFrame of file names.
4. getSyntheticMaliciousProcs() -> creates a list of potentially malicious processes by replacing a single random letter of each normal process name.
5. getSyntheticEvents() -> synthetically creates a list of 4688 events. It takes the known normal or the synthetically created malicious process names from the previous functions and builds complete events by adding a timestamp and a process path.
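
The extension handling described for calcDist() can be sketched on its own in plain Python. The helper name `strip_shared_extension` is hypothetical; it mirrors the rule stated above, not a function from any library.

```python
def strip_shared_extension(one: str, two: str):
    """Drop the extension from both names, but only when the extensions match."""
    one_dot, two_dot = one.rfind("."), two.rfind(".")
    if one_dot != -1 and two_dot != -1 and one[one_dot + 1:] == two[two_dot + 1:]:
        return one[:one_dot], two[:two_dot]
    return one, two


print(strip_shared_extension("svchost.exe", "scvhost.exe"))  # ('svchost', 'scvhost')
print(strip_shared_extension("more.com", "mode.exe"))        # ('more.com', 'mode.exe')
```

Removing a shared extension keeps the identical ".exe" suffix from inflating the similarity of two otherwise different short names.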

In [None]:
%%synapse

def calcDist(one, two):

    oneDot = -1
    twoDot = -1
    if(one is not None and two is not None):
        oneDot = one.rfind('.')
        twoDot = two.rfind('.')

        if((oneDot != -1) and (twoDot != -1)):
            oneEnd = one[oneDot+1:]
            twoEnd = two[twoDot+1:]
            if(oneEnd == twoEnd):
                one = one[:oneDot]
                two = two[:twoDot]

        return damerauLevenshtein(one, two)
    return 0.0

calcDistUdf = F.udf(calcDist, T.FloatType())

def getRandomTimeStamp():

    input_time_format = '%m/%d/%Y, %I:%M:%S.%f %p'
    output_time_format = '%m/%d/%Y, %I:%M:%S.000 %p'
    randAdd = random.random()

    stime = time.mktime(time.strptime(startTimeStr, input_time_format))
    etime = time.mktime(time.strptime(endTimeStr, input_time_format))

    ptime = stime + randAdd * (etime - stime)

    return time.strftime(output_time_format, time.localtime(ptime))

getRandomTimeStampUdf = F.udf(getRandomTimeStamp, T.StringType())

def getListGoodProcs():

    return [
        "svchost", "csrss", "smss", "services",
        "winlogon", "wininit", "lsass",
        "spoolsv", "conhost", "powershell",
        "init", "bash", "avahi-daemon", "gnome-session", "getty",
        "acpid", "dbus-daemon", "dbus-launch", "networkmanager",
        "explorer", "more.com", "mode.com",
        "nbtstat", "netstat", "cscript", "cnscript",
        "tracert", "tracerpt", "systeminfo", "system_info",
        "aitagent", "adtagent", "wininit", "wininst",
        "userinit", "userinst", "dnscmd", "dfscmd",
        "nslookup", "nblookup"
    ]

def getListKnownProcs():

    goodProcs = getListGoodProcs()

    # append '.exe' unless the name already carries an extension
    goodProcs = [s + ('.exe' if not '.' in s else '') for s in goodProcs]
    df = spark.createDataFrame(goodProcs, T.StringType())
    df = df.withColumnRenamed("value", "Process")
    return df

def getSyntheticMaliciousProcs():

    badProcs = []
    goodProcs = getListGoodProcs()
    for process in goodProcs:
        length = len(process)
        randNum = random.randint(1, length-2) # not changing first or last letter because that is easier to spot

        # pick a random replacement letter
        randLetter = random.choice(string.ascii_lowercase)

        # substitute original letter with random letter
        temp = list(process)
        temp[randNum] = randLetter
        badProcs = badProcs + ["".join(temp)]

    badProcs = [s + ('.exe' if not '.' in s else '') for s in badProcs]
    badProcs = badProcs + ["scvhost.exe", "svch0st.exe"] # adding known masquerading processes
    df = spark.createDataFrame(badProcs, T.StringType())
    df = df.withColumnRenamed("value", "Process")
    return df

def getSyntheticEvents(typeOfEvent):

    processPath = ""
    numExplode = 1
    if(typeOfEvent == "normal"):
        processPath = "C:\\Windows\\System32\\"
        df = getListKnownProcs()
        numExplode = 20 # each normal process is repeated 20 times
    elif(typeOfEvent == "malicious"):
        processPath = "C:\\Windows\\Temp\\"
        df = getSyntheticMaliciousProcs()
    else:
        raise ValueError("typeOfEvent must be 'normal' or 'malicious'")

    df = df.withColumn("NewProcessName", F.concat(F.lit(processPath), F.col("Process")))
    df = df.withColumn("NumExplode", F.lit(numExplode))
    df = df.withColumn("EventID", F.lit("4688"))

    # duplicate each row NumExplode times
    new_df = df.withColumn('NumExplode', F.expr('explode(array_repeat(NumExplode,int(NumExplode)))')).drop("NumExplode")
    new_df = new_df.withColumn("TimeGenerated", getRandomTimeStampUdf())
    new_df = new_df.withColumn("From", F.lit("Hardcoded"))
    new_df = new_df.select("NewProcessName", "Process", "TimeGenerated", "From")

    return new_df

Next, we define the schema of the input and get the raw customer 4688 events. We are using the following details: EventID, NewProcessName, Process, TimeGenerated.

In [None]:
%%synapse

def security_event_schema():

    return T.StructType([
        T.StructField(name = "EventID", dataType = T.StringType(), nullable = True),
        T.StructField(name = "NewProcessName", dataType = T.StringType(), nullable = True),
        T.StructField(name = "Process", dataType = T.StringType(), nullable = True),
        T.StructField(name = "TimeGenerated", dataType = T.StringType(), nullable = True),
    ])

blobManager = storage_blob_manager(connectionString)
raw_df = blobManager.get_raw_df(startTime, endTime, containerName, basePath, security_event_schema(), blobManager.get_blob_service_client(connectionString))

final = raw_df.where(F.col("EventID") == "4688")
final = final.withColumn("Process", F.lower("Process"))
final = final.drop("EventID")
final = final.withColumn("From", F.lit("User"))

final = final.cache()
print("There are ", final.count(), " events of type 4688 to process.")
final.show()


Here we append synthetically created normal and malicious process creation events. This is done to demonstrate the algorithm by ensuring that some masquerading process names are present to be caught.

In [None]:
%%synapse

normalEvents = getSyntheticEvents("normal")
potentiallyMaliciousEvents = getSyntheticEvents("malicious")
final = final.union(normalEvents).union(potentiallyMaliciousEvents)
print("Count of SecurityEvents + synthetically created 4688 events is : ", final.count())

We compare frequent processes with infrequent ones to decide whether a process is potentially malicious.

The approach is to treat processes whose creation count falls at or above the 'frequentThreshold' percentile as normal, and those whose count falls below the 'infrequentThreshold' percentile as potentially malicious. Processes in the middle range are excluded from the analysis because they fall in a grey area: they are relatively popular but still below the first threshold.

The values of these thresholds can be customized by you based on your needs.
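
The percentile split can be illustrated without Spark. The sketch below computes a percent rank as (rank - 1) / (rows - 1), with ties sharing the lowest rank, which is how the SQL percent_rank window function is defined. The process counts are invented for illustration, and the helper name `percent_rank` is a local function, not the PySpark one.

```python
frequentThreshold = 0.8
infrequentThreshold = 0.2


def percent_rank(counts: dict) -> dict:
    """Percent rank of each process by creation count, lowest count first."""
    n = len(counts)
    ordered = sorted(counts.values())
    result = {}
    for name, count in counts.items():
        rank = ordered.index(count) + 1  # 1-based position of the first equal count
        result[name] = (rank - 1) / (n - 1) if n > 1 else 0.0
    return result


# Invented counts for illustration only.
counts = {
    "svchost.exe": 500, "cmd.exe": 300, "explorer.exe": 120,
    "notepad.exe": 40, "calc.exe": 12, "scvhost.exe": 1,
}
ranks = percent_rank(counts)
frequent = {p for p, r in ranks.items() if r >= frequentThreshold}
infrequent = {p for p, r in ranks.items() if r < infrequentThreshold}
print("frequent:", sorted(frequent))
print("infrequent:", sorted(infrequent))
```

With these made-up counts, svchost.exe and cmd.exe land in the frequent set and scvhost.exe in the infrequent set, while the remaining processes fall in the excluded middle range.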

In [None]:
%%synapse

groupProcess = final.groupBy("Process").count().sort(F.desc("count"))
groupProcess = groupProcess.select("Process","count", F.percent_rank().over(Window.partitionBy().orderBy(groupProcess['count'])).alias("percent_rank"))
groupProcess = groupProcess.sort(F.desc("percent_rank"))

frequentProcess = groupProcess.where(F.col("percent_rank") >= frequentThreshold).select("Process")
frequentProcess = frequentProcess.withColumnRenamed("Process", "frequentProcess")
infrequentProcess = groupProcess.where(F.col("percent_rank") < infrequentThreshold).select("Process")
infrequentProcess = infrequentProcess.withColumnRenamed("Process", "infrequentProcess")

print("There are ", frequentProcess.count(), " normal processes in your data")
print("There are ", infrequentProcess.count(), " potentially malicious processes in your data")
print("Examples of some potentially malicious processes: ")
infrequentProcess.show()

Next we compute the Damerau-Levenshtein similarity score between each normal process and each potentially malicious process to check whether we have any masquerading processes.

In [None]:
%%synapse

compare = frequentProcess.crossJoin(infrequentProcess)
compare = compare.withColumn("Dist", calcDistUdf(F.col("frequentProcess"), F.col("infrequentProcess")))
print("Showing the Levenshtein distances between various processes")
compare.show()

It is always useful to have the paths from which the processes were spawned in order to judge whether a process is malicious. This cell finds the paths of all the processes, for context. We also filter on a threshold value, which you can alter to better fit your criteria.

In [None]:
%%synapse

frequentProcessWhole = compare.join(final, (final.Process == compare.frequentProcess), how = "left").drop("Process")
frequentProcessWhole = frequentProcessWhole.withColumnRenamed("NewProcessName", "frequentProcessPath")
frequentProcessWhole = frequentProcessWhole.withColumnRenamed("TimeGenerated", "frequentTimeGenerated")
frequentProcessWhole = frequentProcessWhole.withColumnRenamed("From", "frequentFrom")

infrequentProcessWhole = frequentProcessWhole.join(final, (final.Process == frequentProcessWhole.infrequentProcess), how = "left").drop("Process")
infrequentProcessWhole = infrequentProcessWhole.withColumnRenamed("NewProcessName", "infrequentProcessPath")
infrequentProcessWhole = infrequentProcessWhole.withColumnRenamed("TimeGenerated", "infrequentTimeGenerated")
infrequentProcessWhole = infrequentProcessWhole.withColumnRenamed("From", "infrequentFrom")

infrequentProcessWholeFiltered = infrequentProcessWhole.where(F.col("Dist") > levenDistThreshold)
print("Your anomalies: ")
(infrequentProcessWholeFiltered.where((F.col("frequentFrom") == "User") & (F.col("infrequentFrom") == "User"))).show()

print("Hardcoded anomalies examples")
(infrequentProcessWholeFiltered.where((F.col("frequentFrom") == "Hardcoded") | (F.col("infrequentFrom") == "Hardcoded"))).show()

To remove noise, we drop the timestamps and keep only the distinct combinations of process names and path information for the potentially malicious processes.

In [None]:
%%synapse

print("Showing potential anomalies after removing noise")
(infrequentProcessWholeFiltered.drop("frequentTimeGenerated", "infrequentTimeGenerated").distinct()).show()

Sending results to Log Analytics

In [None]:
%%synapse

def escape_str(value):
    # escape backslashes so the value stays valid inside a JSON string
    return value.replace('\\','\\\\')

escape_strUdf = F.udf(escape_str, T.StringType())

def send_results_to_log_analytics(df_to_la):
    # The log type is the name of the event that is being submitted. This will show up under "Custom Logs" as log_type + '_CL'
    log_type = 'MasqueradingProcessNameResult'
    df_to_la = df_to_la.withColumn('timestamp', F.current_timestamp())

    # concatenate columns to form one json record
    # 'HH' is the 24-hour clock; 'hh' would emit ambiguous 12-hour values
    json_records = df_to_la.withColumn('json_field', F.concat(F.lit('{'),
        F.lit(' \"TimeStamp\": \"'), F.from_unixtime(F.unix_timestamp(F.col("timestamp")), "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"), F.lit('\",'),
        F.lit(' \"NormalProcess\": \"'), escape_strUdf(F.col('frequentProcess')), F.lit('\",'),
        F.lit(' \"PotentiallyMaliciousProcess\": \"'), escape_strUdf(F.col('infrequentProcess')), F.lit('\",'),
        F.lit(' \"AnomalyScore\":'), F.col('Dist'),
        F.lit('}')
        )
    )
    # combine json record column to create the array
    json_body = json_records.agg(F.concat_ws(", ", F.collect_list('json_field')).alias('body'))

    # the aggregate always returns one row, so check the body itself for content
    json_payload = json_body.first()['body']
    if json_payload:
        json_payload = '[' + json_payload + ']'

        payload = json_payload.encode('utf-8')
        return log_analytics_client(workspaceId, workspaceSharedKey).post_data(payload, log_type)
    else:
        return "No json data to send to LA"
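
The body sent to Log Analytics is a UTF-8 encoded JSON array of records. For a small result set that has already been collected to the driver, the same shape could be produced with the standard json module, which handles quoting and backslash escaping. This is only a sketch: `build_payload` is a hypothetical helper, the example row is invented, and the field names are copied from the cell above.

```python
import json
from datetime import datetime, timezone


def build_payload(rows, now=None):
    """Serialize (normal, suspect, score) tuples into a UTF-8 JSON array."""
    now = now or datetime.now(timezone.utc)
    # ISO 8601 UTC timestamp with millisecond precision
    stamp = now.strftime("%Y-%m-%dT%H:%M:%S.") + f"{now.microsecond // 1000:03d}Z"
    records = [
        {
            "TimeStamp": stamp,
            "NormalProcess": normal,
            "PotentiallyMaliciousProcess": suspect,
            "AnomalyScore": score,
        }
        for normal, suspect, score in rows
    ]
    return json.dumps(records).encode("utf-8")


# Invented example row.
body = build_payload(
    [("svchost.exe", "scvhost.exe", 0.857)],
    now=datetime(2024, 1, 2, 15, 4, 5, 123000, tzinfo=timezone.utc),
)
print(body.decode("utf-8"))
```

The resulting bytes have the same structure as the payload assembled by string concatenation in the cell above; for large result sets, building the records inside Spark remains the better fit.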

In [None]:
%%synapse

print("Sending results to LogAnalytics")
print("Sending ", infrequentProcessWholeFiltered.count(), " results to Log Analytics")
send_results_to_log_analytics(infrequentProcessWholeFiltered)
print("Done")

In [None]:
# Run the following line if you have been running through AML
%synapse stop