{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# How to Build an Interactive Spark Notebook with BeakerX and Jupyter on DC/OS\n", "\n", "[Jupyter Notebooks](https://jupyter.org/)—one of the industry’s favorite open-source web applications for creating and sharing documents housing live code, equations, visualizations, and more—are great for iterative data science work.\n", "\n", "However, working with large datasets can be challenging since data doesn’t typically fit on the local disk or into local memory. Instead, data is stored in a cluster and a distributed computing framework such as [Apache Spark](https://spark.apache.org/) is required to process it within a reasonable amount of time.\n", "\n", "Now, this is where BeakerX comes in.\n", "\n", "[BeakerX](http://beakerx.com/) is a collection of Jupyter kernels and notebook extensions that allows you to work efficiently with large Spark datasets directly from a notebook. In addition, you can still use the data science libraries you're familiar with for local development.\n", "\n", "In this tutorial, you’ll learn how to use BeakerX to build an interactive notebook that reads a dataset from [HDFS](http://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/HdfsUserGuide.html) and classifies the data using a linear model built with Spark in Scala. We’ll also produce a report using popular Python libraries.\n", "\n", "Before we dive in, it’s important to note that this tutorial assumes you're using [Jupyter Notebooks on DC/OS](https://d2iq.com/service-catalog/beta-mesosphere-jupyter-service), which offers notebooks preconfigured with many popular data science tools such as Spark, BeakerX, [pandas](https://pandas.pydata.org/), and [scikit-learn](https://scikit-learn.org/)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 1: Prepare your Cluster\n", "\n", "To run this tutorial you need the [HDFS](https://d2iq.com/service-catalog/hdfs) and [Jupyter](https://d2iq.com/service-catalog/beta-mesosphere-jupyter-service) packages installed on your DC/OS cluster.\n", "Directions for installing HDFS and configuring Jupyter can be found [in this tutorial video](https://www.youtube.com/watch?v=PRgJsGwfIDk)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Once you have these packages installed, navigate to your Jupyter instance and open a terminal tab. Run the following commands from the Jupyter terminal to clone the repository that contains this notebook.\n", "\n", "```\n", "git clone https://github.com/dcos-labs/data-science.git\n", "```\n", "\n", "Using the Jupyter file browser, navigate to the folder called **data-science**, then **beakerx**, and open the notebook called **BeakerX.ipynb**." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 2: Using an Interactive UI to Select Input Parameters\n", "\n", "The code below renders a slider that we can use to change the value of `sample_size`, which will be used further down. UI components are a great way to produce interactive notebooks that make data exploration more engaging and BeakerX offers a number of [forms, widgets, and interactive components](https://nbviewer.jupyter.org/github/twosigma/beakerx/blob/master/StartHere.ipynb#Forms,-Widgets,-and-Interaction).\n", "\n", "Notice the `%%groovy` magic on the first line. This tells BeakerX to interpret the cell content using the Groovy programming language. 
BeakerX supports multiple languages per notebook this way, allowing users to take advantage of the best tools for the job—even if they're in different programming languages." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%groovy\n", "import com.twosigma.beakerx.widget.IntSlider\n", "\n", "sample_size = new IntSlider()\n", "sample_size.min = 100\n", "sample_size.max = 10000\n", "sample_size.step = 100\n", "sample_size.value = 5000\n", "sample_size" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The next cell demonstrates how to use the BeakerX [Autotranslation](https://nbviewer.jupyter.org/github/twosigma/beakerx/blob/master/doc/groovy/GeneralAutotranslation.ipynb) feature. This feature allows you to synchronize data between cells that are written in different programming languages. Assigning the value to the `beakerx` object allows us to read it back in other languages later." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%groovy\n", "beakerx.sample_size = sample_size.value" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now, we can easily read the value of `sample_size` from the default language of this notebook, which is Scala." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "beakerx.sample_size" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "It is also available in other languages, for example Python. For more examples in additional languages, see the [docs for Autotranslation](https://nbviewer.jupyter.org/github/twosigma/beakerx/blob/master/doc/groovy/GeneralAutotranslation.ipynb)." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%python\n", "from beakerx.object import beakerx\n", "beakerx.sample_size" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now, we’ll use the sample size given above to generate a set of records and load them into HDFS. This cell outputs the exit code of the script, which should be **0** on success." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import sys.process._\n", "val runme = \"python generate_example.py somedata \" + beakerx.sample_size\n", "val exitCode = runme !" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 3: Adding Spark and Hadoop Libraries\n", "\n", "The Jupyter package includes the Spark and Hadoop JARs that we need to run our Spark job. So, you don't need to download them from the internet. Now, let’s add them to the classpath." 
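, "\n", "\n", "If you'd like to confirm that these JARs are actually present before adding them (the paths match the ones used in the next cell, but may differ in other releases of the Jupyter package), a quick optional check from a Scala cell uses `sys.process`, the same approach used above to run the data-generation script:\n", "\n", "```\n", "import sys.process._\n", "\n", "// Optional sanity check: list a few of the bundled Spark JARs.\n", "// The directory is the same one referenced in the %classpath cell below.\n", "val sparkJars = Seq(\"bash\", \"-c\", \"ls /opt/spark/jars | head -n 5\").!!\n", "println(sparkJars)\n", "```\n", "\n", "If the listing comes back empty, adjust the paths in the next cell to match your installation."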
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%classpath add jar /mnt/mesos/sandbox\n", "%classpath add jar /opt/spark/jars/*\n", "%classpath add jar /opt/hadoop/share/hadoop/common/*\n", "%classpath add jar /opt/hadoop/share/hadoop/common/lib/*\n", "%classpath add jar /opt/hadoop/share/hadoop/hdfs/*\n", "%classpath add jar /opt/hadoop/share/hadoop/hdfs/lib/*\n", "%classpath add jar /opt/hadoop/share/hadoop/yarn/*\n", "%classpath add jar /opt/hadoop/share/hadoop/yarn/lib/*\n", "%classpath add jar /opt/hadoop/share/hadoop/mapreduce/*\n", "%classpath add jar /opt/hadoop/share/hadoop/mapreduce/lib/*\n", "%classpath add jar /opt/hadoop/share/hadoop/tools/lib/*" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 4: Using the BeakerX GUI to Run Spark Jobs\n", "\n", "To run distributed Spark jobs on DC/OS using the BeakerX Spark UI, we need to change some of the default settings. Run the cell below and once the UI loads make the following changes:\n", "\n", "* Remove the setting for **spark.mesos.principal** by clicking the **X** next to it\n", "* Change the value for **spark.mesos.executor.docker.image** to **mesosphere/mesosphere-data-toolkit:1.0.0-1.0.0**\n", "* Click **Save**\n", "\n", "Once this is done, hit **Start** to launch a Spark cluster directly from this notebook. Depending on the specs of your cluster, this will take anywhere from a few seconds to a few minutes. Once it's ready, you'll see a star-shaped Spark logo on a green label button, along with additional buttons." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%spark" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The GUI provides useful metrics for working with Spark and allows you to track the progress of your jobs. It also allows you to easily create, save, and select multiple configurations. You can jump directly to the Spark UI by clicking the green Spark logo." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 5: Run a Spark Job\n", "\n", "Now, let's write the Spark job that builds a linear model for our dataset. First, we need to import its dependencies." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import org.apache.spark.ml.Pipeline\n", "import org.apache.spark.ml.evaluation.RegressionEvaluator\n", "import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}\n", "import org.apache.spark.ml.regression.GBTRegressor\n", "import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType, IntegerType}\n", "import org.apache.spark.sql.{Encoders, SparkSession}\n", "import org.apache.spark.ml.classification.LogisticRegression\n", "import scala.math.{ pow, sqrt }" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next, we need to load the data from HDFS that was generated previously." 
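, "\n", "\n", "Before reading it, you can optionally confirm that the generated file actually landed in HDFS. The sketch below uses the Hadoop `FileSystem` API (now on the classpath) together with the SparkSession provided by the Spark magic; the endpoint and file name match the ones used in the read below:\n", "\n", "```\n", "import org.apache.hadoop.fs.{FileSystem, Path}\n", "\n", "// Connect to HDFS using the Hadoop configuration carried by the active SparkSession.\n", "val fs = FileSystem.get(\n", "  new java.net.URI(\"hdfs://name-0-node.hdfs.autoip.dcos.thisdcos.directory:9001\"),\n", "  spark.sparkContext.hadoopConfiguration)\n", "\n", "// Should print true once generate_example.py has uploaded the file.\n", "println(fs.exists(new Path(\"/somedata.csv\")))\n", "```"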
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "// Create a schema\n", "val schemaStruct = StructType(\n", " StructField(\"x1\", DoubleType) ::\n", " StructField(\"x2\", DoubleType) ::\n", " StructField(\"y\", IntegerType) :: Nil\n", " )\n", "\n", "// Use schema to read data from HDFS using default url\n", "val df = spark.read\n", " .option(\"header\", true)\n", " .schema(schemaStruct)\n", " .csv(\"hdfs://name-0-node.hdfs.autoip.dcos.thisdcos.directory:9001/somedata.csv\")\n", " .na.drop()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Once this is complete, display the results with a table view using `df.display()`. This will trigger the Spark job and a progress bar will appear. You can click on the ellipses in the column headers for filtering, sorting, and other options." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df.display()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next, we’ll run a logistic regression and get results. The output of this cell is the model accuracy." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "// Break data into training and test\n", "val Array(trainingData, testData) = df.randomSplit(Array(0.8, 0.2))\n", "val labelColumn = \"y\"\n", "\n", "// Use the VectorAssembler to combine the x1, x2 columns into a single vector column\n", "val assembler = new VectorAssembler()\n", " .setInputCols(Array(\"x1\", \"x2\"))\n", " .setOutputCol(\"features\")\n", "\n", "// logistic regression model\n", "val lr = new LogisticRegression()\n", " .setLabelCol(labelColumn)\n", " .setFeaturesCol(\"features\")\n", " .setPredictionCol(labelColumn + \"_pred\")\n", " .setMaxIter(100)\n", " .setRegParam(0.1)\n", " .setElasticNetParam(0.8)\n", "\n", "// Run simple pipeline\n", "val stages = Array(assembler, lr)\n", "val pipeline = new Pipeline().setStages(stages)\n", "val model = pipeline.fit(trainingData)\n", "\n", "// We'll make predictions using the model and the test data and get accuracy\n", "val predictions = model.transform(testData)\n", "val accuracy = predictions.filter(\"y == y_pred\").count() / predictions.count().toDouble" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The accuracy is only about 0.5 which is equivalent to flipping a coin, so our model isn't very good." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 6: Debug Model Performance\n", "\n", "Let's find out why our model performed so poorly by plotting the data." 
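, "\n", "\n", "Before plotting, a grouped count over the label and prediction columns of the `predictions` DataFrame from the previous step gives a rough, text-only confusion table (just a quick sketch for a first look; we build a proper confusion matrix with scikit-learn later):\n", "\n", "```\n", "// Count how often each (true label, predicted label) pair occurs.\n", "predictions\n", "  .groupBy(\"y\", \"y_pred\")\n", "  .count()\n", "  .orderBy(\"y\", \"y_pred\")\n", "  .show()\n", "```"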
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "// Create function to remove dimensionality of a list\n", "def flatten(l: List[Any]): List[Any] = {\n", " def _flatten(res: List[Any], rem: List[Any]):List[Any] = rem match {\n", " case Nil => res\n", " case (h:List[_])::Nil => _flatten(res, h)\n", " case (h:List[_])::tail => _flatten(res:::h, tail)\n", " case h::tail => _flatten(res:::List(h), tail)\n", " }\n", " _flatten(List(), l)\n", "}\n", "\n", "// Split positive and negative cases\n", "val dfPOS = df.filter(\"y == 1\")\n", "val dfNEG = df.filter(\"y == 0\")\n", "\n", "// Plot the first 300 records of positve and negative cases\n", "val plotPOSx1 = flatten(dfPOS.select(\"x1\").collect().map(_ (0)).toList).map(_.toString.toDouble)\n", "val plotPOSx2 = flatten(dfPOS.select(\"x2\").collect().map(_ (0)).toList).map(_.toString.toDouble)\n", "val plotNEGx1 = flatten(dfNEG.select(\"x1\").collect().map(_ (0)).toList).map(_.toString.toDouble)\n", "val plotNEGx2 = flatten(dfNEG.select(\"x2\").collect().map(_ (0)).toList).map(_.toString.toDouble)\n", "\n", "println(\" \")\n", "\n", "val plot = new Plot { title = \"Plot of raw data\" }\n", "\n", "val list = List(\n", " new Points {\n", " x = plotPOSx1.take(300)\n", " y = plotPOSx2.take(300)\n", " size = 5.0\n", " color = Color.red\n", " },\n", " new Points {\n", " x = plotNEGx1.take(300)\n", " y = plotNEGx2.take(300)\n", " size = 5.0\n", " color = Color.blue\n", " }\n", ")\n", "\n", "plot.add(list)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The data is plotted in a circle. So, it’s easy to see why a linear partition won't split the positive and negative cases.\n", "Using the Pythagorean Theorem $ a^2 + b^2 = c^2 $ we can map the data into something we can linearly split." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "// Transform data and replot\n", "val ax1 = plotPOSx1.take(300).map(pow(_,2))\n", "val ax2 = plotPOSx2.take(300).map(pow(_,2))\n", "val bx1 = plotNEGx1.take(300).map(pow(_,2))\n", "val bx2 = plotNEGx2.take(300).map(pow(_,2))\n", "val plot = new Plot { title = \"Plot of data with features squared\" }\n", "\n", "val list = List(\n", " new Points {\n", " x = ax1\n", " y = ax2\n", " size = 5.0\n", " color = Color.red\n", " },\n", " new Points {\n", " x = bx1\n", " y = bx2\n", " size = 5.0\n", " color = Color.blue\n", " }\n", ")\n", "\n", "plot.add(list)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This should be easy for a logistic regression to classify. Let's redo the model with the transformed data and output the accuracy." 
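, "\n", "\n", "To spell out the reasoning: a circular decision boundary $ x_1^2 + x_2^2 = r^2 $ in the original features becomes the straight line $ w_1 + w_2 = r^2 $ after the substitution $ w_1 = x_1^2 $, $ w_2 = x_2^2 $, which is exactly the kind of boundary a logistic regression can learn. The next cell adds the squared features as new columns and reruns the same pipeline."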
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "// w1 = x1^2, w2 = x2^2\n", "val dfT = df.withColumn(\"w1\", ((df.col(\"x1\")*df.col(\"x1\")))).withColumn(\"w2\", ((df.col(\"x2\")*df.col(\"x2\"))))\n", "val Array(trainingData, testData) = dfT.randomSplit(Array(0.8, 0.2))\n", "val labelColumn = \"y\"\n", "\n", "// Use the VectorAssembler to combine the w1, w2 columns into a single vector column\n", "val assembler = new VectorAssembler()\n", " .setInputCols(Array(\"w1\", \"w2\"))\n", " .setOutputCol(\"features\")\n", "\n", "// logistic regression model used above\n", "val lr = new LogisticRegression()\n", " .setLabelCol(labelColumn)\n", " .setFeaturesCol(\"features\")\n", " .setPredictionCol(labelColumn + \"_pred\")\n", " .setMaxIter(100)\n", " .setRegParam(0.1)\n", " .setElasticNetParam(0.8)\n", "\n", "// Same pipeline as above\n", "val stages = Array(assembler, lr)\n", "val pipeline = new Pipeline().setStages(stages)\n", "val model = pipeline.fit(trainingData)\n", "\n", "// We'll make predictions using the model and the test data\n", "val predictions = model.transform(testData)\n", "val accuracy = predictions.filter(\"y == y_pred\").count() / predictions.count().toDouble" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As expected, the model works much better, with accuracy being close to 1.0.\n", "\n", "But accuracy is only one useful metric. A confusion matrix will give us more details on how our model performed. You can easily create one using scikit-learn, which is a Python library included in the Jupyter package. We can use the BeakerX Autotranslation feature again to pass the data to Python." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "val evalData = predictions.select(\"y\", \"y_pred\").head(predictions.count().toInt)\n", "beakerx.evalData = evalData\n", "println(\"Data moved to BeakerX\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now the data is accessible from Python and we can build and plot our confusion matrix using scikit-learn." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%python\n", "from beakerx.object import beakerx\n", "import pandas as pd\n", "import matplotlib.pyplot as plt\n", "import seaborn as sns\n", "from sklearn.metrics import confusion_matrix, classification_report\n", "\n", "dfEval = pd.DataFrame(' '.join(beakerx.evalData).replace('[','').replace(']','').split())[0].str.split(',', expand = True)\n", "y_true = pd.to_numeric(dfEval[0], downcast='integer')\n", "y_pred = pd.to_numeric(dfEval[1], downcast='integer')\n", "\n", "# Print a text report of precision, recall, f1-score, support\n", "print(classification_report(y_true, y_pred))\n", "\n", "# Plot the confusion matrix\n", "cm = confusion_matrix(y_true, y_pred)\n", "\n", "ax = plt.subplot()\n", "sns.heatmap(cm, annot=True, ax=ax, fmt='g')\n", "ax.set_xlabel(\"Predicted labels\")\n", "ax.set_ylabel(\"True labels\")\n", "ax.set_title(\"Confusion Matrix\")\n", "ax" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## What Did We Learn Today?\n", "\n", "In this tutorial, we learned how to create an interactive UI within a notebook to read input parameters from a user. We then launched Spark jobs and tracked their progress using the BeakerX Spark UI without leaving the notebook environment. 
Lastly, we used multiple programming languages in a single notebook and easily synchronized data between them using BeakerX Autotranslation." ] } ], "metadata": { "kernelspec": { "display_name": "Scala", "language": "scala", "name": "scala" }, "language_info": { "codemirror_mode": "text/x-scala", "file_extension": ".scala", "mimetype": "", "name": "Scala", "nbconverter_exporter": "", "version": "2.11.12" } }, "nbformat": 4, "nbformat_minor": 2 }