{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", "\n", "# Zeek to Kafka to Spark\n", "This notebook covers how to stream Zeek data into Spark using Kafka as a message queue. The setup takes a bit of work but the result will be a nice scalable, robust way to process and analyze streaming data from Zeek.\n", "\n", "For getting started with Spark (without Kafka) you can view this notebook:\n", "- [Zeek to Spark](https://nbviewer.jupyter.org/github/SuperCowPowers/zat/blob/main/notebooks/Zeek_to_Spark.ipynb)\n", "\n", "
\n", "
\n", "\n", "### Software\n", "- Zeek Network Monitor: https://www.zeek.org\n", "- Kafka Zeek Plugin: https://github.com/apache/metron-bro-plugin-kafka\n", "- Kafka: https://kafka.apache.org\n", "- Spark: https://spark.apache.org" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Getting Zeek to Kafka Setup\n", "We have an entire notebook on getting the Kafka plugin for Zeek setup.\n", " - [Zeek to Kafka (Part 1: Streaming data pipeline)](https://nbviewer.jupyter.org/github/SuperCowPowers/zat/blob/main/notebooks/Zeek_to_Kafka.ipynb)\n", " \n", "Completing the 'Zeek to Kafka' notebook will ensure your zeek instance with Kafka plugin is up and running. Once that's complete you're ready for the next phase of our Streaming Data Pipeline (Spark)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Part 2: Streaming data pipeline\n", "To set some context, our long term plan is to build out a streaming data pipeline. This notebook is the second phase of our streaming pipeline architecture. So our network data pipeline looks conceptually like this.\n", "
\n", "\n", "- Kafka Plugin for Zeek\n", "- Publish (provides a nice decoupled architecture)\n", "- **Subscribe to whatever feed you want (http, dns, conn, x509...)**\n", "- **ETL (Extract Transform Load) on the raw message data (parsed data with types)**\n", "- **Perform Filtering/Aggregation**\n", "- Data Analysis and Machine Learning" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", "\n", "# Structured Streaming in Spark\n", "Structured Streaming is the new hotness with Spark. Michael Armbrust from DataBricks gave a great talk at Spark Summit 2017 on Structured Streaming:\n", "- https://www.youtube.com/watch?v=8o-cyjMRJWg\n", "\n", "There's also a good example on the DataBricks blog:\n", "- https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html\n" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "PySpark: 2.4.4\n" ] } ], "source": [ "import pyspark\n", "from pyspark.sql import SparkSession\n", "\n", "# Always good to print out versions of libraries\n", "print('PySpark: {:s}'.format(pyspark.__version__))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", "\n", "# Spark It!\n", "### Spin up Spark with 4 Parallel Executors\n", "Here we're spinning up a local spark server with 4 parallel executors, although this might seem a bit silly since we're probably running this on a laptop, there are a couple of important observations:\n", "\n", "
\n", "\n", "- If you have 4/8 cores use them!\n", "- It's the exact same code logic as if we were running on a distributed cluster.\n", "- We run the same code on **DataBricks** (www.databricks.com) which is awesome BTW." ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "# Spin up a local Spark Session (with 4 executors)\n", "spark = SparkSession.builder.master('local[4]').appName('my_awesome')\\\n", " .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4')\\\n", " .getOrCreate()\n", "spark.sparkContext.setLogLevel('ERROR')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Loading the Kafka package\n", "In the Spark builder call above we have added the Kafka package as part of the session creation. There are two important things of note:\n", "1. The version at the end (2.4.4) must match the current Spark version.\n", "1. The latest package is ```spark-sql-kafka-0-10_2.12```, we've had no luck with that version in our local testing, it would crash during the 'readStream' call below so we've reverted to the ```spark-sql-kafka-0-10_2.11``` version." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", "\n", "# Sidebar: Checkout Apache Arrow\n", "For all kinds of reasons, multi-core pipelines, cross language storage, basically it will improve and enable flexible/performant data analysis and machine learning pipelines.\n", "- Apache Arrow: https://arrow.apache.org" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "# Optimize the conversion to Spark\n", "spark.conf.set(\"spark.sql.execution.arrow.enable\", \"true\")" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "# SUBSCRIBE: Setup connection to Kafka Stream \n", "raw_data = spark.readStream.format('kafka') \\\n", " .option('kafka.bootstrap.servers', 'localhost:9092') \\\n", " .option('subscribe', 'dns') \\\n", " .option('startingOffsets', 'latest') \\\n", " .load()\n", "\n", "# Notes:\n", "# Using 'latest' for the 'startingOffsets' option will give only 'new' live data.\n", "# We could also use the value 'earliest' and that would give us everything Kafka has" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "# ETL: Hardcoded Schema for DNS records (do this better later)\n", "from pyspark.sql.types import StructType, StringType, BooleanType, IntegerType\n", "from pyspark.sql.functions import from_json, to_json, col, struct, udf\n", "\n", "dns_schema = StructType() \\\n", " .add('ts', StringType()) \\\n", " .add('uid', StringType()) \\\n", " .add('id.orig_h', StringType()) \\\n", " .add('id.orig_p', IntegerType()) \\\n", " .add('id.resp_h', StringType()) \\\n", " .add('id.resp_p', IntegerType()) \\\n", " .add('proto', StringType()) \\\n", " .add('trans_id', IntegerType()) \\\n", " .add('query', StringType()) \\\n", " .add('qclass', IntegerType()) \\\n", " .add('qclass_name', StringType()) \\\n", " .add('qtype', IntegerType()) \\\n", " .add('qtype_name', StringType()) \\\n", " .add('rcode', IntegerType()) \\\n", " .add('rcode_name', StringType()) \\\n", " .add('AA', BooleanType()) \\\n", " .add('TC', BooleanType()) \\\n", " .add('RD', BooleanType()) \\\n", " .add('RA', BooleanType()) \\\n", " .add('Z', IntegerType()) \\\n", " .add('answers', StringType()) \\\n", " .add('TTLs', StringType()) \\\n", " .add('rejected', BooleanType())" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "# ETL: Convert raw data into parsed and proper typed data\n", "parsed_data = raw_data \\\n", " .select(from_json(col(\"value\").cast(\"string\"), dns_schema).alias('data')) \\\n", " .select('data.*')\n", "\n", "# FILTER: Only get DNS records that have 'query' field filled out\n", "filtered_data = parsed_data.filter(parsed_data.query.isNotNull() & (parsed_data.query!='')==True)\n", "\n", "# FILTER 2: Remove Local/mDNS queries\n", "filtered_data = filtered_data.filter(~filtered_data.query.like('%.local')) # Note: using the '~' negation operator" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "# Helper method that allows us to compute the 2nd level domain\n", "import tldextract\n", "\n", "def compute_domain(query):\n", " # Pull out the domain\n", " if query.endswith('.local'):\n", " return 'local'\n", " return tldextract.extract(query).registered_domain if query else None" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [], "source": [ "# COMPUTE: A new column with the 2nd level domain extracted from the query\n", "udf_compute_domain = udf(compute_domain, StringType())\n", "computed_data = 
filtered_data.withColumn('domain', udf_compute_domain('query'))\n", "\n", "# AGGREGATE: In this case a simple groupby operation\n", "group_data = computed_data.groupBy('`id.orig_h`', 'domain', 'qtype_name').count()" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- id.orig_h: string (nullable = true)\n", " |-- domain: string (nullable = true)\n", " |-- qtype_name: string (nullable = true)\n", " |-- count: long (nullable = false)\n", "\n" ] } ], "source": [ "# At any point in the pipeline you can see what you're getting out\n", "group_data.printSchema()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Streaming pipeline output to an in-memory table\n", "Now, for demonstration and discussion purposes, we're going to pull the end of the pipeline back into memory to inspect the output. A couple of things to note explicitly here:\n", "\n", "- Writing a stream to memory is dangerous and should be done only on small data. Since this is aggregated output we know it's going to be small.\n", "\n", "- The queryName param used below will be the name of the in-memory table." ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [], "source": [ "# Take the end of our pipeline and pull it into memory\n", "dns_count_memory_table = group_data.writeStream.format('memory') \\\n", " .queryName('dns_counts') \\\n", " .outputMode('complete') \\\n", " .start()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
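For a longer-running pipeline you probably don't want a memory sink at all. Purely as a sketch (the paths below are hypothetical, and note that the file sink only supports 'append' output mode, so it's shown on the non-aggregated filtered_data rather than on group_data):\n", "\n", "```python\n", "# Sketch: write the filtered (non-aggregated) stream to Parquet files instead of memory.\n", "# The file sink only supports 'append' output mode; the paths are just examples.\n", "parquet_query = filtered_data.writeStream.format('parquet') \\\n", " .option('path', '/tmp/zeek_dns_parquet') \\\n", " .option('checkpointLocation', '/tmp/zeek_dns_checkpoint') \\\n", " .outputMode('append') \\\n", " .start()\n", "```\n", "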
\n", "\n", "# Streaming Query/Table: Looking Deeper\n", "Note: The in-memory table above is **dynamic**. So as the streaming data pipeline continues to process data the table contents will change. Below we make two of the **same** queries and as more data streams in the results will change." ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "\n", "DNS Query Total Counts = 1\n" ] }, { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
id.orig_hdomainqtype_namecount
0192.168.1.7toggl.comA1
\n", "
" ], "text/plain": [ " id.orig_h domain qtype_name count\n", "0 192.168.1.7 toggl.com A 1" ] }, "execution_count": 11, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Create a Pandas Dataframe by querying the in memory table and converting\n", "dns_counts_df = spark.sql(\"select * from dns_counts\").toPandas()\n", "print('\\nDNS Query Total Counts = {:d}'.format(dns_counts_df['count'].sum()))\n", "dns_counts_df.sort_values(ascending=False, by='count')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", "\n", "# Same Query with Updated Results\n", "Now we run the same query as above and since the streaming pipeline continues to process new incoming data the in-memory table will **dynamically** update." ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "\n", "DNS Query Total Counts = 20\n" ] }, { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
id.orig_hdomainqtype_namecount
5192.168.1.7google.comA3
11192.168.1.7stackoverflow.comA2
14192.168.1.7doubleclick.netA2
0192.168.1.7t-mobile.comA1
1192.168.1.7google-analytics.comA1
2192.168.1.7googlesyndication.comA1
3192.168.1.7quantserve.comA1
4192.168.1.7gravatar.comA1
6192.168.1.7imgur.comA1
7192.168.1.7googleapis.comA1
8192.168.1.7githubusercontent.comA1
9192.168.1.7googleusercontent.comA1
10192.168.1.7toggl.comA1
12192.168.1.7gstatic.comA1
13192.168.1.7scorecardresearch.comA1
15192.168.1.7stackexchange.comA1
\n", "
" ], "text/plain": [ " id.orig_h domain qtype_name count\n", "5 192.168.1.7 google.com A 3\n", "11 192.168.1.7 stackoverflow.com A 2\n", "14 192.168.1.7 doubleclick.net A 2\n", "0 192.168.1.7 t-mobile.com A 1\n", "1 192.168.1.7 google-analytics.com A 1\n", "2 192.168.1.7 googlesyndication.com A 1\n", "3 192.168.1.7 quantserve.com A 1\n", "4 192.168.1.7 gravatar.com A 1\n", "6 192.168.1.7 imgur.com A 1\n", "7 192.168.1.7 googleapis.com A 1\n", "8 192.168.1.7 githubusercontent.com A 1\n", "9 192.168.1.7 googleusercontent.com A 1\n", "10 192.168.1.7 toggl.com A 1\n", "12 192.168.1.7 gstatic.com A 1\n", "13 192.168.1.7 scorecardresearch.com A 1\n", "15 192.168.1.7 stackexchange.com A 1" ] }, "execution_count": 16, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Create a Pandas Dataframe by querying the in memory table and converting\n", "dns_counts_df = spark.sql(\"select * from dns_counts\").toPandas()\n", "print('\\nDNS Query Total Counts = {:d}'.format(dns_counts_df['count'].sum()))\n", "dns_counts_df.sort_values(ascending=False, by='count')" ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [], "source": [ "# We should stop our streaming pipeline when we're done\n", "dns_count_memory_table.stop()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Part 2: Streaming data pipeline\n", "Recall that our long term plan is to build out a streaming data pipeline. This notebook has covered the steps in **bold** of our growing network data pipeline.\n", "\n", "
\n", "\n", "- Kafka Plugin for Zeek\n", "- Publish (provides a nice decoupled architecture)\n", "- **Subscribe to whatever feed you want (http, dns, conn, x509...)**\n", "- **ETL (Extract Transform Load) on the raw message data (parsed data with types)**\n", "- **Perform Filtering/Aggregation**\n", "- Data Analysis and Machine Learning\n", "\n", "### Software\n", "- Zeek Network Monitor: https://www.zeek.org\n", "- Kafka Zeek Plugin: https://github.com/apache/metron-bro-plugin-kafka\n", "- Kafka: https://kafka.apache.org\n", "- Spark: https://spark.apache.org\n", "\n", "\n", "\n", "## Wrap Up\n", "Well that's it for this notebook, we know this ended before we got to the **exciting** part of the streaming data pipeline. For this notebook we showed everything in the pipeline up to aggregation. In future notebooks we'll dive into the deep end of our pipeline and cover the data analysis and machine learning aspects of Spark.\n", "\n", "If you liked this notebook please visit the [zat](https://github.com/SuperCowPowers/zat) project for more notebooks and examples.\n", "\n", "## About SuperCowPowers\n", "The company was formed so that its developers could follow their passion for Python, streaming data pipelines and having fun with data analysis. We also think cows are cool and should be superheros or at least carry around rayguns and burner phones. Visit SuperCowPowers\n", "\n", "\n" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.4" } }, "nbformat": 4, "nbformat_minor": 2 }