"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "subslide"
}
},
"source": [
"Adopted from work by Steve Phelps:\n",
"https://github.com/phelps-sg/python-bigdata \n",
"This work is licensed under the Creative Commons Attribution 4.0 International license agreement.\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Reference\n",
"- [Spark Documentation](http://spark.apache.org/docs/latest/)\n",
"- [Spark Programming Guide](http://spark.apache.org/docs/latest/programming-guide.html)\n",
"- [DataBricks Login](https://community.cloud.databricks.com)\n",
"- [Pyspark](https://github.com/jupyter/docker-stacks)\n",
"- [Conda](\n",
"\n",
"```conda install -c anaconda-cluster/label/dev spark\n",
" conda install -c conda-forge pyspark\n",
"```"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "slide"
}
},
"source": [
"\n",
"### Overview\n",
"- History\n",
"- Data Structures\n",
"- Using Apache Spark with Python\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "slide"
}
},
"source": [
"## History\n",
"\n",
"- Apache Spark was first released in 2014. \n",
"\n",
"- It was originally developed by [Matei Zaharia](http://people.csail.mit.edu/matei) as a class project, and later a PhD dissertation, at University of California, Berkeley.\n",
"\n",
"- In contrast to Hadoop, Apache Spark:\n",
"\n",
" - is easy to install and configure.\n",
" - provides a much more natural *iterative* workflow \n"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "slide"
}
},
"source": [
"## Resilient Distributed Datasets (RDD)\n",
"\n",
"- The fundamental abstraction of Apache Spark is a read-only, parallel, distributed, fault-tolerent collection called a resilient distributed datasets (RDD).\n",
"\n",
"- When working with Apache Spark we iteratively apply functions to every elelement of these collections in parallel to produce *new* RDDs.\n",
"\n",
"- For the most part, you can think/use RDDs like distributed dataframes. \n"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "subslide"
}
},
"source": [
"## Resilient Distributed Datasets (RDD)\n",
"\n",
"- Properties resilient distributed datasets (RDDs):\n",
" - The data is distributed across nodes in a cluster of computers.\n",
" - No data is lost if a single node fails.\n",
" - Data is typically stored in HBase tables, or HDFS files.\n",
" - The `map` and `reduce` functions can work in *parallel* across\n",
" different keys, or different elements of the collection.\n",
"\n",
"- The underlying framework (e.g. Hadoop or Apache Spark) allocates data and processing to different nodes, without any intervention from the programmer."
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "slide"
}
},
"source": [
"## Word Count Example\n",
"\n",
"- In this simple example, the input is a set of URLs, each record is a document.
\n",
"\n",
"- **Problem: Compute how many times each word has occurred across data set.**"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "slide"
}
},
"source": [
"## Word Count: Map \n",
"\n",
"\n",
"The input to $\\operatorname{map}$ is a mapping:\n",
"- Key: URL\n",
"- Value: Contents of document \n",
"$\\left< document1, to \\; be \\; or \\; not \\; to \\; be \\right>$ \n",
" \n",
"\n",
"- In this example, our $\\operatorname{map}$ function will process a given URL, and produces a mapping:\n",
"- So our original data-set will be transformed to:\n",
" \n",
" $\\left< to, 1 \\right>$\n",
" $\\left< be, 1 \\right>$\n",
" $\\left< or, 1 \\right>$\n",
" $\\left< not, 1 \\right>$\n",
" $\\left< to, 1 \\right>$\n",
" $\\left< be, 1 \\right>$"
]
},
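{
"cell_type": "markdown",
"metadata": {},
"source": [
"Before looking at the Spark version, here is a minimal pure-Python sketch of this map step. The helper name `map_words` is just an illustrative choice, not part of any library API.\n",
"\n",
"```python\n",
"# Hypothetical sketch of the map step in plain Python (not Spark):\n",
"# each (url, document) record is expanded into one (word, 1) pair per word.\n",
"def map_words(url, document):\n",
"    return [(word, 1) for word in document.split()]\n",
"\n",
"pairs = map_words('document1', 'to be or not to be')\n",
"print(pairs)  # [('to', 1), ('be', 1), ('or', 1), ('not', 1), ('to', 1), ('be', 1)]\n",
"```"
]
},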
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "slide"
}
},
"source": [
"## Word Count: Reduce\n",
"\n",
"\n",
"- The reduce operation groups values according to their key, and then performs areduce on each key.\n",
"\n",
"- The collections are partitioned across different storage units, therefore.\n",
"\n",
"- Map-Reduce will fold the data in such a way that it minimises data-copying across the cluster.\n",
"\n",
"- Data in different partitions are reduced separately in parallel.\n",
"\n",
"- The final result is a reduce of the reduced data in each partition.\n",
"\n",
"- Therefore it is very important that our operator *is both commutative and associative*.\n",
"\n",
"- In our case the function is the `+` operator\n",
"\n",
" $\\left< be, 2 \\right>$ \n",
" $\\left< not, 1 \\right>$ \n",
" $\\left< or, 1 \\right>$ \n",
" $\\left< to, 2 \\right>$ \n",
" "
]
},
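{
"cell_type": "markdown",
"metadata": {},
"source": [
"A minimal pure-Python sketch of this group-and-reduce step, assuming the reducer is commutative and associative. The helper name `reduce_by_key` is illustrative only, not a library function.\n",
"\n",
"```python\n",
"# Hypothetical sketch of reduce-by-key in plain Python (not Spark).\n",
"from collections import defaultdict\n",
"from functools import reduce\n",
"\n",
"def reduce_by_key(reducer, pairs):\n",
"    # Group the values by key, then fold each group with the reducer.\n",
"    grouped = defaultdict(list)\n",
"    for key, value in pairs:\n",
"        grouped[key].append(value)\n",
"    return [(key, reduce(reducer, values)) for key, values in grouped.items()]\n",
"\n",
"pairs = [('to', 1), ('be', 1), ('or', 1), ('not', 1), ('to', 1), ('be', 1)]\n",
"print(reduce_by_key(lambda x, y: x + y, pairs))\n",
"# [('to', 2), ('be', 2), ('or', 1), ('not', 1)]\n",
"```"
]
},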
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "slide"
}
},
"source": [
"## Map-Reduce on a Cluster of Computers\n",
"\n",
"- The code we have written so far will *not* allow us to exploit parallelism from multiple computers in a [cluster](https://en.wikipedia.org/wiki/Computer_cluster).\n",
"\n",
"- Developing such a framework would be a very large software engineering project.\n",
"\n",
"- There are existing frameworks we can use:\n",
" - [Apache Hadoop](https://hadoop.apache.org/)\n",
" - [Apache Spark](https://spark.apache.org/)\n",
" \n",
"- This notebook covers Apache Spark."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Apache Spark\n",
"\n",
"- Apache Spark provides an object-oriented library for processing data on the cluster.\n",
"\n",
"- It provides objects which represent resilient distributed datasets (RDDs).\n",
"\n",
"- RDDs behave a bit like Python collections (e.g. lists).\n",
"\n",
"- However:\n",
" - the underlying data is distributed across the nodes in the cluster, and\n",
" - the collections are *immutable*."
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "slide"
}
},
"source": [
"## Apache Spark and Map-Reduce\n",
"\n",
"- We process the data by using higher-order functions to map RDDs onto *new* RDDs. \n",
"\n",
"- Each instance of an RDD has at least two *methods* corresponding to the Map-Reduce workflow:\n",
" - `map`\n",
" - `reduceByKey`\n",
" \n",
"- These methods work in the same way as the corresponding functions we defined earlier to work with the standard Python collections. \n",
"\n",
"- There are also additional RDD methods in the Apache Spark API including ones for SQL.\n",
" "
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "slide"
}
},
"source": [
"## Word-count in Apache Spark\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"slideshow": {
"slide_type": "-"
}
},
"outputs": [
{
"data": {
"text/plain": [
"['to', 'be', 'or', 'not', 'to', 'be']"
]
},
"execution_count": 1,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"words = \"to be or not to be\".split()\n",
"words"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "subslide"
}
},
"source": [
"### The `SparkContext` class\n",
"\n",
"- When working with Apache Spark we invoke methods on an object which is an instance of the `pyspark.context.SparkContext` context.\n",
"\n",
"- Typically, (such as when running on DataBricks) an instance of this object will be created automatically for you and assigned to the variable `sc`.\n",
"\n",
"- The `parallelize` method in `SparkContext` can be used to turn any ordinary Python collection into an RDD; "
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"#Don't Execute this on Databricks\n",
"#To be used if executing via docker\n",
"import pyspark\n",
"sc = pyspark.SparkContext('local[*]')"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:480"
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"words_rdd = sc.parallelize(words)\n",
"words_rdd"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "subslide"
}
},
"source": [
"### Mapping an RDD\n",
"\n",
"- Now when we invoke the `map` or `reduceByKey` methods on `my_rdd` we can set up a parallel processing computation across the cluster."
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"PythonRDD[1] at RDD at PythonRDD.scala:48"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"word_tuples_rdd = words_rdd.map(lambda x: (x, 1))\n",
"word_tuples_rdd"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Collecting the RDD\n",
"- Notice that we do not have a result yet.\n",
"\n",
"- The computation is not performed until we request the final result to be *collected*.\n",
"\n",
"- We do this by invoking the `collect()` method.\n",
"\n",
"- Be careful with the `collect` method, as all data you are collecting must fit in memory. \n",
"\n",
"- The `take` method is similar to `collect`, but only returns the first $n$ elements.\n",
" "
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[('to', 1), ('be', 1), ('or', 1), ('not', 1), ('to', 1), ('be', 1)]"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"word_tuples_rdd.collect()"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[('to', 1), ('be', 1), ('or', 1), ('not', 1)]"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"word_tuples_rdd.take(4)"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "subslide"
}
},
"source": [
"### Reducing an RDD\n",
"\n",
"- However, we require additional processing to reduce the data using the word key. "
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"PythonRDD[8] at RDD at PythonRDD.scala:48"
]
},
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"word_counts_rdd = word_tuples_rdd.reduceByKey(lambda x, y: x + y)\n",
"word_counts_rdd"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"- Now we request the final result:"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[('to', 2), ('be', 2), ('or', 1), ('not', 1)]"
]
},
"execution_count": 8,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"word_counts = word_counts_rdd.collect()\n",
"word_counts"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "subslide"
}
},
"source": [
"### Lazy evaluation \n",
"\n",
"- It is only when we invoke `collect()` that the processing is performed on the cluster.\n",
"\n",
"- Invoking `collect()` will cause both the `map` and `reduceByKey` operations to be performed.\n",
"\n",
"- If the resulting collection is very large then this can be an expensive operation.\n"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[('to', 2), ('be', 2)]"
]
},
"execution_count": 9,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"word_counts_rdd.take(2)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Connecting MapReduce in Single Command\n",
"- Can string together `map` and `reduce` commands.\n",
"- Not executed until it is collected."
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {
"slideshow": {
"slide_type": "subslide"
}
},
"outputs": [
{
"data": {
"text/plain": [
"[('to', 2), ('be', 2), ('or', 1), ('not', 1)]"
]
},
"execution_count": 10,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"text = \"to be or not to be\".split()\n",
"rdd = sc.parallelize(text)\n",
"counts = rdd.map(lambda word: (word, 1)).reduceByKey(lambda x, y: x + y)\n",
"counts.collect()"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "slide"
}
},
"source": [
"## Additional RDD transformations\n",
"\n",
"- Apache Spark offers many more methods for operating on collections of tuples over and above the standard Map-Reduce framework:\n",
"\n",
" - Sorting: `sortByKey`, `sortBy`, `takeOrdered`\n",
" - Mapping: `flatMap`\n",
" - Filtering: `filter`\n",
" - Counting: `count`\n",
" - Set-theoretic: `intersection`, `union`\n",
" - Many others: [see the Transformations section of the programming guide](https://spark.apache.org/docs/latest/programming-guide.html#transformations)\n",
" "
]
},
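{
"cell_type": "markdown",
"metadata": {},
"source": [
"A short sketch of a few of these transformations. The two input lines are illustrative data chosen for this example; the method calls (`flatMap`, `filter`, `count`, `takeOrdered`) are standard RDD operations.\n",
"\n",
"```python\n",
"# Two lines of illustrative text, parallelized into an RDD.\n",
"lines_rdd = sc.parallelize(['to be or not to be', 'that is the question'])\n",
"\n",
"# flatMap: each input line can produce many output words.\n",
"words = lines_rdd.flatMap(lambda line: line.split())\n",
"\n",
"# filter: keep only words longer than two characters.\n",
"long_words = words.filter(lambda w: len(w) > 2)\n",
"\n",
"# count: an action that returns the number of elements.\n",
"print(long_words.count())\n",
"\n",
"# takeOrdered: the three most frequent words, ordered by descending count.\n",
"counts = words.map(lambda w: (w, 1)).reduceByKey(lambda x, y: x + y)\n",
"print(counts.takeOrdered(3, key=lambda kv: -kv[1]))\n",
"```"
]
},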
{
"cell_type": "markdown",
"metadata": {
"collapsed": true,
"slideshow": {
"slide_type": "slide"
}
},
"source": [
"## Creating an RDD from a text file\n",
"\n",
"- In the previous example, we created an RDD from a Python collection.\n",
"\n",
"- This is *not* typically how we would work with big data.\n",
"\n",
"- More commonly we would create an RDD corresponding to data in an\n",
"HBase table, or an HDFS file.\n",
"\n",
"- The following example creates an RDD from a text file on the native filesystem (ext4);\n",
" - With bigger data, you would use an HDFS file, but the principle is the same.\n",
"\n",
"- Each element of the RDD corresponds to a single *line* of text."
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [],
"source": [
"genome = sc.textFile('../input/iris.csv')"
]
},
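{
"cell_type": "markdown",
"metadata": {},
"source": [
"A quick way to inspect such an RDD is to count its lines and look at the first few. This is a sketch assuming the `iris.csv` file above is available at that path; the exact values returned depend on the file contents.\n",
"\n",
"```python\n",
"# Number of lines in the file.\n",
"print(iris_rdd.count())\n",
"\n",
"# The first three lines, returned as ordinary Python strings.\n",
"print(iris_rdd.take(3))\n",
"\n",
"# Split each CSV line into a list of fields.\n",
"fields_rdd = iris_rdd.map(lambda line: line.split(','))\n",
"print(fields_rdd.take(3))\n",
"```"
]
},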
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "slide"
}
},
"source": [
"## Calculating $\\pi$ using Spark\n",
"\n",
"- We can estimate an approximate value for $\\pi$ using the following Monte-Carlo method:\n",
"\n",
"\n",
"1. Inscribe a circle in a square\n",
"2. Randomly generate points in the square\n",
"3. Determine the number of points in the square that are also in the circle\n",
"4. Let $r$ be the number of points in the circle divided by the number of points in the square, then $\\pi \\approx 4 r$.\n",
" \n",
"- Note that the more points generated, the better the approximation\n",
"\n",
"See [this tutorial](https://computing.llnl.gov/tutorials/parallel_comp/#ExamplesPI)."
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"0.784568"
]
},
"execution_count": 12,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import numpy as np\n",
"\n",
"def sample(p):\n",
" #here x,y are the x,y coordinate\n",
" x, y = np.random.random(), np.random.random()\n",
" #Because the circle is of \n",
" return 1 if x*x + y*y < 1 else 0\n",
"\n",
"NUM_SAMPLES = 1000000\n",
"\n",
"count = sc.parallelize(range(0, NUM_SAMPLES)).map(sample) \\\n",
" .reduce(lambda a, b: a + b)\n",
"#Area = 4*PI*r\n",
"r = float(count) / float(NUM_SAMPLES)\n",
"r\n",
"#print (\"Pi is approximately %f\" % (4.0 * r))"
]
}
],
"metadata": {
"anaconda-cloud": {},
"celltoolbar": "Slideshow",
"kernelspec": {
"display_name": "Python [conda root]",
"language": "python",
"name": "conda-root-py"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.2"
}
},
"nbformat": 4,
"nbformat_minor": 1
}