<div style="float: right; margin: 20px 20px 20px 20px"><img src="images/bro.png" width="100px"></div>

# Zeek to Spark
In this notebook we'll show how easy it is to load really big Zeek logs using the classes within the Zeek Analysis Tools. We'll also convert a Zeek log into a Parquet file in one line of code.

<div style="float: right; margin: 30px 0px 0px 0px"><img src="images/spark.png" width="200px"></div>

### Software
- Zeek Analysis Tools (ZAT): https://github.com/SuperCowPowers/zat
- Parquet: https://parquet.apache.org
- Spark: https://spark.apache.org

### Data
- HTTP log with ~2 million rows, to show speed/simplicity and simple Spark processing
- Grab the Data: https://data.kitware.com/#collection/58d564478d777f0aef5d893a

<div style="float: right; margin: -80px 0px 0px 0px"><img src="images/parquet.png" width="250px"></div>

In [1]:
# Third Party Imports
import pyspark
from pyspark.sql import SparkSession

# Local imports
import zat
from zat import log_to_sparkdf

# Good to print out versions of stuff
print('ZAT: {:s}'.format(zat.__version__))
print('PySpark: {:s}'.format(pyspark.__version__))

ZAT: 0.3.7
PySpark: 2.4.4


<div style="float: right; margin: 20px 20px 20px 20px"><img src="images/spark.png" width="200px"></div>

# Spark It!
### Spin up Spark with 4 Parallel Executors
Here we're spinning up a local Spark server with 4 parallel executors (in local mode these run as worker threads on one machine). This might seem a bit silly since we're probably running on a laptop, but there are a couple of important observations:

<div style="float: right; margin: 20px 20px 20px 20px"><img src="images/spark_jobs.png" width="400px"></div>

- If you have 4 or 8 cores, use them!
- It's the exact same code logic as if we were running on a distributed cluster.
- We run the same code on **Databricks** (www.databricks.com), which is awesome BTW.



In [2]:
# Spin up a local Spark Session (with 4 executors)
spark = SparkSession.builder.master('local[4]').appName('my_awesome').getOrCreate()
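
If you'd rather not hard-code the `4`, here is a small sketch for building the master string from the machine's core count. The helper name `local_master` is made up for this example (it is not part of Spark or ZAT); the `local[N]` and `local[*]` master URL forms are standard Spark.

```python
import os

def local_master(num_threads=None):
    """Build a Spark master URL for local mode (hypothetical helper).

    num_threads=None asks Spark to use every logical core ('local[*]').
    """
    if num_threads is None:
        return 'local[*]'
    if num_threads < 1:
        raise ValueError('num_threads must be at least 1')
    return 'local[{:d}]'.format(num_threads)

# Cap at 4 threads, or fewer on a smaller machine (os.cpu_count() can return None)
cores = os.cpu_count() or 1
master = local_master(min(4, cores))
print(master)
```

The resulting string can be passed straight to `SparkSession.builder.master(master)` in place of the literal `'local[4]'`.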

In [3]:
# Use the ZAT class to load our log file into a Spark dataframe (2 lines of code!)
spark_it = log_to_sparkdf.LogToSparkDF(spark)
spark_df = spark_it.create_dataframe('/Users/briford/data/bro/http.log')
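
If you haven't looked inside a Zeek log before: the ASCII format is a tab-separated file with `#`-prefixed header lines, including a `#fields` line that names the columns. The sketch below is a toy reader in plain Python that shows the layout. It is an illustration only, not how ZAT's `LogToSparkDF` is implemented, and the sample rows are made up.

```python
import io

def read_zeek_log(handle):
    """Yield one dict per data row of a Zeek TSV log (all values kept as strings)."""
    fields = []
    for line in handle:
        line = line.rstrip('\n')
        if line.startswith('#fields'):
            # Header line: '#fields<TAB>name1<TAB>name2...'
            fields = line.split('\t')[1:]
        elif line.startswith('#') or not line:
            # Other metadata (#separator, #types, #close, ...) is skipped here
            continue
        else:
            yield dict(zip(fields, line.split('\t')))

# Tiny made-up sample in the Zeek ASCII layout (not taken from the real http.log)
sample = io.StringIO(
    '#separator \\x09\n'
    '#fields\tts\tid.orig_h\tmethod\tstatus_code\n'
    '#types\ttime\taddr\tstring\tcount\n'
    '1331901000.000000\t192.168.202.79\tHEAD\t404\n'
    '1331901000.010000\t192.168.202.79\tGET\t200\n'
    '#close\t2012-03-16-13-00-00\n'
)
rows = list(read_zeek_log(sample))
print(rows[0]['method'], rows[0]['status_code'])
```

A real loader would also use the `#types` line to convert values (times, counts, addresses) instead of leaving everything as strings.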

<div style="float: right; margin: 0px 0px 0px -80px"><img src="images/spark_distributed.png" width="500px"></div>

# Spark Workers and Data Partitions
Spark will read in the data and partition it out to our workers. Our DataFrame (backed by an RDD) will have some number of partitions that are divided up amongst the worker pool. Each worker operates on only a subset of the data, and Spark manages the 'magic' of how that work gets run, aggregated and presented.


**Image Credit:** Jacek Laskowski, please see his excellent book - Mastering Apache Spark  https://jaceklaskowski.gitbooks.io/mastering-apache-spark

In [4]:
spark_df.rdd.getNumPartitions()

11
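
To make the partition-then-aggregate idea a bit more concrete, here is a toy sketch in plain Python with no Spark involved. It deals some made-up rows out to 4 partitions, lets a thread pool count each partition separately, and then merges the partial counts. The function names are invented for this example, and real Spark scheduling and shuffling are far more involved.

```python
from collections import Counter
from concurrent.futures import ThreadPoolExecutor

def split_into_partitions(rows, num_partitions):
    """Deal rows out into num_partitions roughly equal chunks."""
    return [rows[i::num_partitions] for i in range(num_partitions)]

def count_partition(partition):
    """Work done by one 'worker': count (method, status_code) pairs in its chunk."""
    return Counter(partition)

# Made-up (method, status_code) pairs standing in for log rows
rows = [('HEAD', 404)] * 6 + [('GET', 404)] * 3 + [('POST', 200)] * 2

partitions = split_into_partitions(rows, 4)
with ThreadPoolExecutor(max_workers=4) as pool:
    partial_counts = list(pool.map(count_partition, partitions))

# Merge the per-partition results into one answer
total = sum(partial_counts, Counter())
print(total.most_common())
```

Each worker only ever sees its own chunk, and the final answer comes from combining the partial results, which is the same shape of computation as a Spark groupby and count.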

<div style="float: left; margin: 20px 20px 20px 20px"><img src="images/nuked_crop.jpg" width="150px"></div>


# Light it Up!
Here we're going to demonstrate just a few simple Spark operations, but obviously you now have the full power of the Death Star in your hands.

<div style="float: left; margin: 0px 0px 0px 50px"><img src="images/spark_sql.jpg" width="150px"></div>
<div style="float: left; margin: -20px 50px 0px 0px"><img src="images/mllib.png" width="150px"></div>

In [5]:
# Get information about the Spark DataFrame
num_rows = spark_df.count()
print("Number of Rows: {:d}".format(num_rows))
columns = spark_df.columns
print("Columns: {:s}".format(','.join(columns)))

Number of Rows: 2048442
Columns: ts,uid,id_orig_h,id_orig_p,id_resp_h,id_resp_p,trans_depth,method,host,uri,referrer,user_agent,request_body_len,response_body_len,status_code,status_msg,info_code,info_msg,filename,tags,username,password,proxied,orig_fuids,orig_mime_types,resp_fuids,resp_mime_types


In [6]:
spark_df.select(['id_orig_h', 'host', 'uri', 'status_code', 'user_agent']).show(5)

+--------------+---------------+--------------+-----------+--------------------+
|     id_orig_h|           host|           uri|status_code|          user_agent|
+--------------+---------------+--------------+-----------+--------------------+
|192.168.202.79|192.168.229.251|/DEASLog02.nsf|        404|Mozilla/5.0 (comp...|
|192.168.202.79|192.168.229.251|/DEASLog03.nsf|        404|Mozilla/5.0 (comp...|
|192.168.202.79|192.168.229.251|/DEASLog04.nsf|        404|Mozilla/5.0 (comp...|
|192.168.202.79|192.168.229.251|/DEASLog05.nsf|        404|Mozilla/5.0 (comp...|
|192.168.202.79|192.168.229.251|  /DEASLog.nsf|        404|Mozilla/5.0 (comp...|
+--------------+---------------+--------------+-----------+--------------------+
only showing top 5 rows



In [7]:
spark_df.groupby('method','status_code').count().sort('count', ascending=False).show()

+-------+-----------+-------+
| method|status_code|  count|
+-------+-----------+-------+
|   HEAD|        404|1294022|
|    GET|        404| 429283|
|   POST|        200| 125638|
|    GET|        200|  88631|
|   POST|          0|  32918|
|    GET|        400|  29152|
|    GET|        303|  10858|
|    GET|        403|   8530|
|   POST|        404|   4277|
|    GET|        304|   3851|
|    GET|        302|   3250|
|    GET|          0|   2906|
|    GET|        401|   2159|
|OPTIONS|        200|   1897|
|   POST|        302|   1226|
|   HEAD|        503|   1010|
|   POST|        206|    869|
|    GET|        301|    642|
|   HEAD|          0|    606|
|    GET|        503|    550|
+-------+-----------+-------+
only showing top 20 rows



<div style="float: right; margin: 30px 0px 0px 0px"><img src="images/parquet.png" width="400px"></div>

# What about Parquet files?
Apache Parquet is a columnar storage format focused on performance. Parquet data is often used within the Hadoop ecosystem, and converting your Zeek log to a Parquet file is one line of code!

In [8]:
# DataFrames can be saved as Parquet files, maintaining the schema information.
spark_df.write.parquet('http.parquet')

In [9]:
# Have Spark read in the Parquet File
spark_df = spark.read.parquet("http.parquet")

<div style="float: right; margin: 20px 20px 20px 20px"><img src="images/compressed.jpeg" width="300px"></div>

# Parquet files are compressed
Here we see the first benefit of Parquet, which stores data in a compressed columnar format. There are several compression options available (including uncompressed).

## Original http.log = 1.3 GB 
## http.parquet = ~100 MB (multi-file)
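
Part of why a columnar layout shrinks so well is that the values within a single column tend to repeat, and Parquet's encodings include dictionary and run-length encoding. The sketch below is a toy version of those two ideas in plain Python on a made-up `method` column. It is not Parquet's actual on-disk format, and the function names are invented for this example.

```python
from itertools import groupby

def dictionary_encode(values):
    """Replace each value with a small integer index into a lookup table."""
    table = sorted(set(values))
    index = {value: i for i, value in enumerate(table)}
    return table, [index[value] for value in values]

def run_length_encode(codes):
    """Collapse runs of equal codes into (code, run_length) pairs."""
    return [(code, len(list(run))) for code, run in groupby(codes)]

def decode(table, runs):
    """Invert both steps to recover the original column."""
    return [table[code] for code, length in runs for _ in range(length)]

# Made-up 'method' column: few distinct values, long runs
method_column = ['HEAD'] * 5 + ['GET'] * 3 + ['HEAD'] * 2

table, codes = dictionary_encode(method_column)
runs = run_length_encode(codes)
print(table)  # ['GET', 'HEAD']
print(runs)   # [(1, 5), (0, 3), (1, 2)]
```

Ten strings collapse to a two-entry table plus three short pairs, and `decode(table, runs)` gives back the original column.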

<div style="float: right; margin: 20px 20px 20px 20px"><img src="images/fast.jpg" width="350px"></div>

# Did we mention fast?
The query below was executed on 4 workers. The data contains over 2 million HTTP requests/responses and the time to complete was a **fraction of a second** (369 ms wall time in the run below) running on my Mac laptop :)

In [11]:
%time spark_df.groupby('method','status_code').count().sort('count', ascending=False).show()

+-------+-----------+-------+
| method|status_code|  count|
+-------+-----------+-------+
|   HEAD|        404|1294022|
|    GET|        404| 429283|
|   POST|        200| 125638|
|    GET|        200|  88631|
|   POST|          0|  32918|
|    GET|        400|  29152|
|    GET|        303|  10858|
|    GET|        403|   8530|
|   POST|        404|   4277|
|    GET|        304|   3851|
|    GET|        302|   3250|
|    GET|          0|   2906|
|    GET|        401|   2159|
|OPTIONS|        200|   1897|
|   POST|        302|   1226|
|   HEAD|        503|   1010|
|   POST|        206|    869|
|    GET|        301|    642|
|   HEAD|          0|    606|
|    GET|        503|    550|
+-------+-----------+-------+
only showing top 20 rows

CPU times: user 3.05 ms, sys: 1.41 ms, total: 4.46 ms
Wall time: 369 ms


<div style="float: right; margin: 50px 0px 0px 20px"><img src="images/deep_dive.jpeg" width="350px"></div>

# Data looks good, let's take a deeper dive
Spark has a powerful SQL engine as well as a Machine Learning library. Now that we've got the data loaded into a Spark DataFrame, we're going to use Spark SQL commands to do some investigation, plus clustering with Spark MLlib. For this deeper dive we're going to go to another notebook :)

### Spark Clustering Notebook
- [Zeek Spark Clustering](https://nbviewer.jupyter.org/github/SuperCowPowers/zat/blob/main/notebooks/Spark_Clustering.ipynb)

<div style="float: left; margin: 0px 0px 0px 0px"><img src="images/spark_sql.jpg" width="150px"></div>
<div style="float: left; margin: -20px 50px 0px 0px"><img src="images/mllib.png" width="150px"></div>

<img align="right" style="padding:20px" src="images/SCP_med.png" width="180">

## Wrap Up
Well, that's it for this notebook. We went from a Zeek log to a high-performance Parquet file and then did some digging with high-speed, parallel SQL and groupby operations.

If you liked this notebook please visit the [ZAT](https://github.com/SuperCowPowers/zat) project for more notebooks and examples.

## About SuperCowPowers
The company was formed so that its developers could follow their passion for Python, streaming data pipelines and having fun with data analysis. We also think cows are cool and should be superheroes or at least carry around rayguns and burner phones. <a href="https://www.supercowpowers.com" target="_blank">Visit SuperCowPowers</a>