{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", "\n", "# Zeek to Parquet\n", "In this notebook we'll show how easy it is to load really big Zeek logs using the classes within the Zeek Analysis Tools, convert them to Parquet files, and do some Spark analysis. \n", "\n", "
\n", "\n", "### Software\n", "- Zeek Analysis Tools (ZAT): https://github.com/SuperCowPowers/zat\n", "- Parquet: https://parquet.apache.org\n", "- Spark: https://spark.apache.org\n", "\n", "### Data\n", "- Conn log with ~23 million rows, to show speed/simplicity and do some simple Spark processing\n", "- Grab the data from here: https://data.kitware.com/#collection/58d564478d777f0aef5d893a\n", "\n", "
" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "ZAT: 0.3.7\n", "PySpark: 2.4.4\n" ] } ], "source": [ "# Third Party Imports\n", "import pyspark\n", "from pyspark.sql import SparkSession\n", "\n", "# Local imports\n", "import zat\n", "from zat import log_to_sparkdf\n", "\n", "# Good to print out versions of stuff\n", "print('ZAT: {:s}'.format(zat.__version__))\n", "print('PySpark: {:s}'.format(pyspark.__version__))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", "\n", "# Spark It!\n", "### Spin up Spark with 4 Parallel Executors\n", "Here we're spinning up a local Spark server with 4 parallel executors. This might seem a bit silly since we're probably running this on a laptop, but there are a few important observations:\n", "\n", "
\n", "\n", "- If you have 4/8 cores, use them!\n", "- It's the exact same code logic as if we were running on a distributed cluster.\n", "- We run the same code on **Databricks** (www.databricks.com), which is awesome BTW.\n", "\n" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "# Spin up a local Spark Session (with 4 executors)\n", "spark = SparkSession.builder.master('local[4]').appName('my_awesome').getOrCreate()" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "# Use the ZAT class to load our log file into a Spark DataFrame (2 lines of code!)\n", "spark_it = log_to_sparkdf.LogToSparkDF(spark)\n", "spark_df = spark_it.create_dataframe('/Users/briford/data/bro/conn.log')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", "\n", "# Spark Workers and Data Partitions\n", "Spark will read in and partition the data out to our workers. Our DataFrame (RDD) will have some number of partitions that are divided up among the worker pool. Each worker operates on only a subset of the data, and Spark manages the 'magic' of how that work gets run, aggregated, and presented.\n", "\n", "\n", "**Image Credit:** Jacek Laskowski, please see his excellent book - Mastering Apache Spark https://jaceklaskowski.gitbooks.io/mastering-apache-spark" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "21" ] }, "execution_count": 4, "metadata": {}, "output_type": "execute_result" } ], "source": [ "spark_df.rdd.getNumPartitions()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", "\n", "# Convert my Zeek logs to Parquet files\n", "Apache Parquet is a columnar storage format focused on performance. Here we're going to convert our Zeek log to a Parquet file in one line of code. The conversion is super scalable since we're using Spark's distributed executors to do the conversion." ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "# DataFrames can be saved as Parquet files, maintaining the schema information.\n", "spark_df.write.parquet('conn.parquet', compression='gzip')" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "# Have Spark read in the Parquet File\n", "spark_df = spark.read.parquet('conn.parquet')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", "\n", "# Parquet files are compressed\n", "Here we see the first benefit of Parquet, which stores data in a compressed columnar format. There are several compression options available (including uncompressed).\n", "\n", "## Original conn.log = 2.5 GB \n", "## conn.parquet = ~420 MB" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", "\n", "\n", "# Light it Up!\n", "Now that we have our Parquet data loaded into Spark, we're going to demonstrate just a few simple Spark operations, but obviously you now have the full power of the Death Star in your hands.\n", "\n", "
\n", "
" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Number of Rows: 22694356\n", "Columns: ts,uid,id_orig_h,id_orig_p,id_resp_h,id_resp_p,proto,service,duration,orig_bytes,resp_bytes,conn_state,local_orig,missed_bytes,history,orig_pkts,orig_ip_bytes,resp_pkts,resp_ip_bytes,tunnel_parents\n" ] } ], "source": [ "# Get information about the Spark DataFrame\n", "num_rows = spark_df.count()\n", "print(\"Number of Rows: {:d}\".format(num_rows))\n", "columns = spark_df.columns\n", "print(\"Columns: {:s}\".format(','.join(columns)))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", "\n", "# Did we mention fast?\n", "The query below was executed on 4 workers. The data contains over 22 million Zeek conn log entries and the time to complete was a **fraction of a second** running on my Mac Laptop :)" ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-----+--------+------+\n", "|proto| service| count|\n", "+-----+--------+------+\n", "| tcp| http|445214|\n", "| udp| dns|160559|\n", "| tcp| ssl| 49017|\n", "| tcp| ssh| 4778|\n", "| udp| dhcp| 3052|\n", "| tcp|ftp-data| 2880|\n", "| tcp| ftp| 2675|\n", "| tcp| dns| 706|\n", "| tcp| smtp| 194|\n", "| tcp| pop3| 2|\n", "+-----+--------+------+\n", "\n", "482 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)\n" ] } ], "source": [ "# Lets look at some 'service' breakdowns in our Zeek conn log\n", "spark_df = spark_df.filter(spark_df['service'] != '-')\n", "%timeit -r 1 -n 1 spark_df.groupby('proto','service').count().sort('count', ascending=False).show() " ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---------------+---------+-------+-----------+\n", "| id_orig_h|id_resp_p|service|total_bytes|\n", "+---------------+---------+-------+-----------+\n", "| 192.168.202.88| 22| ssh| 491259422|\n", "| 192.168.203.63| 80| http| 381506783|\n", "|192.168.202.102| 80| http| 80956460|\n", "| 192.168.202.79| 80| http| 54699732|\n", "| 192.168.202.81| 22| ssh| 42247132|\n", "|192.168.202.110| 80| http| 37952120|\n", "|192.168.202.118| 80| http| 18731116|\n", "|192.168.202.110| 443| ssl| 17883212|\n", "| 192.168.202.95| 22| ssh| 13947240|\n", "| 192.168.202.96| 80| http| 11871726|\n", "|192.168.202.138| 80| http| 10689231|\n", "| 192.168.202.65| 443| ssl| 8550078|\n", "|192.168.202.140| 80| http| 7860895|\n", "| 192.168.204.45| 55553| ssl| 6489031|\n", "|192.168.202.110| 8080| http| 5595350|\n", "|192.168.202.140| 443| ssl| 
4883939|\n", "|192.168.202.125| 80| http| 4289446|\n", "|192.168.202.141| 80| http| 4248981|\n", "| 192.168.28.203| 22| ssh| 3656175|\n", "| 192.168.203.64| 55553| ssl| 3510471|\n", "+---------------+---------+-------+-----------+\n", "only showing top 20 rows\n", "\n" ] } ], "source": [ "# Lets look at our individual hosts, group by ports/services and\n", "# sum up the bytes transferred from the originating host\n", "import pyspark.sql.functions as sf \n", "spark_df.groupby('id_orig_h','id_resp_p','service') \\\n", " .agg(sf.sum('orig_bytes').alias('total_bytes')) \\\n", " .sort('total_bytes', ascending=False).show(20)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", "\n", "# Data looks good, let's take a deeper dive\n", "Spark has a powerful SQL engine as well as a Machine Learning library. Now that we've got the data loaded into a Spark DataFrame, we're going to use Spark SQL commands to do some investigation and clustering using Spark MLlib. For this deeper dive we're going to go to another notebook :)\n", "\n", "### Spark Clustering Notebook\n", "- [Zeek Spark Clustering](https://nbviewer.jupyter.org/github/SuperCowPowers/zat/blob/main/notebooks/Spark_Clustering.ipynb)\n", "\n", "
\n", "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "## Wrap Up\n", "Well that's it for this notebook, we went from a Zeek log to a high performance Parquet file and then did some digging with high speed, parallel SQL and groupby operations.\n", "\n", "If you liked this notebook please visit the [ZAT](https://github.com/SuperCowPowers/zat) project for more notebooks and examples.\n", "\n", "## About SuperCowPowers\n", "The company was formed so that its developers could follow their passion for Python, streaming data pipelines and having fun with data analysis. We also think cows are cool and should be superheros or at least carry around rayguns and burner phones. Visit SuperCowPowers" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.4" } }, "nbformat": 4, "nbformat_minor": 2 }