{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* [DataFrame API](http://nbviewer.ipython.org/github/jkthompson/pyspark-pictures/blob/master/pyspark-pictures-dataframes.ipynb)\n", "* [GitHub](https://github.com/jkthompson/pyspark-pictures)\n", "* [related blog post](http://data-frack.blogspot.com/2015/01/visual-mnemonics-for-pyspark-api.html)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Click on a picture to view pyspark docs" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "pyspark version:1.6.1\n", "Ipython version:4.2.0\n" ] } ], "source": [ "import IPython\n", "print(\"pyspark version:\" + str(sc.version))\n", "print(\"Ipython version:\" + str(IPython.__version__))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[1, 2, 3]\n", "[(1, 1), (2, 4), (3, 9)]\n" ] } ], "source": [ "# map\n", "x = sc.parallelize([1,2,3]) # sc = spark context, parallelize creates an RDD from the passed object\n", "y = x.map(lambda x: (x,x**2))\n", "print(x.collect()) # collect copies RDD elements to a list on the driver\n", "print(y.collect())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[1, 2, 3]\n", "[1, 100, 1, 2, 200, 4, 3, 300, 9]\n" ] } ], "source": [ "# flatMap\n", "x = sc.parallelize([1,2,3])\n", "y = x.flatMap(lambda x: (x, 100*x, x**2))\n", "print(x.collect())\n", "print(y.collect())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": 
"code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[[1], [2, 3]]\n", "[[1], [5]]\n" ] } ], "source": [ "# mapPartitions\n", "x = sc.parallelize([1,2,3], 2)\n", "def f(iterator): yield sum(iterator) # called once per partition; yields one value per partition\n", "y = x.mapPartitions(f)\n", "print(x.glom().collect()) # glom() gathers the elements of each partition into a list\n", "print(y.glom().collect())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[[1], [2, 3]]\n", "[[(0, 1)], [(1, 5)]]\n" ] } ], "source": [ "# mapPartitionsWithIndex\n", "x = sc.parallelize([1,2,3], 2)\n", "def f(partitionIndex, iterator): yield (partitionIndex,sum(iterator))\n", "y = x.mapPartitionsWithIndex(f)\n", "print(x.glom().collect()) # glom() gathers the elements of each partition into a list\n", "print(y.glom().collect())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[[1], [2, 3]]\n", "2\n" ] } ], "source": [ "# getNumPartitions\n", "x = sc.parallelize([1,2,3], 2)\n", "y = x.getNumPartitions()\n", "print(x.glom().collect())\n", "print(y)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[1, 2, 3]\n", "[1, 3]\n" ] } ], "source": [ "# filter\n", "x = sc.parallelize([1,2,3])\n", "y = x.filter(lambda x: x%2 == 1) # filters out even elements\n", "print(x.collect())\n", "print(y.collect())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "['A', 'A', 'B']\n", "['A', 
'B']\n" ] } ], "source": [ "# distinct\n", "x = sc.parallelize(['A','A','B'])\n", "y = x.distinct()\n", "print(x.collect())\n", "print(y.collect())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "x = [0, 1, 2, 3, 4, 5, 6]\n", "sample:0 y = [0, 6]\n", "sample:1 y = [4]\n", "sample:2 y = [1, 2, 3]\n", "sample:3 y = [2, 3, 5, 6]\n", "sample:4 y = [1, 2]\n" ] } ], "source": [ "# sample\n", "x = sc.parallelize(range(7))\n", "ylist = [x.sample(withReplacement=False, fraction=0.5) for _ in range(5)] # call 'sample' 5 times; fraction is an expected ratio, so sizes vary\n", "print('x = ' + str(x.collect()))\n", "for cnt, y in enumerate(ylist):\n", " print('sample:' + str(cnt) + ' y = ' + str(y.collect()))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "x = [0, 1, 2, 3, 4, 5, 6]\n", "sample:0 y = [5, 4, 3]\n", "sample:1 y = [4, 0, 2]\n", "sample:2 y = [1, 2, 4]\n", "sample:3 y = [5, 6, 0]\n", "sample:4 y = [3, 1, 6]\n" ] } ], "source": [ "# takeSample\n", "x = sc.parallelize(range(7))\n", "ylist = [x.takeSample(withReplacement=False, num=3) for _ in range(5)] # call 'takeSample' 5 times; each call returns a list to the driver\n", "print('x = ' + str(x.collect()))\n", "for cnt, y in enumerate(ylist):\n", " print('sample:' + str(cnt) + ' y = ' + str(y)) # no collect on y" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "['A', 'A', 'B']\n", "['D', 'C', 'A']\n", "['A', 'A', 'B', 'D', 'C', 'A']\n" ] } ], "source": [ "# union\n", "x = sc.parallelize(['A','A','B'])\n", "y = sc.parallelize(['D','C','A'])\n", "z = x.union(y)\n", "print(x.collect())\n", 
"print(y.collect())\n", "print(z.collect())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "['A', 'A', 'B']\n", "['A', 'C', 'D']\n", "['A']\n" ] } ], "source": [ "# intersection\n", "x = sc.parallelize(['A','A','B'])\n", "y = sc.parallelize(['A','C','D'])\n", "z = x.intersection(y)\n", "print(x.collect())\n", "print(y.collect())\n", "print(z.collect())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[('B', 1), ('A', 2), ('C', 3)]\n", "[('A', 2), ('B', 1), ('C', 3)]\n" ] } ], "source": [ "# sortByKey\n", "x = sc.parallelize([('B',1),('A',2),('C',3)])\n", "y = x.sortByKey()\n", "print(x.collect())\n", "print(y.collect())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "['Apple', 'Bat', 'Cat']\n" ] } ], "source": [ "# sortBy\n", "x = sc.parallelize(['Cat','Apple','Bat'])\n", "def keyGen(val): return val[0]\n", "y = x.sortBy(keyGen)\n", "print(y.collect())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "['C', 'B', 'A']\n", "[['C'], ['B', 'A']]\n" ] } ], "source": [ "# glom\n", "x = sc.parallelize(['C','B','A'], 2)\n", "y = x.glom()\n", "print(x.collect()) \n", "print(y.collect())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "['A', 'B']\n", "['C', 'D']\n", "[('A', 
'C'), ('A', 'D'), ('B', 'C'), ('B', 'D')]\n" ] } ], "source": [ "# cartesian\n", "x = sc.parallelize(['A','B'])\n", "y = sc.parallelize(['C','D'])\n", "z = x.cartesian(y)\n", "print(x.collect())\n", "print(y.collect())\n", "print(z.collect())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[1, 2, 3]\n", "[('A', [1, 3]), ('B', [2])]\n" ] } ], "source": [ "# groupBy\n", "x = sc.parallelize([1,2,3])\n", "y = x.groupBy(lambda x: 'A' if (x%2 == 1) else 'B' )\n", "print(x.collect())\n", "print([(j[0],[i for i in j[1]]) for j in y.collect()]) # y is nested, this iterates through it" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "['A', 'Ba', 'C', 'AD']\n", "[u'A', u'Ba', u'AD']\n" ] } ], "source": [ "# pipe\n", "x = sc.parallelize(['A', 'Ba', 'C', 'AD'])\n", "y = x.pipe('grep -i \"A\"') # calls out to grep, may fail under Windows \n", "print(x.collect())\n", "print(y.collect())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "code", "execution_count": 19, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[1, 2, 3]\n", "None\n", "1\n", "3\n", "2\n", "\n" ] } ], "source": [ "# foreach\n", "from __future__ import print_function\n", "x = sc.parallelize([1,2,3])\n", "def f(el):\n", "    '''side effect: append the current RDD element to a file'''\n", "    # runs on the executors; assumes local mode, where they share the driver's filesystem\n", "    with open(\"./foreachExample.txt\", 'a') as f1: # 'with' closes (and flushes) the file on exit\n", "        print(el, file=f1)\n", "\n", "open('./foreachExample.txt', 'w').close() # first clear the file contents\n", "\n", "y = x.foreach(f) # writes into foreachExample.txt\n", "\n", "print(x.collect())\n", "print(y) # foreach returns 'None'\n", "# print the contents of foreachExample.txt\n", "with open(\"./foreachExample.txt\", \"r\") as foreachExample:\n", "    print(foreachExample.read())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, 
{ "cell_type": "code", "execution_count": 20, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[[], [1], [], [2], [3]]\n", "None\n", "[]\n", "[1]\n", "[]\n", "[2]\n", "[3]\n", "\n" ] } ], "source": [ "# foreachPartition\n", "from __future__ import print_function\n", "x = sc.parallelize([1,2,3],5)\n", "def f(partition):\n", "    '''side effect: append the current RDD partition contents to a file'''\n", "    # called once per partition; empty partitions still write '[]'\n", "    with open(\"./foreachPartitionExample.txt\", 'a') as f1: # 'with' closes (and flushes) the file on exit\n", "        print(list(partition), file=f1)\n", "\n", "open('./foreachPartitionExample.txt', 'w').close() # first clear the file contents\n", "\n", "y = x.foreachPartition(f) # writes into foreachPartitionExample.txt\n", "\n", "print(x.glom().collect())\n", "print(y) # foreachPartition returns 'None'\n", "# print the contents of foreachPartitionExample.txt\n", "with open(\"./foreachPartitionExample.txt\", \"r\") as foreachPartitionExample:\n", "    print(foreachPartitionExample.read())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "code", "execution_count": 21, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "ParallelCollectionRDD[84] at parallelize at PythonRDD.scala:423\n", "[1, 2, 3]\n" ] } ], "source": [ "# collect\n", "x = sc.parallelize([1,2,3])\n", "y = x.collect()\n", "print(x) # distributed\n", "print(y) # not distributed" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "code", "execution_count": 22, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[1, 2, 3]\n", "6\n" ] } ], "source": [ "# reduce\n", "x = sc.parallelize([1,2,3])\n", "y = x.reduce(lambda obj, accumulated: obj + accumulated) # computes a cumulative sum\n", "print(x.collect())\n", "print(y)" ]
}, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "code", "execution_count": 23, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[1, 2, 3]\n", "6\n" ] } ], "source": [ "# fold\n", "x = sc.parallelize([1,2,3])\n", "neutral_zero_value = 0 # 0 for sum, 1 for multiplication\n", "y = x.fold(neutral_zero_value,lambda obj, accumulated: accumulated + obj) # computes cumulative sum\n", "print(x.collect())\n", "print(y)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "code", "execution_count": 24, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[2, 3, 4]\n", "(9, 24)\n" ] } ], "source": [ "# aggregate\n", "x = sc.parallelize([2,3,4])\n", "neutral_zero_value = (0,1) # sum: x+0 = x, product: 1*x = x\n", "seqOp = (lambda aggregated, el: (aggregated[0] + el, aggregated[1] * el)) \n", "combOp = (lambda aggregated, el: (aggregated[0] + el[0], aggregated[1] * el[1]))\n", "y = x.aggregate(neutral_zero_value,seqOp,combOp) # computes (cumulative sum, cumulative product)\n", "print(x.collect())\n", "print(y)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "code", "execution_count": 25, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[1, 3, 2]\n", "3\n" ] } ], "source": [ "# max\n", "x = sc.parallelize([1,3,2])\n", "y = x.max()\n", "print(x.collect())\n", "print(y)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "code", "execution_count": 26, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[1, 3, 2]\n", "1\n" ] } ], "source": [ "# min\n", "x = sc.parallelize([1,3,2])\n", "y = x.min()\n", "print(x.collect())\n", "print(y)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "code", "execution_count": 27, "metadata": 
{}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[1, 3, 2]\n", "6\n" ] } ], "source": [ "# sum\n", "x = sc.parallelize([1,3,2])\n", "y = x.sum()\n", "print(x.collect())\n", "print(y)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "code", "execution_count": 28, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[1, 3, 2]\n", "3\n" ] } ], "source": [ "# count\n", "x = sc.parallelize([1,3,2])\n", "y = x.count()\n", "print(x.collect())\n", "print(y)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "code", "execution_count": 29, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[1, 3, 1, 2, 3]\n", "([1, 2, 3], [2, 3])\n" ] } ], "source": [ "# histogram (example #1)\n", "x = sc.parallelize([1,3,1,2,3])\n", "y = x.histogram(buckets = 2)\n", "print(x.collect())\n", "print(y)" ] }, { "cell_type": "code", "execution_count": 30, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[1, 3, 1, 2, 3]\n", "([0, 0.5, 1, 1.5, 2, 2.5, 3, 3.5], [0, 0, 2, 0, 1, 0, 2])\n" ] } ], "source": [ "# histogram (example #2)\n", "x = sc.parallelize([1,3,1,2,3])\n", "y = x.histogram([0,0.5,1,1.5,2,2.5,3,3.5])\n", "print(x.collect())\n", "print(y)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "code", "execution_count": 31, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[1, 3, 2]\n", "2.0\n" ] } ], "source": [ "# mean\n", "x = sc.parallelize([1,3,2])\n", "y = x.mean()\n", "print(x.collect())\n", "print(y)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "code", "execution_count": 32, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[1, 3, 2]\n", "0.666666666667\n" ] } ], "source": [ "# variance\n", "x = 
sc.parallelize([1,3,2])\n", "y = x.variance() # divides by N\n", "print(x.collect())\n", "print(y)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "code", "execution_count": 33, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[1, 3, 2]\n", "0.816496580928\n" ] } ], "source": [ "# stdev\n", "x = sc.parallelize([1,3,2])\n", "y = x.stdev() # divides by N\n", "print(x.collect())\n", "print(y)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "code", "execution_count": 34, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[1, 3, 2]\n", "1.0\n" ] } ], "source": [ "# sampleStdev\n", "x = sc.parallelize([1,3,2])\n", "y = x.sampleStdev() # divides by N-1\n", "print(x.collect())\n", "print(y)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "code", "execution_count": 35, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[1, 3, 2]\n", "1.0\n" ] } ], "source": [ "# sampleVariance\n", "x = sc.parallelize([1,3,2])\n", "y = x.sampleVariance() # divides by N-1\n", "print(x.collect())\n", "print(y)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "code", "execution_count": 36, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[1, 3, 1, 2, 3]\n", "defaultdict(, {1: 2, 2: 1, 3: 2})\n" ] } ], "source": [ "# countByValue\n", "x = sc.parallelize([1,3,1,2,3])\n", "y = x.countByValue()\n", "print(x.collect())\n", "print(y)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "code", "execution_count": 37, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[1, 3, 1, 2, 3]\n", "[3, 3, 2]\n" ] } ], "source": [ "# top\n", "x = sc.parallelize([1,3,1,2,3])\n", "y = x.top(num = 3)\n", 
"print(x.collect())\n", "print(y)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "code", "execution_count": 38, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[1, 3, 1, 2, 3]\n", "[1, 1, 2]\n" ] } ], "source": [ "# takeOrdered\n", "x = sc.parallelize([1,3,1,2,3])\n", "y = x.takeOrdered(num = 3)\n", "print(x.collect())\n", "print(y)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "code", "execution_count": 39, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[1, 3, 1, 2, 3]\n", "[1, 3, 1]\n" ] } ], "source": [ "# take\n", "x = sc.parallelize([1,3,1,2,3])\n", "y = x.take(num = 3)\n", "print(x.collect())\n", "print(y)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "code", "execution_count": 40, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[1, 3, 1, 2, 3]\n", "1\n" ] } ], "source": [ "# first\n", "x = sc.parallelize([1,3,1,2,3])\n", "y = x.first()\n", "print(x.collect())\n", "print(y)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "code", "execution_count": 41, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[('C', 3), ('A', 1), ('B', 2)]\n", "{'A': 1, 'C': 3, 'B': 2}\n" ] } ], "source": [ "# collectAsMap\n", "x = sc.parallelize([('C',3),('A',1),('B',2)])\n", "y = x.collectAsMap()\n", "print(x.collect())\n", "print(y)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "code", "execution_count": 42, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[('C', 3), ('A', 1), ('B', 2)]\n", "['C', 'A', 'B']\n" ] } ], "source": [ "# keys\n", "x = sc.parallelize([('C',3),('A',1),('B',2)])\n", "y = x.keys()\n", "print(x.collect())\n", "print(y.collect())" ] }, { 
"cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "code", "execution_count": 43, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[('C', 3), ('A', 1), ('B', 2)]\n", "[3, 1, 2]\n" ] } ], "source": [ "# values\n", "x = sc.parallelize([('C',3),('A',1),('B',2)])\n", "y = x.values()\n", "print(x.collect())\n", "print(y.collect())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "code", "execution_count": 44, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[('B', 1), ('B', 2), ('A', 3), ('A', 4), ('A', 5)]\n", "[('A', 12), ('B', 3)]\n" ] } ], "source": [ "# reduceByKey\n", "x = sc.parallelize([('B',1),('B',2),('A',3),('A',4),('A',5)])\n", "y = x.reduceByKey(lambda agg, obj: agg + obj)\n", "print(x.collect())\n", "print(y.collect())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "code", "execution_count": 45, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[('B', 1), ('B', 2), ('A', 3), ('A', 4), ('A', 5)]\n", "{'A': 12, 'B': 3}\n" ] } ], "source": [ "# reduceByKeyLocally\n", "x = sc.parallelize([('B',1),('B',2),('A',3),('A',4),('A',5)])\n", "y = x.reduceByKeyLocally(lambda agg, obj: agg + obj)\n", "print(x.collect())\n", "print(y)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "code", "execution_count": 46, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[('B', 1), ('B', 2), ('A', 3), ('A', 4), ('A', 5)]\n", "defaultdict(, {'A': 3, 'B': 2})\n" ] } ], "source": [ "# countByKey\n", "x = sc.parallelize([('B',1),('B',2),('A',3),('A',4),('A',5)])\n", "y = x.countByKey()\n", "print(x.collect())\n", "print(y)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "code", "execution_count": 47, "metadata": {}, 
"outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[('C', 4), ('B', 3), ('A', 2), ('A', 1)]\n", "[('A', 8), ('B', 7), ('A', 6), ('D', 5)]\n", "[('A', (2, 8)), ('A', (2, 6)), ('A', (1, 8)), ('A', (1, 6)), ('B', (3, 7))]\n" ] } ], "source": [ "# join\n", "x = sc.parallelize([('C',4),('B',3),('A',2),('A',1)])\n", "y = sc.parallelize([('A',8),('B',7),('A',6),('D',5)])\n", "z = x.join(y)\n", "print(x.collect())\n", "print(y.collect())\n", "print(z.collect())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "code", "execution_count": 48, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[('C', 4), ('B', 3), ('A', 2), ('A', 1)]\n", "[('A', 8), ('B', 7), ('A', 6), ('D', 5)]\n", "[('A', (2, 8)), ('A', (2, 6)), ('A', (1, 8)), ('A', (1, 6)), ('C', (4, None)), ('B', (3, 7))]\n" ] } ], "source": [ "# leftOuterJoin\n", "x = sc.parallelize([('C',4),('B',3),('A',2),('A',1)])\n", "y = sc.parallelize([('A',8),('B',7),('A',6),('D',5)])\n", "z = x.leftOuterJoin(y)\n", "print(x.collect())\n", "print(y.collect())\n", "print(z.collect())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "code", "execution_count": 49, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[('C', 4), ('B', 3), ('A', 2), ('A', 1)]\n", "[('A', 8), ('B', 7), ('A', 6), ('D', 5)]\n", "[('A', (2, 8)), ('A', (2, 6)), ('A', (1, 8)), ('A', (1, 6)), ('B', (3, 7)), ('D', (None, 5))]\n" ] } ], "source": [ "# rightOuterJoin\n", "x = sc.parallelize([('C',4),('B',3),('A',2),('A',1)])\n", "y = sc.parallelize([('A',8),('B',7),('A',6),('D',5)])\n", "z = x.rightOuterJoin(y)\n", "print(x.collect())\n", "print(y.collect())\n", "print(z.collect())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "code", "execution_count": 50, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ 
"[[(0, 1)], [(1, 2), (2, 3)]]\n", "[[(0, 1)], [(1, 2)], [(2, 3)]]\n" ] } ], "source": [ "# partitionBy\n", "x = sc.parallelize([(0,1),(1,2),(2,3)],2)\n", "y = x.partitionBy(numPartitions = 3, partitionFunc = lambda key: key) # only the key is passed to partitionFunc\n", "print(x.glom().collect())\n", "print(y.glom().collect())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "code", "execution_count": 51, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[('B', 1), ('B', 2), ('A', 3), ('A', 4), ('A', 5)]\n", "[('A', [(3, 9), (4, 16), (5, 25)]), ('B', [(1, 1), (2, 4)])]\n" ] } ], "source": [ "# combineByKey\n", "x = sc.parallelize([('B',1),('B',2),('A',3),('A',4),('A',5)])\n", "createCombiner = (lambda el: [(el,el**2)]) \n", "mergeVal = (lambda aggregated, el: aggregated + [(el,el**2)]) # append to aggregated\n", "mergeComb = (lambda agg1,agg2: agg1 + agg2 ) # concatenate agg1 and agg2\n", "y = x.combineByKey(createCombiner,mergeVal,mergeComb)\n", "print(x.collect())\n", "print(y.collect())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "code", "execution_count": 52, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[('B', 1), ('B', 2), ('A', 3), ('A', 4), ('A', 5)]\n", "[('A', [(3, 9), (4, 16), (5, 25)]), ('B', [(1, 1), (2, 4)])]\n" ] } ], "source": [ "# aggregateByKey\n", "x = sc.parallelize([('B',1),('B',2),('A',3),('A',4),('A',5)])\n", "zeroValue = [] # empty list is 'zero value' for append operation\n", "mergeVal = (lambda aggregated, el: aggregated + [(el,el**2)])\n", "mergeComb = (lambda agg1,agg2: agg1 + agg2 )\n", "y = x.aggregateByKey(zeroValue,mergeVal,mergeComb)\n", "print(x.collect())\n", "print(y.collect())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "code", "execution_count": 53, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": 
"stream", "text": [ "[('B', 1), ('B', 2), ('A', 3), ('A', 4), ('A', 5)]\n", "[('A', 60), ('B', 2)]\n" ] } ], "source": [ "# foldByKey\n", "x = sc.parallelize([('B',1),('B',2),('A',3),('A',4),('A',5)])\n", "zeroValue = 1 # one is 'zero value' for multiplication\n", "y = x.foldByKey(zeroValue,lambda agg,x: agg*x ) # computes cumulative product within each key\n", "print(x.collect())\n", "print(y.collect())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "code", "execution_count": 54, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[('B', 5), ('B', 4), ('A', 3), ('A', 2), ('A', 1)]\n", "[('A', [3, 2, 1]), ('B', [5, 4])]\n" ] } ], "source": [ "# groupByKey\n", "x = sc.parallelize([('B',5),('B',4),('A',3),('A',2),('A',1)])\n", "y = x.groupByKey()\n", "print(x.collect())\n", "print([(j[0],[i for i in j[1]]) for j in y.collect()])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "code", "execution_count": 55, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[('A', (1, 2, 3)), ('B', (4, 5))]\n", "[('A', 1), ('A', 4), ('A', 9), ('B', 16), ('B', 25)]\n" ] } ], "source": [ "# flatMapValues\n", "x = sc.parallelize([('A',(1,2,3)),('B',(4,5))])\n", "y = x.flatMapValues(lambda x: [i**2 for i in x]) # function is applied to entire value, then result is flattened\n", "print(x.collect())\n", "print(y.collect())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "code", "execution_count": 56, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[('A', (1, 2, 3)), ('B', (4, 5))]\n", "[('A', [1, 4, 9]), ('B', [16, 25])]\n" ] } ], "source": [ "# mapValues\n", "x = sc.parallelize([('A',(1,2,3)),('B',(4,5))])\n", "y = x.mapValues(lambda x: [i**2 for i in x]) # function is applied to entire value\n", "print(x.collect())\n", "print(y.collect())" ] }, 
{ "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "code", "execution_count": 57, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[('C', 4), ('B', (3, 3)), ('A', 2), ('A', (1, 1))]\n", "[('B', (7, 7)), ('A', 6), ('D', (5, 5))]\n", "[('D', 9), ('B', (8, 8))]\n", "Result:\n", "D [[], [(5, 5)], [9]]\n", "C [[4], [], []]\n", "B [[(3, 3)], [(7, 7)], [(8, 8)]]\n", "A [[2, (1, 1)], [6], []]\n" ] } ], "source": [ "# groupWith\n", "x = sc.parallelize([('C',4),('B',(3,3)),('A',2),('A',(1,1))])\n", "y = sc.parallelize([('B',(7,7)),('A',6),('D',(5,5))])\n", "z = sc.parallelize([('D',9),('B',(8,8))])\n", "a = x.groupWith(y,z)\n", "print(x.collect())\n", "print(y.collect())\n", "print(z.collect())\n", "print(\"Result:\")\n", "for key,val in list(a.collect()): \n", " print(key, [list(i) for i in val])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "code", "execution_count": 58, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[('C', 4), ('B', (3, 3)), ('A', 2), ('A', (1, 1))]\n", "[('A', 8), ('B', 7), ('A', 6), ('D', (5, 5))]\n", "A [[2, (1, 1)], [8, 6]]\n", "C [[4], []]\n", "B [[(3, 3)], [7]]\n", "D [[], [(5, 5)]]\n" ] } ], "source": [ "# cogroup\n", "x = sc.parallelize([('C',4),('B',(3,3)),('A',2),('A',(1,1))])\n", "y = sc.parallelize([('A',8),('B',7),('A',6),('D',(5,5))])\n", "z = x.cogroup(y)\n", "print(x.collect())\n", "print(y.collect())\n", "for key,val in list(z.collect()):\n", " print(key, [list(i) for i in val])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "code", "execution_count": 59, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[('A', 1), ('B', 2), ('C', 3), ('B', 4), ('A', 5)]\n", "[('A', 1), ('B', 2), ('B', 4)]\n" ] } ], "source": [ "# sampleByKey\n", "x = sc.parallelize([('A',1),('B',2),('C',3),('B',4),('A',5)])\n", 
"y = x.sampleByKey(withReplacement=False, fractions={'A':0.5, 'B':1, 'C':0.2})\n", "print(x.collect())\n", "print(y.collect())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "code", "execution_count": 60, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[('C', 1), ('B', 2), ('A', 3), ('A', 4)]\n", "[('A', 5), ('D', 6), ('A', 7), ('D', 8)]\n", "[('C', 1), ('B', 2)]\n" ] } ], "source": [ "# subtractByKey\n", "x = sc.parallelize([('C',1),('B',2),('A',3),('A',4)])\n", "y = sc.parallelize([('A',5),('D',6),('A',7),('D',8)])\n", "z = x.subtractByKey(y)\n", "print(x.collect())\n", "print(y.collect())\n", "print(z.collect())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "code", "execution_count": 61, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[('C', 4), ('B', 3), ('A', 2), ('A', 1)]\n", "[('C', 8), ('A', 2), ('D', 1)]\n", "[('A', 1), ('C', 4), ('B', 3)]\n" ] } ], "source": [ "# subtract\n", "x = sc.parallelize([('C',4),('B',3),('A',2),('A',1)])\n", "y = sc.parallelize([('C',8),('A',2),('D',1)])\n", "z = x.subtract(y)\n", "print(x.collect())\n", "print(y.collect())\n", "print(z.collect())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "code", "execution_count": 62, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[1, 2, 3]\n", "[(1, 1), (4, 2), (9, 3)]\n" ] } ], "source": [ "# keyBy\n", "x = sc.parallelize([1,2,3])\n", "y = x.keyBy(lambda x: x**2)\n", "print(x.collect())\n", "print(y.collect())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "code", "execution_count": 63, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[[1, 2], [3, 4, 5]]\n", "[[], [1, 2, 3, 4], [5]]\n" ] } ], "source": [ "# repartition\n", "x = 
sc.parallelize([1,2,3,4,5],2)\n", "y = x.repartition(numPartitions=3)\n", "print(x.glom().collect())\n", "print(y.glom().collect())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "code", "execution_count": 64, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[[1, 2], [3, 4, 5]]\n", "[[1, 2, 3, 4, 5]]\n" ] } ], "source": [ "# coalesce\n", "x = sc.parallelize([1,2,3,4,5],2)\n", "y = x.coalesce(numPartitions=1)\n", "print(x.glom().collect())\n", "print(y.glom().collect())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "code", "execution_count": 65, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "['B', 'A', 'A']\n", "[66, 65, 65]\n", "[('B', 66), ('A', 65), ('A', 65)]\n" ] } ], "source": [ "# zip\n", "x = sc.parallelize(['B','A','A'])\n", "y = x.map(lambda x: ord(x)) # zip expects x and y to have same #partitions and #elements/partition\n", "z = x.zip(y)\n", "print(x.collect())\n", "print(y.collect())\n", "print(z.collect())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "code", "execution_count": 66, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[['B'], ['A', 'A']]\n", "[('B', 0), ('A', 1), ('A', 2)]\n" ] } ], "source": [ "# zipWithIndex\n", "x = sc.parallelize(['B','A','A'],2)\n", "y = x.zipWithIndex()\n", "print(x.glom().collect())\n", "print(y.collect())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "" ] }, { "cell_type": "code", "execution_count": 67, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[['B'], ['A', 'A']]\n", "[('B', 0), ('A', 1), ('A', 3)]\n" ] } ], "source": [ "# zipWithUniqueId\n", "x = sc.parallelize(['B','A','A'],2)\n", "y = x.zipWithUniqueId()\n", "print(x.glom().collect())\n", "print(y.collect())" ] } ], "metadata": { 
"kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.6" } }, "nbformat": 4, "nbformat_minor": 1 }