{ "metadata": { "name": "", "signature": "sha256:8581d2dfe951591985d0f9eb665f33044c479321d2a0b77699d2d79ad8ef0641" }, "nbformat": 3, "nbformat_minor": 0, "worksheets": [ { "cells": [ { "cell_type": "heading", "level": 1, "metadata": {}, "source": [ "Sampling RDDs" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "[Introduction to Spark with Python, by Jose A. Dianes](https://github.com/jadianes/spark-py-notebooks)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "So far we have introduced RDD creation together with some basic transformations such as `map` and `filter` and some actions such as `count`, `take`, and `collect`. " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This notebook will show how to sample RDDs. Regarding transformations, `sample` will be introduced since it will be useful in many statistical learning scenarios. Then we will compare results with the `takeSample` action. " ] }, { "cell_type": "heading", "level": 2, "metadata": {}, "source": [ "Getting the data and creating the RDD" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In this case we will use the complete dataset provided for the KDD Cup 1999, containing nearly half million network interactions. The file is provided as a Gzip file that we will download locally." ] }, { "cell_type": "code", "collapsed": false, "input": [ "import urllib\n", "f = urllib.urlretrieve (\"http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data.gz\", \"kddcup.data.gz\")" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 1 }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we can use this file to create our RDD." ] }, { "cell_type": "code", "collapsed": false, "input": [ "data_file = \"./kddcup.data.gz\"\n", "raw_data = sc.textFile(data_file)" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 2 }, { "cell_type": "heading", "level": 2, "metadata": {}, "source": [ "Sampling RDDs " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In Spark, there are two sampling operations, the transformation `sample` and the action `takeSample`. By using a transformation we can tell Spark to apply successive transformation on a sample of a given RDD. By using an action we retrieve a given sample and we can have it in local memory to be used by any other standard library (e.g. Scikit-learn). " ] }, { "cell_type": "heading", "level": 3, "metadata": {}, "source": [ "The `sample` transformation" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The `sample` transformation takes up to three parameters. First is whether the sampling is done with replacement or not. Second is the sample size as a fraction. Finally we can optionally provide a *random seed*. " ] }, { "cell_type": "code", "collapsed": false, "input": [ "raw_data_sample = raw_data.sample(False, 0.1, 1234)\n", "sample_size = raw_data_sample.count()\n", "total_size = raw_data.count()\n", "print \"Sample size is {} of {}\".format(sample_size, total_size)" ], "language": "python", "metadata": {}, "outputs": [ { "output_type": "stream", "stream": "stdout", "text": [ "Sample size is 489957 of 4898431\n" ] } ], "prompt_number": 3 }, { "cell_type": "markdown", "metadata": {}, "source": [ "But the power of sampling as a transformation comes from doing it as part of a sequence of additional transformations. This will show more powerful once we start doing aggregations and key-value pairs operations, and will be specially useful when using Spark's machine learning library MLlib. " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In the meantime, imagine we want to have an approximation of the proportion of `normal.` interactions in our dataset. We could do this by counting the total number of tags as we did in previous notebooks. However we want a quicker response and we don't need the exact answer but just an approximation. We can do it as follows. " ] }, { "cell_type": "code", "collapsed": false, "input": [ "from time import time\n", "\n", "# transformations to be applied\n", "raw_data_sample_items = raw_data_sample.map(lambda x: x.split(\",\"))\n", "sample_normal_tags = raw_data_sample_items.filter(lambda x: \"normal.\" in x)\n", "\n", "# actions + time\n", "t0 = time()\n", "sample_normal_tags_count = sample_normal_tags.count()\n", "tt = time() - t0\n", "\n", "sample_normal_ratio = sample_normal_tags_count / float(sample_size)\n", "print \"The ratio of 'normal' interactions is {}\".format(round(sample_normal_ratio,3)) \n", "print \"Count done in {} seconds\".format(round(tt,3))" ], "language": "python", "metadata": {}, "outputs": [ { "output_type": "stream", "stream": "stdout", "text": [ "The ratio of 'normal' interactions is 0.199\n", "Count done in 44.523 seconds\n" ] } ], "prompt_number": 4 }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's compare this with calculating the ratio without sampling. " ] }, { "cell_type": "code", "collapsed": false, "input": [ "# transformations to be applied\n", "raw_data_items = raw_data.map(lambda x: x.split(\",\"))\n", "normal_tags = raw_data_items.filter(lambda x: \"normal.\" in x)\n", "\n", "# actions + time\n", "t0 = time()\n", "normal_tags_count = normal_tags.count()\n", "tt = time() - t0\n", "\n", "normal_ratio = normal_tags_count / float(total_size)\n", "print \"The ratio of 'normal' interactions is {}\".format(round(normal_ratio,3)) \n", "print \"Count done in {} seconds\".format(round(tt,3))" ], "language": "python", "metadata": {}, "outputs": [ { "output_type": "stream", "stream": "stdout", "text": [ "The ratio of 'normal' interactions is 0.199\n", "Count done in 91.09 seconds\n" ] } ], "prompt_number": 5 }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can see a gain in time. The more transformations we apply after the sampling the bigger this gain. This is because without sampling all the transformations are applied to the complete set of data. " ] }, { "cell_type": "heading", "level": 3, "metadata": {}, "source": [ "The `takeSample` action " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "If what we need is to grab a sample of raw data from our RDD into local memory in order to be used by other non-Spark libraries, `takeSample` can be used. " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The syntax is very similar, but in this case we specify the number of items instead of the sample size as a fraction of the complete data size. " ] }, { "cell_type": "code", "collapsed": false, "input": [ "t0 = time()\n", "raw_data_sample = raw_data.takeSample(False, 400000, 1234)\n", "normal_data_sample = [x.split(\",\") for x in raw_data_sample if \"normal.\" in x]\n", "tt = time() - t0\n", "\n", "normal_sample_size = len(normal_data_sample)\n", "\n", "normal_ratio = normal_sample_size / 400000.0\n", "print \"The ratio of 'normal' interactions is {}\".format(normal_ratio)\n", "print \"Count done in {} seconds\".format(round(tt,3))" ], "language": "python", "metadata": {}, "outputs": [ { "output_type": "stream", "stream": "stdout", "text": [ "The ratio of 'normal' interactions is 0.1988025\n", "Count done in 76.166 seconds\n" ] } ], "prompt_number": 6 }, { "cell_type": "markdown", "metadata": {}, "source": [ "The process was very similar as before. We obtained a sample of about 10 percent of the data, and then filter and split. \n", "\n", "However, it took longer, even with a slightly smaller sample. The reason is that Spark just distributed the execution of the sampling process. The filtering and splitting of the results were done locally in a single node. " ] } ], "metadata": {} } ] }