{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "#![Spark Logo](http://spark-mooc.github.io/web-assets/images/ta_Spark-logo-small.png) + ![Python Logo](http://spark-mooc.github.io/web-assets/images/python-logo-master-v3-TM-flattened_small.png)\n", "# **Spark Tutorial: Learning Apache Spark**\n", "#### This tutorial will teach you how to use [Apache Spark](http://spark.apache.org/), a framework for large-scale data processing, within a notebook. Many traditional frameworks were designed to be run on a single computer. However, many datasets today are too large to be stored on a single computer, and even when a dataset can be stored on one computer (such as the datasets in this tutorial), the dataset can often be processed much more quickly using multiple computers. Spark has efficient implementations of a number of transformations and actions that can be composed together to perform data processing and analysis. Spark excels at distributing these operations across a cluster while abstracting away many of the underlying implementation details. Spark has been designed with a focus on scalability and efficiency. With Spark you can begin developing your solution on your laptop, using a small dataset, and then use that same code to process terabytes or even petabytes across a distributed cluster.\n", "#### **During this tutorial we will cover:**\n", "#### *Part 1:* Basic notebook usage and [Python](https://docs.python.org/2/) integration\n", "#### *Part 2:* An introduction to using [Apache Spark](https://spark.apache.org/) with the Python [pySpark API](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD) running in the browser\n", "#### *Part 3:* Using RDDs and chaining together transformations and actions\n", "#### *Part 4:* Lambda functions\n", "#### *Part 5:* Additional RDD actions\n", "#### *Part 6:* Additional RDD transformations\n", "#### *Part 7:* Caching RDDs and storage options\n", "#### *Part 8:* Debugging Spark applications and lazy evaluation\n", "#### The following transformations will be covered:\n", "* #### `map()`, `mapPartitions()`, `mapPartitionsWithIndex()`, `filter()`, `flatMap()`, `reduceByKey()`, `groupByKey()`\n", "#### The following actions will be covered:\n", "* #### `first()`, `take()`, `takeSample()`, `takeOrdered()`, `collect()`, `count()`, `countByValue()`, `reduce()`, `top()`\n", "#### Also covered:\n", "* #### `cache()`, `unpersist()`, `id()`, `setName()`\n", "#### Note that, for reference, you can look up the details of these methods in [Spark's Python API](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### **Part 1: Basic notebook usage and [Python](https://docs.python.org/2/) integration **" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### **(1a) Notebook usage**\n", "#### A notebook is comprised of a linear sequence of cells. These cells can contain either markdown or code, but we won't mix both in one cell. When a markdown cell is executed it renders formatted text, images, and links just like HTML in a normal webpage. The text you are reading right now is part of a markdown cell. Python code cells allow you to execute arbitrary Python commands just like in any Python shell. Place your cursor inside the cell below, and press \"Shift\" + \"Enter\" to execute the code and advance to the next cell. You can also press \"Ctrl\" + \"Enter\" to execute the code and remain in the cell. 
These commands work the same in both markdown and code cells." ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "The sum of 1 and 1 is 2\n" ] } ], "source": [ "# This is a Python cell. You can run normal Python code here...\n", "print 'The sum of 1 and 1 is {0}'.format(1+1)" ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "The sum of 1 and 2 is 3\n" ] } ], "source": [ "# Here is another Python cell, this time with a variable (x) declaration and an if statement:\n", "x = 42\n", "if x > 40:\n", " print 'The sum of 1 and 2 is {0}'.format(1+2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### **(1b) Notebook state**\n", "#### As you work through a notebook it is important that you run all of the code cells. The notebook is stateful, which means that variables and their values are retained until the notebook is detached (in Databricks Cloud) or the kernel is restarted (in IPython notebooks). If you do not run all of the code cells as you proceed through the notebook, your variables will not be properly initialized and later code might fail. You will also need to rerun any cells that you have modified in order for the changes to be available to other cells." ] }, { "cell_type": "code", "execution_count": 3, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "84\n" ] } ], "source": [ "# This cell relies on x being defined already.\n", "# If we didn't run the cells from part (1a) this code would fail.\n", "print x * 2" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### **(1c) Library imports**\n", "#### We can import standard Python libraries ([modules](https://docs.python.org/2/tutorial/modules.html)) the usual way. An `import` statement will import the specified module. In this tutorial and future labs, we will provide any imports that are necessary." ] }, { "cell_type": "code", "execution_count": 4, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "'def'" ] }, "execution_count": 4, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Import the regular expression library\n", "import re\n", "m = re.search('(?<=abc)def', 'abcdef')\n", "m.group(0)" ] }, { "cell_type": "code", "execution_count": 5, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "This was last run on: 2015-07-16 00:32:33.921297\n" ] } ], "source": [ "# Import the datetime library\n", "import datetime\n", "print 'This was last run on: {0}'.format(datetime.datetime.now())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### **Part 2: An introduction to using [Apache Spark](https://spark.apache.org/) with the Python [pySpark API](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD) running in the browser**" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### **Spark Context**\n", "#### In Spark, communication occurs between a driver and executors. The driver has Spark jobs that it needs to run and these jobs are split into tasks that are submitted to the executors for completion. The results from these tasks are delivered back to the driver.\n", "#### In part 1, we saw that normal python code can be executed via cells. 
When using Databricks Cloud, this code gets executed in the Spark driver's Java Virtual Machine (JVM) and not in an executor's JVM; when using an IPython notebook, it is executed within the kernel associated with the notebook. Since no Spark functionality is actually being used, no tasks are launched on the executors.\n", "#### In order to use Spark and its API, we will need to use a `SparkContext`. When running Spark, you start a new Spark application by creating a [SparkContext](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.SparkContext). When the `SparkContext` is created, it asks the master for some cores to use to do work. The master sets these cores aside just for you; they won't be used for other applications. When using Databricks Cloud or the virtual machine provisioned for this class, the `SparkContext` is created for you automatically as `sc`." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### **(2a) Example Cluster**\n", "#### The diagram below shows an example cluster, where the cores allocated for an application are outlined in purple.\n", "![executors](http://spark-mooc.github.io/web-assets/images/executors.png)\n", "#### You can view the details of your Spark application in the Spark web UI. The web UI is accessible in Databricks Cloud by going to \"Clusters\" and then clicking on the \"View Spark UI\" link for your cluster. When running locally, you'll find it at [localhost:4040](http://localhost:4040). In the web UI, under the \"Jobs\" tab, you can see a list of jobs that have been scheduled or run. It's likely there isn't anything interesting here yet because we haven't run any jobs, but we'll return to this page later.\n", "#### At a high level, every Spark application consists of a driver program that launches various parallel operations on executor Java Virtual Machines (JVMs) running either in a cluster or locally on the same machine. In Databricks Cloud, \"Databricks Shell\" is the driver program. When running locally, \"PySparkShell\" is the driver program. In all cases, this driver program contains the main loop for the program and creates distributed datasets on the cluster, then applies operations (transformations & actions) to those datasets.\n", "#### Driver programs access Spark through a SparkContext object, which represents a connection to a computing cluster. A Spark context object (`sc`) is the main entry point for Spark functionality. A Spark context can be used to create Resilient Distributed Datasets (RDDs) on a cluster.\n", "#### Try printing out `sc` to see its type." ] }, { "cell_type": "code", "execution_count": 6, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "pyspark.context.SparkContext" ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Display the type of the Spark Context sc\n", "type(sc)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### **(2b) `SparkContext` attributes**\n", "#### You can use Python's [dir()](https://docs.python.org/2/library/functions.html?highlight=dir#dir) function to get a list of all the attributes (including methods) accessible through the `sc` object."
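] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### As a quick sketch, the next cell filters the `dir()` listing down to just the public attributes (names that do not start with an underscore); the cell after it shows the full, unfiltered listing. The variable name `publicAttributes` is simply an example name." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "# A quick sketch: keep only the public attributes of sc\n", "# (publicAttributes is just an example variable name)\n", "publicAttributes = [name for name in dir(sc) if not name.startswith('_')]\n", "print 'sc exposes {0} public attributes'.format(len(publicAttributes))"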
] }, { "cell_type": "code", "execution_count": 7, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "['PACKAGE_EXTENSIONS',\n", " '__class__',\n", " '__delattr__',\n", " '__dict__',\n", " '__doc__',\n", " '__enter__',\n", " '__exit__',\n", " '__format__',\n", " '__getattribute__',\n", " '__getnewargs__',\n", " '__hash__',\n", " '__init__',\n", " '__module__',\n", " '__new__',\n", " '__reduce__',\n", " '__reduce_ex__',\n", " '__repr__',\n", " '__setattr__',\n", " '__sizeof__',\n", " '__str__',\n", " '__subclasshook__',\n", " '__weakref__',\n", " '_accumulatorServer',\n", " '_active_spark_context',\n", " '_batchSize',\n", " '_callsite',\n", " '_checkpointFile',\n", " '_conf',\n", " '_dictToJavaMap',\n", " '_do_init',\n", " '_ensure_initialized',\n", " '_gateway',\n", " '_getJavaStorageLevel',\n", " '_initialize_context',\n", " '_javaAccumulator',\n", " '_jsc',\n", " '_jvm',\n", " '_lock',\n", " '_next_accum_id',\n", " '_pickled_broadcast_vars',\n", " '_python_includes',\n", " '_temp_dir',\n", " '_unbatched_serializer',\n", " 'accumulator',\n", " 'addFile',\n", " 'addPyFile',\n", " 'appName',\n", " 'binaryFiles',\n", " 'binaryRecords',\n", " 'broadcast',\n", " 'cancelAllJobs',\n", " 'cancelJobGroup',\n", " 'clearFiles',\n", " 'defaultMinPartitions',\n", " 'defaultParallelism',\n", " 'dump_profiles',\n", " 'environment',\n", " 'getLocalProperty',\n", " 'hadoopFile',\n", " 'hadoopRDD',\n", " 'master',\n", " 'newAPIHadoopFile',\n", " 'newAPIHadoopRDD',\n", " 'parallelize',\n", " 'pickleFile',\n", " 'profiler_collector',\n", " 'pythonExec',\n", " 'runJob',\n", " 'sequenceFile',\n", " 'serializer',\n", " 'setCheckpointDir',\n", " 'setJobGroup',\n", " 'setLocalProperty',\n", " 'setSystemProperty',\n", " 'show_profiles',\n", " 'sparkHome',\n", " 'sparkUser',\n", " 'statusTracker',\n", " 'stop',\n", " 'textFile',\n", " 'union',\n", " 'version',\n", " 'wholeTextFiles']" ] }, "execution_count": 7, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# List sc's attributes\n", "dir(sc)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### **(2c) Getting help**\n", "#### Alternatively, you can use Python's [help()](https://docs.python.org/2/library/functions.html?highlight=help#help) function to get an easier to read list of all the attributes, including examples, that the `sc` object has." ] }, { "cell_type": "code", "execution_count": 8, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Help on SparkContext in module pyspark.context object:\n", "\n", "class SparkContext(__builtin__.object)\n", " | Main entry point for Spark functionality. A SparkContext represents the\n", " | connection to a Spark cluster, and can be used to create L{RDD} and\n", " | broadcast variables on that cluster.\n", " | \n", " | Methods defined here:\n", " | \n", " | __enter__(self)\n", " | Enable 'with SparkContext(...) as sc: app(sc)' syntax.\n", " | \n", " | __exit__(self, type, value, trace)\n", " | Enable 'with SparkContext(...) as sc: app' syntax.\n", " | \n", " | Specifically stop the context on exit of the with block.\n", " | \n", " | __getnewargs__(self)\n", " | \n", " | __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, environment=None, batchSize=0, serializer=PickleSerializer(), conf=None, gateway=None, jsc=None, profiler_cls=)\n", " | Create a new SparkContext. 
At least the master and app name should be set,\n", " | either through the named parameters here or through C{conf}.\n", " | \n", " | :param master: Cluster URL to connect to\n", " | (e.g. mesos://host:port, spark://host:port, local[4]).\n", " | :param appName: A name for your job, to display on the cluster web UI.\n", " | :param sparkHome: Location where Spark is installed on cluster nodes.\n", " | :param pyFiles: Collection of .zip or .py files to send to the cluster\n", " | and add to PYTHONPATH. These can be paths on the local file\n", " | system or HDFS, HTTP, HTTPS, or FTP URLs.\n", " | :param environment: A dictionary of environment variables to set on\n", " | worker nodes.\n", " | :param batchSize: The number of Python objects represented as a single\n", " | Java object. Set 1 to disable batching, 0 to automatically choose\n", " | the batch size based on object sizes, or -1 to use an unlimited\n", " | batch size\n", " | :param serializer: The serializer for RDDs.\n", " | :param conf: A L{SparkConf} object setting Spark properties.\n", " | :param gateway: Use an existing gateway and JVM, otherwise a new JVM\n", " | will be instantiated.\n", " | :param jsc: The JavaSparkContext instance (optional).\n", " | :param profiler_cls: A class of custom Profiler used to do profiling\n", " | (default is pyspark.profiler.BasicProfiler).\n", " | \n", " | \n", " | >>> from pyspark.context import SparkContext\n", " | >>> sc = SparkContext('local', 'test')\n", " | \n", " | >>> sc2 = SparkContext('local', 'test2') # doctest: +IGNORE_EXCEPTION_DETAIL\n", " | Traceback (most recent call last):\n", " | ...\n", " | ValueError:...\n", " | \n", " | accumulator(self, value, accum_param=None)\n", " | Create an L{Accumulator} with the given initial value, using a given\n", " | L{AccumulatorParam} helper object to define how to add values of the\n", " | data type if provided. Default AccumulatorParams are used for integers\n", " | and floating-point numbers if you do not provide one. For other types,\n", " | a custom AccumulatorParam can be used.\n", " | \n", " | addFile(self, path)\n", " | Add a file to be downloaded with this Spark job on every node.\n", " | The C{path} passed can be either a local file, a file in HDFS\n", " | (or other Hadoop-supported filesystems), or an HTTP, HTTPS or\n", " | FTP URI.\n", " | \n", " | To access the file in Spark jobs, use\n", " | L{SparkFiles.get(fileName)} with the\n", " | filename to find its download location.\n", " | \n", " | >>> from pyspark import SparkFiles\n", " | >>> path = os.path.join(tempdir, \"test.txt\")\n", " | >>> with open(path, \"w\") as testFile:\n", " | ... testFile.write(\"100\")\n", " | >>> sc.addFile(path)\n", " | >>> def func(iterator):\n", " | ... with open(SparkFiles.get(\"test.txt\")) as testFile:\n", " | ... fileVal = int(testFile.readline())\n", " | ... return [x * fileVal for x in iterator]\n", " | >>> sc.parallelize([1, 2, 3, 4]).mapPartitions(func).collect()\n", " | [100, 200, 300, 400]\n", " | \n", " | addPyFile(self, path)\n", " | Add a .py or .zip dependency for all tasks to be executed on this\n", " | SparkContext in the future. The C{path} passed can be either a local\n", " | file, a file in HDFS (or other Hadoop-supported filesystems), or an\n", " | HTTP, HTTPS or FTP URI.\n", " | \n", " | binaryFiles(self, path, minPartitions=None)\n", " | .. 
note:: Experimental\n", " | \n", " | Read a directory of binary files from HDFS, a local file system\n", " | (available on all nodes), or any Hadoop-supported file system URI\n", " | as a byte array. Each file is read as a single record and returned\n", " | in a key-value pair, where the key is the path of each file, the\n", " | value is the content of each file.\n", " | \n", " | Note: Small files are preferred, large file is also allowable, but\n", " | may cause bad performance.\n", " | \n", " | binaryRecords(self, path, recordLength)\n", " | .. note:: Experimental\n", " | \n", " | Load data from a flat binary file, assuming each record is a set of numbers\n", " | with the specified numerical format (see ByteBuffer), and the number of\n", " | bytes per record is constant.\n", " | \n", " | :param path: Directory to the input data files\n", " | :param recordLength: The length at which to split the records\n", " | \n", " | broadcast(self, value)\n", " | Broadcast a read-only variable to the cluster, returning a\n", " | L{Broadcast}\n", " | object for reading it in distributed functions. The variable will\n", " | be sent to each cluster only once.\n", " | \n", " | cancelAllJobs(self)\n", " | Cancel all jobs that have been scheduled or are running.\n", " | \n", " | cancelJobGroup(self, groupId)\n", " | Cancel active jobs for the specified group. See L{SparkContext.setJobGroup}\n", " | for more information.\n", " | \n", " | clearFiles(self)\n", " | Clear the job's list of files added by L{addFile} or L{addPyFile} so\n", " | that they do not get downloaded to any new nodes.\n", " | \n", " | dump_profiles(self, path)\n", " | Dump the profile stats into directory `path`\n", " | \n", " | getLocalProperty(self, key)\n", " | Get a local property set in this thread, or null if it is missing. See\n", " | L{setLocalProperty}\n", " | \n", " | hadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0)\n", " | Read an 'old' Hadoop InputFormat with arbitrary key and value class from HDFS,\n", " | a local file system (available on all nodes), or any Hadoop-supported file system URI.\n", " | The mechanism is the same as for sc.sequenceFile.\n", " | \n", " | A Hadoop configuration can be passed in as a Python dict. This will be converted into a\n", " | Configuration in Java.\n", " | \n", " | :param path: path to Hadoop file\n", " | :param inputFormatClass: fully qualified classname of Hadoop InputFormat\n", " | (e.g. \"org.apache.hadoop.mapred.TextInputFormat\")\n", " | :param keyClass: fully qualified classname of key Writable class\n", " | (e.g. \"org.apache.hadoop.io.Text\")\n", " | :param valueClass: fully qualified classname of value Writable class\n", " | (e.g. \"org.apache.hadoop.io.LongWritable\")\n", " | :param keyConverter: (None by default)\n", " | :param valueConverter: (None by default)\n", " | :param conf: Hadoop configuration, passed in as a dict\n", " | (None by default)\n", " | :param batchSize: The number of Python objects represented as a single\n", " | Java object. 
(default 0, choose batchSize automatically)\n", " | \n", " | hadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0)\n", " | Read an 'old' Hadoop InputFormat with arbitrary key and value class, from an arbitrary\n", " | Hadoop configuration, which is passed in as a Python dict.\n", " | This will be converted into a Configuration in Java.\n", " | The mechanism is the same as for sc.sequenceFile.\n", " | \n", " | :param inputFormatClass: fully qualified classname of Hadoop InputFormat\n", " | (e.g. \"org.apache.hadoop.mapred.TextInputFormat\")\n", " | :param keyClass: fully qualified classname of key Writable class\n", " | (e.g. \"org.apache.hadoop.io.Text\")\n", " | :param valueClass: fully qualified classname of value Writable class\n", " | (e.g. \"org.apache.hadoop.io.LongWritable\")\n", " | :param keyConverter: (None by default)\n", " | :param valueConverter: (None by default)\n", " | :param conf: Hadoop configuration, passed in as a dict\n", " | (None by default)\n", " | :param batchSize: The number of Python objects represented as a single\n", " | Java object. (default 0, choose batchSize automatically)\n", " | \n", " | newAPIHadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0)\n", " | Read a 'new API' Hadoop InputFormat with arbitrary key and value class from HDFS,\n", " | a local file system (available on all nodes), or any Hadoop-supported file system URI.\n", " | The mechanism is the same as for sc.sequenceFile.\n", " | \n", " | A Hadoop configuration can be passed in as a Python dict. This will be converted into a\n", " | Configuration in Java\n", " | \n", " | :param path: path to Hadoop file\n", " | :param inputFormatClass: fully qualified classname of Hadoop InputFormat\n", " | (e.g. \"org.apache.hadoop.mapreduce.lib.input.TextInputFormat\")\n", " | :param keyClass: fully qualified classname of key Writable class\n", " | (e.g. \"org.apache.hadoop.io.Text\")\n", " | :param valueClass: fully qualified classname of value Writable class\n", " | (e.g. \"org.apache.hadoop.io.LongWritable\")\n", " | :param keyConverter: (None by default)\n", " | :param valueConverter: (None by default)\n", " | :param conf: Hadoop configuration, passed in as a dict\n", " | (None by default)\n", " | :param batchSize: The number of Python objects represented as a single\n", " | Java object. (default 0, choose batchSize automatically)\n", " | \n", " | newAPIHadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0)\n", " | Read a 'new API' Hadoop InputFormat with arbitrary key and value class, from an arbitrary\n", " | Hadoop configuration, which is passed in as a Python dict.\n", " | This will be converted into a Configuration in Java.\n", " | The mechanism is the same as for sc.sequenceFile.\n", " | \n", " | :param inputFormatClass: fully qualified classname of Hadoop InputFormat\n", " | (e.g. \"org.apache.hadoop.mapreduce.lib.input.TextInputFormat\")\n", " | :param keyClass: fully qualified classname of key Writable class\n", " | (e.g. \"org.apache.hadoop.io.Text\")\n", " | :param valueClass: fully qualified classname of value Writable class\n", " | (e.g. 
\"org.apache.hadoop.io.LongWritable\")\n", " | :param keyConverter: (None by default)\n", " | :param valueConverter: (None by default)\n", " | :param conf: Hadoop configuration, passed in as a dict\n", " | (None by default)\n", " | :param batchSize: The number of Python objects represented as a single\n", " | Java object. (default 0, choose batchSize automatically)\n", " | \n", " | parallelize(self, c, numSlices=None)\n", " | Distribute a local Python collection to form an RDD. Using xrange\n", " | is recommended if the input represents a range for performance.\n", " | \n", " | >>> sc.parallelize([0, 2, 3, 4, 6], 5).glom().collect()\n", " | [[0], [2], [3], [4], [6]]\n", " | >>> sc.parallelize(xrange(0, 6, 2), 5).glom().collect()\n", " | [[], [0], [], [2], [4]]\n", " | \n", " | pickleFile(self, name, minPartitions=None)\n", " | Load an RDD previously saved using L{RDD.saveAsPickleFile} method.\n", " | \n", " | >>> tmpFile = NamedTemporaryFile(delete=True)\n", " | >>> tmpFile.close()\n", " | >>> sc.parallelize(range(10)).saveAsPickleFile(tmpFile.name, 5)\n", " | >>> sorted(sc.pickleFile(tmpFile.name, 3).collect())\n", " | [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]\n", " | \n", " | runJob(self, rdd, partitionFunc, partitions=None, allowLocal=False)\n", " | Executes the given partitionFunc on the specified set of partitions,\n", " | returning the result as an array of elements.\n", " | \n", " | If 'partitions' is not specified, this will run over all partitions.\n", " | \n", " | >>> myRDD = sc.parallelize(range(6), 3)\n", " | >>> sc.runJob(myRDD, lambda part: [x * x for x in part])\n", " | [0, 1, 4, 9, 16, 25]\n", " | \n", " | >>> myRDD = sc.parallelize(range(6), 3)\n", " | >>> sc.runJob(myRDD, lambda part: [x * x for x in part], [0, 2], True)\n", " | [0, 1, 16, 25]\n", " | \n", " | sequenceFile(self, path, keyClass=None, valueClass=None, keyConverter=None, valueConverter=None, minSplits=None, batchSize=0)\n", " | Read a Hadoop SequenceFile with arbitrary key and value Writable class from HDFS,\n", " | a local file system (available on all nodes), or any Hadoop-supported file system URI.\n", " | The mechanism is as follows:\n", " | \n", " | 1. A Java RDD is created from the SequenceFile or other InputFormat, and the key\n", " | and value Writable classes\n", " | 2. Serialization is attempted via Pyrolite pickling\n", " | 3. If this fails, the fallback is to call 'toString' on each key and value\n", " | 4. C{PickleSerializer} is used to deserialize pickled objects on the Python side\n", " | \n", " | :param path: path to sequncefile\n", " | :param keyClass: fully qualified classname of key Writable class\n", " | (e.g. \"org.apache.hadoop.io.Text\")\n", " | :param valueClass: fully qualified classname of value Writable class\n", " | (e.g. \"org.apache.hadoop.io.LongWritable\")\n", " | :param keyConverter:\n", " | :param valueConverter:\n", " | :param minSplits: minimum splits in dataset\n", " | (default min(2, sc.defaultParallelism))\n", " | :param batchSize: The number of Python objects represented as a single\n", " | Java object. (default 0, choose batchSize automatically)\n", " | \n", " | setCheckpointDir(self, dirName)\n", " | Set the directory under which RDDs are going to be checkpointed. 
The\n", " | directory must be a HDFS path if running on a cluster.\n", " | \n", " | setJobGroup(self, groupId, description, interruptOnCancel=False)\n", " | Assigns a group ID to all the jobs started by this thread until the group ID is set to a\n", " | different value or cleared.\n", " | \n", " | Often, a unit of execution in an application consists of multiple Spark actions or jobs.\n", " | Application programmers can use this method to group all those jobs together and give a\n", " | group description. Once set, the Spark web UI will associate such jobs with this group.\n", " | \n", " | The application can use L{SparkContext.cancelJobGroup} to cancel all\n", " | running jobs in this group.\n", " | \n", " | >>> import thread, threading\n", " | >>> from time import sleep\n", " | >>> result = \"Not Set\"\n", " | >>> lock = threading.Lock()\n", " | >>> def map_func(x):\n", " | ... sleep(100)\n", " | ... raise Exception(\"Task should have been cancelled\")\n", " | >>> def start_job(x):\n", " | ... global result\n", " | ... try:\n", " | ... sc.setJobGroup(\"job_to_cancel\", \"some description\")\n", " | ... result = sc.parallelize(range(x)).map(map_func).collect()\n", " | ... except Exception as e:\n", " | ... result = \"Cancelled\"\n", " | ... lock.release()\n", " | >>> def stop_job():\n", " | ... sleep(5)\n", " | ... sc.cancelJobGroup(\"job_to_cancel\")\n", " | >>> supress = lock.acquire()\n", " | >>> supress = thread.start_new_thread(start_job, (10,))\n", " | >>> supress = thread.start_new_thread(stop_job, tuple())\n", " | >>> supress = lock.acquire()\n", " | >>> print result\n", " | Cancelled\n", " | \n", " | If interruptOnCancel is set to true for the job group, then job cancellation will result\n", " | in Thread.interrupt() being called on the job's executor threads. This is useful to help\n", " | ensure that the tasks are actually stopped in a timely manner, but is off by default due\n", " | to HDFS-1208, where HDFS may respond to Thread.interrupt() by marking nodes as dead.\n", " | \n", " | setLocalProperty(self, key, value)\n", " | Set a local property that affects jobs submitted from this thread, such as the\n", " | Spark fair scheduler pool.\n", " | \n", " | show_profiles(self)\n", " | Print the profile stats to stdout\n", " | \n", " | sparkUser(self)\n", " | Get SPARK_USER for user who is running SparkContext.\n", " | \n", " | statusTracker(self)\n", " | Return :class:`StatusTracker` object\n", " | \n", " | stop(self)\n", " | Shut down the SparkContext.\n", " | \n", " | textFile(self, name, minPartitions=None, use_unicode=True)\n", " | Read a text file from HDFS, a local file system (available on all\n", " | nodes), or any Hadoop-supported file system URI, and return it as an\n", " | RDD of Strings.\n", " | \n", " | If use_unicode is False, the strings will be kept as `str` (encoding\n", " | as `utf-8`), which is faster and smaller than unicode. (Added in\n", " | Spark 1.2)\n", " | \n", " | >>> path = os.path.join(tempdir, \"sample-text.txt\")\n", " | >>> with open(path, \"w\") as testFile:\n", " | ... 
testFile.write(\"Hello world!\")\n", " | >>> textFile = sc.textFile(path)\n", " | >>> textFile.collect()\n", " | [u'Hello world!']\n", " | \n", " | union(self, rdds)\n", " | Build the union of a list of RDDs.\n", " | \n", " | This supports unions() of RDDs with different serialized formats,\n", " | although this forces them to be reserialized using the default\n", " | serializer:\n", " | \n", " | >>> path = os.path.join(tempdir, \"union-text.txt\")\n", " | >>> with open(path, \"w\") as testFile:\n", " | ... testFile.write(\"Hello\")\n", " | >>> textFile = sc.textFile(path)\n", " | >>> textFile.collect()\n", " | [u'Hello']\n", " | >>> parallelized = sc.parallelize([\"World!\"])\n", " | >>> sorted(sc.union([textFile, parallelized]).collect())\n", " | [u'Hello', 'World!']\n", " | \n", " | wholeTextFiles(self, path, minPartitions=None, use_unicode=True)\n", " | Read a directory of text files from HDFS, a local file system\n", " | (available on all nodes), or any Hadoop-supported file system\n", " | URI. Each file is read as a single record and returned in a\n", " | key-value pair, where the key is the path of each file, the\n", " | value is the content of each file.\n", " | \n", " | If use_unicode is False, the strings will be kept as `str` (encoding\n", " | as `utf-8`), which is faster and smaller than unicode. (Added in\n", " | Spark 1.2)\n", " | \n", " | For example, if you have the following files::\n", " | \n", " | hdfs://a-hdfs-path/part-00000\n", " | hdfs://a-hdfs-path/part-00001\n", " | ...\n", " | hdfs://a-hdfs-path/part-nnnnn\n", " | \n", " | Do C{rdd = sparkContext.wholeTextFiles(\"hdfs://a-hdfs-path\")},\n", " | then C{rdd} contains::\n", " | \n", " | (a-hdfs-path/part-00000, its content)\n", " | (a-hdfs-path/part-00001, its content)\n", " | ...\n", " | (a-hdfs-path/part-nnnnn, its content)\n", " | \n", " | NOTE: Small files are preferred, as each file will be loaded\n", " | fully in memory.\n", " | \n", " | >>> dirPath = os.path.join(tempdir, \"files\")\n", " | >>> os.mkdir(dirPath)\n", " | >>> with open(os.path.join(dirPath, \"1.txt\"), \"w\") as file1:\n", " | ... file1.write(\"1\")\n", " | >>> with open(os.path.join(dirPath, \"2.txt\"), \"w\") as file2:\n", " | ... file2.write(\"2\")\n", " | >>> textFiles = sc.wholeTextFiles(dirPath)\n", " | >>> sorted(textFiles.collect())\n", " | [(u'.../1.txt', u'1'), (u'.../2.txt', u'2')]\n", " | \n", " | ----------------------------------------------------------------------\n", " | Class methods defined here:\n", " | \n", " | setSystemProperty(cls, key, value) from __builtin__.type\n", " | Set a Java system property, such as spark.executor.memory. This must\n", " | must be invoked before instantiating SparkContext.\n", " | \n", " | ----------------------------------------------------------------------\n", " | Data descriptors defined here:\n", " | \n", " | __dict__\n", " | dictionary for instance variables (if defined)\n", " | \n", " | __weakref__\n", " | list of weak references to the object (if defined)\n", " | \n", " | defaultMinPartitions\n", " | Default min number of partitions for Hadoop RDDs when not given by user\n", " | \n", " | defaultParallelism\n", " | Default level of parallelism to use when not given by user (e.g. 
for\n", " | reduce tasks)\n", " | \n", " | version\n", " | The version of Spark on which this application is running.\n", " | \n", " | ----------------------------------------------------------------------\n", " | Data and other attributes defined here:\n", " | \n", " | PACKAGE_EXTENSIONS = ('.zip', '.egg', '.jar')\n", "\n" ] } ], "source": [ "# Use help to obtain more detailed information\n", "help(sc)" ] }, { "cell_type": "code", "execution_count": 9, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "u'1.3.1'" ] }, "execution_count": 9, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# After reading the help we've decided we want to use sc.version to see what version of Spark we are running\n", "sc.version" ] }, { "cell_type": "code", "execution_count": 10, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Help on built-in function map in module __builtin__:\n", "\n", "map(...)\n", " map(function, sequence[, sequence, ...]) -> list\n", " \n", " Return a list of the results of applying the function to the items of\n", " the argument sequence(s). If more than one sequence is given, the\n", " function is called with an argument list consisting of the corresponding\n", " item of each sequence, substituting None for missing values when not all\n", " sequences have the same length. If the function is None, return a list of\n", " the items of the sequence (or a list of tuples if more than one sequence).\n", "\n" ] } ], "source": [ "# Help can be used on any Python object\n", "help(map)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### **Part 3: Using RDDs and chaining together transformations and actions**" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### **Working with your first RDD**\n", "#### In Spark, we first create a base [Resilient Distributed Dataset](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD) (RDD). We can then apply one or more transformations to that base RDD. *An RDD is immutable, so once it is created, it cannot be changed.* As a result, each transformation creates a new RDD. Finally, we can apply one or more actions to the RDDs. Note that Spark uses lazy evaluation, so transformations are not actually executed until an action occurs.\n", "#### We will perform several exercises to obtain a better understanding of RDDs:\n", "* ##### Create a Python collection of 10,000 integers\n", "* ##### Create a Spark base RDD from that collection\n", "* ##### Subtract one from each value using `map`\n", "* ##### Perform action `collect` to view results\n", "* ##### Perform action `count` to view counts\n", "* ##### Apply transformation `filter` and view results with `collect`\n", "* ##### Learn about lambda functions\n", "* ##### Explore how lazy evaluation works and the debugging challenges that it introduces" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### **(3a) Create a Python collection of integers in the range of 1 .. 10000**\n", "#### We will use the [xrange()](https://docs.python.org/2/library/functions.html?highlight=xrange#xrange) function to create a [list()](https://docs.python.org/2/library/functions.html?highlight=list#list) of integers. `xrange()` only generates values as they are needed. This is different from the behavior of [range()](https://docs.python.org/2/library/functions.html?highlight=range#range) which generates the complete list upon execution. 
Because of this `xrange()` is more memory efficient than `range()`, especially for large ranges." ] }, { "cell_type": "code", "execution_count": 11, "metadata": { "collapsed": false }, "outputs": [], "source": [ "data = xrange(1, 10001)" ] }, { "cell_type": "code", "execution_count": 12, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "1" ] }, "execution_count": 12, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Data is just a normal Python list\n", "# Obtain data's first element\n", "data[0]" ] }, { "cell_type": "code", "execution_count": 13, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "10000" ] }, "execution_count": 13, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# We can check the size of the list using the len() function\n", "len(data)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### **(3b) Distributed data and using a collection to create an RDD**\n", "#### In Spark, datasets are represented as a list of entries, where the list is broken up into many different partitions that are each stored on a different machine. Each partition holds a unique subset of the entries in the list. Spark calls datasets that it stores \"Resilient Distributed Datasets\" (RDDs).\n", "#### One of the defining features of Spark, compared to other data analytics frameworks (e.g., Hadoop), is that it stores data in memory rather than on disk. This allows Spark applications to run much more quickly, because they are not slowed down by needing to read data from disk.\n", "#### The figure below illustrates how Spark breaks a list of data entries into partitions that are each stored in memory on a worker.\n", "![partitions](http://spark-mooc.github.io/web-assets/images/partitions.png)\n", "#### To create the RDD, we use `sc.parallelize()`, which tells Spark to create a new set of input data based on data that is passed in. In this example, we will provide an `xrange`. The second argument to the [sc.parallelize()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.SparkContext.parallelize) method tells Spark how many partitions to break the data into when it stores the data in memory (we'll talk more about this later in this tutorial). Note that for better performance when using `parallelize`, `xrange()` is recommended if the input represents a range. This is the reason why we used `xrange()` in 3a.\n", "#### There are many different types of RDDs. The base class for RDDs is [pyspark.RDD](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD) and other RDDs subclass `pyspark.RDD`. Since the other RDD types inherit from `pyspark.RDD` they have the same APIs and are functionally identical. We'll see that `sc.parallelize()` generates a `pyspark.rdd.PipelinedRDD` when its input is an `xrange`, and a `pyspark.RDD` when its input is a `range`.\n", "#### After we generate RDDs, we can view them in the \"Storage\" tab of the web UI. You'll notice that new datasets are not listed until Spark needs to return a result due to an action being executed. This feature of Spark is called \"lazy evaluation\". This allows Spark to avoid performing unnecessary calculations." 
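] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### As a quick sketch before parallelizing the full dataset, the next cell uses a tiny example collection to show how `parallelize()` splits its input across partitions: `getNumPartitions()` reports the partition count, and `glom()` gathers the elements of each partition into a list, so `collect()` returns one list per partition. The variable name `smallRDD` is simply an example name." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "# A quick sketch: inspect how a tiny example collection is split into partitions\n", "# (smallRDD is just an example variable name)\n", "smallRDD = sc.parallelize(xrange(1, 11), 4)\n", "print 'number of partitions: {0}'.format(smallRDD.getNumPartitions())\n", "print smallRDD.glom().collect()"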
] }, { "cell_type": "code", "execution_count": 14, "metadata": { "collapsed": false }, "outputs": [], "source": [ "# Parallelize data using 8 partitions\n", "# This operation is a transformation of data into an RDD\n", "# Spark uses lazy evaluation, so no Spark jobs are run at this point\n", "xrangeRDD = sc.parallelize(data, 8)" ] }, { "cell_type": "code", "execution_count": 15, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Help on method parallelize in module pyspark.context:\n", "\n", "parallelize(self, c, numSlices=None) method of pyspark.context.SparkContext instance\n", " Distribute a local Python collection to form an RDD. Using xrange\n", " is recommended if the input represents a range for performance.\n", " \n", " >>> sc.parallelize([0, 2, 3, 4, 6], 5).glom().collect()\n", " [[0], [2], [3], [4], [6]]\n", " >>> sc.parallelize(xrange(0, 6, 2), 5).glom().collect()\n", " [[], [0], [], [2], [4]]\n", "\n" ] } ], "source": [ "# Let's view help on parallelize\n", "help(sc.parallelize)" ] }, { "cell_type": "code", "execution_count": 16, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "type of xrangeRDD: <class 'pyspark.rdd.PipelinedRDD'>\n", "type of dataRangeRDD: <class 'pyspark.rdd.RDD'>\n" ] } ], "source": [ "# Let's see what type sc.parallelize() returned\n", "print 'type of xrangeRDD: {0}'.format(type(xrangeRDD))\n", "\n", "# How about if we use a range\n", "dataRange = range(1, 10001)\n", "rangeRDD = sc.parallelize(dataRange, 8)\n", "print 'type of dataRangeRDD: {0}'.format(type(rangeRDD))" ] }, { "cell_type": "code", "execution_count": 17, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "xrangeRDD id: 2\n", "rangeRDD id: 1\n" ] } ], "source": [ "# Each RDD gets a unique ID\n", "print 'xrangeRDD id: {0}'.format(xrangeRDD.id())\n", "print 'rangeRDD id: {0}'.format(rangeRDD.id())" ] }, { "cell_type": "code", "execution_count": 18, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "My first RDD PythonRDD[2] at RDD at PythonRDD.scala:43" ] }, "execution_count": 18, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# We can name each newly created RDD using the setName() method\n", "xrangeRDD.setName('My first RDD')" ] }, { "cell_type": "code", "execution_count": 19, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "(8) My first RDD PythonRDD[2] at RDD at PythonRDD.scala:43 []\n", " | ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:392 []\n" ] } ], "source": [ "# Let's view the lineage (the set of transformations) of the RDD using toDebugString()\n", "print xrangeRDD.toDebugString()" ] }, { "cell_type": "code", "execution_count": 20, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Help on PipelinedRDD in module pyspark.rdd object:\n", "\n", "class PipelinedRDD(RDD)\n", " | Pipelined maps:\n", " | \n", " | >>> rdd = sc.parallelize([1, 2, 3, 4])\n", " | >>> rdd.map(lambda x: 2 * x).cache().map(lambda x: 2 * x).collect()\n", " | [4, 8, 12, 16]\n", " | >>> rdd.map(lambda x: 2 * x).map(lambda x: 2 * x).collect()\n", " | [4, 8, 12, 16]\n", " | \n", " | Pipelined reduces:\n", " | >>> from operator import add\n", " | >>> rdd.map(lambda x: 2 * x).reduce(add)\n", " | 20\n", " | >>> rdd.flatMap(lambda x: [x, x]).reduce(add)\n", " | 20\n", " | \n", " | Method resolution order:\n", " | PipelinedRDD\n", " | RDD\n", " | 
__builtin__.object\n", " | \n", " | Methods defined here:\n", " | \n", " | __del__(self)\n", " | \n", " | __init__(self, prev, func, preservesPartitioning=False)\n", " | \n", " | id(self)\n", " | \n", " | ----------------------------------------------------------------------\n", " | Methods inherited from RDD:\n", " | \n", " | __add__(self, other)\n", " | Return the union of this RDD and another one.\n", " | \n", " | >>> rdd = sc.parallelize([1, 1, 2, 3])\n", " | >>> (rdd + rdd).collect()\n", " | [1, 1, 2, 3, 1, 1, 2, 3]\n", " | \n", " | __getnewargs__(self)\n", " | \n", " | __repr__(self)\n", " | \n", " | aggregate(self, zeroValue, seqOp, combOp)\n", " | Aggregate the elements of each partition, and then the results for all\n", " | the partitions, using a given combine functions and a neutral \"zero\n", " | value.\"\n", " | \n", " | The functions C{op(t1, t2)} is allowed to modify C{t1} and return it\n", " | as its result value to avoid object allocation; however, it should not\n", " | modify C{t2}.\n", " | \n", " | The first function (seqOp) can return a different result type, U, than\n", " | the type of this RDD. Thus, we need one operation for merging a T into\n", " | an U and one operation for merging two U\n", " | \n", " | >>> seqOp = (lambda x, y: (x[0] + y, x[1] + 1))\n", " | >>> combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))\n", " | >>> sc.parallelize([1, 2, 3, 4]).aggregate((0, 0), seqOp, combOp)\n", " | (10, 4)\n", " | >>> sc.parallelize([]).aggregate((0, 0), seqOp, combOp)\n", " | (0, 0)\n", " | \n", " | aggregateByKey(self, zeroValue, seqFunc, combFunc, numPartitions=None)\n", " | Aggregate the values of each key, using given combine functions and a neutral\n", " | \"zero value\". This function can return a different result type, U, than the type\n", " | of the values in this RDD, V. Thus, we need one operation for merging a V into\n", " | a U and one operation for merging two U's, The former operation is used for merging\n", " | values within a partition, and the latter is used for merging values between\n", " | partitions. To avoid memory allocation, both of these functions are\n", " | allowed to modify and return their first argument instead of creating a new U.\n", " | \n", " | cache(self)\n", " | Persist this RDD with the default storage level (C{MEMORY_ONLY_SER}).\n", " | \n", " | cartesian(self, other)\n", " | Return the Cartesian product of this RDD and another one, that is, the\n", " | RDD of all pairs of elements C{(a, b)} where C{a} is in C{self} and\n", " | C{b} is in C{other}.\n", " | \n", " | >>> rdd = sc.parallelize([1, 2])\n", " | >>> sorted(rdd.cartesian(rdd).collect())\n", " | [(1, 1), (1, 2), (2, 1), (2, 2)]\n", " | \n", " | checkpoint(self)\n", " | Mark this RDD for checkpointing. It will be saved to a file inside the\n", " | checkpoint directory set with L{SparkContext.setCheckpointDir()} and\n", " | all references to its parent RDDs will be removed. This function must\n", " | be called before any job has been executed on this RDD. 
It is strongly\n", " | recommended that this RDD is persisted in memory, otherwise saving it\n", " | on a file will require recomputation.\n", " | \n", " | coalesce(self, numPartitions, shuffle=False)\n", " | Return a new RDD that is reduced into `numPartitions` partitions.\n", " | \n", " | >>> sc.parallelize([1, 2, 3, 4, 5], 3).glom().collect()\n", " | [[1], [2, 3], [4, 5]]\n", " | >>> sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect()\n", " | [[1, 2, 3, 4, 5]]\n", " | \n", " | cogroup(self, other, numPartitions=None)\n", " | For each key k in C{self} or C{other}, return a resulting RDD that\n", " | contains a tuple with the list of values for that key in C{self} as\n", " | well as C{other}.\n", " | \n", " | >>> x = sc.parallelize([(\"a\", 1), (\"b\", 4)])\n", " | >>> y = sc.parallelize([(\"a\", 2)])\n", " | >>> map((lambda (x,y): (x, (list(y[0]), list(y[1])))), sorted(list(x.cogroup(y).collect())))\n", " | [('a', ([1], [2])), ('b', ([4], []))]\n", " | \n", " | collect(self)\n", " | Return a list that contains all of the elements in this RDD.\n", " | \n", " | collectAsMap(self)\n", " | Return the key-value pairs in this RDD to the master as a dictionary.\n", " | \n", " | >>> m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap()\n", " | >>> m[1]\n", " | 2\n", " | >>> m[3]\n", " | 4\n", " | \n", " | combineByKey(self, createCombiner, mergeValue, mergeCombiners, numPartitions=None)\n", " | Generic function to combine the elements for each key using a custom\n", " | set of aggregation functions.\n", " | \n", " | Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a \"combined\n", " | type\" C. Note that V and C can be different -- for example, one might\n", " | group an RDD of type (Int, Int) into an RDD of type (Int, List[Int]).\n", " | \n", " | Users provide three functions:\n", " | \n", " | - C{createCombiner}, which turns a V into a C (e.g., creates\n", " | a one-element list)\n", " | - C{mergeValue}, to merge a V into a C (e.g., adds it to the end of\n", " | a list)\n", " | - C{mergeCombiners}, to combine two C's into a single one.\n", " | \n", " | In addition, users can control the partitioning of the output RDD.\n", " | \n", " | >>> x = sc.parallelize([(\"a\", 1), (\"b\", 1), (\"a\", 1)])\n", " | >>> def f(x): return x\n", " | >>> def add(a, b): return a + str(b)\n", " | >>> sorted(x.combineByKey(str, add, add).collect())\n", " | [('a', '11'), ('b', '1')]\n", " | \n", " | count(self)\n", " | Return the number of elements in this RDD.\n", " | \n", " | >>> sc.parallelize([2, 3, 4]).count()\n", " | 3\n", " | \n", " | countApprox(self, timeout, confidence=0.95)\n", " | .. note:: Experimental\n", " | \n", " | Approximate version of count() that returns a potentially incomplete\n", " | result within a timeout, even if not all tasks have finished.\n", " | \n", " | >>> rdd = sc.parallelize(range(1000), 10)\n", " | >>> rdd.countApprox(1000, 1.0)\n", " | 1000\n", " | \n", " | countApproxDistinct(self, relativeSD=0.05)\n", " | .. note:: Experimental\n", " | \n", " | Return approximate number of distinct elements in the RDD.\n", " | \n", " | The algorithm used is based on streamlib's implementation of\n", " | \"HyperLogLog in Practice: Algorithmic Engineering of a State\n", " | of The Art Cardinality Estimation Algorithm\", available\n", " | here.\n", " | \n", " | :param relativeSD: Relative accuracy. 
Smaller values create\n", " | counters that require more space.\n", " | It must be greater than 0.000017.\n", " | \n", " | >>> n = sc.parallelize(range(1000)).map(str).countApproxDistinct()\n", " | >>> 950 < n < 1050\n", " | True\n", " | >>> n = sc.parallelize([i % 20 for i in range(1000)]).countApproxDistinct()\n", " | >>> 18 < n < 22\n", " | True\n", " | \n", " | countByKey(self)\n", " | Count the number of elements for each key, and return the result to the\n", " | master as a dictionary.\n", " | \n", " | >>> rdd = sc.parallelize([(\"a\", 1), (\"b\", 1), (\"a\", 1)])\n", " | >>> sorted(rdd.countByKey().items())\n", " | [('a', 2), ('b', 1)]\n", " | \n", " | countByValue(self)\n", " | Return the count of each unique value in this RDD as a dictionary of\n", " | (value, count) pairs.\n", " | \n", " | >>> sorted(sc.parallelize([1, 2, 1, 2, 2], 2).countByValue().items())\n", " | [(1, 2), (2, 3)]\n", " | \n", " | distinct(self, numPartitions=None)\n", " | Return a new RDD containing the distinct elements in this RDD.\n", " | \n", " | >>> sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect())\n", " | [1, 2, 3]\n", " | \n", " | filter(self, f)\n", " | Return a new RDD containing only the elements that satisfy a predicate.\n", " | \n", " | >>> rdd = sc.parallelize([1, 2, 3, 4, 5])\n", " | >>> rdd.filter(lambda x: x % 2 == 0).collect()\n", " | [2, 4]\n", " | \n", " | first(self)\n", " | Return the first element in this RDD.\n", " | \n", " | >>> sc.parallelize([2, 3, 4]).first()\n", " | 2\n", " | >>> sc.parallelize([]).first()\n", " | Traceback (most recent call last):\n", " | ...\n", " | ValueError: RDD is empty\n", " | \n", " | flatMap(self, f, preservesPartitioning=False)\n", " | Return a new RDD by first applying a function to all elements of this\n", " | RDD, and then flattening the results.\n", " | \n", " | >>> rdd = sc.parallelize([2, 3, 4])\n", " | >>> sorted(rdd.flatMap(lambda x: range(1, x)).collect())\n", " | [1, 1, 1, 2, 2, 3]\n", " | >>> sorted(rdd.flatMap(lambda x: [(x, x), (x, x)]).collect())\n", " | [(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]\n", " | \n", " | flatMapValues(self, f)\n", " | Pass each value in the key-value pair RDD through a flatMap function\n", " | without changing the keys; this also retains the original RDD's\n", " | partitioning.\n", " | \n", " | >>> x = sc.parallelize([(\"a\", [\"x\", \"y\", \"z\"]), (\"b\", [\"p\", \"r\"])])\n", " | >>> def f(x): return x\n", " | >>> x.flatMapValues(f).collect()\n", " | [('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]\n", " | \n", " | fold(self, zeroValue, op)\n", " | Aggregate the elements of each partition, and then the results for all\n", " | the partitions, using a given associative function and a neutral \"zero\n", " | value.\"\n", " | \n", " | The function C{op(t1, t2)} is allowed to modify C{t1} and return it\n", " | as its result value to avoid object allocation; however, it should not\n", " | modify C{t2}.\n", " | \n", " | >>> from operator import add\n", " | >>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)\n", " | 15\n", " | \n", " | foldByKey(self, zeroValue, func, numPartitions=None)\n", " | Merge the values for each key using an associative function \"func\"\n", " | and a neutral \"zeroValue\" which may be added to the result an\n", " | arbitrary number of times, and must not change the result\n", " | (e.g., 0 for addition, or 1 for multiplication.).\n", " | \n", " | >>> rdd = sc.parallelize([(\"a\", 1), (\"b\", 1), (\"a\", 1)])\n", " | >>> from operator import add\n", " | >>> rdd.foldByKey(0, 
add).collect()\n", " | [('a', 2), ('b', 1)]\n", " | \n", " | foreach(self, f)\n", " | Applies a function to all elements of this RDD.\n", " | \n", " | >>> def f(x): print x\n", " | >>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f)\n", " | \n", " | foreachPartition(self, f)\n", " | Applies a function to each partition of this RDD.\n", " | \n", " | >>> def f(iterator):\n", " | ... for x in iterator:\n", " | ... print x\n", " | >>> sc.parallelize([1, 2, 3, 4, 5]).foreachPartition(f)\n", " | \n", " | fullOuterJoin(self, other, numPartitions=None)\n", " | Perform a right outer join of C{self} and C{other}.\n", " | \n", " | For each element (k, v) in C{self}, the resulting RDD will either\n", " | contain all pairs (k, (v, w)) for w in C{other}, or the pair\n", " | (k, (v, None)) if no elements in C{other} have key k.\n", " | \n", " | Similarly, for each element (k, w) in C{other}, the resulting RDD will\n", " | either contain all pairs (k, (v, w)) for v in C{self}, or the pair\n", " | (k, (None, w)) if no elements in C{self} have key k.\n", " | \n", " | Hash-partitions the resulting RDD into the given number of partitions.\n", " | \n", " | >>> x = sc.parallelize([(\"a\", 1), (\"b\", 4)])\n", " | >>> y = sc.parallelize([(\"a\", 2), (\"c\", 8)])\n", " | >>> sorted(x.fullOuterJoin(y).collect())\n", " | [('a', (1, 2)), ('b', (4, None)), ('c', (None, 8))]\n", " | \n", " | getCheckpointFile(self)\n", " | Gets the name of the file to which this RDD was checkpointed\n", " | \n", " | getNumPartitions(self)\n", " | Returns the number of partitions in RDD\n", " | \n", " | >>> rdd = sc.parallelize([1, 2, 3, 4], 2)\n", " | >>> rdd.getNumPartitions()\n", " | 2\n", " | \n", " | getStorageLevel(self)\n", " | Get the RDD's current storage level.\n", " | \n", " | >>> rdd1 = sc.parallelize([1,2])\n", " | >>> rdd1.getStorageLevel()\n", " | StorageLevel(False, False, False, False, 1)\n", " | >>> print(rdd1.getStorageLevel())\n", " | Serialized 1x Replicated\n", " | \n", " | glom(self)\n", " | Return an RDD created by coalescing all elements within each partition\n", " | into a list.\n", " | \n", " | >>> rdd = sc.parallelize([1, 2, 3, 4], 2)\n", " | >>> sorted(rdd.glom().collect())\n", " | [[1, 2], [3, 4]]\n", " | \n", " | groupBy(self, f, numPartitions=None)\n", " | Return an RDD of grouped items.\n", " | \n", " | >>> rdd = sc.parallelize([1, 1, 2, 3, 5, 8])\n", " | >>> result = rdd.groupBy(lambda x: x % 2).collect()\n", " | >>> sorted([(x, sorted(y)) for (x, y) in result])\n", " | [(0, [2, 8]), (1, [1, 1, 3, 5])]\n", " | \n", " | groupByKey(self, numPartitions=None)\n", " | Group the values for each key in the RDD into a single sequence.\n", " | Hash-partitions the resulting RDD with into numPartitions partitions.\n", " | \n", " | Note: If you are grouping in order to perform an aggregation (such as a\n", " | sum or average) over each key, using reduceByKey or aggregateByKey will\n", " | provide much better performance.\n", " | \n", " | >>> x = sc.parallelize([(\"a\", 1), (\"b\", 1), (\"a\", 1)])\n", " | >>> map((lambda (x,y): (x, list(y))), sorted(x.groupByKey().collect()))\n", " | [('a', [1, 1]), ('b', [1])]\n", " | \n", " | groupWith(self, other, *others)\n", " | Alias for cogroup but with support for multiple RDDs.\n", " | \n", " | >>> w = sc.parallelize([(\"a\", 5), (\"b\", 6)])\n", " | >>> x = sc.parallelize([(\"a\", 1), (\"b\", 4)])\n", " | >>> y = sc.parallelize([(\"a\", 2)])\n", " | >>> z = sc.parallelize([(\"b\", 42)])\n", " | >>> map((lambda (x,y): (x, (list(y[0]), list(y[1]), list(y[2]), list(y[3])))), 
sorted(list(w.groupWith(x, y, z).collect())))\n", " | [('a', ([5], [1], [2], [])), ('b', ([6], [4], [], [42]))]\n", " | \n", " | histogram(self, buckets)\n", " | Compute a histogram using the provided buckets. The buckets\n", " | are all open to the right except for the last which is closed.\n", " | e.g. [1,10,20,50] means the buckets are [1,10) [10,20) [20,50],\n", " | which means 1<=x<10, 10<=x<20, 20<=x<=50. And on the input of 1\n", " | and 50 we would have a histogram of 1,0,1.\n", " | \n", " | If your histogram is evenly spaced (e.g. [0, 10, 20, 30]),\n", " | this can be switched from an O(log n) inseration to O(1) per\n", " | element(where n = # buckets).\n", " | \n", " | Buckets must be sorted and not contain any duplicates, must be\n", " | at least two elements.\n", " | \n", " | If `buckets` is a number, it will generates buckets which are\n", " | evenly spaced between the minimum and maximum of the RDD. For\n", " | example, if the min value is 0 and the max is 100, given buckets\n", " | as 2, the resulting buckets will be [0,50) [50,100]. buckets must\n", " | be at least 1 If the RDD contains infinity, NaN throws an exception\n", " | If the elements in RDD do not vary (max == min) always returns\n", " | a single bucket.\n", " | \n", " | It will return an tuple of buckets and histogram.\n", " | \n", " | >>> rdd = sc.parallelize(range(51))\n", " | >>> rdd.histogram(2)\n", " | ([0, 25, 50], [25, 26])\n", " | >>> rdd.histogram([0, 5, 25, 50])\n", " | ([0, 5, 25, 50], [5, 20, 26])\n", " | >>> rdd.histogram([0, 15, 30, 45, 60]) # evenly spaced buckets\n", " | ([0, 15, 30, 45, 60], [15, 15, 15, 6])\n", " | >>> rdd = sc.parallelize([\"ab\", \"ac\", \"b\", \"bd\", \"ef\"])\n", " | >>> rdd.histogram((\"a\", \"b\", \"c\"))\n", " | (('a', 'b', 'c'), [2, 2])\n", " | \n", " | intersection(self, other)\n", " | Return the intersection of this RDD and another one. The output will\n", " | not contain any duplicate elements, even if the input RDDs did.\n", " | \n", " | Note that this method performs a shuffle internally.\n", " | \n", " | >>> rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5])\n", " | >>> rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8])\n", " | >>> rdd1.intersection(rdd2).collect()\n", " | [1, 2, 3]\n", " | \n", " | isCheckpointed(self)\n", " | Return whether this RDD has been checkpointed or not\n", " | \n", " | isEmpty(self)\n", " | Returns true if and only if the RDD contains no elements at all. 
Note that an RDD\n", " | may be empty even when it has at least 1 partition.\n", " | \n", " | >>> sc.parallelize([]).isEmpty()\n", " | True\n", " | >>> sc.parallelize([1]).isEmpty()\n", " | False\n", " | \n", " | join(self, other, numPartitions=None)\n", " | Return an RDD containing all pairs of elements with matching keys in\n", " | C{self} and C{other}.\n", " | \n", " | Each pair of elements will be returned as a (k, (v1, v2)) tuple, where\n", " | (k, v1) is in C{self} and (k, v2) is in C{other}.\n", " | \n", " | Performs a hash join across the cluster.\n", " | \n", " | >>> x = sc.parallelize([(\"a\", 1), (\"b\", 4)])\n", " | >>> y = sc.parallelize([(\"a\", 2), (\"a\", 3)])\n", " | >>> sorted(x.join(y).collect())\n", " | [('a', (1, 2)), ('a', (1, 3))]\n", " | \n", " | keyBy(self, f)\n", " | Creates tuples of the elements in this RDD by applying C{f}.\n", " | \n", " | >>> x = sc.parallelize(range(0,3)).keyBy(lambda x: x*x)\n", " | >>> y = sc.parallelize(zip(range(0,5), range(0,5)))\n", " | >>> map((lambda (x,y): (x, (list(y[0]), (list(y[1]))))), sorted(x.cogroup(y).collect()))\n", " | [(0, ([0], [0])), (1, ([1], [1])), (2, ([], [2])), (3, ([], [3])), (4, ([2], [4]))]\n", " | \n", " | keys(self)\n", " | Return an RDD with the keys of each tuple.\n", " | \n", " | >>> m = sc.parallelize([(1, 2), (3, 4)]).keys()\n", " | >>> m.collect()\n", " | [1, 3]\n", " | \n", " | leftOuterJoin(self, other, numPartitions=None)\n", " | Perform a left outer join of C{self} and C{other}.\n", " | \n", " | For each element (k, v) in C{self}, the resulting RDD will either\n", " | contain all pairs (k, (v, w)) for w in C{other}, or the pair\n", " | (k, (v, None)) if no elements in C{other} have key k.\n", " | \n", " | Hash-partitions the resulting RDD into the given number of partitions.\n", " | \n", " | >>> x = sc.parallelize([(\"a\", 1), (\"b\", 4)])\n", " | >>> y = sc.parallelize([(\"a\", 2)])\n", " | >>> sorted(x.leftOuterJoin(y).collect())\n", " | [('a', (1, 2)), ('b', (4, None))]\n", " | \n", " | lookup(self, key)\n", " | Return the list of values in the RDD for key `key`. 
This operation\n", " | is done efficiently if the RDD has a known partitioner by only\n", " | searching the partition that the key maps to.\n", " | \n", " | >>> l = range(1000)\n", " | >>> rdd = sc.parallelize(zip(l, l), 10)\n", " | >>> rdd.lookup(42) # slow\n", " | [42]\n", " | >>> sorted = rdd.sortByKey()\n", " | >>> sorted.lookup(42) # fast\n", " | [42]\n", " | >>> sorted.lookup(1024)\n", " | []\n", " | \n", " | map(self, f, preservesPartitioning=False)\n", " | Return a new RDD by applying a function to each element of this RDD.\n", " | \n", " | >>> rdd = sc.parallelize([\"b\", \"a\", \"c\"])\n", " | >>> sorted(rdd.map(lambda x: (x, 1)).collect())\n", " | [('a', 1), ('b', 1), ('c', 1)]\n", " | \n", " | mapPartitions(self, f, preservesPartitioning=False)\n", " | Return a new RDD by applying a function to each partition of this RDD.\n", " | \n", " | >>> rdd = sc.parallelize([1, 2, 3, 4], 2)\n", " | >>> def f(iterator): yield sum(iterator)\n", " | >>> rdd.mapPartitions(f).collect()\n", " | [3, 7]\n", " | \n", " | mapPartitionsWithIndex(self, f, preservesPartitioning=False)\n", " | Return a new RDD by applying a function to each partition of this RDD,\n", " | while tracking the index of the original partition.\n", " | \n", " | >>> rdd = sc.parallelize([1, 2, 3, 4], 4)\n", " | >>> def f(splitIndex, iterator): yield splitIndex\n", " | >>> rdd.mapPartitionsWithIndex(f).sum()\n", " | 6\n", " | \n", " | mapPartitionsWithSplit(self, f, preservesPartitioning=False)\n", " | Deprecated: use mapPartitionsWithIndex instead.\n", " | \n", " | Return a new RDD by applying a function to each partition of this RDD,\n", " | while tracking the index of the original partition.\n", " | \n", " | >>> rdd = sc.parallelize([1, 2, 3, 4], 4)\n", " | >>> def f(splitIndex, iterator): yield splitIndex\n", " | >>> rdd.mapPartitionsWithSplit(f).sum()\n", " | 6\n", " | \n", " | mapValues(self, f)\n", " | Pass each value in the key-value pair RDD through a map function\n", " | without changing the keys; this also retains the original RDD's\n", " | partitioning.\n", " | \n", " | >>> x = sc.parallelize([(\"a\", [\"apple\", \"banana\", \"lemon\"]), (\"b\", [\"grapes\"])])\n", " | >>> def f(x): return len(x)\n", " | >>> x.mapValues(f).collect()\n", " | [('a', 3), ('b', 1)]\n", " | \n", " | max(self, key=None)\n", " | Find the maximum item in this RDD.\n", " | \n", " | :param key: A function used to generate key for comparing\n", " | \n", " | >>> rdd = sc.parallelize([1.0, 5.0, 43.0, 10.0])\n", " | >>> rdd.max()\n", " | 43.0\n", " | >>> rdd.max(key=str)\n", " | 5.0\n", " | \n", " | mean(self)\n", " | Compute the mean of this RDD's elements.\n", " | \n", " | >>> sc.parallelize([1, 2, 3]).mean()\n", " | 2.0\n", " | \n", " | meanApprox(self, timeout, confidence=0.95)\n", " | .. 
note:: Experimental\n", " | \n", " | Approximate operation to return the mean within a timeout\n", " | or meet the confidence.\n", " | \n", " | >>> rdd = sc.parallelize(range(1000), 10)\n", " | >>> r = sum(xrange(1000)) / 1000.0\n", " | >>> (rdd.meanApprox(1000) - r) / r < 0.05\n", " | True\n", " | \n", " | min(self, key=None)\n", " | Find the minimum item in this RDD.\n", " | \n", " | :param key: A function used to generate key for comparing\n", " | \n", " | >>> rdd = sc.parallelize([2.0, 5.0, 43.0, 10.0])\n", " | >>> rdd.min()\n", " | 2.0\n", " | >>> rdd.min(key=str)\n", " | 10.0\n", " | \n", " | name(self)\n", " | Return the name of this RDD.\n", " | \n", " | partitionBy(self, numPartitions, partitionFunc=)\n", " | Return a copy of the RDD partitioned using the specified partitioner.\n", " | \n", " | >>> pairs = sc.parallelize([1, 2, 3, 4, 2, 4, 1]).map(lambda x: (x, x))\n", " | >>> sets = pairs.partitionBy(2).glom().collect()\n", " | >>> set(sets[0]).intersection(set(sets[1]))\n", " | set([])\n", " | \n", " | persist(self, storageLevel=StorageLevel(False, True, False, False, 1))\n", " | Set this RDD's storage level to persist its values across operations\n", " | after the first time it is computed. This can only be used to assign\n", " | a new storage level if the RDD does not have a storage level set yet.\n", " | If no storage level is specified defaults to (C{MEMORY_ONLY_SER}).\n", " | \n", " | >>> rdd = sc.parallelize([\"b\", \"a\", \"c\"])\n", " | >>> rdd.persist().is_cached\n", " | True\n", " | \n", " | pipe(self, command, env={})\n", " | Return an RDD created by piping elements to a forked external process.\n", " | \n", " | >>> sc.parallelize(['1', '2', '', '3']).pipe('cat').collect()\n", " | ['1', '2', '', '3']\n", " | \n", " | randomSplit(self, weights, seed=None)\n", " | Randomly splits this RDD with the provided weights.\n", " | \n", " | :param weights: weights for splits, will be normalized if they don't sum to 1\n", " | :param seed: random seed\n", " | :return: split RDDs in a list\n", " | \n", " | >>> rdd = sc.parallelize(range(5), 1)\n", " | >>> rdd1, rdd2 = rdd.randomSplit([2, 3], 17)\n", " | >>> rdd1.collect()\n", " | [1, 3]\n", " | >>> rdd2.collect()\n", " | [0, 2, 4]\n", " | \n", " | reduce(self, f)\n", " | Reduces the elements of this RDD using the specified commutative and\n", " | associative binary operator. 
Currently reduces partitions locally.\n", " | \n", " | >>> from operator import add\n", " | >>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add)\n", " | 15\n", " | >>> sc.parallelize((2 for _ in range(10))).map(lambda x: 1).cache().reduce(add)\n", " | 10\n", " | >>> sc.parallelize([]).reduce(add)\n", " | Traceback (most recent call last):\n", " | ...\n", " | ValueError: Can not reduce() empty RDD\n", " | \n", " | reduceByKey(self, func, numPartitions=None)\n", " | Merge the values for each key using an associative reduce function.\n", " | \n", " | This will also perform the merging locally on each mapper before\n", " | sending results to a reducer, similarly to a \"combiner\" in MapReduce.\n", " | \n", " | Output will be hash-partitioned with C{numPartitions} partitions, or\n", " | the default parallelism level if C{numPartitions} is not specified.\n", " | \n", " | >>> from operator import add\n", " | >>> rdd = sc.parallelize([(\"a\", 1), (\"b\", 1), (\"a\", 1)])\n", " | >>> sorted(rdd.reduceByKey(add).collect())\n", " | [('a', 2), ('b', 1)]\n", " | \n", " | reduceByKeyLocally(self, func)\n", " | Merge the values for each key using an associative reduce function, but\n", " | return the results immediately to the master as a dictionary.\n", " | \n", " | This will also perform the merging locally on each mapper before\n", " | sending results to a reducer, similarly to a \"combiner\" in MapReduce.\n", " | \n", " | >>> from operator import add\n", " | >>> rdd = sc.parallelize([(\"a\", 1), (\"b\", 1), (\"a\", 1)])\n", " | >>> sorted(rdd.reduceByKeyLocally(add).items())\n", " | [('a', 2), ('b', 1)]\n", " | \n", " | repartition(self, numPartitions)\n", " | Return a new RDD that has exactly numPartitions partitions.\n", " | \n", " | Can increase or decrease the level of parallelism in this RDD.\n", " | Internally, this uses a shuffle to redistribute data.\n", " | If you are decreasing the number of partitions in this RDD, consider\n", " | using `coalesce`, which can avoid performing a shuffle.\n", " | \n", " | >>> rdd = sc.parallelize([1,2,3,4,5,6,7], 4)\n", " | >>> sorted(rdd.glom().collect())\n", " | [[1], [2, 3], [4, 5], [6, 7]]\n", " | >>> len(rdd.repartition(2).glom().collect())\n", " | 2\n", " | >>> len(rdd.repartition(10).glom().collect())\n", " | 10\n", " | \n", " | repartitionAndSortWithinPartitions(self, numPartitions=None, partitionFunc=, ascending=True, keyfunc=>)\n", " | Repartition the RDD according to the given partitioner and, within each resulting partition,\n", " | sort records by their keys.\n", " | \n", " | >>> rdd = sc.parallelize([(0, 5), (3, 8), (2, 6), (0, 8), (3, 8), (1, 3)])\n", " | >>> rdd2 = rdd.repartitionAndSortWithinPartitions(2, lambda x: x % 2, 2)\n", " | >>> rdd2.glom().collect()\n", " | [[(0, 5), (0, 8), (2, 6)], [(1, 3), (3, 8), (3, 8)]]\n", " | \n", " | rightOuterJoin(self, other, numPartitions=None)\n", " | Perform a right outer join of C{self} and C{other}.\n", " | \n", " | For each element (k, w) in C{other}, the resulting RDD will either\n", " | contain all pairs (k, (v, w)) for v in this, or the pair (k, (None, w))\n", " | if no elements in C{self} have key k.\n", " | \n", " | Hash-partitions the resulting RDD into the given number of partitions.\n", " | \n", " | >>> x = sc.parallelize([(\"a\", 1), (\"b\", 4)])\n", " | >>> y = sc.parallelize([(\"a\", 2)])\n", " | >>> sorted(y.rightOuterJoin(x).collect())\n", " | [('a', (2, 1)), ('b', (None, 4))]\n", " | \n", " | sample(self, withReplacement, fraction, seed=None)\n", " | Return a sampled subset of this RDD.\n", 
" | \n", " | >>> rdd = sc.parallelize(range(100), 4)\n", " | >>> rdd.sample(False, 0.1, 81).count()\n", " | 10\n", " | \n", " | sampleByKey(self, withReplacement, fractions, seed=None)\n", " | Return a subset of this RDD sampled by key (via stratified sampling).\n", " | Create a sample of this RDD using variable sampling rates for\n", " | different keys as specified by fractions, a key to sampling rate map.\n", " | \n", " | >>> fractions = {\"a\": 0.2, \"b\": 0.1}\n", " | >>> rdd = sc.parallelize(fractions.keys()).cartesian(sc.parallelize(range(0, 1000)))\n", " | >>> sample = dict(rdd.sampleByKey(False, fractions, 2).groupByKey().collect())\n", " | >>> 100 < len(sample[\"a\"]) < 300 and 50 < len(sample[\"b\"]) < 150\n", " | True\n", " | >>> max(sample[\"a\"]) <= 999 and min(sample[\"a\"]) >= 0\n", " | True\n", " | >>> max(sample[\"b\"]) <= 999 and min(sample[\"b\"]) >= 0\n", " | True\n", " | \n", " | sampleStdev(self)\n", " | Compute the sample standard deviation of this RDD's elements (which\n", " | corrects for bias in estimating the standard deviation by dividing by\n", " | N-1 instead of N).\n", " | \n", " | >>> sc.parallelize([1, 2, 3]).sampleStdev()\n", " | 1.0\n", " | \n", " | sampleVariance(self)\n", " | Compute the sample variance of this RDD's elements (which corrects\n", " | for bias in estimating the variance by dividing by N-1 instead of N).\n", " | \n", " | >>> sc.parallelize([1, 2, 3]).sampleVariance()\n", " | 1.0\n", " | \n", " | saveAsHadoopDataset(self, conf, keyConverter=None, valueConverter=None)\n", " | Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file\n", " | system, using the old Hadoop OutputFormat API (mapred package). Keys/values are\n", " | converted for output using either user specified converters or, by default,\n", " | L{org.apache.spark.api.python.JavaToWritableConverter}.\n", " | \n", " | :param conf: Hadoop job configuration, passed in as a dict\n", " | :param keyConverter: (None by default)\n", " | :param valueConverter: (None by default)\n", " | \n", " | saveAsHadoopFile(self, path, outputFormatClass, keyClass=None, valueClass=None, keyConverter=None, valueConverter=None, conf=None, compressionCodecClass=None)\n", " | Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file\n", " | system, using the old Hadoop OutputFormat API (mapred package). Key and value types\n", " | will be inferred if not specified. Keys and values are converted for output using either\n", " | user specified converters or L{org.apache.spark.api.python.JavaToWritableConverter}. The\n", " | C{conf} is applied on top of the base Hadoop conf associated with the SparkContext\n", " | of this RDD to create a merged Hadoop MapReduce job configuration for saving the data.\n", " | \n", " | :param path: path to Hadoop file\n", " | :param outputFormatClass: fully qualified classname of Hadoop OutputFormat\n", " | (e.g. \"org.apache.hadoop.mapred.SequenceFileOutputFormat\")\n", " | :param keyClass: fully qualified classname of key Writable class\n", " | (e.g. \"org.apache.hadoop.io.IntWritable\", None by default)\n", " | :param valueClass: fully qualified classname of value Writable class\n", " | (e.g. 
\"org.apache.hadoop.io.Text\", None by default)\n", " | :param keyConverter: (None by default)\n", " | :param valueConverter: (None by default)\n", " | :param conf: (None by default)\n", " | :param compressionCodecClass: (None by default)\n", " | \n", " | saveAsNewAPIHadoopDataset(self, conf, keyConverter=None, valueConverter=None)\n", " | Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file\n", " | system, using the new Hadoop OutputFormat API (mapreduce package). Keys/values are\n", " | converted for output using either user specified converters or, by default,\n", " | L{org.apache.spark.api.python.JavaToWritableConverter}.\n", " | \n", " | :param conf: Hadoop job configuration, passed in as a dict\n", " | :param keyConverter: (None by default)\n", " | :param valueConverter: (None by default)\n", " | \n", " | saveAsNewAPIHadoopFile(self, path, outputFormatClass, keyClass=None, valueClass=None, keyConverter=None, valueConverter=None, conf=None)\n", " | Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file\n", " | system, using the new Hadoop OutputFormat API (mapreduce package). Key and value types\n", " | will be inferred if not specified. Keys and values are converted for output using either\n", " | user specified converters or L{org.apache.spark.api.python.JavaToWritableConverter}. The\n", " | C{conf} is applied on top of the base Hadoop conf associated with the SparkContext\n", " | of this RDD to create a merged Hadoop MapReduce job configuration for saving the data.\n", " | \n", " | :param path: path to Hadoop file\n", " | :param outputFormatClass: fully qualified classname of Hadoop OutputFormat\n", " | (e.g. \"org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat\")\n", " | :param keyClass: fully qualified classname of key Writable class\n", " | (e.g. \"org.apache.hadoop.io.IntWritable\", None by default)\n", " | :param valueClass: fully qualified classname of value Writable class\n", " | (e.g. \"org.apache.hadoop.io.Text\", None by default)\n", " | :param keyConverter: (None by default)\n", " | :param valueConverter: (None by default)\n", " | :param conf: Hadoop job configuration, passed in as a dict (None by default)\n", " | \n", " | saveAsPickleFile(self, path, batchSize=10)\n", " | Save this RDD as a SequenceFile of serialized objects. The serializer\n", " | used is L{pyspark.serializers.PickleSerializer}, default batch size\n", " | is 10.\n", " | \n", " | >>> tmpFile = NamedTemporaryFile(delete=True)\n", " | >>> tmpFile.close()\n", " | >>> sc.parallelize([1, 2, 'spark', 'rdd']).saveAsPickleFile(tmpFile.name, 3)\n", " | >>> sorted(sc.pickleFile(tmpFile.name, 5).collect())\n", " | [1, 2, 'rdd', 'spark']\n", " | \n", " | saveAsSequenceFile(self, path, compressionCodecClass=None)\n", " | Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file\n", " | system, using the L{org.apache.hadoop.io.Writable} types that we convert from the\n", " | RDD's key and value types. The mechanism is as follows:\n", " | \n", " | 1. Pyrolite is used to convert pickled Python RDD into RDD of Java objects.\n", " | 2. 
Keys and values of this Java RDD are converted to Writables and written out.\n", " | \n", " | :param path: path to sequence file\n", " | :param compressionCodecClass: (None by default)\n", " | \n", " | saveAsTextFile(self, path, compressionCodecClass=None)\n", " | Save this RDD as a text file, using string representations of elements.\n", " | \n", " | @param path: path to text file\n", " | @param compressionCodecClass: (None by default) string i.e.\n", " | \"org.apache.hadoop.io.compress.GzipCodec\"\n", " | \n", " | >>> tempFile = NamedTemporaryFile(delete=True)\n", " | >>> tempFile.close()\n", " | >>> sc.parallelize(range(10)).saveAsTextFile(tempFile.name)\n", " | >>> from fileinput import input\n", " | >>> from glob import glob\n", " | >>> ''.join(sorted(input(glob(tempFile.name + \"/part-0000*\"))))\n", " | '0\\n1\\n2\\n3\\n4\\n5\\n6\\n7\\n8\\n9\\n'\n", " | \n", " | Empty lines are tolerated when saving to text files.\n", " | \n", " | >>> tempFile2 = NamedTemporaryFile(delete=True)\n", " | >>> tempFile2.close()\n", " | >>> sc.parallelize(['', 'foo', '', 'bar', '']).saveAsTextFile(tempFile2.name)\n", " | >>> ''.join(sorted(input(glob(tempFile2.name + \"/part-0000*\"))))\n", " | '\\n\\n\\nbar\\nfoo\\n'\n", " | \n", " | Using compressionCodecClass\n", " | \n", " | >>> tempFile3 = NamedTemporaryFile(delete=True)\n", " | >>> tempFile3.close()\n", " | >>> codec = \"org.apache.hadoop.io.compress.GzipCodec\"\n", " | >>> sc.parallelize(['foo', 'bar']).saveAsTextFile(tempFile3.name, codec)\n", " | >>> from fileinput import input, hook_compressed\n", " | >>> ''.join(sorted(input(glob(tempFile3.name + \"/part*.gz\"), openhook=hook_compressed)))\n", " | 'bar\\nfoo\\n'\n", " | \n", " | setName(self, name)\n", " | Assign a name to this RDD.\n", " | \n", " | >>> rdd1 = sc.parallelize([1,2])\n", " | >>> rdd1.setName('RDD1').name()\n", " | 'RDD1'\n", " | \n", " | sortBy(self, keyfunc, ascending=True, numPartitions=None)\n", " | Sorts this RDD by the given keyfunc\n", " | \n", " | >>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]\n", " | >>> sc.parallelize(tmp).sortBy(lambda x: x[0]).collect()\n", " | [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]\n", " | >>> sc.parallelize(tmp).sortBy(lambda x: x[1]).collect()\n", " | [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]\n", " | \n", " | sortByKey(self, ascending=True, numPartitions=None, keyfunc=>)\n", " | Sorts this RDD, which is assumed to consist of (key, value) pairs.\n", " | # noqa\n", " | \n", " | >>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]\n", " | >>> sc.parallelize(tmp).sortByKey().first()\n", " | ('1', 3)\n", " | >>> sc.parallelize(tmp).sortByKey(True, 1).collect()\n", " | [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]\n", " | >>> sc.parallelize(tmp).sortByKey(True, 2).collect()\n", " | [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]\n", " | >>> tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5)]\n", " | >>> tmp2.extend([('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)])\n", " | >>> sc.parallelize(tmp2).sortByKey(True, 3, keyfunc=lambda k: k.lower()).collect()\n", " | [('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5),...('white', 9), ('whose', 6)]\n", " | \n", " | stats(self)\n", " | Return a L{StatCounter} object that captures the mean, variance\n", " | and count of the RDD's elements in one operation.\n", " | \n", " | stdev(self)\n", " | Compute the standard deviation of this RDD's elements.\n", " | \n", " | >>> sc.parallelize([1, 2, 3]).stdev()\n", " | 0.816...\n", " | \n", " | 
subtract(self, other, numPartitions=None)\n", " | Return each value in C{self} that is not contained in C{other}.\n", " | \n", " | >>> x = sc.parallelize([(\"a\", 1), (\"b\", 4), (\"b\", 5), (\"a\", 3)])\n", " | >>> y = sc.parallelize([(\"a\", 3), (\"c\", None)])\n", " | >>> sorted(x.subtract(y).collect())\n", " | [('a', 1), ('b', 4), ('b', 5)]\n", " | \n", " | subtractByKey(self, other, numPartitions=None)\n", " | Return each (key, value) pair in C{self} that has no pair with matching\n", " | key in C{other}.\n", " | \n", " | >>> x = sc.parallelize([(\"a\", 1), (\"b\", 4), (\"b\", 5), (\"a\", 2)])\n", " | >>> y = sc.parallelize([(\"a\", 3), (\"c\", None)])\n", " | >>> sorted(x.subtractByKey(y).collect())\n", " | [('b', 4), ('b', 5)]\n", " | \n", " | sum(self)\n", " | Add up the elements in this RDD.\n", " | \n", " | >>> sc.parallelize([1.0, 2.0, 3.0]).sum()\n", " | 6.0\n", " | \n", " | sumApprox(self, timeout, confidence=0.95)\n", " | .. note:: Experimental\n", " | \n", " | Approximate operation to return the sum within a timeout\n", " | or meet the confidence.\n", " | \n", " | >>> rdd = sc.parallelize(range(1000), 10)\n", " | >>> r = sum(xrange(1000))\n", " | >>> (rdd.sumApprox(1000) - r) / r < 0.05\n", " | True\n", " | \n", " | take(self, num)\n", " | Take the first num elements of the RDD.\n", " | \n", " | It works by first scanning one partition, and use the results from\n", " | that partition to estimate the number of additional partitions needed\n", " | to satisfy the limit.\n", " | \n", " | Translated from the Scala implementation in RDD#take().\n", " | \n", " | >>> sc.parallelize([2, 3, 4, 5, 6]).cache().take(2)\n", " | [2, 3]\n", " | >>> sc.parallelize([2, 3, 4, 5, 6]).take(10)\n", " | [2, 3, 4, 5, 6]\n", " | >>> sc.parallelize(range(100), 100).filter(lambda x: x > 90).take(3)\n", " | [91, 92, 93]\n", " | \n", " | takeOrdered(self, num, key=None)\n", " | Get the N elements from a RDD ordered in ascending order or as\n", " | specified by the optional key function.\n", " | \n", " | >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).takeOrdered(6)\n", " | [1, 2, 3, 4, 5, 6]\n", " | >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7], 2).takeOrdered(6, key=lambda x: -x)\n", " | [10, 9, 7, 6, 5, 4]\n", " | \n", " | takeSample(self, withReplacement, num, seed=None)\n", " | Return a fixed-size sampled subset of this RDD.\n", " | \n", " | >>> rdd = sc.parallelize(range(0, 10))\n", " | >>> len(rdd.takeSample(True, 20, 1))\n", " | 20\n", " | >>> len(rdd.takeSample(False, 5, 2))\n", " | 5\n", " | >>> len(rdd.takeSample(False, 15, 3))\n", " | 10\n", " | \n", " | toDF(self, schema=None, sampleRatio=None)\n", " | Converts current :class:`RDD` into a :class:`DataFrame`\n", " | \n", " | This is a shorthand for ``sqlContext.createDataFrame(rdd, schema, sampleRatio)``\n", " | \n", " | :param schema: a StructType or list of names of columns\n", " | :param samplingRatio: the sample ratio of rows used for inferring\n", " | :return: a DataFrame\n", " | \n", " | >>> rdd.toDF().collect()\n", " | [Row(name=u'Alice', age=1)]\n", " | \n", " | toDebugString(self)\n", " | A description of this RDD and its recursive dependencies for debugging.\n", " | \n", " | toLocalIterator(self)\n", " | Return an iterator that contains all of the elements in this RDD.\n", " | The iterator will consume as much memory as the largest partition in this RDD.\n", " | >>> rdd = sc.parallelize(range(10))\n", " | >>> [x for x in rdd.toLocalIterator()]\n", " | [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]\n", " | \n", " | top(self, num, key=None)\n", " 
| Get the top N elements from a RDD.\n", " | \n", " | Note: It returns the list sorted in descending order.\n", " | \n", " | >>> sc.parallelize([10, 4, 2, 12, 3]).top(1)\n", " | [12]\n", " | >>> sc.parallelize([2, 3, 4, 5, 6], 2).top(2)\n", " | [6, 5]\n", " | >>> sc.parallelize([10, 4, 2, 12, 3]).top(3, key=str)\n", " | [4, 3, 2]\n", " | \n", " | treeAggregate(self, zeroValue, seqOp, combOp, depth=2)\n", " | Aggregates the elements of this RDD in a multi-level tree\n", " | pattern.\n", " | \n", " | :param depth: suggested depth of the tree (default: 2)\n", " | \n", " | >>> add = lambda x, y: x + y\n", " | >>> rdd = sc.parallelize([-5, -4, -3, -2, -1, 1, 2, 3, 4], 10)\n", " | >>> rdd.treeAggregate(0, add, add)\n", " | -5\n", " | >>> rdd.treeAggregate(0, add, add, 1)\n", " | -5\n", " | >>> rdd.treeAggregate(0, add, add, 2)\n", " | -5\n", " | >>> rdd.treeAggregate(0, add, add, 5)\n", " | -5\n", " | >>> rdd.treeAggregate(0, add, add, 10)\n", " | -5\n", " | \n", " | treeReduce(self, f, depth=2)\n", " | Reduces the elements of this RDD in a multi-level tree pattern.\n", " | \n", " | :param depth: suggested depth of the tree (default: 2)\n", " | \n", " | >>> add = lambda x, y: x + y\n", " | >>> rdd = sc.parallelize([-5, -4, -3, -2, -1, 1, 2, 3, 4], 10)\n", " | >>> rdd.treeReduce(add)\n", " | -5\n", " | >>> rdd.treeReduce(add, 1)\n", " | -5\n", " | >>> rdd.treeReduce(add, 2)\n", " | -5\n", " | >>> rdd.treeReduce(add, 5)\n", " | -5\n", " | >>> rdd.treeReduce(add, 10)\n", " | -5\n", " | \n", " | union(self, other)\n", " | Return the union of this RDD and another one.\n", " | \n", " | >>> rdd = sc.parallelize([1, 1, 2, 3])\n", " | >>> rdd.union(rdd).collect()\n", " | [1, 1, 2, 3, 1, 1, 2, 3]\n", " | \n", " | unpersist(self)\n", " | Mark the RDD as non-persistent, and remove all blocks for it from\n", " | memory and disk.\n", " | \n", " | values(self)\n", " | Return an RDD with the values of each tuple.\n", " | \n", " | >>> m = sc.parallelize([(1, 2), (3, 4)]).values()\n", " | >>> m.collect()\n", " | [2, 4]\n", " | \n", " | variance(self)\n", " | Compute the variance of this RDD's elements.\n", " | \n", " | >>> sc.parallelize([1, 2, 3]).variance()\n", " | 0.666...\n", " | \n", " | zip(self, other)\n", " | Zips this RDD with another one, returning key-value pairs with the\n", " | first element in each RDD second element in each RDD, etc. Assumes\n", " | that the two RDDs have the same number of partitions and the same\n", " | number of elements in each partition (e.g. one was made through\n", " | a map on the other).\n", " | \n", " | >>> x = sc.parallelize(range(0,5))\n", " | >>> y = sc.parallelize(range(1000, 1005))\n", " | >>> x.zip(y).collect()\n", " | [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]\n", " | \n", " | zipWithIndex(self)\n", " | Zips this RDD with its element indices.\n", " | \n", " | The ordering is first based on the partition index and then the\n", " | ordering of items within each partition. 
So the first item in\n", " | the first partition gets index 0, and the last item in the last\n", " | partition receives the largest index.\n", " | \n", " | This method needs to trigger a spark job when this RDD contains\n", " | more than one partitions.\n", " | \n", " | >>> sc.parallelize([\"a\", \"b\", \"c\", \"d\"], 3).zipWithIndex().collect()\n", " | [('a', 0), ('b', 1), ('c', 2), ('d', 3)]\n", " | \n", " | zipWithUniqueId(self)\n", " | Zips this RDD with generated unique Long ids.\n", " | \n", " | Items in the kth partition will get ids k, n+k, 2*n+k, ..., where\n", " | n is the number of partitions. So there may exist gaps, but this\n", " | method won't trigger a spark job, which is different from\n", " | L{zipWithIndex}\n", " | \n", " | >>> sc.parallelize([\"a\", \"b\", \"c\", \"d\", \"e\"], 3).zipWithUniqueId().collect()\n", " | [('a', 0), ('b', 1), ('c', 4), ('d', 2), ('e', 5)]\n", " | \n", " | ----------------------------------------------------------------------\n", " | Data descriptors inherited from RDD:\n", " | \n", " | __dict__\n", " | dictionary for instance variables (if defined)\n", " | \n", " | __weakref__\n", " | list of weak references to the object (if defined)\n", " | \n", " | context\n", " | The L{SparkContext} that this RDD was created on.\n", "\n" ] } ], "source": [ "# Let's use help to see what methods we can call on this RDD\n", "help(xrangeRDD)" ] }, { "cell_type": "code", "execution_count": 21, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "8" ] }, "execution_count": 21, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Let's see how many partitions the RDD will be split into by using the getNumPartitions() method\n", "xrangeRDD.getNumPartitions()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### **(3c): Subtract one from each value using `map`**\n", "#### So far, we've created a distributed dataset that is split into many partitions, where each partition is stored on a single machine in our cluster. Let's look at what happens when we do a basic operation on the dataset. Many useful data analysis operations can be specified as \"do something to each item in the dataset\". These data-parallel operations are convenient because each item in the dataset can be processed individually: the operation on one entry doesn't affect the operations on any of the other entries. Therefore, Spark can parallelize the operation.\n", "#### `map(f)`, the most common Spark transformation, is one such example: it applies a function `f` to each item in the dataset, and outputs the resulting dataset. When you run `map()` on a dataset, a single *stage* of tasks is launched. A *stage* is a group of tasks that all perform the same computation, but on different input data. One task is launched for each partition, as shown in the example below. A task is a unit of execution that runs on a single machine. When we run `map(f)` within a partition, a new *task* applies `f` to all of the entries in a particular partition, and outputs a new partition. In this example figure, the dataset is broken into four partitions, so four `map()` tasks are launched.\n", "![tasks](http://spark-mooc.github.io/web-assets/images/tasks.png)\n", "#### The figure below shows how this would work on the smaller data set from the earlier figures. 
Note that one task is launched for each partition.\n", "![foo](http://spark-mooc.github.io/web-assets/images/map.png)\n", "#### When applying the `map()` transformation, each item in the parent RDD will map to one element in the new RDD. So, if the parent RDD has twenty elements, the new RDD will also have twenty items.\n", "#### Now we will use `map()` to subtract one from each value in the base RDD we just created. First, we define a Python function called `sub()` that will subtract one from the input integer. Second, we will pass each item in the base RDD into a `map()` transformation that applies the `sub()` function to each element. And finally, we print out the RDD transformation hierarchy using `toDebugString()`." ] }, { "cell_type": "code", "execution_count": 22, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "(8) PythonRDD[3] at RDD at PythonRDD.scala:43 []\n", " | ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:392 []\n" ] } ], "source": [ "# Create sub function to subtract 1\n", "def sub(value):\n", "    \"\"\"Subtracts one from `value`.\n", "\n", "    Args:\n", "        value (int): A number.\n", "\n", "    Returns:\n", "        int: `value` minus one.\n", "    \"\"\"\n", "    return (value - 1)\n", "\n", "# Transform xrangeRDD through map transformation using sub function\n", "# Because map is a transformation and Spark uses lazy evaluation, no jobs, stages,\n", "# or tasks will be launched when we run this code.\n", "subRDD = xrangeRDD.map(sub)\n", "\n", "# Let's see the RDD transformation hierarchy\n", "print subRDD.toDebugString()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### **(3d) Perform action `collect` to view results**\n", "#### To see a list of elements decremented by one, we need to create a new list on the driver from the data distributed in the executor nodes. To do this we call the `collect()` method on our RDD. `collect()` is often used after a filter or other operation to ensure that we are only returning a *small* amount of data to the driver. This is done because the data returned to the driver must fit into the driver's available memory. If not, the driver will crash.\n", "#### The `collect()` method is the first action operation that we have encountered. Action operations cause Spark to perform the (lazy) transformation operations that are required to compute the RDD returned by the action. In our example, this means that tasks will now be launched to perform the `parallelize`, `map`, and `collect` operations.\n", "#### In this example, the dataset is broken into four partitions, so four `collect()` tasks are launched. Each task collects the entries in its partition and sends the result to the SparkContext, which creates a list of the values, as shown in the figure below.\n", "![collect](http://spark-mooc.github.io/web-assets/images/collect.png)\n", "#### The above figures showed what would happen if we ran `collect()` on a small example dataset with just four partitions.\n", "#### Now let's run `collect()` on `subRDD`."
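, "#### Before we do, note that `collect()` ships every element back to the driver, so when an RDD's size is unknown it can be safer to inspect it first. The snippet below is a minimal sketch (not part of the original exercise) that assumes the `subRDD` defined above; `count()` and `take()` are actions that return only a summary or a small slice instead of the whole dataset:\n", "```python\n", "# Sketch (not from the original lab): quick checks before a full collect()\n", "print subRDD.count()   # number of elements, computed on the executors\n", "print subRDD.take(5)   # only the first five elements are returned to the driver\n", "```"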
] }, { "cell_type": "code", "execution_count": 23, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149, 150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 167, 168, 169, 170, 171, 172, 173, 174, 175, 176, 177, 178, 179, 180, 181, 182, 183, 184, 185, 186, 187, 188, 189, 190, 191, 192, 193, 194, 195, 196, 197, 198, 199, 200, 201, 202, 203, 204, 205, 206, 207, 208, 209, 210, 211, 212, 213, 214, 215, 216, 217, 218, 219, 220, 221, 222, 223, 224, 225, 226, 227, 228, 229, 230, 231, 232, 233, 234, 235, 236, 237, 238, 239, 240, 241, 242, 243, 244, 245, 246, 247, 248, 249, 250, 251, 252, 253, 254, 255, 256, 257, 258, 259, 260, 261, 262, 263, 264, 265, 266, 267, 268, 269, 270, 271, 272, 273, 274, 275, 276, 277, 278, 279, 280, 281, 282, 283, 284, 285, 286, 287, 288, 289, 290, 291, 292, 293, 294, 295, 296, 297, 298, 299, 300, 301, 302, 303, 304, 305, 306, 307, 308, 309, 310, 311, 312, 313, 314, 315, 316, 317, 318, 319, 320, 321, 322, 323, 324, 325, 326, 327, 328, 329, 330, 331, 332, 333, 334, 335, 336, 337, 338, 339, 340, 341, 342, 343, 344, 345, 346, 347, 348, 349, 350, 351, 352, 353, 354, 355, 356, 357, 358, 359, 360, 361, 362, 363, 364, 365, 366, 367, 368, 369, 370, 371, 372, 373, 374, 375, 376, 377, 378, 379, 380, 381, 382, 383, 384, 385, 386, 387, 388, 389, 390, 391, 392, 393, 394, 395, 396, 397, 398, 399, 400, 401, 402, 403, 404, 405, 406, 407, 408, 409, 410, 411, 412, 413, 414, 415, 416, 417, 418, 419, 420, 421, 422, 423, 424, 425, 426, 427, 428, 429, 430, 431, 432, 433, 434, 435, 436, 437, 438, 439, 440, 441, 442, 443, 444, 445, 446, 447, 448, 449, 450, 451, 452, 453, 454, 455, 456, 457, 458, 459, 460, 461, 462, 463, 464, 465, 466, 467, 468, 469, 470, 471, 472, 473, 474, 475, 476, 477, 478, 479, 480, 481, 482, 483, 484, 485, 486, 487, 488, 489, 490, 491, 492, 493, 494, 495, 496, 497, 498, 499, 500, 501, 502, 503, 504, 505, 506, 507, 508, 509, 510, 511, 512, 513, 514, 515, 516, 517, 518, 519, 520, 521, 522, 523, 524, 525, 526, 527, 528, 529, 530, 531, 532, 533, 534, 535, 536, 537, 538, 539, 540, 541, 542, 543, 544, 545, 546, 547, 548, 549, 550, 551, 552, 553, 554, 555, 556, 557, 558, 559, 560, 561, 562, 563, 564, 565, 566, 567, 568, 569, 570, 571, 572, 573, 574, 575, 576, 577, 578, 579, 580, 581, 582, 583, 584, 585, 586, 587, 588, 589, 590, 591, 592, 593, 594, 595, 596, 597, 598, 599, 600, 601, 602, 603, 604, 605, 606, 607, 608, 609, 610, 611, 612, 613, 614, 615, 616, 617, 618, 619, 620, 621, 622, 623, 624, 625, 626, 627, 628, 629, 630, 631, 632, 633, 634, 635, 636, 637, 638, 639, 640, 641, 642, 643, 644, 645, 646, 647, 648, 649, 650, 651, 652, 653, 654, 655, 656, 657, 658, 659, 660, 661, 662, 663, 664, 665, 666, 667, 668, 669, 670, 671, 672, 673, 674, 675, 676, 677, 678, 679, 680, 681, 682, 683, 684, 685, 686, 687, 688, 689, 690, 691, 692, 693, 694, 695, 696, 697, 698, 699, 700, 
701, 702, 703, 704, 705, 706, 707, 708, 709, 710, 711, 712, 713, 714, 715, 716, 717, 718, 719, 720, 721, 722, 723, 724, 725, 726, 727, 728, 729, 730, 731, 732, 733, 734, 735, 736, 737, 738, 739, 740, 741, 742, 743, 744, 745, 746, 747, 748, 749, 750, 751, 752, 753, 754, 755, 756, 757, 758, 759, 760, 761, 762, 763, 764, 765, 766, 767, 768, 769, 770, 771, 772, 773, 774, 775, 776, 777, 778, 779, 780, 781, 782, 783, 784, 785, 786, 787, 788, 789, 790, 791, 792, 793, 794, 795, 796, 797, 798, 799, 800, 801, 802, 803, 804, 805, 806, 807, 808, 809, 810, 811, 812, 813, 814, 815, 816, 817, 818, 819, 820, 821, 822, 823, 824, 825, 826, 827, 828, 829, 830, 831, 832, 833, 834, 835, 836, 837, 838, 839, 840, 841, 842, 843, 844, 845, 846, 847, 848, 849, 850, 851, 852, 853, 854, 855, 856, 857, 858, 859, 860, 861, 862, 863, 864, 865, 866, 867, 868, 869, 870, 871, 872, 873, 874, 875, 876, 877, 878, 879, 880, 881, 882, 883, 884, 885, 886, 887, 888, 889, 890, 891, 892, 893, 894, 895, 896, 897, 898, 899, 900, 901, 902, 903, 904, 905, 906, 907, 908, 909, 910, 911, 912, 913, 914, 915, 916, 917, 918, 919, 920, 921, 922, 923, 924, 925, 926, 927, 928, 929, 930, 931, 932, 933, 934, 935, 936, 937, 938, 939, 940, 941, 942, 943, 944, 945, 946, 947, 948, 949, 950, 951, 952, 953, 954, 955, 956, 957, 958, 959, 960, 961, 962, 963, 964, 965, 966, 967, 968, 969, 970, 971, 972, 973, 974, 975, 976, 977, 978, 979, 980, 981, 982, 983, 984, 985, 986, 987, 988, 989, 990, 991, 992, 993, 994, 995, 996, 997, 998, 999, 1000, 1001, 1002, 1003, 1004, 1005, 1006, 1007, 1008, 1009, 1010, 1011, 1012, 1013, 1014, 1015, 1016, 1017, 1018, 1019, 1020, 1021, 1022, 1023, 1024, 1025, 1026, 1027, 1028, 1029, 1030, 1031, 1032, 1033, 1034, 1035, 1036, 1037, 1038, 1039, 1040, 1041, 1042, 1043, 1044, 1045, 1046, 1047, 1048, 1049, 1050, 1051, 1052, 1053, 1054, 1055, 1056, 1057, 1058, 1059, 1060, 1061, 1062, 1063, 1064, 1065, 1066, 1067, 1068, 1069, 1070, 1071, 1072, 1073, 1074, 1075, 1076, 1077, 1078, 1079, 1080, 1081, 1082, 1083, 1084, 1085, 1086, 1087, 1088, 1089, 1090, 1091, 1092, 1093, 1094, 1095, 1096, 1097, 1098, 1099, 1100, 1101, 1102, 1103, 1104, 1105, 1106, 1107, 1108, 1109, 1110, 1111, 1112, 1113, 1114, 1115, 1116, 1117, 1118, 1119, 1120, 1121, 1122, 1123, 1124, 1125, 1126, 1127, 1128, 1129, 1130, 1131, 1132, 1133, 1134, 1135, 1136, 1137, 1138, 1139, 1140, 1141, 1142, 1143, 1144, 1145, 1146, 1147, 1148, 1149, 1150, 1151, 1152, 1153, 1154, 1155, 1156, 1157, 1158, 1159, 1160, 1161, 1162, 1163, 1164, 1165, 1166, 1167, 1168, 1169, 1170, 1171, 1172, 1173, 1174, 1175, 1176, 1177, 1178, 1179, 1180, 1181, 1182, 1183, 1184, 1185, 1186, 1187, 1188, 1189, 1190, 1191, 1192, 1193, 1194, 1195, 1196, 1197, 1198, 1199, 1200, 1201, 1202, 1203, 1204, 1205, 1206, 1207, 1208, 1209, 1210, 1211, 1212, 1213, 1214, 1215, 1216, 1217, 1218, 1219, 1220, 1221, 1222, 1223, 1224, 1225, 1226, 1227, 1228, 1229, 1230, 1231, 1232, 1233, 1234, 1235, 1236, 1237, 1238, 1239, 1240, 1241, 1242, 1243, 1244, 1245, 1246, 1247, 1248, 1249, 1250, 1251, 1252, 1253, 1254, 1255, 1256, 1257, 1258, 1259, 1260, 1261, 1262, 1263, 1264, 1265, 1266, 1267, 1268, 1269, 1270, 1271, 1272, 1273, 1274, 1275, 1276, 1277, 1278, 1279, 1280, 1281, 1282, 1283, 1284, 1285, 1286, 1287, 1288, 1289, 1290, 1291, 1292, 1293, 1294, 1295, 1296, 1297, 1298, 1299, 1300, 1301, 1302, 1303, 1304, 1305, 1306, 1307, 1308, 1309, 1310, 1311, 1312, 1313, 1314, 1315, 1316, 1317, 1318, 1319, 1320, 1321, 1322, 1323, 1324, 1325, 1326, 1327, 1328, 1329, 1330, 1331, 1332, 1333, 1334, 1335, 1336, 1337, 1338, 1339, 1340, 1341, 1342, 
1343, 1344, 1345, 1346, 1347, 1348, 1349, 1350, 1351, 1352, 1353, 1354, 1355, 1356, 1357, 1358, 1359, 1360, 1361, 1362, 1363, 1364, 1365, 1366, 1367, 1368, 1369, 1370, 1371, 1372, 1373, 1374, 1375, 1376, 1377, 1378, 1379, 1380, 1381, 1382, 1383, 1384, 1385, 1386, 1387, 1388, 1389, 1390, 1391, 1392, 1393, 1394, 1395, 1396, 1397, 1398, 1399, 1400, 1401, 1402, 1403, 1404, 1405, 1406, 1407, 1408, 1409, 1410, 1411, 1412, 1413, 1414, 1415, 1416, 1417, 1418, 1419, 1420, 1421, 1422, 1423, 1424, 1425, 1426, 1427, 1428, 1429, 1430, 1431, 1432, 1433, 1434, 1435, 1436, 1437, 1438, 1439, 1440, 1441, 1442, 1443, 1444, 1445, 1446, 1447, 1448, 1449, 1450, 1451, 1452, 1453, 1454, 1455, 1456, 1457, 1458, 1459, 1460, 1461, 1462, 1463, 1464, 1465, 1466, 1467, 1468, 1469, 1470, 1471, 1472, 1473, 1474, 1475, 1476, 1477, 1478, 1479, 1480, 1481, 1482, 1483, 1484, 1485, 1486, 1487, 1488, 1489, 1490, 1491, 1492, 1493, 1494, 1495, 1496, 1497, 1498, 1499, 1500, 1501, 1502, 1503, 1504, 1505, 1506, 1507, 1508, 1509, 1510, 1511, 1512, 1513, 1514, 1515, 1516, 1517, 1518, 1519, 1520, 1521, 1522, 1523, 1524, 1525, 1526, 1527, 1528, 1529, 1530, 1531, 1532, 1533, 1534, 1535, 1536, 1537, 1538, 1539, 1540, 1541, 1542, 1543, 1544, 1545, 1546, 1547, 1548, 1549, 1550, 1551, 1552, 1553, 1554, 1555, 1556, 1557, 1558, 1559, 1560, 1561, 1562, 1563, 1564, 1565, 1566, 1567, 1568, 1569, 1570, 1571, 1572, 1573, 1574, 1575, 1576, 1577, 1578, 1579, 1580, 1581, 1582, 1583, 1584, 1585, 1586, 1587, 1588, 1589, 1590, 1591, 1592, 1593, 1594, 1595, 1596, 1597, 1598, 1599, 1600, 1601, 1602, 1603, 1604, 1605, 1606, 1607, 1608, 1609, 1610, 1611, 1612, 1613, 1614, 1615, 1616, 1617, 1618, 1619, 1620, 1621, 1622, 1623, 1624, 1625, 1626, 1627, 1628, 1629, 1630, 1631, 1632, 1633, 1634, 1635, 1636, 1637, 1638, 1639, 1640, 1641, 1642, 1643, 1644, 1645, 1646, 1647, 1648, 1649, 1650, 1651, 1652, 1653, 1654, 1655, 1656, 1657, 1658, 1659, 1660, 1661, 1662, 1663, 1664, 1665, 1666, 1667, 1668, 1669, 1670, 1671, 1672, 1673, 1674, 1675, 1676, 1677, 1678, 1679, 1680, 1681, 1682, 1683, 1684, 1685, 1686, 1687, 1688, 1689, 1690, 1691, 1692, 1693, 1694, 1695, 1696, 1697, 1698, 1699, 1700, 1701, 1702, 1703, 1704, 1705, 1706, 1707, 1708, 1709, 1710, 1711, 1712, 1713, 1714, 1715, 1716, 1717, 1718, 1719, 1720, 1721, 1722, 1723, 1724, 1725, 1726, 1727, 1728, 1729, 1730, 1731, 1732, 1733, 1734, 1735, 1736, 1737, 1738, 1739, 1740, 1741, 1742, 1743, 1744, 1745, 1746, 1747, 1748, 1749, 1750, 1751, 1752, 1753, 1754, 1755, 1756, 1757, 1758, 1759, 1760, 1761, 1762, 1763, 1764, 1765, 1766, 1767, 1768, 1769, 1770, 1771, 1772, 1773, 1774, 1775, 1776, 1777, 1778, 1779, 1780, 1781, 1782, 1783, 1784, 1785, 1786, 1787, 1788, 1789, 1790, 1791, 1792, 1793, 1794, 1795, 1796, 1797, 1798, 1799, 1800, 1801, 1802, 1803, 1804, 1805, 1806, 1807, 1808, 1809, 1810, 1811, 1812, 1813, 1814, 1815, 1816, 1817, 1818, 1819, 1820, 1821, 1822, 1823, 1824, 1825, 1826, 1827, 1828, 1829, 1830, 1831, 1832, 1833, 1834, 1835, 1836, 1837, 1838, 1839, 1840, 1841, 1842, 1843, 1844, 1845, 1846, 1847, 1848, 1849, 1850, 1851, 1852, 1853, 1854, 1855, 1856, 1857, 1858, 1859, 1860, 1861, 1862, 1863, 1864, 1865, 1866, 1867, 1868, 1869, 1870, 1871, 1872, 1873, 1874, 1875, 1876, 1877, 1878, 1879, 1880, 1881, 1882, 1883, 1884, 1885, 1886, 1887, 1888, 1889, 1890, 1891, 1892, 1893, 1894, 1895, 1896, 1897, 1898, 1899, 1900, 1901, 1902, 1903, 1904, 1905, 1906, 1907, 1908, 1909, 1910, 1911, 1912, 1913, 1914, 1915, 1916, 1917, 1918, 1919, 1920, 1921, 1922, 1923, 1924, 1925, 1926, 1927, 1928, 1929, 1930, 1931, 1932, 1933, 1934, 
1935, 1936, 1937, 1938, 1939, 1940, 1941, 1942, 1943, 1944, 1945, 1946, 1947, 1948, 1949, 1950, 1951, 1952, 1953, 1954, 1955, 1956, 1957, 1958, 1959, 1960, 1961, 1962, 1963, 1964, 1965, 1966, 1967, 1968, 1969, 1970, 1971, 1972, 1973, 1974, 1975, 1976, 1977, 1978, 1979, 1980, 1981, 1982, 1983, 1984, 1985, 1986, 1987, 1988, 1989, 1990, 1991, 1992, 1993, 1994, 1995, 1996, 1997, 1998, 1999, 2000, 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014, 2015, 2016, 2017, 2018, 2019, 2020, 2021, 2022, 2023, 2024, 2025, 2026, 2027, 2028, 2029, 2030, 2031, 2032, 2033, 2034, 2035, 2036, 2037, 2038, 2039, 2040, 2041, 2042, 2043, 2044, 2045, 2046, 2047, 2048, 2049, 2050, 2051, 2052, 2053, 2054, 2055, 2056, 2057, 2058, 2059, 2060, 2061, 2062, 2063, 2064, 2065, 2066, 2067, 2068, 2069, 2070, 2071, 2072, 2073, 2074, 2075, 2076, 2077, 2078, 2079, 2080, 2081, 2082, 2083, 2084, 2085, 2086, 2087, 2088, 2089, 2090, 2091, 2092, 2093, 2094, 2095, 2096, 2097, 2098, 2099, 2100, 2101, 2102, 2103, 2104, 2105, 2106, 2107, 2108, 2109, 2110, 2111, 2112, 2113, 2114, 2115, 2116, 2117, 2118, 2119, 2120, 2121, 2122, 2123, 2124, 2125, 2126, 2127, 2128, 2129, 2130, 2131, 2132, 2133, 2134, 2135, 2136, 2137, 2138, 2139, 2140, 2141, 2142, 2143, 2144, 2145, 2146, 2147, 2148, 2149, 2150, 2151, 2152, 2153, 2154, 2155, 2156, 2157, 2158, 2159, 2160, 2161, 2162, 2163, 2164, 2165, 2166, 2167, 2168, 2169, 2170, 2171, 2172, 2173, 2174, 2175, 2176, 2177, 2178, 2179, 2180, 2181, 2182, 2183, 2184, 2185, 2186, 2187, 2188, 2189, 2190, 2191, 2192, 2193, 2194, 2195, 2196, 2197, 2198, 2199, 2200, 2201, 2202, 2203, 2204, 2205, 2206, 2207, 2208, 2209, 2210, 2211, 2212, 2213, 2214, 2215, 2216, 2217, 2218, 2219, 2220, 2221, 2222, 2223, 2224, 2225, 2226, 2227, 2228, 2229, 2230, 2231, 2232, 2233, 2234, 2235, 2236, 2237, 2238, 2239, 2240, 2241, 2242, 2243, 2244, 2245, 2246, 2247, 2248, 2249, 2250, 2251, 2252, 2253, 2254, 2255, 2256, 2257, 2258, 2259, 2260, 2261, 2262, 2263, 2264, 2265, 2266, 2267, 2268, 2269, 2270, 2271, 2272, 2273, 2274, 2275, 2276, 2277, 2278, 2279, 2280, 2281, 2282, 2283, 2284, 2285, 2286, 2287, 2288, 2289, 2290, 2291, 2292, 2293, 2294, 2295, 2296, 2297, 2298, 2299, 2300, 2301, 2302, 2303, 2304, 2305, 2306, 2307, 2308, 2309, 2310, 2311, 2312, 2313, 2314, 2315, 2316, 2317, 2318, 2319, 2320, 2321, 2322, 2323, 2324, 2325, 2326, 2327, 2328, 2329, 2330, 2331, 2332, 2333, 2334, 2335, 2336, 2337, 2338, 2339, 2340, 2341, 2342, 2343, 2344, 2345, 2346, 2347, 2348, 2349, 2350, 2351, 2352, 2353, 2354, 2355, 2356, 2357, 2358, 2359, 2360, 2361, 2362, 2363, 2364, 2365, 2366, 2367, 2368, 2369, 2370, 2371, 2372, 2373, 2374, 2375, 2376, 2377, 2378, 2379, 2380, 2381, 2382, 2383, 2384, 2385, 2386, 2387, 2388, 2389, 2390, 2391, 2392, 2393, 2394, 2395, 2396, 2397, 2398, 2399, 2400, 2401, 2402, 2403, 2404, 2405, 2406, 2407, 2408, 2409, 2410, 2411, 2412, 2413, 2414, 2415, 2416, 2417, 2418, 2419, 2420, 2421, 2422, 2423, 2424, 2425, 2426, 2427, 2428, 2429, 2430, 2431, 2432, 2433, 2434, 2435, 2436, 2437, 2438, 2439, 2440, 2441, 2442, 2443, 2444, 2445, 2446, 2447, 2448, 2449, 2450, 2451, 2452, 2453, 2454, 2455, 2456, 2457, 2458, 2459, 2460, 2461, 2462, 2463, 2464, 2465, 2466, 2467, 2468, 2469, 2470, 2471, 2472, 2473, 2474, 2475, 2476, 2477, 2478, 2479, 2480, 2481, 2482, 2483, 2484, 2485, 2486, 2487, 2488, 2489, 2490, 2491, 2492, 2493, 2494, 2495, 2496, 2497, 2498, 2499, 2500, 2501, 2502, 2503, 2504, 2505, 2506, 2507, 2508, 2509, 2510, 2511, 2512, 2513, 2514, 2515, 2516, 2517, 2518, 2519, 2520, 2521, 2522, 2523, 2524, 2525, 2526, 
2527, 2528, 2529, 2530, 2531, 2532, 2533, 2534, 2535, 2536, 2537, 2538, 2539, 2540, 2541, 2542, 2543, 2544, 2545, 2546, 2547, 2548, 2549, 2550, 2551, 2552, 2553, 2554, 2555, 2556, 2557, 2558, 2559, 2560, 2561, 2562, 2563, 2564, 2565, 2566, 2567, 2568, 2569, 2570, 2571, 2572, 2573, 2574, 2575, 2576, 2577, 2578, 2579, 2580, 2581, 2582, 2583, 2584, 2585, 2586, 2587, 2588, 2589, 2590, 2591, 2592, 2593, 2594, 2595, 2596, 2597, 2598, 2599, 2600, 2601, 2602, 2603, 2604, 2605, 2606, 2607, 2608, 2609, 2610, 2611, 2612, 2613, 2614, 2615, 2616, 2617, 2618, 2619, 2620, 2621, 2622, 2623, 2624, 2625, 2626, 2627, 2628, 2629, 2630, 2631, 2632, 2633, 2634, 2635, 2636, 2637, 2638, 2639, 2640, 2641, 2642, 2643, 2644, 2645, 2646, 2647, 2648, 2649, 2650, 2651, 2652, 2653, 2654, 2655, 2656, 2657, 2658, 2659, 2660, 2661, 2662, 2663, 2664, 2665, 2666, 2667, 2668, 2669, 2670, 2671, 2672, 2673, 2674, 2675, 2676, 2677, 2678, 2679, 2680, 2681, 2682, 2683, 2684, 2685, 2686, 2687, 2688, 2689, 2690, 2691, 2692, 2693, 2694, 2695, 2696, 2697, 2698, 2699, 2700, 2701, 2702, 2703, 2704, 2705, 2706, 2707, 2708, 2709, 2710, 2711, 2712, 2713, 2714, 2715, 2716, 2717, 2718, 2719, 2720, 2721, 2722, 2723, 2724, 2725, 2726, 2727, 2728, 2729, 2730, 2731, 2732, 2733, 2734, 2735, 2736, 2737, 2738, 2739, 2740, 2741, 2742, 2743, 2744, 2745, 2746, 2747, 2748, 2749, 2750, 2751, 2752, 2753, 2754, 2755, 2756, 2757, 2758, 2759, 2760, 2761, 2762, 2763, 2764, 2765, 2766, 2767, 2768, 2769, 2770, 2771, 2772, 2773, 2774, 2775, 2776, 2777, 2778, 2779, 2780, 2781, 2782, 2783, 2784, 2785, 2786, 2787, 2788, 2789, 2790, 2791, 2792, 2793, 2794, 2795, 2796, 2797, 2798, 2799, 2800, 2801, 2802, 2803, 2804, 2805, 2806, 2807, 2808, 2809, 2810, 2811, 2812, 2813, 2814, 2815, 2816, 2817, 2818, 2819, 2820, 2821, 2822, 2823, 2824, 2825, 2826, 2827, 2828, 2829, 2830, 2831, 2832, 2833, 2834, 2835, 2836, 2837, 2838, 2839, 2840, 2841, 2842, 2843, 2844, 2845, 2846, 2847, 2848, 2849, 2850, 2851, 2852, 2853, 2854, 2855, 2856, 2857, 2858, 2859, 2860, 2861, 2862, 2863, 2864, 2865, 2866, 2867, 2868, 2869, 2870, 2871, 2872, 2873, 2874, 2875, 2876, 2877, 2878, 2879, 2880, 2881, 2882, 2883, 2884, 2885, 2886, 2887, 2888, 2889, 2890, 2891, 2892, 2893, 2894, 2895, 2896, 2897, 2898, 2899, 2900, 2901, 2902, 2903, 2904, 2905, 2906, 2907, 2908, 2909, 2910, 2911, 2912, 2913, 2914, 2915, 2916, 2917, 2918, 2919, 2920, 2921, 2922, 2923, 2924, 2925, 2926, 2927, 2928, 2929, 2930, 2931, 2932, 2933, 2934, 2935, 2936, 2937, 2938, 2939, 2940, 2941, 2942, 2943, 2944, 2945, 2946, 2947, 2948, 2949, 2950, 2951, 2952, 2953, 2954, 2955, 2956, 2957, 2958, 2959, 2960, 2961, 2962, 2963, 2964, 2965, 2966, 2967, 2968, 2969, 2970, 2971, 2972, 2973, 2974, 2975, 2976, 2977, 2978, 2979, 2980, 2981, 2982, 2983, 2984, 2985, 2986, 2987, 2988, 2989, 2990, 2991, 2992, 2993, 2994, 2995, 2996, 2997, 2998, 2999, 3000, 3001, 3002, 3003, 3004, 3005, 3006, 3007, 3008, 3009, 3010, 3011, 3012, 3013, 3014, 3015, 3016, 3017, 3018, 3019, 3020, 3021, 3022, 3023, 3024, 3025, 3026, 3027, 3028, 3029, 3030, 3031, 3032, 3033, 3034, 3035, 3036, 3037, 3038, 3039, 3040, 3041, 3042, 3043, 3044, 3045, 3046, 3047, 3048, 3049, 3050, 3051, 3052, 3053, 3054, 3055, 3056, 3057, 3058, 3059, 3060, 3061, 3062, 3063, 3064, 3065, 3066, 3067, 3068, 3069, 3070, 3071, 3072, 3073, 3074, 3075, 3076, 3077, 3078, 3079, 3080, 3081, 3082, 3083, 3084, 3085, 3086, 3087, 3088, 3089, 3090, 3091, 3092, 3093, 3094, 3095, 3096, 3097, 3098, 3099, 3100, 3101, 3102, 3103, 3104, 3105, 3106, 3107, 3108, 3109, 3110, 3111, 3112, 3113, 3114, 3115, 3116, 3117, 3118, 
3119, 3120, 3121, 3122, 3123, 3124, 3125, 3126, 3127, 3128, 3129, 3130, 3131, 3132, 3133, 3134, 3135, 3136, 3137, 3138, 3139, 3140, 3141, 3142, 3143, 3144, 3145, 3146, 3147, 3148, 3149, 3150, 3151, 3152, 3153, 3154, 3155, 3156, 3157, 3158, 3159, 3160, 3161, 3162, 3163, 3164, 3165, 3166, 3167, 3168, 3169, 3170, 3171, 3172, 3173, 3174, 3175, 3176, 3177, 3178, 3179, 3180, 3181, 3182, 3183, 3184, 3185, 3186, 3187, 3188, 3189, 3190, 3191, 3192, 3193, 3194, 3195, 3196, 3197, 3198, 3199, 3200, 3201, 3202, 3203, 3204, 3205, 3206, 3207, 3208, 3209, 3210, 3211, 3212, 3213, 3214, 3215, 3216, 3217, 3218, 3219, 3220, 3221, 3222, 3223, 3224, 3225, 3226, 3227, 3228, 3229, 3230, 3231, 3232, 3233, 3234, 3235, 3236, 3237, 3238, 3239, 3240, 3241, 3242, 3243, 3244, 3245, 3246, 3247, 3248, 3249, 3250, 3251, 3252, 3253, 3254, 3255, 3256, 3257, 3258, 3259, 3260, 3261, 3262, 3263, 3264, 3265, 3266, 3267, 3268, 3269, 3270, 3271, 3272, 3273, 3274, 3275, 3276, 3277, 3278, 3279, 3280, 3281, 3282, 3283, 3284, 3285, 3286, 3287, 3288, 3289, 3290, 3291, 3292, 3293, 3294, 3295, 3296, 3297, 3298, 3299, 3300, 3301, 3302, 3303, 3304, 3305, 3306, 3307, 3308, 3309, 3310, 3311, 3312, 3313, 3314, 3315, 3316, 3317, 3318, 3319, 3320, 3321, 3322, 3323, 3324, 3325, 3326, 3327, 3328, 3329, 3330, 3331, 3332, 3333, 3334, 3335, 3336, 3337, 3338, 3339, 3340, 3341, 3342, 3343, 3344, 3345, 3346, 3347, 3348, 3349, 3350, 3351, 3352, 3353, 3354, 3355, 3356, 3357, 3358, 3359, 3360, 3361, 3362, 3363, 3364, 3365, 3366, 3367, 3368, 3369, 3370, 3371, 3372, 3373, 3374, 3375, 3376, 3377, 3378, 3379, 3380, 3381, 3382, 3383, 3384, 3385, 3386, 3387, 3388, 3389, 3390, 3391, 3392, 3393, 3394, 3395, 3396, 3397, 3398, 3399, 3400, 3401, 3402, 3403, 3404, 3405, 3406, 3407, 3408, 3409, 3410, 3411, 3412, 3413, 3414, 3415, 3416, 3417, 3418, 3419, 3420, 3421, 3422, 3423, 3424, 3425, 3426, 3427, 3428, 3429, 3430, 3431, 3432, 3433, 3434, 3435, 3436, 3437, 3438, 3439, 3440, 3441, 3442, 3443, 3444, 3445, 3446, 3447, 3448, 3449, 3450, 3451, 3452, 3453, 3454, 3455, 3456, 3457, 3458, 3459, 3460, 3461, 3462, 3463, 3464, 3465, 3466, 3467, 3468, 3469, 3470, 3471, 3472, 3473, 3474, 3475, 3476, 3477, 3478, 3479, 3480, 3481, 3482, 3483, 3484, 3485, 3486, 3487, 3488, 3489, 3490, 3491, 3492, 3493, 3494, 3495, 3496, 3497, 3498, 3499, 3500, 3501, 3502, 3503, 3504, 3505, 3506, 3507, 3508, 3509, 3510, 3511, 3512, 3513, 3514, 3515, 3516, 3517, 3518, 3519, 3520, 3521, 3522, 3523, 3524, 3525, 3526, 3527, 3528, 3529, 3530, 3531, 3532, 3533, 3534, 3535, 3536, 3537, 3538, 3539, 3540, 3541, 3542, 3543, 3544, 3545, 3546, 3547, 3548, 3549, 3550, 3551, 3552, 3553, 3554, 3555, 3556, 3557, 3558, 3559, 3560, 3561, 3562, 3563, 3564, 3565, 3566, 3567, 3568, 3569, 3570, 3571, 3572, 3573, 3574, 3575, 3576, 3577, 3578, 3579, 3580, 3581, 3582, 3583, 3584, 3585, 3586, 3587, 3588, 3589, 3590, 3591, 3592, 3593, 3594, 3595, 3596, 3597, 3598, 3599, 3600, 3601, 3602, 3603, 3604, 3605, 3606, 3607, 3608, 3609, 3610, 3611, 3612, 3613, 3614, 3615, 3616, 3617, 3618, 3619, 3620, 3621, 3622, 3623, 3624, 3625, 3626, 3627, 3628, 3629, 3630, 3631, 3632, 3633, 3634, 3635, 3636, 3637, 3638, 3639, 3640, 3641, 3642, 3643, 3644, 3645, 3646, 3647, 3648, 3649, 3650, 3651, 3652, 3653, 3654, 3655, 3656, 3657, 3658, 3659, 3660, 3661, 3662, 3663, 3664, 3665, 3666, 3667, 3668, 3669, 3670, 3671, 3672, 3673, 3674, 3675, 3676, 3677, 3678, 3679, 3680, 3681, 3682, 3683, 3684, 3685, 3686, 3687, 3688, 3689, 3690, 3691, 3692, 3693, 3694, 3695, 3696, 3697, 3698, 3699, 3700, 3701, 3702, 3703, 3704, 3705, 3706, 3707, 3708, 3709, 3710, 
3711, 3712, 3713, 3714, 3715, 3716, 3717, 3718, 3719, 3720, 3721, 3722, 3723, 3724, 3725, 3726, 3727, 3728, 3729, 3730, 3731, 3732, 3733, 3734, 3735, 3736, 3737, 3738, 3739, 3740, 3741, 3742, 3743, 3744, 3745, 3746, 3747, 3748, 3749, 3750, 3751, 3752, 3753, 3754, 3755, 3756, 3757, 3758, 3759, 3760, 3761, 3762, 3763, 3764, 3765, 3766, 3767, 3768, 3769, 3770, 3771, 3772, 3773, 3774, 3775, 3776, 3777, 3778, 3779, 3780, 3781, 3782, 3783, 3784, 3785, 3786, 3787, 3788, 3789, 3790, 3791, 3792, 3793, 3794, 3795, 3796, 3797, 3798, 3799, 3800, 3801, 3802, 3803, 3804, 3805, 3806, 3807, 3808, 3809, 3810, 3811, 3812, 3813, 3814, 3815, 3816, 3817, 3818, 3819, 3820, 3821, 3822, 3823, 3824, 3825, 3826, 3827, 3828, 3829, 3830, 3831, 3832, 3833, 3834, 3835, 3836, 3837, 3838, 3839, 3840, 3841, 3842, 3843, 3844, 3845, 3846, 3847, 3848, 3849, 3850, 3851, 3852, 3853, 3854, 3855, 3856, 3857, 3858, 3859, 3860, 3861, 3862, 3863, 3864, 3865, 3866, 3867, 3868, 3869, 3870, 3871, 3872, 3873, 3874, 3875, 3876, 3877, 3878, 3879, 3880, 3881, 3882, 3883, 3884, 3885, 3886, 3887, 3888, 3889, 3890, 3891, 3892, 3893, 3894, 3895, 3896, 3897, 3898, 3899, 3900, 3901, 3902, 3903, 3904, 3905, 3906, 3907, 3908, 3909, 3910, 3911, 3912, 3913, 3914, 3915, 3916, 3917, 3918, 3919, 3920, 3921, 3922, 3923, 3924, 3925, 3926, 3927, 3928, 3929, 3930, 3931, 3932, 3933, 3934, 3935, 3936, 3937, 3938, 3939, 3940, 3941, 3942, 3943, 3944, 3945, 3946, 3947, 3948, 3949, 3950, 3951, 3952, 3953, 3954, 3955, 3956, 3957, 3958, 3959, 3960, 3961, 3962, 3963, 3964, 3965, 3966, 3967, 3968, 3969, 3970, 3971, 3972, 3973, 3974, 3975, 3976, 3977, 3978, 3979, 3980, 3981, 3982, 3983, 3984, 3985, 3986, 3987, 3988, 3989, 3990, 3991, 3992, 3993, 3994, 3995, 3996, 3997, 3998, 3999, 4000, 4001, 4002, 4003, 4004, 4005, 4006, 4007, 4008, 4009, 4010, 4011, 4012, 4013, 4014, 4015, 4016, 4017, 4018, 4019, 4020, 4021, 4022, 4023, 4024, 4025, 4026, 4027, 4028, 4029, 4030, 4031, 4032, 4033, 4034, 4035, 4036, 4037, 4038, 4039, 4040, 4041, 4042, 4043, 4044, 4045, 4046, 4047, 4048, 4049, 4050, 4051, 4052, 4053, 4054, 4055, 4056, 4057, 4058, 4059, 4060, 4061, 4062, 4063, 4064, 4065, 4066, 4067, 4068, 4069, 4070, 4071, 4072, 4073, 4074, 4075, 4076, 4077, 4078, 4079, 4080, 4081, 4082, 4083, 4084, 4085, 4086, 4087, 4088, 4089, 4090, 4091, 4092, 4093, 4094, 4095, 4096, 4097, 4098, 4099, 4100, 4101, 4102, 4103, 4104, 4105, 4106, 4107, 4108, 4109, 4110, 4111, 4112, 4113, 4114, 4115, 4116, 4117, 4118, 4119, 4120, 4121, 4122, 4123, 4124, 4125, 4126, 4127, 4128, 4129, 4130, 4131, 4132, 4133, 4134, 4135, 4136, 4137, 4138, 4139, 4140, 4141, 4142, 4143, 4144, 4145, 4146, 4147, 4148, 4149, 4150, 4151, 4152, 4153, 4154, 4155, 4156, 4157, 4158, 4159, 4160, 4161, 4162, 4163, 4164, 4165, 4166, 4167, 4168, 4169, 4170, 4171, 4172, 4173, 4174, 4175, 4176, 4177, 4178, 4179, 4180, 4181, 4182, 4183, 4184, 4185, 4186, 4187, 4188, 4189, 4190, 4191, 4192, 4193, 4194, 4195, 4196, 4197, 4198, 4199, 4200, 4201, 4202, 4203, 4204, 4205, 4206, 4207, 4208, 4209, 4210, 4211, 4212, 4213, 4214, 4215, 4216, 4217, 4218, 4219, 4220, 4221, 4222, 4223, 4224, 4225, 4226, 4227, 4228, 4229, 4230, 4231, 4232, 4233, 4234, 4235, 4236, 4237, 4238, 4239, 4240, 4241, 4242, 4243, 4244, 4245, 4246, 4247, 4248, 4249, 4250, 4251, 4252, 4253, 4254, 4255, 4256, 4257, 4258, 4259, 4260, 4261, 4262, 4263, 4264, 4265, 4266, 4267, 4268, 4269, 4270, 4271, 4272, 4273, 4274, 4275, 4276, 4277, 4278, 4279, 4280, 4281, 4282, 4283, 4284, 4285, 4286, 4287, 4288, 4289, 4290, 4291, 4292, 4293, 4294, 4295, 4296, 4297, 4298, 4299, 4300, 4301, 4302, 
4303, 4304, 4305, 4306, 4307, 4308, 4309, 4310, 4311, 4312, 4313, 4314, 4315, 4316, 4317, 4318, 4319, 4320, 4321, 4322, 4323, 4324, 4325, 4326, 4327, 4328, 4329, 4330, 4331, 4332, 4333, 4334, 4335, 4336, 4337, 4338, 4339, 4340, 4341, 4342, 4343, 4344, 4345, 4346, 4347, 4348, 4349, 4350, 4351, 4352, 4353, 4354, 4355, 4356, 4357, 4358, 4359, 4360, 4361, 4362, 4363, 4364, 4365, 4366, 4367, 4368, 4369, 4370, 4371, 4372, 4373, 4374, 4375, 4376, 4377, 4378, 4379, 4380, 4381, 4382, 4383, 4384, 4385, 4386, 4387, 4388, 4389, 4390, 4391, 4392, 4393, 4394, 4395, 4396, 4397, 4398, 4399, 4400, 4401, 4402, 4403, 4404, 4405, 4406, 4407, 4408, 4409, 4410, 4411, 4412, 4413, 4414, 4415, 4416, 4417, 4418, 4419, 4420, 4421, 4422, 4423, 4424, 4425, 4426, 4427, 4428, 4429, 4430, 4431, 4432, 4433, 4434, 4435, 4436, 4437, 4438, 4439, 4440, 4441, 4442, 4443, 4444, 4445, 4446, 4447, 4448, 4449, 4450, 4451, 4452, 4453, 4454, 4455, 4456, 4457, 4458, 4459, 4460, 4461, 4462, 4463, 4464, 4465, 4466, 4467, 4468, 4469, 4470, 4471, 4472, 4473, 4474, 4475, 4476, 4477, 4478, 4479, 4480, 4481, 4482, 4483, 4484, 4485, 4486, 4487, 4488, 4489, 4490, 4491, 4492, 4493, 4494, 4495, 4496, 4497, 4498, 4499, 4500, 4501, 4502, 4503, 4504, 4505, 4506, 4507, 4508, 4509, 4510, 4511, 4512, 4513, 4514, 4515, 4516, 4517, 4518, 4519, 4520, 4521, 4522, 4523, 4524, 4525, 4526, 4527, 4528, 4529, 4530, 4531, 4532, 4533, 4534, 4535, 4536, 4537, 4538, 4539, 4540, 4541, 4542, 4543, 4544, 4545, 4546, 4547, 4548, 4549, 4550, 4551, 4552, 4553, 4554, 4555, 4556, 4557, 4558, 4559, 4560, 4561, 4562, 4563, 4564, 4565, 4566, 4567, 4568, 4569, 4570, 4571, 4572, 4573, 4574, 4575, 4576, 4577, 4578, 4579, 4580, 4581, 4582, 4583, 4584, 4585, 4586, 4587, 4588, 4589, 4590, 4591, 4592, 4593, 4594, 4595, 4596, 4597, 4598, 4599, 4600, 4601, 4602, 4603, 4604, 4605, 4606, 4607, 4608, 4609, 4610, 4611, 4612, 4613, 4614, 4615, 4616, 4617, 4618, 4619, 4620, 4621, 4622, 4623, 4624, 4625, 4626, 4627, 4628, 4629, 4630, 4631, 4632, 4633, 4634, 4635, 4636, 4637, 4638, 4639, 4640, 4641, 4642, 4643, 4644, 4645, 4646, 4647, 4648, 4649, 4650, 4651, 4652, 4653, 4654, 4655, 4656, 4657, 4658, 4659, 4660, 4661, 4662, 4663, 4664, 4665, 4666, 4667, 4668, 4669, 4670, 4671, 4672, 4673, 4674, 4675, 4676, 4677, 4678, 4679, 4680, 4681, 4682, 4683, 4684, 4685, 4686, 4687, 4688, 4689, 4690, 4691, 4692, 4693, 4694, 4695, 4696, 4697, 4698, 4699, 4700, 4701, 4702, 4703, 4704, 4705, 4706, 4707, 4708, 4709, 4710, 4711, 4712, 4713, 4714, 4715, 4716, 4717, 4718, 4719, 4720, 4721, 4722, 4723, 4724, 4725, 4726, 4727, 4728, 4729, 4730, 4731, 4732, 4733, 4734, 4735, 4736, 4737, 4738, 4739, 4740, 4741, 4742, 4743, 4744, 4745, 4746, 4747, 4748, 4749, 4750, 4751, 4752, 4753, 4754, 4755, 4756, 4757, 4758, 4759, 4760, 4761, 4762, 4763, 4764, 4765, 4766, 4767, 4768, 4769, 4770, 4771, 4772, 4773, 4774, 4775, 4776, 4777, 4778, 4779, 4780, 4781, 4782, 4783, 4784, 4785, 4786, 4787, 4788, 4789, 4790, 4791, 4792, 4793, 4794, 4795, 4796, 4797, 4798, 4799, 4800, 4801, 4802, 4803, 4804, 4805, 4806, 4807, 4808, 4809, 4810, 4811, 4812, 4813, 4814, 4815, 4816, 4817, 4818, 4819, 4820, 4821, 4822, 4823, 4824, 4825, 4826, 4827, 4828, 4829, 4830, 4831, 4832, 4833, 4834, 4835, 4836, 4837, 4838, 4839, 4840, 4841, 4842, 4843, 4844, 4845, 4846, 4847, 4848, 4849, 4850, 4851, 4852, 4853, 4854, 4855, 4856, 4857, 4858, 4859, 4860, 4861, 4862, 4863, 4864, 4865, 4866, 4867, 4868, 4869, 4870, 4871, 4872, 4873, 4874, 4875, 4876, 4877, 4878, 4879, 4880, 4881, 4882, 4883, 4884, 4885, 4886, 4887, 4888, 4889, 4890, 4891, 4892, 4893, 4894, 
4895, 4896, 4897, 4898, 4899, 4900, 4901, 4902, 4903, 4904, 4905, 4906, 4907, 4908, 4909, 4910, 4911, 4912, 4913, 4914, 4915, 4916, 4917, 4918, 4919, 4920, 4921, 4922, 4923, 4924, 4925, 4926, 4927, 4928, 4929, 4930, 4931, 4932, 4933, 4934, 4935, 4936, 4937, 4938, 4939, 4940, 4941, 4942, 4943, 4944, 4945, 4946, 4947, 4948, 4949, 4950, 4951, 4952, 4953, 4954, 4955, 4956, 4957, 4958, 4959, 4960, 4961, 4962, 4963, 4964, 4965, 4966, 4967, 4968, 4969, 4970, 4971, 4972, 4973, 4974, 4975, 4976, 4977, 4978, 4979, 4980, 4981, 4982, 4983, 4984, 4985, 4986, 4987, 4988, 4989, 4990, 4991, 4992, 4993, 4994, 4995, 4996, 4997, 4998, 4999, 5000, 5001, 5002, 5003, 5004, 5005, 5006, 5007, 5008, 5009, 5010, 5011, 5012, 5013, 5014, 5015, 5016, 5017, 5018, 5019, 5020, 5021, 5022, 5023, 5024, 5025, 5026, 5027, 5028, 5029, 5030, 5031, 5032, 5033, 5034, 5035, 5036, 5037, 5038, 5039, 5040, 5041, 5042, 5043, 5044, 5045, 5046, 5047, 5048, 5049, 5050, 5051, 5052, 5053, 5054, 5055, 5056, 5057, 5058, 5059, 5060, 5061, 5062, 5063, 5064, 5065, 5066, 5067, 5068, 5069, 5070, 5071, 5072, 5073, 5074, 5075, 5076, 5077, 5078, 5079, 5080, 5081, 5082, 5083, 5084, 5085, 5086, 5087, 5088, 5089, 5090, 5091, 5092, 5093, 5094, 5095, 5096, 5097, 5098, 5099, 5100, 5101, 5102, 5103, 5104, 5105, 5106, 5107, 5108, 5109, 5110, 5111, 5112, 5113, 5114, 5115, 5116, 5117, 5118, 5119, 5120, 5121, 5122, 5123, 5124, 5125, 5126, 5127, 5128, 5129, 5130, 5131, 5132, 5133, 5134, 5135, 5136, 5137, 5138, 5139, 5140, 5141, 5142, 5143, 5144, 5145, 5146, 5147, 5148, 5149, 5150, 5151, 5152, 5153, 5154, 5155, 5156, 5157, 5158, 5159, 5160, 5161, 5162, 5163, 5164, 5165, 5166, 5167, 5168, 5169, 5170, 5171, 5172, 5173, 5174, 5175, 5176, 5177, 5178, 5179, 5180, 5181, 5182, 5183, 5184, 5185, 5186, 5187, 5188, 5189, 5190, 5191, 5192, 5193, 5194, 5195, 5196, 5197, 5198, 5199, 5200, 5201, 5202, 5203, 5204, 5205, 5206, 5207, 5208, 5209, 5210, 5211, 5212, 5213, 5214, 5215, 5216, 5217, 5218, 5219, 5220, 5221, 5222, 5223, 5224, 5225, 5226, 5227, 5228, 5229, 5230, 5231, 5232, 5233, 5234, 5235, 5236, 5237, 5238, 5239, 5240, 5241, 5242, 5243, 5244, 5245, 5246, 5247, 5248, 5249, 5250, 5251, 5252, 5253, 5254, 5255, 5256, 5257, 5258, 5259, 5260, 5261, 5262, 5263, 5264, 5265, 5266, 5267, 5268, 5269, 5270, 5271, 5272, 5273, 5274, 5275, 5276, 5277, 5278, 5279, 5280, 5281, 5282, 5283, 5284, 5285, 5286, 5287, 5288, 5289, 5290, 5291, 5292, 5293, 5294, 5295, 5296, 5297, 5298, 5299, 5300, 5301, 5302, 5303, 5304, 5305, 5306, 5307, 5308, 5309, 5310, 5311, 5312, 5313, 5314, 5315, 5316, 5317, 5318, 5319, 5320, 5321, 5322, 5323, 5324, 5325, 5326, 5327, 5328, 5329, 5330, 5331, 5332, 5333, 5334, 5335, 5336, 5337, 5338, 5339, 5340, 5341, 5342, 5343, 5344, 5345, 5346, 5347, 5348, 5349, 5350, 5351, 5352, 5353, 5354, 5355, 5356, 5357, 5358, 5359, 5360, 5361, 5362, 5363, 5364, 5365, 5366, 5367, 5368, 5369, 5370, 5371, 5372, 5373, 5374, 5375, 5376, 5377, 5378, 5379, 5380, 5381, 5382, 5383, 5384, 5385, 5386, 5387, 5388, 5389, 5390, 5391, 5392, 5393, 5394, 5395, 5396, 5397, 5398, 5399, 5400, 5401, 5402, 5403, 5404, 5405, 5406, 5407, 5408, 5409, 5410, 5411, 5412, 5413, 5414, 5415, 5416, 5417, 5418, 5419, 5420, 5421, 5422, 5423, 5424, 5425, 5426, 5427, 5428, 5429, 5430, 5431, 5432, 5433, 5434, 5435, 5436, 5437, 5438, 5439, 5440, 5441, 5442, 5443, 5444, 5445, 5446, 5447, 5448, 5449, 5450, 5451, 5452, 5453, 5454, 5455, 5456, 5457, 5458, 5459, 5460, 5461, 5462, 5463, 5464, 5465, 5466, 5467, 5468, 5469, 5470, 5471, 5472, 5473, 5474, 5475, 5476, 5477, 5478, 5479, 5480, 5481, 5482, 5483, 5484, 5485, 5486, 
5487, 5488, 5489, 5490, 5491, 5492, 5493, 5494, 5495, 5496, 5497, 5498, 5499, 5500, 5501, 5502, 5503, 5504, 5505, 5506, 5507, 5508, 5509, 5510, 5511, 5512, 5513, 5514, 5515, 5516, 5517, 5518, 5519, 5520, 5521, 5522, 5523, 5524, 5525, 5526, 5527, 5528, 5529, 5530, 5531, 5532, 5533, 5534, 5535, 5536, 5537, 5538, 5539, 5540, 5541, 5542, 5543, 5544, 5545, 5546, 5547, 5548, 5549, 5550, 5551, 5552, 5553, 5554, 5555, 5556, 5557, 5558, 5559, 5560, 5561, 5562, 5563, 5564, 5565, 5566, 5567, 5568, 5569, 5570, 5571, 5572, 5573, 5574, 5575, 5576, 5577, 5578, 5579, 5580, 5581, 5582, 5583, 5584, 5585, 5586, 5587, 5588, 5589, 5590, 5591, 5592, 5593, 5594, 5595, 5596, 5597, 5598, 5599, 5600, 5601, 5602, 5603, 5604, 5605, 5606, 5607, 5608, 5609, 5610, 5611, 5612, 5613, 5614, 5615, 5616, 5617, 5618, 5619, 5620, 5621, 5622, 5623, 5624, 5625, 5626, 5627, 5628, 5629, 5630, 5631, 5632, 5633, 5634, 5635, 5636, 5637, 5638, 5639, 5640, 5641, 5642, 5643, 5644, 5645, 5646, 5647, 5648, 5649, 5650, 5651, 5652, 5653, 5654, 5655, 5656, 5657, 5658, 5659, 5660, 5661, 5662, 5663, 5664, 5665, 5666, 5667, 5668, 5669, 5670, 5671, 5672, 5673, 5674, 5675, 5676, 5677, 5678, 5679, 5680, 5681, 5682, 5683, 5684, 5685, 5686, 5687, 5688, 5689, 5690, 5691, 5692, 5693, 5694, 5695, 5696, 5697, 5698, 5699, 5700, 5701, 5702, 5703, 5704, 5705, 5706, 5707, 5708, 5709, 5710, 5711, 5712, 5713, 5714, 5715, 5716, 5717, 5718, 5719, 5720, 5721, 5722, 5723, 5724, 5725, 5726, 5727, 5728, 5729, 5730, 5731, 5732, 5733, 5734, 5735, 5736, 5737, 5738, 5739, 5740, 5741, 5742, 5743, 5744, 5745, 5746, 5747, 5748, 5749, 5750, 5751, 5752, 5753, 5754, 5755, 5756, 5757, 5758, 5759, 5760, 5761, 5762, 5763, 5764, 5765, 5766, 5767, 5768, 5769, 5770, 5771, 5772, 5773, 5774, 5775, 5776, 5777, 5778, 5779, 5780, 5781, 5782, 5783, 5784, 5785, 5786, 5787, 5788, 5789, 5790, 5791, 5792, 5793, 5794, 5795, 5796, 5797, 5798, 5799, 5800, 5801, 5802, 5803, 5804, 5805, 5806, 5807, 5808, 5809, 5810, 5811, 5812, 5813, 5814, 5815, 5816, 5817, 5818, 5819, 5820, 5821, 5822, 5823, 5824, 5825, 5826, 5827, 5828, 5829, 5830, 5831, 5832, 5833, 5834, 5835, 5836, 5837, 5838, 5839, 5840, 5841, 5842, 5843, 5844, 5845, 5846, 5847, 5848, 5849, 5850, 5851, 5852, 5853, 5854, 5855, 5856, 5857, 5858, 5859, 5860, 5861, 5862, 5863, 5864, 5865, 5866, 5867, 5868, 5869, 5870, 5871, 5872, 5873, 5874, 5875, 5876, 5877, 5878, 5879, 5880, 5881, 5882, 5883, 5884, 5885, 5886, 5887, 5888, 5889, 5890, 5891, 5892, 5893, 5894, 5895, 5896, 5897, 5898, 5899, 5900, 5901, 5902, 5903, 5904, 5905, 5906, 5907, 5908, 5909, 5910, 5911, 5912, 5913, 5914, 5915, 5916, 5917, 5918, 5919, 5920, 5921, 5922, 5923, 5924, 5925, 5926, 5927, 5928, 5929, 5930, 5931, 5932, 5933, 5934, 5935, 5936, 5937, 5938, 5939, 5940, 5941, 5942, 5943, 5944, 5945, 5946, 5947, 5948, 5949, 5950, 5951, 5952, 5953, 5954, 5955, 5956, 5957, 5958, 5959, 5960, 5961, 5962, 5963, 5964, 5965, 5966, 5967, 5968, 5969, 5970, 5971, 5972, 5973, 5974, 5975, 5976, 5977, 5978, 5979, 5980, 5981, 5982, 5983, 5984, 5985, 5986, 5987, 5988, 5989, 5990, 5991, 5992, 5993, 5994, 5995, 5996, 5997, 5998, 5999, 6000, 6001, 6002, 6003, 6004, 6005, 6006, 6007, 6008, 6009, 6010, 6011, 6012, 6013, 6014, 6015, 6016, 6017, 6018, 6019, 6020, 6021, 6022, 6023, 6024, 6025, 6026, 6027, 6028, 6029, 6030, 6031, 6032, 6033, 6034, 6035, 6036, 6037, 6038, 6039, 6040, 6041, 6042, 6043, 6044, 6045, 6046, 6047, 6048, 6049, 6050, 6051, 6052, 6053, 6054, 6055, 6056, 6057, 6058, 6059, 6060, 6061, 6062, 6063, 6064, 6065, 6066, 6067, 6068, 6069, 6070, 6071, 6072, 6073, 6074, 6075, 6076, 6077, 6078, 
6079, 6080, 6081, 6082, 6083, 6084, 6085, 6086, 6087, 6088, 6089, 6090, 6091, 6092, 6093, 6094, 6095, 6096, 6097, 6098, 6099, 6100, 6101, 6102, 6103, 6104, 6105, 6106, 6107, 6108, 6109, 6110, 6111, 6112, 6113, 6114, 6115, 6116, 6117, 6118, 6119, 6120, 6121, 6122, 6123, 6124, 6125, 6126, 6127, 6128, 6129, 6130, 6131, 6132, 6133, 6134, 6135, 6136, 6137, 6138, 6139, 6140, 6141, 6142, 6143, 6144, 6145, 6146, 6147, 6148, 6149, 6150, 6151, 6152, 6153, 6154, 6155, 6156, 6157, 6158, 6159, 6160, 6161, 6162, 6163, 6164, 6165, 6166, 6167, 6168, 6169, 6170, 6171, 6172, 6173, 6174, 6175, 6176, 6177, 6178, 6179, 6180, 6181, 6182, 6183, 6184, 6185, 6186, 6187, 6188, 6189, 6190, 6191, 6192, 6193, 6194, 6195, 6196, 6197, 6198, 6199, 6200, 6201, 6202, 6203, 6204, 6205, 6206, 6207, 6208, 6209, 6210, 6211, 6212, 6213, 6214, 6215, 6216, 6217, 6218, 6219, 6220, 6221, 6222, 6223, 6224, 6225, 6226, 6227, 6228, 6229, 6230, 6231, 6232, 6233, 6234, 6235, 6236, 6237, 6238, 6239, 6240, 6241, 6242, 6243, 6244, 6245, 6246, 6247, 6248, 6249, 6250, 6251, 6252, 6253, 6254, 6255, 6256, 6257, 6258, 6259, 6260, 6261, 6262, 6263, 6264, 6265, 6266, 6267, 6268, 6269, 6270, 6271, 6272, 6273, 6274, 6275, 6276, 6277, 6278, 6279, 6280, 6281, 6282, 6283, 6284, 6285, 6286, 6287, 6288, 6289, 6290, 6291, 6292, 6293, 6294, 6295, 6296, 6297, 6298, 6299, 6300, 6301, 6302, 6303, 6304, 6305, 6306, 6307, 6308, 6309, 6310, 6311, 6312, 6313, 6314, 6315, 6316, 6317, 6318, 6319, 6320, 6321, 6322, 6323, 6324, 6325, 6326, 6327, 6328, 6329, 6330, 6331, 6332, 6333, 6334, 6335, 6336, 6337, 6338, 6339, 6340, 6341, 6342, 6343, 6344, 6345, 6346, 6347, 6348, 6349, 6350, 6351, 6352, 6353, 6354, 6355, 6356, 6357, 6358, 6359, 6360, 6361, 6362, 6363, 6364, 6365, 6366, 6367, 6368, 6369, 6370, 6371, 6372, 6373, 6374, 6375, 6376, 6377, 6378, 6379, 6380, 6381, 6382, 6383, 6384, 6385, 6386, 6387, 6388, 6389, 6390, 6391, 6392, 6393, 6394, 6395, 6396, 6397, 6398, 6399, 6400, 6401, 6402, 6403, 6404, 6405, 6406, 6407, 6408, 6409, 6410, 6411, 6412, 6413, 6414, 6415, 6416, 6417, 6418, 6419, 6420, 6421, 6422, 6423, 6424, 6425, 6426, 6427, 6428, 6429, 6430, 6431, 6432, 6433, 6434, 6435, 6436, 6437, 6438, 6439, 6440, 6441, 6442, 6443, 6444, 6445, 6446, 6447, 6448, 6449, 6450, 6451, 6452, 6453, 6454, 6455, 6456, 6457, 6458, 6459, 6460, 6461, 6462, 6463, 6464, 6465, 6466, 6467, 6468, 6469, 6470, 6471, 6472, 6473, 6474, 6475, 6476, 6477, 6478, 6479, 6480, 6481, 6482, 6483, 6484, 6485, 6486, 6487, 6488, 6489, 6490, 6491, 6492, 6493, 6494, 6495, 6496, 6497, 6498, 6499, 6500, 6501, 6502, 6503, 6504, 6505, 6506, 6507, 6508, 6509, 6510, 6511, 6512, 6513, 6514, 6515, 6516, 6517, 6518, 6519, 6520, 6521, 6522, 6523, 6524, 6525, 6526, 6527, 6528, 6529, 6530, 6531, 6532, 6533, 6534, 6535, 6536, 6537, 6538, 6539, 6540, 6541, 6542, 6543, 6544, 6545, 6546, 6547, 6548, 6549, 6550, 6551, 6552, 6553, 6554, 6555, 6556, 6557, 6558, 6559, 6560, 6561, 6562, 6563, 6564, 6565, 6566, 6567, 6568, 6569, 6570, 6571, 6572, 6573, 6574, 6575, 6576, 6577, 6578, 6579, 6580, 6581, 6582, 6583, 6584, 6585, 6586, 6587, 6588, 6589, 6590, 6591, 6592, 6593, 6594, 6595, 6596, 6597, 6598, 6599, 6600, 6601, 6602, 6603, 6604, 6605, 6606, 6607, 6608, 6609, 6610, 6611, 6612, 6613, 6614, 6615, 6616, 6617, 6618, 6619, 6620, 6621, 6622, 6623, 6624, 6625, 6626, 6627, 6628, 6629, 6630, 6631, 6632, 6633, 6634, 6635, 6636, 6637, 6638, 6639, 6640, 6641, 6642, 6643, 6644, 6645, 6646, 6647, 6648, 6649, 6650, 6651, 6652, 6653, 6654, 6655, 6656, 6657, 6658, 6659, 6660, 6661, 6662, 6663, 6664, 6665, 6666, 6667, 6668, 6669, 6670, 
6671, 6672, 6673, 6674, 6675, 6676, 6677, 6678, 6679, 6680, 6681, 6682, 6683, 6684, 6685, 6686, 6687, 6688, 6689, 6690, 6691, 6692, 6693, 6694, 6695, 6696, 6697, 6698, 6699, 6700, 6701, 6702, 6703, 6704, 6705, 6706, 6707, 6708, 6709, 6710, 6711, 6712, 6713, 6714, 6715, 6716, 6717, 6718, 6719, 6720, 6721, 6722, 6723, 6724, 6725, 6726, 6727, 6728, 6729, 6730, 6731, 6732, 6733, 6734, 6735, 6736, 6737, 6738, 6739, 6740, 6741, 6742, 6743, 6744, 6745, 6746, 6747, 6748, 6749, 6750, 6751, 6752, 6753, 6754, 6755, 6756, 6757, 6758, 6759, 6760, 6761, 6762, 6763, 6764, 6765, 6766, 6767, 6768, 6769, 6770, 6771, 6772, 6773, 6774, 6775, 6776, 6777, 6778, 6779, 6780, 6781, 6782, 6783, 6784, 6785, 6786, 6787, 6788, 6789, 6790, 6791, 6792, 6793, 6794, 6795, 6796, 6797, 6798, 6799, 6800, 6801, 6802, 6803, 6804, 6805, 6806, 6807, 6808, 6809, 6810, 6811, 6812, 6813, 6814, 6815, 6816, 6817, 6818, 6819, 6820, 6821, 6822, 6823, 6824, 6825, 6826, 6827, 6828, 6829, 6830, 6831, 6832, 6833, 6834, 6835, 6836, 6837, 6838, 6839, 6840, 6841, 6842, 6843, 6844, 6845, 6846, 6847, 6848, 6849, 6850, 6851, 6852, 6853, 6854, 6855, 6856, 6857, 6858, 6859, 6860, 6861, 6862, 6863, 6864, 6865, 6866, 6867, 6868, 6869, 6870, 6871, 6872, 6873, 6874, 6875, 6876, 6877, 6878, 6879, 6880, 6881, 6882, 6883, 6884, 6885, 6886, 6887, 6888, 6889, 6890, 6891, 6892, 6893, 6894, 6895, 6896, 6897, 6898, 6899, 6900, 6901, 6902, 6903, 6904, 6905, 6906, 6907, 6908, 6909, 6910, 6911, 6912, 6913, 6914, 6915, 6916, 6917, 6918, 6919, 6920, 6921, 6922, 6923, 6924, 6925, 6926, 6927, 6928, 6929, 6930, 6931, 6932, 6933, 6934, 6935, 6936, 6937, 6938, 6939, 6940, 6941, 6942, 6943, 6944, 6945, 6946, 6947, 6948, 6949, 6950, 6951, 6952, 6953, 6954, 6955, 6956, 6957, 6958, 6959, 6960, 6961, 6962, 6963, 6964, 6965, 6966, 6967, 6968, 6969, 6970, 6971, 6972, 6973, 6974, 6975, 6976, 6977, 6978, 6979, 6980, 6981, 6982, 6983, 6984, 6985, 6986, 6987, 6988, 6989, 6990, 6991, 6992, 6993, 6994, 6995, 6996, 6997, 6998, 6999, 7000, 7001, 7002, 7003, 7004, 7005, 7006, 7007, 7008, 7009, 7010, 7011, 7012, 7013, 7014, 7015, 7016, 7017, 7018, 7019, 7020, 7021, 7022, 7023, 7024, 7025, 7026, 7027, 7028, 7029, 7030, 7031, 7032, 7033, 7034, 7035, 7036, 7037, 7038, 7039, 7040, 7041, 7042, 7043, 7044, 7045, 7046, 7047, 7048, 7049, 7050, 7051, 7052, 7053, 7054, 7055, 7056, 7057, 7058, 7059, 7060, 7061, 7062, 7063, 7064, 7065, 7066, 7067, 7068, 7069, 7070, 7071, 7072, 7073, 7074, 7075, 7076, 7077, 7078, 7079, 7080, 7081, 7082, 7083, 7084, 7085, 7086, 7087, 7088, 7089, 7090, 7091, 7092, 7093, 7094, 7095, 7096, 7097, 7098, 7099, 7100, 7101, 7102, 7103, 7104, 7105, 7106, 7107, 7108, 7109, 7110, 7111, 7112, 7113, 7114, 7115, 7116, 7117, 7118, 7119, 7120, 7121, 7122, 7123, 7124, 7125, 7126, 7127, 7128, 7129, 7130, 7131, 7132, 7133, 7134, 7135, 7136, 7137, 7138, 7139, 7140, 7141, 7142, 7143, 7144, 7145, 7146, 7147, 7148, 7149, 7150, 7151, 7152, 7153, 7154, 7155, 7156, 7157, 7158, 7159, 7160, 7161, 7162, 7163, 7164, 7165, 7166, 7167, 7168, 7169, 7170, 7171, 7172, 7173, 7174, 7175, 7176, 7177, 7178, 7179, 7180, 7181, 7182, 7183, 7184, 7185, 7186, 7187, 7188, 7189, 7190, 7191, 7192, 7193, 7194, 7195, 7196, 7197, 7198, 7199, 7200, 7201, 7202, 7203, 7204, 7205, 7206, 7207, 7208, 7209, 7210, 7211, 7212, 7213, 7214, 7215, 7216, 7217, 7218, 7219, 7220, 7221, 7222, 7223, 7224, 7225, 7226, 7227, 7228, 7229, 7230, 7231, 7232, 7233, 7234, 7235, 7236, 7237, 7238, 7239, 7240, 7241, 7242, 7243, 7244, 7245, 7246, 7247, 7248, 7249, 7250, 7251, 7252, 7253, 7254, 7255, 7256, 7257, 7258, 7259, 7260, 7261, 7262, 
7263, 7264, 7265, 7266, 7267, 7268, 7269, 7270, 7271, 7272, 7273, 7274, 7275, 7276, 7277, 7278, 7279, 7280, 7281, 7282, 7283, 7284, 7285, 7286, 7287, 7288, 7289, 7290, 7291, 7292, 7293, 7294, 7295, 7296, 7297, 7298, 7299, 7300, 7301, 7302, 7303, 7304, 7305, 7306, 7307, 7308, 7309, 7310, 7311, 7312, 7313, 7314, 7315, 7316, 7317, 7318, 7319, 7320, 7321, 7322, 7323, 7324, 7325, 7326, 7327, 7328, 7329, 7330, 7331, 7332, 7333, 7334, 7335, 7336, 7337, 7338, 7339, 7340, 7341, 7342, 7343, 7344, 7345, 7346, 7347, 7348, 7349, 7350, 7351, 7352, 7353, 7354, 7355, 7356, 7357, 7358, 7359, 7360, 7361, 7362, 7363, 7364, 7365, 7366, 7367, 7368, 7369, 7370, 7371, 7372, 7373, 7374, 7375, 7376, 7377, 7378, 7379, 7380, 7381, 7382, 7383, 7384, 7385, 7386, 7387, 7388, 7389, 7390, 7391, 7392, 7393, 7394, 7395, 7396, 7397, 7398, 7399, 7400, 7401, 7402, 7403, 7404, 7405, 7406, 7407, 7408, 7409, 7410, 7411, 7412, 7413, 7414, 7415, 7416, 7417, 7418, 7419, 7420, 7421, 7422, 7423, 7424, 7425, 7426, 7427, 7428, 7429, 7430, 7431, 7432, 7433, 7434, 7435, 7436, 7437, 7438, 7439, 7440, 7441, 7442, 7443, 7444, 7445, 7446, 7447, 7448, 7449, 7450, 7451, 7452, 7453, 7454, 7455, 7456, 7457, 7458, 7459, 7460, 7461, 7462, 7463, 7464, 7465, 7466, 7467, 7468, 7469, 7470, 7471, 7472, 7473, 7474, 7475, 7476, 7477, 7478, 7479, 7480, 7481, 7482, 7483, 7484, 7485, 7486, 7487, 7488, 7489, 7490, 7491, 7492, 7493, 7494, 7495, 7496, 7497, 7498, 7499, 7500, 7501, 7502, 7503, 7504, 7505, 7506, 7507, 7508, 7509, 7510, 7511, 7512, 7513, 7514, 7515, 7516, 7517, 7518, 7519, 7520, 7521, 7522, 7523, 7524, 7525, 7526, 7527, 7528, 7529, 7530, 7531, 7532, 7533, 7534, 7535, 7536, 7537, 7538, 7539, 7540, 7541, 7542, 7543, 7544, 7545, 7546, 7547, 7548, 7549, 7550, 7551, 7552, 7553, 7554, 7555, 7556, 7557, 7558, 7559, 7560, 7561, 7562, 7563, 7564, 7565, 7566, 7567, 7568, 7569, 7570, 7571, 7572, 7573, 7574, 7575, 7576, 7577, 7578, 7579, 7580, 7581, 7582, 7583, 7584, 7585, 7586, 7587, 7588, 7589, 7590, 7591, 7592, 7593, 7594, 7595, 7596, 7597, 7598, 7599, 7600, 7601, 7602, 7603, 7604, 7605, 7606, 7607, 7608, 7609, 7610, 7611, 7612, 7613, 7614, 7615, 7616, 7617, 7618, 7619, 7620, 7621, 7622, 7623, 7624, 7625, 7626, 7627, 7628, 7629, 7630, 7631, 7632, 7633, 7634, 7635, 7636, 7637, 7638, 7639, 7640, 7641, 7642, 7643, 7644, 7645, 7646, 7647, 7648, 7649, 7650, 7651, 7652, 7653, 7654, 7655, 7656, 7657, 7658, 7659, 7660, 7661, 7662, 7663, 7664, 7665, 7666, 7667, 7668, 7669, 7670, 7671, 7672, 7673, 7674, 7675, 7676, 7677, 7678, 7679, 7680, 7681, 7682, 7683, 7684, 7685, 7686, 7687, 7688, 7689, 7690, 7691, 7692, 7693, 7694, 7695, 7696, 7697, 7698, 7699, 7700, 7701, 7702, 7703, 7704, 7705, 7706, 7707, 7708, 7709, 7710, 7711, 7712, 7713, 7714, 7715, 7716, 7717, 7718, 7719, 7720, 7721, 7722, 7723, 7724, 7725, 7726, 7727, 7728, 7729, 7730, 7731, 7732, 7733, 7734, 7735, 7736, 7737, 7738, 7739, 7740, 7741, 7742, 7743, 7744, 7745, 7746, 7747, 7748, 7749, 7750, 7751, 7752, 7753, 7754, 7755, 7756, 7757, 7758, 7759, 7760, 7761, 7762, 7763, 7764, 7765, 7766, 7767, 7768, 7769, 7770, 7771, 7772, 7773, 7774, 7775, 7776, 7777, 7778, 7779, 7780, 7781, 7782, 7783, 7784, 7785, 7786, 7787, 7788, 7789, 7790, 7791, 7792, 7793, 7794, 7795, 7796, 7797, 7798, 7799, 7800, 7801, 7802, 7803, 7804, 7805, 7806, 7807, 7808, 7809, 7810, 7811, 7812, 7813, 7814, 7815, 7816, 7817, 7818, 7819, 7820, 7821, 7822, 7823, 7824, 7825, 7826, 7827, 7828, 7829, 7830, 7831, 7832, 7833, 7834, 7835, 7836, 7837, 7838, 7839, 7840, 7841, 7842, 7843, 7844, 7845, 7846, 7847, 7848, 7849, 7850, 7851, 7852, 7853, 7854, 
7855, 7856, 7857, 7858, 7859, 7860, 7861, 7862, 7863, 7864, 7865, 7866, 7867, 7868, 7869, 7870, 7871, 7872, 7873, 7874, 7875, 7876, 7877, 7878, 7879, 7880, 7881, 7882, 7883, 7884, 7885, 7886, 7887, 7888, 7889, 7890, 7891, 7892, 7893, 7894, 7895, 7896, 7897, 7898, 7899, 7900, 7901, 7902, 7903, 7904, 7905, 7906, 7907, 7908, 7909, 7910, 7911, 7912, 7913, 7914, 7915, 7916, 7917, 7918, 7919, 7920, 7921, 7922, 7923, 7924, 7925, 7926, 7927, 7928, 7929, 7930, 7931, 7932, 7933, 7934, 7935, 7936, 7937, 7938, 7939, 7940, 7941, 7942, 7943, 7944, 7945, 7946, 7947, 7948, 7949, 7950, 7951, 7952, 7953, 7954, 7955, 7956, 7957, 7958, 7959, 7960, 7961, 7962, 7963, 7964, 7965, 7966, 7967, 7968, 7969, 7970, 7971, 7972, 7973, 7974, 7975, 7976, 7977, 7978, 7979, 7980, 7981, 7982, 7983, 7984, 7985, 7986, 7987, 7988, 7989, 7990, 7991, 7992, 7993, 7994, 7995, 7996, 7997, 7998, 7999, 8000, 8001, 8002, 8003, 8004, 8005, 8006, 8007, 8008, 8009, 8010, 8011, 8012, 8013, 8014, 8015, 8016, 8017, 8018, 8019, 8020, 8021, 8022, 8023, 8024, 8025, 8026, 8027, 8028, 8029, 8030, 8031, 8032, 8033, 8034, 8035, 8036, 8037, 8038, 8039, 8040, 8041, 8042, 8043, 8044, 8045, 8046, 8047, 8048, 8049, 8050, 8051, 8052, 8053, 8054, 8055, 8056, 8057, 8058, 8059, 8060, 8061, 8062, 8063, 8064, 8065, 8066, 8067, 8068, 8069, 8070, 8071, 8072, 8073, 8074, 8075, 8076, 8077, 8078, 8079, 8080, 8081, 8082, 8083, 8084, 8085, 8086, 8087, 8088, 8089, 8090, 8091, 8092, 8093, 8094, 8095, 8096, 8097, 8098, 8099, 8100, 8101, 8102, 8103, 8104, 8105, 8106, 8107, 8108, 8109, 8110, 8111, 8112, 8113, 8114, 8115, 8116, 8117, 8118, 8119, 8120, 8121, 8122, 8123, 8124, 8125, 8126, 8127, 8128, 8129, 8130, 8131, 8132, 8133, 8134, 8135, 8136, 8137, 8138, 8139, 8140, 8141, 8142, 8143, 8144, 8145, 8146, 8147, 8148, 8149, 8150, 8151, 8152, 8153, 8154, 8155, 8156, 8157, 8158, 8159, 8160, 8161, 8162, 8163, 8164, 8165, 8166, 8167, 8168, 8169, 8170, 8171, 8172, 8173, 8174, 8175, 8176, 8177, 8178, 8179, 8180, 8181, 8182, 8183, 8184, 8185, 8186, 8187, 8188, 8189, 8190, 8191, 8192, 8193, 8194, 8195, 8196, 8197, 8198, 8199, 8200, 8201, 8202, 8203, 8204, 8205, 8206, 8207, 8208, 8209, 8210, 8211, 8212, 8213, 8214, 8215, 8216, 8217, 8218, 8219, 8220, 8221, 8222, 8223, 8224, 8225, 8226, 8227, 8228, 8229, 8230, 8231, 8232, 8233, 8234, 8235, 8236, 8237, 8238, 8239, 8240, 8241, 8242, 8243, 8244, 8245, 8246, 8247, 8248, 8249, 8250, 8251, 8252, 8253, 8254, 8255, 8256, 8257, 8258, 8259, 8260, 8261, 8262, 8263, 8264, 8265, 8266, 8267, 8268, 8269, 8270, 8271, 8272, 8273, 8274, 8275, 8276, 8277, 8278, 8279, 8280, 8281, 8282, 8283, 8284, 8285, 8286, 8287, 8288, 8289, 8290, 8291, 8292, 8293, 8294, 8295, 8296, 8297, 8298, 8299, 8300, 8301, 8302, 8303, 8304, 8305, 8306, 8307, 8308, 8309, 8310, 8311, 8312, 8313, 8314, 8315, 8316, 8317, 8318, 8319, 8320, 8321, 8322, 8323, 8324, 8325, 8326, 8327, 8328, 8329, 8330, 8331, 8332, 8333, 8334, 8335, 8336, 8337, 8338, 8339, 8340, 8341, 8342, 8343, 8344, 8345, 8346, 8347, 8348, 8349, 8350, 8351, 8352, 8353, 8354, 8355, 8356, 8357, 8358, 8359, 8360, 8361, 8362, 8363, 8364, 8365, 8366, 8367, 8368, 8369, 8370, 8371, 8372, 8373, 8374, 8375, 8376, 8377, 8378, 8379, 8380, 8381, 8382, 8383, 8384, 8385, 8386, 8387, 8388, 8389, 8390, 8391, 8392, 8393, 8394, 8395, 8396, 8397, 8398, 8399, 8400, 8401, 8402, 8403, 8404, 8405, 8406, 8407, 8408, 8409, 8410, 8411, 8412, 8413, 8414, 8415, 8416, 8417, 8418, 8419, 8420, 8421, 8422, 8423, 8424, 8425, 8426, 8427, 8428, 8429, 8430, 8431, 8432, 8433, 8434, 8435, 8436, 8437, 8438, 8439, 8440, 8441, 8442, 8443, 8444, 8445, 8446, 
8447, 8448, 8449, 8450, 8451, 8452, 8453, 8454, 8455, 8456, 8457, 8458, 8459, 8460, 8461, 8462, 8463, 8464, 8465, 8466, 8467, 8468, 8469, 8470, 8471, 8472, 8473, 8474, 8475, 8476, 8477, 8478, 8479, 8480, 8481, 8482, 8483, 8484, 8485, 8486, 8487, 8488, 8489, 8490, 8491, 8492, 8493, 8494, 8495, 8496, 8497, 8498, 8499, 8500, 8501, 8502, 8503, 8504, 8505, 8506, 8507, 8508, 8509, 8510, 8511, 8512, 8513, 8514, 8515, 8516, 8517, 8518, 8519, 8520, 8521, 8522, 8523, 8524, 8525, 8526, 8527, 8528, 8529, 8530, 8531, 8532, 8533, 8534, 8535, 8536, 8537, 8538, 8539, 8540, 8541, 8542, 8543, 8544, 8545, 8546, 8547, 8548, 8549, 8550, 8551, 8552, 8553, 8554, 8555, 8556, 8557, 8558, 8559, 8560, 8561, 8562, 8563, 8564, 8565, 8566, 8567, 8568, 8569, 8570, 8571, 8572, 8573, 8574, 8575, 8576, 8577, 8578, 8579, 8580, 8581, 8582, 8583, 8584, 8585, 8586, 8587, 8588, 8589, 8590, 8591, 8592, 8593, 8594, 8595, 8596, 8597, 8598, 8599, 8600, 8601, 8602, 8603, 8604, 8605, 8606, 8607, 8608, 8609, 8610, 8611, 8612, 8613, 8614, 8615, 8616, 8617, 8618, 8619, 8620, 8621, 8622, 8623, 8624, 8625, 8626, 8627, 8628, 8629, 8630, 8631, 8632, 8633, 8634, 8635, 8636, 8637, 8638, 8639, 8640, 8641, 8642, 8643, 8644, 8645, 8646, 8647, 8648, 8649, 8650, 8651, 8652, 8653, 8654, 8655, 8656, 8657, 8658, 8659, 8660, 8661, 8662, 8663, 8664, 8665, 8666, 8667, 8668, 8669, 8670, 8671, 8672, 8673, 8674, 8675, 8676, 8677, 8678, 8679, 8680, 8681, 8682, 8683, 8684, 8685, 8686, 8687, 8688, 8689, 8690, 8691, 8692, 8693, 8694, 8695, 8696, 8697, 8698, 8699, 8700, 8701, 8702, 8703, 8704, 8705, 8706, 8707, 8708, 8709, 8710, 8711, 8712, 8713, 8714, 8715, 8716, 8717, 8718, 8719, 8720, 8721, 8722, 8723, 8724, 8725, 8726, 8727, 8728, 8729, 8730, 8731, 8732, 8733, 8734, 8735, 8736, 8737, 8738, 8739, 8740, 8741, 8742, 8743, 8744, 8745, 8746, 8747, 8748, 8749, 8750, 8751, 8752, 8753, 8754, 8755, 8756, 8757, 8758, 8759, 8760, 8761, 8762, 8763, 8764, 8765, 8766, 8767, 8768, 8769, 8770, 8771, 8772, 8773, 8774, 8775, 8776, 8777, 8778, 8779, 8780, 8781, 8782, 8783, 8784, 8785, 8786, 8787, 8788, 8789, 8790, 8791, 8792, 8793, 8794, 8795, 8796, 8797, 8798, 8799, 8800, 8801, 8802, 8803, 8804, 8805, 8806, 8807, 8808, 8809, 8810, 8811, 8812, 8813, 8814, 8815, 8816, 8817, 8818, 8819, 8820, 8821, 8822, 8823, 8824, 8825, 8826, 8827, 8828, 8829, 8830, 8831, 8832, 8833, 8834, 8835, 8836, 8837, 8838, 8839, 8840, 8841, 8842, 8843, 8844, 8845, 8846, 8847, 8848, 8849, 8850, 8851, 8852, 8853, 8854, 8855, 8856, 8857, 8858, 8859, 8860, 8861, 8862, 8863, 8864, 8865, 8866, 8867, 8868, 8869, 8870, 8871, 8872, 8873, 8874, 8875, 8876, 8877, 8878, 8879, 8880, 8881, 8882, 8883, 8884, 8885, 8886, 8887, 8888, 8889, 8890, 8891, 8892, 8893, 8894, 8895, 8896, 8897, 8898, 8899, 8900, 8901, 8902, 8903, 8904, 8905, 8906, 8907, 8908, 8909, 8910, 8911, 8912, 8913, 8914, 8915, 8916, 8917, 8918, 8919, 8920, 8921, 8922, 8923, 8924, 8925, 8926, 8927, 8928, 8929, 8930, 8931, 8932, 8933, 8934, 8935, 8936, 8937, 8938, 8939, 8940, 8941, 8942, 8943, 8944, 8945, 8946, 8947, 8948, 8949, 8950, 8951, 8952, 8953, 8954, 8955, 8956, 8957, 8958, 8959, 8960, 8961, 8962, 8963, 8964, 8965, 8966, 8967, 8968, 8969, 8970, 8971, 8972, 8973, 8974, 8975, 8976, 8977, 8978, 8979, 8980, 8981, 8982, 8983, 8984, 8985, 8986, 8987, 8988, 8989, 8990, 8991, 8992, 8993, 8994, 8995, 8996, 8997, 8998, 8999, 9000, 9001, 9002, 9003, 9004, 9005, 9006, 9007, 9008, 9009, 9010, 9011, 9012, 9013, 9014, 9015, 9016, 9017, 9018, 9019, 9020, 9021, 9022, 9023, 9024, 9025, 9026, 9027, 9028, 9029, 9030, 9031, 9032, 9033, 9034, 9035, 9036, 9037, 9038, 
9039, 9040, 9041, 9042, 9043, 9044, 9045, 9046, 9047, 9048, 9049, 9050, 9051, 9052, 9053, 9054, 9055, 9056, 9057, 9058, 9059, 9060, 9061, 9062, 9063, 9064, 9065, 9066, 9067, 9068, 9069, 9070, 9071, 9072, 9073, 9074, 9075, 9076, 9077, 9078, 9079, 9080, 9081, 9082, 9083, 9084, 9085, 9086, 9087, 9088, 9089, 9090, 9091, 9092, 9093, 9094, 9095, 9096, 9097, 9098, 9099, 9100, 9101, 9102, 9103, 9104, 9105, 9106, 9107, 9108, 9109, 9110, 9111, 9112, 9113, 9114, 9115, 9116, 9117, 9118, 9119, 9120, 9121, 9122, 9123, 9124, 9125, 9126, 9127, 9128, 9129, 9130, 9131, 9132, 9133, 9134, 9135, 9136, 9137, 9138, 9139, 9140, 9141, 9142, 9143, 9144, 9145, 9146, 9147, 9148, 9149, 9150, 9151, 9152, 9153, 9154, 9155, 9156, 9157, 9158, 9159, 9160, 9161, 9162, 9163, 9164, 9165, 9166, 9167, 9168, 9169, 9170, 9171, 9172, 9173, 9174, 9175, 9176, 9177, 9178, 9179, 9180, 9181, 9182, 9183, 9184, 9185, 9186, 9187, 9188, 9189, 9190, 9191, 9192, 9193, 9194, 9195, 9196, 9197, 9198, 9199, 9200, 9201, 9202, 9203, 9204, 9205, 9206, 9207, 9208, 9209, 9210, 9211, 9212, 9213, 9214, 9215, 9216, 9217, 9218, 9219, 9220, 9221, 9222, 9223, 9224, 9225, 9226, 9227, 9228, 9229, 9230, 9231, 9232, 9233, 9234, 9235, 9236, 9237, 9238, 9239, 9240, 9241, 9242, 9243, 9244, 9245, 9246, 9247, 9248, 9249, 9250, 9251, 9252, 9253, 9254, 9255, 9256, 9257, 9258, 9259, 9260, 9261, 9262, 9263, 9264, 9265, 9266, 9267, 9268, 9269, 9270, 9271, 9272, 9273, 9274, 9275, 9276, 9277, 9278, 9279, 9280, 9281, 9282, 9283, 9284, 9285, 9286, 9287, 9288, 9289, 9290, 9291, 9292, 9293, 9294, 9295, 9296, 9297, 9298, 9299, 9300, 9301, 9302, 9303, 9304, 9305, 9306, 9307, 9308, 9309, 9310, 9311, 9312, 9313, 9314, 9315, 9316, 9317, 9318, 9319, 9320, 9321, 9322, 9323, 9324, 9325, 9326, 9327, 9328, 9329, 9330, 9331, 9332, 9333, 9334, 9335, 9336, 9337, 9338, 9339, 9340, 9341, 9342, 9343, 9344, 9345, 9346, 9347, 9348, 9349, 9350, 9351, 9352, 9353, 9354, 9355, 9356, 9357, 9358, 9359, 9360, 9361, 9362, 9363, 9364, 9365, 9366, 9367, 9368, 9369, 9370, 9371, 9372, 9373, 9374, 9375, 9376, 9377, 9378, 9379, 9380, 9381, 9382, 9383, 9384, 9385, 9386, 9387, 9388, 9389, 9390, 9391, 9392, 9393, 9394, 9395, 9396, 9397, 9398, 9399, 9400, 9401, 9402, 9403, 9404, 9405, 9406, 9407, 9408, 9409, 9410, 9411, 9412, 9413, 9414, 9415, 9416, 9417, 9418, 9419, 9420, 9421, 9422, 9423, 9424, 9425, 9426, 9427, 9428, 9429, 9430, 9431, 9432, 9433, 9434, 9435, 9436, 9437, 9438, 9439, 9440, 9441, 9442, 9443, 9444, 9445, 9446, 9447, 9448, 9449, 9450, 9451, 9452, 9453, 9454, 9455, 9456, 9457, 9458, 9459, 9460, 9461, 9462, 9463, 9464, 9465, 9466, 9467, 9468, 9469, 9470, 9471, 9472, 9473, 9474, 9475, 9476, 9477, 9478, 9479, 9480, 9481, 9482, 9483, 9484, 9485, 9486, 9487, 9488, 9489, 9490, 9491, 9492, 9493, 9494, 9495, 9496, 9497, 9498, 9499, 9500, 9501, 9502, 9503, 9504, 9505, 9506, 9507, 9508, 9509, 9510, 9511, 9512, 9513, 9514, 9515, 9516, 9517, 9518, 9519, 9520, 9521, 9522, 9523, 9524, 9525, 9526, 9527, 9528, 9529, 9530, 9531, 9532, 9533, 9534, 9535, 9536, 9537, 9538, 9539, 9540, 9541, 9542, 9543, 9544, 9545, 9546, 9547, 9548, 9549, 9550, 9551, 9552, 9553, 9554, 9555, 9556, 9557, 9558, 9559, 9560, 9561, 9562, 9563, 9564, 9565, 9566, 9567, 9568, 9569, 9570, 9571, 9572, 9573, 9574, 9575, 9576, 9577, 9578, 9579, 9580, 9581, 9582, 9583, 9584, 9585, 9586, 9587, 9588, 9589, 9590, 9591, 9592, 9593, 9594, 9595, 9596, 9597, 9598, 9599, 9600, 9601, 9602, 9603, 9604, 9605, 9606, 9607, 9608, 9609, 9610, 9611, 9612, 9613, 9614, 9615, 9616, 9617, 9618, 9619, 9620, 9621, 9622, 9623, 9624, 9625, 9626, 9627, 9628, 9629, 9630, 
9631, 9632, 9633, 9634, 9635, 9636, 9637, 9638, 9639, 9640, 9641, 9642, 9643, 9644, 9645, 9646, 9647, 9648, 9649, 9650, 9651, 9652, 9653, 9654, 9655, 9656, 9657, 9658, 9659, 9660, 9661, 9662, 9663, 9664, 9665, 9666, 9667, 9668, 9669, 9670, 9671, 9672, 9673, 9674, 9675, 9676, 9677, 9678, 9679, 9680, 9681, 9682, 9683, 9684, 9685, 9686, 9687, 9688, 9689, 9690, 9691, 9692, 9693, 9694, 9695, 9696, 9697, 9698, 9699, 9700, 9701, 9702, 9703, 9704, 9705, 9706, 9707, 9708, 9709, 9710, 9711, 9712, 9713, 9714, 9715, 9716, 9717, 9718, 9719, 9720, 9721, 9722, 9723, 9724, 9725, 9726, 9727, 9728, 9729, 9730, 9731, 9732, 9733, 9734, 9735, 9736, 9737, 9738, 9739, 9740, 9741, 9742, 9743, 9744, 9745, 9746, 9747, 9748, 9749, 9750, 9751, 9752, 9753, 9754, 9755, 9756, 9757, 9758, 9759, 9760, 9761, 9762, 9763, 9764, 9765, 9766, 9767, 9768, 9769, 9770, 9771, 9772, 9773, 9774, 9775, 9776, 9777, 9778, 9779, 9780, 9781, 9782, 9783, 9784, 9785, 9786, 9787, 9788, 9789, 9790, 9791, 9792, 9793, 9794, 9795, 9796, 9797, 9798, 9799, 9800, 9801, 9802, 9803, 9804, 9805, 9806, 9807, 9808, 9809, 9810, 9811, 9812, 9813, 9814, 9815, 9816, 9817, 9818, 9819, 9820, 9821, 9822, 9823, 9824, 9825, 9826, 9827, 9828, 9829, 9830, 9831, 9832, 9833, 9834, 9835, 9836, 9837, 9838, 9839, 9840, 9841, 9842, 9843, 9844, 9845, 9846, 9847, 9848, 9849, 9850, 9851, 9852, 9853, 9854, 9855, 9856, 9857, 9858, 9859, 9860, 9861, 9862, 9863, 9864, 9865, 9866, 9867, 9868, 9869, 9870, 9871, 9872, 9873, 9874, 9875, 9876, 9877, 9878, 9879, 9880, 9881, 9882, 9883, 9884, 9885, 9886, 9887, 9888, 9889, 9890, 9891, 9892, 9893, 9894, 9895, 9896, 9897, 9898, 9899, 9900, 9901, 9902, 9903, 9904, 9905, 9906, 9907, 9908, 9909, 9910, 9911, 9912, 9913, 9914, 9915, 9916, 9917, 9918, 9919, 9920, 9921, 9922, 9923, 9924, 9925, 9926, 9927, 9928, 9929, 9930, 9931, 9932, 9933, 9934, 9935, 9936, 9937, 9938, 9939, 9940, 9941, 9942, 9943, 9944, 9945, 9946, 9947, 9948, 9949, 9950, 9951, 9952, 9953, 9954, 9955, 9956, 9957, 9958, 9959, 9960, 9961, 9962, 9963, 9964, 9965, 9966, 9967, 9968, 9969, 9970, 9971, 9972, 9973, 9974, 9975, 9976, 9977, 9978, 9979, 9980, 9981, 9982, 9983, 9984, 9985, 9986, 9987, 9988, 9989, 9990, 9991, 9992, 9993, 9994, 9995, 9996, 9997, 9998, 9999]\n" ] } ], "source": [ "# Let's collect the data\n", "print subRDD.collect()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### ** (3d) Perform action `count` to view counts **\n", "#### One of the most basic jobs that we can run is the `count()` job which will count the number of elements in an RDD using the `count()` action. Since `map()` creates a new RDD with the same number of elements as the starting RDD, we expect that applying `count()` to each RDD will return the same result.\n", "#### Note that because `count()` is an action operation, if we had not already performed an action with `collect()`, then Spark would now perform the transformation operations when we executed `count()`.\n", "#### Each task counts the entries in its partition and sends the result to your SparkContext, which adds up all of the counts. 
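As a rough sketch, if the 10000 elements were spread evenly over 8 partitions (a hypothetical layout, just to make the mechanism concrete), each task would report a partial count of 1250 and the driver would simply add those partial counts together:\n", "```python\n", "partitionCounts = [1250] * 8   # one partial count reported by each task (hypothetical layout)\n", "print sum(partitionCounts)     # the SparkContext adds them up: 10000\n", "```\n", "#### 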
The figure below shows what would happen if we ran `count()` on a small example dataset with just four partitions.\n", "![count](http://spark-mooc.github.io/web-assets/images/count.png)" ] }, { "cell_type": "code", "execution_count": 24, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "10000\n", "10000\n" ] } ], "source": [ "print xrangeRDD.count()\n", "print subRDD.count()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### ** (3e) Apply transformation `filter` and view results with `collect` **\n", "#### Next, we'll create a new RDD that only contains the values less than ten by using the `filter(f)` data-parallel operation. The `filter(f)` method is a transformation operation that creates a new RDD from the input RDD by applying filter function `f` to each item in the parent RDD and only passing those elements where the filter function returns `True`. Elements that do not return `True` will be dropped. Like `map()`, filter can be applied individually to each entry in the dataset, so is easily parallelized using Spark.\n", "#### The figure below shows how this would work on the small four-partition dataset.\n", "![filter](http://spark-mooc.github.io/web-assets/images/filter.png)\n", "#### To filter this dataset, we'll define a function called `ten()`, which returns `True` if the input is less than 10 and `False` otherwise. This function will be passed to the `filter()` transformation as the filter function `f`.\n", "#### To view the filtered list of elements less than ten, we need to create a new list on the driver from the distributed data on the executor nodes. We use the `collect()` method to return a list that contains all of the elements in this filtered RDD to the driver program." ] }, { "cell_type": "code", "execution_count": 25, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]\n" ] } ], "source": [ "# Define a function to filter a single value\n", "def ten(value):\n", " \"\"\"Return whether value is below ten.\n", "\n", " Args:\n", " value (int): A number.\n", "\n", " Returns:\n", " bool: Whether `value` is less than ten.\n", " \"\"\"\n", " if (value < 10):\n", " return True\n", " else:\n", " return False\n", "# The ten function could also be written concisely as: def ten(value): return value < 10\n", "\n", "# Pass the function ten to the filter transformation\n", "# Filter is a transformation so no tasks are run\n", "filteredRDD = subRDD.filter(ten)\n", "\n", "# View the results using collect()\n", "# Collect is an action and triggers the filter transformation to run\n", "print filteredRDD.collect()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### ** Part 4: Lambda Functions **" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### ** (4a) Using Python `lambda()` functions **\n", "#### Python supports the use of small one-line anonymous functions that are not bound to a name at runtime. Borrowed from LISP, these `lambda` functions can be used wherever function objects are required. They are syntactically restricted to a single expression. Remember that `lambda` functions are a matter of style and using them is never required - semantically, they are just syntactic sugar for a normal function definition. You can always define a separate normal function instead, but using a `lambda()` function is an equivalent and more compact form of coding. 
Ideally you should consider using `lambda` functions where you want to encapsulate non-reusable code without littering your code with one-line functions.\n", "#### Here, instead of defining a separate function for the `filter()` transformation, we will use an inline `lambda()` function." ] }, { "cell_type": "code", "execution_count": 26, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]" ] }, "execution_count": 26, "metadata": {}, "output_type": "execute_result" } ], "source": [ "lambdaRDD = subRDD.filter(lambda x: x < 10)\n", "lambdaRDD.collect()" ] }, { "cell_type": "code", "execution_count": 27, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "[0, 2, 4, 6, 8]" ] }, "execution_count": 27, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Let's collect the even values less than 10\n", "evenRDD = lambdaRDD.filter(lambda x: x % 2 == 0)\n", "evenRDD.collect()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### ** Part 5: Additional RDD actions **" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### ** (5a) Other common actions **\n", "#### Let's investigate the additional actions: [first()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.first), [take()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.take), [top()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.top), [takeOrdered()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.takeOrdered), and [reduce()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.reduce)\n", "#### One useful thing to do when we have a new dataset is to look at the first few entries to obtain a rough idea of what information is available. In Spark, we can do that using the `first()`, `take()`, `top()`, and `takeOrdered()` actions. Note that for the `first()` and `take()` actions, the elements that are returned depend on how the RDD is *partitioned*.\n", "#### Instead of using the `collect()` action, we can use the `take(n)` action to return the first n elements of the RDD. The `first()` action returns the first element of an RDD, and is equivalent to `take(1)`.\n", "#### The `takeOrdered()` action returns the first n elements of the RDD, using either their natural order or a custom comparator. The key advantage of using `takeOrdered()` instead of `first()` or `take()` is that `takeOrdered()` returns a deterministic result, while the other two actions may return differing results, depending on the number of partitions or the execution environment. `takeOrdered()` returns the list sorted in *ascending order*. The `top()` action is similar to `takeOrdered()` except that it returns the list in *descending order*.\n", "#### The `reduce()` action reduces the elements of an RDD to a single value by applying a function that takes two parameters and returns a single value. The function should be commutative and associative, as `reduce()` is applied at the partition level and then again to aggregate results from partitions. If these rules don't hold, the results from `reduce()` will be inconsistent. Reducing locally at partitions makes `reduce()` very efficient."
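] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### To build intuition for why `reduce()` needs a commutative and associative function, the next cell gives a rough plain-Python sketch of the two-level reduction Spark performs: each partition is reduced on its own, and the partial results are then reduced again. The partition boundaries below are hypothetical and chosen only for this sketch; this is not Spark's actual implementation." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "# Plain-Python sketch of a partition-wise reduce (hypothetical partitioning of 0..9)\n", "partitions = [[0, 1, 2], [3, 4], [5, 6, 7], [8, 9]]\n", "\n", "def partitionWiseReduce(f, parts):\n", "    # Reduce within each partition first, then reduce the partial results\n", "    partials = [reduce(f, p) for p in parts]\n", "    return reduce(f, partials)\n", "\n", "# Addition is commutative and associative, so the grouping does not matter: 45\n", "print partitionWiseReduce(lambda a, b: a + b, partitions)\n", "# Subtraction is neither, so the answer depends on how the data happens to be partitioned\n", "print partitionWiseReduce(lambda a, b: a - b, partitions)"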
] }, { "cell_type": "code", "execution_count": 28, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "0\n", "[0, 1, 2, 3]\n", "[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]\n" ] } ], "source": [ "# Let's get the first element\n", "print filteredRDD.first()\n", "# The first 4\n", "print filteredRDD.take(4)\n", "# Note that it is ok to take more elements than the RDD has\n", "print filteredRDD.take(12)" ] }, { "cell_type": "code", "execution_count": 29, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[0, 1, 2]\n", "[9, 8, 7, 6, 5]\n" ] } ], "source": [ "# Retrieve the three smallest elements\n", "print filteredRDD.takeOrdered(3)\n", "# Retrieve the five largest elements\n", "print filteredRDD.top(5)" ] }, { "cell_type": "code", "execution_count": 30, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "[9, 8, 7, 6]" ] }, "execution_count": 30, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Pass a lambda function to takeOrdered to reverse the order\n", "filteredRDD.takeOrdered(4, lambda s: -s)" ] }, { "cell_type": "code", "execution_count": 31, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "45\n", "45\n", "-45\n", "21\n", "45\n" ] } ], "source": [ "# Obtain Python's add function\n", "from operator import add\n", "# Efficiently sum the RDD using reduce\n", "print filteredRDD.reduce(add)\n", "# Sum using reduce with a lambda function\n", "print filteredRDD.reduce(lambda a, b: a + b)\n", "# Note that subtraction is not both associative and commutative\n", "print filteredRDD.reduce(lambda a, b: a - b)\n", "print filteredRDD.repartition(4).reduce(lambda a, b: a - b)\n", "# While addition is\n", "print filteredRDD.repartition(4).reduce(lambda a, b: a + b)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### ** (5b) Advanced actions **\n", "#### Here are two additional actions that are useful for retrieving information from an RDD: [takeSample()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.takeSample) and [countByValue()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.countByValue)\n", "#### The `takeSample()` action returns an array with a random sample of elements from the dataset. It takes in a `withReplacement` argument, which specifies whether it is okay to randomly pick the same item multiple times from the parent RDD (so when `withReplacement=True`, you can get the same item back multiple times). It also takes an optional `seed` parameter that allows you to specify a seed value for the random number generator, so that reproducible results can be obtained.\n", "#### The `countByValue()` action returns the count of each unique value in the RDD as a dictionary that maps values to counts." 
] }, { "cell_type": "code", "execution_count": 32, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[4, 3, 2, 5, 7, 1]\n", "[1, 9, 4, 7, 8, 2]\n" ] } ], "source": [ "# takeSample reusing elements\n", "print filteredRDD.takeSample(withReplacement=True, num=6)\n", "# takeSample without reuse\n", "print filteredRDD.takeSample(withReplacement=False, num=6)" ] }, { "cell_type": "code", "execution_count": 33, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[0, 2, 5, 3, 6, 9]\n" ] } ], "source": [ "# Set seed for predictability\n", "print filteredRDD.takeSample(withReplacement=False, num=6, seed=500)\n", "# Try reruning this cell and the cell above -- the results from this cell will remain constant\n", "# Use ctrl-enter to run without moving to the next cell" ] }, { "cell_type": "code", "execution_count": 34, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "defaultdict(, {1: 4, 2: 4, 3: 5, 4: 2, 5: 1, 6: 1})\n" ] } ], "source": [ "# Create new base RDD to show countByValue\n", "repetitiveRDD = sc.parallelize([1, 2, 3, 1, 2, 3, 1, 2, 1, 2, 3, 3, 3, 4, 5, 4, 6])\n", "print repetitiveRDD.countByValue()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### ** Part 6: Additional RDD transformations **" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### ** (6a) `flatMap` **\n", "#### When performing a `map()` transformation using a function, sometimes the function will return more (or less) than one element. We would like the newly created RDD to consist of the elements outputted by the function. Simply applying a `map()` transformation would yield a new RDD made up of iterators. Each iterator could have zero or more elements. Instead, we often want an RDD consisting of the values contained in those iterators. The solution is to use a [flatMap()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.flatMap) transformation, `flatMap()` is similar to `map()`, except that with `flatMap()` each input item can be mapped to zero or more output elements.\n", "#### To demonstrate `flatMap()`, we will first emit a word along with its plural, and then a range that grows in length with each subsequent operation." 
] }, { "cell_type": "code", "execution_count": 35, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[('cat', 'cats'), ('elephant', 'elephants'), ('rat', 'rats'), ('rat', 'rats'), ('cat', 'cats')]\n", "['cat', 'cats', 'elephant', 'elephants', 'rat', 'rats', 'rat', 'rats', 'cat', 'cats']\n", "5\n", "10\n" ] } ], "source": [ "# Let's create a new base RDD to work from\n", "wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']\n", "wordsRDD = sc.parallelize(wordsList, 4)\n", "\n", "# Use map\n", "singularAndPluralWordsRDDMap = wordsRDD.map(lambda x: (x, x + 's'))\n", "# Use flatMap\n", "singularAndPluralWordsRDD = wordsRDD.flatMap(lambda x: (x, x + 's'))\n", "\n", "# View the results\n", "print singularAndPluralWordsRDDMap.collect()\n", "print singularAndPluralWordsRDD.collect()\n", "# View the number of elements in the RDD\n", "print singularAndPluralWordsRDDMap.count()\n", "print singularAndPluralWordsRDD.count()" ] }, { "cell_type": "code", "execution_count": 36, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[[1], [1, 2], [1, 2, 3]]\n", "[1, 1, 2, 1, 2, 3]\n" ] } ], "source": [ "simpleRDD = sc.parallelize([2, 3, 4])\n", "print simpleRDD.map(lambda x: range(1, x)).collect()\n", "print simpleRDD.flatMap(lambda x: range(1, x)).collect()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### ** (6b) `groupByKey` and `reduceByKey` **\n", "#### Let's investigate the additional transformations: [groupByKey()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.groupByKey) and [reduceByKey()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.reduceByKey).\n", "#### Both of these transformations operate on pair RDDs. A pair RDD is an RDD where each element is a pair tuple (key, value). For example, `sc.parallelize([('a', 1), ('a', 2), ('b', 1)])` would create a pair RDD where the keys are 'a', 'a', 'b' and the values are 1, 2, 1.\n", "#### The `reduceByKey()` transformation gathers together pairs that have the same key and applies a function to two associated values at a time. `reduceByKey()` operates by applying the function first within each partition on a per-key basis and then across the partitions.\n", "#### While both the `groupByKey()` and `reduceByKey()` transformations can often be used to solve the same problem and will produce the same answer, the `reduceByKey()` transformation works much better for large distributed datasets. This is because Spark knows it can combine output with a common key on each partition *before* shuffling (redistributing) the data across nodes. Only use `groupByKey()` if the operation would not benefit from reducing the data before the shuffle occurs.\n", "#### Look at the diagram below to understand how `reduceByKey` works. Notice how pairs on the same machine with the same key are combined (by using the lamdba function passed into reduceByKey) before the data is shuffled. Then the lamdba function is called again to reduce all the values from each partition to produce one final result.\n", "![reduceByKey() figure](http://spark-mooc.github.io/web-assets/images/reduce_by.png)\n", "#### On the other hand, when using the `groupByKey()` transformation - all the key-value pairs are shuffled around, causing a lot of unnecessary data to being transferred over the network.\n", "#### To determine which machine to shuffle a pair to, Spark calls a partitioning function on the key of the pair. 
Spark spills data to disk when there is more data shuffled onto a single executor machine than can fit in memory. However, it flushes out the data to disk one key at a time, so if a single key has more key-value pairs than can fit in memory, an out-of-memory exception occurs. This will be more gracefully handled in a later release of Spark so that the job can still proceed, but should still be avoided. When Spark needs to spill to disk, performance is severely impacted.\n", "![groupByKey() figure](http://spark-mooc.github.io/web-assets/images/group_by.png)\n", "#### As your dataset grows, the difference in the amount of data that needs to be shuffled between the `reduceByKey()` and `groupByKey()` transformations becomes increasingly exaggerated.\n", "#### Here are more transformations to prefer over `groupByKey()`:\n", " + #### [combineByKey()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.combineByKey) can be used when you are combining elements but your return type differs from your input value type.\n", " + #### [foldByKey()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.foldByKey) merges the values for each key using an associative function and a neutral \"zero value\".\n", "#### Now let's go through a simple `groupByKey()` and `reduceByKey()` example." ] }, { "cell_type": "code", "execution_count": 37, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[('a', [1, 2]), ('b', [1])]\n", "[('a', 3), ('b', 1)]\n", "[('a', 3), ('b', 1)]\n", "[('a', 3), ('b', 1)]\n" ] } ], "source": [ "pairRDD = sc.parallelize([('a', 1), ('a', 2), ('b', 1)])\n", "# mapValues only used to improve format for printing\n", "print pairRDD.groupByKey().mapValues(lambda x: list(x)).collect()\n", "\n", "# Different ways to sum by key\n", "print pairRDD.groupByKey().map(lambda (k, v): (k, sum(v))).collect()\n", "# Using mapValues, which is recommended when the key doesn't change\n", "print pairRDD.groupByKey().mapValues(lambda x: sum(x)).collect()\n", "# reduceByKey is more efficient / scalable\n", "print pairRDD.reduceByKey(add).collect()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### ** (6c) Advanced transformations ** [Optional]\n", "#### Let's investigate the advanced transformations: [mapPartitions()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.mapPartitions) and [mapPartitionsWithIndex()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.mapPartitionsWithIndex)\n", "#### The `mapPartitions()` transformation uses a function that takes in an iterator (to the items in that specific partition) and returns an iterator. The function is applied on a partition-by-partition basis.\n", "#### The `mapPartitionsWithIndex()` transformation uses a function that takes in a partition index (think of this like the partition number) and an iterator (to the items in that specific partition). The function receives every partition as an (index, iterator) pair and returns an iterator over that partition's output items; in the example below, we return a single `(index, items)` tuple per partition."
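] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### As one more small sketch (using the four-partition `wordsRDD` created earlier), we can give `mapPartitions()` a function that consumes each partition's iterator and returns a one-element list containing its length; collecting the result shows how many elements each partition holds." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "# Extra sketch: count the elements in each partition of wordsRDD\n", "# With five words in four partitions the counts come out to something like [1, 1, 1, 2]\n", "print wordsRDD.mapPartitions(lambda iterator: [sum(1 for _ in iterator)]).collect()"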
] }, { "cell_type": "code", "execution_count": 38, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "['cat', 'elephant', 'rat', 'rat', 'cat']\n", "['cat', 'elephant', 'rat', 'rat,cat']\n" ] } ], "source": [ "# mapPartitions takes a function that takes an iterator and returns an iterator\n", "print wordsRDD.collect()\n", "itemsRDD = wordsRDD.mapPartitions(lambda iterator: [','.join(iterator)])\n", "print itemsRDD.collect()" ] }, { "cell_type": "code", "execution_count": 39, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[(0, ['cat']), (1, ['elephant']), (2, ['rat']), (3, ['rat', 'cat'])]\n", "[0, ['cat'], 1, ['elephant'], 2, ['rat'], 3, ['rat', 'cat']]\n" ] } ], "source": [ "itemsByPartRDD = wordsRDD.mapPartitionsWithIndex(lambda index, iterator: [(index, list(iterator))])\n", "# We can see that three of the (partitions) workers have one element and the fourth worker has two\n", "# elements, although things may not bode well for the rat...\n", "print itemsByPartRDD.collect()\n", "# Rerun without returning a list (acts more like flatMap)\n", "itemsByPartRDD = wordsRDD.mapPartitionsWithIndex(lambda index, iterator: (index, list(iterator)))\n", "print itemsByPartRDD.collect()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### ** Part 7: Caching RDDs and storage options **" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### ** (7a) Caching RDDs **\n", "#### For efficiency Spark keeps your RDDs in memory. By keeping the contents in memory, Spark can quickly access the data. However, memory is limited, so if you try to keep too many RDDs in memory, Spark will automatically delete RDDs from memory to make space for new RDDs. If you later refer to one of the RDDs, Spark will automatically recreate the RDD for you, but that takes time.\n", "#### So, if you plan to use an RDD more than once, then you should tell Spark to cache that RDD. You can use the `cache()` operation to keep the RDD in memory. However, if you cache too many RDDs and Spark runs out of memory, it will delete the least recently used (LRU) RDD first. Again, the RDD will be automatically recreated when accessed.\n", "#### You can check if an RDD is cached by using the `is_cached` attribute, and you can see your cached RDD in the \"Storage\" section of the Spark web UI. If you click on the RDD's name, you can see more information about where the RDD is stored." ] }, { "cell_type": "code", "execution_count": 40, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "True\n" ] } ], "source": [ "# Name the RDD\n", "filteredRDD.setName('My Filtered RDD')\n", "# Cache the RDD\n", "filteredRDD.cache()\n", "# Is it cached\n", "print filteredRDD.is_cached" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### ** (7b) Unpersist and storage options **\n", "#### Spark automatically manages the RDDs cached in memory and will save them to disk if it runs out of memory. 
For efficiency, once you are finished using an RDD, you can optionally tell Spark to stop caching it in memory by using the RDD's `unpersist()` method to inform Spark that you no longer need the RDD in memory.\n", "#### You can see the set of transformations that were applied to create an RDD by using the `toDebugString()` method, which will provide storage information, and you can directly query the current storage information for an RDD using the `getStorageLevel()` operation.\n", "#### ** Advanced: ** Spark provides many more options for managing how RDDs are stored in memory or even saved to disk. You can explore the API for RDD's [persist()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.persist) operation using Python's [help()](https://docs.python.org/2/library/functions.html?highlight=help#help) command. The `persist()` operation, optionally, takes a pySpark [StorageLevel](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.StorageLevel) object." ] }, { "cell_type": "code", "execution_count": 41, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "(8) My Filtered RDD PythonRDD[6] at collect at :23 [Memory Serialized 1x Replicated]\n", " | ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:392 [Memory Serialized 1x Replicated]\n" ] } ], "source": [ "# Note that toDebugString also provides storage information\n", "print filteredRDD.toDebugString()" ] }, { "cell_type": "code", "execution_count": 42, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Serialized 1x Replicated\n", "Memory Serialized 1x Replicated\n" ] } ], "source": [ "# If we are done with the RDD we can unpersist it so that its memory can be reclaimed\n", "filteredRDD.unpersist()\n", "# Storage level for a non cached RDD\n", "print filteredRDD.getStorageLevel()\n", "filteredRDD.cache()\n", "# Storage level for a cached RDD\n", "print filteredRDD.getStorageLevel()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### ** Part 8: Debugging Spark applications and lazy evaluation **" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### ** How Python is Executed in Spark **\n", "#### Internally, Spark executes using a Java Virtual Machine (JVM). pySpark runs Python code in a JVM using [Py4J](http://py4j.sourceforge.net). Py4J enables Python programs running in a Python interpreter to dynamically access Java objects in a Java Virtual Machine. Methods are called as if the Java objects resided in the Python interpreter and Java collections can be accessed through standard Python collection methods. Py4J also enables Java programs to call back Python objects.\n", "#### Because pySpark uses Py4J, coding errors often result in a complicated, confusing stack trace that can be difficult to understand. In the following section, we'll explore how to understand stack traces." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### ** (8a) Challenges with lazy evaluation using transformations and actions **\n", "#### Spark's use of lazy evaluation can make debugging more difficult because code is not always executed immediately. To see an example of how this can happen, let's first define a broken filter function.\n", "#### Next we perform a `filter()` operation using the broken filtering function. 
No error will occur at this point due to Spark's use of lazy evaluation.\n", "#### The `filter()` method will not be executed *until* an action operation is invoked on the RDD. We will perform an action by using the `collect()` method to return a list that contains all of the elements in this RDD." ] }, { "cell_type": "code", "execution_count": 43, "metadata": { "collapsed": false }, "outputs": [], "source": [ "def brokenTen(value):\n", " \"\"\"Incorrect implementation of the ten function.\n", "\n", " Note:\n", " The `if` statement checks an undefined variable `val` instead of `value`.\n", "\n", " Args:\n", " value (int): A number.\n", "\n", " Returns:\n", " bool: Whether `value` is less than ten.\n", "\n", " Raises:\n", " NameError: The function references `val`, which is not available in the local or global\n", " namespace, so a `NameError` is raised.\n", " \"\"\"\n", " if (val < 10):\n", " return True\n", " else:\n", " return False\n", "\n", "brokenRDD = subRDD.filter(brokenTen)" ] }, { "cell_type": "code", "execution_count": 44, "metadata": { "collapsed": false }, "outputs": [ { "ename": "Py4JJavaError", "evalue": "An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.\n: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 48.0 failed 1 times, most recent failure: Lost task 0.0 in stage 48.0 (TID 221, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):\n File \"/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/pyspark/worker.py\", line 101, in main\n process()\n File \"/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/pyspark/worker.py\", line 96, in process\n serializer.dump_stream(func(split_index, iterator), outfile)\n File \"/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/pyspark/serializers.py\", line 236, in dump_stream\n vs = list(itertools.islice(iterator, batch))\n File \"\", line 17, in brokenTen\nNameError: global name 'val' is not defined\n\n\tat org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:135)\n\tat org.apache.spark.api.python.PythonRDD$$anon$1.(PythonRDD.scala:176)\n\tat org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:94)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:244)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:64)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)\n\tat java.lang.Thread.run(Thread.java:745)\n\nDriver stacktrace:\n\tat org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)\n\tat scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)\n\tat scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)\n\tat org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)\n\tat 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)\n\tat scala.Option.foreach(Option.scala:236)\n\tat org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)\n\tat org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)\n", "output_type": "error", "traceback": [ "\u001b[1;31m---------------------------------------------------------------------------\u001b[0m", "\u001b[1;31mPy4JJavaError\u001b[0m Traceback (most recent call last)", "\u001b[1;32m\u001b[0m in \u001b[0;36m\u001b[1;34m()\u001b[0m\n\u001b[0;32m 1\u001b[0m \u001b[1;31m# Now we'll see the error\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m----> 2\u001b[1;33m \u001b[0mbrokenRDD\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mcollect\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m", "\u001b[1;32m/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/pyspark/rdd.py\u001b[0m in \u001b[0;36mcollect\u001b[1;34m(self)\u001b[0m\n\u001b[0;32m 711\u001b[0m \"\"\"\n\u001b[0;32m 712\u001b[0m \u001b[1;32mwith\u001b[0m \u001b[0mSCCallSiteSync\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mcontext\u001b[0m\u001b[1;33m)\u001b[0m \u001b[1;32mas\u001b[0m \u001b[0mcss\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m--> 713\u001b[1;33m \u001b[0mport\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mctx\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0m_jvm\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mPythonRDD\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mcollectAndServe\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0m_jrdd\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mrdd\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m 714\u001b[0m \u001b[1;32mreturn\u001b[0m \u001b[0mlist\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0m_load_from_socket\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mport\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0m_jrdd_deserializer\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 715\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n", "\u001b[1;32m/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py\u001b[0m in \u001b[0;36m__call__\u001b[1;34m(self, *args)\u001b[0m\n\u001b[0;32m 536\u001b[0m \u001b[0manswer\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mgateway_client\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0msend_command\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mcommand\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 537\u001b[0m return_value = get_return_value(answer, self.gateway_client,\n\u001b[1;32m--> 538\u001b[1;33m self.target_id, self.name)\n\u001b[0m\u001b[0;32m 539\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 540\u001b[0m \u001b[1;32mfor\u001b[0m \u001b[0mtemp_arg\u001b[0m \u001b[1;32min\u001b[0m \u001b[0mtemp_args\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n", "\u001b[1;32m/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py\u001b[0m in \u001b[0;36mget_return_value\u001b[1;34m(answer, 
gateway_client, target_id, name)\u001b[0m\n\u001b[0;32m 298\u001b[0m raise Py4JJavaError(\n\u001b[0;32m 299\u001b[0m \u001b[1;34m'An error occurred while calling {0}{1}{2}.\\n'\u001b[0m\u001b[1;33m.\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m--> 300\u001b[1;33m format(target_id, '.', name), value)\n\u001b[0m\u001b[0;32m 301\u001b[0m \u001b[1;32melse\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 302\u001b[0m raise Py4JError(\n", "\u001b[1;31mPy4JJavaError\u001b[0m: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.\n: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 48.0 failed 1 times, most recent failure: Lost task 0.0 in stage 48.0 (TID 221, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):\n File \"/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/pyspark/worker.py\", line 101, in main\n process()\n File \"/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/pyspark/worker.py\", line 96, in process\n serializer.dump_stream(func(split_index, iterator), outfile)\n File \"/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/pyspark/serializers.py\", line 236, in dump_stream\n vs = list(itertools.islice(iterator, batch))\n File \"\", line 17, in brokenTen\nNameError: global name 'val' is not defined\n\n\tat org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:135)\n\tat org.apache.spark.api.python.PythonRDD$$anon$1.(PythonRDD.scala:176)\n\tat org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:94)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:244)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:64)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)\n\tat java.lang.Thread.run(Thread.java:745)\n\nDriver stacktrace:\n\tat org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)\n\tat scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)\n\tat scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)\n\tat org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)\n\tat scala.Option.foreach(Option.scala:236)\n\tat org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)\n\tat org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)\n" ] } ], "source": [ "# Now we'll see the error\n", "brokenRDD.collect()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### ** (8b) Finding the bug **\n", "#### When the `filter()` method is 
executed, Spark evaluates the RDD by executing the `parallelize()` and `filter()` methods. Since our `filter()` method has an error in the filtering function `brokenTen()`, an error occurs.\n", "#### Scroll through the \"Py4JJavaError Traceback (most recent call last)\" portion of the cell's output, and you will first see that the line reported as generating the error is the `collect()` method line. There is *nothing wrong with this line*. However, it is an action, and invoking it caused the earlier transformations to be executed. Continue scrolling through the traceback and you will see the following error line:\n", "    NameError: global name 'val' is not defined\n", "#### Looking at this error line, we can see that we used the wrong variable name in our filtering function `brokenTen()`." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### ** (8c) Moving toward expert style **\n", "#### As you are learning Spark, we recommend that you write your code in the form:\n", "    RDD.transformation1()\n", "    RDD.action1()\n", "    RDD.transformation2()\n", "    RDD.action2()\n", "#### Using this style will make debugging your code much easier, because it makes errors easier to localize: an error in one of your transformations will surface when the next action is executed.\n", "#### Once you become more experienced with Spark, you can write your code in the form:\n", "    RDD.transformation1().transformation2().action()\n", "#### We can also use `lambda` functions instead of separately defined functions when their use improves readability and conciseness." ] }, { "cell_type": "code", "execution_count": 45, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]" ] }, "execution_count": 45, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Cleaner code through lambda use\n", "subRDD.filter(lambda x: x < 10).collect()" ] }, { "cell_type": "code", "execution_count": 46, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]" ] }, "execution_count": 46, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Even better: move the whole chain of operators onto a single line.\n", "sc.parallelize(data).map(lambda y: y - 1).filter(lambda x: x < 10).collect()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### ** (8d) Readability and code style **\n", "#### To make the expert coding style more readable, enclose the statement in parentheses and put each method, transformation, or action on a separate line." ] }, { "cell_type": "code", "execution_count": 47, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]" ] }, "execution_count": 47, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Final version\n", "(sc\n", " .parallelize(data)\n", " .map(lambda y: y - 1)\n", " .filter(lambda x: x < 10)\n", " .collect())" ] } ], "metadata": { "kernelspec": { "display_name": "Python 2", "language": "python", "name": "python2" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 2 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython2", "version": "2.7.6" } }, "nbformat": 4, "nbformat_minor": 0 }