# How to Build an Interactive Spark Notebook with BeakerX and Jupyter on DC/OS

[Jupyter Notebooks](https://jupyter.org/)—one of the industry’s favorite open-source web applications for creating and sharing documents housing live code, equations, visualizations, and more—are great for iterative data science work.

However, working with large datasets can be challenging since data doesn’t typically fit on the local disk or into local memory. Instead, data is stored in a cluster and a distributed computing framework such as [Apache Spark](https://spark.apache.org/) is required to process it within a reasonable amount of time.

Now, this is where BeakerX comes in.

[BeakerX](http://beakerx.com/) is a collection of Jupyter kernels and notebook extensions that allows you to work efficiently with large Spark datasets directly from a notebook. In addition, you can still use the data science libraries you're familiar with for local development.

In this tutorial, you’ll learn how to use BeakerX to build an interactive notebook that reads a dataset from [HDFS](http://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/HdfsUserGuide.html) and classifies the data using a linear model built with Spark in Scala. We’ll also produce a report using popular Python libraries.

This tutorial assumes you're using [Jupyter Notebooks on DC/OS](https://d2iq.com/service-catalog/beta-mesosphere-jupyter-service), which offers notebooks preconfigured with many popular data science tools such as Spark, BeakerX, [pandas](https://pandas.pydata.org/), and [scikit-learn](https://scikit-learn.org/).

## Step 1: Prepare your Cluster

To run this tutorial you need the [HDFS](https://d2iq.com/service-catalog/hdfs) and [Jupyter](https://d2iq.com/service-catalog/beta-mesosphere-jupyter-service) packages installed on your DC/OS cluster.
Directions for installing HDFS and configuring Jupyter can be found [in this tutorial video](https://www.youtube.com/watch?v=PRgJsGwfIDk).

Once you have these packages installed, navigate to your Jupyter instance and open a terminal tab. Run the following command from the Jupyter terminal to clone the repository that contains this notebook.

```
git clone https://github.com/dcos-labs/data-science.git
```

Using the Jupyter file browser, navigate to the folder called **data-science**, then **beakerx**, and open the notebook called **BeakerX.ipynb**.

## Step 2: Using an Interactive UI to Select Input Parameters

The code below renders a slider that we can use to change the value of `sample_size`, which will be used further down. UI components are a great way to produce interactive notebooks that make data exploration more engaging, and BeakerX offers a number of [forms, widgets, and interactive components](https://nbviewer.jupyter.org/github/twosigma/beakerx/blob/master/StartHere.ipynb#Forms,-Widgets,-and-Interaction).

Notice the `%%groovy` magic on the first line. This tells BeakerX to interpret the cell content using the Groovy programming language. BeakerX supports multiple languages per notebook this way, allowing users to take advantage of the best tools for the job—even if they're in different programming languages.

In [None]:
%%groovy
import com.twosigma.beakerx.widget.IntSlider

sample_size = new IntSlider()
sample_size.min = 100
sample_size.max = 10000
sample_size.step = 100
sample_size.value = 5000
sample_size

The next cell demonstrates how to use the BeakerX [Autotranslation](https://nbviewer.jupyter.org/github/twosigma/beakerx/blob/master/doc/groovy/GeneralAutotranslation.ipynb) feature. This feature allows you to synchronize data between cells that are written in different programming languages. Assigning the value to the `beakerx` object allows us to read it back in other languages later.

In [None]:
%%groovy
beakerx.sample_size = sample_size.value

Now, we can easily read the value of `sample_size` from the default language of this notebook, which is Scala.

In [None]:
beakerx.sample_size

It is also available in other languages, for example Python. For more examples in additional languages, see the [docs for Autotranslation](https://nbviewer.jupyter.org/github/twosigma/beakerx/blob/master/doc/groovy/GeneralAutotranslation.ipynb).

In [None]:
%%python
from beakerx.object import beakerx
beakerx.sample_size

Now, we’ll use the sample size given above to generate a set of records and load them into HDFS. This cell outputs the exit code of the script, which should be **0** on success.

In [None]:
import sys.process._
val runme = "python generate_example.py somedata " + beakerx.sample_size
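// Run the command and capture its exit code (method-call syntax avoids the postfix-operator warning)
val exitCode = runme.!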
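
The contents of `generate_example.py` aren't shown in this tutorial. As a rough mental model only, here is a minimal Python sketch of a generator that would produce data of the same shape the notebook reads later: a header row, two numeric features `x1` and `x2`, and a binary label `y`. The function names, the sampling range, the radius, and the rule that labels points inside the circle as 1 are all assumptions made for illustration. The sketch builds CSV text in memory and does not load anything into HDFS.

```python
import csv
import io
import random


def generate_circle_rows(sample_size, radius=1.0, seed=0):
    """Return (x1, x2, y) rows; y is 1 inside the circle, else 0 (assumed rule)."""
    rng = random.Random(seed)
    rows = []
    for _ in range(sample_size):
        # Sample uniformly from a square that fully contains the circle.
        x1 = rng.uniform(-1.5 * radius, 1.5 * radius)
        x2 = rng.uniform(-1.5 * radius, 1.5 * radius)
        y = 1 if x1 * x1 + x2 * x2 < radius * radius else 0
        rows.append((x1, x2, y))
    return rows


def rows_to_csv(rows):
    """Serialize rows as CSV text with an x1,x2,y header row."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerow(["x1", "x2", "y"])
    writer.writerows(rows)
    return buffer.getvalue()


# Hypothetical usage: 500 rows, mirroring a slider value passed on the command line.
csv_text = rows_to_csv(generate_circle_rows(sample_size=500))
print(csv_text.splitlines()[0])  # prints the header row: x1,x2,y
```

A fixed `seed` keeps the sketch reproducible; the real script may behave differently.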

## Step 3: Adding Spark and Hadoop Libraries

The Jupyter package already includes the Spark and Hadoop JARs that we need to run our Spark job, so you don't need to download them. Let's add them to the classpath.

In [None]:
%classpath add jar /mnt/mesos/sandbox
%classpath add jar /opt/spark/jars/*
%classpath add jar /opt/hadoop/share/hadoop/common/*
%classpath add jar /opt/hadoop/share/hadoop/common/lib/*
%classpath add jar /opt/hadoop/share/hadoop/hdfs/*
%classpath add jar /opt/hadoop/share/hadoop/hdfs/lib/*
%classpath add jar /opt/hadoop/share/hadoop/yarn/*
%classpath add jar /opt/hadoop/share/hadoop/yarn/lib/*
%classpath add jar /opt/hadoop/share/hadoop/mapreduce/*
%classpath add jar /opt/hadoop/share/hadoop/mapreduce/lib/*
%classpath add jar /opt/hadoop/share/hadoop/tools/lib/*

## Step 4: Using the BeakerX GUI to Run Spark Jobs

To run distributed Spark jobs on DC/OS using the BeakerX Spark UI, we need to change some of the default settings. Run the cell below and once the UI loads make the following changes:

* Remove the setting for **spark.mesos.principal** by clicking the **X** next to it
* Change the value for **spark.mesos.executor.docker.image** to **mesosphere/mesosphere-data-toolkit:1.0.0-1.0.0**
* Click **Save**

Once this is done, hit **Start** to launch a Spark cluster directly from this notebook. Depending on the specs of your cluster, this will take anywhere from a few seconds to a few minutes. Once it's ready, you'll see a star-shaped Spark logo on a green label button, along with additional buttons.

In [None]:
%%spark

The GUI provides useful metrics for working with Spark and allows you to track the progress of your jobs. It also allows you to easily create, save, and select multiple configurations. You can jump directly to the Spark UI by clicking the green Spark logo.

## Step 5: Run a Spark Job

Now, let's write the Spark job that builds a linear model for our dataset. First, we need to import its dependencies.

In [None]:
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
import org.apache.spark.ml.regression.GBTRegressor
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType, IntegerType}
import org.apache.spark.sql.{Encoders, SparkSession}
import org.apache.spark.ml.classification.LogisticRegression
import scala.math.{ pow, sqrt }

Next, we need to load the data from HDFS that was generated previously.

In [None]:
// Create a schema
val schemaStruct = StructType(
 StructField("x1", DoubleType) ::
 StructField("x2", DoubleType) ::
 StructField("y", IntegerType) :: Nil
 )

// Use schema to read data from HDFS using default url
val df = spark.read
 .option("header", true)
 .schema(schemaStruct)
 .csv("hdfs://name-0-node.hdfs.autoip.dcos.thisdcos.directory:9001/somedata.csv")
 .na.drop()

Once this is complete, display the results in a table view using `df.display()`. This triggers the Spark job, and a progress bar appears while it runs. Click the ellipsis in a column header for filtering, sorting, and other options.

In [None]:
df.display()

Next, we’ll run a logistic regression and get results. The output of this cell is the model accuracy.

In [None]:
// Break data into training and test
val Array(trainingData, testData) = df.randomSplit(Array(0.8, 0.2))
val labelColumn = "y"

// Use the VectorAssembler to combine the x1, x2 columns into a single vector column
val assembler = new VectorAssembler()
 .setInputCols(Array("x1", "x2"))
 .setOutputCol("features")

// logistic regression model
val lr = new LogisticRegression()
 .setLabelCol(labelColumn)
 .setFeaturesCol("features")
 .setPredictionCol(labelColumn + "_pred")
 .setMaxIter(100)
 .setRegParam(0.1)
 .setElasticNetParam(0.8)

// Run simple pipeline
val stages = Array(assembler, lr)
val pipeline = new Pipeline().setStages(stages)
val model = pipeline.fit(trainingData)

// We'll make predictions using the model and the test data and get accuracy
val predictions = model.transform(testData)
val accuracy = predictions.filter("y == y_pred").count() / predictions.count().toDouble

The accuracy is only about 0.5, which is equivalent to flipping a coin, so our model isn't very good.
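
One detail worth knowing about `randomSplit`: the weights are targets rather than exact sizes, and because the cell above doesn't pass a seed, the split changes from run to run. The accuracy you see will therefore vary slightly. The Python sketch below mimics the idea with one uniform draw per row. It illustrates the concept and is not Spark's implementation; `random_split` and its parameters are hypothetical names.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row to a split by drawing one uniform number per row."""
    total = float(sum(weights))
    # Cumulative upper bounds, e.g. [0.8, 1.0] for weights [0.8, 0.2].
    bounds = []
    running = 0.0
    for weight in weights:
        running += weight / total
        bounds.append(running)
    bounds[-1] = 1.0  # Guard against floating-point drift in the last bound.

    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()  # Uniform in [0.0, 1.0)
        for index, upper in enumerate(bounds):
            if draw < upper:
                splits[index].append(row)
                break
    return splits


rows = list(range(1000))
training, test = random_split(rows, [0.8, 0.2], seed=7)
# The sizes land near 800 and 200 but are not guaranteed to hit them exactly.
print(len(training), len(test))
```

Passing a fixed seed, as this sketch does, is what makes a split repeatable between runs.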

## Step 6: Debug Model Performance

Let's find out why our model performed so poorly by plotting the data.

In [None]:
// Create function to remove dimensionality of a list
def flatten(l: List[Any]): List[Any] = {
 def _flatten(res: List[Any], rem: List[Any]):List[Any] = rem match {
 case Nil => res
 case (h:List[_])::Nil => _flatten(res, h)
 case (h:List[_])::tail => _flatten(res:::h, tail)
 case h::tail => _flatten(res:::List(h), tail)
 }
 _flatten(List(), l)
}

// Split positive and negative cases
val dfPOS = df.filter("y == 1")
val dfNEG = df.filter("y == 0")

// Plot the first 300 records of positive and negative cases
val plotPOSx1 = flatten(dfPOS.select("x1").collect().map(_ (0)).toList).map(_.toString.toDouble)
val plotPOSx2 = flatten(dfPOS.select("x2").collect().map(_ (0)).toList).map(_.toString.toDouble)
val plotNEGx1 = flatten(dfNEG.select("x1").collect().map(_ (0)).toList).map(_.toString.toDouble)
val plotNEGx2 = flatten(dfNEG.select("x2").collect().map(_ (0)).toList).map(_.toString.toDouble)

println(" ")

val plot = new Plot { title = "Plot of raw data" }

val list = List(
 new Points {
 x = plotPOSx1.take(300)
 y = plotPOSx2.take(300)
 size = 5.0
 color = Color.red
 },
 new Points {
 x = plotNEGx1.take(300)
 y = plotNEGx2.take(300)
 size = 5.0
 color = Color.blue
 }
)

plot.add(list)

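The data forms a circle, so it's easy to see why a linear partition won't split the positive and negative cases. Assuming the circle is centered at the origin, its boundary satisfies $x_1^2 + x_2^2 = r^2$, which has the same form as the Pythagorean Theorem $a^2 + b^2 = c^2$. If we square each feature, the circular boundary becomes the straight line $w_1 + w_2 = r^2$ in the new coordinates $w_1 = x_1^2$ and $w_2 = x_2^2$, which is something we can split linearly.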
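
To see the squaring trick in isolation, here is a small, self-contained Python sketch on synthetic points. It assumes a circle of radius 1 centered at the origin with label 1 inside it; the data, the function names, and the two hand-picked rules are illustrative assumptions, not the notebook's dataset or the fitted model.

```python
import random


def make_points(n, radius=1.0, seed=42):
    """Generate (x1, x2, label) tuples; label is 1 inside the circle, else 0."""
    rng = random.Random(seed)
    points = []
    for _ in range(n):
        x1 = rng.uniform(-2.0, 2.0)
        x2 = rng.uniform(-2.0, 2.0)
        label = 1 if x1 ** 2 + x2 ** 2 < radius ** 2 else 0
        points.append((x1, x2, label))
    return points


def accuracy(points, predict):
    """Fraction of points whose predicted label matches the true label."""
    hits = sum(1 for x1, x2, label in points if predict(x1, x2) == label)
    return hits / len(points)


def raw_rule(x1, x2):
    # A linear rule on the raw features: one straight line through the plane.
    return 1 if x1 + x2 < 0.0 else 0


def squared_rule(x1, x2):
    # The same kind of linear rule on w1 = x1^2 and w2 = x2^2: w1 + w2 < r^2.
    w1, w2 = x1 ** 2, x2 ** 2
    return 1 if w1 + w2 < 1.0 else 0


points = make_points(1000)
print("raw features:    ", accuracy(points, raw_rule))
print("squared features:", accuracy(points, squared_rule))
```

No straight line through the raw features can follow a circular boundary, while a single linear threshold on the squared features matches it exactly.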

In [None]:
// Transform data and replot
val ax1 = plotPOSx1.take(300).map(pow(_,2))
val ax2 = plotPOSx2.take(300).map(pow(_,2))
val bx1 = plotNEGx1.take(300).map(pow(_,2))
val bx2 = plotNEGx2.take(300).map(pow(_,2))
val plot = new Plot { title = "Plot of data with features squared" }

val list = List(
 new Points {
 x = ax1
 y = ax2
 size = 5.0
 color = Color.red
 },
 new Points {
 x = bx1
 y = bx2
 size = 5.0
 color = Color.blue
 }
)

plot.add(list)

This should be easy for a logistic regression to classify. Let's redo the model with the transformed data and output the accuracy.

In [None]:
// w1 = x1^2, w2 = x2^2
val dfT = df.withColumn("w1", ((df.col("x1")*df.col("x1")))).withColumn("w2", ((df.col("x2")*df.col("x2"))))
val Array(trainingData, testData) = dfT.randomSplit(Array(0.8, 0.2))
val labelColumn = "y"

// Use the VectorAssembler to combine the w1, w2 columns into a single vector column
val assembler = new VectorAssembler()
 .setInputCols(Array("w1", "w2"))
 .setOutputCol("features")

// logistic regression model used above
val lr = new LogisticRegression()
 .setLabelCol(labelColumn)
 .setFeaturesCol("features")
 .setPredictionCol(labelColumn + "_pred")
 .setMaxIter(100)
 .setRegParam(0.1)
 .setElasticNetParam(0.8)

// Same pipeline as above
val stages = Array(assembler, lr)
val pipeline = new Pipeline().setStages(stages)
val model = pipeline.fit(trainingData)

// We'll make predictions using the model and the test data
val predictions = model.transform(testData)
val accuracy = predictions.filter("y == y_pred").count() / predictions.count().toDouble

As expected, the model works much better, with accuracy being close to 1.0.

But accuracy is only one useful metric. A confusion matrix will give us more details on how our model performed. You can easily create one using scikit-learn, which is a Python library included in the Jupyter package. We can use the BeakerX Autotranslation feature again to pass the data to Python.

In [None]:
// Collect the label and prediction columns to the driver
val evalData = predictions.select("y", "y_pred").collect()
beakerx.evalData = evalData
println("Data moved to BeakerX")

Now the data is accessible from Python and we can build and plot our confusion matrix using scikit-learn.

In [None]:
%%python
from beakerx.object import beakerx
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix, classification_report

dfEval = pd.DataFrame(' '.join(beakerx.evalData).replace('[','').replace(']','').split())[0].str.split(',', expand = True)
y_true = pd.to_numeric(dfEval[0], downcast='integer')
y_pred = pd.to_numeric(dfEval[1], downcast='integer')

# Print a text report of precision, recall, f1-score, support
print(classification_report(y_true, y_pred))

# Plot the confusion matrix
cm = confusion_matrix(y_true, y_pred)

ax = plt.subplot()
sns.heatmap(cm, annot=True, ax=ax, fmt='g')
ax.set_xlabel("Predicted labels")
ax.set_ylabel("True labels")
ax.set_title("Confusion Matrix")
ax
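
To make the numbers in the heatmap concrete, here is a minimal pure-Python sketch of what a binary confusion matrix counts. It assumes each row arrives as a string such as `[1,1.0]`, which is what the string handling in the cell above suggests. The sample rows and the helper names `parse_row` and `confusion_counts` are hypothetical, not output from the notebook.

```python
def parse_row(text):
    """Turn a string such as "[1,1.0]" into a pair of integer labels."""
    truth, guess = text.strip("[]").split(",")
    return int(float(truth)), int(float(guess))


def confusion_counts(pairs):
    """Return [[TN, FP], [FN, TP]]: rows are true labels, columns are predictions."""
    matrix = [[0, 0], [0, 0]]
    for truth, guess in pairs:
        matrix[truth][guess] += 1
    return matrix


# Hypothetical rows in the assumed "[y,y_pred]" format.
rows = ["[0,0.0]", "[0,1.0]", "[1,1.0]", "[1,1.0]", "[1,0.0]", "[0,0.0]"]
pairs = [parse_row(row) for row in rows]

cm = confusion_counts(pairs)
accuracy = (cm[0][0] + cm[1][1]) / len(pairs)
print(cm)        # [[2, 1], [1, 2]]
print(accuracy)  # 4 correct out of 6
```

The layout matches scikit-learn's `confusion_matrix`, where rows are true labels and columns are predicted labels, so the diagonal holds the correct predictions.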

## What Did We Learn Today?

In this tutorial, we learned how to create an interactive UI within a notebook to read input parameters from a user. We then launched Spark jobs and tracked their progress using the BeakerX Spark UI without leaving the notebook environment. Lastly, we used multiple programming languages in a single notebook and easily synchronized data between them using BeakerX Autotranslation.