{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", "\n", "# Bro to Spark: Clustering\n", "In this notebook we will pull Bro data into Spark then do some analysis and clustering. The first step is to convert your Bro log data into a Parquet file, for instructions on how to do this (just a few lines of Python code using the BAT package) please see this notebook:\n", "\n", "\n", "\n", "### How to convert Zeek/Bro log to Parquet Notebook\n", "- [Bro to Spark (and Parquet)](https://github.com/SuperCowPowers/bat/blob/master/notebooks/Bro_to_Spark.ipynb)\n", "\n", "Apache Parquet is a columnar storage format focused on performance. Parquet data is often used within the Hadoop ecosystem and we will specifically be using it for loading data into Spark.\n", "\n", "\n", "\n", "\n", "### Software\n", "- Bro Analysis Tools (BAT): https://github.com/SuperCowPowers/bat\n", "- Parquet: https://parquet.apache.org\n", "- Spark: https://spark.apache.org\n", "- Spark MLLib: https://spark.apache.org/mllib/\n", "\n", "### Data\n", "- Sec Repo: http://www.secrepo.com (no headers on these)\n", "- SuperCowPowers: [data.kitware.com](https://data.kitware.com/#collection/58d564478d777f0aef5d893a) (with headers)" ] }, { "cell_type": "code", "execution_count": 102, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "BAT: 0.2.9\n", "PySpark: 2.2.0\n", "PyArrow: 0.6.0\n" ] } ], "source": [ "# Third Party Imports\n", "import pyspark\n", "from pyspark.sql import SparkSession\n", "import pyarrow\n", "\n", "# Local imports\n", "import bat\n", "from bat.log_to_parquet import log_to_parquet\n", "\n", "# Good to print out versions of stuff\n", "print('BAT: {:s}'.format(bat.__version__))\n", "print('PySpark: {:s}'.format(pyspark.__version__))\n", "print('PyArrow: {:s}'.format(pyarrow.__version__))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "# Spark It!\n", "### Spin up Spark with 4 Parallel Executors\n", "Here we're spinning up a local spark server with 4 parallel executors, although 
this might seem a bit silly since we're probably running this on a laptop, there are a few important observations:\n", "\n", "\n", "\n", "- If you have 4/8 cores, use them!\n", "- It's the exact same code logic as if we were running on a distributed cluster.\n", "- We run the same code on **Databricks** (www.databricks.com), which is awesome BTW.\n", "\n" ] }, { "cell_type": "code", "execution_count": 103, "metadata": { "collapsed": true }, "outputs": [], "source": [ "# Spin up a local Spark Session (with 4 executors)\n", "spark = SparkSession.builder.master(\"local[4]\").appName('my_awesome').getOrCreate()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "## Read in our Parquet File\n", "Here we're loading in a Bro DNS log with ~428K rows to demonstrate the functionality and do some analysis and clustering on the data. For more information on converting Bro logs to Parquet files, please see our Bro to Parquet notebook:\n", "\n", "#### Bro logs to Parquet Notebook\n", "- [Bro to Parquet to Spark](https://github.com/SuperCowPowers/bat/blob/master/notebooks/Bro_to_Parquet_to_Spark.ipynb)" ] }, { "cell_type": "code", "execution_count": 104, "metadata": { "collapsed": true }, "outputs": [], "source": [ "# Have Spark read in the Parquet File\n", "spark_df = spark.read.parquet(\"dns.parquet\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "# Let's look at our data\n", "We should always inspect our data when it comes in. Look at both the data values and the data types to make sure you're getting exactly what you should be."
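] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As a quick sketch of that type check (using the standard Spark DataFrame API), `printSchema()` prints each column's name, type, and nullability, which complements the row/column counts below:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Print the column names and types of the Spark DataFrame\n", "spark_df.printSchema()"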
] }, { "cell_type": "code", "execution_count": 105, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Number of Rows: 427935\n", "Columns: AA,RA,RD,TC,TTLs,Z,answers,id.orig_h,id.orig_p,id.resp_h,id.resp_p,proto,qclass,qclass_name,qtype,qtype_name,query,rcode,rcode_name,rejected,trans_id,uid,ts\n" ] } ], "source": [ "# Get information about the Spark DataFrame\n", "num_rows = spark_df.count()\n", "print(\"Number of Rows: {:d}\".format(num_rows))\n", "columns = spark_df.columns\n", "print(\"Columns: {:s}\".format(','.join(columns)))" ] }, { "cell_type": "code", "execution_count": 106, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----------+-----+------+\n", "|qtype_name|proto| count|\n", "+----------+-----+------+\n", "| A| udp|212473|\n", "| NB| udp| 77199|\n", "| AAAA| udp| 54519|\n", "| PTR| udp| 52991|\n", "| TXT| udp| 12644|\n", "| SRV| udp| 12268|\n", "| -| udp| 3472|\n", "| *| udp| 882|\n", "| AXFR| tcp| 440|\n", "| SOA| udp| 346|\n", "| TXT| tcp| 226|\n", "| -| tcp| 176|\n", "| MX| udp| 169|\n", "| NS| udp| 43|\n", "| HINFO| udp| 30|\n", "| NAPTR| udp| 27|\n", "| PTR| tcp| 26|\n", "| A| tcp| 4|\n", "+----------+-----+------+\n", "\n" ] } ], "source": [ "spark_df.groupby('qtype_name','proto').count().sort('count', ascending=False).show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "# Data looks good, lets take a deeper dive\n", "Spark has a powerful SQL engine as well as a Machine Learning library. 
So now that we've loaded our Bro data, we're going to use Spark SQL commands to do some investigation of our data, including clustering with MLlib.\n", "\n", "\n", "" ] }, { "cell_type": "code", "execution_count": 107, "metadata": {}, "outputs": [], "source": [ "# Add columns with the string lengths of the DNS query and answers\n", "from pyspark.sql.functions import col, length\n", "\n", "# Create a new dataframe that includes two new columns\n", "spark_df = spark_df.withColumn('query_length', length(col('query')))\n", "spark_df = spark_df.withColumn('answer_length', length(col('answers')))" ] }, { "cell_type": "code", "execution_count": 108, "metadata": { "collapsed": true }, "outputs": [], "source": [ "# Plotting defaults\n", "%matplotlib inline\n", "import matplotlib.pyplot as plt\n", "from bat.utils import plot_utils\n", "plot_utils.plot_defaults()" ] }, { "cell_type": "code", "execution_count": 109, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "