{"cells":[{"cell_type":"markdown","source":["## SQL at Scale with Spark SQL and DataFrames\n\nSpark SQL brings native support for SQL to Spark and streamlines the process of querying data stored both in RDDs (Spark’s distributed datasets) and in external sources. Spark SQL conveniently blurs the lines between RDDs and relational tables. Unifying these powerful abstractions makes it easy for developers to intermix SQL commands querying external data with complex analytics, all within in a single application. Concretely, Spark SQL will allow developers to:\n\n- Import relational data from Parquet files and Hive tables\n- Run SQL queries over imported data and existing RDDs\n- Easily write RDDs out to Hive tables or Parquet files\n\nSpark SQL also includes a cost-based optimizer, columnar storage, and code generation to make queries fast. At the same time, it scales to thousands of nodes and multi-hour queries using the Spark engine, which provides full mid-query fault tolerance, without having to worry about using a different engine for historical data.\n\n_For getting a deeper perspective into the background, concepts, architecture of Spark SQL and DataFrames you can check out the original article, __['SQL at Scale with Apache Spark SQL and DataFrames - Concepts, Architecture and Examples'](https://medium.com/p/c567853a702f)___\n\nThis tutorial will familiarize you with essential Spark capabilities to deal with structured data typically often obtained from databases or flat files. We will explore typical ways of querying and aggregating relational data by leveraging concepts of DataFrames and SQL using Spark. We will work on an interesting dataset from the [KDD Cup 1999](http://kdd.ics.uci.edu/databases/kddcup99/kddcup99.html) and try to query the data using high level abstrations like the dataframe which has already been a hit in popular data analysis tools like R and Python. We will also look at how easy it is to build data queries using the SQL language which you have learnt and retrieve insightful information from our data. This also happens at scale without us having to do a lot more since Spark distributes these data structures efficiently in the backend which makes our queries scalable and as efficient as possible."],"metadata":{}},{"cell_type":"code","source":["import pandas as pd\nimport matplotlib.pyplot as plt\nplt.style.use('fivethirtyeight')"],"metadata":{},"outputs":[{"metadata":{},"output_type":"display_data","data":{"text/html":["\n
"]}}],"execution_count":2},{"cell_type":"markdown","source":["## Data Retrieval\n\nWe will use data from the [KDD Cup 1999](http://kdd.ics.uci.edu/databases/kddcup99/kddcup99.html), which is the data set used for The Third International Knowledge Discovery and Data Mining Tools Competition, which was held in conjunction with KDD-99 The Fifth International Conference on Knowledge Discovery and Data Mining. The competition task was to build a network intrusion detector, a predictive model capable of distinguishing between \"bad\" connections, called intrusions or attacks, and \"good\" normal connections. This database contains a standard set of data to be audited, which includes a wide variety of intrusions simulated in a military network environment. \n\nWe will be using the reduced dataset `kddcup.data_10_percent.gz` containing nearly half million nework interactions since we would be downloading this Gzip file from the web locally and then work on the same. If you have a good, stable internet connection, feel free to download and work with the full dataset available as `kddcup.data.gz`"],"metadata":{}},{"cell_type":"markdown","source":["#### Working with data from the web\n\nDealing with datasets retrieved from the web can be a bit tricky in Databricks. Fortunately, we have some excellent utility packages like `dbutils` which help in making our job easier. Let's take a quick look at some essential functions for this module."],"metadata":{}},{"cell_type":"code","source":["dbutils.help()"],"metadata":{},"outputs":[{"metadata":{},"output_type":"display_data","data":{"text/html":["
\nThis module provides various utilities for users to interact with the rest of Databricks.\n

fs: DbfsUtils -> Manipulates the Databricks filesystem (DBFS) from the console
meta: MetaUtils -> Methods to hook into the compiler (EXPERIMENTAL)
notebook: NotebookUtils -> Utilities for the control flow of a notebook (EXPERIMENTAL)
preview: Preview -> Utilities under preview category
secrets: SecretUtils -> Provides utilities for leveraging secrets within notebooks
widgets: WidgetsUtils -> Methods to create and get bound value of input widgets inside notebooks

"]}}],"execution_count":5},{"cell_type":"markdown","source":["#### Retrieve and store data in Databricks\n\nWe will now leverage the python `urllib` library to extract the KDD Cup 99 data from their web repository, store it in a temporary location and then move it to the Databricks filesystem which can enable easy access to this data for analysis\n\n__Note:__ If you skip this step and download the data directly, you may end up getting a `InvalidInputException: Input path does not exist` error"],"metadata":{}},{"cell_type":"code","source":["import urllib\nurllib.urlretrieve(\"http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz\", \"/tmp/kddcup_data.gz\")\ndbutils.fs.mv(\"file:/tmp/kddcup_data.gz\", \"dbfs:/kdd/kddcup_data.gz\")\ndisplay(dbutils.fs.ls(\"dbfs:/kdd\"))"],"metadata":{},"outputs":[{"metadata":{},"output_type":"display_data","data":{"text/html":["
pathnamesize
dbfs:/kdd/kddcup_data.gzkddcup_data.gz2144903
"]}}],"execution_count":7},{"cell_type":"markdown","source":["## Building the KDD Dataset\n\nNow that we have our data stored in the Databricks filesystem. Let's load up our data from the disk into Spark's traditional abstracted data structure, the [Resilient Distributed Dataset (RDD)](https://spark.apache.org/docs/latest/rdd-programming-guide.html#resilient-distributed-datasets-rdds)"],"metadata":{}},{"cell_type":"code","source":["data_file = \"dbfs:/kdd/kddcup_data.gz\"\nraw_rdd = sc.textFile(data_file).cache()\nraw_rdd.take(5)"],"metadata":{},"outputs":[{"metadata":{},"output_type":"display_data","data":{"text/html":["\n
Out[2]: \n[u'0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,9,9,1.00,0.00,0.11,0.00,0.00,0.00,0.00,0.00,normal.',\n u'0,tcp,http,SF,239,486,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,19,19,1.00,0.00,0.05,0.00,0.00,0.00,0.00,0.00,normal.',\n u'0,tcp,http,SF,235,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,29,29,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,normal.',\n u'0,tcp,http,SF,219,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,6,6,0.00,0.00,0.00,0.00,1.00,0.00,0.00,39,39,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,normal.',\n u'0,tcp,http,SF,217,2032,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,6,6,0.00,0.00,0.00,0.00,1.00,0.00,0.00,49,49,1.00,0.00,0.02,0.00,0.00,0.00,0.00,0.00,normal.']\n
"]}}],"execution_count":9},{"cell_type":"code","source":["type(raw_rdd)"],"metadata":{},"outputs":[{"metadata":{},"output_type":"display_data","data":{"text/html":["\n
Out[3]: pyspark.rdd.RDD\n
"]}}],"execution_count":10},{"cell_type":"markdown","source":["## Building a Spark DataFrame on our Data\n\nA Spark DataFrame is an interesting data structure representing a distributed collecion of data. A DataFrame is a Dataset organized into named columns. It is conceptually equivalent to a table in a relational database or a dataframe in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs in our case.\n\nTypically the entry point into all SQL functionality in Spark is the `SQLContext` class. To create a basic instance of this call, all we need is a `SparkContext` reference. In Databricks this global context object is available as `sc` for this purpose."],"metadata":{}},{"cell_type":"code","source":["from pyspark.sql import SQLContext\nsqlContext = SQLContext(sc)\nsqlContext "],"metadata":{},"outputs":[{"metadata":{},"output_type":"display_data","data":{"text/html":["\n
Out[4]: <pyspark.sql.context.SQLContext at 0x7f470b54d490>\n
"]}}],"execution_count":12},{"cell_type":"markdown","source":["#### Splitting the CSV data\nEach entry in our RDD is a comma-separated line of data which we first need to split before we can parse and build our dataframe"],"metadata":{}},{"cell_type":"code","source":["csv_rdd = raw_rdd.map(lambda row: row.split(\",\"))\nprint(csv_rdd.take(2))\nprint(type(csv_rdd))"],"metadata":{},"outputs":[{"metadata":{},"output_type":"display_data","data":{"text/html":["\n
[[u'0', u'tcp', u'http', u'SF', u'181', u'5450', u'0', u'0', u'0', u'0', u'0', u'1', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'8', u'8', u'0.00', u'0.00', u'0.00', u'0.00', u'1.00', u'0.00', u'0.00', u'9', u'9', u'1.00', u'0.00', u'0.11', u'0.00', u'0.00', u'0.00', u'0.00', u'0.00', u'normal.'], [u'0', u'tcp', u'http', u'SF', u'239', u'486', u'0', u'0', u'0', u'0', u'0', u'1', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'8', u'8', u'0.00', u'0.00', u'0.00', u'0.00', u'1.00', u'0.00', u'0.00', u'19', u'19', u'1.00', u'0.00', u'0.05', u'0.00', u'0.00', u'0.00', u'0.00', u'0.00', u'normal.']]\n<class 'pyspark.rdd.PipelinedRDD'>\n
"]}}],"execution_count":14},{"cell_type":"markdown","source":["#### Check the total number of features (columns)\nWe can use the following code to check the total number of potential columns in our dataset"],"metadata":{}},{"cell_type":"code","source":["len(csv_rdd.take(1)[0])"],"metadata":{},"outputs":[{"metadata":{},"output_type":"display_data","data":{"text/html":["\n
Out[6]: 42\n
"]}}],"execution_count":16},{"cell_type":"markdown","source":["#### Data Understanding and Parsing\n\nThe KDD 99 Cup data consists of different attributes captured from connection data. The full list of attributes in the data can be obtained [__here__](http://kdd.ics.uci.edu/databases/kddcup99/kddcup.names) and further details pertaining to the description for each attribute\\column can be found [__here__](http://kdd.ics.uci.edu/databases/kddcup99/task.html). We will just be using some specific columns from the dataset, the details of which are specified below.\n\n\n| feature num | feature name | description | type |\n|-------------|--------------------|--------------------------------------------------------------|------------|\n| 1 | duration | length (number of seconds) of the connection | continuous |\n| 2 | protocol_type | type of the protocol, e.g. tcp, udp, etc. | discrete |\n| 3 | service | network service on the destination, e.g., http, telnet, etc. | discrete |\n| 4 | src_bytes | number of data bytes from source to destination | continuous |\n| 5 | dst_bytes | number of data bytes from destination to source | continuous |\n| 6 | flag | normal or error status of the connection | discrete |\n| 7 | wrong_fragment | number of ``wrong'' fragments | continuous |\n| 8 | urgent | number of urgent packets | continuous |\n| 9 | hot | number of ``hot'' indicators | continuous |\n| 10 | num_failed_logins | number of failed login attempts | continuous |\n| 11 | num_compromised | number of ``compromised'' conditions | continuous |\n| 12 | su_attempted | 1 if ``su root'' command attempted; 0 otherwise | discrete |\n| 13 | num_root | number of ``root'' accesses | continuous |\n| 14 | num_file_creations | number of file creation operations | continuous |\n\nWe will be extracting the following columns based on their positions in each datapoint (row) and build a new RDD as follows"],"metadata":{}},{"cell_type":"code","source":["from pyspark.sql import Row\n\nparsed_rdd = csv_rdd.map(lambda r: Row(\n duration=int(r[0]), \n protocol_type=r[1],\n service=r[2],\n flag=r[3],\n src_bytes=int(r[4]),\n dst_bytes=int(r[5]),\n wrong_fragment=int(r[7]),\n urgent=int(r[8]),\n hot=int(r[9]),\n num_failed_logins=int(r[10]),\n num_compromised=int(r[12]),\n su_attempted=r[14],\n num_root=int(r[15]),\n num_file_creations=int(r[16]),\n label=r[-1]\n )\n)\nparsed_rdd.take(5)"],"metadata":{},"outputs":[{"metadata":{},"output_type":"display_data","data":{"text/html":["\n
Out[7]: \n[Row(dst_bytes=5450, duration=0, flag=u'SF', hot=0, label=u'normal.', num_compromised=0, num_failed_logins=0, num_file_creations=0, num_root=0, protocol_type=u'tcp', service=u'http', src_bytes=181, su_attempted=u'0', urgent=0, wrong_fragment=0),\n Row(dst_bytes=486, duration=0, flag=u'SF', hot=0, label=u'normal.', num_compromised=0, num_failed_logins=0, num_file_creations=0, num_root=0, protocol_type=u'tcp', service=u'http', src_bytes=239, su_attempted=u'0', urgent=0, wrong_fragment=0),\n Row(dst_bytes=1337, duration=0, flag=u'SF', hot=0, label=u'normal.', num_compromised=0, num_failed_logins=0, num_file_creations=0, num_root=0, protocol_type=u'tcp', service=u'http', src_bytes=235, su_attempted=u'0', urgent=0, wrong_fragment=0),\n Row(dst_bytes=1337, duration=0, flag=u'SF', hot=0, label=u'normal.', num_compromised=0, num_failed_logins=0, num_file_creations=0, num_root=0, protocol_type=u'tcp', service=u'http', src_bytes=219, su_attempted=u'0', urgent=0, wrong_fragment=0),\n Row(dst_bytes=2032, duration=0, flag=u'SF', hot=0, label=u'normal.', num_compromised=0, num_failed_logins=0, num_file_creations=0, num_root=0, protocol_type=u'tcp', service=u'http', src_bytes=217, su_attempted=u'0', urgent=0, wrong_fragment=0)]\n
"]}}],"execution_count":18},{"cell_type":"markdown","source":["#### Constructing the DataFrame\nNow that our data is neatly parsed and formatted, let's build our DataFrame!"],"metadata":{}},{"cell_type":"code","source":["df = sqlContext.createDataFrame(parsed_rdd)\ndisplay(df.head(10))"],"metadata":{},"outputs":[{"metadata":{},"output_type":"display_data","data":{"text/html":["
dst_bytesdurationflaghotlabelnum_compromisednum_failed_loginsnum_file_creationsnum_rootprotocol_typeservicesrc_bytessu_attemptedurgentwrong_fragment
54500SF0normal.0000tcphttp181000
4860SF0normal.0000tcphttp239000
13370SF0normal.0000tcphttp235000
13370SF0normal.0000tcphttp219000
20320SF0normal.0000tcphttp217000
20320SF0normal.0000tcphttp217000
19400SF0normal.0000tcphttp212000
40870SF0normal.0000tcphttp159000
1510SF0normal.0000tcphttp210000
7860SF1normal.0000tcphttp212000
"]}}],"execution_count":20},{"cell_type":"markdown","source":["Now, we can easily have a look at our dataframe's schema using tne `printSchema(...)` function."],"metadata":{}},{"cell_type":"code","source":["df.printSchema()"],"metadata":{},"outputs":[{"metadata":{},"output_type":"display_data","data":{"text/html":["\n
root\n-- dst_bytes: long (nullable = true)\n-- duration: long (nullable = true)\n-- flag: string (nullable = true)\n-- hot: long (nullable = true)\n-- label: string (nullable = true)\n-- num_compromised: long (nullable = true)\n-- num_failed_logins: long (nullable = true)\n-- num_file_creations: long (nullable = true)\n-- num_root: long (nullable = true)\n-- protocol_type: string (nullable = true)\n-- service: string (nullable = true)\n-- src_bytes: long (nullable = true)\n-- su_attempted: string (nullable = true)\n-- urgent: long (nullable = true)\n-- wrong_fragment: long (nullable = true)\n\n
"]}}],"execution_count":22},{"cell_type":"markdown","source":["## Building a temporary table \n\nWe can leverage the `registerTempTable()` function to build a temporaty table to run SQL commands on our DataFrame at scale! A point to remember is that the lifetime of this temp table is tied to the session. It creates an in-memory table that is scoped to the cluster in which it was created. The data is stored using Hive's highly-optimized, in-memory columnar format. \n\nYou can also check out `saveAsTable()` which creates a permanent, physical table stored in S3 using the Parquet format. This table is accessible to all clusters. The table metadata including the location of the file(s) is stored within the Hive metastore.`"],"metadata":{}},{"cell_type":"code","source":["help(df.registerTempTable)"],"metadata":{},"outputs":[{"metadata":{},"output_type":"display_data","data":{"text/html":["\n
Help on method registerTempTable in module pyspark.sql.dataframe:\n\nregisterTempTable(self, name) method of pyspark.sql.dataframe.DataFrame instance\n Registers this DataFrame as a temporary table using the given name.\n \n The lifetime of this temporary table is tied to the :class:`SparkSession`\n that was used to create this :class:`DataFrame`.\n \n >>> df.registerTempTable("people")\n >>> df2 = spark.sql("select * from people")\n >>> sorted(df.collect()) == sorted(df2.collect())\n True\n >>> spark.catalog.dropTempView("people")\n \n .. note:: Deprecated in 2.0, use createOrReplaceTempView instead.\n \n .. versionadded:: 1.3\n\n
"]}}],"execution_count":24},{"cell_type":"code","source":["df.registerTempTable(\"connections\")"],"metadata":{},"outputs":[{"metadata":{},"output_type":"display_data","data":{"text/html":["\n
"]}}],"execution_count":25},{"cell_type":"markdown","source":["## Executing SQL at Scale\nLet's look at a few examples of how we can run SQL queries on our table based off our dataframe. We will start with some simple queries and then look at aggregations, filters, sorting, subqueries and pivots"],"metadata":{}},{"cell_type":"markdown","source":["### Connections based on the protocol type\n\nLet's look at how we can get the total number of connections based on the type of connectivity protocol. First we will get this information using normal DataFrame DSL syntax to perform aggregations."],"metadata":{}},{"cell_type":"code","source":["display(df.groupBy('protocol_type')\n .count()\n .orderBy('count', ascending=False))"],"metadata":{},"outputs":[{"metadata":{},"output_type":"display_data","data":{"text/html":["
protocol_typecount
icmp283602
tcp190065
udp20354
"]}}],"execution_count":28},{"cell_type":"markdown","source":["Can we also use SQL to perform the same aggregation? Yes we can leverage the table we built earlier for this!"],"metadata":{}},{"cell_type":"code","source":["protocols = sqlContext.sql(\"\"\"\n SELECT protocol_type, count(*) as freq\n FROM connections\n GROUP BY protocol_type\n ORDER BY 2 DESC\n \"\"\")\ndisplay(protocols)"],"metadata":{},"outputs":[{"metadata":{},"output_type":"display_data","data":{"text/html":["
protocol_typefreq
icmp283602
tcp190065
udp20354
"]}}],"execution_count":30},{"cell_type":"markdown","source":["You can clearly see, that you get the same results and you do not need to worry about your background infrastructure or how the code is executed. Just write simple SQL!"],"metadata":{}},{"cell_type":"markdown","source":["### Connections based on good or bad (attack types) signatures\n\nWe will now run a simple aggregation to check the total number of connections based on good (normal) or bad (intrusion attacks) types."],"metadata":{}},{"cell_type":"code","source":["labels = sqlContext.sql(\"\"\"\n SELECT label, count(*) as freq\n FROM connections\n GROUP BY label\n ORDER BY 2 DESC\n \"\"\")\ndisplay(labels)"],"metadata":{},"outputs":[{"metadata":{},"output_type":"display_data","data":{"text/html":["
labelfreq
smurf.280790
neptune.107201
normal.97278
back.2203
satan.1589
ipsweep.1247
portsweep.1040
warezclient.1020
teardrop.979
pod.264
nmap.231
guess_passwd.53
buffer_overflow.30
land.21
warezmaster.20
imap.12
rootkit.10
loadmodule.9
ftp_write.8
multihop.7
phf.4
perl.3
spy.2
"]}}],"execution_count":33},{"cell_type":"markdown","source":["We have a lot of different attack types. We can visualize this in the form of a bar chart. The simplest way is to use the excellent interface options in the Databricks notebook itself as depicted in the following figure!\n\n![](https://cdn-images-1.medium.com/max/800/1*MWtgLR6H4siUB1Ugc8sqog.png)\n\nThis gives us the following nice looking bar chart! Which you can customize further by clicking on __`Plot Options`__ as needed."],"metadata":{}},{"cell_type":"code","source":["labels = sqlContext.sql(\"\"\"\n SELECT label, count(*) as freq\n FROM connections\n GROUP BY label\n ORDER BY 2 DESC\n \"\"\")\ndisplay(labels)"],"metadata":{},"outputs":[{"metadata":{},"output_type":"display_data","data":{"text/html":["
labelfreq
smurf.280790
neptune.107201
normal.97278
back.2203
satan.1589
ipsweep.1247
portsweep.1040
warezclient.1020
teardrop.979
pod.264
nmap.231
guess_passwd.53
buffer_overflow.30
land.21
warezmaster.20
imap.12
rootkit.10
loadmodule.9
ftp_write.8
multihop.7
phf.4
perl.3
spy.2
"]}}],"execution_count":35},{"cell_type":"markdown","source":["Another way is to write the code yourself to do it. You can extract the aggregated data as a `pandas` DataFrame and then plot it as a regular bar chart."],"metadata":{}},{"cell_type":"code","source":["labels_df = pd.DataFrame(labels.toPandas())\nlabels_df.set_index(\"label\", drop=True,inplace=True)\nlabels_fig = labels_df.plot(kind='barh')\n\nplt.rcParams[\"figure.figsize\"] = (7, 5)\nplt.rcParams.update({'font.size': 10})\nplt.tight_layout()\ndisplay(labels_fig.figure)"],"metadata":{},"outputs":[{"metadata":{},"output_type":"display_data","data":{"image/png":[""]}}],"execution_count":37},{"cell_type":"markdown","source":["### Connections based on protocols and attacks\n\nLet's look at which protocols are most vulnerable to attacks now based on the following SQL query."],"metadata":{}},{"cell_type":"code","source":["attack_protocol = sqlContext.sql(\"\"\"\n SELECT \n protocol_type, \n CASE label\n WHEN 'normal.' THEN 'no attack'\n ELSE 'attack'\n END AS state,\n COUNT(*) as freq\n FROM connections\n GROUP BY protocol_type, state\n ORDER BY 3 DESC\n \"\"\")\ndisplay(attack_protocol)"],"metadata":{},"outputs":[{"metadata":{},"output_type":"display_data","data":{"text/html":["
protocol_typestatefreq
icmpattack282314
tcpattack113252
tcpno attack76813
udpno attack19177
icmpno attack1288
udpattack1177
"]}}],"execution_count":39},{"cell_type":"markdown","source":["Well, looks like ICMP connections followed by TCP connections have had the maximum attacks!"],"metadata":{}},{"cell_type":"markdown","source":["### Connection stats based on protocols and attacks\n\nLet's take a look at some statistical measures pertaining to these protocols and attacks for our connection requests."],"metadata":{}},{"cell_type":"code","source":["attack_stats = sqlContext.sql(\"\"\"\n SELECT \n protocol_type, \n CASE label\n WHEN 'normal.' THEN 'no attack'\n ELSE 'attack'\n END AS state,\n COUNT(*) as total_freq,\n ROUND(AVG(src_bytes), 2) as mean_src_bytes,\n ROUND(AVG(dst_bytes), 2) as mean_dst_bytes,\n ROUND(AVG(duration), 2) as mean_duration,\n SUM(num_failed_logins) as total_failed_logins,\n SUM(num_compromised) as total_compromised,\n SUM(num_file_creations) as total_file_creations,\n SUM(su_attempted) as total_root_attempts,\n SUM(num_root) as total_root_acceses\n FROM connections\n GROUP BY protocol_type, state\n ORDER BY 3 DESC\n \"\"\")\ndisplay(attack_stats)"],"metadata":{},"outputs":[{"metadata":{},"output_type":"display_data","data":{"text/html":["
protocol_typestatetotal_freqmean_src_bytesmean_dst_bytesmean_durationtotal_failed_loginstotal_compromisedtotal_file_creationstotal_root_attemptstotal_root_acceses
icmpattack282314932.140.00.00000.00
tcpattack1132529880.38881.4123.19572269761.0152
tcpno attack768131439.314263.9711.0818277645917.05456
udpno attack1917798.0189.891054.630000.00
icmpno attack128891.470.00.00000.00
udpattack117727.50.230.00000.00
"]}}],"execution_count":42},{"cell_type":"markdown","source":["Looks like average amount of data being transmitted in TCP requests are much higher which is not surprising. Interestingly attacks have a much higher average payload of data being transmitted from the source to the destination."],"metadata":{}},{"cell_type":"markdown","source":["#### Filtering connection stats based on the TCP protocol by service and attack type\n\nLet's take a closer look at TCP attacks given that we have more relevant data and statistics for the same. We will now aggregate different types of TCP attacks based on service, attack type and observe different metrics."],"metadata":{}},{"cell_type":"code","source":["tcp_attack_stats = sqlContext.sql(\"\"\"\n SELECT \n service,\n label as attack_type,\n COUNT(*) as total_freq,\n ROUND(AVG(duration), 2) as mean_duration,\n SUM(num_failed_logins) as total_failed_logins,\n SUM(num_file_creations) as total_file_creations,\n SUM(su_attempted) as total_root_attempts,\n SUM(num_root) as total_root_acceses\n FROM connections\n WHERE protocol_type = 'tcp'\n AND label != 'normal.'\n GROUP BY service, attack_type\n ORDER BY total_freq DESC\n \"\"\")\ndisplay(tcp_attack_stats)"],"metadata":{},"outputs":[{"metadata":{},"output_type":"display_data","data":{"text/html":["
serviceattack_typetotal_freqmean_durationtotal_failed_loginstotal_file_creationstotal_root_attemptstotal_root_acceses
privateneptune.1013170.0000.00
httpback.22030.13000.00
othersatan.12210.0000.00
privateportsweep.7251915.81000.00
ftp_datawarezclient.708403.71000.00
ftpwarezclient.3071063.79000.00
otherportsweep.2601058.22000.00
telnetneptune.1970.0000.00
httpneptune.1920.0000.00
fingerneptune.1770.0000.00
ftp_dataneptune.1700.0000.00
privatesatan.1700.05000.00
csnet_nsneptune.1230.0000.00
smtpneptune.1200.0000.00
remote_jobneptune.1180.0000.00
pop_3neptune.1180.0000.00
discardneptune.1150.0000.00
iso_tsapneptune.1150.0000.00
systatneptune.1130.0000.00
domainneptune.1120.0000.00
gopherneptune.1120.0000.00
shellneptune.1110.0000.00
echoneptune.1110.0000.00
sql_netneptune.1090.0000.00
authneptune.1080.0000.00
rjeneptune.1080.0000.00
printerneptune.1070.0000.00
whoisneptune.1070.0000.00
courierneptune.1070.0000.00
bgpneptune.1060.0000.00
nntpneptune.1060.0000.00
netbios_ssnneptune.1060.0000.00
kloginneptune.1060.0000.00
uucp_pathneptune.1050.0000.00
nnspneptune.1050.0000.00
mtpneptune.1050.0000.00
imap4neptune.1050.0000.00
ftpneptune.1040.0000.00
uucpneptune.1040.0000.00
sunrpcneptune.1040.0000.00
timeneptune.1030.0000.00
loginneptune.1020.0000.00
hostnamesneptune.1020.0000.00
efsneptune.1020.0000.00
sshneptune.1020.0000.00
daytimeneptune.1020.0000.00
netbios_nsneptune.1010.0000.00
pop_2neptune.1010.0000.00
supdupneptune.1010.0000.00
ldapneptune.1010.0000.00
vmnetneptune.1010.0000.00
execneptune.990.0000.00
linkneptune.990.0000.00
privatenmap.990.0000.00
netbios_dgmneptune.990.0000.00
http_443neptune.980.0000.00
kshellneptune.980.0000.00
nameneptune.970.0000.00
ctfneptune.960.0000.00
netstatneptune.920.0000.00
otherneptune.910.0000.00
Z39_50neptune.910.0000.00
privateipsweep.680.0000.00
telnetguess_passwd.532.725600.00
telnetbuffer_overflow.21130.670150.05
fingerland.200.0000.00
ftp_datawarezmaster.188.06000.00
imap4imap.126.0000.016
ftp_databuffer_overflow.80.0000.00
telnetloadmodule.563.8090.03
telnetrootkit.5197.4120.025
otherwarezclient.53031.0000.00
httpphf.44.5000.00
ftp_dataftp_write.40.0000.01
vmnetportsweep.40.0000.00
supdupportsweep.410171.5000.00
ftp_dataipsweep.30.0000.00
csnet_nsportsweep.313484.0000.00
telnetperl.341.33060.06
ftp_dataloadmodule.30.0000.00
fingersatan.31.0000.00
ftp_datamultihop.30.33000.00
httpportsweep.313689.67000.00
ftpportsweep.30.0000.00
ftp_datasatan.30.0000.00
pop_3portsweep.313446.33000.00
gopheripsweep.34.33000.00
httpipsweep.32.0000.00
ftpftp_write.229.0020.00
sunrpcportsweep.20.0000.00
smtpportsweep.20.0000.00
ftpmultihop.2185.5020.00
rjeipsweep.20.0000.00
printerportsweep.215467.5000.00
httpsatan.20.0000.00
telnetspy.2318.0011.00
ftpwarezmaster.278.00220.00
systatportsweep.20.0000.00
mtpipsweep.20.0000.00
smtpsatan.20.5000.00
telnetmultihop.2458.0080.093
X11satan.20.0000.00
linkipsweep.20.0000.00
ftp_dataportsweep.221224.0000.00
whoisportsweep.22.0000.00
timeipsweep.21.0000.00
telnetportsweep.20.0000.00
loginftp_write.2100.5000.01
netstatportsweep.20.0000.00
uucp_pathportsweep.14.0000.00
sshportsweep.10.0000.00
telnetipsweep.16.0000.00
uucpsatan.10.0000.00
whoisipsweep.10.0000.00
linkportsweep.10.0000.00
nntpsatan.15.0000.00
nntpnmap.10.0000.00
remote_jobipsweep.10.0000.00
vmnetsatan.10.0000.00
ftpipsweep.12.0000.00
ftprootkit.121.0010.00
fingeripsweep.12.0000.00
discardsatan.111.0000.00
telnetnmap.10.0000.00
courierportsweep.130619.0000.00
echoportsweep.10.0000.00
efsportsweep.130835.0000.00
ftpbuffer_overflow.17.0040.00
ftp_datarootkit.10.0000.02
pop_3nmap.10.0000.00
sunrpcsatan.111.0000.00
http_443portsweep.10.0000.00
gophersatan.12.0000.00
ctfnmap.10.0000.00
telnetland.10.0000.00
fingerportsweep.12.0000.00
pop_3satan.15.0000.00
uucpportsweep.130418.0000.00
domainipsweep.16.0000.00
hostnamessatan.10.0000.00
telnetsatan.13.0000.00
remote_jobportsweep.10.0000.00
rjeportsweep.11.0000.00
gopherportsweep.10.0000.00
netstatsatan.10.0000.00
IRCsatan.10.0000.00
nameipsweep.10.0000.00
smtpipsweep.10.0000.00
netbios_nsportsweep.10.0000.00
ftploadmodule.17.0040.00
sshipsweep.16.0000.00
sql_netportsweep.10.0000.00
netbios_ssnportsweep.10.0000.00
daytimeportsweep.10.0000.00
hostnamesportsweep.10.0000.00
ftpsatan.18.0000.00
pm_dumpsatan.10.0000.00
Z39_50portsweep.10.0000.00
"]}}],"execution_count":45},{"cell_type":"markdown","source":["There are a lot of attack types and the preceding output shows a specific section of the same."],"metadata":{}},{"cell_type":"markdown","source":["### Filtering connection stats based on the TCP protocol by service and attack type\n\nWe will now filter some of these attack types by imposing some constraints based on duration, file creations, root accesses in our query."],"metadata":{}},{"cell_type":"code","source":["tcp_attack_stats = sqlContext.sql(\"\"\"\n SELECT \n service,\n label as attack_type,\n COUNT(*) as total_freq,\n ROUND(AVG(duration), 2) as mean_duration,\n SUM(num_failed_logins) as total_failed_logins,\n SUM(num_file_creations) as total_file_creations,\n SUM(su_attempted) as total_root_attempts,\n SUM(num_root) as total_root_acceses\n FROM connections\n WHERE (protocol_type = 'tcp'\n AND label != 'normal.')\n GROUP BY service, attack_type\n HAVING (mean_duration >= 50\n AND total_file_creations >= 5\n AND total_root_acceses >= 1)\n ORDER BY total_freq DESC\n \"\"\")\ndisplay(tcp_attack_stats)"],"metadata":{},"outputs":[{"metadata":{},"output_type":"display_data","data":{"text/html":["
serviceattack_typetotal_freqmean_durationtotal_failed_loginstotal_file_creationstotal_root_attemptstotal_root_acceses
telnetbuffer_overflow.21130.670150.05
telnetloadmodule.563.8090.03
telnetmultihop.2458.0080.093
"]}}],"execution_count":48},{"cell_type":"markdown","source":["Interesting to see multihop attacks being able to get root accesses to the destination hosts!"],"metadata":{}},{"cell_type":"markdown","source":["### Subqueries to filter TCP attack types based on service\n\nLet's try to get all the TCP attacks based on service and attack type such that the overall mean duration of these attacks is greater than zero (`> 0`). For this we can do an inner query with all aggregation statistics and then extract the relevant queries and apply a mean duration filter in the outer query as shown below."],"metadata":{}},{"cell_type":"code","source":["tcp_attack_stats = sqlContext.sql(\"\"\"\n SELECT \n t.service,\n t.attack_type,\n t.total_freq\n FROM\n (SELECT \n service,\n label as attack_type,\n COUNT(*) as total_freq,\n ROUND(AVG(duration), 2) as mean_duration,\n SUM(num_failed_logins) as total_failed_logins,\n SUM(num_file_creations) as total_file_creations,\n SUM(su_attempted) as total_root_attempts,\n SUM(num_root) as total_root_acceses\n FROM connections\n WHERE protocol_type = 'tcp'\n AND label != 'normal.'\n GROUP BY service, attack_type\n ORDER BY total_freq DESC) as t\n WHERE t.mean_duration > 0 \n \"\"\")\ndisplay(tcp_attack_stats)"],"metadata":{},"outputs":[{"metadata":{},"output_type":"display_data","data":{"text/html":["
serviceattack_typetotal_freq
httpback.2203
privateportsweep.725
ftp_datawarezclient.708
ftpwarezclient.307
otherportsweep.260
privatesatan.170
telnetguess_passwd.53
telnetbuffer_overflow.21
ftp_datawarezmaster.18
imap4imap.12
otherwarezclient.5
telnetloadmodule.5
telnetrootkit.5
httpphf.4
supdupportsweep.4
gopheripsweep.3
telnetperl.3
ftp_datamultihop.3
csnet_nsportsweep.3
pop_3portsweep.3
fingersatan.3
httpipsweep.3
httpportsweep.3
telnetspy.2
smtpsatan.2
ftpmultihop.2
whoisportsweep.2
timeipsweep.2
ftpftp_write.2
ftpwarezmaster.2
printerportsweep.2
ftp_dataportsweep.2
telnetmultihop.2
loginftp_write.2
uucp_pathportsweep.1
ftprootkit.1
telnetipsweep.1
nntpsatan.1
fingerportsweep.1
ftpbuffer_overflow.1
ftpipsweep.1
sunrpcsatan.1
discardsatan.1
ftploadmodule.1
courierportsweep.1
domainipsweep.1
rjeportsweep.1
ftpsatan.1
gophersatan.1
fingeripsweep.1
pop_3satan.1
uucpportsweep.1
sshipsweep.1
telnetsatan.1
efsportsweep.1
"]}}],"execution_count":51},{"cell_type":"markdown","source":["This is nice! Now an interesting way to also view this data is to use a pivot table where one attribute represents rows and another one represents columns. Let's see if we can leverage Spark DataFrames to do this!"],"metadata":{}},{"cell_type":"markdown","source":["### Building a Pivot Table from Aggregated Data\n\nHere, we will build upon the previous DataFrame object we obtained where we aggregated attacks based on type and service. For this, we can leverage the power of Spark DataFrames and the DataFrame DSL."],"metadata":{}},{"cell_type":"code","source":["display((tcp_attack_stats.groupby('service')\n .pivot('attack_type')\n .agg({'total_freq':'max'})\n .na.fill(0))\n)"],"metadata":{},"outputs":[{"metadata":{},"output_type":"display_data","data":{"text/html":["
serviceback.buffer_overflow.ftp_write.guess_passwd.imap.ipsweep.loadmodule.multihop.perl.phf.portsweep.rootkit.satan.spy.warezclient.warezmaster.
telnet021053015230051200
ftp012001120001103072
pop_30000000000301000
discard0000000000001000
login0020000000000000
smtp0000000000002000
domain0000010000000000
http2203000030004300000
courier0000000000100000
other000000000026000050
efs0000000000100000
private00000000007250170000
ftp_data0000000300200070818
whois0000000000200000
nntp0000000000001000
uucp_path0000000000100000
supdup0000000000400000
finger0000010000103000
printer0000000000200000
time0000020000000000
csnet_ns0000000000300000
sunrpc0000000000001000
imap400001200000000000
gopher0000030000001000
uucp0000000000100000
ssh0000010000000000
rje0000000000100000
"]}}],"execution_count":54},{"cell_type":"markdown","source":["We get a nice neat pivot table showing all the occurences based on service and attack type!\n\nThere are plenty of articles\\tutorials available online so I would recommend you to check them out. Some useful resources for you to check out include, [the complete guide to Spark SQL from Databricks](https://docs.databricks.com/spark/latest/spark-sql/index.html)."],"metadata":{}}],"metadata":{"name":"Working with SQL at Scale - Spark SQL Tutorial","notebookId":3704545280501166},"nbformat":4,"nbformat_minor":0}