{ "cells": [ { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "[![AnalyticsDojo](https://s3.amazonaws.com/analyticsdojo/logo/final-logo.png)](http://rpi.analyticsdojo.com)\n", "<center><h1>Introduction to MapReduce</h1></center>\n", "<center><h3>rpi.analyticsdojo.com</h3></center>
" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "Adopted from work by Steve Phelps:\n", "https://github.com/phelps-sg/python-bigdata \n", "This work is licensed under the Creative Commons Attribution 4.0 International license agreement.\n" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "\n", "### Overview\n", "\n", "1. Recap of functional programming in Python\n", "2. Python's `map` and `reduce` functions\n", "3. Writing parallel code using `map`\n", "4. The Map-Reduce programming model\n", "\n" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## History\n", "\n", "- The Map-Reduce programming model was popularised by Google (Dean and Ghemawat 2008).\n", "\n", "- The first popular open-source implementation was Apache Hadoop, first released in 2011.\n" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Functional programming\n", "\n", "Consider the following code:" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "def double_everything_in(data):\n", " result = []\n", " for i in data:\n", " result.append(2 * i)\n", " return result\n", "\n", "def quadruple_everything_in(data):\n", " result = []\n", " for i in data:\n", " result.append(4 * i)\n", " return result" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[2, 4, 6, 8, 10]" ] }, "execution_count": 2, "metadata": {}, "output_type": "execute_result" } ], "source": [ "double_everything_in([1, 2, 3, 4, 5])" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[4, 8, 12, 16, 20]" ] }, "execution_count": 3, "metadata": {}, "output_type": "execute_result" } ], "source": [ "quadruple_everything_in([1, 2, 3, 4, 5])" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "### DRY - Fundamental Programming Concept\n", "\n", "- The above code violates the [\"do not repeat yourself\"](https://en.wikipedia.org/wiki/Don't_repeat_yourself_) principle of good software engineering practice.\n", "\n", "- How can rewrite the code so that it avoids duplication?" 
] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "def multiply_by_x_everything_in(x, data):\n", "    result = []\n", "    for i in data:\n", "        result.append(x * i)\n", "    return result" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[2, 4, 6, 8, 10]" ] }, "execution_count": 5, "metadata": {}, "output_type": "execute_result" } ], "source": [ "multiply_by_x_everything_in(2, [1, 2, 3, 4, 5])" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[4, 8, 12, 16, 20]" ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "multiply_by_x_everything_in(4, [1, 2, 3, 4, 5])" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "- Now consider the following code:" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "def squared(x):\n", "    return x*x\n", "\n", "def double(x):\n", "    return x*2\n", "\n", "def square_everything_in(data):\n", "    result = []\n", "    for i in data:\n", "        result.append(squared(i))\n", "    return result\n", "\n", "def double_everything_in(data):\n", "    result = []\n", "    for i in data:\n", "        result.append(double(i))\n", "    return result" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[1, 4, 9, 16, 25]" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "square_everything_in([1, 2, 3, 4, 5])" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[2, 4, 6, 8, 10]" ] }, "execution_count": 9, "metadata": {}, "output_type": "execute_result" } ], "source": [ "double_everything_in([1, 2, 3, 4, 5])" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "### DRY - Fundamental Programming Concept\n", "- The above code violates the [\"do not repeat yourself\"](https://en.wikipedia.org/wiki/Don%27t_repeat_yourself) principle of good software engineering practice.\n", "\n", "- How can we rewrite the code so that it avoids duplication?" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "### Passing Functions as Values\n", "- Functions can be passed to other functions as values.\n" ] }, { "cell_type": "code", "execution_count": 10, "metadata": { "slideshow": { "slide_type": "-" } }, "outputs": [], "source": [ "def apply_f_to_everything_in(f, data):\n", "    result = []\n", "    for x in data:\n", "        result.append(f(x))\n", "    return result" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[1, 4, 9, 16, 25]" ] }, "execution_count": 11, "metadata": {}, "output_type": "execute_result" } ], "source": [ "apply_f_to_everything_in(squared, [1, 2, 3, 4, 5])" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[2, 4, 6, 8, 10]" ] }, "execution_count": 12, "metadata": {}, "output_type": "execute_result" } ], "source": [ "apply_f_to_everything_in(double, [1, 2, 3, 4, 5])" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "### Lambda expressions\n", "\n", "- We can use anonymous functions to save having to define a function each time we want to use `map`."
] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[1, 4, 9, 16, 25]" ] }, "execution_count": 13, "metadata": {}, "output_type": "execute_result" } ], "source": [ "apply_f_to_everything_in(lambda x: x*x, [1, 2, 3, 4, 5])" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "# Python's `map` function\n", "\n", "- Python has a built-in function `map` which is implemented far more efficiently than our version.\n", "\n", "- In Python 3, `map` returns a lazy *iterator* rather than a list, so we wrap it in `list()` to see the values.\n" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[1, 4, 9, 16, 25]" ] }, "execution_count": 14, "metadata": {}, "output_type": "execute_result" } ], "source": [ "list(map(lambda x: x*x, [1, 2, 3, 4, 5]))" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "## Implementing reduce\n", "\n", "- The `reduce` function is an example of a [fold](https://en.wikipedia.org/wiki/Fold_%28higher-order_function%29).\n", "\n", "- There are different ways we can fold data.\n", "\n", "- The following implements a *left* fold.\n" ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [], "source": [ "def foldl(f, data, z):\n", "    if (len(data) == 0):\n", "        print (z)\n", "        return z\n", "    else:\n", "        head = data[0]\n", "        tail = data[1:]\n", "        print (\"Folding\", head, \"with\", tail, \"using\", z)\n", "        partial_result = f(z, data[0])\n", "        print (\"Partial result is\", partial_result)\n", "        return foldl(f, tail, partial_result) " ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Folding 3 with [3, 3, 3, 3] using 0\n", "Partial result is 3\n", "Folding 3 with [3, 3, 3] using 3\n", "Partial result is 6\n", "Folding 3 with [3, 3] using 6\n", "Partial result is 9\n", "Folding 3 with [3] using 9\n", "Partial result is 12\n", "Folding 3 with [] using 12\n", "Partial result is 15\n", "15\n" ] }, { "data": { "text/plain": [ "15" ] }, "execution_count": 17, "metadata": {}, "output_type": "execute_result" } ], "source": [ "def add(x, y):\n", "    return x + y\n", "\n", "foldl(add, [3, 3, 3, 3, 3], 0)" ] }, { "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Folding 1 with [2, 3, 4, 5] using 0\n", "Partial result is 1\n", "Folding 2 with [3, 4, 5] using 1\n", "Partial result is 3\n", "Folding 3 with [4, 5] using 3\n", "Partial result is 6\n", "Folding 4 with [5] using 6\n", "Partial result is 10\n", "Folding 5 with [] using 10\n", "Partial result is 15\n", "15\n" ] }, { "data": { "text/plain": [ "15" ] }, "execution_count": 18, "metadata": {}, "output_type": "execute_result" } ], "source": [ "foldl(lambda x, y: x + y, [1, 2, 3, 4, 5], 0)" ] }, { "cell_type": "code", "execution_count": 19, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Folding 1 with [2, 3, 4, 5] using 0\n", "Partial result is -1\n", "Folding 2 with [3, 4, 5] using -1\n", "Partial result is -3\n", "Folding 3 with [4, 5] using -3\n", "Partial result is -6\n", "Folding 4 with [5] using -6\n", "Partial result is -10\n", "Folding 5 with [] using -10\n", "Partial result is -15\n", "-15\n" ] }, { "data": { "text/plain": [ "-15" ] }, "execution_count": 19, "metadata": {}, "output_type": "execute_result" } ], "source": [ "foldl(lambda x, y: x - y, [1, 2, 3, 4, 5], 0)" ] }, { "cell_type": "code", "execution_count": 20, "metadata": {}, "outputs": [ { "data": {
"text/plain": [ "-15" ] }, "execution_count": 20, "metadata": {}, "output_type": "execute_result" } ], "source": [ "(((((0 - 1) - 2) - 3) - 4) - 5)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "- Subtraction is neither [commutative](https://en.wikipedia.org/wiki/Commutative_property) nor [associative](https://en.wikipedia.org/wiki/Associative_property), so the order in which apply the fold matters:" ] }, { "cell_type": "code", "execution_count": 21, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "3" ] }, "execution_count": 21, "metadata": {}, "output_type": "execute_result" } ], "source": [ "(1 - (2 - (3 - (4 - (5 - 0)))))" ] }, { "cell_type": "code", "execution_count": 22, "metadata": { "slideshow": { "slide_type": "-" } }, "outputs": [], "source": [ "def foldr(f, data, z):\n", " if (len(data) == 0):\n", " return z\n", " else:\n", " return f(data[0], foldr(f, data[1:], z)) " ] }, { "cell_type": "code", "execution_count": 23, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Folding 1 with [2, 3, 4, 5] using 0\n", "Partial result is -1\n", "Folding 2 with [3, 4, 5] using -1\n", "Partial result is -3\n", "Folding 3 with [4, 5] using -3\n", "Partial result is -6\n", "Folding 4 with [5] using -6\n", "Partial result is -10\n", "Folding 5 with [] using -10\n", "Partial result is -15\n", "-15\n" ] }, { "data": { "text/plain": [ "-15" ] }, "execution_count": 23, "metadata": {}, "output_type": "execute_result" } ], "source": [ "foldl(lambda x, y: x - y, [1, 2, 3, 4, 5], 0)" ] }, { "cell_type": "code", "execution_count": 24, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "3" ] }, "execution_count": 24, "metadata": {}, "output_type": "execute_result" } ], "source": [ "foldr(lambda x, y: x - y, [1, 2, 3, 4, 5], 0)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Python's `reduce` function.\n", "\n", "- Python's built-in `reduce` function is a *left* fold." 
] }, { "cell_type": "code", "execution_count": 25, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "15" ] }, "execution_count": 25, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from functools import reduce\n", "reduce(lambda x, y: x + y, [1, 2, 3, 4, 5])" ] }, { "cell_type": "code", "execution_count": 26, "metadata": { "slideshow": { "slide_type": "subslide" } }, "outputs": [ { "data": { "text/plain": [ "-15" ] }, "execution_count": 26, "metadata": {}, "output_type": "execute_result" } ], "source": [ "reduce(lambda x, y: x - y, [1, 2, 3, 4, 5], 0)" ] }, { "cell_type": "code", "execution_count": 27, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Folding 1 with [2, 3, 4, 5] using 0\n", "Partial result is -1\n", "Folding 2 with [3, 4, 5] using -1\n", "Partial result is -3\n", "Folding 3 with [4, 5] using -3\n", "Partial result is -6\n", "Folding 4 with [5] using -6\n", "Partial result is -10\n", "Folding 5 with [] using -10\n", "Partial result is -15\n", "-15\n" ] }, { "data": { "text/plain": [ "-15" ] }, "execution_count": 27, "metadata": {}, "output_type": "execute_result" } ], "source": [ "foldl(lambda x, y: x - y, [1, 2, 3, 4, 5], 0)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "# Functional programming and parallelism\n", "\n", "- Functional programming lends itself to [parallel programming](https://computing.llnl.gov/tutorials/parallel_comp/#Models).\n", "\n", "- The `map` function can easily be parallelised through [data-level parallelism](https://en.wikipedia.org/wiki/Data_parallelism),\n", "    - provided that the function we supply as an argument is *free from* [side-effects](https://en.wikipedia.org/wiki/Side_effect_%28computer_science%29)\n", "    - (which is why we avoid working with mutable data).\n", "\n", "- We can see this by rewriting `map` as follows:\n" ] }, { "cell_type": "code", "execution_count": 29, "metadata": {}, "outputs": [], "source": [ "def perform_computation(f, result, data, i):\n", "    print (\"Computing the \", i, \"th result...\")\n", "    # This could be scheduled on a different CPU\n", "    result[i] = f(data[i])\n", "\n", "def my_map(f, data):\n", "    result = [None] * len(data)\n", "    for i in range(len(data)):\n", "        perform_computation(f, result, data, i)\n", "    # Wait for other CPUs to finish, and then..\n", "    return result" ] }, { "cell_type": "code", "execution_count": 30, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Computing the 0 th result...\n", "Computing the 1 th result...\n", "Computing the 2 th result...\n", "Computing the 3 th result...\n", "Computing the 4 th result...\n" ] }, { "data": { "text/plain": [ "[1, 4, 9, 16, 25]" ] }, "execution_count": 30, "metadata": {}, "output_type": "execute_result" } ], "source": [ "my_map(lambda x: x * x, [1, 2, 3, 4, 5])" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## A multi-threaded `map` function" ] }, { "cell_type": "code", "execution_count": 31, "metadata": {}, "outputs": [], "source": [ "from threading import Thread\n", "\n", "def schedule_computation_threaded(f, result, data, threads, i): \n",
"    # Each function evaluation is scheduled on a separate thread.\n", "    def my_job(): \n", "        print (\"Processing data:\", data[i], \"... \")\n", "        result[i] = f(data[i])\n", "        print (\"Finished job #\", i) \n", "        print (\"Result was\", result[i]) \n", "    threads[i] = Thread(target=my_job)\n", "    \n", "def my_map_multithreaded(f, data):\n", "    n = len(data)\n", "    result = [None] * n\n", "    threads = [None] * n\n", "    print (\"Scheduling jobs.. \")\n", "    for i in range(n):\n", "        schedule_computation_threaded(f, result, data, threads, i)\n", "    print (\"Starting jobs.. \")\n", "    for i in range(n):\n", "        threads[i].start()\n", "    print (\"Waiting for jobs to finish.. \")\n", "    for i in range(n):\n", "        threads[i].join()\n", "    print (\"All done.\")\n", "    return result" ] }, { "cell_type": "code", "execution_count": 32, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Scheduling jobs.. \n", "Starting jobs.. \n", "Processing data: 1 ... Processing data: 2 ... \n", "Finished job #\n", "Finished job # 1\n", "Result was 4\n", "Processing data: 3 0... \n", "Finished job # 2\n", "Result was Processing data:9 \n", "4 ... \n", "Finished job # 3\n", "Result was 16\n", "Waiting for jobs to finish.. Processing data: \n", "5 ... \n", "Finished job # \n", "Result was4\n", " 1Result was 25\n", "\n", "All done.\n" ] }, { "data": { "text/plain": [ "[1, 4, 9, 16, 25]" ] }, "execution_count": 32, "metadata": {}, "output_type": "execute_result" } ], "source": [ "my_map_multithreaded(lambda x: x*x, [1, 2, 3, 4, 5])" ] }, { "cell_type": "code", "execution_count": 33, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Scheduling jobs.. \n", "Starting jobs.. \n", "Processing data: 1 ... \n", "Processing data: 2 ... \n", "Processing data: 3 ... \n", "Processing data: 4 ... \n", "Processing data:Waiting for jobs to finish.. \n", " 5 ... \n", "Finished job # 3\n", "Result was 16\n", "Finished job # 0\n", "Result was 1\n", "Finished job # 1\n", "Result was 4\n", "Finished job # 4\n", "Result was 25\n", "Finished job #All done.\n", " " ] }, { "data": { "text/plain": [ "[1, 4, 9, 16, 25]" ] }, "execution_count": 33, "metadata": {}, "output_type": "execute_result" }, { "name": "stdout", "output_type": "stream", "text": [ "2\n", "Result was 9\n" ] } ], "source": [ "from numpy.random import uniform\n", "from time import sleep\n", "\n", "def a_function_which_takes_a_long_time(x):\n", "    sleep(uniform(2, 10)) # Simulate some long computation\n", "    return x*x\n", "\n", "my_map_multithreaded(a_function_which_takes_a_long_time, [1, 2, 3, 4, 5])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Map Reduce\n", "\n", "- Map Reduce is a _programming model_ for scalable parallel processing.\n", "- Scalable here means that it can work on big data with very large compute clusters.\n", "- There are many implementations: e.g. Apache Hadoop and Apache Spark.\n", "- We can use Map-Reduce with any programming language:\n", "    - Hadoop is written in Java\n", "    - Spark is written in Scala, but has a Python interface.\n", "- Languages with *functional programming* features, such as Python and Scala, fit very well with the Map Reduce model:\n", "    - However, we don't *have* to use functional programming."
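] }, { "cell_type": "markdown", "metadata": {}, "source": [ "- An aside on the threaded `map` above: in CPython the [global interpreter lock](https://wiki.python.org/moin/GlobalInterpreterLock) prevents threads from executing Python bytecode on several cores at once, so threading helps mainly when jobs spend their time waiting.\n", "\n", "- For genuine single-machine parallelism the standard library offers process-based pools. The cell below is a minimal sketch using `multiprocessing.Pool` (the helper name and the worker count of 4 are our own illustrative choices); it is *not* how Hadoop or Spark distribute work.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from multiprocessing import Pool\n", "\n", "def cpu_bound_square(x):\n", "    # Stand-in for a CPU-bound computation. On platforms that spawn\n", "    # rather than fork worker processes, this must be importable.\n", "    return x * x\n", "\n", "# Four worker processes is an arbitrary choice for illustration.\n", "with Pool(processes=4) as pool:\n", "    print(pool.map(cpu_bound_square, [1, 2, 3, 4, 5]))"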
] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "- A MapReduce implementation will take care of the low-level functionality so that you don't have to worry about:\n", "    - load balancing\n", "    - network I/O\n", "    - network and disk transfer optimisation\n", "    - handling of machine failures\n", "    - serialization of data\n", "    - etc.\n", "- The model is designed to move the processing to where the data resides." ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Typical steps in a Map Reduce Computation\n", "\n", "1. Extract, transform, and load (ETL) a big data set.\n", "2. _Map_ operation: extract something you care about from each row.\n", "3. \"Shuffle and Sort\": group the intermediate results and allocate them to tasks/nodes.\n", "4. _Reduce_ operation: aggregate, summarise, filter or transform.\n", "5. Write the results." ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Callbacks for Map Reduce\n", "\n", "- The data set, and the state of each stage of the computation, is represented as a set of key-value pairs.\n", "\n", "- The programmer provides a map function:\n", "\n", "$\operatorname{map}(k, v) \rightarrow \; \left< k', v' \right>*$ \n", "\n", "- and a reduce function:\n", "\n", "$\operatorname{reduce}(k', \left< k', v' \right>*) \rightarrow \; \left< k', v'' \right>*$\n", "\n", "- The $*$ refers to a *collection* of values.\n", "\n", "- These collections are *not* ordered." ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Word Count Example\n", "\n", "- In this simple example, the input is a set of URLs; each record is a document.\n", "\n", "- Problem: compute how many times each word has occurred across the data set."
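] }, { "cell_type": "markdown", "metadata": {}, "source": [ "- For reference, on a single machine this is a one-liner with `collections.Counter` from the standard library, as the sketch below shows for our toy document.\n", "\n", "- The interest of Map-Reduce is not the counting itself, but decomposing the computation so that the same result can be obtained at scale across a cluster.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from collections import Counter\n", "\n", "# Single-machine baseline for the word-count problem.\n", "Counter(\"to be or not to be\".split())"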
] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Word Count: Map \n", "\n", "\n", "- The input to $\operatorname{map}$ is a mapping:\n", "\n", "    - Key: URL\n", "    - Value: Contents of document" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "$\left< document1, to \; be \; or \; not \; to \; be \right>$ \n", " \n", "\n", "- In this example, our $\operatorname{map}$ function will process a given URL and produce a mapping:" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "- Key: word\n", "- Value: 1\n", "\n", "- So our original data-set will be transformed to:\n", " \n", " $\left< to, 1 \right>$\n", " $\left< be, 1 \right>$\n", " $\left< or, 1 \right>$\n", " $\left< not, 1 \right>$\n", " $\left< to, 1 \right>$\n", " $\left< be, 1 \right>$" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Word Count: Reduce\n", "\n", "\n", "- The reduce operation groups values according to their key, and then performs a reduce on each key.\n", "\n", "- Because the collections are partitioned across different storage units, Map-Reduce will fold the data in such a way that it minimises data-copying across the cluster.\n", "\n", "- Data in different partitions are reduced separately in parallel.\n", "\n", "- The final result is a reduce of the reduced data in each partition.\n", "\n", "- Therefore it is very important that our operator *is both commutative and associative*.\n", "\n", "- In our case the function is the `+` operator:\n", "\n", " $\left< be, 2 \right>$ \n", " $\left< not, 1 \right>$ \n", " $\left< or, 1 \right>$ \n", " $\left< to, 2 \right>$ \n", " " ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Map and Reduce compared with Python\n", "\n", "- Notice that these functions are formulated differently from the standard Python functions of the same name.\n", "\n", "- The `reduce` function works with key-value *pairs*.\n", "\n", "- It would be more apt to call it something like `reduceByKey`.\n" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## MiniMapReduce\n", "\n", "- To illustrate how the Map-Reduce programming model works, we can implement our own Map-Reduce framework in Python.\n", "\n", "- This *illustrates* how a problem can be written in terms of `map` and `reduce` operations.\n", "\n", "- Note that these are illustrative functions; this is *not* how Hadoop or Apache Spark actually implement them."
] }, { "cell_type": "code", "execution_count": 34, "metadata": { "slideshow": { "slide_type": "subslide" } }, "outputs": [], "source": [ "##########################################################\n", "#\n", "# MiniMapReduce\n", "#\n", "# A non-parallel, non-scalable Map-Reduce implementation\n", "##########################################################\n", "\n", "from functools import reduce\n", "\n", "def groupByKey(data):\n", "    result = dict()\n", "    for key, value in data:\n", "        if key in result:\n", "            result[key].append(value)\n", "        else:\n", "            result[key] = [value]\n", "    return result\n", "    \n", "def reduceByKey(f, data):\n", "    key_values = groupByKey(data)\n", "    return list(map(lambda key: \n", "                    (key, reduce(f, key_values[key])), \n", "                    key_values))" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Word-count using MiniMapReduce\n", "\n", "- We materialise the key-value pairs into a list so that they can be traversed more than once; a bare `map` iterator would be exhausted after the first pass.\n" ] }, { "cell_type": "code", "execution_count": 35, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[('to', 1), ('be', 1), ('or', 1), ('not', 1), ('to', 1), ('be', 1)]" ] }, "execution_count": 35, "metadata": {}, "output_type": "execute_result" } ], "source": [ "data = list(map(lambda x: (x, 1), \"to be or not to be\".split()))\n", "data" ] }, { "cell_type": "code", "execution_count": 36, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "{'be': [1, 1], 'not': [1], 'or': [1], 'to': [1, 1]}" ] }, "execution_count": 36, "metadata": {}, "output_type": "execute_result" } ], "source": [ "groupByKey(data)" ] }, { "cell_type": "code", "execution_count": 37, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[('to', 2), ('be', 2), ('or', 1), ('not', 1)]" ] }, "execution_count": 37, "metadata": {}, "output_type": "execute_result" } ], "source": [ "reduceByKey(lambda x, y: x + y, data)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Parallelising MiniMapReduce\n", "\n", "- We can easily turn our Map-Reduce implementation into a parallel, multi-threaded framework\n", "by using the `my_map_multithreaded` function we defined earlier.\n", "\n", "- This will allow us to perform map-reduce computations that exploit parallel processing using *multiple* cores on a *single* computer." ] }, { "cell_type": "code", "execution_count": 38, "metadata": {}, "outputs": [], "source": [ "def reduceByKey_multithreaded(f, data):\n", "    key_values = groupByKey(data)\n", "    return my_map_multithreaded(\n", "        lambda key: (key, reduce(f, key_values[key])), list(key_values.keys()))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "reduceByKey_multithreaded(lambda x, y: x + y, data)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "## Parallelising the reduce step\n", "\n", "- Provided that our operator is both associative and commutative we can\n", "also parallelise the reduce operation.\n", "\n", "- We partition the data into approximately equal subsets.\n", "\n", "- We then reduce each subset independently on a separate core.\n", "\n", "- The results can be combined in a final reduce step."
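] }, { "cell_type": "markdown", "metadata": {}, "source": [ "- Concretely, associativity is what licenses this strategy: folding the concatenation of two partitions gives the same answer as folding each partition separately and then combining the partial results,\n", "\n", "$\operatorname{reduce}(f, A + B) = f(\operatorname{reduce}(f, A), \operatorname{reduce}(f, B))$\n", "\n", "- Commutativity additionally allows the partial results to be combined in whatever order the partitions happen to finish.\n"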
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Partitioning the data" ] }, { "cell_type": "code", "execution_count": 40, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[['a', 'b', 'c'], ['d', 'e', 'f', 'g']]" ] }, "execution_count": 40, "metadata": {}, "output_type": "execute_result" } ], "source": [ "def split_data(data, split_points):\n", "    partitions = []\n", "    n = 0\n", "    for i in split_points:\n", "        partitions.append(data[n:i])\n", "        n = i\n", "    partitions.append(data[n:])\n", "    return partitions\n", "\n", "data = ['a', 'b', 'c', 'd', 'e', 'f', 'g']\n", "partitioned_data = split_data(data, [3])\n", "partitioned_data" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Reducing across partitions in parallel" ] }, { "cell_type": "code", "execution_count": 41, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "'abcdefg'" ] }, "execution_count": 41, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from threading import Thread\n", "from functools import reduce\n", "\n", "def parallel_reduce(f, partitions):\n", "\n", "    n = len(partitions)\n", "    results = [None] * n\n", "    threads = [None] * n\n", "    \n", "    def job(i):\n", "        results[i] = reduce(f, partitions[i])\n", "\n", "    for i in range(n):\n", "        # Pass i via args; a lambda closing over i would be subject\n", "        # to late binding if the loop variable changed before the\n", "        # thread ran.\n", "        threads[i] = Thread(target=job, args=(i,))\n", "        threads[i].start()\n", "    \n", "    for i in range(n):\n", "        threads[i].join()\n", "    \n", "    return reduce(f, results)\n", "\n", "parallel_reduce(lambda x, y: x + y, partitioned_data)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Map-Reduce on a cluster of computers\n", "\n", "- The code we have written so far will *not* allow us to exploit parallelism from multiple computers in a [cluster](https://en.wikipedia.org/wiki/Computer_cluster).\n", "\n", "- Developing such a framework would be a very large software engineering project.\n", "\n", "- There are existing frameworks we can use:\n", "    - [Apache Hadoop](https://hadoop.apache.org/)\n", "    - [Apache Spark](https://spark.apache.org/)\n", " \n", "- In this lecture we will cover Apache Spark." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Apache Spark\n", "\n", "- Apache Spark provides an object-oriented library for processing data on the cluster.\n", "\n", "- It provides objects which represent resilient distributed datasets (RDDs).\n", "\n", "- RDDs behave a bit like Python collections (e.g. lists).\n", "\n", "- However:\n", "    - the underlying data is distributed across the nodes in the cluster, and\n", "    - the collections are *immutable*." ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Apache Spark and Map-Reduce\n", "\n", "- We process the data by using higher-order functions to map RDDs onto *new* RDDs. \n", "\n", "- Each instance of an RDD has at least two *methods* corresponding to the Map-Reduce workflow:\n", "    - `map`\n", "    - `reduceByKey`\n", " \n", "- These methods work in the same way as the corresponding functions we defined earlier to work with the standard Python collections. \n",
"\n", "- There are also additional RDD methods in the Apache Spark API:\n", "    - Apache Spark is a *super-set* of Map-Reduce.\n", " " ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Word-count in Apache Spark\n", "\n" ] }, { "cell_type": "code", "execution_count": 42, "metadata": { "slideshow": { "slide_type": "-" } }, "outputs": [ { "data": { "text/plain": [ "['to', 'be', 'or', 'not', 'to', 'be']" ] }, "execution_count": 42, "metadata": {}, "output_type": "execute_result" } ], "source": [ "words = \"to be or not to be\".split()\n", "words" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "### The `SparkContext` class\n", "\n", "- When working with Apache Spark we invoke methods on an object which is an instance of the `pyspark.context.SparkContext` class.\n", "\n", "- Typically, an instance of this object will be created automatically for you and assigned to the variable `sc`.\n", "\n", "- The `parallelize` method in `SparkContext` can be used to turn any ordinary Python collection into an RDD;\n", "    - normally we would create an RDD from a large file or an HBase table. " ] }, { "cell_type": "code", "execution_count": 43, "metadata": {}, "outputs": [], "source": [ "# Don't execute this on Databricks.\n", "# To be used if executing via docker:\n", "#import pyspark\n", "#sc = pyspark.SparkContext('local[*]')" ] }, { "cell_type": "code", "execution_count": 44, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:480" ] }, "execution_count": 44, "metadata": {}, "output_type": "execute_result" } ], "source": [ "words_rdd = sc.parallelize(words)\n", "words_rdd" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "### Mapping an RDD\n", "\n", "- Now when we invoke the `map` or `reduceByKey` methods on `words_rdd` we can set up a parallel processing computation across the cluster."
] }, { "cell_type": "code", "execution_count": 45, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "PythonRDD[1] at RDD at PythonRDD.scala:48" ] }, "execution_count": 45, "metadata": {}, "output_type": "execute_result" } ], "source": [ "word_tuples_rdd = words_rdd.map(lambda x: (x, 1))\n", "word_tuples_rdd" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "- Notice that we do not have a result yet.\n", "\n", "- The computation is not performed until we request the final result to be *collected*.\n", "\n", "- We do this by invoking the `collect()` method:" ] }, { "cell_type": "code", "execution_count": 46, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[('to', 1), ('be', 1), ('or', 1), ('not', 1), ('to', 1), ('be', 1)]" ] }, "execution_count": 46, "metadata": {}, "output_type": "execute_result" } ], "source": [ "word_tuples_rdd.collect()" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "### Reducing an RDD\n", "\n", "- However, we require additional processing:" ] }, { "cell_type": "code", "execution_count": 47, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "PythonRDD[6] at RDD at PythonRDD.scala:48" ] }, "execution_count": 47, "metadata": {}, "output_type": "execute_result" } ], "source": [ "word_counts_rdd = word_tuples_rdd.reduceByKey(lambda x, y: x + y)\n", "word_counts_rdd" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "- Now we request the final result:" ] }, { "cell_type": "code", "execution_count": 48, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[('to', 2), ('be', 2), ('or', 1), ('not', 1)]" ] }, "execution_count": 48, "metadata": {}, "output_type": "execute_result" } ], "source": [ "word_counts = word_counts_rdd.collect()\n", "word_counts" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "### Lazy evaluation \n", "\n", "- It is only when we invoke `collect()` that the processing is performed on the cluster.\n", "\n", "- Invoking `collect()` will cause both the `map` and `reduceByKey` operations to be performed.\n", "\n", "- If the resulting collection is very large then this can be an expensive operation.\n" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "### The head of an RDD\n", "\n", "- The `take` method is similar to `collect`, but only returns the first $n$ elements.\n", "\n", "- This can be very useful for testing.\n" ] }, { "cell_type": "code", "execution_count": 49, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[('to', 2), ('be', 2)]" ] }, "execution_count": 49, "metadata": {}, "output_type": "execute_result" } ], "source": [ "word_counts_rdd.take(2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### The complete word-count example" ] }, { "cell_type": "code", "execution_count": 50, "metadata": { "slideshow": { "slide_type": "subslide" } }, "outputs": [ { "data": { "text/plain": [ "[('to', 2), ('be', 2), ('or', 1), ('not', 1)]" ] }, "execution_count": 50, "metadata": {}, "output_type": "execute_result" } ], "source": [ "text = \"to be or not to be\".split()\n", "rdd = sc.parallelize(text)\n", "counts = rdd.map(lambda word: (word, 1)) \\\n", " .reduceByKey(lambda x, y: x + y)\n", "counts.collect()" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Additional RDD transformations\n", "\n", "- Apache Spark offers many more methods for operating on collections of 
tuples over and above the standard Map-Reduce framework:\n", "\n", " - Sorting: `sortByKey`, `sortBy`, `takeOrdered`\n", " - Mapping: `flatMap`\n", " - Filtering: `filter`\n", " - Counting: `count`\n", " - Set-theoretic: `intersection`, `union`\n", " - Many others: [see the Transformations section of the programming guide](https://spark.apache.org/docs/latest/programming-guide.html#transformations)\n", " " ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Calculating $\pi$ using Spark\n", "\n", "- We can estimate an approximate value for $\pi$ using the following Monte-Carlo method:\n", "\n", "\n", "1. Inscribe a circle in a square\n", "2. Randomly generate points in the square\n", "3. Determine the number of points in the square that are also in the circle\n", "4. Let $r$ be the number of points in the circle divided by the number of points in the square, then $\pi \approx 4 r$.\n", " \n", "- Note that the more points we generate, the better the approximation.\n", "\n", "See [this tutorial](https://computing.llnl.gov/tutorials/parallel_comp/#ExamplesPI)." ] }, { "cell_type": "code", "execution_count": 59, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Pi is approximately 3.168000\n" ] } ], "source": [ "import numpy as np\n", "\n", "def sample(p):\n", "    # Draw a random point in the unit square; return 1 if it falls\n", "    # inside the unit circle, and 0 otherwise.\n", "    x, y = np.random.random(), np.random.random()\n", "    return 1 if x*x + y*y < 1 else 0\n", "\n", "NUM_SAMPLES = 1000\n", "\n", "count = sc.parallelize(range(0, NUM_SAMPLES)).map(sample) \\\n", "           .reduce(lambda a, b: a + b)\n", "r = float(count) / float(NUM_SAMPLES)\n", "print (\"Pi is approximately %f\" % (4.0 * r))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "celltoolbar": "Slideshow", "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.3" } }, "nbformat": 4, "nbformat_minor": 1 }