{ "cells": [ { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "[![AnalyticsDojo](https://s3.amazonaws.com/analyticsdojo/logo/final-logo.png)](http://rpi.analyticsdojo.com)\n", "

# Introduction to Spark\n", "
rpi.analyticsdojo.com\n" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "Adapted from work by Steve Phelps:\n", "https://github.com/phelps-sg/python-bigdata \n", "This work is licensed under the Creative Commons Attribution 4.0 International license agreement.\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Reference\n", "- [Spark Documentation](http://spark.apache.org/docs/latest/)\n", "- [Spark Programming Guide](http://spark.apache.org/docs/latest/programming-guide.html)\n", "- [Databricks Login](https://community.cloud.databricks.com)\n", "- [PySpark (Jupyter Docker Stacks)](https://github.com/jupyter/docker-stacks)\n", "- Conda installation:\n", "\n", "```\n", "conda install -c anaconda-cluster/label/dev spark\n", "conda install -c conda-forge pyspark\n", "```" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "\n", "### Overview\n", "- History\n", "- Data Structures\n", "- Using Apache Spark with Python\n" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## History\n", "\n", "- Apache Spark 1.0 was released in 2014.\n", "\n", "- It was originally developed by [Matei Zaharia](http://people.csail.mit.edu/matei) as a class project, and later a PhD dissertation, at the University of California, Berkeley.\n", "\n", "- In contrast to Hadoop, Apache Spark:\n", "\n", " - is easy to install and configure.\n", " - provides a much more natural *iterative* workflow.\n" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Resilient Distributed Datasets (RDD)\n", "\n", "- The fundamental abstraction of Apache Spark is a read-only, parallel, distributed, fault-tolerant collection called a resilient distributed dataset (RDD).\n", "\n", "- When working with Apache Spark we iteratively apply functions to every element of these collections in parallel to produce *new* RDDs.\n", "\n", "- For the most part, you can think of and use RDDs like distributed dataframes.\n" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "## Resilient Distributed Datasets (RDD)\n", "\n", "- Properties of resilient distributed datasets (RDDs):\n", " - The data is distributed across nodes in a cluster of computers.\n", " - No data is lost if a single node fails.\n", " - Data is typically stored in HBase tables or HDFS files.\n", " - The `map` and `reduce` functions can work in *parallel* across\n", " different keys, or different elements of the collection.\n", "\n", "- The underlying framework (e.g. Hadoop or Apache Spark) allocates data and processing to different nodes, without any intervention from the programmer." ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Word Count Example\n", "\n", "- In this simple example, the input is a set of URLs; each record is a document.\n", "\n", "- **Problem: Compute how many times each word has occurred across the data set.**" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Word Count: Map\n", "\n", "\n", "The input to $\operatorname{map}$ is a mapping:\n", "- Key: URL\n", "- Value: Contents of the document
\n", "$\left< document1, to \; be \; or \; not \; to \; be \right>$ \n", " \n", "\n", "- In this example, our $\operatorname{map}$ function will process a given URL and produce a $\left< word, 1 \right>$ pair for every word in the document.\n", "- So our original data set will be transformed to:\n", " \n", " $\left< to, 1 \right>$\n", " $\left< be, 1 \right>$\n", " $\left< or, 1 \right>$\n", " $\left< not, 1 \right>$\n", " $\left< to, 1 \right>$\n", " $\left< be, 1 \right>$" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Word Count: Reduce\n", "\n", "\n", "- The reduce operation groups values according to their key, and then performs a reduce on each key.\n", "\n", "- The collections are partitioned across different storage units, so the reduction cannot simply iterate over all of the data on a single machine.\n", "\n", "- Map-Reduce will fold the data in such a way that it minimises data-copying across the cluster.\n", "\n", "- Data in different partitions are reduced separately in parallel.\n", "\n", "- The final result is a reduce of the reduced data in each partition.\n", "\n", "- Therefore it is very important that our operator *is both commutative and associative*.\n", "\n", "- In our case the reduce function is the `+` operator:\n", "\n", " $\left< be, 2 \right>$ \n", " $\left< not, 1 \right>$ \n", " $\left< or, 1 \right>$ \n", " $\left< to, 2 \right>$ \n", " " ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Map-Reduce on a Cluster of Computers\n", "\n", "- The map and reduce operations described so far will *not*, by themselves, allow us to exploit parallelism from multiple computers in a [cluster](https://en.wikipedia.org/wiki/Computer_cluster).\n", "\n", "- Developing a framework to distribute this work across a cluster would be a very large software engineering project.\n", "\n", "- There are existing frameworks we can use:\n", " - [Apache Hadoop](https://hadoop.apache.org/)\n", " - [Apache Spark](https://spark.apache.org/)\n", " \n", "- This notebook covers Apache Spark." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Apache Spark\n", "\n", "- Apache Spark provides an object-oriented library for processing data on the cluster.\n", "\n", "- It provides objects which represent resilient distributed datasets (RDDs).\n", "\n", "- RDDs behave a bit like Python collections (e.g. lists).\n", "\n", "- However:\n", " - the underlying data is distributed across the nodes in the cluster, and\n", " - the collections are *immutable*." ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Apache Spark and Map-Reduce\n", "\n", "- We process the data by using higher-order functions to map RDDs onto *new* RDDs.\n", "\n", "- Each instance of an RDD has at least two *methods* corresponding to the Map-Reduce workflow:\n", " - `map`\n", " - `reduceByKey`\n", " \n", "- These methods work in the same way as the familiar `map` and `reduce` operations on standard Python collections.
\n", "\n", "- There are also additional RDD methods in the Apache Spark API, including ones for SQL.\n", " " ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Word-count in Apache Spark\n", "\n" ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "slideshow": { "slide_type": "-" } }, "outputs": [ { "data": { "text/plain": [ "['to', 'be', 'or', 'not', 'to', 'be']" ] }, "execution_count": 1, "metadata": {}, "output_type": "execute_result" } ], "source": [ "words = \"to be or not to be\".split()\n", "words" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "### The `SparkContext` class\n", "\n", "- When working with Apache Spark we invoke methods on an object which is an instance of the `pyspark.context.SparkContext` class.\n", "\n", "- Typically (such as when running on Databricks), an instance of this object will be created automatically for you and assigned to the variable `sc`.\n", "\n", "- The `parallelize` method of `SparkContext` can be used to turn any ordinary Python collection into an RDD:" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "# Don't execute this on Databricks, where sc is already defined.\n", "# Use this if executing via Docker.\n", "import pyspark\n", "sc = pyspark.SparkContext('local[*]')" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:480" ] }, "execution_count": 3, "metadata": {}, "output_type": "execute_result" } ], "source": [ "words_rdd = sc.parallelize(words)\n", "words_rdd" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "### Mapping an RDD\n", "\n", "- Now when we invoke the `map` or `reduceByKey` methods on `words_rdd` we can set up a parallel processing computation across the cluster." ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "PythonRDD[1] at RDD at PythonRDD.scala:48" ] }, "execution_count": 4, "metadata": {}, "output_type": "execute_result" } ], "source": [ "word_tuples_rdd = words_rdd.map(lambda x: (x, 1))\n", "word_tuples_rdd" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Collecting the RDD\n", "- Notice that we do not have a result yet.\n", "\n", "- The computation is not performed until we request the final result to be *collected*.\n", "\n", "- We do this by invoking the `collect()` method.\n", "\n", "- Be careful with the `collect` method, as all the data you are collecting must fit in the driver's memory.
\n", "\n", "- The `take` method is similar to `collect`, but only returns the first $n$ elements.\n", " " ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[('to', 1), ('be', 1), ('or', 1), ('not', 1), ('to', 1), ('be', 1)]" ] }, "execution_count": 5, "metadata": {}, "output_type": "execute_result" } ], "source": [ "word_tuples_rdd.collect()" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[('to', 1), ('be', 1), ('or', 1), ('not', 1)]" ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "word_tuples_rdd.take(4)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "### Reducing an RDD\n", "\n", "- However, we still require additional processing to reduce the data by the word key. " ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "PythonRDD[8] at RDD at PythonRDD.scala:48" ] }, "execution_count": 7, "metadata": {}, "output_type": "execute_result" } ], "source": [ "word_counts_rdd = word_tuples_rdd.reduceByKey(lambda x, y: x + y)\n", "word_counts_rdd" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "- Now we request the final result:" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[('to', 2), ('be', 2), ('or', 1), ('not', 1)]" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "word_counts = word_counts_rdd.collect()\n", "word_counts" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "### Lazy evaluation\n", "\n", "- It is only when we invoke `collect()` (or another action such as `take`) that the processing is performed on the cluster.\n", "\n", "- Invoking `collect()` will cause both the `map` and `reduceByKey` operations to be performed.\n", "\n", "- If the resulting collection is very large then this can be an expensive operation.\n" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[('to', 2), ('be', 2)]" ] }, "execution_count": 9, "metadata": {}, "output_type": "execute_result" } ], "source": [ "word_counts_rdd.take(2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Connecting Map and Reduce in a Single Command\n", "- We can string together `map` and `reduce` commands.\n", "- Nothing is executed until the result is collected."
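, "\n", "- For example, `words_rdd.map(lambda w: (w, 1)).reduceByKey(lambda x, y: x + y)` merely *defines* the computation; nothing runs until an action such as `collect()`, `take()`, or `count()` is invoked, as the next cell shows."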
] }, { "cell_type": "code", "execution_count": 10, "metadata": { "slideshow": { "slide_type": "subslide" } }, "outputs": [ { "data": { "text/plain": [ "[('to', 2), ('be', 2), ('or', 1), ('not', 1)]" ] }, "execution_count": 10, "metadata": {}, "output_type": "execute_result" } ], "source": [ "text = \"to be or not to be\".split()\n", "rdd = sc.parallelize(text)\n", "counts = rdd.map(lambda word: (word, 1)).reduceByKey(lambda x, y: x + y)\n", "counts.collect()" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Additional RDD transformations\n", "\n", "- Apache Spark offers many more methods for operating on collections of tuples over and above the standard Map-Reduce framework:\n", "\n", " - Sorting: `sortByKey`, `sortBy`, `takeOrdered`\n", " - Mapping: `flatMap`\n", " - Filtering: `filter`\n", " - Counting: `count`\n", " - Set-theoretic: `intersection`, `union`\n", " - Many others: [see the Transformations section of the programming guide](https://spark.apache.org/docs/latest/programming-guide.html#transformations)\n", " " ] }, { "cell_type": "markdown", "metadata": { "collapsed": true, "slideshow": { "slide_type": "slide" } }, "source": [ "## Creating an RDD from a text file\n", "\n", "- In the previous example, we created an RDD from a Python collection.\n", "\n", "- This is *not* typically how we would work with big data.\n", "\n", "- More commonly we would create an RDD corresponding to data in an\n", "HBase table, or an HDFS file.\n", "\n", "- The following example creates an RDD from a text file on the local filesystem:\n", " - With bigger data, you would use an HDFS file, but the principle is the same.\n", "\n", "- Each element of the RDD corresponds to a single *line* of text." ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [], "source": [ "# Each element of the resulting RDD is one line of the text file.\n", "iris_lines = sc.textFile('../input/iris.csv')" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Calculating $\pi$ using Spark\n", "\n", "- We can estimate the value of $\pi$ using the following Monte Carlo method:\n", "\n", "\n", "1. Inscribe a circle in a square.\n", "2. Randomly generate points in the square.\n", "3. Determine the number of points in the square that are also in the circle.\n", "4. Let $r$ be the number of points in the circle divided by the number of points in the square; then $\pi \approx 4 r$.\n", " \n", "- Note that the more points generated, the better the approximation.\n", "\n", "See [this tutorial](https://computing.llnl.gov/tutorials/parallel_comp/#ExamplesPI)."
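, "\n", "- Why $\pi \approx 4 r$: the ratio of the circle's area to the enclosing square's area is $\frac{\pi d^2 / 4}{d^2} = \pi / 4$, so the fraction of uniformly generated points that land inside the circle approximates $\pi / 4$. (The code below uses a quarter circle of radius 1 inside the unit square, which gives the same ratio.)"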
] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "0.784568" ] }, "execution_count": 12, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import numpy as np\n", "\n", "def sample(p):\n", "    # p is the sample index (unused); draw a random point (x, y) in the unit square.\n", "    x, y = np.random.random(), np.random.random()\n", "    # The point lies inside the quarter circle of radius 1 when x^2 + y^2 < 1.\n", "    return 1 if x*x + y*y < 1 else 0\n", "\n", "NUM_SAMPLES = 1000000\n", "\n", "count = sc.parallelize(range(0, NUM_SAMPLES)).map(sample) \\\n", "    .reduce(lambda a, b: a + b)\n", "# The fraction of hits approximates pi/4, so pi is approximately 4*r.\n", "r = float(count) / float(NUM_SAMPLES)\n", "r\n", "#print (\"Pi is approximately %f\" % (4.0 * r))" ] } ], "metadata": { "anaconda-cloud": {}, "celltoolbar": "Slideshow", "kernelspec": { "display_name": "Python [conda root]", "language": "python", "name": "conda-root-py" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.2" } }, "nbformat": 4, "nbformat_minor": 1 }