{ "metadata": { "celltoolbar": "Slideshow", "name": "", "signature": "sha256:613d7c0f204aea3f3eab4c0c2ec4158f0faa2cd88f47d82030477d35a94bdf9b" }, "nbformat": 3, "nbformat_minor": 0, "worksheets": [ { "cells": [ { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "![Imgur](http://i.imgur.com/AkRFP3U.png)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "# Introduction to (Py)Spark\n", "\n", "* Tim Hopper \u2013\u00a0[@tdhopper](http://www.twitter.com/tdhopper) \u2013\u00a0Raleigh, NC\n", "* Developer and Anecdote Scientist\n", "* IPython Notebook available at http://tinyurl.com/PySpark\n", "\n", "
![](http://ak.picdn.net/shutterstock/videos/5081630/preview/stock-footage-electrical-spark-and-smoke-between-two-insulated-copper-wires-looped.jpg)
" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "# Introduction to (Py)Spark\n", "\n", "> Apache Spark\u2122 is a fast and general engine for large-scale data processing." ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "fragment" } }, "source": [ "> It provides high-level APIs in Java, Scala and Python, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming. \n", "\n", "[source](http://spark.apache.org/docs/latest/)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "# Introduction to (Py)Spark\n", "\n", "* Originally developed at Berkeley's AMPLab in 2009.\n", "* BSD-ed in 2010.\n", "* Donated to Apache in 2013.\n", "* Apache Top-Level Project in 2014.\n", "* 1.0.0 released in May 2014.\n", "* Currently on 1.2.0 (released December 2014).\n", "* Backed by Databricks (databricks.com)." ] }, { "cell_type": "heading", "level": 2, "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "Example" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "-" } }, "source": [ "Sum the squares of the integers from 1 to 10." ] }, { "cell_type": "code", "collapsed": false, "input": [ "sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]).map(lambda x: x**2).sum()" ], "language": "python", "metadata": { "slideshow": { "slide_type": "-" } }, "outputs": [ { "metadata": {}, "output_type": "pyout", "prompt_number": 1, "text": [ "385" ] } ], "prompt_number": 1 }, { "cell_type": "heading", "level": 2, "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "Example" ] }, { "cell_type": "code", "collapsed": false, "input": [ "try:\n", " import nltk.corpus as corpus\n", " stopwords = set(corpus.stopwords.words())\n", "except ImportError: \n", " stopwords = []" ], "language": "python", "metadata": { "slideshow": { "slide_type": "skip" } }, "outputs": [], "prompt_number": 2 }, { "cell_type": "code", "collapsed": false, "input": [ "# Most common words in \"THE DEVELOPMENT OF EMBROIDERY IN AMERICA\"\n", "\n", "rdd = sc.textFile(\"example.txt\")\n", "rdd \\\n", " .flatMap(lambda line: line.split()) \\\n", " .map(lambda word: word.strip().lower()) \\\n", " .filter(lambda word: word not in stopwords) \\\n", " .map(lambda word: (word, 1)) \\\n", " .reduceByKey(lambda a, b: a + b) \\\n", " .map(lambda (key, cnt): (cnt, key)) \\\n", " .top(10)\n" ], "language": "python", "metadata": { "slideshow": { "slide_type": "-" } }, "outputs": [ { "metadata": {}, "output_type": "pyout", "prompt_number": 3, "text": [ "[(117, u'upon'),\n", " (105, u'embroidery'),\n", " (103, u'one'),\n", " (101, u'art'),\n", " (94, u'work'),\n", " (86, u'made'),\n", " (84, u'new'),\n", " (82, u'project'),\n", " (70, u'embroidered'),\n", " (62, u'worked')]" ] } ], "prompt_number": 3 }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Example\n", "\n", "These examples is running locally on my laptop.\n", "\n", "The data file (example.txt) is loaded into a _local_ Resilient Distributed Dataset (__RDD__).\n", "\n", "If my Spark Context (`sc`) were created on a Spark cluster, the data would have be _partitioned_ across the worker nodes." 
] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Example\n", "\n", "(Py)spark evaluates expressions lazily. \"The transformations are only computed when an action requires a result to be returned to the driver program.\" [source](http://spark.apache.org/docs/1.2.0/programming-guide.html#rdd-operations)" ] }, { "cell_type": "code", "collapsed": false, "input": [ "%%time\n", "rdd = sc.parallelize(xrange(10**8)).map(lambda x: float(x) ** 2)" ], "language": "python", "metadata": { "slideshow": { "slide_type": "fragment" } }, "outputs": [ { "output_type": "stream", "stream": "stdout", "text": [ "CPU times: user 856 \u00b5s, sys: 654 \u00b5s, total: 1.51 ms\n", "Wall time: 2.33 ms\n" ] } ], "prompt_number": 8 }, { "cell_type": "code", "collapsed": false, "input": [ "%%time \n", "_ = rdd.count()" ], "language": "python", "metadata": { "slideshow": { "slide_type": "fragment" } }, "outputs": [ { "output_type": "stream", "stream": "stdout", "text": [ "CPU times: user 4.85 ms, sys: 1.98 ms, total: 6.83 ms\n", "Wall time: 8.87 s\n" ] } ], "prompt_number": 9 }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Spark vs Pyspark?\n", "\n", "Spark is written in Scala. The 'native' API is in Scala.\n", "\n", "Pyspark is a very lightweight wrapper around the native API. (You can see its implementation [here](https://github.com/apache/spark/tree/master/python/pyspark).)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "![](http://i.imgur.com/YlI8AqEl.png)\n", "\n", "[source](https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Spark vs Pyspark?\n", "\n", "__Key difference:__\n", "\n", "* Python (unlike Scala) is dynamically typed. (RDDs can hold objects of multiple types!)\n", "* Pyspark sometimes lags behind Spark in feature releases.\n", "\n", "(There's also a Java API in case you really hate life.)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Spark vs Pyspark?\n", "\n", "It must be slower, right?" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "fragment" } }, "source": [ "> Spark\u2019s core developers have worked extensively to bridge the performance gap between JVM languages and Python. \n", "\n", "> In particular, PySpark can now run on PyPy to leverage the just-in-time compiler. (Up to 50x speedup) \n", "\n", "> The way Python processes communicate with the main Spark JVM programs have also been redesigned to enable worker reuse.\n", "\n", "[source](http://radar.oreilly.com/2015/02/recent-performance-improvements-in-apache-spark.html)\n", "\n" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## How is this better than Hadoop?" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "__Major difference:__\n", "\n", "Spark keep data in worker memory while tends to keep data on disk.\n", "\n", "According to the Spark webpage it can run \"100x faster than Hadoop by exploiting in memory computing and other optimizations \"\n" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## How is this better than Hadoop?" 
] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "-" } }, "source": [ "> ### Spark officially sets a new record in large-scale sorting\n", "\n", "> Using Spark on 206 EC2 machines, we sorted 100 TB of data on disk in 23 minutes. In comparison, the previous world record set by Hadoop MapReduce used 2100 machines and took 72 minutes. This means that Spark sorted the same data 3X faster using 10X fewer machines. \n", "\n", "[source](http://databricks.com/blog/2014/11/05/spark-officially-sets-a-new-record-in-large-scale-sorting.html)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## How is this better than Hadoop?\n", "\n", "__Also:__\n", "\n", "__RDD__ is a key development: RDD's provide \"immutable resilient distributed collection of records\"." ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "fragment" } }, "source": [ "> Unlike existing storage\n", "abstractions for clusters, which require data replication\n", "for fault tolerance, RDDs offer an API based on coarsegrained\n", "transformations that lets them recover data efficiently using lineage.\n", "\n", "See: [Resilient Distributed Datasets: A Fault-Tolerant Abstraction for\n", "In-Memory Cluster Computing](http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf), [Spark: Cluster Computing with Working Sets](https://amplab.cs.berkeley.edu/wp-content/uploads/2011/06/Spark-Cluster-Computing-with-Working-Sets.pdf), [Spark Research](https://spark.apache.org/research.html)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## How is this better than Hadoop?\n", "\n", "__Also:__\n", " \n", "Spark provides 80+ high(er)-level, functional-style operators beyond simple \"map\" and \"reduce\". (Not even to mention high-level tools Spark Streaming, Spark SQL, MLib, and GraphX.)\n", "\n", "For example:\n", "\n", "* count\n", "* countApprox\n", "* flatMap\n", "* filter\n", "* flatMap\n", "* groupBy\n", "* map\n", "* reduce\n", "* reduceByKey\n", "* sample\n", "* sortBy\n", "* union" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## How is this better than Hadoop?\n", "\n", "__Native Python Code:__\n", "\n", "* Unlike Hive/Pig\n", "\n", "__No Java:__\n", "\n", "* Unlike native Hadoop\n", "\n", "__High(er)-level operators:__\n", "\n", "* Unlike mrjob\n", "\n", "__Functional style:__\n", "\n", "> Spark imitates Scala\u2019s collections API and functional style, which is a boon to Java and Scala developers, but also somewhat familiar to developers coming from Python. 
[source](http://blog.cloudera.com/blog/2014/03/why-apache-spark-is-a-crossover-hit-for-data-scientists/)\n", "\n", "\n" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "[pyspark-pictures](http://nbviewer.ipython.org/github/jkthompson/pyspark-pictures/blob/master/pyspark-pictures.ipynb) is a handy visual reference for the Spark API:\n", "\n", "```\n", "rdd1.cartesian(rdd2)\n", "```\n", "\n", "![](http://nbviewer.ipython.org/github/jkthompson/pyspark-pictures/blob/master/images/pyspark-page17.svg)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Installing (Py)Spark locally\n", "\n", "For Mac users using [Homebrew](http://brew.sh/):\n", "\n", "```\n", "$ brew install apache-spark\n", "```\n", "\n", "Install the [Java SDK](http://www.oracle.com/technetwork/java/javase/downloads/index.html)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Launching the Pyspark REPL\n", "\n", "\n", "```\n", "$ IPYTHON=1 pyspark\n", "```" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "fragment" } }, "source": [ "You should see:\n", "\n", "```\n", "Welcome to\n", " ____ __\n", " / __/__ ___ _____/ /__\n", " _\\ \\/ _ \\/ _ `/ __/ '_/\n", " /__ / .__/\\_,_/_/ /_/\\_\\ version 1.2.1\n", " /_/\n", "\n", "Using Python version 2.7.6 (default, Sep 9 2014 15:04:36)\n", "SparkContext available as sc.\n", ">>>\n", "```" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Launching Pyspark in an IPython notebook\n", "\n", "```\n", "$ IPYTHON_OPTS=\"notebook --matplotlib inline\" pyspark\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This starts an IPython notebook session initialized with a SparkContext object called `sc`:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "sc" ], "language": "python", "metadata": {}, "outputs": [ { "metadata": {}, "output_type": "pyout", "prompt_number": 9, "text": [ "" ] } ], "prompt_number": 9 }, { "cell_type": "markdown", "metadata": {}, "source": [ "(You can also create [IPython profiles](http://blog.cloudera.com/blog/2014/08/how-to-use-ipython-notebook-with-apache-spark/) [to automate some of this](http://ramhiser.com/2015/02/01/configuring-ipython-notebook-support-for-pyspark/).)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "These commands will start Pyspark in __local__ mode, as opposed to __cluster__ mode." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The exact same code can be run in local and cluster modes! It just depends on how you initialize your Spark session."
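] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "fragment" } }, "source": [ "A minimal sketch of that initialization if you build the `SparkContext` yourself (the shell commands above do this for you; the app name and cluster master URL below are only placeholders):\n", "\n", "```python\n", "from pyspark import SparkConf, SparkContext\n", "\n", "conf = SparkConf().setAppName(\"intro-to-pyspark\").setMaster(\"local[*]\")\n", "# For a cluster, point at its master instead, e.g.:\n", "# conf = SparkConf().setAppName(\"intro-to-pyspark\").setMaster(\"spark://master-host:7077\")\n", "\n", "sc = SparkContext(conf=conf)\n", "```"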
] }, { "cell_type": "heading", "level": 1, "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "Getting Data" ] }, { "cell_type": "code", "collapsed": false, "input": [ "# Load a Python iterable into an RDD\n", "\n", "sc.parallelize(range(10))" ], "language": "python", "metadata": { "slideshow": { "slide_type": "-" } }, "outputs": [ { "metadata": {}, "output_type": "pyout", "prompt_number": 10, "text": [ "ParallelCollectionRDD[16] at parallelize at PythonRDD.scala:364" ] } ], "prompt_number": 10 }, { "cell_type": "code", "collapsed": false, "input": [ "# Load a text file\n", "\n", "sc.textFile(\"example.txt\") # Each line is a separate element in the RDD" ], "language": "python", "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [ { "metadata": {}, "output_type": "pyout", "prompt_number": 11, "text": [ "example.txt MappedRDD[18] at textFile at NativeMethodAccessorImpl.java:-2" ] } ], "prompt_number": 11 }, { "cell_type": "code", "collapsed": false, "input": [ "# Load text files\n", "\n", "sc.textFile(\"example.txt,example2.txt\").collect()[-1001:-991]" ], "language": "python", "metadata": { "slideshow": { "slide_type": "fragment" } }, "outputs": [ { "metadata": {}, "output_type": "pyout", "prompt_number": 12, "text": [ "[u'least surpass the work sent by the Decorative Art societies of most of',\n", " u'our American cities.',\n", " u'',\n", " u'',\n", " u'',\n", " u'',\n", " u'CHAPTER VII -- AMERICAN TAPESTRY',\n", " u'',\n", " u'',\n", " u'The Society of Decorative Art, has proved itself a means for the']" ] } ], "prompt_number": 12 }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "fragment" } }, "source": [ "These can be used to load text files from Amazon S3." ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "`SparkContext.wholeTextFile`...\n", "\n", "> ...lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs. This is in contrast with textFile, which would return one record per line in each file\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "`SparkContext.newAPIHadoopRDD`\n", "\n", "> PySpark can also read any Hadoop InputFormat or write any Hadoop OutputFormat, for both \u2018new\u2019 and \u2018old\u2019 Hadoop MapReduce APIs. \n", "\n", "For example, [Cassandra](https://github.com/apache/spark/blob/master/examples/src/main/python/cassandra_inputformat.py)." ] }, { "cell_type": "heading", "level": 1, "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "Saving Data" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "`rdd.collect()` converts a RDD object into a Python list on the host machine.\n", "\n", "`rdd.saveAsTextFile()` saves an RDD as a string. See also `rdd.saveAsPickleFile()`.\n", "\n", "`rdd.saveAsNewAPIHadoopDataset()` saves an RDD object to a Hadoop data source (e.g. HDFS, [Cassandra](https://github.com/Parsely/pyspark-cassandra))." 
] }, { "cell_type": "heading", "level": 1, "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "Manipulating Data" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Sort last three presidents by last name" ] }, { "cell_type": "code", "collapsed": false, "input": [ "rdd = sc.parallelize([\"Barack Hussein Obama\", \"George Walker Bush\", \"William Jefferson Clinton\"])\n", "\n", "rdd.sortBy(keyfunc=lambda k: k.split(\" \")[-1]).collect()" ], "language": "python", "metadata": {}, "outputs": [ { "metadata": {}, "output_type": "pyout", "prompt_number": 70, "text": [ "['George Walker Bush', 'William Jefferson Clinton', 'Barack Hussein Obama']" ] } ], "prompt_number": 70 }, { "cell_type": "heading", "level": 1, "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "Manipulating Data" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Join Datasets" ] }, { "cell_type": "code", "collapsed": false, "input": [ "rdd1 = sc.parallelize([(\"a\", 1), (\"b\", 2), (\"c\", 3)])\n", "rdd2 = sc.parallelize([(\"a\", 6), (\"b\", 7), (\"b\", 8), (\"d\", 9)])" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 73 }, { "cell_type": "code", "collapsed": false, "input": [ "rdd1.join(rdd2).collect()" ], "language": "python", "metadata": {}, "outputs": [ { "metadata": {}, "output_type": "pyout", "prompt_number": 74, "text": [ "[('a', (1, 6)), ('b', (2, 7)), ('b', (2, 8))]" ] } ], "prompt_number": 74 }, { "cell_type": "code", "collapsed": false, "input": [ "rdd1.fullOuterJoin(rdd2).collect()" ], "language": "python", "metadata": {}, "outputs": [ { "metadata": {}, "output_type": "pyout", "prompt_number": 75, "text": [ "[('a', (1, 6)),\n", " ('c', (3, None)),\n", " ('b', (2, 7)),\n", " ('b', (2, 8)),\n", " ('d', (None, 9))]" ] } ], "prompt_number": 75 }, { "cell_type": "heading", "level": 1, "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "Manipulating Data" ] }, { "cell_type": "code", "collapsed": false, "input": [ "from pyspark.sql import SQLContext, Row, StructField, StructType, FloatType, Row\n", "import pandas as pd" ], "language": "python", "metadata": { "slideshow": { "slide_type": "skip" } }, "outputs": [], "prompt_number": 13 }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's load in the [Wake County Real Estate Data](http://www.wakegov.com/tax/realestate/redatafile/pages/default.aspx)." 
] }, { "cell_type": "code", "collapsed": false, "input": [ "raw_real_estate = sc.textFile(\"all.txt\")\n", "print raw_real_estate.take(1)[0][:250]" ], "language": "python", "metadata": {}, "outputs": [ { "output_type": "stream", "stream": "stdout", "text": [ "FISHER, GEORGE NORMAN 2720 BEDFORD AVE RALEIGH NC 27607-7114 000000101 01 001506 WAKE FOREST RD RA 01 0\n" ] } ], "prompt_number": 80 }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "fragment" } }, "source": [ "" ] }, { "cell_type": "heading", "level": 1, "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "Manipulating Data" ] }, { "cell_type": "code", "collapsed": false, "input": [ "wake_county_real_estate = raw_real_estate.map(lambda row: \n", " dict(\n", " owner = row[0:35].strip().title(),\n", " last_name = row[0:35].strip().title().split(\",\")[0],\n", " address = row[70:105].strip().title(), \n", " sale_price = int(row[273:(284)].strip() or -1),\n", " value = int(row[305:(316)].strip() or -1),\n", " use = int(row[653:656].strip() or -1),\n", " heated_area = int(row[471:482].strip() or -1),\n", " year_built = int(row[455:459].strip() or -1),\n", " height = row[509:510].strip(),\n", " ))\n", "\n", "sqlContext = SQLContext(sc)\n", "schemaWake = sqlContext.inferSchema(wake_county_real_estate.map(lambda d: Row(**d))) \\\n", " .registerTempTable(\"wake\")" ], "language": "python", "metadata": { "slideshow": { "slide_type": "-" } }, "outputs": [], "prompt_number": 29 }, { "cell_type": "heading", "level": 1, "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "Manipulating Data" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "-" } }, "source": [ "Who owns the most expensive church buildings in Raleigh?" ] }, { "cell_type": "code", "collapsed": false, "input": [ "pd.DataFrame.from_records(\n", " sqlContext.sql(\"\"\"SELECT DISTINCT owner, address, \n", " year_built, value\n", " FROM wake \n", " WHERE value > 4000000 AND \n", " use = 66 AND \n", " owner LIKE '%Church%'\n", " \"\"\").collect(),\n", "columns=[\"Name\",\"Street\",\"Year Built\",\"Value\"])" ], "language": "python", "metadata": {}, "outputs": [ { "html": [ "
\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
NameStreetYear BuiltValue
0 Crosspointe Church At Cary 6911 Carpenter Fire Station Rd 2003 6853333
1 Edenton St Methodist Church 228 W Edenton St 2002 6207300
2 Edenton St Methodist Church 228 W Edenton St 1959 6207300
3 First Presbyterian Church 111 W Morgan St 2013 4617350
4 Providence Baptist Church 6339 Glenwood Ave Ste 451 1972 5540832
5 First Presbyterian Church 111 W Morgan St 1987 4617350
6 Tabernacle Baptist Church Of Ral 8304 Leesville Rd 2001 4600500
\n", "
" ], "metadata": {}, "output_type": "pyout", "prompt_number": 81, "text": [ " Name Street \\\n", "0 Crosspointe Church At Cary 6911 Carpenter Fire Station Rd \n", "1 Edenton St Methodist Church 228 W Edenton St \n", "2 Edenton St Methodist Church 228 W Edenton St \n", "3 First Presbyterian Church 111 W Morgan St \n", "4 Providence Baptist Church 6339 Glenwood Ave Ste 451 \n", "5 First Presbyterian Church 111 W Morgan St \n", "6 Tabernacle Baptist Church Of Ral 8304 Leesville Rd \n", "\n", " Year Built Value \n", "0 2003 6853333 \n", "1 2002 6207300 \n", "2 1959 6207300 \n", "3 2013 4617350 \n", "4 1972 5540832 \n", "5 1987 4617350 \n", "6 2001 4600500 " ] } ], "prompt_number": 81 }, { "cell_type": "heading", "level": 1, "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "Manipulating Data" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "What is the 43rd richest American's house worth?" ] }, { "cell_type": "code", "collapsed": false, "input": [ "sqlContext.sql(\"\"\"SELECT MAX(value) as price \n", " FROM wake \n", " WHERE owner \n", " LIKE 'Goodnight, James H% & Ann B%'\n", " GROUP BY last_name\n", " \"\"\").collect()[0].price" ], "language": "python", "metadata": {}, "outputs": [ { "metadata": {}, "output_type": "pyout", "prompt_number": 27, "text": [ "3996360" ] } ], "prompt_number": 27 }, { "cell_type": "markdown", "metadata": {}, "source": [ "(We could have done these same queries with the 'native' Spark functional method chaining.)" ] }, { "cell_type": "heading", "level": 1, "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "Manipulating Data" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Again, if you\u00a0wanted to load terabytes of real estate data from HDFS or S3 (for example), you could run this exact same code on a Spark cluster." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "" ] }, { "cell_type": "heading", "level": 2, "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "Data Frames! (Coming Soon in 1.3)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Constructs a DataFrame from the users table in Hive.\n", "\n", "```python\n", "users = sc.table(\"users\")\n", "```\n", "\n", "Create a new DataFrame that contains \u201cyoung users\u201d only\n", "\n", "```python\n", "young = users.filter(users.age < 21)\n", "```\n", " \n", "Count the number of young users by gender\n", " \n", "```python\n", "young.groupBy(\"gender\").count()\n", "```" ] }, { "cell_type": "heading", "level": 2, "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "Data Frames! (Coming Soon in 1.3)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "From JSON files in S3\n", "\n", "```python\n", "logs = sc.load(\"s3n://path/to/data.json\", \"json\")\n", "```\n", "\n", "Join young users with another DataFrame called logs\n", " \n", "```python\n", "young.join(logs, logs.userId == users.userId, \"left_outer\")\n", "```\n", "\n", "\n", "[source](https://databricks.com/blog/2015/02/17/introducing-dataframes-in-spark-for-large-scale-data-science.html)" ] }, { "cell_type": "heading", "level": 2, "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "Data Frames! (Coming Soon in 1.3)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", "\n", "Convert Spark DataFrame to Pandas\n", "\n", "```\n", "pandas_df = young.toPandas()\n", "```\n", "\n", "Create a Spark DataFrame from Pandas\n", "\n", "```\n", "spark_df = context.createDataFrame(pandas_df)\n", "```" ] }, { "cell_type": "heading", "level": 1, "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "Machine Learning with (Py)Spark" ] }, { "cell_type": "code", "collapsed": false, "input": [ "from pyspark.mllib.tree import DecisionTree, LabeledPoint\n", "from pyspark.mllib import feature\n", "from pyspark.mllib.stat import Statistics\n", "from random import choice" ], "language": "python", "metadata": { "slideshow": { "slide_type": "skip" } }, "outputs": [], "prompt_number": 91 }, { "cell_type": "markdown", "metadata": {}, "source": [ "Subset to Apartment Buildings and Office Buildings" ] }, { "cell_type": "code", "collapsed": false, "input": [ "subset = wake_county_real_estate.filter(lambda d: \n", " d[\"use\"] in [7, 34])\n", "subset = subset.filter(lambda d: d[\"heated_area\"] > 0 \n", " and d[\"year_built\"] > 1900) \\\n", " .map(lambda d: LabeledPoint(\n", " 1 if d[\"use\"] == 7 else 0, \n", " [d[\"year_built\"], \n", " d[\"heated_area\"]]))\n", " \n", "subset.take(2)" ], "language": "python", "metadata": {}, "outputs": [ { "metadata": {}, "output_type": "pyout", "prompt_number": 85, "text": [ "[LabeledPoint(0.0, [1989.0,4008.0]), LabeledPoint(0.0, [1976.0,5426.0])]" ] } ], "prompt_number": 85 }, { "cell_type": "heading", "level": 1, "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "Machine Learning with (Py)Spark" ] }, { "cell_type": "code", "collapsed": false, "input": [ "(trainingData, testData) = subset.randomSplit([0.7, 0.3])" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 86 }, { "cell_type": "code", "collapsed": false, "input": [ "tree = DecisionTree.trainClassifier(trainingData, 2, categoricalFeaturesInfo={})" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 87 }, { "cell_type": "code", "collapsed": false, "input": [ "predictions = tree.predict(testData.map(lambda x: x.features)) \n", "labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)\n", "testErr = labelsAndPredictions.filter(lambda (v, p): v != p).count() / float(testData.count())\n", "print 'Test Error = ' + str(testErr)" ], "language": "python", "metadata": { "slideshow": { "slide_type": "-" } }, "outputs": [ { "output_type": "stream", "stream": "stdout", "text": [ "Test Error = 0.141468682505\n" ] } ], "prompt_number": 88 }, { "cell_type": "code", "collapsed": false, "input": [ "# This is much better performance than on a random classifier\n", "\n", "labelsAndPredictions = testData.map(lambda lp: (lp.label, choice([0, 1])))\n", "testErr = labelsAndPredictions.filter(lambda (v, p): v != p).count() / float(testData.count())\n", "print 'Test Error = ' + str(testErr)" ], "language": "python", "metadata": { "slideshow": { "slide_type": "fragment" } }, "outputs": [ { "output_type": "stream", "stream": "stdout", "text": [ "Test Error = 0.497840172786\n" ] } ], "prompt_number": 92 }, { "cell_type": "heading", "level": 1, "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "Machine Learning with (Py)Spark" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Spark 1.2 brought [MLib pipelines to the Scala API](https://databricks.com/blog/2015/01/07/ml-pipelines-a-new-high-level-api-for-mllib.html).\n", "\n", "> __inspired by the scikit-learn project__\n", "\n", 
"![](https://lifeconfusions.files.wordpress.com/2014/09/giphy-1.gif)\n", "\n", "(For more on [scikit-learn pipelines](http://nbviewer.ipython.org/github/tdhopper/Research-Triangle-Analysts--Intro-to-scikit-learn/blob/master/Intro%20to%20Scikit-Learn.ipynb).)" ] }, { "cell_type": "heading", "level": 1, "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "Machine Learning with (Py)Spark" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Scala Pipeline:\n", "\n", "```scala\n", "val tokenizer = new Tokenizer()\n", " .setInputCol(\"text\")\n", " .setOutputCol(\"words\")\n", "val hashingTF = new HashingTF()\n", " .setNumFeatures(1000)\n", " .setInputCol(tokenizer.getOutputCol)\n", " .setOutputCol(\"features\")\n", "val lr = new LogisticRegression()\n", " .setMaxIter(10)\n", " .setRegParam(0.01)\n", "val pipeline = new Pipeline()\n", " .setStages(Array(tokenizer, hashingTF, lr))\n", " \n", "val model = pipeline.fit(trainingDataset)\n", "\n", "```\n", "\n", "(Python Pipelines are coming in 1.2.)" ] }, { "cell_type": "heading", "level": 2, "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "Future of Spark" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* [Merging SparkR into core Spark](http://amplab-extras.github.io/SparkR-pkg/)\n", "* [Managed clusters on the DataBricks Cloud](http://databricks.com/product/databricks-cloud)\n", "* MLib Pipelines and other MLib improvements\n", " * [Latent Dirichlet Allocation in MLib](https://issues.apache.org/jira/browse/SPARK-1405?jql=project%20%3D%20SPARK%20AND%20fixVersion%20%3D%201.3.0%20and%20type%20!%3D%20Bug%20and%20component%20!%3D%20%22Build%22%20and%20component%20!%3D%20%22Documentation%22%20order%20by%20priority%2C%20component)\n", "* Data Frames" ] }, { "cell_type": "heading", "level": 2, "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "Learning More" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* [DataBricks has Spark workshops](https://databricks.com/services/spark-training)\n", "* [Introduction to Big Data with Apache Spark](https://www.edx.org/course/introduction-big-data-apache-spark-uc-berkeleyx-cs100-1x#.VOUjIlPF88Y) edX MOOC starts in April\n", "* [Scalable Machine Learning](https://www.edx.org/course/introduction-big-data-apache-spark-uc-berkeleyx-cs100-1x#.VOUjIlPF88Y) edX MOOC starts in Q2 2015\n", "\n", "O'Reilly also has [Advanced Analytics with Spark](http://shop.oreilly.com/product/0636920035091.do?sortby=publicationDate) and [Learning Spark](http://shop.oreilly.com/product/0636920028512.do). They're Scala-focused, but still valuable." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Thank you!\n", "\n", "### Questions?" ] }, { "cell_type": "code", "collapsed": false, "input": [], "language": "python", "metadata": {}, "outputs": [] } ], "metadata": {} } ] }