{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# 1.1 RDD Basics\n", "\n", "This notebook demonstates the basics of RDDs (Resilient Distributed Datasets) - the low level programming abstractions of Apache Spark.\n", "\n", "The RDDs can be thought of as immutable, possibly very long lists of elements, which can be transformed to other RDDs by operations like `map` or `filter` or aggregated with operations such as `reduce`.\n", "\n", "Spark takes care of efficiently running these operations in the distributed manner on a cluster. " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "`sc`, the `SparkContext` instance, is the main entry point for RDD operations.\n", "\n", "Let's create a simple RDD and look at some basic operations:\n" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "['The numbers: 10 20 30 are chosen.', 'hello 11 there.', \"it's 12 pm.\"]" ] }, "execution_count": 1, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# The most direct way to create an `RDD` is to convert an existing python `list` to \n", "# an `RDD` using `parallelize`. After all RDDs are kind of lists.\n", "\n", "textRDD = sc.parallelize([\n", " \"The numbers: 10 20 30 are chosen.\", \n", " \"hello 11 there.\",\n", " \"it's 12 pm.\"\n", "])\n", "\n", "# We can also do the opposite, that is convert an `RDD` to a `list` using `collect`.\n", "# Please be mindful however of the size of the RDD: the resulting python `list` must fit into \n", "# the memory of your (driver) machine and the whole point of using Spark is to deal with data sets \n", "# that are larger than that!\n", "\n", "# But is a useful function for small RDDs.\n", "\n", "textRDD.collect()" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "RDD size: 3\n", "RDD first element: The numbers: 10 20 30 are chosen.\n", "RDD first 2 elements: ['The numbers: 10 20 30 are chosen.', 'hello 11 there.']\n" ] } ], "source": [ "# Here are some other basic functions:\n", "\n", "# `count` - returns the number of elements in the `RDD`\n", "\n", "print(\"RDD size: %s\" % textRDD.count())\n", "\n", "\n", "# `first` - gets the first element \n", "\n", "print(\"RDD first element: %s\" % textRDD.first())\n", "\n", "# `take` - takes the n first elements as a python `list` - useful to preview an RDD of any size\n", "print(\"RDD first 2 elements: %s\" % textRDD.take(2))\n", "\n", "# `foreach` apply a function to each element of an RDD \n", "# The code below prints all the elements. 
\n", "# The output however goes to the driver's `stdout` - check the console window or `My Cluster/View Driver Logs`\n", "\n", "from __future__ import print_function\n", "textRDD.foreach(print)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can also save the entire `RDD` to a disk:" ] }, { "cell_type": "code", "execution_count": 3, "metadata": { "collapsed": true }, "outputs": [], "source": [ "%%sh \n", "\n", "# Spark by default will fail if the output file exits, so we need to make to remove it first.\n", "# (It might have been created by the previous run of this notebook)\n", "\n", "rm -rf output/sample.txt" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "# save the RDD to a text file\n", "\n", "textRDD.saveAsTextFile('output/sample.txt')" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "total 24\n", "-rw-r--r-- 1 szu004 staff 0B 10 Jul 10:45 _SUCCESS\n", "-rw-r--r-- 1 szu004 staff 0B 10 Jul 10:45 part-00000\n", "-rw-r--r-- 1 szu004 staff 34B 10 Jul 10:45 part-00001\n", "-rw-r--r-- 1 szu004 staff 16B 10 Jul 10:45 part-00002\n", "-rw-r--r-- 1 szu004 staff 12B 10 Jul 10:45 part-00003\n", "\n", "Content:\n", "The numbers: 10 20 30 are chosen.\n", "hello 11 there.\n", "it's 12 pm.\n" ] } ], "source": [ "%%sh \n", "\n", "# Let's preview the results.\n", "\n", "ls -lh output/sample.txt\n", "\n", "\n", "# The output is a directory with the actual content partitioned amongst multiple `part-*` files.\n", "# The simplistic explanation for this is that the RDD itself is partitioned amongst the many \n", "# executors (e.g. nodes in the cluster) and each executor writes its own partition.\n", "# The `_SUCCESS` indicates that the complete output has been successfully produced. \n", "\n", "\n", "# Now let's display the actual content:\n", "\n", "echo \n", "echo \"Content:\"\n", "cat output/sample.txt/part-*" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's try to solve the follwowing simple problem using the Spark RDD API:\n", "\n", "*Find the sum of all the numbers in the given text.*\n", "*For example in the text:*\n", "\n", " The numbers: 10 20 30 are chosen. 
\n", " hello 11 there.\n", " it's 12 pm.\n", " \n", "*we wouild identify: `10 20 30 11` and `12` as numbers a produce the sum of: `83`.*" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We will first go through all the transformations step by step and then combine them together.\n", "\n", "We have already loaded the text into the `textRDD`:" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "['The numbers: 10 20 30 are chosen.', 'hello 11 there.', \"it's 12 pm.\"]" ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# this is our initial RDD - each elements a line of text\n", "textRDD.collect()" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "PythonRDD[10] at RDD at PythonRDD.scala:48" ] }, "execution_count": 7, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# in the first step we will split each line into tokens \n", "# and combine all the tokens into a single RDD (list)\n", "\n", "textRDD.flatMap(lambda line:line.split())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Please note the output above - the result of flatMap as another RDD and no actual computation has been performed yet.**\n", "\n", "Rather a formula on how to transform one RDD to another has been created.\n", "It will be executed only when the actual data of the RDD are needed. \n", "\n", "Spark makes the distintions between the *transformations*, which **lazily transform** RDDs and *actions*, which produce the results. \n", "\n", "\n", "Here are some examples of both:\n", "\n", "* transformations: `map`, `flatMap`, `filter`\n", "* operations: `collect`, `count`, `reduce`, `saveAsTextFile`\n", "\n", "For more information on *transformations* vs *operations* please check the [Spark Programming Guide](https://spark.apache.org/docs/latest/programming-guide.html#rdd-operations).\n", "\n", "\n", "To to preview result of each *transformation* step we will call `collect` *action* at the end:" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "['The',\n", " 'numbers:',\n", " '10',\n", " '20',\n", " '30',\n", " 'are',\n", " 'chosen.',\n", " 'hello',\n", " '11',\n", " 'there.',\n", " \"it's\",\n", " '12',\n", " 'pm.']" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "textRDD \\\n", " .flatMap(lambda line:line.split()) \\\n", " .collect()" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "['10', '20', '30', '11', '12']" ] }, "execution_count": 9, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import re\n", "\n", "# a simple function to check if a string is a number\n", "# params: \n", "# s : a string\n", "# returns:\n", "# True if `s` is a number and False otherwise\n", "def is_number(s):\n", " return re.match(r'[0-9]+', s)\n", "\n", "\n", "# in the second step we produce the RDD, which only includes words which are numbers\n", "textRDD.flatMap(lambda line:line.split()) \\\n", " .filter(is_number) \\\n", " .collect()" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[10, 20, 30, 11, 12]" ] }, "execution_count": 10, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# in the third step we convert each `number word` to the actual `int` number\n", "\n", "textRDD.flatMap(lambda 
{ "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "83" ] }, "execution_count": 11, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# in the last step we `reduce` the RDD by adding all its elements together\n", "\n", "import operator\n", "\n", "textRDD.flatMap(lambda line: line.split()) \\\n", "    .filter(is_number) \\\n", "    .map(int) \\\n", "    .reduce(operator.add)" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "We have obtained the expected result of **83**.\n", "\n", "Please note that `reduce` is an *action* (we need the actual data to add the numbers together), so it will trigger the execution of all the preceding *transformation* steps." ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "You can now play around with the code by modifying pieces of it.\n", "\n", "When you are done, and you are running off the local machine, remember to *close the notebook* with `File/Close and Halt`." ] } ], "metadata": { "kernelspec": { "display_name": "PySpark", "language": "python", "name": "pyspark" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 2 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython2", "version": "2.7.11" } }, "nbformat": 4, "nbformat_minor": 1 }