{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "Translated Beginners Guide to ML with Apache Spark\n", "==================================================\n", "\n", "This is a translation into PySpark of the [*Beginners Guide: Apache Spark Machine Learning with Large Data*](http://www.kdnuggets.com/2015/11/petrov-apache-spark-machine-learning-large-data.html) tutorial by Dmitry Petrov from KDNuggets. I won't provide much commentary; for that, I'll refer you back to the original tutorial, but I will provide page numbers to make it easier to follow along. Since this was an exercise to familiarize myself with PySpark, there may be some awkward bits; please let me know if you find any.\n", "\n", "First, a few preliminaries: I used the Hortonworks sandbox running on VMware, although I'm sure VirtualBox would work as well. I set up Jupyter notebook according to these nice [instructions](http://simnotes.github.io) by Simon Streubel. I had to bump up the executor and driver memory of PySpark using the instructions in the answer to [this](http://stackoverflow.com/questions/32336915/pyspark-java-lang-outofmemoryerror-java-heap-space) question on Stack Overflow. `spark-defaults.conf` was located in `/etc/spark/conf` on the Hortonworks sandbox VM. Of course, if you have a working PySpark installation, you can probably ignore most of the above.\n", "\n", "In the linked tutorial, either a large or a small version of the `Posts.xml` file can be used. I used the small version from [here](https://www.dropbox.com/s/n2skgloqoadpa30/Posts.small.xml?dl=0) since I wasn't running the example on a particularly high-powered machine. That file is assumed to be located at `/root/Posts.small.xml`, since the Hortonworks sandbox seems to want you to run as root for some reason (ugh!). 
If you have a different PySpark installation, you'll almost certainly want to change that location.\n", "\n", "So, without further ado, let the translation begin:" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "[Page 2](http://www.kdnuggets.com/2015/11/petrov-apache-spark-machine-learning-large-data.html/2): **4 – Importing Libraries** " ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "collapsed": true }, "outputs": [], "source": [ "import re\n", "from lxml import etree\n", "from pyspark.sql import Row\n", "from pyspark.sql.types import StructType, StructField, StringType, DoubleType\n", "from pyspark.sql.functions import udf, col\n", "from pyspark.ml import Pipeline\n", "from pyspark.ml.feature import Tokenizer, HashingTF\n", "from pyspark.ml.classification import LogisticRegression\n", "from pyspark.mllib.evaluation import BinaryClassificationMetrics" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "[Page 2](http://www.kdnuggets.com/2015/11/petrov-apache-spark-machine-learning-large-data.html/2): **5 – Parsing XML** " ] }, { "cell_type": "code", "execution_count": 15, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "[u'']" ] }, "execution_count": 15, "metadata": {}, "output_type": "execute_result" } ], "source": [ "fileName = \"file:/root/Posts.small.xml\"\n", "textFile = sc.textFile(fileName)\n", "# Drop the XML declaration and the enclosing <posts> tags so that only\n", "# the individual row elements remain.\n", "postsXml = (textFile.map(lambda x: x.strip())\n", " .filter(lambda x: not x.startswith(\"<?xml version=\"))\n", " .filter(lambda x: x not in (\"<posts>\", \"</posts>\")))\n", "# Look at the first filtered row of XML\n", "postsXml.collect()[:1]" ] }, { "cell_type": "code", "execution_count": 14, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[, When setting a form's opacity should I use a decimal or double? I want to use a track-bar to change a form's opacity. 
This is my code: decimal trans = trackBar1.Value / 5000; this.Opacity = trans; When I try to build it, I get this error: Cannot implicitly convert type 'decimal' to 'double'. I tried making a , but then the control doesn't work. This code has worked fine for me in VB.NET in the past. )>]\n" ] } ], "source": [ "junk = re.compile(r\"<\\S+>\")\n", "extra_space = re.compile(r\"( )+\")\n", "\n", "def make_row(s):\n", " root = etree.fromstring(s)\n", " id = root.get(\"Id\", \"\")\n", " tags = root.get(\"Tags\", \"\")\n", " title = root.get(\"Title\", \"\")\n", " bodyPlain = junk.sub(\" \", root.get(\"Body\", \"\"))\n", " text = extra_space.sub(\" \", (title + bodyPlain.replace(\"\\n\", \" \")))\n", " return Row(id, tags, text)\n", "\n", "postsRdd = postsXml.map( make_row )\n", "# Look at the first row of postsRDD\n", "print(postsRdd.collect()[:1])" ] }, { "cell_type": "code", "execution_count": 10, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--+--------------------+--------------------+\n", "|Id| Tags| Text|\n", "+--+--------------------+--------------------+\n", "| 4|...|Why doesn't the p...|\n", "| 7| | An explicit cast...|\n", "| 9|<.net>|How do I calculat...|\n", "|11| |How do I calculat...|\n", "|12| | Well, here's how...|\n", "|13||Difference betwee...|\n", "|16|...|Binary Data in My...|\n", "|18| | For a table like...|\n", "|19|...|Throw an error in...|\n", "|25|...|How to use the C ...|\n", "|26| | The answer by is...|\n", "|27| | \r", " \r", " IMHO yours s...|\n", "|29| | There are no HTT...|\n", "|30| | I've had no trou...|\n", "+--+--------------------+--------------------+\n", "\n" ] } ], "source": [ "schemaString = \"Id Tags Text\"\n", "schema = StructType( [StructField(x, StringType(), True) for x in schemaString.split(\" \")] )\n", "postsDf = sqlContext.createDataFrame(postsRdd, schema)\n", "# Now take a look at the data frame\n", "postsDf.show()" ] }, { "cell_type": "markdown", "metadata": {}, 
"source": [ "[Page 3](http://www.kdnuggets.com/2015/11/petrov-apache-spark-machine-learning-large-data.html/3): **6 – Preparing training and testing datasets** " ] }, { "cell_type": "code", "execution_count": 5, "metadata": { "collapsed": false }, "outputs": [], "source": [ "targetTag = \"java\"\n", "sqlfunc = udf(lambda x : 1.0 if (targetTag in x) else 0.0, DoubleType())\n", "postsLabelled = postsDf.withColumn(\"Label\", sqlfunc(postsDf.Tags))\n", "\n", "positive = postsLabelled.filter(postsLabelled.Label == 1.0)\n", "negative = postsLabelled.filter(postsLabelled.Label != 1.0)\n", "\n", "positiveTrain = positive.sample(False, 0.9)\n", "negativeTrain = negative.sample(False, 0.9)\n", "training = positiveTrain.unionAll(negativeTrain)\n", "\n", "\n", "negativeTrainTmp = negativeTrain.withColumnRenamed(\"Label\", \"Flag\").select('Id', 'Flag')\n", "negativeTest = (negative.join(negativeTrainTmp, \n", " negative[\"Id\"] == negativeTrainTmp[\"Id\"], \n", " \"LeftOuter\").filter(\"Flag is null\")\n", " .select(negative[\"Id\"], 'Tags', 'Text', 'Label'))\n", "\n", "positiveTrainTmp = (positiveTrain.withColumnRenamed(\"Label\", \"Flag\")\n", " .select('Id', 'Flag'))\n", "positiveTest = (positive.join(positiveTrainTmp, \n", " positive[\"Id\"] == positiveTrainTmp[\"Id\"], \n", " \"LeftOuter\").filter(\"Flag is null\")\n", " .select(positive[\"Id\"], 'Tags', 'Text', 'Label'))\n", "\n", "testing = negativeTest.unionAll(positiveTest)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "[Page 3](http://www.kdnuggets.com/2015/11/petrov-apache-spark-machine-learning-large-data.html/3): **7 – Training a model**" ] }, { "cell_type": "code", "execution_count": 16, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "PipelineModel_457ba677754b958da92e\n" ] } ], "source": [ "numFeatures = 64000\n", "numEpochs = 30\n", "regParam = 0.02\n", "\n", "tokenizer = Tokenizer(inputCol = \"Text\",\n", " outputCol = \"Words\")\n", "\n", 
"hashingTF = HashingTF(numFeatures = numFeatures,\n", " inputCol = tokenizer.getOutputCol(),\n", " outputCol = \"Features\")\n", "\n", "lr = LogisticRegression(maxIter = numEpochs,\n", " regParam = regParam,\n", " featuresCol = \"Features\",\n", " labelCol = \"Label\",\n", " # Despite appearing in the docs, rawPredictionCol was not available\n", " # on LogisticRegression in the version of PySpark I had. Perhaps,\n", " # my version was not up to date.\n", " #rawPredictionCol = \"Score\"\n", " predictionCol = \"Prediction\")\n", "\n", "pipeline = Pipeline(stages = [tokenizer, hashingTF, lr]) \n", "\n", "model = pipeline.fit(training)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "[Page 3](http://www.kdnuggets.com/2015/11/petrov-apache-spark-machine-learning-large-data.html/3): **8 – Testing a model** " ] }, { "cell_type": "code", "execution_count": 17, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Prediction: 0.0\n", "Area under the ROC: 0.601640398991\n" ] } ], "source": [ "# Predict the result for a single test case.\n", "\n", "testTitle = \"Easiest way to merge a release into one JAR file\"\n", "\n", "testBody = \"\"\"Is there a tool or script which easily merges a bunch \n", " of href=\"http://en.wikipedia.org/wiki/JAR_%28file_format\n", " %29\" JAR files into one JAR file? 
A bonus would be to \n", " easily set the main-file manifest and make it executable.\n", " I would like to run it with something like: As far as I \n", " can tell, it has no dependencies which indicates that it \n", " shouldn't be an easy single-file tool, but the downloaded\n", " ZIP file contains a lot of libraries.\"\"\"\n", "\n", "testText = testTitle + testBody\n", "\n", "# The Label value is a placeholder; it is not used when predicting.\n", "testDF = sqlContext.createDataFrame([Row(Label=99.0, Text=testText)])\n", "\n", "result = model.transform(testDF)\n", "\n", "# Read the prediction by column name rather than by position.\n", "prediction = result.first().Prediction\n", "\n", "print(\"Prediction: {0}\".format(prediction))\n", "\n", "# Evaluate the quality of the model on the held-out testing dataset.\n", "# Note: Prediction holds hard 0.0/1.0 labels rather than scores, so\n", "# this area under the ROC is only a coarse estimate.\n", "\n", "testingResult = model.transform(testing)\n", "\n", "testingResultScores = (testingResult.select(\"Prediction\", \"Label\").rdd\n", " .map(lambda r: (float(r[0]), float(r[1]))))\n", "\n", "bc = BinaryClassificationMetrics(testingResultScores)\n", "print(\"Area under the ROC: {0}\".format(bc.areaUnderROC))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "That's it. I hope you found this translation helpful. " ] } ], "metadata": { "kernelspec": { "display_name": "Python 2", "language": "python", "name": "python2" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 2 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython2", "version": "2.7.5" } }, "nbformat": 4, "nbformat_minor": 0 }