{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", "\n", "# Zeek to Spark Clustering\n", "In this notebook we will pull Zeek data into Spark and then do some analysis and clustering. The first step is to convert your Zeek log data into a Parquet file. For instructions on how to do this (just a few lines of Python code using the ZAT package), please see this notebook:\n", "\n", "
\n", "\n", "### See these related notebooks\n", "- [Zeek to Parquet](https://nbviewer.jupyter.org/github/SuperCowPowers/zat/blob/main/notebooks/Zeek_to_Parquet.ipynb)\n", "- [Zeek to Spark](https://nbviewer.jupyter.org/github/SuperCowPowers/zat/blob/main/notebooks/Zeek_to_Spark.ipynb)\n", "\n", "Apache Parquet is a columnar storage format focused on performance. Reading Parquet data is fast and efficient. In this notebook we will use it specifically for loading data into Spark.\n", "\n", "
\n", "
\n", "\n", "### Software\n", "- Zeek Analysis Tools (ZAT): https://github.com/SuperCowPowers/zat\n", "- Parquet: https://parquet.apache.org\n", "- Spark: https://spark.apache.org\n", "- Spark MLlib: https://spark.apache.org/mllib/\n", "\n", "### Data\n", "- About 1/2 million rows of a Zeek dns.log\n", "- Grab the data here: [data.kitware.com](https://data.kitware.com/#collection/58d564478d777f0aef5d893a) (with headers)" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "ZAT: 0.3.7\n", "PySpark: 2.4.4\n" ] } ], "source": [ "# Third Party Imports\n", "import pyspark\n", "from pyspark.sql import SparkSession\n", "\n", "# Local imports\n", "import zat\n", "\n", "# Good to print out versions of stuff\n", "print('ZAT: {:s}'.format(zat.__version__))\n", "print('PySpark: {:s}'.format(pyspark.__version__))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", "\n", "# Spark It!\n", "### Spin up Spark with 4 Parallel Executors\n", "Here we're spinning up a local Spark session with 4 parallel executors (worker threads, via `local[4]`). This might seem a bit silly since we're probably running this on a laptop, but there are a couple of important observations:\n", "\n", "
\n", "\n", "- If you have 4/8 cores use them!\n", "- It's the exact same code logic as if we were running on a distributed cluster.\n", "- We run the same code on **Databricks** (www.databricks.com) which is awesome BTW.\n", "\n" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "# Spin up a local Spark Session (with 4 executors)\n", "spark = SparkSession.builder.master(\"local[4]\").appName('my_awesome').getOrCreate()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", "\n", "## Read in our Parquet File\n", "Here we're loading in a Zeek DNS log with ~1/2 million rows to demonstrate the functionality and do some analysis and clustering on the data. For more information on converting Zeek logs to Parquet files please see our Zeek to Spark notebook:\n", "\n", "#### Zeek logs to Parquet Notebook\n", "- [Zeek to Spark (and Parquet)](https://nbviewer.jupyter.org/github/SuperCowPowers/zat/blob/main/notebooks/Zeek_to_Spark.ipynb)" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "# Have Spark read in the Parquet File\n", "spark_df = spark.read.parquet('/Users/briford/data/bro/dns.parquet')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", "\n", "# Let's look at our data\n", "We should always inspect our data when it comes in. Look at both the data values and the data types to make sure you're getting exactly what you should be." ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Number of Rows: 427935\n", "Columns: ts,uid,id_orig_h,id_orig_p,id_resp_h,id_resp_p,proto,trans_id,query,qclass,qclass_name,qtype,qtype_name,rcode,rcode_name,AA,TC,RD,RA,Z,answers,TTLs,rejected\n" ] } ], "source": [ "# Get information about the Spark DataFrame\n", "num_rows = spark_df.count()\n", "print(\"Number of Rows: {:d}\".format(num_rows))\n", "columns = spark_df.columns\n", "print(\"Columns: {:s}\".format(','.join(columns)))" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----------+-----+------+\n", "|qtype_name|proto| count|\n", "+----------+-----+------+\n", "| A| udp|212473|\n", "| NB| udp| 77199|\n", "| AAAA| udp| 54519|\n", "| PTR| udp| 52991|\n", "| TXT| udp| 12644|\n", "| SRV| udp| 12268|\n", "| -| udp| 3472|\n", "| *| udp| 882|\n", "| AXFR| tcp| 440|\n", "| SOA| udp| 346|\n", "| TXT| tcp| 226|\n", "| -| tcp| 176|\n", "| MX| udp| 169|\n", "| NS| udp| 43|\n", "| HINFO| udp| 30|\n", "| NAPTR| udp| 27|\n", "| PTR| tcp| 26|\n", "| A| tcp| 4|\n", "+----------+-----+------+\n", "\n" ] } ], "source": [ "spark_df.groupby('qtype_name','proto').count().sort('count', ascending=False).show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", "\n", "# Data looks good, let's take a deeper dive\n", "Spark has a powerful SQL engine as well as a Machine Learning library. Now that we've loaded our Zeek data, we're going to use Spark SQL commands to investigate it and then cluster it with MLlib.\n", "\n", "
\n", "
" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [], "source": [ "# Plotting defaults\n", "%matplotlib inline\n", "import matplotlib.pyplot as plt\n", "from zat.utils import plot_utils\n", "plot_utils.plot_defaults()" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [], "source": [ "# Add a column with the string length of the DNS query\n", "from pyspark.sql.functions import col, length\n", "\n", "# Create new dataframe that includes two new column\n", "spark_df = spark_df.withColumn('query_length', length(col('query')))\n", "spark_df = spark_df.withColumn('answer_length', length(col('answers')))" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Text(0, 0.5, 'Counts')" ] }, "execution_count": 11, "metadata": {}, "output_type": "execute_result" }, { "data": { "image/png": "\n", "text/plain": [ "
" ] }, "metadata": { "needs_background": "light" }, "output_type": "display_data" } ], "source": [ "# Show histogram of the Spark DF request body lengths\n", "bins, counts = spark_df.select('query_length').rdd.flatMap(lambda x: x).histogram(50)\n", "\n", "# This is a bit awkward but I believe this is the correct way to do it\n", "plt.hist(bins[:-1], bins=bins, weights=counts, log=True)\n", "plt.grid(True)\n", "plt.xlabel('DNS Query Lengths')\n", "plt.ylabel('Counts')" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Text(0, 0.5, 'Counts')" ] }, "execution_count": 12, "metadata": {}, "output_type": "execute_result" }, { "data": { "image/png": "\n", "text/plain": [ "
" ] }, "metadata": { "needs_background": "light" }, "output_type": "display_data" } ], "source": [ "# Show histogram of the Spark DF request body lengths\n", "bins, counts = spark_df.select('answer_length').rdd.flatMap(lambda x: x).histogram(50)\n", "\n", "# This is a bit awkward but I believe this is the correct way to do it\n", "plt.hist(bins[:-1], bins=bins, weights=counts, log=True)\n", "plt.grid(True)\n", "plt.xlabel('DNS Answer Lengths')\n", "plt.ylabel('Counts')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", "\n", "# Cleanup\n", "**Note:** This bit of cleanup code is no longer needed as the ZAT log_to_sparkdf now takes care of these things for us. :)\n", "\n", "There are two bits of cleanup that we MUST do:\n", "- Remove '.' from the column names (see Note:)\n", "- Drop NULLs\n", "\n", "**Note:** Yes you can do backticks when selecting the column names BUT some of the pipeline operations below will FAIL internally if the column names have a '.' in them." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
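For older data that was not loaded through ZAT, the two cleanup steps above can be sketched roughly as follows. The helper name `clean_column_names` is hypothetical (it is not part of ZAT), and the commented Spark calls assume a DataFrame named `spark_df`, as in the rest of this notebook.

```python
def clean_column_names(columns, replacement='_'):
    # Replace every '.' in each column name (for example 'id.orig_h' becomes 'id_orig_h')
    return [name.replace('.', replacement) for name in columns]

# Example with Zeek-style column names (made up for illustration)
raw_columns = ['ts', 'uid', 'id.orig_h', 'id.orig_p', 'query']
clean_columns = clean_column_names(raw_columns)
print(clean_columns)

# With a Spark DataFrame the same idea would look like this (not run here):
#   spark_df = spark_df.toDF(*clean_column_names(spark_df.columns))
#   spark_df = spark_df.dropna()   # drop rows that contain NULLs
```

Renaming up front avoids having to backtick-quote column names in every later stage.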
\n", "\n", "# Spark Pipelines\n", "A Spark pipeline is a way to combine a sequence of complex algorithms and transformations to create a workflow. Once a pipeline is created Spark can optimize that pipeline when it's executed.\n", "\n", "Below our pipeline consists of the following stages:\n", "- **String Indexer:** Takes our string columns and assigns an index to each unique string\n", "- **OneHotEncoder:** Takes our string index and maps it to a bit vector\n", "- **Normalization:** Scales our numeric data to unit standard deviation (StandardScaler), so the values are not confined to a 0-1 range\n", "- **Assembler:** Combines the encoded categorical data and numerical data into a combined matrix\n", "\n", "\n", "For more information on the details of Categorical Type to One Hot Encoding see our SCP Labs [Encoding Dangers](https://nbviewer.jupyter.org/github/SuperCowPowers/scp-labs/blob/main/notebooks/Categorical_Encoding_Dangers.ipynb) notebook." ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [], "source": [ "from pyspark.ml import Pipeline\n", "from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, StandardScaler\n", "\n", "cat_columns = ['qtype_name', 'proto']\n", "num_columns = ['query_length', 'answer_length']\n", "features = cat_columns + num_columns\n", "stages = []\n", "\n", "# String Indexer + One Hot Encoder (for categorical columns)\n", "for cat_col in cat_columns:\n", " string_indexer = StringIndexer(inputCol=cat_col, outputCol=cat_col + '_index')\n", " encoder = OneHotEncoder(inputCol=cat_col + '_index', outputCol=cat_col + '_onehot')\n", " stages += [string_indexer, encoder]\n", "\n", "# Run StandardScaler on all the numerical features\n", "num_vector = VectorAssembler(inputCols=num_columns, outputCol = 'num_features')\n", "norm = StandardScaler(inputCol='num_features', outputCol='num_features_norm')\n", "stages += [num_vector, norm]\n", "\n", "# Assemble the categorical (one hot vectors) and numeric columns together\n", "assembler_inputs = [c + \"_onehot\" for c in 
cat_columns] + ['num_features_norm']\n", "assembler = VectorAssembler(inputCols=assembler_inputs, outputCol='features')\n", "stages += [assembler]\n", "\n", "# Run the pipeline\n", "pipeline = Pipeline(stages=stages)\n", "pipelineModel = pipeline.fit(spark_df)\n", "spark_df = pipelineModel.transform(spark_df)" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+------------------------------------------------------------------+\n", "|features |\n", "+------------------------------------------------------------------+\n", "|(16,[0,13,14,15],[1.0,1.0,2.280420188456751,0.07560960809619262]) |\n", "|(16,[0,13,14,15],[1.0,1.0,1.1858184979975104,0.07560960809619262])|\n", "|(16,[0,13,14,15],[1.0,1.0,1.4594689206123206,0.07560960809619262])|\n", "|(16,[0,13,14,15],[1.0,1.0,3.0101546487629114,0.07560960809619262])|\n", "|(16,[0,13,14,15],[1.0,1.0,1.3682521130740506,0.07560960809619262])|\n", "|(16,[0,13,14,15],[1.0,1.0,3.0101546487629114,0.07560960809619262])|\n", "|(16,[0,13,14,15],[1.0,1.0,1.4594689206123206,0.07560960809619262])|\n", "|(16,[0,13,14,15],[1.0,1.0,2.280420188456751,0.07560960809619262]) |\n", "|(16,[0,13,14,15],[1.0,1.0,1.9155529583036708,0.07560960809619262])|\n", "|(16,[0,13,14,15],[1.0,1.0,1.9155529583036708,0.07560960809619262])|\n", "|(16,[0,13,14,15],[1.0,1.0,1.9155529583036708,0.07560960809619262])|\n", "|(16,[0,13,14,15],[1.0,1.0,1.9155529583036708,0.07560960809619262])|\n", "|(16,[0,13,14,15],[1.0,1.0,1.9155529583036708,0.07560960809619262])|\n", "|(16,[0,13,14,15],[1.0,1.0,1.9155529583036708,0.07560960809619262])|\n", "|(16,[0,13,14,15],[1.0,1.0,1.9155529583036708,0.07560960809619262])|\n", "|(16,[0,13,14,15],[1.0,1.0,1.9155529583036708,0.07560960809619262])|\n", "|(16,[0,13,14,15],[1.0,1.0,1.9155529583036708,0.07560960809619262])|\n", "|(16,[0,13,14,15],[1.0,1.0,1.9155529583036708,0.07560960809619262])|\n", 
"|(16,[3,13,14,15],[1.0,1.0,2.462853803533291,0.07560960809619262]) |\n", "|(16,[2,13,14,15],[1.0,1.0,1.5506857281505906,0.07560960809619262])|\n", "+------------------------------------------------------------------+\n", "only showing top 20 rows\n", "\n" ] } ], "source": [ "spark_df.select('features').show(truncate = False)" ] }, { "cell_type": "code", "execution_count": 26, "metadata": {}, "outputs": [], "source": [ "from pyspark.ml.clustering import KMeans\n", "\n", "# Train a k-means model.\n", "kmeans = KMeans().setK(40)\n", "model = kmeans.fit(spark_df)" ] }, { "cell_type": "code", "execution_count": 27, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Within Set Sum of Squared Errors = 6885.514765318761\n" ] } ], "source": [ "# Evaluate clustering by computing Within Set Sum of Squared Errors.\n", "wssse = model.computeCost(spark_df)\n", "print(\"Within Set Sum of Squared Errors = \" + str(wssse))" ] }, { "cell_type": "code", "execution_count": 28, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----------+-----+----------+-----+\n", "|qtype_name|proto|prediction|count|\n", "+----------+-----+----------+-----+\n", "| TXT| tcp| 0| 135|\n", "| TXT| udp| 0|11149|\n", "| PTR| udp| 1|11809|\n", "| AXFR| tcp| 2| 87|\n", "| AXFR| tcp| 3| 78|\n", "| A| udp| 4|28609|\n", "| PTR| udp| 5|40370|\n", "| PTR| tcp| 5| 25|\n", "| A| udp| 6|55667|\n", "| NB| udp| 7|11374|\n", "| NB| udp| 8|20059|\n", "| AAAA| udp| 9| 2|\n", "| -| udp| 9| 180|\n", "| -| tcp| 9| 5|\n", "| AAAA| udp| 10| 9241|\n", "| *| udp| 11| 144|\n", "| AAAA| udp| 11| 71|\n", "| SRV| udp| 11|10419|\n", "| NAPTR| udp| 12| 27|\n", "| A| udp| 12|25369|\n", "| MX| udp| 12| 163|\n", "| NB| udp| 13|15787|\n", "| AAAA| udp| 14| 6062|\n", "| AXFR| tcp| 15| 68|\n", "| -| udp| 15| 37|\n", "| PTR| udp| 15| 48|\n", "| *| udp| 16| 52|\n", "| A| udp| 16| 6059|\n", "| PTR| udp| 17| 100|\n", "| TXT| udp| 17| 12|\n", "| PTR| tcp| 17| 1|\n", "| AXFR| 
tcp| 18| 107|\n", "| SOA| udp| 19| 31|\n", "| NB| udp| 19| 1920|\n", "| MX| udp| 20| 6|\n", "| AXFR| tcp| 20| 24|\n", "| *| udp| 20| 652|\n", "| HINFO| udp| 20| 30|\n", "| AAAA| udp| 21|14022|\n", "| AAAA| udp| 22| 3764|\n", "| -| tcp| 23| 163|\n", "| -| udp| 23| 3255|\n", "| SRV| udp| 24| 727|\n", "| A| udp| 25| 614|\n", "| *| udp| 25| 31|\n", "| AAAA| udp| 25| 38|\n", "| A| udp| 26| 5429|\n", "| TXT| udp| 27| 1445|\n", "| A| udp| 28|13479|\n", "| A| udp| 29|50330|\n", "+----------+-----+----------+-----+\n", "only showing top 50 rows\n", "\n" ] } ], "source": [ "# Lets look at some of the clustering results\n", "transformed = model.transform(spark_df).select(features + ['prediction'])\n", "transformed.groupby(cat_columns + ['prediction']).count().sort('prediction').show(50)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
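To make the Within Set Sum of Squared Errors number above more concrete, here is a minimal pure-Python sketch of the quantity: each point contributes its squared distance to the nearest cluster center. The function names and the toy data are made up for illustration; this is not how Spark computes it internally, just the same definition applied to a tiny dataset.

```python
def squared_distance(point, center):
    # Squared Euclidean distance between two equal-length vectors
    return sum((p - c) ** 2 for p, c in zip(point, center))

def wssse(points, centers):
    # Each point is charged the squared distance to its closest center
    return sum(min(squared_distance(pt, c) for c in centers) for pt in points)

# Toy data: two obvious groups and one center per group
points = [(1.0, 1.0), (1.0, 2.0), (8.0, 8.0), (9.0, 8.0)]
centers = [(1.0, 1.5), (8.5, 8.0)]
print(wssse(points, centers))
```

A lower value means tighter clusters, but the value generally keeps dropping as k grows, so it is more useful for comparing runs than as an absolute score.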
\n", "\n", "# The clusters seem to look okay\n", "We can see that there's some natural grouping/clusters around the different qtype_names and protocols, but we also see that many of the query types/protocols are in several clusters... so let's take a closer look at the 'TXT' queries (Note: Replace 'TXT' with any other type and feel free to explore the other 'sub-clusters')" ] }, { "cell_type": "code", "execution_count": 29, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----------+-----+------------+-------------+----------+-----+\n", "|qtype_name|proto|query_length|answer_length|prediction|count|\n", "+----------+-----+------------+-------------+----------+-----+\n", "| TXT| udp| 12| 1| 0| 488|\n", "| TXT| udp| 9| 1| 0| 21|\n", "| TXT| udp| 12| 12| 0| 1|\n", "| TXT| tcp| 12| 5| 0| 106|\n", "| TXT| udp| 23| 1| 0| 12|\n", "| TXT| udp| 22| 1| 0| 62|\n", "| TXT| udp| 14| 17| 0| 1|\n", "| TXT| tcp| 12| 1| 0| 24|\n", "| TXT| udp| 12| 5| 0| 214|\n", "| TXT| udp| 14| 1| 0|10305|\n", "| TXT| udp| 13| 1| 0| 31|\n", "| TXT| udp| 24| 1| 0| 1|\n", "| TXT| tcp| 12| 12| 0| 5|\n", "| TXT| udp| 13| 6| 0| 13|\n", "| TXT| udp| 29| 32| 17| 6|\n", "| TXT| udp| 36| 33| 17| 6|\n", "| TXT| udp| 36| 1| 27| 9|\n", "| TXT| udp| 33| 1| 27| 2|\n", "| TXT| udp| 36| 22| 27| 2|\n", "| TXT| udp| 40| 1| 27| 347|\n", "| TXT| udp| 39| 1| 27| 98|\n", "| TXT| udp| 41| 1| 27| 436|\n", "| TXT| udp| 35| 1| 27| 16|\n", "| TXT| udp| 42| 1| 27| 532|\n", "| TXT| udp| 34| 1| 27| 3|\n", "| TXT| udp| 72| 1| 34| 4|\n", "| TXT| udp| 64| 1| 34| 2|\n", "| TXT| udp| 67| 1| 34| 2|\n", "| TXT| udp| 82| 11| 34| 6|\n", "| TXT| udp| 83| 1| 34| 2|\n", "| TXT| udp| 12| 33| 37| 22|\n", "| TXT| tcp| 12| 33| 37| 91|\n", "+----------+-----+------------+-------------+----------+-----+\n", "\n" ] } ], "source": [ "# Let's look at the 'TXT' qtype_name clusters\n", "txt_queries = transformed.where(transformed['qtype_name'] == 'TXT').groupby(features + ['prediction']).\\\n", 
"count().sort('prediction')\n", "txt_queries.show(50)" ] }, { "cell_type": "code", "execution_count": 30, "metadata": {}, "outputs": [ { "data": { "image/png": "\n", "text/plain": [ "
" ] }, "metadata": { "needs_background": "light" }, "output_type": "display_data" } ], "source": [ "# The groupby/count produces a very small amount of data that \n", "# we can easily pull down and plot from our local client\n", "import pandas as pd\n", "spark.conf.set(\"spark.sql.execution.arrow.enabled\", \"true\")\n", "\n", "# Convert to Pandas (make sure it's small)\n", "txt_df = txt_queries.toPandas()\n", "\n", "# Now use dataframe group by cluster\n", "cluster_groups = txt_df.groupby('prediction')\n", "\n", "# Plot the Machine Learning results\n", "choices = ['red', 'green', 'blue', 'black', 'orange', 'purple', 'brown',\n", " 'pink', 'lightblue', 'grey', 'yellow']\n", "colors = {value: choices[index] for index, value in enumerate(txt_df['prediction'].unique())}\n", "\n", "fig, ax = plt.subplots()\n", "for key, group in cluster_groups:\n", " group.plot(ax=ax, kind='scatter', x='query_length', y='answer_length', alpha=0.5, s=250,\n", " label='Cluster: {:d}'.format(key), color=colors[key])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", "\n", "## Interesting...\n", "So we gave the clustering algorithm both categorical types and numerical types and it seems to have done a reasonable job using both. We can see that the categorical types are clustered, and then within the categorical clustering we have a set of 'sub-clusters' based on the numerical values." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "## Wrap Up\n", "Well that's it for this notebook. We pulled in Zeek log data from a Parquet file, did some digging with high speed, parallel SQL operations, and finally clustered our data to organize the results.\n", "\n", "If you liked this notebook please visit the [ZAT](https://github.com/SuperCowPowers/zat) project for more notebooks and examples.\n", "\n", "## About SuperCowPowers\n", "The company was formed so that its developers could follow their passion for Python, streaming data pipelines and having fun with data analysis. We also think cows are cool and should be superheroes or at least carry around rayguns and burner phones. Visit SuperCowPowers" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.4" } }, "nbformat": 4, "nbformat_minor": 2 }