{ "metadata": { "name": "", "signature": "sha256:505c148e82b4ecad4c08737e5a9969463e8753f8f0bc83aa1139a987b034d932" }, "nbformat": 3, "nbformat_minor": 0, "worksheets": [ { "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "- [spark 1.1.1 quick start doc](http://spark.apache.org/docs/latest/quick-start.html)\n", "- [spark 1.1.1 programming guide](http://spark.apache.org/docs/latest/programming-guide.html#initializing-spark)" ] }, { "cell_type": "code", "collapsed": false, "input": [ "import startspark \n", "import pyspark" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 1 }, { "cell_type": "code", "collapsed": false, "input": [ "sc = startspark.create_spark_instance() # run once" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 2 }, { "cell_type": "markdown", "metadata": {}, "source": [ "The key abstraction of Spark is the ***Resilient Distributed Dataset (RDD)***, which represents a distributed collection of items. \n", "- an RDD can be created from input formats such as local files or HDFS files\n", "- an RDD can be `acted` on to calculate values\n", "- an RDD can be `transformed` into new RDDs" ] }, { "cell_type": "code", "collapsed": false, "input": [ "textfile = sc.textFile(\"startspark.py\") ## construction\n", "print textfile.count() ## action - number of items (lines)\n", "print textfile.first() ## action - first item" ], "language": "python", "metadata": {}, "outputs": [ { "output_type": "stream", "stream": "stdout", "text": [ "22\n", "import os, sys" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "\n" ] } ], "prompt_number": 3 }, { "cell_type": "code", "collapsed": false, "input": [ "## filter transforms the RDD into a new RDD\n", "lineswithspark = textfile.filter(lambda line: \"spark\" in line)\n", "print lineswithspark.first()\n", "print lineswithspark.count()" ], "language": "python", "metadata": {}, "outputs": [ { "output_type": "stream", "stream": "stdout", "text": [ "SPARK_HOME = path.abspath(\"/home/dola/opt/spark-1.1.1/\")\n", "9" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "\n" ] } ], "prompt_number": 4 }, { "cell_type": "code", "collapsed": false, "input": [ "## combining actions and transformations can do a lot of things\n", "## e.g., find the line with the most words in a file - use reduce to find the max\n", "## map is a transformation, reduce is an action\n", "textfile.map(lambda line: len(line.split())).reduce(lambda a, b: a if a>=b else b)" ], "language": "python", "metadata": {}, "outputs": [ { "metadata": {}, "output_type": "pyout", "prompt_number": 5, "text": [ "7" ] } ], "prompt_number": 5 }, { "cell_type": "code", "collapsed": false, "input": [ "## word count - \"hello world\" map reduce example\n", "## unlike reduce, reduceByKey generates another RDD\n", "word_counts = sc.textFile(\"README.md\").flatMap(lambda line: line.split()) \\\n", " .map(lambda word: (word, 1)) \\\n", " .reduceByKey(lambda a, b: a+b)\n", " \n", "print word_counts.collect()" ], "language": "python", "metadata": {}, "outputs": [ { "output_type": "stream", "stream": "stdout", "text": [ "[(u'and', 2), (u'useful', 1), (u'is', 1), (u'am', 2), (u'not', 1), (u'But', 1), (u'learning', 2), (u'go', 1), (u'plan', 1), (u'spark', 1), (u'are', 1), (u'for', 1), (u'how', 1), (u'with', 1), (u'least', 1), (u'machine', 1), (u'to', 2), (u'collections', 1), (u'tutorials', 1), (u'tasks.', 1), (u'include', 1), (u'(impyla).', 1), (u'sure', 1), (u'that', 1), (u'I', 3), (u'some', 2), (u'here', 1), (u'framework', 1), (u'preparing', 1), 
(u'across.', 1), (u'The', 1), (u'Impala', 1), (u'a', 1), (u'articles', 1), (u'about', 1), (u'this', 1), (u'of', 1), (u'yet.', 1), (u'at', 1), (u'came', 1), (u'Blaze', 1)]\n" ] } ], "prompt_number": 6 }, { "cell_type": "markdown", "metadata": {}, "source": [ "Cluster-wide in-memory caching is one of the biggest selling points of Spark. It is especially useful for hot data that will be accessed repeatedly in an iterative algorithm." ] }, { "cell_type": "code", "collapsed": false, "input": [ "## it is easy to cache an RDD\n", "textfile.cache() ## side effect instead of returning a new RDD\n", "textfile.count()" ], "language": "python", "metadata": {}, "outputs": [ { "metadata": {}, "output_type": "pyout", "prompt_number": 7, "text": [ "22" ] } ], "prompt_number": 7 }, { "cell_type": "markdown", "metadata": {}, "source": [ "When working with Spark, we can pass Python functions to Spark, which are automatically serialized along with any variables that they reference. For applications that use custom classes or third-party libraries, we can also add code dependencies to `spark-submit` through its `--py-files` argument by packaging them into a .zip file (see `spark-submit --help` for details). For example,\n", "\n", "```\n", "\"\"\"SimpleApp.py\"\"\"\n", "from pyspark import SparkContext\n", "\n", "logFile = \"YOUR_SPARK_HOME/README.md\" # Should be some file on your system\n", "sc = SparkContext(\"local\", \"Simple App\")\n", "logData = sc.textFile(logFile).cache()\n", "\n", "numAs = logData.filter(lambda s: 'a' in s).count()\n", "numBs = logData.filter(lambda s: 'b' in s).count()\n", "\n", "print \"Lines with a: %i, lines with b: %i\" % (numAs, numBs)\n", "```\n", "Submit the standalone Python application to Spark:\n", "\n", "```\n", "# Use spark-submit to run your application\n", "$ YOUR_SPARK_HOME/bin/spark-submit \\\n", " --master local[4] \\\n", " SimpleApp.py\n", "...\n", "Lines with a: 46, Lines with b: 23\n", "```\n", "\n", "**As in the above example, in practice, when running on a cluster, you will not want to hardcode `master` in the program, but rather launch the application with `spark-submit` and receive it there. However, for local testing and unit tests, you can pass \u201clocal\u201d to run Spark in-process.**" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Spark Programming Guide\n", "- Main Data Objects:\n", " - RDD: can be intuitively imagined as a collection of things, distributed across the cluster's file systems (e.g., via HDFS). An RDD can be persisted in (distributed) memory\n", " - Variables (different from RDDs) by default have their own local copies in each task. However, they can be shared as \"broadcast variables\" (caching a value in memory on all nodes) or \"accumulators\" (variables that can only be added to from different tasks, such as counters). \n", "- There are two ways of creating an RDD:\n", " - *parallelizing* an existing collection in the driver program (for interactive exploration) - the elements of the collection are copied to form a distributed dataset that can be operated on in parallel. \n", " - referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat (including the local file system; in this case, however, the file must also be accessible at the same path on worker nodes. 
Either copy the file to all workers or use a network-mounted shared file system).\n", "- Passing functions to Spark - three main ways:\n", " - lambda expressions for short code\n", " - local defs inside the function calling Spark for longer code - similar to a local variable\n", " - top-level functions (if it is a method defined on a class, the whole object might need to be passed to Spark)\n", "- Key-value pair RDDs: While most Spark operations work on RDDs containing any type of objects, a few special operations are only available on RDDs of key-value pairs. The most common ones are distributed \"shuffle\" operations, such as grouping or aggregating the elements by a key, e.g., reduceByKey.\n", "- All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program.\n", "- Persistence of RDD: By default, each transformed RDD may be recomputed each time you run an action on it. However, you may also persist an RDD in memory using the persist (or cache) method, in which case Spark will keep the elements around on the cluster for much faster access the next time you query it. There is also support for persisting RDDs on disk, or replicated across multiple nodes.\n", "- One of the most important capabilities in Spark is persisting (or caching) a dataset in memory across operations. When you persist an RDD, each node stores any partitions of it that it computes in memory and reuses them in other actions on that dataset (or datasets derived from it). This allows future actions to be much faster (often by more than 10x). Caching is a key tool for iterative algorithms and fast interactive use. Spark automatically monitors cache usage on each node and drops out old data partitions in a least-recently-used (LRU) fashion. If you would like to manually remove an RDD instead of waiting for it to fall out of the cache, use the RDD.unpersist() method. A short sketch of the laziness and caching points follows this list."
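, "\n", "\n", "A minimal sketch of the laziness and caching points above, reusing the `sc` and `startspark.py` already seen in this notebook (the variable names are only illustrative):\n", "\n", "```\n", "rdd = sc.textFile(\"startspark.py\")                 # nothing is read yet: transformations are lazy\n", "words = rdd.flatMap(lambda line: line.split())     # still lazy, only the lineage is recorded\n", "words.cache()                                      # mark it for in-memory persistence\n", "print words.count()                                # the first action triggers the computation\n", "print words.count()                                # now served from the cached partitions\n", "words.unpersist()                                  # drop it instead of waiting for LRU eviction\n", "```"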
] }, { "cell_type": "code", "collapsed": false, "input": [ "## parallelize an object in the driver program to form an RDD\n", "import numpy as np\n", "data = np.arange(10) ## not just a Python list!\n", "para_data = sc.parallelize(data)\n", "para_data.count()\n", "!rm -fR data/temp-data/\n", "para_data.saveAsPickleFile(\"data/temp-data\")\n", "print sc.pickleFile(\"data/temp-data/\").collect()" ], "language": "python", "metadata": {}, "outputs": [ { "output_type": "stream", "stream": "stdout", "text": [ "[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]\n" ] } ], "prompt_number": 8 }, { "cell_type": "code", "collapsed": false, "input": [ "## several ways of loading files\n", "!cat data/a.txt \n", "!cat data/b.txt\n", "\n", "allInOne = sc.textFile(\"data/*.txt\")\n", "print allInOne.count()\n", "\n", "nameToFiles = sc.wholeTextFiles(\"data/*.txt\")\n", "print nameToFiles.count()\n", "print nameToFiles.collect()" ], "language": "python", "metadata": {}, "outputs": [ { "output_type": "stream", "stream": "stdout", "text": [ "hello\r\n", "this is a\r\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "hello\r\n", "this is b\r\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "4\n", "2" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "\n", "[(u'/home/dola/workspace/dola/tutorials/learn-spark/data/a.txt', u'hello\\nthis is a\\n'), (u'/home/dola/workspace/dola/tutorials/learn-spark/data/b.txt', u'hello\\nthis is b\\n')]\n" ] } ], "prompt_number": 9 }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Shared Variables**\n", "\n", "Normally, when a function passed to a Spark operation (such as map or reduce) is executed on a remote cluster node, it works on separate copies of all the variables used in the function. These variables are copied to each machine, and no updates to the variables on the remote machine are propagated back to the driver program. Supporting general, read-write shared variables across tasks would be inefficient. However, Spark does provide two limited types of shared variables for two common usage patterns: broadcast variables and accumulators.\n", "\n", "*Broadcast variables* allow the programmer to keep a **read-only** variable cached on each machine rather than shipping a copy of it with tasks. Broadcast variables are created from a variable `v` by calling `SparkContext.broadcast(v)`. The broadcast variable is a wrapper around `v`, and its value can be accessed by calling the `value` method. 
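\n", "\n", "A minimal sketch against the live `sc` (the accumulator lines are an extra illustration of the second shared-variable type, not part of the quoted guide text):\n", "\n", "```\n", "broadcastVar = sc.broadcast([1, 2, 3])        # read-only value cached on every node\n", "print broadcastVar.value                      # [1, 2, 3]\n", "\n", "accum = sc.accumulator(0)                     # tasks can only add to it\n", "sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x))\n", "print accum.value                             # 10, readable only in the driver\n", "```\n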
" ] }, { "cell_type": "code", "collapsed": false, "input": [], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 9 }, { "cell_type": "code", "collapsed": false, "input": [], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 9 }, { "cell_type": "code", "collapsed": false, "input": [], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 9 }, { "cell_type": "markdown", "metadata": {}, "source": [ "## an example of how to paralize tasks in Spark\n", "It is based on the discussion on [stackoverflow](http://stackoverflow.com/questions/26828987/why-is-this-simple-spark-program-not-utlizing-multiple-cores)" ] }, { "cell_type": "code", "collapsed": false, "input": [ "import random\n", "import time\n", "\n", "N = 12500000\n", "\n", "def sample(p):\n", " x, y = random.random(), random.random()\n", " return 1 if x*x + y*y < 1 else 0" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 10 }, { "cell_type": "code", "collapsed": false, "input": [ "## The following code will probably run on a single core, because spark implicitly partition \n", "## it to one partition\n", "\n", "tic = time.time()\n", "count = sc.parallelize(xrange(0, N)).map(sample).reduce(lambda a, b: a + b)\n", "print \"Pi is roughly %f\" % (4.0 * count / N)\n", "print 'time: %g s' % (time.time() - tic)" ], "language": "python", "metadata": {}, "outputs": [ { "output_type": "stream", "stream": "stdout", "text": [ "Pi is roughly 3.141940\n", "time: 9.78296 s\n" ] } ], "prompt_number": 11 }, { "cell_type": "code", "collapsed": false, "input": [ "## It can be made better by explicitly partitioning the data into multi parts.\n", "## But it will still have to generate the huge range in the driver thread - with single core\n", "ncores = 16\n", "tic = time.time()\n", "count = sc.parallelize(xrange(0, N), ncores).map(sample).reduce(lambda a, b: a + b)\n", "print \"Pi is roughly %f\" % (4.0 * count / N)\n", "print 'time: %g s' % (time.time() - tic)" ], "language": "python", "metadata": {}, "outputs": [ { "output_type": "stream", "stream": "stdout", "text": [ "Pi is roughly 3.141770\n", "time: 9.82372 s\n" ] } ], "prompt_number": 12 }, { "cell_type": "code", "collapsed": false, "input": [ "## so a better solution is to partition the data separately\n", "N = 12500000\n", "part = 16\n", "tic = time.time()\n", "count = ( sc.parallelize([None] * part, part)\n", " .flatMap(lambda blah: [sample(p) for p in xrange( N/part)])\n", " .reduce(lambda a, b: a + b)\n", " )\n", "print \"Pi is roughly %f\" % (4.0 * count / N)\n", "print 'time: %g s' % (time.time() - tic)" ], "language": "python", "metadata": {}, "outputs": [ { "output_type": "stream", "stream": "stdout", "text": [ "Pi is roughly 3.141796\n", "time: 6.60225 s\n" ] } ], "prompt_number": 13 }, { "cell_type": "code", "collapsed": false, "input": [], "language": "python", "metadata": {}, "outputs": [] } ], "metadata": {} } ] }