{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# PySpark workaround for ML with `sparklyr`" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This notebook stems from [this one](./sparklyr_test2.ipynb) where we realized there's no method to `unnest` columns in `sparklyr`!. \n", "Fortunately here comes [PySpark](https://spark.apache.org/docs/0.9.0/python-programming-guide.html) to help us. \n", "The following commands are 'forked' from this great tutorial: Sentiment analysis with Spark ML. [Material for Machine Learning Workshop Galicia 2016](http://nbviewer.jupyter.org/github/javicacheiro/machine_learning_galicia_2016/blob/master/notebooks/sentiment_analysis-amazon_books.ipynb). \n", "We import our data as a **Spark dataframe**:" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "pyspark.sql.context.HiveContext" ] }, "execution_count": 2, "metadata": {}, "output_type": "execute_result" } ], "source": [ "type(sqlContext)" ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "collapsed": true }, "outputs": [], "source": [ "bin_reviews = sqlContext.read.json('amazon/bin_reviews.json')" ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "scrolled": true }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- asin: string (nullable = true)\n", " |-- helpful: array (nullable = true)\n", " | |-- element: long (containsNull = true)\n", " |-- label: double (nullable = true)\n", " |-- overall: double (nullable = true)\n", " |-- reviewText: string (nullable = true)\n", " |-- reviewTime: string (nullable = true)\n", " |-- reviewerID: string (nullable = true)\n", " |-- reviewerName: string (nullable = true)\n", " |-- summary: string (nullable = true)\n", " |-- unixReviewTime: long (nullable = true)\n", "\n" ] } ], "source": [ "bin_reviews.printSchema()" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------------------+-------+-----+\n", "| reviewText|overall|label|\n", "+--------------------+-------+-----+\n", "|Spiritually and m...| 5.0| 1.0|\n", "|This is one my mu...| 5.0| 1.0|\n", "+--------------------+-------+-----+\n", "only showing top 2 rows\n", "\n" ] } ], "source": [ "select_reviews = bin_reviews.select('reviewText', 'overall', 'label')\n", "select_reviews.show(2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Tokenizer" ] }, { "cell_type": "code", "execution_count": 4, "metadata": { "collapsed": true }, "outputs": [], "source": [ "from pyspark.ml.feature import Tokenizer\n", "tokenizer = Tokenizer(inputCol=\"reviewText\", outputCol=\"words\")" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------------------+-------+-----+--------------------+\n", "| reviewText|overall|label| words|\n", "+--------------------+-------+-----+--------------------+\n", "|Spiritually and m...| 5.0| 1.0|[spiritually, and...|\n", "|This is one my mu...| 5.0| 1.0|[this, is, one, m...|\n", "+--------------------+-------+-----+--------------------+\n", "only showing top 2 rows\n", "\n" ] } ], "source": [ "tokenized_reviews = tokenizer.transform(select_reviews)\n", "tokenized_reviews.show(2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## StopWordsRemover" ] }, { "cell_type": "code", "execution_count": 8, "metadata": { "collapsed": true }, "outputs": [], 
"source": [ "from pyspark.ml.feature import StopWordsRemover\n", "remover = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol=\"filtered\")" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------------------+-------+-----+--------------------+--------------------+\n", "| reviewText|overall|label| words| filtered|\n", "+--------------------+-------+-----+--------------------+--------------------+\n", "|Spiritually and m...| 5.0| 1.0|[spiritually, and...|[spiritually, men...|\n", "|This is one my mu...| 5.0| 1.0|[this, is, one, m...|[books., masterpi...|\n", "+--------------------+-------+-----+--------------------+--------------------+\n", "only showing top 2 rows\n", "\n", "[u'spiritually', u'and', u'mentally', u'inspiring!', u'a', u'book', u'that', u'allows', u'you', u'to']\n", "[u'spiritually', u'mentally', u'inspiring!', u'book', u'allows', u'question', u'morals', u'help', u'discover', u'really']\n" ] } ], "source": [ "removed_reviews = remover.transform(tokenized_reviews)\n", "removed_reviews.show(2)\n", "sample_review = removed_reviews.first()\n", "print sample_review['words'][:10]\n", "print sample_review['filtered'][:10]" ] }, { "cell_type": "code", "execution_count": 13, "metadata": { "collapsed": true }, "outputs": [], "source": [ "from pyspark.sql.functions import split, explode\n", "unnested_reviews = removed_reviews.select('overall', 'label', explode(\"filtered\").alias(\"word\"))" ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------+-----+-----------+\n", "|overall|label| word|\n", "+-------+-----+-----------+\n", "| 5.0| 1.0|spiritually|\n", "| 5.0| 1.0| mentally|\n", "| 5.0| 1.0| inspiring!|\n", "| 5.0| 1.0| book|\n", "| 5.0| 1.0| allows|\n", "+-------+-----+-----------+\n", "only showing top 5 rows\n", "\n" ] } ], "source": [ "unnested_reviews.show(5)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We save our dataframe for further use in our small `sparklyr` pipeline. \n", "It will take a good load of time to save, so be patient! " ] }, { "cell_type": "code", "execution_count": 21, "metadata": { "collapsed": true }, "outputs": [], "source": [ "# unnested_reviews.write.json('unnested_reviews_json')\n", "unnested_reviews.write.save('amazon/unnested_reviews_json', format='json', mode='overwrite')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Return to the [sparklyr notebook](./sparklyr_test2.ipynb) to follow the pipeline!." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## References" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "- [PySpark Programming Guide](https://spark.apache.org/docs/0.9.0/python-programming-guide.html).\n", "- [PySpark cheatsheet](https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_SQL_Cheat_Sheet_Python.pdf).\n", "- [Material for Machine Learning Workshop Galicia 2016](http://nbviewer.jupyter.org/github/javicacheiro/machine_learning_galicia_2016/blob/master/notebooks/sentiment_analysis-amazon_books.ipynb).\n", "- [PySpark Course](https://github.com/javicacheiro/pyspark_course)." 
] } ], "metadata": { "kernelspec": { "display_name": "Python 2", "language": "python", "name": "python2" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 2 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython2", "version": "2.7.12" } }, "nbformat": 4, "nbformat_minor": 1 }