{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "## Spark\n", "\n", "Before you turn this problem in, make sure everything runs as expected. First, restart the kernel (in the menubar, select Kernel → Restart) and then run all cells (in the menubar, select Cell → Run All). You can speak with others regarding the assignment but all work must be your own. \n", "\n", "\n", "### This is a 20 point assignment graded from answers to questions. \n", "\n", "\n", "#![Spark Logo](http://spark-mooc.github.io/web-assets/images/ta_Spark-logo-small.png) + ![Python Logo](http://spark-mooc.github.io/web-assets/images/python-logo-master-v3-TM-flattened_small.png)\n", "# **Word Count Lab: Building a word count application**\n", "#### This lab will build on the techniques covered in the Spark tutorial to develop a simple word count application. The volume of unstructured text in existence is growing dramatically, and Spark is an excellent tool for analyzing this type of data. \n", "#### *Part 1:* Creating a base RDD and pair RDDs\n", "#### *Part 2:* Counting with pair RDDs\n", "#### *Part 3:* Finding unique words and a mean value\n", "\n", "#### Note that, for reference, you can look up the details of the relevant methods in [Spark's Python API](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#Don't need to do this if running on databricks.\n", "import pyspark\n", "#sc = pyspark.SparkContext('local[*]')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### ** Part 1: Creating a base RDD and pair RDDs **" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### In this part of the lab, we will explore creating a base RDD with `parallelize` and using pair RDDs to count words." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### ** (1a) Create a base RDD **\n", "#### We'll start by generating a base RDD by using a Python list and the `sc.parallelize` method. Then we'll print out the type of the base RDD." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']\n", "wordsRDD = sc.parallelize(wordsList, 4)\n", "# Print out the type of wordsRDD\n", "print (type(wordsRDD))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### What does it mean that the list is an RDD? What special operations does this enable and how is it different from a dataset? \n", "#\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### ** (1b) Pluralize and test **\n", "#### Let's use a `map()` transformation to add the letter 's' to each string in the base RDD we just created. We'll define a Python function that returns the word with an 's' at the end of the word. Please replace `` with your solution. If you have trouble, the next cell has the solution. After you have defined `makePlural` you can run the third cell which contains a test. If you implementation is correct it will print `1 test passed`.\n", "#### This is the general form that exercises will take, except that no example solution will be provided. Exercises will include an explanation of what is expected, followed by code cells where one cell will have one or more `` sections. The cell that needs to be modified will have `# TODO: Replace with appropriate code` on its first line. 
{ "cell_type": "markdown", "metadata": {}, "source": [ "#### ** (1b) Pluralize and test **\n", "#### Let's use a `map()` transformation to add the letter 's' to each string in the base RDD we just created. We'll define a Python function that returns the word with an 's' added to the end. Please replace `<FILL IN>` with your solution. After you have defined `makePlural`, run the cell; if your implementation is correct, the call `makePlural('cat')` at the end will return `'cats'`.\n", "#### This is the general form that exercises will take. Exercises include an explanation of what is expected, followed by code cells where one cell will have one or more `<FILL IN>` sections. The cell that needs to be modified will have `# TODO: Replace <FILL IN> with appropriate code` on its first line. Once the `<FILL IN>` sections are updated, run the cell to verify the correctness of your solution." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# TODO: Replace <FILL IN> with appropriate code\n", "def makePlural(word):\n", "    \"\"\"Adds an 's' to `word`.\n", "\n", "    Note:\n", "        This is a simple function that only adds an 's'. No attempt is made to follow proper\n", "        pluralization rules.\n", "\n", "    Args:\n", "        word (str): A string.\n", "\n", "    Returns:\n", "        str: A string with 's' added to it.\n", "    \"\"\"\n", "    return <FILL IN>\n", "\n", "makePlural('cat')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### ** (1c) Apply `makePlural` to the base RDD **\n", "#### Now pass each item in the base RDD into a [map()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.map) transformation that applies the `makePlural()` function to each element. Then call the [collect()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.collect) action to see the transformed RDD." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# TODO: Replace <FILL IN> with appropriate code\n", "pluralRDD = wordsRDD.map(<FILL IN>)\n", "print(pluralRDD.collect())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### ** (1d) Pass a `lambda` function to `map` **\n", "#### Let's create the same RDD using a `lambda` function. Recall that a lambda function is an anonymous, inline function; for example, `lambda x: x + 1` takes an argument `x` and returns `x + 1`." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# TODO: Replace <FILL IN> with appropriate code\n", "pluralLambdaRDD = wordsRDD.map(<FILL IN>)\n", "print(pluralLambdaRDD.collect())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### ** (1e) Length of each word **\n", "#### Now use `map()` and a `lambda` function to return the number of characters in each word. You can do this with Python's built-in `len()` function. We'll `collect` this result directly into a variable." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# TODO: Replace <FILL IN> with appropriate code\n", "pluralLengths = (pluralRDD\n", "                 .map(lambda w: <FILL IN>)\n", "                 .collect())\n", "print(pluralLengths)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### ** (1f) Pair RDDs **\n", "#### The next step in writing our word counting program is to create a new type of RDD, called a pair RDD. A pair RDD is an RDD where each element is a pair tuple `(k, v)`, where `k` is the key and `v` is the value. In this example, we will create a pair consisting of `('<word>', 1)` for each word element in the RDD.\n", "#### We can create the pair RDD using the `map()` transformation with a `lambda` function to create a new RDD." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# TODO: Replace <FILL IN> with appropriate code\n", "wordPairs = wordsRDD.map(lambda w: (w, 1))\n", "print(wordPairs.collect())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### ** Part 2: Counting with pair RDDs **" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Now, let's count the number of times a particular word appears in the RDD. There are multiple ways to perform the counting, but some are much less efficient than others.\n", "#### A naive approach would be to `collect()` all of the elements and count them in the driver program. While this approach could work for small datasets, we want an approach that will work for any size of dataset, including terabyte- or petabyte-sized datasets. In addition, performing all of the work in the driver program is slower than performing it in parallel in the workers. For these reasons, we will use data-parallel operations." ] },
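{ "cell_type": "markdown", "metadata": {}, "source": [ "#### To make the trade-off concrete, here is a minimal sketch of the naive, driver-side approach (an illustration only, not one of the exercises). It works for our five-word RDD, but `collect()` ships every element to the driver and the counting is not parallel, so it would not scale to large datasets. The name `driverSideCounts` is used here only for illustration." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Illustration only: counting in the driver after collect().\n", "# Every element travels over the network to the driver process.\n", "from collections import Counter\n", "driverSideCounts = Counter(wordsRDD.collect())\n", "print(driverSideCounts)" ] },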
{ "cell_type": "markdown", "metadata": {}, "source": [ "#### ** (2a) `groupByKey()` approach **\n", "#### An approach you might first consider (we'll see shortly that there are better ways) is based on using the [groupByKey()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.groupByKey) transformation. As the name implies, the `groupByKey()` transformation groups all the elements of the RDD with the same key into a single list in one of the partitions. There are two problems with using `groupByKey()`:\n", "  + #### The operation requires a lot of data movement to move all the values into the appropriate partitions.\n", "  + #### The lists can be very large. Consider a word count of English Wikipedia: the lists for common words (e.g., the, a, etc.) would be huge and could exhaust the available memory in a worker.\n", "  \n", "#### Use `groupByKey()` to generate a pair RDD of type `('word', iterator)`." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# TODO: Replace <FILL IN> with appropriate code\n", "# Note that groupByKey requires no parameters\n", "wordsGrouped = wordPairs.groupByKey()\n", "for key, value in wordsGrouped.collect():\n", "    print('{0}: {1}'.format(key, list(value)))\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### ** (2b) Use `groupByKey()` to obtain the counts **\n", "#### Using the `groupByKey()` transformation creates an RDD containing 3 elements, each of which is a pair of a word and a Python iterator.\n", "#### Now count the values in each iterator using a `map()` transformation. The result should be a pair RDD consisting of (word, count) pairs." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# TODO: Replace <FILL IN> with appropriate code\n", "wordCountsGrouped = wordsGrouped.map(lambda kv: (kv[0], len(kv[1])))\n", "print(wordCountsGrouped.collect())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### ** (2c) Counting using `reduceByKey` **\n", "#### A better approach is to start from the pair RDD and then use the [reduceByKey()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.reduceByKey) transformation to create a new pair RDD. The `reduceByKey()` transformation gathers together pairs that have the same key and applies the function provided to two values at a time, iteratively reducing all of the values to a single value. `reduceByKey()` operates by applying the function first within each partition on a per-key basis and then across the partitions, allowing it to scale efficiently to large datasets." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# TODO: Replace <FILL IN> with appropriate code\n", "# Note that reduceByKey takes in a function that accepts two values and returns a single value\n", "wordCounts = wordPairs.reduceByKey(lambda a, b: a + b)\n", "print(wordCounts.collect())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### ** (2d) All together **\n", "#### The expert version of the code performs the `map()` to a pair RDD, the `reduceByKey()` transformation, and the `collect()` action in one statement." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# TODO: Replace <FILL IN> with appropriate code\n", "wordCountsCollected = (wordsRDD\n", "                       .map(lambda w: (w, 1))\n", "                       .reduceByKey(lambda a, b: a + b)\n", "                       .collect())\n", "print(wordCountsCollected)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### ** Part 3: Finding unique words and a mean value **" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Count the unique words using distinct()\n", "uniqueWords = wordsRDD.distinct().count()\n", "print(uniqueWords)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### ** (3a) Mean using `reduce` **\n", "#### Find the mean number of occurrences per unique word in `wordCounts`.\n", "#### Use a `reduce()` action to sum the counts in `wordCounts` and then divide by the number of unique words. To do so, first `map()` the pair RDD `wordCounts`, which consists of (key, value) pairs, to an RDD of values." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# TODO: Replace <FILL IN> with appropriate code\n", "from operator import add\n", "totalCount = (wordCounts\n", "              .map(lambda kv: kv[1])\n", "              .reduce(add))\n", "average = totalCount / float(uniqueWords)\n", "print(totalCount)\n", "print(round(average, 2))" ] },
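{ "cell_type": "markdown", "metadata": {}, "source": [ "#### As a quick cross-check (an illustration, not part of the graded exercise), numeric RDDs also expose a built-in [mean()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.mean) action, which should agree with the `reduce()`-based calculation above." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Illustration only: mean() computes the same average directly.\n", "print(round(wordCounts.map(lambda kv: kv[1]).mean(), 2))" ] },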
{ "cell_type": "markdown", "metadata": {}, "source": [ "### ** Part 4: Apply word count to a file **" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### In this section we will finish developing our word count application. We'll build the `wordCount` function, deal with real-world problems like capitalization and punctuation, load in our data source, and compute the word count on the new data." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### ** (4a) `wordCount` function **\n", "#### First, define a function for word counting. You should reuse the techniques covered in earlier parts of this lab. The function should take in an RDD of words like `wordsRDD` and return a pair RDD of all of the words and their associated counts." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# TODO: Replace <FILL IN> with appropriate code\n", "def wordCount(wordListRDD):\n", "    \"\"\"Creates a pair RDD with word counts from an RDD of words.\n", "\n", "    Args:\n", "        wordListRDD (RDD of str): An RDD consisting of words.\n", "\n", "    Returns:\n", "        RDD of (str, int): An RDD consisting of (word, count) tuples.\n", "    \"\"\"\n", "    return wordListRDD.map(lambda w: (w, 1)).reduceByKey(lambda a, b: a + b)\n", "\n", "print(wordCount(wordsRDD).collect())" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "anaconda-cloud": {}, "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.3" }, "name": "ML_lab2_word_count_student", "notebookId": 799244247729459 }, "nbformat": 4, "nbformat_minor": 2 }